Data Pipeline

The Data stage is where neuraltrain connects to neuralset: starting from a study chain and a Segmenter, it produces train/val/test PyTorch DataLoaders for the rest of the training pipeline.

This tutorial covers:

  • what the Data config is responsible for

  • the study chain → Segmenter → DataLoader flow

  • customising split policies and extractor layouts

  • worker and infrastructure settings

See also

Project Example – full example project

The Data config

The example project defines a Data pydantic model with four fields:

  • study: a neuralset Step (usually a chain of study + split)

  • segmenter: a ns.dataloader.Segmenter with extractors and the segment window

  • batch_size and num_workers

class Data(pydantic.BaseModel):
    study: ns.Step
    segmenter: ns.dataloader.Segmenter
    batch_size: int = 64
    num_workers: int = 0

Its build() method runs the study chain, applies the segmenter, prepares extractors, and wraps each split into a DataLoader:

def build(self) -> dict[str, DataLoader]:
    events = self.study.run()
    dataset = self.segmenter.apply(events)
    dataset.prepare()
    loaders = {}
    for split, shuffle in [("train", True), ("val", False), ("test", False)]:
        ds = dataset.select(dataset.triggers["split"] == split)
        loaders[split] = DataLoader(ds, shuffle=shuffle, collate_fn=ds.collate_fn, ...)
    return loaders
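To make the shape of that loop concrete, here is a hypothetical stand-alone sketch that swaps the neuralset dataset and PyTorch DataLoader for plain Python lists (the `batches` helper and the dict-based events are illustrative stand-ins, not part of either library):

```python
# Sketch of the build() split loop, with plain lists standing in for
# neuralset/PyTorch objects.

def batches(items, batch_size):
    """Yield consecutive batches, mimicking an unshuffled DataLoader."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# Stand-in for dataset.triggers: each event carries a split label.
events = [
    {"split": "train", "x": 0}, {"split": "train", "x": 1},
    {"split": "val", "x": 2}, {"split": "test", "x": 3},
]

loaders = {}
for split in ("train", "val", "test"):
    subset = [e for e in events if e["split"] == split]  # ~ dataset.select(...)
    loaders[split] = list(batches(subset, batch_size=2))

print({k: len(v) for k, v in loaders.items()})  # {'train': 1, 'val': 1, 'test': 1}
```

The real build() additionally passes `batch_size`, `num_workers`, and the dataset's own `collate_fn` through to each DataLoader.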

The default study chain

In the default config, the study is a two-step chain:

  1. Mne2013Sample – loads the MNE sample dataset

  2. SklearnSplit – adds a split column via stratified splitting
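SklearnSplit presumably delegates to scikit-learn under the hood; the bookkeeping it produces can be imitated with the standard library alone. This hypothetical sketch labels each trial index train/val/test using the same ratios and a fixed seed as the default config:

```python
import random
from collections import Counter

def assign_splits(indices, valid_ratio=0.2, test_ratio=0.2, seed=87):
    """Deterministically shuffle trial indices and label them
    train/val/test -- a stdlib imitation of SklearnSplit's bookkeeping."""
    shuffled = list(indices)
    random.Random(seed).shuffle(shuffled)
    n_test = int(len(shuffled) * test_ratio)
    n_val = int(len(shuffled) * valid_ratio)
    labels = (["test"] * n_test + ["val"] * n_val
              + ["train"] * (len(shuffled) - n_test - n_val))
    return dict(zip(shuffled, labels))

split = assign_splits(range(10))
counts = Counter(split.values())
print(counts["train"], counts["val"], counts["test"])  # 6 2 2
```

The real step is stratified (it balances class proportions across splits, which this sketch does not) and stores the result as a `split` column keyed by `_index`.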

Let’s look at how this is configured:

default_data_config = {
    "study": [
        {
            "name": "Mne2013Sample",
            "path": "/tmp/data/mne2013sample",
            "query": None,
        },
        {
            "name": "SklearnSplit",
            "valid_split_ratio": 0.2,
            "test_split_ratio": 0.2,
            "valid_random_state": 87,
            "split_by": "_index",
        },
    ],
    "segmenter": {
        "extractors": {
            "input": {
                "name": "MegExtractor",
                "frequency": 120.0,
                "filter": (0.5, 25.0),
                "baseline": (0.0, 0.1),
                "scaler": "RobustScaler",
                "clamp": 16.0,
            },
            "target": {
                "name": "EventField",
                "event_types": "Stimulus",
                "event_field": "code",
            },
        },
        "trigger_query": "type == 'Stimulus'",
        "start": -0.1,
        "duration": 0.5,
    },
    "batch_size": 16,
}

for section, value in default_data_config.items():
    print(f"{section}: {value}\n")
study: [{'name': 'Mne2013Sample', 'path': '/tmp/data/mne2013sample', 'query': None}, {'name': 'SklearnSplit', 'valid_split_ratio': 0.2, 'test_split_ratio': 0.2, 'valid_random_state': 87, 'split_by': '_index'}]

segmenter: {'extractors': {'input': {'name': 'MegExtractor', 'frequency': 120.0, 'filter': (0.5, 25.0), 'baseline': (0.0, 0.1), 'scaler': 'RobustScaler', 'clamp': 16.0}, 'target': {'name': 'EventField', 'event_types': 'Stimulus', 'event_field': 'code'}}, 'trigger_query': "type == 'Stimulus'", 'start': -0.1, 'duration': 0.5}

batch_size: 16
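Two details worth noting in the printed config: `batch_size: 16` overrides the model default of 64, and `num_workers` is absent, so the model falls back to its default of 0. A simplified dataclass stand-in (not the real pydantic model) shows the same defaulting behaviour:

```python
from dataclasses import dataclass

@dataclass
class DataSketch:
    """Simplified stand-in for the Data model (the real one is pydantic)."""
    study: list
    segmenter: dict
    batch_size: int = 64   # model default, overridden by the config above
    num_workers: int = 0   # model default, absent from the config

cfg = {"study": [], "segmenter": {}, "batch_size": 16}
data = DataSketch(**cfg)
print(data.batch_size, data.num_workers)  # 16 0
```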

The Segmenter

The Segmenter is the neuralset object that turns an events DataFrame into a SegmentDataset. It holds extractors, a trigger query, and the segment time window. Let’s instantiate one:

import neuralset as ns

segmenter = ns.dataloader.Segmenter(
    extractors={
        "input": {
            "name": "MegExtractor",
            "frequency": 120.0,
            "filter": (0.5, 25.0),
            "baseline": (0.0, 0.1),
            "scaler": "RobustScaler",
            "clamp": 16.0,
        },
        "target": {
            "name": "EventField",
            "event_types": "Stimulus",
            "event_field": "code",
        },
    },
    trigger_query="type == 'Stimulus'",
    start=-0.1,
    duration=0.5,
)

print(segmenter)
start=-0.1 duration=0.5 trigger_query="type == 'Stimulus'" stride=None stride_drop_incomplete=True extractors={'input': MegExtractor(**
{ 'aggregation': 'single',
  'allow_maxshield': False,
  'allow_missing': False,
  'apply_hilbert': False,
  'apply_proj': False,
  'baseline': (0.0, 0.1),
  'bipolar_ref': None,
  'channel_order': 'unique',
  'clamp': 16.0,
  'drop_bads': False,
  'event_types': 'Meg',
  'fill_non_finite': None,
  'filter': (0.5, 25.0),
  'frequency': 120.0,
  'infra': { 'cluster': None,
             'conda_env': None,
             'cpus_per_task': 10,
             'folder': None,
             'forbid_single_item_computation': False,
             'gpus_per_node': None,
             'job_name': None,
             'keep_in_ram': True,
             'logs': '{folder}/logs/{user}/%j',
             'max_jobs': 128,
             'max_pickle_size_gb': None,
             'mem_gb': None,
             'min_samples_per_job': 1,
             'mode': 'cached',
             'nodes': 1,
             'permissions': 511,
             'slurm_account': None,
             'slurm_additional_parameters': None,
             'slurm_constraint': None,
             'slurm_partition': None,
             'slurm_qos': None,
             'slurm_use_srun': False,
             'tasks_per_node': 1,
             'timeout_min': 120,
             'version': '1',
             'workdir': None},
  'mne_cpus': -1,
  'name': 'MegExtractor',
  'notch_filter': None,
  'offset': 0.0,
  'picks': ('meg',),
  'scale_factor': None,
  'scaler': 'RobustScaler'}
), 'target': EventField(**
{ 'aggregation': 'single',
  'allow_missing': False,
  'event_field': 'code',
  'event_types': 'Stimulus',
  'frequency': 0.0,
  'name': 'EventField'}
)} padding=None drop_incomplete=False drop_unused_events=True
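A quick sanity check of what the window settings imply: with `start=-0.1` and `duration=0.5`, each segment covers 0.1 s of pre-trigger data (matching the `baseline` window of `(0.0, 0.1)` in segment time) and, assuming the extractor resamples to its `frequency` of 120 Hz before slicing, contains 60 samples:

```python
# Segment-shape arithmetic implied by the Segmenter config above
# (assumes resampling to `frequency` happens before slicing).
frequency = 120.0  # Hz, from the MegExtractor config
duration = 0.5     # s of data per segment, starting 0.1 s before the trigger

n_samples = round(duration * frequency)
print(n_samples)  # 60 samples per segment
```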
