spdl.pipeline.defs.Aggregator

class Aggregator

Abstract base class for custom aggregation operations.

When you pass an Aggregator instance to Aggregate() or spdl.pipeline.PipelineBuilder.aggregate(), the pipeline will:

  1. Call accumulate() for each incoming item, emitting the return value unless it is None

  2. Call flush() when the stream ends (if drop_last=False)

The drop_last parameter controls whether flush() is called:

  • drop_last=False (default): flush() is called at EOF, allowing you to emit any remaining buffered items

  • drop_last=True: flush() is NOT called at EOF, effectively dropping incomplete batches
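The two steps and the effect of drop_last can be sketched in plain Python. This only illustrates the contract described above and is not SPDL's actual implementation: `drive` and `PairAggregator` are hypothetical names introduced for the sketch, and no SPDL import is needed to run it.

```python
from typing import Any, Iterable, Iterator, List, Optional


class PairAggregator:
    """Hypothetical aggregator that emits items two at a time."""

    def __init__(self) -> None:
        self.buffer: List[Any] = []

    def accumulate(self, item: Any) -> Optional[tuple]:
        self.buffer.append(item)
        if len(self.buffer) == 2:
            pair, self.buffer = tuple(self.buffer), []
            return pair
        return None  # keep buffering

    def flush(self) -> Optional[tuple]:
        if self.buffer:
            rest, self.buffer = tuple(self.buffer), []
            return rest
        return None  # nothing left over


def drive(items: Iterable[Any], agg: Any, drop_last: bool = False) -> Iterator[Any]:
    """Hypothetical stand-in for the pipeline's aggregation loop."""
    for item in items:
        out = agg.accumulate(item)  # step 1: called for each item
        if out is not None:
            yield out
    if not drop_last:
        out = agg.flush()  # step 2: called at EOF unless drop_last=True
        if out is not None:
            yield out


kept = list(drive([1, 2, 3], PairAggregator()))
dropped = list(drive([1, 2, 3], PairAggregator(), drop_last=True))
print(kept)     # [(1, 2), (3,)]
print(dropped)  # [(1, 2)]
```

With three inputs, the trailing item only reaches the output through flush(), so it appears in the first result and is missing from the second.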

Example

from spdl.pipeline import PipelineBuilder
from spdl.pipeline.defs import Aggregator

class SizeBasedAggregator(Aggregator):
    '''Aggregate strings when total size exceeds threshold.'''
    def __init__(self, threshold: int = 10):
        self.threshold = threshold
        self.buffer: list[str] = []
        self.size = 0

    def accumulate(self, item: str) -> str | None:
        '''Add item to buffer and emit when threshold reached.'''
        self.buffer.append(item)
        self.size += len(item)

        if self.size >= self.threshold:
            result = "".join(self.buffer)
            self.buffer = []
            self.size = 0
            return result
        return None  # Skip until threshold reached

    def flush(self) -> str | None:
        '''Emit remaining buffer when stream ends.'''
        if self.buffer:
            result = "".join(self.buffer)
            self.buffer = []
            self.size = 0
            return result
        return None  # Nothing to emit

# With drop_last=False (default), flush() runs at EOF and emits the leftovers.
# By the rules above, this should produce "abbcccdddd", then "eff".
pipeline = (
    PipelineBuilder()
    .add_source(["a", "bb", "ccc", "dddd", "e", "ff"])
    .aggregate(SizeBasedAggregator(threshold=10))
    .build()
)

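# With drop_last=True, flush() is skipped and the incomplete batch is dropped.
# By the rules above, this should produce only "abbcccdddd"; "e" and "ff" are lost.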
pipeline = (
    PipelineBuilder()
    .add_source(["a", "bb", "ccc", "dddd", "e", "ff"])
    .aggregate(SizeBasedAggregator(threshold=10), drop_last=True)
    .build()
)

See also

Aggregate()

Function to create aggregation configurations.

Collate

The aggregator used when calling Aggregate() or spdl.pipeline.PipelineBuilder.aggregate() with an integer.

Methods

accumulate(item)

Process an incoming item and optionally emit an aggregated result.

flush()

Emit any remaining buffered items when the stream ends.

abstract accumulate(item: Any) → Any | None

Process an incoming item and optionally emit an aggregated result.

This method is called for each item in the stream. It should add the item to an internal buffer and return an aggregated result when ready, or return None to skip emission and continue buffering.

Parameters:

item – An item from the input stream to process.

Returns:

The aggregated result to emit, or None to skip emission.
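As an illustration of this return-value contract, here is a minimal sketch of a fixed-size batcher whose accumulate() is called directly. `FixedSizeBatcher` is a hypothetical class, not part of SPDL; in a real pipeline it would subclass Aggregator, and the base class is left out only so the sketch runs without SPDL installed.

```python
from typing import Any, List, Optional


class FixedSizeBatcher:
    """Hypothetical aggregator that emits lists of `size` items."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.buffer: List[Any] = []

    def accumulate(self, item: Any) -> Optional[List[Any]]:
        self.buffer.append(item)
        if len(self.buffer) < self.size:
            return None  # still buffering: nothing is emitted
        # Hand off the full list and start a fresh one. Clearing the
        # list in place would also empty the batch just returned.
        batch, self.buffer = self.buffer, []
        return batch

    def flush(self) -> Optional[List[Any]]:
        batch, self.buffer = self.buffer, []
        return batch or None  # an empty buffer means nothing to emit


batcher = FixedSizeBatcher(size=3)
results = [batcher.accumulate(x) for x in range(5)]
print(results)         # [None, None, [0, 1, 2], None, None]
print(batcher.buffer)  # [3, 4]
```

Because None means "skip emission", an aggregator cannot use None as a real output value; wrap it in a container if that case matters.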

abstract flush() → Any | None

Emit any remaining buffered items when the stream ends.

This method is called when the stream ends (EOF reached) and drop_last=False. It should emit any remaining buffered items that haven’t been emitted yet by accumulate().

Returns:

The final aggregated result from remaining items, or None if there’s nothing to emit.

Note

This method is ONLY called when drop_last=False. When drop_last=True, this method is NOT called, and any remaining buffered items are effectively dropped.
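One consequence of this rule, sketched under the semantics stated on this page: an aggregator that emits only from flush() (a whole-stream reduction) produces no output at all when drop_last=True. `TotalLength` and `run` are hypothetical names; `run` imitates the pipeline's calls and is not SPDL code.

```python
from typing import Iterable, List, Optional


class TotalLength:
    """Hypothetical aggregator that emits one value for the whole stream."""

    def __init__(self) -> None:
        self.total = 0
        self.seen = False

    def accumulate(self, item: str) -> Optional[int]:
        self.total += len(item)
        self.seen = True
        return None  # never emits mid-stream

    def flush(self) -> Optional[int]:
        if not self.seen:
            return None  # empty stream: nothing to emit
        total, self.total, self.seen = self.total, 0, False
        return total


def run(items: Iterable[str], drop_last: bool) -> List[int]:
    """Hypothetical driver mirroring the rule in the note above."""
    agg = TotalLength()
    out = [r for r in map(agg.accumulate, items) if r is not None]
    if not drop_last:
        final = agg.flush()
        if final is not None:
            out.append(final)
    return out


print(run(["a", "bb", "ccc"], drop_last=False))  # [6]
print(run(["a", "bb", "ccc"], drop_last=True))   # []
```

If an aggregator carries state that must always be emitted, keep drop_last=False.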