spdl.pipeline.PipelineHook¶
- class PipelineHook[source]¶
Base class for hooks to be used in the pipeline.
PipelineHook
can add custom actions when executing pipeline. It is useful for logging and profiling the pipeline execution.A hook consists of two async context managers.
stage_hook
andtask_hook
.stage_hook
is executed once when the pipeline is initialized and finalized.task_hook
is executed for each task.The following diagram illustrates this.
flowchart TD subgraph TaskGroup[Tasks ] subgraph Task1[Task 1] direction TB s1["__aenter__() from task_hook()"] --> task1["task()"] task1 --> e1["__aexit__() from task_hook()"] e1 -.-> |"If task() succeeded, \nand __aexit__() did not fail"| q1["Result Queued"] end subgraph ... direction TB foo[...] end subgraph TaskN[Task N] direction TB sN["__aenter__() from task_hook()"] --> taskN["task()"] taskN --> eN["__aexit__() from task_hook()"] eN -.-> |"If task() succeeded, \nand __aexit__() did not fail"| qN["Result Queued"] end end StageStart["__aenter__() from stage_hook()"] --> TaskGroup TaskGroup --> StageComplete["__aexit__() from stage_hook()"]To add custom hook, subclass this class and override
task_hook
and optionallystage_hook
method, and pass an instance to methods such asspdl.pipeline.Pipeline.pipe()
.Tip
When implementing a hook, you can decide how to react to task/stage failure, by choosing the location of specific logics.
See
contextlib.contextmanager
for detail, andspdl.pipeline.TaskStatsHook
for an example implementation.@asynccontextmanager async def stage_hook(self): # Add initialization logic here ... try: yield # Add logic that should be executed only when stage succeeds ... except Exception as e: # Add logic to react to specific exceptions ... finally: # Add logic that should be executed even if stage fails ...
Important
When implementing a task hook, make sure that
StopAsyncIteration
exception is not absorbed. Otherwise, if pipe is given an async generator the pipeline might run forever.@asynccontextmanager async def stage_hook(self): # Add initialization logic here ... try: yield # Add logic that should be executed only when stage succeeds ... except StopAsyncIteration: # When passing async generator to `pipe`, StopAsyncIteration is raised # from inside and will be caught here. # Do no absort it and propagate it to the other. # Usually, you do not want to do anything here. raise except Exception as e: # Add logic to react to specific exceptions ... finally: # Add logic that should be executed even if stage fails ...
Methods
Perform custom action when the pipeline stage is initialized and completed.
Perform custom action before and after task is executed.
- stage_hook()[source]¶
Perform custom action when the pipeline stage is initialized and completed.
Important
This method has to be async context manager. So when overriding the method, make sure to use
async
keyword and@asynccontextmanager
decorator.@asynccontextmanager async def stage_hook(self): # Add custom logic here
Caution
If this hook raises an exception, the pipeline is aborted.
- abstract task_hook()[source]¶
Perform custom action before and after task is executed.
Important
This method has to be async context manager. So when overriding the method, make sure to use
async
keyword and@asynccontextmanager
decorator.@asynccontextmanager async def stask_hook(self): # Add custom logic here
Note
This method is called as part of the task. Even if this method raises an exception, the pipeline is not aborted. However, the data being processed is dropped.