spdl.pipeline.defs.Collate
- class Collate(n: int)
Aggregator that collects items into batches of a specified size.
This is a built-in Aggregator implementation that batches incoming items into lists of size n. When used with Aggregate(), it will:
- Accumulate items until n items are collected.
- Return the batch as a list and reset the internal buffer.
- On flush (stream end), return any remaining items if drop_last=False.
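The behavior listed above can be sketched in plain Python as a generator. This is a minimal, hypothetical model for illustration only: `collate_stream` is not part of spdl, and it assumes that `drop_last` is decided by the driving stage (as with Aggregate()) rather than by Collate itself.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def collate_stream(items: Iterable[T], n: int, drop_last: bool = False) -> Iterator[List[T]]:
    """Hypothetical stand-in for Aggregate(Collate(n)); not an spdl API."""
    if n <= 0:
        # Sketch-only guard; the page does not document validation of n.
        raise ValueError("n must be positive")
    buffer: List[T] = []
    for item in items:
        buffer.append(item)      # accumulate the incoming item
        if len(buffer) == n:     # batch is full: emit it and start a new buffer
            yield buffer
            buffer = []
    # Stream end (the "flush" step): emit the partial batch unless drop_last is set.
    if buffer and not drop_last:
        yield buffer
```

For `range(10)` and `n=3` this produces the same four batches as the pipeline example on this page; with `drop_last=True` the trailing `[9]` is dropped.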
The following pipe constructions are all equivalent:
- AggregateConfig(op=Collate(N))
- Aggregate(N)
- PipelineBuilder.aggregate(N)
- Parameters:
n – The number of items to collect before emitting a batch.
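For a finite stream of m items, the behavior above implies floor(m/n) full batches plus, when m is not a multiple of n and drop_last=False, one final batch of m mod n items, so ceil(m/n) batches in total. The helper below is hypothetical (not part of spdl) and only performs that arithmetic.

```python
from typing import List


def batch_sizes(num_items: int, n: int, drop_last: bool = False) -> List[int]:
    """Hypothetical helper: sizes of the batches a Collate(n)-style stage would emit."""
    if n <= 0:
        raise ValueError("n must be positive")
    full, remainder = divmod(num_items, n)  # full batches, leftover items
    sizes = [n] * full
    if remainder and not drop_last:
        sizes.append(remainder)             # final, shorter batch on flush
    return sizes
```

For the example below (m = 10, n = 3), `batch_sizes(10, 3)` gives `[3, 3, 3, 1]`.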
Example
    from spdl.pipeline import PipelineBuilder
    from spdl.pipeline.defs import Aggregate, Collate

    pipeline = (
        PipelineBuilder()
        .add_source(range(10))
        .pipe(Aggregate(Collate(3)))
        .add_sink(3)
        .build(num_threads=1)
    )
    # Output: [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]
Methods
- accumulate(item) – Add an item to the buffer and return a batch when full.
- flush() – Return any remaining items in the buffer.
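The accumulate()/flush() contract can be illustrated with a small object that keeps its own buffer. This is a hypothetical sketch, not spdl's implementation: the class name `CollateSketch` is invented here, and it assumes that accumulate() returns `None` while a batch is still filling and that flush() returns an empty list when nothing is buffered. The page does not specify either case.

```python
from typing import Generic, List, Optional, TypeVar

T = TypeVar("T")


class CollateSketch(Generic[T]):
    """Hypothetical model of the accumulate/flush contract; not an spdl class."""

    def __init__(self, n: int) -> None:
        self.n = n
        self._buffer: List[T] = []

    def accumulate(self, item: T) -> Optional[List[T]]:
        self._buffer.append(item)
        if len(self._buffer) < self.n:
            return None                          # assumption: nothing to emit yet
        batch, self._buffer = self._buffer, []   # hand off the full batch and reset
        return batch

    def flush(self) -> List[T]:
        batch, self._buffer = self._buffer, []   # return leftovers and reset
        return batch
```

A driver such as Aggregate() would presumably call flush() once at stream end and discard the result when it is empty or when drop_last=True; that split of responsibility is an assumption, not something this page states.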