spdl.pipeline.PipelineBuilder
- class PipelineBuilder
Build a Pipeline object. See Pipeline for details.

Methods
add_sink([buffer_size, queue_class]) – Attach a buffer to the end of the pipeline.
add_source(source, *[, queue_class]) – Attach an iterator to the source buffer.
aggregate(num_items, /, *[, drop_last, ...]) – Buffer the items in the pipeline.
build(*, num_threads[, report_stats_interval]) – Build the pipeline.
disaggregate(*[, hooks, queue_class]) – Disaggregate the items in the pipeline.
pipe(op, /, *[, concurrency, executor, ...]) – Apply an operation to items in the pipeline.
- add_sink(buffer_size: int = 3, queue_class: type[AsyncQueue[U]] | None = None) → PipelineBuilder[T, U]
Attach a buffer to the end of the pipeline.

 │
┌▼┐
│ │ buffer
└─┘
- Parameters:
buffer_size – The size of the buffer. Pass 0 for unlimited buffering.
queue_class – A queue class used to connect this stage and the next stage. Must be a subclass of AsyncQueue (the class itself, not an instance). Default: StatsQueue.
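To make the buffer_size convention concrete, here is a conceptual model built on asyncio.Queue, which likewise treats maxsize=0 as unbounded. This is not spdl's implementation; demo_sink_buffer is a hypothetical helper that only shows how a bounded buffer applies backpressure to the producer, while an unbounded one lets the producer run arbitrarily far ahead of the consumer.

```python
import asyncio


async def demo_sink_buffer(buffer_size: int, num_items: int) -> tuple[list[int], int]:
    """Conceptual model of a sink buffer; not spdl's implementation.

    Returns the items received by the consumer and the peak buffer occupancy.
    """
    # asyncio.Queue treats maxsize <= 0 as unbounded, like buffer_size=0.
    sink: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(num_items):
            await sink.put(i)  # suspends while the buffer is full (backpressure)
            peak = max(peak, sink.qsize())

    async def consumer() -> list[int]:
        received = []
        for _ in range(num_items):
            received.append(await sink.get())
            await asyncio.sleep(0)  # yield to the loop, as a real consumer would
        return received

    _, received = await asyncio.gather(producer(), consumer())
    return received, peak
```

With asyncio.run(demo_sink_buffer(3, 10)) the buffer never holds more than 3 items, whereas with buffer_size=0 the producer can enqueue all 10 items before the consumer takes the first one.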
- add_source(source: Iterable[T] | AsyncIterable[T], *, queue_class: type[AsyncQueue[T]] | None = None) → PipelineBuilder[T, U]
Attach an iterator to the source buffer.

┌─────────────────┐
│ (Async)Iterator │
└───────┬─────────┘
        ▼
- Parameters:
source – A lightweight iterator that generates data.
Warning
The source iterator must be lightweight because it is executed in the async event loop. If the iterator performs a blocking operation, the entire pipeline is blocked.
queue_class – A queue class used to connect this stage and the next stage. Must be a subclass of AsyncQueue (the class itself, not an instance). Default: StatsQueue.
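The Warning about blocking sources can be demonstrated with plain asyncio. In this hypothetical sketch, blocking_read stands in for slow I/O. A plain generator that calls it directly runs on the event loop and starves every other task (the heartbeat never ticks), whereas an async generator that offloads the call with asyncio.to_thread() leaves the loop free. Because add_source accepts an AsyncIterable, an async generator such as offloaded_source(...) has the right shape to be passed as source; whether this pattern suits a given pipeline is an assumption to verify.

```python
import asyncio
import time
from collections.abc import AsyncIterator, Iterator


def blocking_read(i: int) -> int:
    # Hypothetical slow, blocking read (stands in for disk or network I/O).
    time.sleep(0.02)
    return i


def blocking_source(n: int) -> Iterator[int]:
    # NOT lightweight: every step blocks the thread that iterates it.
    for i in range(n):
        yield blocking_read(i)


async def offloaded_source(n: int) -> AsyncIterator[int]:
    # Lightweight from the loop's point of view: the blocking call runs in a
    # worker thread, and the loop can serve other tasks while it waits.
    for i in range(n):
        yield await asyncio.to_thread(blocking_read, i)


async def count_heartbeats(use_offloaded: bool) -> tuple[list[int], int]:
    stop = asyncio.Event()
    ticks = 0

    async def heartbeat() -> None:
        # Another task sharing the loop; it only advances when the loop is free.
        nonlocal ticks
        while not stop.is_set():
            ticks += 1
            await asyncio.sleep(0.001)

    hb = asyncio.create_task(heartbeat())
    if use_offloaded:
        items = [x async for x in offloaded_source(5)]
    else:
        items = list(blocking_source(5))  # runs directly on the event loop
    stop.set()
    await hb
    return items, ticks
```

Both variants produce the same items; only the offloaded one lets the heartbeat task run while data is being read.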
- aggregate(num_items: int, /, *, drop_last: bool = False, hooks: list[PipelineHook] | None = None, queue_class: type[AsyncQueue[T]] | None = None) → PipelineBuilder[T, U]
Buffer the items in the pipeline.
- Parameters:
num_items – The number of items to buffer.
drop_last – Drop the last aggregation if it has fewer than num_items items.
hooks – See pipe().
queue_class – A queue class used to connect this stage and the next stage. Must be a subclass of AsyncQueue (the class itself, not an instance). Default: StatsQueue.
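A minimal sketch of the grouping that num_items and drop_last describe, written as a plain generator. It assumes each aggregation is a list of consecutive items; the container type and the name aggregate_items are assumptions for illustration, not spdl's documented behavior.

```python
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def aggregate_items(items: Iterable[T], num_items: int, drop_last: bool = False) -> Iterator[list[T]]:
    """Group consecutive items into lists of num_items (conceptual model)."""
    if num_items <= 0:
        raise ValueError("num_items must be positive")
    batch: list[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == num_items:
            yield batch
            batch = []
    # A trailing group with fewer than num_items items is kept unless drop_last.
    if batch and not drop_last:
        yield batch
```

For example, list(aggregate_items(range(7), 3)) gives [[0, 1, 2], [3, 4, 5], [6]], and with drop_last=True the short trailing group [6] is discarded.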
- build(*, num_threads: int, report_stats_interval: float = -1) → Pipeline[U]
Build the pipeline.
- Parameters:
num_threads – The number of threads in the thread pool attached to the async event loop.
report_stats_interval – When provided, report the pipeline performance stats at the given interval. Unit: seconds.
This is only effective if no custom hook or custom AsyncQueue is provided for the stages. The argument is passed to TaskStatsHook and StatsQueue.
If a custom stage hook is provided and a stats report is needed, you can instantiate TaskStatsHook and include it in the hooks provided to PipelineBuilder.pipe().
Similarly, if you provide a custom AsyncQueue class, you need to implement the same logic yourself.
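How spdl attaches its pool internally is not described here, so the following is only a sketch of the generic asyncio mechanism that the num_threads description suggests: a ThreadPoolExecutor with num_threads workers installed as the event loop's default executor, so that every loop.run_in_executor(None, ...) call shares it. blocking_op and run_with_thread_pool are hypothetical names.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_op(x: int) -> tuple[int, str]:
    # Hypothetical synchronous stage function; reports which thread ran it.
    time.sleep(0.01)
    return x * 2, threading.current_thread().name


async def _run_all(items: list[int]) -> list[tuple[int, str]]:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor.
    futures = [loop.run_in_executor(None, blocking_op, x) for x in items]
    return list(await asyncio.gather(*futures))


def run_with_thread_pool(items: list[int], num_threads: int) -> list[tuple[int, str]]:
    async def main() -> list[tuple[int, str]]:
        # Attach a fixed-size pool to the event loop as its default executor.
        pool = ThreadPoolExecutor(max_workers=num_threads, thread_name_prefix="pool")
        asyncio.get_running_loop().set_default_executor(pool)
        return await _run_all(items)

    # asyncio.run shuts down the default executor before returning (Python 3.9+).
    return asyncio.run(main())
```

However many items are submitted, at most num_threads distinct worker threads execute them, which is the resource bound a fixed-size pool is meant to provide.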
- disaggregate(*, hooks: list[PipelineHook] | None = None, queue_class: type[AsyncQueue[T_]] | None = None) → PipelineBuilder[T, U]
Disaggregate the items in the pipeline.
- Parameters:
hooks – See pipe().
queue_class – A queue class used to connect this stage and the next stage. Must be a subclass of AsyncQueue (the class itself, not an instance). Default: StatsQueue.
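Conceptually, disaggregate undoes aggregate by emitting each element of an aggregated item as its own item. The generator below is a hypothetical sketch of that flattening; disaggregate_items is an illustrative name, and it assumes each incoming item is iterable. It is not spdl's implementation.

```python
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def disaggregate_items(groups: Iterable[Iterable[T]]) -> Iterator[T]:
    """Flatten each aggregated group back into individual items (conceptual model)."""
    for group in groups:
        # Each element of a group becomes a separate downstream item.
        yield from group
```

For example, list(disaggregate_items([[0, 1, 2], [3, 4, 5], [6]])) restores the flat sequence [0, 1, 2, 3, 4, 5, 6].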
- pipe(op: Callable[[T_], U_] | Callable[[T_], Iterable[U_]] | Callable[[T_], Awaitable[U_]] | Callable[[T_], AsyncIterable[U_]], /, *, concurrency: int = 1, executor: Executor | None = None, name: str | None = None, hooks: list[PipelineHook] | None = None, output_order: str = 'completion', queue_class: type[AsyncQueue[U_]] | None = None) → PipelineBuilder[T, U]
Apply an operation to items in the pipeline.

      │
┌─────▼─────┐
│ Operation │
└─────┬─────┘
      ▼
- Parameters:
op – A function applied to items in the queue. The function must take exactly one argument, which is the output from the upstream stage. To pass around multiple objects, bundle them in a tuple, or use a dataclass and define a custom protocol.
Optionally, the op can be a generator function, an async function, or an async generator function. If op is an (async) generator, each yielded item is put in the output queue separately.
Warning
If op is a synchronous generator and executor is an instance of concurrent.futures.ProcessPoolExecutor, the output items are not put in the output queue until the generator is exhausted. Async generators, and synchronous generators used without ProcessPoolExecutor, do not have this issue: their yielded items are put in the output queue immediately.
Tip
When passing an async op, make sure that the op does not call synchronous functions directly. To call a synchronous function, use asyncio.loop.run_in_executor() or asyncio.to_thread() to delegate its execution to the thread pool.
concurrency – The maximum number of async tasks executed concurrently.
executor – A custom executor object used to convert a synchronous operation into an asynchronous one. If None, the default executor is used. It is invalid to provide this argument when the given op is already async.
name – The name (prefix) to give to the task.
hooks – Hook objects to be attached to the stage. Hooks are intended for collecting stats of the stage. If None, the default hook, TaskStatsHook, is used.
output_order – If "completion" (default), items are put in the output queue in the order in which their processing completes. If "input", items are put in the output queue in the same order as they appear in the input queue.
queue_class – A queue class used to connect this stage and the next stage. Must be a subclass of AsyncQueue (the class itself, not an instance). Default: StatsQueue.
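The op kinds, the concurrency limit, and the two output_order modes described above can be modeled with plain asyncio. The sketch below is a simplified, hypothetical stage runner (run_pipe and _apply are illustrative names, not part of spdl): it dispatches on the kind of op, caps in-flight work with an asyncio.Semaphore, and emits results either in completion order or in input order. Unlike pipe(), it collects all outputs of one op call into a list before emitting them, so it does not model the streaming of generator outputs, nor hooks, custom executors, or queues.

```python
import asyncio
import inspect
from collections.abc import Callable, Iterable
from typing import Any


async def _apply(op: Callable[[Any], Any], item: Any) -> list[Any]:
    # Dispatch on the four op kinds that pipe() accepts; collect all outputs.
    if inspect.isasyncgenfunction(op):
        return [out async for out in op(item)]  # async generator: 0..n outputs
    if inspect.iscoroutinefunction(op):
        return [await op(item)]  # async function: exactly one output
    if inspect.isgeneratorfunction(op):
        # Sync generator: drain it in a worker thread so the loop stays free.
        return await asyncio.to_thread(lambda: list(op(item)))
    # Plain sync function: run it in a worker thread via asyncio.to_thread.
    return [await asyncio.to_thread(op, item)]


async def run_pipe(
    items: Iterable[Any],
    op: Callable[[Any], Any],
    concurrency: int = 1,
    output_order: str = "completion",
) -> list[Any]:
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    if output_order not in ("completion", "input"):
        raise ValueError(f"unsupported output_order: {output_order!r}")
    limit = asyncio.Semaphore(concurrency)  # at most `concurrency` ops in flight

    async def worker(item: Any) -> list[Any]:
        async with limit:
            return await _apply(op, item)

    tasks = [asyncio.create_task(worker(item)) for item in items]
    results: list[Any] = []
    if output_order == "input":
        for task in tasks:  # await in submission order
            results.extend(await task)
    else:
        for next_done in asyncio.as_completed(tasks):  # first finished, first out
            results.extend(await next_done)
    return results
```

For an async op whose calls finish in reverse order, output_order="completion" emits results in reverse, while output_order="input" preserves the input order at the cost of holding back results that finish early.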