Note
Go to the end to download the full example code.
Transforms¶
An EventsTransform modifies the
events DataFrame — filtering rows, adding columns, splitting into
train/test, linking text context, etc. Transforms sit between a
Study (which produces events) and extractors
(which consume them).
Applying a Transform¶
Load events from a study, then apply a transform.
QueryEvents is the most common
one — it filters events after the study has been fully loaded and
cached. (Compare with Study(query=...) which filters timelines
before loading — coarser but avoids building unwanted timelines.)
Because it’s fast, QueryEvents itself doesn’t need caching:
import neuralset as ns
from neuralset.events import transforms
study = ns.Study(name="Fake2025Meg", path=ns.CACHE_FOLDER)
events = study.run()
print(f"Before: {len(events)} events, types: {events.type.unique().tolist()}")
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:00<00:00, 4.38it/s]
100%|██████████| 2/2 [00:00<00:00, 4.36it/s]
100%|██████████| 2/2 [00:00<00:00, 4.36it/s]
Before: 1588 events, types: ['Meg', 'Text', 'Audio', 'Sentence', 'Word', 'Stimulus', 'Image']
query = transforms.QueryEvents(query="type in ['Word', 'Meg']")
events = query(events)
print(f"After QueryEvents: {len(events)} events, types: {events.type.unique().tolist()}")
After QueryEvents: 578 events, types: ['Meg', 'Word']
In Practice: Chain¶
Applying transforms one-by-one is fine for exploration, but in real
configs you want a single object that bundles a study with its
transforms. Chain sequences a study and
transforms into a reproducible, cacheable pipeline:
chain = ns.Chain(
steps=[
{"name": "Fake2025Meg", "path": str(ns.CACHE_FOLDER)},
{"name": "QueryEvents", "query": "type in ['Word', 'Meg']"},
],
)
events = chain.run()
print(f"Chain: {len(events)} events, types: {events.type.unique().tolist()}")
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:00<00:00, 4.37it/s]
100%|██████████| 2/2 [00:00<00:00, 4.34it/s]
100%|██████████| 2/2 [00:00<00:00, 4.35it/s]
Chain: 578 events, types: ['Meg', 'Word']
The dict form ({"name": "QueryEvents", "query": "..."} ) is
equivalent to QueryEvents(query="...") — it dispatches by name.
This makes configs easy to serialize to YAML or JSON.
See also
Chains — full details on Chain composition, caching
placement, and how to configure pipelines effectively via Step
fields.
Available Transforms¶
Transform |
Purpose |
|---|---|
|
Filter events using a pandas query string |
|
Drop events with missing values in a given field |
|
Add a column with a default value, optionally update rows via query |
|
Select events by unique-value index in a column |
|
Cross-subject or cross-modality alignment |
|
Guarantee |
|
Match words to sentences, create |
|
Build a causal |
|
Full pipeline: sentence assignment + split + context |
|
Train/val/test splitting by sentence (prevents data leakage) |
|
K-fold splitting |
|
Split long media events into fixed-duration chunks |
|
Modify loading parameters for events with dynamic filepaths |
|
Extract audio tracks from Video events as separate Audio events |
Caching¶
Transforms support caching via the infra parameter. This avoids
recomputing expensive transforms (e.g. text processing) on every run:
transform = transforms.AddSentenceToWords(
infra={"backend": "Cached", "folder": "/cache"},
)
events = transform(events) # cached on subsequent calls
See exca for all caching options.
Advanced: Alignment and Chunking¶
Alignment —
Suppose three subjects each heard the word “cat” at different times.
You want to compare or average their brain responses to that word.
AlignEvents does this by
regrouping events: for each unique stimulus (here each word), it
builds a virtual timeline — as if all subjects had
been recorded in the same session — by gathering their neural
recordings together and time-shifting them so the stimulus starts
at t=0.
After alignment you can segment normally and the resulting dataset will have one segment per stimulus × subject, ready for cross-subject averaging or comparison.
transforms.AlignEvents(
trigger_type="Word",
trigger_field="text",
types_to_align="Meg",
)
Chunking —
ChunkEvents splits long media
events (Audio, Video) into shorter segments, both to avoid
out-of-memory errors and to prevent data leakage when extractors
(e.g. transformer models) see more context than the segment window:
transforms.ChunkEvents(
event_type_to_chunk="Audio",
max_duration=5.0, # seconds
min_duration=2.0, # avoid tiny leftover chunks
)
Create Your Own¶
Subclass EventsTransform and
implement _run(events) -> events:
import pandas as pd
class DropShortEvents(transforms.EventsTransform):
min_duration: float = 0.1
def _run(self, events: pd.DataFrame) -> pd.DataFrame:
return events.loc[events["duration"] >= self.min_duration]
demo_events = ns.events.standardize_events(
pd.DataFrame(
[
dict(type="Word", start=1.0, duration=0.3, text="hello", timeline="run-01"),
dict(type="Word", start=2.0, duration=0.05, text="x", timeline="run-01"),
dict(type="Action", start=3.0, duration=0.02, timeline="run-01"),
]
)
)
filtered = DropShortEvents(min_duration=0.1).run(demo_events)
print(f"{len(demo_events)} -> {len(filtered)} events")
3 -> 1 events
Next Steps¶
Extract features from events: Extractors
Full pipeline composition with Chain: Chains
Total running time of the script: (0 minutes 0.959 seconds)