spdl.pipeline.defs.Aggregate

Aggregate(input: int | Aggregator, /, *, drop_last: bool = False) → AggregateConfig[Any]

Create an AggregateConfig object for aggregation.

The aggregation stage buffers incoming items and emits them once enough items have been buffered.

Parameters:
  • input

    Either an integer specifying the number of items to buffer, or an Aggregator instance for custom aggregation logic.

    • If int: Buffers that many items before emitting as a list.

    • If Aggregator: Custom aggregation using the accumulate() and flush() methods. The Aggregator abstraction handles EOF automatically, so you don’t need to explicitly check for EOF markers in your implementation.

      The pipeline will call accumulate() for each item and flush() when the stream ends (if drop_last=False).

  • drop_last

    Whether to drop the last aggregation if it is incomplete.

    • When drop_last=False (default):
      • For int input: Emits the last batch even if it has fewer items than requested

      • For Aggregator input: Calls flush() at EOF to emit remaining buffered items

    • When drop_last=True:
      • For int input: Drops the last batch if it has fewer items than requested

      • For Aggregator input: Does NOT call flush(), effectively dropping incomplete batches
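
The effect of drop_last on an integer input can be modeled in plain Python. This is only a sketch of the behavior described above, not SPDL's implementation; the helper name batch is hypothetical and does not exist in the library.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def batch(items: Iterable[T], n: int, drop_last: bool = False) -> Iterator[list[T]]:
    """Hypothetical helper modeling Aggregate(n, drop_last=...)."""
    buffer: list[T] = []
    for item in items:
        buffer.append(item)
        if len(buffer) == n:
            # Enough items are buffered: emit them as one list.
            yield buffer
            buffer = []
    # At end of stream, emit the partial batch only when drop_last is False.
    if buffer and not drop_last:
        yield buffer


full = list(batch(range(10), 3))
trimmed = list(batch(range(10), 3, drop_last=True))
print(full)     # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(trimmed)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```

When the stream length is an exact multiple of n, both settings give the same result, because no partial batch remains at the end.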

Returns:

The config object.

Example

Simple batching (buffering N items):

from spdl.pipeline import PipelineBuilder

# Buffer 3 items at a time
pipeline = (
    PipelineBuilder()
    .add_source(range(10))
    .aggregate(3)
    .build()
)
# Produces: [0,1,2], [3,4,5], [6,7,8], [9]

# Drop last incomplete batch
pipeline = (
    PipelineBuilder()
    .add_source(range(10))
    .aggregate(3, drop_last=True)
    .build()
)
# Produces: [0,1,2], [3,4,5], [6,7,8]

Custom aggregation using Aggregator:

from spdl.pipeline import PipelineBuilder
from spdl.pipeline.defs import Aggregator

class SizeBasedAggregator(Aggregator):
    '''Aggregate strings once their total size reaches the threshold.'''
    def __init__(self, threshold: int):
        self.threshold = threshold
        self.buffer: list[str] = []
        self.size = 0

    def accumulate(self, item: str) -> str | None:
        self.buffer.append(item)
        self.size += len(item)

        if self.size >= self.threshold:
            result = "".join(self.buffer)
            self.buffer, self.size = [], 0
            return result
        return None

    def flush(self) -> str | None:
        if self.buffer:
            result = "".join(self.buffer)
            self.buffer, self.size = [], 0
            return result
        return None

pipeline = (
    PipelineBuilder()
    .add_source(["a", "bb", "ccc", "dddd", "e", "ff"])
    .aggregate(SizeBasedAggregator(threshold=10))
    .build()
)
# Produces: "abbcccdddd", "eff"
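
To trace what a custom aggregator emits, the accumulate() and flush() protocol can be driven by hand. The sketch below does not use SPDL: drive is a hypothetical stand-in for the pipeline stage, the class is a duck-typed copy of the aggregator above without the Aggregator base class, and it assumes that a None return value means there is nothing to emit yet.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator


class SizeBasedAggregator:
    """Standalone copy of the example aggregator (no SPDL base class)."""

    def __init__(self, threshold: int) -> None:
        self.threshold = threshold
        self.buffer: list[str] = []
        self.size = 0

    def accumulate(self, item: str) -> str | None:
        self.buffer.append(item)
        self.size += len(item)
        # Emit the joined buffer once the total size reaches the threshold.
        return self.flush() if self.size >= self.threshold else None

    def flush(self) -> str | None:
        if not self.buffer:
            return None
        result = "".join(self.buffer)
        self.buffer, self.size = [], 0
        return result


def drive(agg: SizeBasedAggregator, items: Iterable[str], drop_last: bool = False) -> Iterator[str]:
    """Hypothetical driver: accumulate() per item, flush() at end of stream."""
    for item in items:
        out = agg.accumulate(item)
        if out is not None:  # assumption: None means "nothing to emit yet"
            yield out
    if not drop_last:
        out = agg.flush()
        if out is not None:
            yield out


items = ["a", "bb", "ccc", "dddd", "e", "ff"]
kept = list(drive(SizeBasedAggregator(threshold=10), items))
dropped = list(drive(SizeBasedAggregator(threshold=10), items, drop_last=True))
print(kept)     # ['abbcccdddd', 'eff']
print(dropped)  # ['abbcccdddd']
```

The first four strings total 10 characters, so they are emitted together. The remaining "e" and "ff" never reach the threshold and appear only when flush() is called at the end of the stream.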

See also

Aggregator

Abstract base class for custom aggregation operations.

Collate

The aggregator used when the input is an integer.

Example: Pipeline definitions

Illustrates how to build a complex pipeline.