Studies

A Study is an interface to an external dataset: it knows how to download data, iterate over recording sessions, and load events. Most built-in studies are registered through the neuralfetch package — no data is bundled, only the code that accesses it.

Note

The examples below use Fake2025Meg, which includes audio events. If you haven’t already, install the tutorial extras:

pip install 'neuralset[tutorials]'

What is a Study?

A Study encapsulates three responsibilities:

  1. Download raw data to a local path (download())

  2. Iterate timelines — each timeline is one recording session for one subject (iter_timelines())

  3. Load events for each timeline into a validated events DataFrame (run())

A timeline groups events that share a common time axis. Timelines are useful because they allow (1) loading data from simultaneous recordings (e.g. fMRI + EEG) onto a shared clock and (2) keeping distinct sessions (e.g. pre- and post-training) apart.
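Because events in the same timeline share one clock, their start times are directly comparable, while events from different timelines are not. A minimal pandas sketch (with made-up timeline names, not from this study) illustrates the grouping:

```python
import pandas as pd

# hypothetical events spanning two timelines, using the tutorial's column names
events = pd.DataFrame(
    {
        "type": ["Word", "Word", "Audio", "Word"],
        "start": [1.0, 2.5, 0.0, 3.0],
        "duration": [0.2, 0.2, 10.0, 0.2],
        "timeline": ["ses-pre", "ses-pre", "ses-pre", "ses-post"],
    }
)

# per-timeline time spans: comparisons only make sense within one group
span = events.groupby("timeline")["start"].agg(["min", "max"])
print(span)
```

Here `start=2.5` in `ses-pre` and `start=3.0` in `ses-post` are on unrelated clocks, which is exactly why loading keeps them in separate timelines.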

Browsing the Catalog

Study.catalog() returns all registered studies (from neuralfetch and any other installed package):

import neuralset as ns

all_studies = ns.Study.catalog()
print(f"{len(all_studies)} studies available")
19 studies available

Loading a Study

Instantiate a Study by name. Study(name="X", ...) automatically finds and returns the concrete subclass X — it is equivalent to X(...) directly. Available studies are registered through the neuralfetch package — see the neuralfetch documentation for the full catalog.

Call run() to load all timelines and concatenate them into a single validated events DataFrame. The subject, timeline, and study columns are added automatically.

# ns.CACHE_FOLDER defaults to ~/.cache/neuralset/
study = ns.Study(name="Fake2025Meg", path=ns.CACHE_FOLDER)
events = study.run()

print(f"{len(events)} events across {events.subject.nunique()} subjects")
print(events[["type", "start", "duration", "timeline"]].head(8).to_string())
100%|██████████| 2/2 [00:00<00:00,  4.13it/s]
1588 events across 2 subjects
       type      start    duration                     timeline
0       Meg  42.955971  277.715346  Fake2025Meg:subject=sample0
1      Text  46.578924  210.637433  Fake2025Meg:subject=sample0
2     Audio  46.578924  210.637375  Fake2025Meg:subject=sample0
3  Sentence  46.578924    2.264551  Fake2025Meg:subject=sample0
4      Word  46.578924    0.200000  Fake2025Meg:subject=sample0
5  Stimulus  46.578924    0.006660  Fake2025Meg:subject=sample0
6      Word  47.191629    0.200000  Fake2025Meg:subject=sample0
7     Image  47.191629    0.200000  Fake2025Meg:subject=sample0
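Since run() returns a plain pandas DataFrame, standard pandas operations apply directly. A small sketch on a hypothetical frame with the same columns as the output above:

```python
import pandas as pd

# hypothetical events mimicking the schema of study.run()'s output
events = pd.DataFrame(
    {
        "type": ["Word", "Audio", "Word"],
        "start": [46.58, 46.58, 47.19],
        "duration": [0.2, 210.64, 0.2],
    }
)

# derive end times and count events per type with ordinary pandas
events["end"] = events["start"] + events["duration"]
counts = events["type"].value_counts()
print(counts["Word"])  # 2
```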

Inspecting Timelines

Before loading data, study_summary() shows one row per timeline with metadata columns:

summary = study.study_summary(apply_query=False)
print(summary[["subject", "timeline"]].to_string())
               subject                     timeline
0  Fake2025Meg/sample0  Fake2025Meg:subject=sample0
1  Fake2025Meg/sample1  Fake2025Meg:subject=sample1

Querying Timelines

Use the query parameter to filter which timelines get loaded. The query string is evaluated by query_with_index(), which auto-generates virtual index columns such as timeline_index and subject_timeline_index so you can slice by position:

study_q = ns.Study(
    name="Fake2025Meg",
    path=ns.CACHE_FOLDER,
    query="timeline_index < 1",
)
events_q = study_q.run()
print(f"With query: {events_q.timeline.nunique()} timelines, {len(events_q)} events")
With query: 1 timelines, 794 events
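The virtual index columns can be pictured with plain pandas: adding a positional column and calling DataFrame.query reproduces the slicing above. This is a sketch of the idea, not the library's actual query_with_index() implementation:

```python
import pandas as pd

# hypothetical summary frame, one row per timeline
summary = pd.DataFrame({"timeline": ["sample0", "sample1"]})

# emulate the auto-generated positional column, then slice by position
summary["timeline_index"] = range(len(summary))
kept = summary.query("timeline_index < 1")
print(kept["timeline"].tolist())  # ['sample0']
```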

Downloading Data

For real studies, call study.download() before run() to fetch the raw data to the study path. This is a one-time operation:

study = ns.Study(name="MyStudy2025", path="/data")
study.download()        # fetches data to path
events = study.run()    # loads and validates events

Many curated studies handle downloading from OpenNeuro automatically.

Tip

path can be a shared parent folder — each study resolves its own subfolder automatically. This lets all studies share one path, so you only need to configure it once.

Caching

Study loading can be cached via the infra parameter, avoiding repeated I/O and validation on subsequent runs:

study = ns.Study(
    name="Fake2025Meg",
    path=ns.CACHE_FOLDER,
    infra={"backend": "Cached", "folder": "/cache"},
)
events = study.run()  # first call: loads and caches
events = study.run()  # subsequent calls: reads from cache

Cache modes (set via infra.mode):

Mode                  Behaviour
"cached" (default)    Use cache if available, compute otherwise
"force"               Recompute this step and all downstream steps
"retry"               Recompute only if previous run failed
"read-only"           Only read from cache; error if not cached
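A mode is passed as just another key of the infra dict alongside the backend settings shown earlier. For example, to force recomputation (a sketch of the configuration only, following the options documented above):

```python
# hypothetical infra settings: Cached backend with forced recomputation,
# passed as ns.Study(name=..., path=..., infra=infra)
infra = {"backend": "Cached", "folder": "/cache", "mode": "force"}
print(infra["mode"])
```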

See Caching & Cluster Execution for all backend options and cluster configuration.

Tip

By default, Study loads timelines in parallel (infra_timelines=MapInfra(cluster="processpool")). This is fast but makes exceptions hard to read (they appear as BrokenProcessPool). During development or debugging, disable parallelism with infra_timelines={"cluster": None} to get clear tracebacks.

Create Your Own (optional)

To create a custom study, subclass Study and implement iter_timelines() and _load_timeline_events(). If your data uses non-standard event types, define them as Event subclasses — they are automatically registered and can carry any typed fields you need:

import typing as tp

import pandas as pd

from neuralset.events import etypes


class FaceStimulus(etypes.Event):
    """Custom event for face stimuli."""

    identity: str = ""
    expression: str = "neutral"


class FaceStudy(ns.Study):
    """A minimal study generating synthetic face events."""

    def model_post_init(self, log__: tp.Any) -> None:
        super().model_post_init(log__)
        # deactivate multiprocessing (inline classes can't be pickled by workers)
        self.infra_timelines.cluster = None

    def iter_timelines(self) -> tp.Iterator[dict[str, tp.Any]]:
        for subject in ["alice", "bob"]:
            yield dict(subject=subject)

    def _load_timeline_events(self, timeline: dict[str, tp.Any]) -> pd.DataFrame:
        rows = [
            dict(
                type="FaceStimulus",
                start=float(i),
                duration=2.0,
                identity=f"face_{i:03d}",
                expression=expr,
            )
            for i, expr in enumerate(["happy", "neutral", "sad"])
        ]
        return pd.DataFrame(rows)

The same run() interface works for custom studies:

import tempfile
from pathlib import Path

custom_study = FaceStudy(path=Path(tempfile.mkdtemp()))
custom_events = custom_study.run()

print(custom_events[["type", "start", "identity", "expression", "timeline"]].to_string())
100%|██████████| 2/2 [00:00<00:00, 183.35it/s]
           type  start  identity expression                 timeline
0  FaceStimulus    0.0  face_000      happy  FaceStudy:subject=alice
1  FaceStimulus    1.0  face_001    neutral  FaceStudy:subject=alice
2  FaceStimulus    2.0  face_002        sad  FaceStudy:subject=alice
3  FaceStimulus    0.0  face_000      happy    FaceStudy:subject=bob
4  FaceStimulus    1.0  face_001    neutral    FaceStudy:subject=bob
5  FaceStimulus    2.0  face_002        sad    FaceStudy:subject=bob

For a full worked example with real MEG data, channel positions, and neuralfetch integration, see the neuralfetch extending tutorial.
