Hydra integration

This example shows how to build a Pipeline object with Hydra, using building blocks from the spdl.pipeline.defs module.

The pipeline is defined in the "hydra_integration.yaml" file.

# Note: `_convert_: "all"` is required when using a factory function
# such as Pipe, Aggregate and Disaggregate.

_convert_: all

pipeline_cfg:
  _convert_: all
  _target_: spdl.pipeline.defs.PipelineConfig
  src:
    _target_: spdl.pipeline.defs.SourceConfig
    source:
      # range(20)
      _target_: builtins.range
      _args_: [ 20 ]
  pipes:
    # Aggregate(3, drop_last=True)
    - _target_: spdl.pipeline.defs.Aggregate
      _args_: [ 3 ]
      drop_last: true
    # Pipe(sum)
    - _target_: spdl.pipeline.defs.Pipe
      _args_:
        - _target_: builtins.sum
          _partial_: true
  sink:
    _target_: spdl.pipeline.defs.SinkConfig
    buffer_size: 3

pipeline:
  _convert_: all
  _target_: spdl.pipeline.build_pipeline
  _args_: [ "${pipeline_cfg}" ]
  num_threads: 1
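
To make the data flow described by this configuration concrete, the following is a minimal plain-Python sketch of the same three steps: iterate over range(20), group items in chunks of 3 while dropping the incomplete last chunk, then sum each chunk. It does not use SPDL or Hydra. The `aggregate` helper is a hypothetical stand-in written for illustration only, and the real pipeline runs its stages concurrently, so this shows only the values involved, not how SPDL executes them.

```python
from itertools import islice


def aggregate(iterable, n, drop_last=False):
    """Hypothetical stand-in for the aggregation stage: yield lists of n items."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        if len(chunk) < n and drop_last:
            # The trailing incomplete chunk is discarded.
            return
        yield chunk


# range(20) -> chunks of 3 (last chunk [18, 19] dropped) -> sum of each chunk
results = [sum(chunk) for chunk in aggregate(range(20), 3, drop_last=True)]
print(results)  # [3, 12, 21, 30, 39, 48]
```

Because 20 is not a multiple of 3, the items 18 and 19 never reach the summing step when drop_last is true.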

Source

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example shows how to build a :py:class:`~spdl.pipeline.Pipeline`
object with Hydra, using building blocks from the :py:mod:`spdl.pipeline.defs`
module.

The pipeline is defined in the ``"hydra_integration.yaml"`` file.

.. literalinclude:: ../../../examples/hydra_integration.yaml
    :language: yaml

"""

__all__ = ["main"]

import os

import hydra
from omegaconf import DictConfig

# Ask Hydra to print full stack traces when instantiation fails.
os.environ["HYDRA_FULL_ERROR"] = "1"


@hydra.main(version_base=None, config_path=".", config_name="hydra_integration")
def main(cfg: DictConfig):
    """The main entry point.

    Args:
        cfg: The configuration created from the ``"hydra_integration.yaml"`` file.
    """
    # Instantiate only the pipeline configuration object and show it.
    pipeline_cfg = hydra.utils.instantiate(cfg.pipeline_cfg)
    print(pipeline_cfg)

    # Instantiate the pipeline itself via the ``pipeline`` node of the config.
    pipeline = hydra.utils.instantiate(cfg.pipeline)
    print(pipeline)

    with pipeline.auto_stop():
        for i, item in enumerate(pipeline.get_iterator(timeout=3)):
            print(i, f"{item=}")


if __name__ == "__main__":
    main()
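
The calls to hydra.utils.instantiate above turn nodes containing `_target_`, `_args_` and `_partial_` into Python objects. As a rough illustration of that idea, the sketch below resolves a dotted `_target_` path and either calls it or wraps it with functools.partial. It is a simplified toy written for this page, not Hydra's implementation: `resolve_target` and `toy_instantiate` are hypothetical names, and features such as `_convert_` and `${...}` interpolation are not handled.

```python
import functools
import importlib


def resolve_target(path):
    """Import the object named by a dotted path such as 'builtins.range'."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def toy_instantiate(node):
    """Toy model of instantiation (hypothetical helper, not Hydra's code)."""
    if isinstance(node, list):
        return [toy_instantiate(v) for v in node]
    if not isinstance(node, dict):
        return node
    if "_target_" not in node:
        return {k: toy_instantiate(v) for k, v in node.items()}

    target = resolve_target(node["_target_"])
    args = [toy_instantiate(a) for a in node.get("_args_", [])]
    # Keys with a leading underscore are treated as directives, not keyword arguments.
    kwargs = {k: toy_instantiate(v) for k, v in node.items() if not k.startswith("_")}

    if node.get("_partial_", False):
        # Return a callable instead of calling the target right away.
        return functools.partial(target, *args, **kwargs)
    return target(*args, **kwargs)


# Mirrors the `source` node in the YAML: range(20)
source = toy_instantiate({"_target_": "builtins.range", "_args_": [20]})

# Mirrors the argument given to Pipe: a callable equivalent to `sum`
summer = toy_instantiate({"_target_": "builtins.sum", "_partial_": True})
print(source, summer([1, 2, 3]))
```

This is why the YAML marks builtins.sum with `_partial_: true`: the pipe stage needs the function itself, not the result of calling it.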

Functions

main(cfg: DictConfig)

The main entry point.

Parameters:

cfg – The configuration created from the "hydra_integration.yaml" file.