spdl.pipeline.defs.Pipe
- Pipe(op: Callable[[T], U] | Callable[[T], Iterable[U]] | Callable[[T], Awaitable[U]] | Callable[[T], AsyncIterable[U]] | SupportsGetItem[T, U], /, *, concurrency: int = 1, executor: Executor | None = None, name: str | None = None, output_order: str = 'completion') → PipeConfig[T, U]
Create a PipeConfig.
A pipe applies a function or mapping to each incoming item.
- Parameters:
op –
A function, callable, or container with a __getitem__ method (such as dict, list, and tuple). If it is a function or callable, it is invoked with the item taken from the input queue. If it is a container, the item is passed to its __getitem__ method.
The function or callable must take exactly one argument, which is the output of the upstream stage. To pass multiple objects, pack them in a tuple, or use a dataclass and define a custom protocol.
If the result of applying op to an input item is None, the pipeline drops the result and does not propagate it to the downstream stages.
Optionally, op can be a generator function, an async function, or an async generator function.
If op is an (async) generator, the yielded items are put in the output queue separately.
Warning
If op is a synchronous generator and executor is an instance of concurrent.futures.ProcessPoolExecutor, the output items are not put in the output queue until the generator is exhausted.
An async generator, or a synchronous generator without ProcessPoolExecutor, does not have this issue; the yielded items are put in the output queue immediately.
Tip
When passing an async op, make sure that the op does not call a synchronous (blocking) function directly. If it must call a synchronous function, use asyncio.loop.run_in_executor() or asyncio.to_thread() to delegate the execution to a thread pool.
concurrency – The maximum number of async tasks executed concurrently.
executor –
A custom executor object used to convert the synchronous operation into an asynchronous one. If None, the default executor is used.
It is invalid to provide this argument when the given op is already async.
name – The name (prefix) to give to the task.
output_order – If "completion" (default), the items are put in the output queue in the order in which their processing completes. If "input", the items are put in the output queue in the same order as they were taken from the input queue.
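The two output_order modes can be illustrated with plain asyncio. The sketch below does not use SPDL. It only mimics the described ordering, with asyncio.as_completed() standing in for "completion" and asyncio.gather() standing in for "input". The names work, run_completion_order and run_input_order are hypothetical. The difference is only observable when more than one task runs at a time.

```python
import asyncio


async def work(item: int) -> int:
    # Larger items finish sooner, so completion order differs from input order.
    await asyncio.sleep(0.05 * (3 - item))
    return item * 10


async def run_completion_order(items: list) -> list:
    # Analogue of output_order="completion": collect results as tasks finish.
    tasks = [asyncio.create_task(work(i)) for i in items]
    results = []
    for fut in asyncio.as_completed(tasks):
        results.append(await fut)
    return results


async def run_input_order(items: list) -> list:
    # Analogue of output_order="input": results follow the input order.
    return list(await asyncio.gather(*(work(i) for i in items)))


completion = asyncio.run(run_completion_order([0, 1, 2]))
in_order = asyncio.run(run_input_order([0, 1, 2]))
print(completion)  # [20, 10, 0]
print(in_order)    # [0, 10, 20]
```

Preserving input order means a slow item holds back the results of faster items behind it, which is presumably why "completion" is the default.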
- Returns:
The config object.
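The rules for op described above (container lookup, dropping None, and splitting generator output into separate items) can be sketched in plain Python. This is not SPDL's implementation. apply_op and run_stage are hypothetical helpers that only mimic the documented behavior for synchronous ops, and keep_even, repeat and labels are made-up example ops.

```python
import inspect
from typing import Any, Iterator, Optional


def apply_op(op: Any, item: Any) -> Iterator[Any]:
    """Hypothetical helper: apply a synchronous op to a single item."""
    if not callable(op) and hasattr(op, "__getitem__"):
        # Container op: the input item is used as the key or index.
        result = op[item]
    else:
        # Function or callable op: invoked with exactly one argument.
        result = op(item)

    if inspect.isgenerator(result):
        # Generator op: each yielded item becomes a separate output.
        yield from result
    elif result is not None:
        # A None result is dropped and never reaches downstream stages.
        yield result


def run_stage(op: Any, items: list) -> list:
    # Hypothetical stand-in for one pipeline stage processing all items.
    return [out for item in items for out in apply_op(op, item)]


def keep_even(x: int) -> Optional[int]:
    # Returning None acts as a filter.
    return x if x % 2 == 0 else None


def repeat(x: int) -> Iterator[int]:
    # Generator function: yields x copies of x.
    for _ in range(x):
        yield x


labels = {0: "cat", 1: "dog"}

evens = run_stage(keep_even, [1, 2, 3, 4])
repeated = run_stage(repeat, [1, 2])
mapped = run_stage(labels, [1, 0, 1])
print(evens)     # [2, 4]
print(repeated)  # [1, 2, 2]
print(mapped)    # ['dog', 'cat', 'dog']
```

With the real API, the same three ops would be passed as the first positional argument of Pipe, for example Pipe(keep_even) or Pipe(labels).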