Hydra integration
This example shows how to build a Pipeline object with Hydra, using
building blocks from the spdl.pipeline.defs module.
The pipeline is defined in the "hydra_integration.yaml" file.
# Note: `_convert_: "all"` is required when using a factory function
# such as Pipe, Aggregate and Disaggregate.
_convert_: all
pipeline_cfg:
  _convert_: all
  _target_: spdl.pipeline.defs.PipelineConfig
  src:
    _target_: spdl.pipeline.defs.SourceConfig
    source:
      # range(20)
      _target_: builtins.range
      _args_: [ 20 ]
  pipes:
    # Aggregate(3, drop_last=True)
    - _target_: spdl.pipeline.defs.Aggregate
      _args_: [ 3 ]
      drop_last: true
    # Pipe(sum)
    - _target_: spdl.pipeline.defs.Pipe
      _args_:
        - _target_: builtins.sum
          _partial_: true
  sink:
    _target_: spdl.pipeline.defs.SinkConfig
    buffer_size: 3
pipeline:
  _convert_: all
  _target_: spdl.pipeline.build_pipeline
  _args_: [ "${pipeline_cfg}" ]
  num_threads: 1
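The special keys in the configuration (`_target_`, `_args_`, `_partial_`) tell Hydra which callable to import and how to call it. The following is a rough, standard-library-only sketch of that idea. It is not Hydra's actual implementation, and the helper name `resolve` is hypothetical. It only illustrates why `builtins.range` with `_args_: [ 20 ]` yields `range(20)`, and why `builtins.sum` with `_partial_: true` yields a callable rather than a computed sum.

```python
import functools
import importlib


def resolve(node):
    """Build objects from a nested dict/list (simplified, hypothetical model).

    Keys wrapped in underscores (e.g. ``_target_``) are treated as
    directives; all other keys become keyword arguments.
    """
    if isinstance(node, list):
        return [resolve(v) for v in node]
    if not isinstance(node, dict):
        return node
    if "_target_" not in node:
        return {k: resolve(v) for k, v in node.items() if k != "_convert_"}

    # "builtins.range" -> module "builtins", attribute "range"
    module_name, _, attr = node["_target_"].rpartition(".")
    target = getattr(importlib.import_module(module_name), attr)

    args = [resolve(a) for a in node.get("_args_", [])]
    kwargs = {
        k: resolve(v)
        for k, v in node.items()
        if not (k.startswith("_") and k.endswith("_"))
    }
    if node.get("_partial_", False):
        # Defer the call: return a callable instead of the call's result.
        return functools.partial(target, *args, **kwargs)
    return target(*args, **kwargs)


source = resolve({"_target_": "builtins.range", "_args_": [20]})
op = resolve({"_target_": "builtins.sum", "_partial_": True})
print(source, op([0, 1, 2]))
```

In this model, the `Pipe` entry in the configuration receives a callable equivalent to `sum` as its first positional argument, which matches the `# Pipe(sum)` comment in the YAML file.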
Source
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example shows how to build a :py:class:`~spdl.pipeline.Pipeline`
object with Hydra, using building blocks from the :py:mod:`spdl.pipeline.defs`
module.

The pipeline is defined in the ``"hydra_integration.yaml"`` file.

.. literalinclude:: ../../../examples/hydra_integration.yaml
   :language: yaml

"""

__all__ = ["main"]

import os

import hydra
from omegaconf import DictConfig

# Ask Hydra to print full stack traces when instantiation fails.
os.environ["HYDRA_FULL_ERROR"] = "1"


@hydra.main(version_base=None, config_path=".", config_name="hydra_integration")
def main(cfg: DictConfig):
    """The main entry point.

    Args:
        cfg: The configuration created from the ``"hydra_integration.yaml"`` file.
    """
    # Instantiate only the pipeline definition, to inspect it.
    pipeline_cfg = hydra.utils.instantiate(cfg.pipeline_cfg)
    print(pipeline_cfg)

    # Instantiate the pipeline itself via ``build_pipeline``.
    pipeline = hydra.utils.instantiate(cfg.pipeline)
    print(pipeline)

    # Run the pipeline and print each item it produces.
    with pipeline.auto_stop():
        for i, item in enumerate(pipeline.get_iterator(timeout=3)):
            print(i, f"{item=}")


if __name__ == "__main__":
    main()
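For reference, the values the loop is expected to print can be reproduced without SPDL or Hydra. The sketch below assumes that `Aggregate(3, drop_last=True)` groups consecutive items into lists of three and discards a trailing shorter group, and that the single-threaded pipeline yields results in input order. The helper `aggregate` is a hypothetical stand-in written for this illustration, not SPDL's implementation.

```python
from itertools import islice


def aggregate(iterable, n, drop_last=False):
    """Yield lists of ``n`` consecutive items (hypothetical stand-in)."""
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        if len(chunk) < n and drop_last:
            return  # discard the incomplete trailing group
        yield chunk


# range(20) -> groups of 3 -> sum of each group
items = [sum(chunk) for chunk in aggregate(range(20), 3, drop_last=True)]
for i, item in enumerate(items):
    print(i, f"{item=}")
```

Under these assumptions the six items are 3, 12, 21, 30, 39 and 48. The last two source values (18 and 19) do not fill a group of three, so they are dropped.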
Functions