DataPipeline

final class fairseq2.data.DataPipeline[source]

Bases: object

fairseq2 native data pipeline.

The pipeline state can be persisted to the disk, allowing it to be resumed later. It is a Python Iterable, but it also contains the iterator states.

Calling iter twice will create two iterators reading from the same dataloader, and sharing the same state, so it will behave inconcistently.

__iter__()[source]

Return an iterator over the examples in the data pipeline.

The iterator will modify the internal state of the this DataPipeline, so it’s not safe to have several iterators over the same DataPipeline.

Return type:

Iterator[Any]

static concat(pipelines)[source]

Concatenate examples from pipelines.

Parameters:

pipelines (Sequence[DataPipeline]) – The data pipelines to concatenate.

Return type:

DataPipelineBuilder

static constant(example, key=None)[source]

Repeatedly yield example.

This pipeline is pseudo-infinite; when used with functions that combine pipelines (e.g. sample, round_robin, zip), it will yield examples only as long as other pipelines yield examples.

See Pseudo-infinite and Infinite Pipelines for more details.

Parameters:
  • example (Any) – Example to yield infinitely.

  • key (str | None) – If specified, yields dictionaries as examples, where the key is key and the value is example.

Return type:

DataPipelineBuilder

static count(start=0, step=1, key=None)[source]

Count from start in steps of size step.

This pipeline is pseudo-infinite; when used with functions that combine pipelines (e.g. sample, round_robin, zip), it will yield examples only as long as other pipelines yield examples.

See Pseudo-infinite and Infinite Pipelines for more details.

Parameters:
  • start (int) – Number to start counting from.

  • step (int) – Count step size.

  • key (str | None) – If specified, yields dictionaries as examples, where the key is key and the value is the current number.

Return type:

DataPipelineBuilder

load_state_dict(state_dict)[source]

Restore the state of the data pipeline from state_dict.

Parameters:

state_dict (Mapping[str, Any]) – A state dictionary previously returned by state_dict().

reset(reset_rng=False)[source]

Move back to the first example in the data pipeline.

Parameters:

reset_rng (bool) – If True, resets all random number generators in the pipeline.

static round_robin(pipelines, stop_at_shortest=False, allow_repeats=True)[source]

Extract examples from pipelines in round robin.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to round robin.

  • stop_at_shortest (bool) – If True, stops round_robin when first pipeline reaches its end.

  • allow_repeats (bool) – If True, circles around finished pipelines until all pipelines reach their end. If False, does not repeat pipelines that have reached their end.

Return type:

DataPipelineBuilder

static sample(pipelines, weights=None, seed=None, allow_repeats=True)[source]

Extract examples from pipelines by sampling based on weights. Circles around pipelines until all have reached their end at least once.

Parameters:
  • data_pipelines – The data pipelines to sample from.

  • weights (Sequence[float] | None) – Desired distribution of pipelines. If None, use uniform distribution.

  • allow_repeats (bool) – If True, circles around finished pipelines until all pipelines reach their end. If False, does not repeat pipelines that have reached their end.

Return type:

DataPipelineBuilder

state_dict(strict=True)[source]

Return a dictionary containing the state of the data pipeline.

The current position of the data pipeline can be restored by passing the returned state dictionary to load_state_dict().

Parameters:

strict (bool) – If True, the internal buffers will be saved as part of state_dict. This ensures that on preemption no example will be lost, but for large buffers this can significantly increase the state size and the time to restore the data pipeline.

Return type:

Dict[str, Any]

static zip(pipelines, names=None, zip_to_shortest=False, flatten=False, disable_parallelism=False)[source]

Zip together examples read from pipelines.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to zip.

  • names (Sequence[str] | None) – The names to assign to the data pipelines. If None, yields examples as lists.

  • zip_to_shortest (bool) – If True, stops yielding examples after shortest pipeline terminates. Otherwise, all pipelines (that are not pseudo-infinite) must have the same number of examples.

  • flatten (bool) – If True, flatten examples from each pipeline into one dictionary or list. All pipelines must return the same type (dict or non-dict).,

  • disable_parallelism (bool) – If True, calls each data pipeline sequentially.

Return type:

DataPipelineBuilder

property is_broken: bool

Return True if the data pipeline is broken.

If True, any future operation on this data pipeline will raise a DataPipelineError.