Hydra integration

This example shows how to build a Pipeline object with Hydra, using building blocks from the spdl.pipeline.defs module.

The pipeline definition is in the "hydra_integration.yaml" file.

# Note: `_convert_: "all"` is required when using a factory function
# such as Pipe, Aggregate and Disaggregate.

_convert_: all

pipeline_cfg:
  _convert_: all
  _target_: spdl.pipeline.defs.PipelineConfig
  src:
    _target_: spdl.pipeline.defs.SourceConfig
    source:
      # range(20)
      _target_: builtins.range
      _args_: [ 20 ]
  pipes:
    # Aggregate(3, drop_last=True)
    - _target_: spdl.pipeline.defs.Aggregate
      _args_: [ 3 ]
      drop_last: true
    # Pipe(sum)
    - _target_: spdl.pipeline.defs.Pipe
      _args_:
        - _target_: builtins.sum
          _partial_: true
  sink:
    _target_: spdl.pipeline.defs.SinkConfig
    buffer_size: 3

pipeline:
  _convert_: all
  _target_: spdl.pipeline.build_pipeline
  _args_: [ "${pipeline_cfg}" ]
  num_threads: 1
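
To make the data flow of this configuration concrete, the following is a minimal plain-Python sketch of what the three stages compute: a source of `range(20)`, grouping into batches of 3 with the incomplete tail dropped, and `sum` applied to each batch. It does not use SPDL or Hydra. The helper names `aggregate` and `emulate_pipeline` are hypothetical stand-ins introduced only for illustration, and the sketch ignores threading, buffering, and any ordering behavior of the real pipeline.

```python
from itertools import islice
from typing import Iterable, Iterator


def aggregate(
    items: Iterable[int], n: int, drop_last: bool = False
) -> Iterator[list[int]]:
    """Group items into lists of length n.

    Hypothetical stand-in for the Aggregate(3, drop_last=True) stage;
    this is not SPDL's implementation.
    """
    it = iter(items)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        if len(batch) < n and drop_last:
            # Incomplete final batch is discarded when drop_last is set.
            return
        yield batch


def emulate_pipeline() -> list[int]:
    """Emulate source -> aggregate -> sum sequentially."""
    source = range(20)  # corresponds to the `src` entry
    batches = aggregate(source, 3, drop_last=True)  # first entry in `pipes`
    return [sum(batch) for batch in batches]  # second entry in `pipes`


results = emulate_pipeline()
print(results)  # [3, 12, 21, 30, 39, 48]
```

The last two source items (18 and 19) do not fill a batch of 3, so they are dropped and six sums are produced.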

Source

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

"""This example shows how to build :py:class:`~spdl.pipeline.Pipeline`
object with Hydra using building blocks from :py:mod:`spdl.pipeline.defs`
module.

The definition of the pipeline is found in ``"hydra_integration.yaml"`` file.

.. literalinclude:: ../../../examples/hydra_integration.yaml
    :language: yaml

"""

__all__ = ["main"]

import os

import hydra
from omegaconf import DictConfig

os.environ["HYDRA_FULL_ERROR"] = "1"


@hydra.main(version_base=None, config_path=".", config_name="hydra_integration")
def main(cfg: DictConfig) -> None:
    """The main entry point.

    Args:
        cfg: The configuration created from the ``"hydra_integration.yaml"`` file.
    """
    pipeline_cfg = hydra.utils.instantiate(cfg.pipeline_cfg)
    print(pipeline_cfg)

    pipeline = hydra.utils.instantiate(cfg.pipeline)
    print(pipeline)

    with pipeline.auto_stop():
        for i, item in enumerate(pipeline.get_iterator(timeout=3)):
            print(i, f"{item=}")


if __name__ == "__main__":
    main()

API Reference

Functions

main(cfg: DictConfig) -> None

The main entry point.

Parameters:

cfg – The configuration created from the "hydra_integration.yaml" file.