spdl.pipeline.defs.Aggregator
- class Aggregator
Abstract base class for custom aggregation operations.
When you pass an Aggregator instance to Aggregate(), the pipeline will:
- Call accumulate() for each incoming item.
- Call flush() when the stream ends (if drop_last=False).
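The call sequence above can be modelled in plain Python. This is a sketch, not SPDL's implementation. `run_aggregator` and `TracingAggregator` are hypothetical names. The driver assumes only what this page states: accumulate() runs once per item, None results are skipped, and flush() runs at EOF unless drop_last=True. If SPDL is not installed, the sketch falls back to a minimal stand-in base class.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Iterable, Iterator

try:
    from spdl.pipeline.defs import Aggregator
except ImportError:
    # Hypothetical stand-in with the two documented abstract methods,
    # used only when SPDL is not installed.
    class Aggregator(ABC):
        @abstractmethod
        def accumulate(self, item: Any) -> Any | None: ...

        @abstractmethod
        def flush(self) -> Any | None: ...


def run_aggregator(
    agg: Aggregator, items: Iterable[Any], drop_last: bool = False
) -> Iterator[Any]:
    """Hypothetical driver that mimics the documented call order."""
    for item in items:
        result = agg.accumulate(item)
        if result is not None:  # None means "keep buffering"
            yield result
    if not drop_last:  # flush() runs at EOF only when drop_last=False
        result = agg.flush()
        if result is not None:
            yield result


class TracingAggregator(Aggregator):
    """Records every call and emits a list of two items at a time."""

    def __init__(self) -> None:
        self.calls: list[str] = []
        self.buffer: list[Any] = []

    def accumulate(self, item: Any) -> Any | None:
        self.calls.append(f"accumulate({item!r})")
        self.buffer.append(item)
        if len(self.buffer) == 2:
            batch, self.buffer = self.buffer, []
            return batch
        return None

    def flush(self) -> Any | None:
        self.calls.append("flush()")
        if self.buffer:
            batch, self.buffer = self.buffer, []
            return batch
        return None


agg = TracingAggregator()
outputs = list(run_aggregator(agg, [1, 2, 3]))
print(outputs)    # [[1, 2], [3]]
print(agg.calls)  # ['accumulate(1)', 'accumulate(2)', 'accumulate(3)', 'flush()']
```

The trace shows one accumulate() call per item and a single flush() call after the last item.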
The drop_last parameter controls whether flush() is called:
- drop_last=False (default): flush() is called at EOF, allowing you to emit any remaining buffered items.
- drop_last=True: flush() is NOT called at EOF, effectively dropping incomplete batches.
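To see the effect of drop_last, the sketch below runs the same items through two fresh aggregators. It is built on assumptions, not on SPDL's pipeline code. `collect` and `PairJoiner` are hypothetical. `collect` applies only the rule stated above: flush() is skipped when drop_last=True. If SPDL is not installed, the sketch uses a stand-in base class.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Iterable

try:
    from spdl.pipeline.defs import Aggregator
except ImportError:
    # Hypothetical stand-in for the documented interface (no SPDL installed).
    class Aggregator(ABC):
        @abstractmethod
        def accumulate(self, item: Any) -> Any | None: ...

        @abstractmethod
        def flush(self) -> Any | None: ...


def collect(agg: Aggregator, items: Iterable[Any], drop_last: bool = False) -> list:
    """Hypothetical helper: gathers every emission under the drop_last rule."""
    out = [r for r in map(agg.accumulate, items) if r is not None]
    if not drop_last:
        tail = agg.flush()
        if tail is not None:
            out.append(tail)
    return out


class PairJoiner(Aggregator):
    """Joins every two strings and counts how often flush() is called."""

    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.flush_calls = 0

    def accumulate(self, item: str) -> str | None:
        self.buffer.append(item)
        if len(self.buffer) == 2:
            joined = "".join(self.buffer)
            self.buffer = []
            return joined
        return None

    def flush(self) -> str | None:
        self.flush_calls += 1
        if self.buffer:
            joined = "".join(self.buffer)
            self.buffer = []
            return joined
        return None


items = ["a", "b", "c", "d", "e"]
keep = PairJoiner()
keep_out = collect(keep, items, drop_last=False)
print(keep_out, keep.flush_calls)               # ['ab', 'cd', 'e'] 1
drop = PairJoiner()
drop_out = collect(drop, items, drop_last=True)
print(drop_out, drop.flush_calls, drop.buffer)  # ['ab', 'cd'] 0 ['e']
```

With drop_last=True, the leftover "e" stays in the buffer and is never emitted.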
Example
from spdl.pipeline import PipelineBuilder
from spdl.pipeline.defs import Aggregator


class SizeBasedAggregator(Aggregator):
    '''Aggregate strings once their total size reaches the threshold.'''

    def __init__(self, threshold: int = 10):
        self.threshold = threshold
        self.buffer: list[str] = []
        self.size = 0

    def accumulate(self, item: str) -> str | None:
        '''Add item to buffer and emit when threshold reached.'''
        self.buffer.append(item)
        self.size += len(item)
        if self.size >= self.threshold:
            result = "".join(self.buffer)
            self.buffer = []
            self.size = 0
            return result
        return None  # Skip until threshold reached

    def flush(self) -> str | None:
        '''Emit remaining buffer when stream ends.'''
        if self.buffer:
            result = "".join(self.buffer)
            self.buffer = []
            self.size = 0
            return result
        return None  # Nothing to emit


# Use with drop_last=False (default) to emit remaining items.
# The aggregator emits "abbcccdddd" (size 10), then "eff" from flush() at EOF.
pipeline = (
    PipelineBuilder()
    .add_source(["a", "bb", "ccc", "dddd", "e", "ff"])
    .aggregate(SizeBasedAggregator(threshold=10))
    .build()
)

# Use with drop_last=True to drop incomplete batches.
# Only "abbcccdddd" is emitted; the buffered "eff" is dropped because flush() is not called.
pipeline = (
    PipelineBuilder()
    .add_source(["a", "bb", "ccc", "dddd", "e", "ff"])
    .aggregate(SizeBasedAggregator(threshold=10), drop_last=True)
    .build()
)
See also
Aggregate(): Function to create aggregation configurations.
Collate: The aggregator used when calling Aggregate() or spdl.pipeline.PipelineBuilder.aggregate() with an integer.
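Passing an integer selects Collate, which groups items by count. The hypothetical `FixedSizeBatcher` below shows one way to write count-based batching as a custom Aggregator. It is an illustration only, not Collate's source, and its choice of lists as the batch type is an assumption. If SPDL is not installed, the sketch uses a stand-in base class.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any

try:
    from spdl.pipeline.defs import Aggregator
except ImportError:
    # Hypothetical stand-in for the documented interface (no SPDL installed).
    class Aggregator(ABC):
        @abstractmethod
        def accumulate(self, item: Any) -> Any | None: ...

        @abstractmethod
        def flush(self) -> Any | None: ...


class FixedSizeBatcher(Aggregator):
    """Hypothetical: emits a list every `batch_size` items."""

    def __init__(self, batch_size: int) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        self.batch_size = batch_size
        self.buffer: list[Any] = []

    def accumulate(self, item: Any) -> list[Any] | None:
        self.buffer.append(item)
        if len(self.buffer) < self.batch_size:
            return None  # batch not full yet
        batch, self.buffer = self.buffer, []
        return batch

    def flush(self) -> list[Any] | None:
        # Emit the partial batch left over at EOF, if any.
        if not self.buffer:
            return None
        batch, self.buffer = self.buffer, []
        return batch


batcher = FixedSizeBatcher(3)
full_batches = []
for x in range(7):
    batch = batcher.accumulate(x)
    if batch is not None:
        full_batches.append(batch)
tail = batcher.flush()
print(full_batches)  # [[0, 1, 2], [3, 4, 5]]
print(tail)          # [6]
```

In a pipeline, an instance would be passed the same way as SizeBasedAggregator in the example, e.g. `.aggregate(FixedSizeBatcher(3))`.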
Methods
accumulate(item): Process an incoming item and optionally emit an aggregated result.
flush(): Emit any remaining buffered items when the stream ends.
- abstract accumulate(item: Any) → Any | None
Process an incoming item and optionally emit an aggregated result.
This method is called for each item in the stream. It should add the item to an internal buffer and return an aggregated result when ready, or return None to skip emission and continue buffering.
- Parameters:
item – An item from the input stream to process.
- Returns:
The aggregated result to emit, or None to skip emission.
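The choice between returning a result and returning None does not have to depend on a count. It can depend on the item itself. In the hypothetical `LineAggregator` below, accumulate() is called directly to show its return values. This demonstrates the return contract described above and is not part of SPDL. If SPDL is not installed, the sketch uses a stand-in base class.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any

try:
    from spdl.pipeline.defs import Aggregator
except ImportError:
    # Hypothetical stand-in for the documented interface (no SPDL installed).
    class Aggregator(ABC):
        @abstractmethod
        def accumulate(self, item: Any) -> Any | None: ...

        @abstractmethod
        def flush(self) -> Any | None: ...


class LineAggregator(Aggregator):
    """Hypothetical: buffers characters and emits a line at each newline."""

    def __init__(self) -> None:
        self.chars: list[str] = []

    def accumulate(self, item: str) -> str | None:
        if item != "\n":
            self.chars.append(item)
            return None  # keep buffering
        line = "".join(self.chars)
        self.chars = []
        return line  # ready: emit the completed line

    def flush(self) -> str | None:
        if not self.chars:
            return None
        line = "".join(self.chars)
        self.chars = []
        return line


agg = LineAggregator()
results = [agg.accumulate(c) for c in "hi\nyo"]
leftover = agg.flush()
print(results)   # [None, None, 'hi', None, None]
print(leftover)  # yo
```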
- abstract flush() → Any | None
Emit any remaining buffered items when the stream ends.
This method is called when the stream ends (EOF reached) and drop_last=False. It should emit any remaining buffered items that haven't been emitted yet by accumulate().
- Returns:
The final aggregated result from remaining items, or None if there's nothing to emit.
Note
This method is ONLY called when drop_last=False. When drop_last=True, this method is NOT called, and any remaining buffered items are effectively dropped.
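The hypothetical `ChunkMean` below illustrates the flush() contract. It emits the mean of every k numbers. At EOF, flush() turns the partial chunk into a final result and empties the buffer. Once nothing remains, flush() returns None. The sketch calls the methods directly instead of running a pipeline, and it uses a stand-in base class if SPDL is not installed.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any

try:
    from spdl.pipeline.defs import Aggregator
except ImportError:
    # Hypothetical stand-in for the documented interface (no SPDL installed).
    class Aggregator(ABC):
        @abstractmethod
        def accumulate(self, item: Any) -> Any | None: ...

        @abstractmethod
        def flush(self) -> Any | None: ...


class ChunkMean(Aggregator):
    """Hypothetical: emits the mean of every `k` numbers."""

    def __init__(self, k: int) -> None:
        self.k = k
        self.values: list[float] = []

    def _drain(self) -> float:
        # Compute the mean of the buffered values and reset the buffer.
        mean = sum(self.values) / len(self.values)
        self.values = []
        return mean

    def accumulate(self, item: float) -> float | None:
        self.values.append(item)
        return self._drain() if len(self.values) == self.k else None

    def flush(self) -> float | None:
        # Leftover values form a short final chunk; an empty buffer yields None.
        return self._drain() if self.values else None


agg = ChunkMean(k=2)
partial = [agg.accumulate(v) for v in [1.0, 3.0, 5.0]]
first_flush = agg.flush()
second_flush = agg.flush()
print(partial)       # [None, 2.0, None]
print(first_flush)   # 5.0 (mean of the partial chunk [5.0])
print(second_flush)  # None (buffer already emptied)
```

Under drop_last=True, the pipeline would skip flush(), so the pending 5.0 in this example would never be emitted.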