Hydra integration
This example shows how to build a Pipeline object with Hydra, using
building blocks from the spdl.pipeline.defs module.
The pipeline is defined in the "hydra_integration.yaml" file.
# Note: `_convert_: "all"` is required when using a factory function
# such as Pipe, Aggregate and Disaggregate.
_convert_: all
pipeline_cfg:
  _convert_: all
  _target_: spdl.pipeline.defs.PipelineConfig
  src:
    _target_: spdl.pipeline.defs.SourceConfig
    source:
      # range(20)
      _target_: builtins.range
      _args_: [ 20 ]
  pipes:
    # Aggregate(3, drop_last=True)
    - _target_: spdl.pipeline.defs.Aggregate
      _args_: [ 3 ]
      drop_last: true
    # Pipe(sum)
    - _target_: spdl.pipeline.defs.Pipe
      _args_:
        - _target_: builtins.sum
          _partial_: true
  sink:
    _target_: spdl.pipeline.defs.SinkConfig
    buffer_size: 3
pipeline:
  _convert_: all
  _target_: spdl.pipeline.build_pipeline
  _args_: [ "${pipeline_cfg}" ]
  num_threads: 1
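To make the special keys in the YAML above easier to read, here is a minimal sketch of how `_target_`, `_args_` and `_partial_` can be interpreted. It is a simplified illustration using only the standard library, not Hydra's actual implementation: `resolve` and `mini_instantiate` are hypothetical helpers, and the sketch ignores `_convert_` (which, in Hydra, controls whether OmegaConf containers are converted to plain lists and dicts before being passed to the target) as well as `${...}` interpolation.

```python
import functools
import importlib
from typing import Any

# Keys that configure instantiation rather than being passed to the target.
RESERVED = {"_target_", "_args_", "_partial_", "_convert_", "_recursive_"}


def resolve(path: str) -> Any:
    """Import the object named by a dotted path such as ``builtins.range``."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def mini_instantiate(node: Any) -> Any:
    """Recursively build objects from nested dicts and lists (hypothetical helper)."""
    if isinstance(node, list):
        return [mini_instantiate(v) for v in node]
    if not isinstance(node, dict):
        return node  # plain value: int, str, bool, ...
    if "_target_" not in node:
        return {k: mini_instantiate(v) for k, v in node.items()}

    target = resolve(node["_target_"])
    args = [mini_instantiate(v) for v in node.get("_args_", [])]
    kwargs = {k: mini_instantiate(v) for k, v in node.items() if k not in RESERVED}
    if node.get("_partial_", False):
        # `_partial_: true` defers the call and returns a callable instead.
        return functools.partial(target, *args, **kwargs)
    return target(*args, **kwargs)


# Mirrors the `source` node: range(20)
source = mini_instantiate({"_target_": "builtins.range", "_args_": [20]})
# Mirrors the argument given to Pipe: the `sum` function itself, not its result
op = mini_instantiate({"_target_": "builtins.sum", "_partial_": True})

print(source)
print(op([0, 1, 2]))
```

Read this way, the `_partial_: true` entry under `Pipe` is what lets the config pass `sum` as a callable rather than trying to call it at configuration time.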
Source
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

"""This example shows how to build :py:class:`~spdl.pipeline.Pipeline`
object with Hydra using building blocks from :py:mod:`spdl.pipeline.defs`
module.

The definition of the pipeline is found in ``"hydra_integration.yaml"`` file.

.. literalinclude:: ../../../examples/hydra_integration.yaml
   :language: yaml

"""

__all__ = ["main"]

import os

import hydra
from omegaconf import DictConfig

os.environ["HYDRA_FULL_ERROR"] = "1"


@hydra.main(version_base=None, config_path=".", config_name="hydra_integration")
def main(cfg: DictConfig) -> None:
    """The main entry point.

    Args:
        cfg: The configuration created from the ``"hydra_integration.yaml"`` file.
    """
    pipeline_cfg = hydra.utils.instantiate(cfg.pipeline_cfg)
    print(pipeline_cfg)

    pipeline = hydra.utils.instantiate(cfg.pipeline)
    print(pipeline)

    with pipeline.auto_stop():
        for i, item in enumerate(pipeline.get_iterator(timeout=3)):
            print(i, f"{item=}")


if __name__ == "__main__":
    main()
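As a rough guide to what the loop in `main` should print, the following plain-Python sketch mimics the configured stages without using SPDL: it groups `range(20)` into batches of three, drops the incomplete final batch, and sums each batch. The `aggregate` helper is hypothetical and only stands in for `Aggregate(3, drop_last=True)`; the sketch assumes that `drop_last=True` discards a short trailing batch and that the pipeline yields results in input order.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def aggregate(items: Iterable[int], n: int, drop_last: bool = False) -> Iterator[List[int]]:
    """Yield consecutive batches of ``n`` items (hypothetical stand-in for Aggregate)."""
    it = iter(items)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        if len(batch) < n and drop_last:
            return  # discard the incomplete trailing batch
        yield batch


# Same stages as the YAML config: range(20) -> batches of 3 -> sum
results = [sum(batch) for batch in aggregate(range(20), 3, drop_last=True)]

for i, item in enumerate(results):
    print(i, f"{item=}")
```

Under these assumptions the six full batches sum to 3, 12, 21, 30, 39 and 48, and the two leftover values (18 and 19) are dropped.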
API Reference
Functions