Chains¶
A Chain sequences a Study with one or
more transforms into a reproducible, cacheable
pipeline.
See also
Caching & Cluster Execution for the full guide on disk caching and cluster execution.
Basic Chain¶
Each step’s output feeds into the next. The first step is typically
a Study; subsequent steps are transforms.
import neuralset as ns
from neuralset.events import transforms
chain = ns.Chain(
    steps=[
        ns.Study(name="Fake2025Meg", path=ns.CACHE_FOLDER),
        transforms.QueryEvents(query="type in ['Word', 'Meg']"),
    ]
)
events = chain.run()
print(f"Chain: {len(events)} events, types: {events.type.unique().tolist()}")
Chain: 578 events, types: ['Meg', 'Word']
Dict-based Configuration¶
Steps accept dicts that auto-dispatch to the right class. This makes chains easy to serialize to YAML or JSON:
chain = ns.Chain(
    steps=[
        {"name": "Fake2025Meg", "path": str(ns.CACHE_FOLDER)},
        {"name": "QueryEvents", "query": "type in ['Word', 'Meg']"},
    ]
)
events = chain.run()
print(f"Dict chain: {len(events)} events")
Dict chain: 578 events
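Because the steps are plain dicts, the whole chain spec round-trips through JSON (or YAML) unchanged. A minimal sketch using only the standard library; the "/cache" path is a placeholder:

```python
import json

# The same step dicts as above, expressed as serializable data.
steps = [
    {"name": "Fake2025Meg", "path": "/cache"},
    {"name": "QueryEvents", "query": "type in ['Word', 'Meg']"},
]

# Round-trip through JSON; the loaded value can be passed
# straight back to ns.Chain(steps=...).
text = json.dumps(steps, indent=2)
loaded = json.loads(text)
```

The same data works with a YAML dumper/loader if your configs live in YAML files.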
Named Steps¶
Use a dict of dicts to give each step a name. This creates an
OrderedDict — useful for referencing individual steps later:
chain = ns.Chain(
    steps={
        "source": {"name": "Fake2025Meg", "path": str(ns.CACHE_FOLDER)},
        "filter": {"name": "QueryEvents", "query": "type in ['Word', 'Meg']"},
    }
)
events = chain.run()
print(f"Named steps: {list(chain.steps.keys())}, {len(events)} events")
Named steps: ['source', 'filter'], 578 events
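Since the named form produces an OrderedDict, individual steps can be looked up and tweaked by key. A sketch of the access pattern using a plain OrderedDict of step configs; the key names are just illustrative:

```python
from collections import OrderedDict

# Stand-in for a named-steps config, in the same shape as the
# dict-of-dicts passed to ns.Chain above.
steps = OrderedDict(
    source={"name": "Fake2025Meg", "path": "/cache"},
    filter={"name": "QueryEvents", "query": "type in ['Word', 'Meg']"},
)

# Named access: adjust one step without rebuilding the rest.
steps["filter"]["query"] = "type == 'Word'"
```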
Caching¶
Put infra on expensive steps (studies, heavy transforms)
and skip it on cheap ones (QueryEvents):
chain = ns.Chain(steps=[
    {"name": "MyStudy", "path": "/data",
     "infra": {"backend": "Cached", "folder": "/cache"}},
    {"name": "EnsureTexts", "punctuation": "spacy",
     "infra": {"backend": "Cached", "folder": "/cache"}},
    {"name": "QueryEvents", "query": "split == 'train'"},
])
Chain-level infra caches the final output (same as the last
step’s output) and sets the remote-compute scope — the whole
chain runs as one job:
chain = ns.Chain(
    steps=[study, transform1, transform2],
    infra={"backend": "Cached", "folder": "/cache"},
)
Cache modes (set via infra.mode):
| Mode | Behaviour |
|---|---|
| "cached" (default) | Use cache if available, compute otherwise |
| "read-only" | Only read from cache; error if not cached |
| "force" | Recompute this step and all downstream steps |
| "retry" | Recompute only if previous run failed |
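To pin a mode on a single step, the mode sits alongside the other infra keys. A config sketch; "force" is the mode the docs name explicitly, and the step name and paths are placeholders:

```python
# Force this step, and everything downstream of it, to recompute
# by adding "mode" next to the other infra keys.
step = {
    "name": "MyStudy",
    "path": "/data",
    "infra": {"backend": "Cached", "folder": "/cache", "mode": "force"},
}
```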
Backend Configuration¶
Use infra.cluster / infra.backend to run steps on different
backends:
# Local execution (default)
{"infra": {"backend": "Cached", "folder": "/cache"}}

# Slurm cluster (production)
{"infra": {"backend": "Slurm", "folder": "/cache",
           "gpus_per_node": 1, "partition": "gpu"}}

# Debug mode (simulates Slurm inline)
{"infra": {"backend": "SubmititDebug", "folder": "/cache"}}
See Caching & Cluster Execution for the full list of backend options.
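A practical pattern is to toggle between the debug and production backends from a single flag. A hypothetical helper (make_infra is not part of neuralset; folder, GPU count, and partition are example values):

```python
def make_infra(debug: bool) -> dict:
    """Build an infra dict, swapping only the execution backend."""
    if debug:
        # SubmititDebug runs the job inline, so no cluster options needed.
        return {"backend": "SubmititDebug", "folder": "/cache"}
    return {
        "backend": "Slurm",
        "folder": "/cache",
        "gpus_per_node": 1,
        "partition": "gpu",
    }
```

Because both branches share the same cache folder, switching the flag changes where the job runs without invalidating cached results.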
Cache Management¶
Steps with infra provide utilities for cache inspection:
chain.has_cache() # check if result is cached
chain.clear_cache() # remove cached result
To force recomputation, set infra.mode = "force" on the
relevant step.
Step Fields in Configs¶
A pydantic field typed ns.Step accepts any of the forms above:
a single dict (one step), a list (auto-creates a Chain), or a
dict of dicts (named steps). This is how typed configs work — you
don’t need to construct a Chain explicitly:
import pydantic
class Experiment(pydantic.BaseModel):
    events_builder: ns.Step

# Single study — just a dict:
exp = Experiment(events_builder={"name": "MyStudy", "path": "/data"})

# Study + transforms — a list auto-converts to Chain:
exp = Experiment(events_builder=[
    {"name": "MyStudy", "path": "/data"},
    {"name": "QueryEvents", "query": "type == 'Word'"},
])
events = exp.events_builder.run()
See Anatomy of a Study for a runnable example.
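The dispatch behind such a field can be sketched with plain pydantic: a before-mode validator normalizes the single-dict, list, and dict-of-dicts forms into one shape. This is only an illustration of the pattern, not neuralset's actual implementation, and the Holder model and step keys are invented for the sketch:

```python
import pydantic

class Holder(pydantic.BaseModel):
    # Stand-in for a field typed ns.Step; stores named step configs.
    steps: dict

    @pydantic.field_validator("steps", mode="before")
    @classmethod
    def _normalize(cls, value):
        if isinstance(value, dict) and "name" in value:
            return {"step_0": value}  # single step
        if isinstance(value, list):
            # list of steps -> numbered names, as a chain would
            return {f"step_{i}": s for i, s in enumerate(value)}
        return value  # already a dict of named steps

holder = Holder(steps=[{"name": "MyStudy"}, {"name": "QueryEvents"}])
```

The validator fires before type coercion, which is what lets one annotation accept all three config shapes.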
Next Steps¶
Back to the beginning: Events
Full API reference: API Reference
Total running time of the script: (0 minutes 1.410 seconds)