DataPipeline

class fairseq2.data.DataPipeline[source]

Bases: Iterable[Any]

fairseq2 native data pipeline.

The pipeline state can be persisted to disk, allowing it to be resumed later. It is a Python Iterable, but it also holds the state of its iterator.

Calling iter() twice creates two iterators that read from the same data pipeline and share the same state, so they will behave inconsistently.

__iter__()[source]

Return an iterator over the examples in the data pipeline.

The iterator modifies the internal state of this DataPipeline, so it is not safe to have several iterators over the same DataPipeline.

Return type:

Iterator[Any]
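
For illustration, a minimal sketch of building and iterating a pipeline; it assumes the read_sequence() factory from fairseq2.data and the DataPipelineBuilder.map() and and_return() methods:

    from fairseq2.data import read_sequence

    # Build a small in-memory pipeline that doubles each value.
    pipeline = read_sequence([1, 2, 3, 4]).map(lambda x: x * 2).and_return()

    for example in pipeline:
        print(example)  # 2, 4, 6, 8

    # A second pass requires an explicit reset(); creating a second iterator
    # would share (and advance) the same internal state instead.
    pipeline.reset()
    print(list(pipeline))  # [2, 4, 6, 8]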

static concat(pipelines)[source]

Concatenate examples from pipelines.

Parameters:

pipelines (Sequence[DataPipeline]) – The data pipelines to concatenate.

Return type:

DataPipelineBuilder
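
For illustration, a minimal sketch of concat(), again assuming read_sequence() to build the input pipelines:

    from fairseq2.data import DataPipeline, read_sequence

    p1 = read_sequence([0, 1, 2]).and_return()
    p2 = read_sequence([3, 4, 5]).and_return()

    # Examples of p2 are yielded after the examples of p1.
    pipeline = DataPipeline.concat([p1, p2]).and_return()

    print(list(pipeline))  # [0, 1, 2, 3, 4, 5]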

load_state_dict(state_dict, strict=True)[source]

Restore the state of the data pipeline from state_dict.

Parameters:
  • state_dict (Mapping[str, Any]) – A state dictionary previously returned by state_dict().

  • strict (bool) – If True, enforces that the keys in state_dict match the keys returned by state_dict().

reset()[source]

Move back to the first example in the data pipeline.

static round_robin(pipelines, stop_at_shortest=False)[source]

Extract examples from pipelines in round-robin order.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to round robin.

  • stop_at_shortest (bool) – If True, stop round-robin when the first pipeline reaches its end. If False, circle around finished pipelines until all pipelines reach their end.

Return type:

DataPipelineBuilder
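
For illustration, a minimal sketch of round_robin(), assuming read_sequence() for the inputs:

    from fairseq2.data import DataPipeline, read_sequence

    p1 = read_sequence([1, 2, 3, 4]).and_return()
    p2 = read_sequence([10, 20]).and_return()

    # Alternate between the two pipelines. With the default
    # stop_at_shortest=False, the shorter pipeline circles around until
    # the longer one is also exhausted.
    pipeline = DataPipeline.round_robin([p1, p2]).and_return()

    for example in pipeline:
        print(example)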

static sample(pipelines, weights=None, stop_at_shortest=False)[source]

Extract examples from pipelines by sampling based on weights.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to sample from.

  • weights (Sequence[float] | None) – Desired distribution of pipelines. If None, use uniform distribution.

  • stop_at_shortest (bool) – If True, stop sampling when the first pipeline reaches its end. If False, circle around finished pipelines until all pipelines reach their end.

Return type:

DataPipelineBuilder
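
For illustration, a minimal sketch of sample(), assuming read_sequence() for the inputs; the weights are arbitrary:

    from fairseq2.data import DataPipeline, read_sequence

    p1 = read_sequence([0, 1, 2, 3, 4, 5]).and_return()
    p2 = read_sequence(["a", "b", "c", "d"]).and_return()

    # Draw roughly 70% of the examples from p1 and 30% from p2.
    pipeline = DataPipeline.sample([p1, p2], weights=[0.7, 0.3]).and_return()

    for example in pipeline:
        print(example)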

state_dict()[source]

Return a dictionary containing the state of the data pipeline.

The current position of the data pipeline can be restored by passing the returned state dictionary to load_state_dict().

Return type:

Dict[str, Any]
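
For illustration, a minimal sketch of the checkpoint/resume round trip described above, assuming read_sequence() and an identically constructed pipeline on the restoring side:

    from fairseq2.data import read_sequence

    pipeline = read_sequence(list(range(6))).and_return()

    it = iter(pipeline)
    print(next(it), next(it))  # consume the first two examples: 0 1

    # Snapshot the current position, e.g. to store in a training checkpoint.
    state = pipeline.state_dict()

    # Later, possibly in a new process, rebuild the same pipeline and restore.
    resumed = read_sequence(list(range(6))).and_return()
    resumed.load_state_dict(state)

    # Expected to continue from the third example: [2, 3, 4, 5]
    print(list(resumed))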

static zip(pipelines, names=None, zip_to_shortest=False, flatten=False, disable_parallelism=False)[source]

Zip together examples read from pipelines.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to zip.

  • names (Sequence[str] | None) – The names to assign to the data pipelines.

  • zip_to_shortest (bool) – If True, stop zipping when the shortest data pipeline reaches its end.

  • flatten (bool) – If True, merge the zipped examples into a single flat example instead of nesting them.

  • disable_parallelism (bool) – If True, calls each data pipeline sequentially.

Return type:

DataPipelineBuilder
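
For illustration, a minimal sketch of zip() with names, assuming read_sequence() for the inputs:

    from fairseq2.data import DataPipeline, read_sequence

    texts = read_sequence(["good movie", "terrible movie"]).and_return()
    labels = read_sequence([1, 0]).and_return()

    # Combine the parallel pipelines into dict examples keyed by the names.
    pipeline = DataPipeline.zip([texts, labels], names=["text", "label"]).and_return()

    for example in pipeline:
        print(example)  # e.g. {'text': 'good movie', 'label': 1}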

property is_broken: bool

Return True if the data pipeline is broken.

If True, any future operation on this data pipeline will raise a DataPipelineError.