spdl.pipeline.defs.Collate

class Collate(n: int)

Aggregator that collects items into batches of a specified size.

This is a built-in Aggregator implementation that batches incoming items into lists of size n.

When used with Aggregate(), it will:

  1. Accumulate items until n items are collected

  2. Return the batch as a list and reset the internal buffer

  3. On flush (stream end), return any remaining items if drop_last=False
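The three steps above can be sketched in plain Python. This is a minimal illustration, not spdl's implementation. The class name `CollateSketch` is hypothetical, and it only mirrors the documented `accumulate`/`flush` contract: `accumulate` returns a full batch or `None`, and `flush` returns the remainder or `None`.

```python
from __future__ import annotations

from typing import Generic, TypeVar

T = TypeVar("T")


class CollateSketch(Generic[T]):
    """Hypothetical stand-in for Collate, following the documented contract."""

    def __init__(self, n: int) -> None:
        self.n = n
        self._buffer: list[T] = []

    def accumulate(self, item: T) -> list[T] | None:
        # Step 1: buffer the item; not enough items yet means no batch.
        self._buffer.append(item)
        if len(self._buffer) < self.n:
            return None
        # Step 2: hand back the full batch and start a fresh buffer.
        batch, self._buffer = self._buffer, []
        return batch

    def flush(self) -> list[T] | None:
        # Step 3: at stream end, return the partial batch, or None if empty.
        if not self._buffer:
            return None
        batch, self._buffer = self._buffer, []
        return batch


agg: CollateSketch[int] = CollateSketch(3)
print([agg.accumulate(i) for i in range(4)])  # [None, None, [0, 1, 2], None]
print(agg.flush())  # [3]
print(agg.flush())  # None
```

The sketch swaps in a new list instead of clearing the buffer in place. This keeps a batch that has already been returned from being mutated by later calls.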

The following pipe constructions are all equivalent:

  • AggregateConfig(op=Collate(N))

  • Aggregate(N)

  • PipelineBuilder.aggregate(N)

Parameters:

n – The number of items to collect before emitting a batch.

Example

from spdl.pipeline import PipelineBuilder
from spdl.pipeline.defs import Aggregate, Collate

pipeline = (
    PipelineBuilder()
    .add_source(range(10))
    .pipe(Aggregate(Collate(3)))
    .add_sink(3)
    .build(num_threads=1)
)

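with pipeline.auto_stop():
    for batch in pipeline.get_iterator(timeout=30):
        print(batch)

# Prints one batch per line: [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]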
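The expected output follows from simple arithmetic. For a finite stream of L items, Collate(n) emits floor(L / n) full batches. The final flush then adds one partial batch of L mod n items, but only when that remainder is non-zero and drop_last=False. The helper below is hypothetical and does not call spdl. It computes the batch sizes this rule implies, which match the four batches in the example's output comment.

```python
def expected_batch_sizes(num_items: int, n: int, drop_last: bool = False) -> list[int]:
    """Sizes of the batches Collate(n) would emit for a stream of num_items items.

    Hypothetical helper for checking the arithmetic; it does not call spdl.
    """
    full, remainder = divmod(num_items, n)
    sizes = [n] * full
    # The leftover items form one extra batch only if there are some and they are kept.
    if remainder and not drop_last:
        sizes.append(remainder)
    return sizes


print(expected_batch_sizes(10, 3))  # [3, 3, 3, 1]
print(expected_batch_sizes(10, 3, drop_last=True))  # [3, 3, 3]
print(expected_batch_sizes(9, 3))  # [3, 3, 3]
```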

Methods

  • accumulate(item): Add an item to the buffer and return a batch when full.

  • flush(): Return any remaining items in the buffer.

accumulate(item: T) → list[T] | None

Add an item to the buffer and return a batch when full.

Parameters:

item – The item to add to the current batch.

Returns:

A list of n items when the batch is full, or None if more items are needed to complete the batch.

flush() → list[T] | None

Return any remaining items in the buffer.

This method is called when the stream ends and drop_last=False.

Returns:

A list of remaining items if the buffer is non-empty, or None if the buffer is empty.
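Because flush() returns None for an empty buffer, a stream whose length is a multiple of n ends without an empty trailing batch. The generator below is a hypothetical sketch of the whole stream, not spdl's driver. It folds both methods into one loop: the loop body plays the role of accumulate(), and the code after the loop plays the role of flush(), which runs only when drop_last is False. The name `collate_stream` and its `drop_last` handling are assumptions based on the behavior described on this page.

```python
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def collate_stream(items: Iterable[T], n: int, drop_last: bool = False) -> Iterator[list[T]]:
    """Hypothetical generator mirroring Collate(n) driven to the end of a stream."""
    buffer: list[T] = []
    for item in items:
        # accumulate(): buffer the item and emit once n items are held.
        buffer.append(item)
        if len(buffer) == n:
            yield buffer
            buffer = []
    # flush(): consulted only when drop_last is False; an empty buffer
    # emits nothing, just as flush() returns None rather than [].
    if not drop_last and buffer:
        yield buffer


print(list(collate_stream(range(10), 3)))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(collate_stream(range(9), 3)))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
print(list(collate_stream(range(10), 3, drop_last=True)))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```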