spdl.pipeline.PipelineHook

class PipelineHook

Base class for hooks to be used in the pipeline.

PipelineHook adds custom actions to pipeline execution. It is useful for logging and profiling.

A hook consists of two async context managers: stage_hook and task_hook.

stage_hook is entered once when the pipeline stage is initialized and exited once when it is finalized. task_hook is entered and exited around each task.

The following diagram illustrates this.

flowchart TD
    subgraph TaskGroup[Tasks ]
        subgraph Task1[Task 1]
            direction TB
            s1["__aenter__() from task_hook()"] --> task1["task()"]
            task1 --> e1["__aexit__() from task_hook()"]
            e1 -.-> |"If task() succeeded, \nand __aexit__() did not fail"| q1["Result Queued"]
        end
        subgraph ...
            direction TB
            foo[...]
        end
        subgraph TaskN[Task N]
            direction TB
            sN["__aenter__() from task_hook()"] --> taskN["task()"]
            taskN --> eN["__aexit__() from task_hook()"]
            eN -.-> |"If task() succeeded, \nand __aexit__() did not fail"| qN["Result Queued"]
        end
    end
    StageStart["__aenter__() from stage_hook()"] --> TaskGroup
    TaskGroup --> StageComplete["__aexit__() from stage_hook()"]

To add a custom hook, subclass this class, override the task_hook method and optionally the stage_hook method, and pass an instance to methods such as spdl.pipeline.Pipeline.pipe().
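The ordering shown in the diagram can be sketched without SPDL installed. In the example below, `RecordingHook` is a hypothetical hook that would subclass `PipelineHook` in real code, and `run_stage` is a hypothetical, simplified stand-in for a pipeline stage. It is not SPDL's implementation and only mimics the enter/exit ordering described above.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingHook:  # in real code: class RecordingHook(PipelineHook)
    """Hypothetical hook that records when each context is entered and exited."""

    def __init__(self):
        self.events = []

    @asynccontextmanager
    async def stage_hook(self):
        self.events.append("stage enter")
        try:
            yield
        finally:
            self.events.append("stage exit")

    @asynccontextmanager
    async def task_hook(self):
        self.events.append("task enter")
        try:
            yield
        finally:
            self.events.append("task exit")


async def run_stage(hook, func, items):
    """Hypothetical stand-in for a pipeline stage (assumption, not SPDL code)."""
    results = []
    async with hook.stage_hook():
        for item in items:
            try:
                async with hook.task_hook():
                    result = await func(item)
            except Exception:
                continue  # the task failed, so its item is dropped
            results.append(result)  # queued only after the hook exits cleanly
    return results


async def double(x):
    return 2 * x


hook = RecordingHook()
results = asyncio.run(run_stage(hook, double, [1, 2]))
```

The stage hook wraps the whole group of tasks, while the task hook wraps each call to the task function.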

Tip

When implementing a hook, you can decide how to react to task/stage failure by choosing where to place each piece of logic relative to the yield statement.

See contextlib.contextmanager for details, and spdl.pipeline.TaskStatsHook for an example implementation.

@asynccontextmanager
async def stage_hook(self):
    # Add initialization logic here
    ...

    try:
        yield
        # Add logic that should be executed only when stage succeeds
        ...
    except Exception as e:
        # Add logic to react to specific exceptions
        ...
    finally:
        # Add logic that should be executed even if stage fails
        ...
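To see how the placement of logic changes what runs, the sketch below applies the same template to a standalone async context manager and exercises it once with a successful body and once with a failing one. The names `make_hook` and `run` are hypothetical helpers used only for illustration.

```python
import asyncio
from contextlib import asynccontextmanager


def make_hook(log):
    """Build a hook that appends the branches it passes through to `log`."""

    @asynccontextmanager
    async def hook():
        log.append("init")
        try:
            yield
            log.append("success")  # reached only if the body did not raise
        except Exception as e:
            log.append(f"error: {e}")  # reached only if the body raised
            raise  # re-raise so the failure is not hidden
        finally:
            log.append("cleanup")  # reached in both cases

    return hook


async def run(hook, fail):
    try:
        async with hook():
            if fail:
                raise ValueError("boom")
    except ValueError:
        pass  # the caller decides what to do with the failure


ok_log, fail_log = [], []
asyncio.run(run(make_hook(ok_log), fail=False))
asyncio.run(run(make_hook(fail_log), fail=True))
print(ok_log)
print(fail_log)
```

Code after `yield` inside `try` runs only on success, code in `except` runs only on failure, and code in `finally` runs either way.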

Important

When implementing a task hook, make sure that the StopAsyncIteration exception is not absorbed. Otherwise, if pipe is given an async generator, the pipeline might run forever.

@asynccontextmanager
async def task_hook(self):
    # Add initialization logic here
    ...

    try:
        yield
        # Add logic that should be executed only when task succeeds
        ...
    except StopAsyncIteration:
        # When passing async generator to `pipe`, StopAsyncIteration is raised
        # from inside and will be caught here.
        # Do not absorb it; propagate it to the caller.
        # Usually, you do not want to do anything here.
        raise
    except Exception as e:
        # Add logic to react to specific exceptions
        ...
    finally:
        # Add logic that should be executed even if task fails
        ...
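The difference between propagating and absorbing StopAsyncIteration can be checked with plain `contextlib`, independent of SPDL. In this sketch, `propagating_hook`, `absorbing_hook`, and `signal_reaches_caller` are hypothetical names. Raising StopAsyncIteration directly in the body stands in for an exhausted async generator.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def propagating_hook():
    try:
        yield
    except StopAsyncIteration:
        raise  # let the end-of-iteration signal through
    except Exception:
        pass  # deliberately swallows other errors, for contrast


@asynccontextmanager
async def absorbing_hook():
    try:
        yield
    except Exception:
        pass  # StopAsyncIteration subclasses Exception, so it is swallowed too


async def signal_reaches_caller(hook) -> bool:
    """Return True if StopAsyncIteration escapes the hook's context."""
    try:
        async with hook():
            raise StopAsyncIteration  # stand-in for an exhausted async generator
    except StopAsyncIteration:
        return True
    return False


propagated = asyncio.run(signal_reaches_caller(propagating_hook))
absorbed = not asyncio.run(signal_reaches_caller(absorbing_hook))
print(propagated, absorbed)
```

A broad `except Exception` without a preceding `except StopAsyncIteration: raise` hides the signal from the caller, which is the situation the note above warns about.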

Methods

stage_hook()

Perform custom action when the pipeline stage is initialized and completed.

task_hook()

Perform custom action before and after task is executed.

stage_hook()

Perform custom action when the pipeline stage is initialized and completed.

Important

This method must be an async context manager. When overriding it, make sure to use the async keyword and the @asynccontextmanager decorator.

@asynccontextmanager
async def stage_hook(self):
    # Add custom logic here
    yield

Caution

If this hook raises an exception, the pipeline is aborted.

abstract task_hook()

Perform custom action before and after task is executed.

Important

This method must be an async context manager. When overriding it, make sure to use the async keyword and the @asynccontextmanager decorator.

@asynccontextmanager
async def task_hook(self):
    # Add custom logic here
    yield

Note

This method is called as part of the task. Even if this method raises an exception, the pipeline is not aborted. However, the data being processed is dropped.
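Because failed tasks are dropped rather than aborting the pipeline, a task hook is a natural place to count them. The following is a minimal sketch of such a hook. `CountingHook` and `demo` are hypothetical names, the class would subclass `PipelineHook` in real code, and the loop in `demo` only imitates how a stage might invoke the hook around each task.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class CountingHook:  # in real code: class CountingHook(PipelineHook)
    """Hypothetical hook that counts task outcomes and accumulates run time."""

    def __init__(self):
        self.num_success = 0
        self.num_failure = 0
        self.elapsed = 0.0

    @asynccontextmanager
    async def task_hook(self):
        t0 = time.perf_counter()
        try:
            yield
            self.num_success += 1
        except StopAsyncIteration:
            raise  # end-of-iteration signal, not a failure
        except Exception:
            self.num_failure += 1
            raise  # re-raise so the failure stays visible to the caller
        finally:
            self.elapsed += time.perf_counter() - t0


async def demo(hook):
    """Imitates a stage calling the hook per task (assumption, not SPDL code)."""
    for x in [1, 0, 2]:
        try:
            async with hook.task_hook():
                1 / x  # fails for x == 0
        except ZeroDivisionError:
            pass  # the failed item is dropped and processing continues


hook = CountingHook()
asyncio.run(demo(hook))
print(hook.num_success, hook.num_failure)
```

Re-raising after counting keeps the hook from changing the outcome of the task. It only observes it.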