fairseq2.data.data_pipeline

The data pipeline module provides the core data processing functionality in fairseq2.

Classes

final class fairseq2.data.data_pipeline.DataPipeline[source]

Bases: object

fairseq2 native data pipeline.

The pipeline state can be persisted to disk, allowing it to be resumed later. It is a Python Iterable, but it also contains the iterator state.

Calling iter() twice creates two iterators that read from the same data loader and share the same state, so they will behave inconsistently.

__iter__()[source]

Return an iterator over the examples in the data pipeline.

The iterator will modify the internal state of this DataPipeline, so it’s not safe to have several iterators over the same DataPipeline.

Return type:

Iterator[Any]
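
The warning about shared state can be illustrated with a small pure-Python stand-in. ToySharedPipeline below is hypothetical and not part of fairseq2; it only mimics the documented behavior that every iterator advances the same underlying position.

```python
class ToySharedPipeline:
    """Hypothetical stand-in: all iterators share one read position."""

    def __init__(self, examples):
        self._examples = list(examples)
        self._pos = 0  # single position shared by every iterator

    def __iter__(self):
        return self  # no independent iterator state is created

    def __next__(self):
        if self._pos >= len(self._examples):
            raise StopIteration
        example = self._examples[self._pos]
        self._pos += 1
        return example

    def reset(self):
        self._pos = 0  # move back to the first example


pipeline = ToySharedPipeline([10, 20, 30, 40])
it1, it2 = iter(pipeline), iter(pipeline)

# Each call advances the shared position, so the two iterators interleave.
seen = [next(it1), next(it2), next(it1)]
print(seen)  # [10, 20, 30]

pipeline.reset()
after_reset = list(pipeline)
print(after_reset)  # [10, 20, 30, 40]
```

In this sketch neither iterator sees the full sequence, which is the kind of inconsistency the note above warns about.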

reset(reset_rng=False)[source]

Move back to the first example in the data pipeline.

Parameters:

reset_rng (bool) – If True, resets all random number generators in the pipeline.

property is_broken: bool

Return True if the data pipeline is broken.

If True, any future operation on this data pipeline will raise a DataPipelineError.

state_dict(strict=True)[source]

Return a dictionary containing the state of the data pipeline.

The current position of the data pipeline can be restored by passing the returned state dictionary to load_state_dict().

Parameters:

strict (bool) – If True, the internal buffers will be saved as part of state_dict. This ensures that on preemption no example will be lost, but for large buffers this can significantly increase the state size and the time to restore the data pipeline.

Return type:

dict[str, Any]

load_state_dict(state_dict)[source]

Restore the state of the data pipeline from state_dict.

Parameters:

state_dict (Mapping[str, Any]) – A state dictionary previously returned by state_dict().
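
As a rough illustration of what the strict flag trades off, the sketch below models a pipeline with a read-ahead buffer in plain Python. ToyBufferedPipeline and the layout of its state dictionary are assumptions made for this example; the real state dictionary format is not described here.

```python
from collections import deque


class ToyBufferedPipeline:
    """Hypothetical pipeline with a read-ahead buffer (not fairseq2 code)."""

    def __init__(self, examples, buffer_size):
        self._examples = list(examples)
        self._buffer_size = buffer_size
        self._pos = 0           # index of the next example to read from the source
        self._buffer = deque()  # examples already read but not yet yielded

    def __iter__(self):
        return self

    def __next__(self):
        # Top up the buffer before yielding from it.
        while len(self._buffer) < self._buffer_size and self._pos < len(self._examples):
            self._buffer.append(self._examples[self._pos])
            self._pos += 1
        if not self._buffer:
            raise StopIteration
        return self._buffer.popleft()

    def state_dict(self, strict=True):
        state = {"pos": self._pos}
        if strict:
            state["buffer"] = list(self._buffer)  # larger state, nothing lost
        return state

    def load_state_dict(self, state):
        self._pos = state["pos"]
        self._buffer = deque(state.get("buffer", []))


source = list(range(6))
running = ToyBufferedPipeline(source, buffer_size=3)
first = next(running)  # yields 0; examples 1 and 2 now sit in the buffer

strict_state = running.state_dict(strict=True)
lenient_state = running.state_dict(strict=False)

resumed_strict = ToyBufferedPipeline(source, buffer_size=3)
resumed_strict.load_state_dict(strict_state)
strict_result = list(resumed_strict)
print(strict_result)  # [1, 2, 3, 4, 5]

resumed_lenient = ToyBufferedPipeline(source, buffer_size=3)
resumed_lenient.load_state_dict(lenient_state)
lenient_result = list(resumed_lenient)
print(lenient_result)  # [3, 4, 5]
```

In this toy model the non-strict state is smaller, but the two buffered examples are skipped after restoring.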

static concat(pipelines)[source]

Concatenate examples from pipelines.

Parameters:

pipelines (Sequence[DataPipeline]) – The data pipelines to concatenate.

Return type:

DataPipelineBuilder

static constant(example, key=None)[source]

Repeatedly yield example.

This pipeline is pseudo-infinite; when used with functions that combine pipelines (e.g. sample, round_robin, zip), it will yield examples only as long as other pipelines yield examples.

See Pipeline Types for more details.

Parameters:
  • example (Any) – Example to yield infinitely.

  • key (str | None) – If specified, yields dictionaries as examples, where the key is key and the value is example.

Return type:

DataPipelineBuilder

static count(start=0, step=1, key=None)[source]

Count from start in steps of size step.

This pipeline is pseudo-infinite; when used with functions that combine pipelines (e.g. sample, round_robin, zip), it will yield examples only as long as other pipelines yield examples.

See Pipeline Types for more details.

Parameters:
  • start (int) – Number to start counting from.

  • step (int) – Count step size.

  • key (str | None) – If specified, yields dictionaries as examples, where the key is key and the value is the current number.

Return type:

DataPipelineBuilder
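
The pseudo-infinite behavior of constant and count can be pictured with the standard library. This is only an analogy: itertools.repeat and itertools.count stand in for the two factories, and Python's built-in zip stands in for the combining functions.

```python
import itertools

letters = ["a", "b", "c"]  # a finite source of three examples

# itertools.count(10, 5) never ends on its own, like count(start=10, step=5).
numbered = list(zip(itertools.count(10, 5), letters))
print(numbered)  # [(10, 'a'), (15, 'b'), (20, 'c')]

# itertools.repeat stands in for constant("en", key="lang").
constants = ({"lang": example} for example in itertools.repeat("en"))
tagged = list(zip(constants, letters))
print(tagged[0])    # ({'lang': 'en'}, 'a')
print(len(tagged))  # 3
```

In both cases the endless source yields only as long as the finite one does.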

static round_robin(pipelines, stop_at_shortest=False, allow_repeats=True)[source]

Extract examples from pipelines in round robin.

See Combining Pipelines for more details.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to round robin.

  • stop_at_shortest (bool) – If True, stops round_robin when the first pipeline reaches its end.

  • allow_repeats (bool) – If True, circles around finished pipelines until all pipelines reach their end. If False, does not repeat pipelines that have reached their end.

Return type:

DataPipelineBuilder
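
The interaction of stop_at_shortest and allow_repeats is easier to see on plain lists. The generator below is one plausible reading of the parameter descriptions above, written in pure Python; the exact point at which the real implementation stops may differ, and the sketch assumes non-empty, re-iterable sources.

```python
def round_robin(sources, stop_at_shortest=False, allow_repeats=True):
    """Toy round robin over non-empty, re-iterable sources such as lists."""
    if not sources:
        return
    iterators = [iter(source) for source in sources]
    finished = [False] * len(sources)  # has source i reached its end at least once?
    while True:
        for i, source in enumerate(sources):
            if finished[i] and not allow_repeats:
                continue  # finished sources are skipped, not repeated
            try:
                example = next(iterators[i])
            except StopIteration:
                finished[i] = True
                if stop_at_shortest or all(finished):
                    return
                if not allow_repeats:
                    continue
                iterators[i] = iter(source)  # circle around to the start
                example = next(iterators[i])
            yield example


p1, p2 = [1, 2, 3], [4, 5]
print(list(round_robin([p1, p2])))                         # [1, 4, 2, 5, 3, 4]
print(list(round_robin([p1, p2], allow_repeats=False)))    # [1, 4, 2, 5, 3]
print(list(round_robin([p1, p2], stop_at_shortest=True)))  # [1, 4, 2, 5, 3]
```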

static sample(pipelines, weights=None, seed=None, allow_repeats=True)[source]

Extract examples from pipelines by sampling based on weights. Circles around pipelines until all have reached their end at least once.

See Combining Pipelines for more details.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to sample from.

  • weights (Sequence[float] | None) – Desired distribution of pipelines. If None, use uniform distribution.

  • allow_repeats (bool) – If True, circles around finished pipelines until all pipelines reach their end. If False, does not repeat pipelines that have reached their end.

Return type:

DataPipelineBuilder

static zip(pipelines, names=None, zip_to_shortest=False, flatten=False, disable_parallelism=False)[source]

Zip together examples read from pipelines.

See Combining Pipelines for more details.

Parameters:
  • pipelines (Sequence[DataPipeline]) – The data pipelines to zip.

  • names (Sequence[str] | None) – The names to assign to the data pipelines. If None, yields examples as lists.

  • zip_to_shortest (bool) – If True, stops yielding examples after the shortest pipeline terminates. Otherwise, all pipelines (that are not pseudo-infinite) must have the same number of examples.

  • flatten (bool) – If True, flatten examples from each pipeline into one dictionary or list. All pipelines must return the same type (dict or non-dict).

  • disable_parallelism (bool) – If True, calls each data pipeline sequentially.

Return type:

DataPipelineBuilder

final class fairseq2.data.data_pipeline.DataPipelineBuilder[source]

Bases: object

API to create a DataPipeline.

bucket(bucket_size, drop_remainder=False)[source]

Combine a number of consecutive examples into a single example.

See Combining Pipelines for more details.

Parameters:
  • bucket_size (int) – The number of examples to combine.

  • drop_remainder (bool) – If True, drops the last bucket in case it has fewer than bucket_size examples.

Return type:

Self
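
A minimal pure-Python sketch of the bucketing described above, assuming buckets are emitted as lists; the bucket function here is a stand-in written for illustration, not the library method.

```python
def bucket(examples, bucket_size, drop_remainder=False):
    """Group consecutive examples into lists of bucket_size items."""
    current = []
    for example in examples:
        current.append(example)
        if len(current) == bucket_size:
            yield current
            current = []
    # The last bucket may be shorter than bucket_size.
    if current and not drop_remainder:
        yield current


print(list(bucket([1, 2, 3, 4, 5], 2)))                       # [[1, 2], [3, 4], [5]]
print(list(bucket([1, 2, 3, 4, 5], 2, drop_remainder=True)))  # [[1, 2], [3, 4]]
```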

bucket_by_length(bucket_sizes, selector=None, min_data_len=1, skip_below_min_examples=False, skip_above_max_examples=False, drop_remainder=False)[source]

Combine examples of similar shape into batches.

Return type:

Self

collate(pad_value=None, pad_to_multiple=1, overrides=None)[source]

Concatenate a list of inputs into a single input.

This is equivalent to calling .map(Collater()). See fairseq2.data.Collater for details.

Return type:

Self

dynamic_bucket(threshold, cost_fn, bucket_creation_fn=None, min_num_examples=None, max_num_examples=None, drop_remainder=False)[source]

Combine a number of consecutive examples into a single example based on cumulative cost of examples, as measured by user-provided cost_fn.

Yields a bucket once the cumulative cost produced by cost_fn meets or exceeds threshold.

Parameters:
  • threshold (float) – Threshold for cumulative cost to trigger bucketing.

  • cost_fn (Callable[[Any], float]) – Cost function that outputs cost for a particular example.

  • bucket_creation_fn (Callable[[Sequence[Any]], tuple[Sequence[Sequence[Any]], Sequence[Any]]] | None) – Function for customizing bucket creation. Called with the bucket of examples that caused the cost threshold to be exceeded. Expected to return a tuple of (new_buckets, remainder), where the internal buffer is set to remainder and new_buckets is a list of buckets to be yielded. If None, defaults to the identity function.

  • min_num_examples (int | None) – Minimum number of examples per bucket.

  • max_num_examples (int | None) – Maximum number of examples per bucket.

  • drop_remainder (bool) – If True, drops the last bucket in case it has fewer than min_num_examples examples or the cumulative cost has not reached threshold yet.

Return type:

Self
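
The cost-based bucketing can be sketched in plain Python as follows. This is a simplified model under stated assumptions: it omits bucket_creation_fn, and the way min_num_examples and max_num_examples interact with the threshold is a guess based on the parameter descriptions, not a description of the actual implementation.

```python
def dynamic_bucket(examples, threshold, cost_fn, min_num_examples=None,
                   max_num_examples=None, drop_remainder=False):
    """Toy cost-based bucketing; emits lists of consecutive examples."""
    bucket, cost = [], 0.0
    for example in examples:
        bucket.append(example)
        cost += cost_fn(example)
        cost_reached = cost >= threshold
        # Assumption: a bucket is held back until it has min_num_examples items.
        enough = min_num_examples is None or len(bucket) >= min_num_examples
        # Assumption: a bucket is emitted as soon as it has max_num_examples items.
        full = max_num_examples is not None and len(bucket) >= max_num_examples
        if (cost_reached and enough) or full:
            yield bucket
            bucket, cost = [], 0.0
    if bucket and not drop_remainder:
        yield bucket  # leftover bucket that never met the conditions


words = ["aa", "bbbb", "c", "dddddd", "ee"]
# Use string length as the cost and emit once the total reaches 5.
print(list(dynamic_bucket(words, threshold=5, cost_fn=len)))
# [['aa', 'bbbb'], ['c', 'dddddd'], ['ee']]
print(list(dynamic_bucket(words, threshold=5, cost_fn=len, drop_remainder=True)))
# [['aa', 'bbbb'], ['c', 'dddddd']]
```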

filter(predicate)[source]

Filter examples from the data pipeline and keep only those that match predicate.

Parameters:

predicate (Callable[[Any], Any]) – The predicate used to select examples to keep.

Return type:

Self

map(fn, selector=None, num_parallel_calls=1)[source]

Apply fn to each example.

Example usage:

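from fairseq2.data import read_sequence

# Each snippet builds a fresh pipeline; "yields" refers to the
# examples produced when iterating over it.
pipeline = read_sequence([2, 5]).map(lambda x: x + 10).and_return()
# yields: 12, 15
pipeline = read_sequence([2, 5]).map(lambda x: x + 10, num_parallel_calls=8).and_return()
# same results, but may use more cores
data = [{"a": 2, "b": 1}, {"a": 5, "b": 3}]
pipeline = read_sequence(data).map(lambda x: x + 10, selector="a").and_return()
# yields: {"a": 12, "b": 1}, {"a": 15, "b": 3}
pipeline = read_sequence(data).map(lambda x: x + 10, selector="a,b").and_return()
# yields: {"a": 12, "b": 11}, {"a": 15, "b": 13}

Parameters: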
  • fn (Callable[[Any], Any] | Sequence[Callable[[Any], Any]]) – The function to apply. If it’s a list of functions, they will be automatically chained. .map([f1, f2]) is the more efficient version of .map(f1).map(f2).

  • selector (str | None) – The column to apply the function to. Several columns can be specified by separating them with a “,”. See Column Selection for more details.

  • num_parallel_calls (int) – The number of examples to process in parallel.

Return type:

Self

prefetch(num_examples)[source]

Prefetch examples in the background while the current example is being processed.

Parameters:

num_examples (int) – The number of examples to prefetch.

Return type:

Self

repeat(num_repeats=None, reset_rng=False)[source]

Repeats the sequence of pipeline examples num_repeats times.

Parameters:
  • num_repeats (int | None) – The number of times to repeat examples. If None, repeats infinitely.

  • reset_rng (bool) – If True, upon repeats, resets all random number generators in pipeline.

Return type:

Self

shard(shard_idx, num_shards, allow_uneven=False)[source]

Read only 1/num_shards of the examples in the data pipeline.

Parameters:
  • shard_idx (int) – The shard index.

  • num_shards (int) – The number of shards.

Return type:

Self
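
One way to picture sharding is an interleaved split, where shard i reads examples i, i + num_shards, i + 2 * num_shards, and so on. Whether the library assigns examples in exactly this order is an assumption of the sketch below, and the effect of allow_uneven is not modeled.

```python
import itertools


def shard(examples, shard_idx, num_shards):
    """Toy interleaved sharding: keep every num_shards-th example."""
    return itertools.islice(examples, shard_idx, None, num_shards)


examples = list(range(10))
shards = [list(shard(examples, i, 3)) for i in range(3)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Every example lands in exactly one shard. Note that the shards in this toy split have different lengths because 10 is not divisible by 3.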

shuffle(shuffle_window, seed=None)[source]

Shuffle examples using a fixed sized buffer.

Parameters:

shuffle_window (int) – The size of the intermediate buffer used for shuffling. Examples will be randomly sampled from this buffer, and selected examples will be replaced with new examples. If 0, all examples will be loaded into memory for full shuffling.

Return type:

Self
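
The buffer-based shuffling described for shuffle_window can be sketched in pure Python. This is an illustrative model only: window_shuffle is a hypothetical helper, and details such as how the remaining buffer is drained at the end are assumptions.

```python
import random


def window_shuffle(examples, shuffle_window, seed=None):
    """Toy shuffle using a fixed-size buffer of shuffle_window examples."""
    rng = random.Random(seed)
    if shuffle_window == 0:
        # Load everything into memory for a full shuffle.
        buffer = list(examples)
        rng.shuffle(buffer)
        yield from buffer
        return
    buffer = []
    for example in examples:
        if len(buffer) < shuffle_window:
            buffer.append(example)  # still filling the buffer
            continue
        idx = rng.randrange(len(buffer))
        yield buffer[idx]           # emit a randomly selected example
        buffer[idx] = example       # replace it with the new example
    rng.shuffle(buffer)             # assumption: drain the leftovers in random order
    yield from buffer


shuffled = list(window_shuffle(range(10), shuffle_window=3, seed=0))
print(sorted(shuffled) == list(range(10)))  # True: nothing is lost or duplicated
```

A small window keeps memory use bounded but only shuffles locally: in this model the example at output position k always comes from the first k + shuffle_window inputs.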

skip(num_examples)[source]

Skip num_examples examples.

Return type:

Self

take(num_examples)[source]

Return at most num_examples examples.

Return type:

Self

yield_from(fn)[source]

Map every example to a data pipeline and yield the examples returned from the mapped data pipelines.

Parameters:

fn (Callable[[Any], DataPipeline]) – The function to map examples to data pipelines.

Return type:

Self

and_return(max_num_warnings=0)[source]

Return a new DataPipeline instance.

Return type:

DataPipeline

final class fairseq2.data.data_pipeline.Collater(pad_value=None, pad_to_multiple=1, overrides=None)[source]

Bases: object

Concatenate a list of inputs into a single input.

Used to create batches. If all tensors in the input example have the same last dimension, Collater returns the concatenated tensors.

Otherwise pad_value is required, and the last dimension of the batch will be made long enough to fit the longest tensor, rounded up to pad_to_multiple. The returned batch is then a dictionary with the following keys:

{
    "is_ragged": True/False,  # True if padding was needed
    "seqs": [[1, 4, 5, 0], [1, 2, 3, 4]],  # (Tensor) concatenated and padded tensors from the input
    "seq_lens": [3, 4],  # A tensor describing the original length of each input tensor
}

Collater preserves the shape of the original data. For a tuple of lists, it returns a tuple of batches. For a dict of lists, it returns a dict of lists.

Parameters:
  • pad_value (int | None) – When concatenating tensors of different lengths, the value used to pad the shorter tensors.

  • pad_to_multiple (int) – Always pad to a length that is a multiple of this value.

  • overrides (Sequence[CollateOptionsOverride] | None) – List of CollateOptionsOverride overrides. Allows overriding pad_value and pad_to_multiple for specific columns.
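
The padding rule can be sketched with plain Python lists in place of tensors. The collate function below is a hypothetical stand-in, not the Collater implementation; in particular, returning the dictionary whenever pad_value is set is an assumption based on the is_ragged key shown above.

```python
def collate(seqs, pad_value=None, pad_to_multiple=1):
    """Toy collation of 1-D sequences (lists stand in for tensors)."""
    seq_lens = [len(s) for s in seqs]
    is_ragged = len(set(seq_lens)) > 1
    if pad_value is None:
        if is_ragged:
            raise ValueError("pad_value is required for sequences of different lengths")
        return [list(s) for s in seqs]  # same lengths: plain stacked batch
    # Round the longest length up to the next multiple of pad_to_multiple.
    target = -(-max(seq_lens) // pad_to_multiple) * pad_to_multiple
    padded = [list(s) + [pad_value] * (target - len(s)) for s in seqs]
    return {"is_ragged": is_ragged, "seqs": padded, "seq_lens": seq_lens}


batch = collate([[1, 4, 5], [1, 2, 3, 4]], pad_value=0)
print(batch["seqs"])      # [[1, 4, 5, 0], [1, 2, 3, 4]]
print(batch["seq_lens"])  # [3, 4]

wide = collate([[1, 4, 5], [1, 2, 3, 4]], pad_value=0, pad_to_multiple=8)
print(len(wide["seqs"][0]))  # 8
```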

__call__(data)[source]

Concatenate the input tensors.

Return type:

Any

class fairseq2.data.data_pipeline.CollateOptionsOverride(selector, pad_value=None, pad_to_multiple=1)[source]

Bases: object

Overrides how the collater should create a batch for a particular column.

Useful if not all columns should use the same padding value or padding multiple. See Collater for details.

Parameters:

selector (str) – The columns this override applies to. See Column Selection for details on how to specify columns.

__init__(selector, pad_value=None, pad_to_multiple=1)[source]

final class fairseq2.data.data_pipeline.FileMapper(root_dir=None, cached_fd_count=None)[source]

Bases: object

For a given file name, returns the file content as bytes.

The file name can also specify a slice of the file in bytes: calling a FileMapper instance with "big_file.txt:1024:48" will read 48 bytes at offset 1024.

Parameters:
  • root_dir (Path | None) – Root directory for looking up relative file names. Warning: this is not enforced; FileMapper will happily read any file on the system.

  • cached_fd_count (int | None) – Enables an LRU cache on the last cached_fd_count files read. FileMapper will memory map all the cached files, so this is especially useful for reading several slices of the same file.
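
To make the "name:offset:length" convention concrete, here is a standard-library sketch of how such a name could be parsed and read. read_slice is a hypothetical helper; the parsing rules are assumed from the example above, and it returns plain bytes rather than a memory block.

```python
import os
import tempfile


def read_slice(pathname, root_dir=None):
    """Toy reader for names of the assumed form 'path[:offset:length]'."""
    parts = pathname.rsplit(":", 2)
    if len(parts) == 3 and parts[1].isdigit() and parts[2].isdigit():
        path, offset, length = parts[0], int(parts[1]), int(parts[2])
    else:
        path, offset, length = pathname, 0, None  # no slice: read the whole file
    full_path = os.path.join(root_dir, path) if root_dir else path
    with open(full_path, "rb") as f:
        f.seek(offset)
        data = f.read() if length is None else f.read(length)
    return {"path": path, "data": data}


content = bytes(range(256)) * 8  # 2048 bytes of sample data
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "big_file.bin"), "wb") as f:
        f.write(content)
    chunk = read_slice("big_file.bin:1024:48", root_dir=tmp)
    whole = read_slice("big_file.bin", root_dir=tmp)

print(len(chunk["data"]))  # 48
print(chunk["data"] == content[1024:1072])  # True
```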

__init__(root_dir=None, cached_fd_count=None)[source]

__call__(pathname)[source]

Parses the pathname and returns the file bytes.

Returns:

A dict with the following keys:

{
    "path": "the/path.txt",  # the relative path of the file
    "data": MemoryBlock,  # a memory block with the content of the file. You can use `bytes` to get a regular python object.
}

Return type:

FileMapperOutput

class fairseq2.data.data_pipeline.SequenceData[source]

Bases: TypedDict

class fairseq2.data.data_pipeline.FileMapperOutput[source]

Bases: TypedDict

Functions

fairseq2.data.data_pipeline.create_bucket_sizes(*, max_num_elements, max_seq_len, min_seq_len=1, num_seqs_multiple_of=1)[source]

Create optimal bucket sizes for DataPipeline.bucket_by_length().

Parameters:
  • max_num_elements (int) – The maximum number of elements that each bucket can contain.

  • max_seq_len (int) – The maximum sequence length.

  • min_seq_len (int) – The minimum sequence length.

  • num_seqs_multiple_of (int) – The number of sequences contained in each bucket must be a multiple of this value.

Return type:

list[tuple[int, int]]

fairseq2.data.data_pipeline.get_last_failed_example()[source]

Return type:

Any

fairseq2.data.data_pipeline.list_files(path, pattern=None)[source]

Recursively list all files under path that match pattern.

Parameters:
  • path (Path) – The path to traverse.

  • pattern (str | None) – If non-empty, a pattern that follows the syntax of fnmatch.

Return type:

DataPipelineBuilder

fairseq2.data.data_pipeline.read_sequence(seq)[source]

Read every element in seq.

Parameters:

seq (Sequence[Any]) – The sequence to read.

Return type:

DataPipelineBuilder

fairseq2.data.data_pipeline.read_zipped_records(path)[source]

Read each file in a zip archive.

Return type:

DataPipelineBuilder

fairseq2.data.data_pipeline.read_iterator(iterator, reset_fn, infinite, skip_pickling_check=False)[source]

Read each element of iterator.

Iterators must be pickleable in order for state_dict saving/reloading to work. To avoid surprises, by default, upon creation, read_iterator will perform a test pickle of iterator to ensure pickleability and will raise an error if pickling fails. To skip this pickling check performed upon creation (if, for example, you do not need to save/reload the state_dict or want to avoid incurring the cost of pickling on creation), set skip_pickling_check to True.

Parameters:
  • iterator (T) – The iterator to read.

  • reset_fn (Callable[[T], T]) – Function to reset iterator.

  • infinite (bool) – Whether iterator is infinite or not.

  • skip_pickling_check (bool) – Whether to skip the pickling check or not.

Return type:

DataPipelineBuilder
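
The pickling requirement can be checked with the standard library before an iterator is handed to read_iterator. The sketch below does not call fairseq2; it only shows that a generator cannot be pickled, while a range iterator can be pickled and keeps its position. The reset_fn shown is a hypothetical example of a function that returns a fresh iterator.

```python
import pickle


def numbers():
    yield from range(5)


# Generators cannot be pickled, so they would fail a pickling check.
try:
    pickle.dumps(numbers())
    generator_picklable = True
except TypeError:
    generator_picklable = False
print(generator_picklable)  # False

# A range iterator is picklable and remembers how far it has advanced.
it = iter(range(5))
next(it)
next(it)
restored = pickle.loads(pickle.dumps(it))
remaining = list(restored)
print(remaining)  # [2, 3, 4]


def reset_fn(_iterator):
    """Hypothetical reset function: return an iterator back at the start."""
    return iter(range(5))


print(list(reset_fn(it)))  # [0, 1, 2, 3, 4]
```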

Exceptions

class fairseq2.data.data_pipeline.DataPipelineError[source]

Bases: Exception

Raised when an error occurs while reading from a data pipeline.

class fairseq2.data.data_pipeline.ByteStreamError[source]

Bases: Exception

Raised when a dataset file can’t be read.

class fairseq2.data.data_pipeline.RecordError[source]

Bases: Exception

Raised when a corrupt record is encountered while reading a dataset.

Examples

Creating a Basic Pipeline

from fairseq2.data import read_sequence, DataPipeline

# Create a simple pipeline that processes numbers
pipeline = (
    read_sequence([1, 2, 3, 4, 5])
    .map(lambda x: x * 2)
    .filter(lambda x: x > 5)
    .and_return()
)

# Iterate over the results
for item in pipeline:
    print(item)  # Outputs: 6, 8, 10

Using Column Selection

# Process structured data with column selection
data = [
    {"text": "Hello", "label": 1},
    {"text": "World", "label": 0}
]

pipeline = (
    read_sequence(data)
    .map(lambda x: x.upper(), selector="text")
    .and_return()
)

# Results will have uppercase text but unchanged labels
# [{"text": "HELLO", "label": 1}, {"text": "WORLD", "label": 0}]

Combining Pipelines

# Create two pipelines
p1 = read_sequence([1, 2, 3]).and_return()
p2 = read_sequence(['a', 'b', 'c']).and_return()

# Zip them together with names
combined = DataPipeline.zip(
    [p1, p2],
    names=["numbers", "letters"]
).and_return()

# Results: [
#   {"numbers": 1, "letters": 'a'},
#   {"numbers": 2, "letters": 'b'},
#   {"numbers": 3, "letters": 'c'}
# ]

Using Bucketing

from fairseq2.data import create_bucket_sizes

# Create optimal bucket sizes for sequence processing
bucket_sizes = create_bucket_sizes(
    max_num_elements=1024,
    max_seq_len=128,
    min_seq_len=1,
    num_seqs_multiple_of=8
)

# Use bucketing in a pipeline
pipeline = (
    read_sequence(data)
    .bucket_by_length(
        bucket_sizes,
        selector="text",
        drop_remainder=False
    )
    .and_return()
)

State Management

# Save pipeline state
state = pipeline.state_dict()

# Load pipeline state
new_pipeline = create_pipeline()  # create_pipeline() is a placeholder for your own code that builds the pipeline
new_pipeline.load_state_dict(state)  # Restore the state

See Also