spdl.pipeline.defs.Aggregate
- Aggregate(input: int | Aggregator, /, *, drop_last: bool = False) -> AggregateConfig[Any]
Create an AggregateConfig object for aggregation.
The aggregation buffers incoming items and emits them once enough items are buffered.
- Parameters:
input –
Either an integer specifying the number of items to buffer, or an Aggregator instance for custom aggregation logic.
- If int: Buffers that many items before emitting them as a list.
- If Aggregator: Custom aggregation using the accumulate() and flush() methods. The Aggregator abstraction handles EOF automatically, so you don’t need to explicitly check for EOF markers in your implementation.
The pipeline will call accumulate() for each item and flush() when the stream ends (if drop_last=False).
drop_last –
Drop the last aggregation if incomplete.
- When drop_last=False (default):
  - For int input: Emits the last batch even if it has fewer items
  - For Aggregator input: Calls flush() at EOF to emit remaining buffered items
- When drop_last=True:
  - For int input: Drops the last batch if it has fewer items
  - For Aggregator input: Does NOT call flush(), effectively dropping incomplete batches
- Returns:
The config object.
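The integer form and the drop_last flag described above can be mimicked in plain Python. The sketch below is only an illustration of the documented semantics, not the library's implementation: the batch() helper is a hypothetical stand-in and does not use spdl at all.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batch(items: Iterable[T], n: int, drop_last: bool = False) -> Iterator[list[T]]:
    """Hypothetical stand-in for the integer form of Aggregate (not spdl code)."""
    buffer: list[T] = []
    for item in items:
        buffer.append(item)
        if len(buffer) == n:
            # Enough items are buffered: emit them as a list and start over.
            yield buffer
            buffer = []
    # End of stream: emit the incomplete batch unless drop_last is set.
    if buffer and not drop_last:
        yield buffer


kept = list(batch(range(10), 3))
dropped = list(batch(range(10), 3, drop_last=True))
print(kept)     # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(dropped)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```

With ten items and a batch size of three, the only difference between the two settings is the trailing [9].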
Example
Simple batching (buffering N items):
    from spdl.pipeline import PipelineBuilder

    # Buffer 3 items at a time
    pipeline = (
        PipelineBuilder()
        .add_source(range(10))
        .aggregate(3)
        .build()
    )
    # Produces: [0,1,2], [3,4,5], [6,7,8], [9]

    # Drop last incomplete batch
    pipeline = (
        PipelineBuilder()
        .add_source(range(10))
        .aggregate(3, drop_last=True)
        .build()
    )
    # Produces: [0,1,2], [3,4,5], [6,7,8]
Custom aggregation using Aggregator:

    from spdl.pipeline import PipelineBuilder
    from spdl.pipeline.defs import Aggregator

    class SizeBasedAggregator(Aggregator):
        '''Aggregate strings once the total size reaches the threshold.'''

        def __init__(self, threshold: int):
            self.threshold = threshold
            self.buffer: list[str] = []
            self.size = 0

        def accumulate(self, item: str) -> str | None:
            self.buffer.append(item)
            self.size += len(item)
            if self.size >= self.threshold:
                result = "".join(self.buffer)
                self.buffer, self.size = [], 0
                return result
            return None

        def flush(self) -> str | None:
            if self.buffer:
                result = "".join(self.buffer)
                self.buffer, self.size = [], 0
                return result
            return None

    pipeline = (
        PipelineBuilder()
        .add_source(["a", "bb", "ccc", "dddd", "e", "ff"])
        .aggregate(SizeBasedAggregator(threshold=10))
        .build()
    )
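To see what the custom aggregator would emit, the call sequence described under Parameters (accumulate() for each item, flush() at the end unless drop_last=True) can be traced without running a pipeline. This is a rough sketch under stated assumptions: drive() is a hypothetical helper rather than an spdl function, the class is a plain-Python copy of the example that does not inherit from the spdl base class, and a None return value is assumed to mean "nothing to emit yet".

```python
from typing import Iterable, Iterator, Optional


class SizeBasedAggregator:
    """Plain-Python copy of the example aggregator (no spdl base class)."""

    def __init__(self, threshold: int) -> None:
        self.threshold = threshold
        self.buffer: list[str] = []
        self.size = 0

    def _emit(self) -> str:
        # Join the buffered strings and reset the internal state.
        result = "".join(self.buffer)
        self.buffer, self.size = [], 0
        return result

    def accumulate(self, item: str) -> Optional[str]:
        self.buffer.append(item)
        self.size += len(item)
        return self._emit() if self.size >= self.threshold else None

    def flush(self) -> Optional[str]:
        return self._emit() if self.buffer else None


def drive(agg: SizeBasedAggregator, items: Iterable[str], drop_last: bool = False) -> Iterator[str]:
    """Hypothetical driver imitating the documented call order (assumption)."""
    for item in items:
        out = agg.accumulate(item)
        if out is not None:  # assumption: None means "keep buffering"
            yield out
    if not drop_last:
        out = agg.flush()  # called once, when the stream ends
        if out is not None:
            yield out


items = ["a", "bb", "ccc", "dddd", "e", "ff"]
flushed = list(drive(SizeBasedAggregator(threshold=10), items))
truncated = list(drive(SizeBasedAggregator(threshold=10), items, drop_last=True))
print(flushed)    # ['abbcccdddd', 'eff']
print(truncated)  # ['abbcccdddd']
```

The first four strings total exactly 10 characters, so accumulate() emits them together; the remaining "e" and "ff" only appear when flush() is called.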
See also
Aggregator: Abstract base class for custom aggregation operations.
Collate: The aggregator used when the input is an integer.
- Example: Pipeline definitions
Illustrates how to build a complex pipeline.