Caching & Cluster Execution
neuralset pipelines involve expensive computation: loading neural recordings, running deep models on every stimulus, aligning timelines. The infra parameter adds disk caching and optional cluster dispatch to any pipeline component. Three infra types handle different granularities:

- Backend: caches the whole result of a Step.
- MapInfra: caches one result per item in a sequence.
- TaskInfra: caches a full computation (training runs, benchmarks).

All three are pydantic models: pass a plain dict and pydantic instantiates the right type automatically. exca provides the underlying implementation.
Backend
Study, EventsTransform, and Chain are all
Steps: pydantic models with a _run() method and an
optional infra field. When infra is set, run() caches the
result of _run() to disk and can dispatch execution to a cluster.
```python
import neuralset as ns


class Double(ns.Step):
    x: int

    def _run(self):
        return self.x * 2


# No infra: runs every time
Double(x=5).run()  # 10

# With caching: the second call reads from disk
Double(x=5, infra={"backend": "Cached", "folder": "/tmp/demo"}).run()

# On a Slurm cluster
Double(x=5, infra={"backend": "Slurm", "folder": "/cache",
                   "gpus_per_node": 1}).run()
```
A Chain is itself a Step that runs several steps in sequence,
feeding each step's output into the next. Each step can carry its own
infra, and the chain resumes from the latest cached intermediate result:
```python
class AddOne(ns.Step):
    def _run(self, value):
        return value + 1


chain = ns.Chain(steps=[
    Double(x=5, infra={"backend": "Cached", "folder": "/cache"}),
    AddOne(infra={"backend": "Cached", "folder": "/cache"}),
])
chain.run()  # Double is cached; if AddOne changes, only AddOne re-runs
```
Setting a Slurm infra on the chain itself instead runs the whole
chain as a single Slurm job.
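The resume behavior described above can be illustrated with a small pure-Python sketch. It assumes that each step's uid covers its own configuration plus the uid of everything upstream. Under that assumption, changing an early step invalidates every later one, while changing only the last step leaves earlier results reusable. CACHE is an in-memory stand-in for the cache folder. run_chain, uid_of, and the (name, config, fn) step tuples are hypothetical.

```python
import hashlib
import json

CACHE: dict = {}  # stand-in for the cache folder: uid -> step output
EXECUTED: list = []  # names of the steps that actually ran


def uid_of(name: str, config: dict, upstream_uid: str) -> str:
    # A step's uid covers its own config *and* the uid of everything upstream
    payload = json.dumps([name, config, upstream_uid], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:12]


def run_chain(steps):
    """steps: list of (name, config, fn) tuples, with fn(previous_output, **config)."""
    uids, upstream = [], ""
    for name, config, _ in steps:  # uids depend only on configs
        upstream = uid_of(name, config, upstream)
        uids.append(upstream)
    start, value = 0, None
    for i in reversed(range(len(steps))):  # find the latest cached intermediate
        if uids[i] in CACHE:
            start, value = i + 1, CACHE[uids[i]]
            break
    for i in range(start, len(steps)):  # run only the steps after it
        name, config, fn = steps[i]
        EXECUTED.append(name)
        value = fn(value, **config)
        CACHE[uids[i]] = value
    return value


def double(_, x):
    return x * 2


def add(value, n):
    return value + n


print(run_chain([("Double", {"x": 5}, double), ("Add", {"n": 1}, add)]))  # 11
EXECUTED.clear()
print(run_chain([("Double", {"x": 5}, double), ("Add", {"n": 2}, add)]))  # 12
print(EXECUTED)  # ['Add']: Double was resumed from the cache
```

Because uids depend only on configurations, the sketch can find the latest cached step before running anything and then execute only the steps that follow it.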
MapInfra
Extractors carry a MapInfra in their infra
field. It wraps a method that processes a sequence of items and
caches each item's result independently. It also supports batch dispatch
(processpool, Slurm), so items can be processed in parallel.
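In plain Python, per-item caching amounts to computing only the items whose uid is not yet stored. The sketch below assumes items are keyed by str(item), matching the item_uid=str choice in the exca example that follows. It keeps results in an in-memory dict rather than a cache folder. ItemCache is a hypothetical class, not exca's MapInfra.

```python
import typing as tp


class ItemCache:
    """Hypothetical sketch of per-item caching (not exca's MapInfra)."""

    def __init__(self, fn: tp.Callable[[tp.List[int]], tp.Iterator[float]],
                 item_uid: tp.Callable[[int], str] = str):
        self.fn = fn
        self.item_uid = item_uid
        self.store: tp.Dict[str, float] = {}  # stand-in for the cache folder
        self.computed: tp.List[int] = []  # items that were actually computed

    def process(self, items: tp.Iterable[int]) -> tp.Iterator[float]:
        items = list(items)
        # Unique items with no cached result, in first-seen order
        missing = list({self.item_uid(it): it for it in items
                        if self.item_uid(it) not in self.store}.values())
        for item, out in zip(missing, self.fn(missing)):
            self.store[self.item_uid(item)] = out
        self.computed.extend(missing)
        for item in items:  # every requested item is now served from the store
            yield self.store[self.item_uid(item)]


def square(items):
    for item in items:
        yield item ** 2 * 1.0


cache = ItemCache(square)
print(list(cache.process([1, 2, 3])))  # [1.0, 4.0, 9.0]
print(list(cache.process([2, 3, 4])))  # [4.0, 9.0, 16.0], only 4 is computed
print(cache.computed)  # [1, 2, 3, 4]
```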
```python
import typing as tp

import exca
import neuralset as ns


class Squarer(ns.base._Module):
    coeff: float = 1.0
    infra: exca.MapInfra = exca.MapInfra()

    @infra.apply(item_uid=str)
    def process(self, items: tp.Sequence[int]) -> tp.Iterator[float]:
        for item in items:
            yield item ** 2 * self.coeff


# Local, no caching
list(Squarer().process([1, 2, 3]))  # [1.0, 4.0, 9.0]

# With caching: each item is cached individually
list(Squarer(infra={"folder": "/cache"}).process([1, 2, 3]))

# Dispatch to Slurm: items are batched across jobs
list(Squarer(infra={"folder": "/cache", "cluster": "slurm",
                    "max_jobs": 64}).process(range(10000)))
```
TaskInfra
neuraltrain experiments use TaskInfra. It caches a complete
computation; unlike MapInfra, it does not iterate over items. It
supports Slurm job arrays for parameter sweeps.
```python
import exca
import neuralset as ns


class Train(ns.base._Module):
    lr: float = 1e-3
    infra: exca.TaskInfra = exca.TaskInfra()

    @infra.apply
    def run(self):
        return {"loss": 0.42 / self.lr}


# With caching
Train(lr=1e-3, infra={"folder": "/results"}).run()

# On Slurm
Train(lr=1e-3, infra={"folder": "/results", "cluster": "slurm"}).run()
```
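A parameter sweep maps naturally onto a job array. Every configuration gets its own uid, and only configurations without a cached result need a job. This sketch assumes a JSON-hash uid and a plain dict as the result store, and it runs the pending configurations locally instead of submitting them. expand_grid, config_uid, and run_sweep are hypothetical helpers, not the neuraltrain or exca API.

```python
import hashlib
import itertools
import json


def expand_grid(base: dict, grid: dict) -> list:
    """One config per combination of grid values."""
    keys = sorted(grid)
    return [{**base, **dict(zip(keys, values))}
            for values in itertools.product(*(grid[k] for k in keys))]


def config_uid(config: dict) -> str:
    return hashlib.sha256(json.dumps(config, sort_keys=True).encode()).hexdigest()[:12]


def run_sweep(configs: list, task, results: dict) -> list:
    """Run configs with no cached result; on Slurm each would be one array task."""
    pending = [c for c in configs if config_uid(c) not in results]
    for config in pending:
        results[config_uid(config)] = task(**config)
    return pending


def train(lr: float, epochs: int) -> dict:
    return {"loss": 0.42 / lr / epochs}  # placeholder for a real training run


results: dict = {}
configs = expand_grid({"epochs": 1}, {"lr": [1e-4, 1e-3, 1e-2]})
print(len(run_sweep(configs, train, results)))  # 3: nothing cached yet
more = expand_grid({"epochs": 1}, {"lr": [1e-3, 1e-2, 1e-1]})
print(len(run_sweep(more, train, results)))  # 1: only lr=0.1 is new
```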
In neuralset
Studies: Backend + MapInfra
Studies combine both infra types. infra (a Backend)
caches the merged events DataFrame, while infra_timelines
(a MapInfra) caches each timeline independently and defaults to
cluster="processpool" for parallel I/O.
Setting infra.folder automatically propagates to
infra_timelines.folder, so a single infra is usually enough:
```python
import neuralset as ns

study = ns.Study(
    name="Allen2022MassiveSample",
    path=ns.CACHE_FOLDER,
    infra={"backend": "Cached", "folder": "/cache"},
)
events = study.run()  # first call: loads and caches
events = study.run()  # subsequent calls: read from cache
```
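The folder propagation described above can be modelled with plain dataclasses. This is a hypothetical sketch that assumes an explicitly set infra_timelines.folder is left untouched. InfraConfig and StudySketch are illustrative names, not neuralset classes.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class InfraConfig:
    folder: Optional[str] = None
    cluster: Optional[str] = None


@dataclass
class StudySketch:
    """Hypothetical model of a Study's two infra fields (not the neuralset class)."""

    name: str
    infra: InfraConfig = field(default_factory=InfraConfig)
    infra_timelines: InfraConfig = field(
        default_factory=lambda: InfraConfig(cluster="processpool"))

    def __post_init__(self):
        # Reuse the events-cache folder unless timelines already have their own
        if self.infra_timelines.folder is None:
            self.infra_timelines.folder = self.infra.folder


study = StudySketch("Allen2022MassiveSample", infra=InfraConfig(folder="/cache"))
print(study.infra_timelines)  # InfraConfig(folder='/cache', cluster='processpool')
```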
During development, disable parallel timeline loading to get clear
tracebacks instead of BrokenProcessPool errors:
```python
study = ns.Study(
    name="Allen2022MassiveSample",
    path=ns.CACHE_FOLDER,
    infra_timelines={"cluster": None},  # load timelines sequentially
)
```
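A small dispatcher sketch shows why sequential loading helps debugging. With cluster=None, the exception is raised in the calling process with its original traceback. A process pool instead re-raises it from a worker, and a worker that dies outright surfaces as BrokenProcessPool. map_items and load_timeline are hypothetical. Only concurrent.futures.ProcessPoolExecutor is a real (standard-library) API, and the usage below exercises only the sequential path.

```python
import concurrent.futures
import typing as tp


def load_timeline(index: int) -> str:
    # Hypothetical loader that fails on one timeline, mimicking a corrupt file
    if index == 3:
        raise ValueError(f"corrupt recording in timeline {index}")
    return f"timeline-{index}"


def map_items(fn: tp.Callable, items: tp.Sequence, cluster: tp.Optional[str]) -> list:
    if cluster is None:
        # Sequential: errors are raised here, with the loader's own traceback
        return [fn(item) for item in items]
    if cluster == "processpool":
        # Parallel: worker exceptions are re-raised in the parent; a worker that
        # dies abruptly (segfault, OOM kill) shows up as BrokenProcessPool instead
        with concurrent.futures.ProcessPoolExecutor() as pool:
            return list(pool.map(fn, items))
    raise ValueError(f"unknown cluster: {cluster!r}")


print(map_items(load_timeline, [0, 1, 2], cluster=None))
try:
    map_items(load_timeline, [2, 3], cluster=None)
except ValueError as err:
    print("failed:", err)  # failed: corrupt recording in timeline 3
```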
Transforms and Chains: Backend
EventsTransforms enrich or filter the events
DataFrame. They are Steps too, and are mostly used inside a Chain.
Some transforms are GPU-intensive: AddSummary, for example, loads a Llama model to
summarize texts.
Set infra on individual steps to cache the study locally and dispatch
only AddSummary to a GPU node:
```python
chain = ns.Chain(steps=[
    {"name": "Allen2022MassiveSample", "path": str(ns.CACHE_FOLDER),
     "infra": {"backend": "Cached", "folder": "/cache"}},
    {"name": "AddSummary", "model_name": "meta-llama/Llama-3.2-3B-Instruct",
     "infra": {"backend": "Slurm", "folder": "/cache",
               "gpus_per_node": 1, "partition": "gpu"}},
    {"name": "QueryEvents", "query": "timeline_index < 12"},
])
events = chain.run()
```
To cache and dispatch a whole chain at once, set infra on the
chain itself:
```python
chain = ns.Chain(
    steps=[
        {"name": "Allen2022MassiveSample", "path": str(ns.CACHE_FOLDER),
         "infra": {"backend": "Cached", "folder": "/cache"}},
        {"name": "AddSummary", "model_name": "meta-llama/Llama-3.2-3B-Instruct"},
    ],
    infra={"backend": "Slurm", "folder": "/cache",
           "gpus_per_node": 1, "partition": "gpu"},
)
events = chain.run()
```
Extractors: MapInfra
Extractors use MapInfra to cache one result per event.
The prepare method precomputes results for all events in one
pass; Segmenter calls it automatically.
Local caching first, then scaling to a cluster:
```python
# Requires: `pip install transformers` (or `pip install "neuralset[all]"`)
import neuralset as ns

# Cache locally
extractor = ns.extractors.HuggingFaceImage(
    model_name="facebook/dinov2-small",
    infra={"folder": "/cache"},
)
extractor.prepare(events)  # computes & caches per event
vec = extractor(events, start=0, duration=1.0)  # reads from cache

# Dispatch to Slurm: events are batched across GPU workers
extractor = ns.extractors.HuggingFaceText(
    model_name="gpt2",
    infra={"cluster": "slurm", "folder": "/cache",
           "gpus_per_node": 1, "timeout_min": 120},
)
extractor.prepare(events)  # submits jobs, waits for results
```
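exca decides exactly how pending events are split across workers. The hypothetical sketch below shows one plausible scheme: divide the uncached items into at most max_jobs contiguous batches of near-equal size. split_batches is an illustrative helper, not an exca function, and the cached set is made up for the example.

```python
import typing as tp


def split_batches(items: tp.Sequence, max_jobs: int) -> tp.List[list]:
    """Split items into at most max_jobs contiguous batches of near-equal size."""
    if not items:
        return []
    n_jobs = max(1, min(max_jobs, len(items)))
    size, extra = divmod(len(items), n_jobs)
    batches, start = [], 0
    for job in range(n_jobs):
        stop = start + size + (1 if job < extra else 0)  # first `extra` batches get one more
        batches.append(list(items[start:stop]))
        start = stop
    return batches


cached = {0, 1}  # pretend these events already have cached results
pending = [i for i in range(10) if i not in cached]
print(split_batches(pending, max_jobs=3))  # [[2, 3, 4], [5, 6, 7], [8, 9]]
```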
neuraltrain experiments
neuraltrain uses TaskInfra for full training runs.
BaseExperiment provides the base pattern:
```python
import exca
from neuraltrain import utils


class MyExperiment(utils.BaseExperiment):
    lr: float = 1e-3
    # Declare infra in this class body: a field inherited from BaseExperiment is
    # not visible by name here, so `@infra.apply` would otherwise raise NameError
    infra: exca.TaskInfra = exca.TaskInfra()

    @infra.apply
    def run(self):
        ...  # training loop


# Single run, cached in /results and submitted to Slurm
xp = MyExperiment(lr=1e-3, infra={"folder": "/results", "cluster": "slurm"})
xp.run()

# Parameter sweep: each config becomes one Slurm job
utils.run_grid(MyExperiment, "lr_sweep",
               base_config={"infra": {"folder": "/results", "cluster": "slurm"}},
               grid={"lr": [1e-4, 1e-3, 1e-2]})
```
xp.infra.uid_folder() returns the cache directory of a specific
configuration; use it to save checkpoints and other artifacts.
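A per-configuration directory makes checkpoint-based resumption straightforward. In this sketch, uid_folder is a hypothetical stand-in for infra.uid_folder(). It is assumed here to hash the configuration into a subdirectory name and create that directory. train writes a JSON checkpoint after each epoch, so rerunning the same configuration resumes where it stopped.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def uid_folder(root: str, config: dict) -> Path:
    """Hypothetical stand-in for infra.uid_folder(): one directory per configuration."""
    uid = hashlib.sha256(json.dumps(config, sort_keys=True).encode()).hexdigest()[:12]
    folder = Path(root) / uid
    folder.mkdir(parents=True, exist_ok=True)
    return folder


def train(config: dict, root: str, epochs: int = 3) -> int:
    """Returns the epoch training resumed from (0 means a fresh start)."""
    ckpt = uid_folder(root, config) / "checkpoint.json"
    start = json.loads(ckpt.read_text())["epoch"] if ckpt.exists() else 0
    for epoch in range(start, epochs):
        # ... one real training epoch would run here ...
        ckpt.write_text(json.dumps({"epoch": epoch + 1, "lr": config["lr"]}))
    return start


root = tempfile.mkdtemp()
print(train({"lr": 1e-3}, root))  # 0: fresh run, writes checkpoints
print(train({"lr": 1e-3}, root))  # 3: same config, nothing left to do
```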
Configuration reference
These fields are shared across all three infra types.

- folder: directory where cached results are stored. None (default) disables caching.
- mode: controls cache behavior:
  - "cached" (default): use the cache if available, compute otherwise.
  - "force": recompute and overwrite.
  - "retry": recompute only if the previous run errored.
  - "read-only": read from the cache; raise an error if the result is missing.
- keep_in_ram: keep results in memory after loading them from disk. Useful for data accessed repeatedly; disable it for large tensors.
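The four modes can be summarized as a decision function. This hypothetical sketch assumes that failures are recorded in the cache as ("error", message) entries so that "retry" can detect them. It also assumes that "cached" returns such a recorded failure rather than recomputing. resolve and the tuple layout are illustrative, not exca's internals.

```python
import typing as tp

Mode = tp.Literal["cached", "force", "retry", "read-only"]
Entry = tp.Tuple[str, tp.Any]  # ("ok", result) or ("error", message)


def resolve(mode: Mode, cache: tp.Dict[str, Entry], key: str,
            compute: tp.Callable[[], tp.Any]) -> Entry:
    """Hypothetical sketch of the four cache modes."""
    entry = cache.get(key)
    if mode == "read-only":
        if entry is None:
            raise KeyError(f"{key!r} is not cached and mode is read-only")
        return entry
    if mode == "cached" and entry is not None:
        return entry  # reuse whatever is cached, even a recorded failure
    if mode == "retry" and entry is not None and entry[0] == "ok":
        return entry  # only successful results are reused
    # "force", a cache miss, or a failed run under "retry": recompute
    try:
        entry = ("ok", compute())
    except Exception as err:  # record the failure so "retry" can detect it
        entry = ("error", str(err))
    cache[key] = entry
    return entry


cache: tp.Dict[str, Entry] = {}
print(resolve("cached", cache, "a", lambda: 1 / 0))  # ('error', 'division by zero')
print(resolve("cached", cache, "a", lambda: 42))     # still the cached error
print(resolve("retry", cache, "a", lambda: 42))      # ('ok', 42)
print(resolve("force", cache, "a", lambda: 43))      # ('ok', 43)
```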
Execution backends
Backend selects the execution target via the backend key:

- "Cached": local execution with disk caching.
- "Slurm": Slurm cluster (supports gpus_per_node, partition, timeout_min, etc.).
- "SubmititDebug": simulates Slurm locally through the same pickling path, which helps debug serialization issues.

MapInfra and TaskInfra use the cluster key instead, with values
"slurm", "processpool", "auto", or None (sequential).
Both accept Slurm parameters: slurm_partition, gpus_per_node,
timeout_min, cpus_per_task, slurm_constraint, etc.
Cache management
Backend (Steps / Chains):
```python
step.has_cache()    # True if the result is cached
step.clear_cache()  # remove the cached result
```
TaskInfra:
```python
xp.infra.uid_folder()   # Path to this config's cache directory
xp.infra.iter_cached()  # iterate over cached configs in the folder
```
MapInfra:
```python
extractor.infra.uid_folder()  # Path to the cache directory
extractor.infra.cache_dict    # dict-like access to cached items
```
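The Backend calls step.has_cache() and step.clear_cache() imply a simple contract: a step's configuration determines exactly one cache file. The following pure-Python sketch illustrates that contract under two assumptions. First, the uid is a SHA-256 hash of the class name and configuration. Second, results are pickled into the folder. CachedStep and its methods are hypothetical and do not reproduce neuralset's or exca's implementation.

```python
import hashlib
import json
import pickle
import tempfile
from pathlib import Path


class CachedStep:
    """Hypothetical sketch of a Step whose run() caches _run() on disk."""

    def __init__(self, folder: str, **config):
        self.folder = Path(folder)
        self.config = config
        self.calls = 0  # number of times _run() actually executed

    def uid(self) -> str:
        # Same class + same config -> same uid -> same cache file
        payload = json.dumps({"cls": type(self).__name__, **self.config}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()[:16]

    def _path(self) -> Path:
        return self.folder / f"{self.uid()}.pkl"

    def has_cache(self) -> bool:
        return self._path().exists()

    def clear_cache(self) -> None:
        self._path().unlink(missing_ok=True)

    def run(self):
        path = self._path()
        if path.exists():  # cache hit: skip the computation
            return pickle.loads(path.read_bytes())
        result = self._run()
        self.calls += 1
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(result))
        return result

    def _run(self):
        raise NotImplementedError


class Double(CachedStep):
    def _run(self):
        return self.config["x"] * 2


folder = tempfile.mkdtemp()
first = Double(folder, x=5)
print(first.run())  # computes, then writes the cache file
second = Double(folder, x=5)  # a fresh object with the same config
print(second.run(), second.calls)  # served from disk: _run() never called
```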
Set mode="force" to recompute, or mode="read-only" in
analysis scripts to prevent accidental recomputation.
Cache invalidation is automatic: changing any configuration field
that participates in the UID produces a new cache entry. Old
entries are not deleted: remove them manually, or call clear_cache() on an object built with the old configuration.
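Whether a field "participates in the UID" determines whether changing it creates a new cache entry. For illustration, the sketch below assumes that execution settings under infra (such as the folder) are excluded from the hash, while computation parameters such as coeff are included. cache_uid and its exclude argument are hypothetical. exca's actual UID rules are covered in its own documentation.

```python
import hashlib
import json
import typing as tp


def cache_uid(config: dict, exclude: tp.Tuple[str, ...] = ("infra",)) -> str:
    """Hypothetical uid: hash every field except those listed in `exclude`."""
    relevant = {k: v for k, v in config.items() if k not in exclude}
    payload = json.dumps(relevant, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:12]


base = {"coeff": 1.0, "infra": {"folder": "/cache"}}
moved = {"coeff": 1.0, "infra": {"folder": "/scratch"}}   # only the cache location differs
changed = {"coeff": 2.0, "infra": {"folder": "/cache"}}  # a computation parameter differs
print(cache_uid(base) == cache_uid(moved))    # True: same uid
print(cache_uid(base) == cache_uid(changed))  # False: new entry; the old one stays on disk
```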
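Quick reference

| Component | Infra type | Typical configuration |
|---|---|---|
| Study timelines | MapInfra (infra_timelines) | cluster="processpool" by default; folder propagated from infra |
| neuraltrain experiment | TaskInfra | infra={"folder": "/results", "cluster": "slurm"} |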
See also
exca documentation for the full API reference (all backend options, Slurm parameters, UID computation, custom cache types).