fairseq2.data.data_pipeline¶
The data pipeline module provides the core data processing functionality in fairseq2.
Classes¶
- final class fairseq2.data.data_pipeline.DataPipeline[source]¶
Bases:
object
fairseq2 native data pipeline.
The pipeline state can be persisted to the disk, allowing it to be resumed later. It is a Python Iterable, but it also contains the iterator states.
Calling iter() twice will create two iterators reading from the same pipeline and sharing the same state, so they will behave inconsistently.
- __iter__()[source]¶
Return an iterator over the examples in the data pipeline.
The iterator will modify the internal state of this DataPipeline, so it is not safe to have several iterators over the same DataPipeline.
- reset(reset_rng=False)[source]¶
Move back to the first example in the data pipeline.
- Parameters:
reset_rng (bool) – If True, resets all random number generators in the pipeline.
- property is_broken: bool¶
Return True if the data pipeline is broken.
If True, any future operation on this data pipeline will raise a DataPipelineError.
- state_dict(strict=True)[source]¶
Return a dictionary containing the state of the data pipeline.
The current position of the data pipeline can be restored by passing the returned state dictionary to load_state_dict().
- load_state_dict(state_dict)[source]¶
Restore the state of the data pipeline from state_dict.
- Parameters:
state_dict (Mapping[str, Any]) – A state dictionary previously returned by state_dict().
- static concat(pipelines)[source]¶
Concatenate examples from pipelines.
- Parameters:
pipelines (Sequence[DataPipeline]) – The data pipelines to concatenate.
- Return type:
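Example usage (a minimal sketch, assuming the concatenated pipeline yields all examples of the first pipeline before the second):
from fairseq2.data import read_sequence, DataPipeline
p1 = read_sequence([1, 2]).and_return()
p2 = read_sequence([3, 4]).and_return()
pipeline = DataPipeline.concat([p1, p2]).and_return()
# yields: 1, 2, 3, 4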
- static constant(example, key=None)[source]¶
Repeatedly yield example.
This pipeline is pseudo-infinite; when used with functions that combine pipelines (e.g. sample, round_robin, zip), it will yield examples only as long as other pipelines yield examples.
See Pipeline Types for more details.
- Parameters:
- Return type:
- static count(start=0, step=1, key=None)[source]¶
Count from start in steps of size step.
This pipeline is pseudo-infinite; when used with functions that combine pipelines (e.g. sample, round_robin, zip), it will yield examples only as long as other pipelines yield examples.
See Pipeline Types for more details.
- Parameters:
- Return type:
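Example usage (a minimal sketch that pairs the pseudo-infinite counter with a finite pipeline; per the pseudo-infinite behavior described above, zip stops when the finite pipeline is exhausted):
from fairseq2.data import read_sequence, DataPipeline
indices = DataPipeline.count(start=0).and_return()
letters = read_sequence(["a", "b", "c"]).and_return()
pipeline = DataPipeline.zip([indices, letters], names=["idx", "value"]).and_return()
# yields: {"idx": 0, "value": "a"}, {"idx": 1, "value": "b"}, {"idx": 2, "value": "c"}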
- static round_robin(pipelines, stop_at_shortest=False, allow_repeats=True)[source]¶
Extract examples from pipelines in round-robin order.
See Combining Pipelines for more details.
- Parameters:
pipelines (Sequence[DataPipeline]) – The data pipelines to round robin.
stop_at_shortest (bool) – If True, stops round_robin when the first pipeline reaches its end.
allow_repeats (bool) – If True, circles around finished pipelines until all pipelines reach their end. If False, does not repeat pipelines that have reached their end.
- Return type:
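Example usage (a minimal sketch; the interleaving shown assumes the default allow_repeats=True, under which a finished pipeline restarts from its beginning until every pipeline has reached its end):
from fairseq2.data import read_sequence, DataPipeline
p1 = read_sequence([1, 2, 3]).and_return()
p2 = read_sequence([10, 20]).and_return()
pipeline = DataPipeline.round_robin([p1, p2]).and_return()
# yields examples alternately from p1 and p2, e.g.: 1, 10, 2, 20, 3, 10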
- static sample(pipelines, weights=None, seed=None, allow_repeats=True)[source]¶
Extract examples from pipelines by sampling based on weights. Circles around pipelines until all have reached their end at least once.
See Combining Pipelines for more details.
- Parameters:
pipelines (Sequence[DataPipeline]) – The data pipelines to sample from.
weights (Sequence[float] | None) – Desired distribution of pipelines. If None, use a uniform distribution.
allow_repeats (bool) – If True, circles around finished pipelines until all pipelines reach their end. If False, does not repeat pipelines that have reached their end.
- Return type:
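Example usage (a minimal sketch; the sampled order depends on the seed):
from fairseq2.data import read_sequence, DataPipeline
p1 = read_sequence([1, 2, 3]).and_return()
p2 = read_sequence(["a", "b"]).and_return()
# Draw roughly 80% of examples from p1 and 20% from p2; a fixed seed
# makes the sampling order reproducible across runs.
pipeline = DataPipeline.sample([p1, p2], weights=[0.8, 0.2], seed=42).and_return()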
- static zip(pipelines, names=None, zip_to_shortest=False, flatten=False, disable_parallelism=False)[source]¶
Zip together examples read from pipelines.
See Combining Pipelines for more details.
- Parameters:
pipelines (Sequence[DataPipeline]) – The data pipelines to zip.
names (Sequence[str] | None) – The names to assign to the data pipelines. If None, yields examples as lists.
zip_to_shortest (bool) – If True, stops yielding examples after the shortest pipeline terminates. Otherwise, all pipelines (that are not pseudo-infinite) must have the same number of examples.
flatten (bool) – If True, flattens examples from each pipeline into one dictionary or list. All pipelines must return the same type (dict or non-dict).
disable_parallelism (bool) – If True, calls each data pipeline sequentially.
- Return type:
- final class fairseq2.data.data_pipeline.DataPipelineBuilder[source]¶
Bases:
object
API to create a DataPipeline.
- bucket(bucket_size, drop_remainder=False)[source]¶
Combine a number of consecutive examples into a single example.
See Combining Pipelines for more details.
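Example usage (a minimal sketch, assuming each bucket is yielded as a list of the original examples):
from fairseq2.data import read_sequence
pipeline = (
    read_sequence([1, 2, 3, 4, 5])
    .bucket(2)  # combine two consecutive examples into one bucket
    .and_return()
)
# yields: [1, 2], [3, 4], [5]  (the smaller final bucket is kept because
# drop_remainder defaults to False)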
- bucket_by_length(bucket_sizes, selector=None, min_data_len=1, skip_below_min_examples=False, skip_above_max_examples=False, drop_remainder=False)[source]¶
Combine examples of similar shape into batches.
- Return type:
- collate(pad_value=None, pad_to_multiple=1, overrides=None)[source]¶
Concatenate a list of inputs into a single input.
This is equivalent to calling .map(Collater()). See fairseq2.data.Collater for details.
- Return type:
- dynamic_bucket(threshold, cost_fn, bucket_creation_fn=None, min_num_examples=None, max_num_examples=None, drop_remainder=False)[source]¶
Combine a number of consecutive examples into a single example based on the cumulative cost of the examples, as measured by the user-provided cost_fn.
Yields a bucket once the cumulative cost produced by cost_fn meets or exceeds threshold.
- Parameters:
threshold (float) – Threshold for cumulative cost to trigger bucketing.
cost_fn (Callable[[Any], float]) – Cost function that outputs cost for a particular example.
bucket_creation_fn (Callable[[Sequence[Any]], tuple[Sequence[Sequence[Any]], Sequence[Any]]] | None) – Function for customizing bucket creation. Called with the bucket of examples that caused the cost threshold to be exceeded. Expected to return a tuple of (new_buckets, remainder), where the internal buffer is set to remainder and new_buckets is a list of buckets to be yielded. If None, defaults to the identity function.
min_num_examples (int | None) – Minimum number of examples per bucket.
max_num_examples (int | None) – Maximum number of examples per bucket.
drop_remainder (bool) – If True, drops the last bucket in case it has fewer than min_num_examples examples or the cumulative cost has not reached threshold yet.
- Return type:
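Example usage (a minimal sketch using the sequence length as the cost function):
from fairseq2.data import read_sequence
data = [[1], [2, 3], [4, 5, 6], [7]]
pipeline = (
    read_sequence(data)
    .dynamic_bucket(threshold=6, cost_fn=len)  # emit a bucket once the total length reaches 6
    .and_return()
)
# The first bucket closes after [1], [2, 3], [4, 5, 6] because their
# cumulative cost 1 + 2 + 3 = 6 meets the threshold.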
- filter(predicate)[source]¶
Filter examples from the data pipeline and keep only those that match predicate.
- map(fn, selector=None, num_parallel_calls=1)[source]¶
Apply fn to each example.
Example usage:
data = [2, 5]
data.map(lambda x: x + 10)                        # yields: 12, 15
data.map(lambda x: x + 10, num_parallel_calls=8)  # same results but will use more cores

data = [{"a": 2, "b": 1}, {"a": 5, "b": 3}]
data.map(lambda x: x + 10, selector="a")          # yields: {"a": 12, "b": 1}, {"a": 15, "b": 3}
data.map(lambda x: x + 10, selector="a,b")        # yields: {"a": 12, "b": 11}, {"a": 15, "b": 13}
- Parameters:
fn (Callable[[Any], Any] | Sequence[Callable[[Any], Any]]) – The function to apply. If a list of functions is given, they are automatically chained; .map([f1, f2]) is a more efficient version of .map(f1).map(f2).
selector (str | None) – The column to apply the function to. Several columns can be specified by separating them with a “,”. See Column Selection for more details.
num_parallel_calls (int) – The number of examples to process in parallel.
- Return type:
- prefetch(num_examples)[source]¶
Prefetch examples in the background while the current example is being processed.
- repeat(num_repeats=None, reset_rng=False)[source]¶
Repeat the sequence of pipeline examples num_repeats times.
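Example usage (a minimal sketch, assuming num_repeats=2 replays the underlying sequence twice):
from fairseq2.data import read_sequence
pipeline = read_sequence([1, 2]).repeat(num_repeats=2).and_return()
# yields: 1, 2, 1, 2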
- shard(shard_idx, num_shards, allow_uneven=False)[source]¶
Read only 1/num_shards of the examples in the data pipeline.
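Example usage (a minimal sketch; which examples land in which shard is an implementation detail, but each shard sees a disjoint 1/num_shards subset):
from fairseq2.data import read_sequence
# Worker 0 of 2 reads only its half of the examples.
pipeline = (
    read_sequence([1, 2, 3, 4])
    .shard(shard_idx=0, num_shards=2)
    .and_return()
)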
- yield_from(fn)[source]¶
Map every example to a data pipeline and yield the examples returned from the mapped data pipelines.
- Parameters:
fn (Callable[[Any], DataPipeline]) – The function to map examples to data pipelines.
- Return type:
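Example usage (a minimal sketch where each example is expanded into its own sub-pipeline):
from fairseq2.data import read_sequence
pipeline = (
    read_sequence([2, 3])
    .yield_from(lambda n: read_sequence(list(range(n))).and_return())
    .and_return()
)
# yields: 0, 1, 0, 1, 2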
- and_return(max_num_warnings=0)[source]¶
Return a new DataPipeline instance.
- Return type:
- final class fairseq2.data.data_pipeline.Collater(pad_value=None, pad_to_multiple=1, overrides=None)[source]¶
Bases:
object
Concatenate a list of inputs into a single input.
Used to create batches. If all tensors in the input example have the same last dimension, Collater returns the concatenated tensors.
Otherwise pad_value is required, and the last dimension of the batch will be made long enough to fit the longest tensor, rounded up to pad_to_multiple. The returned batch is then a dictionary with the following keys:
{
    "is_ragged": True/False,               # True if padding was needed
    "seqs": [[1, 4, 5, 0], [1, 2, 3, 4]],  # (Tensor) concatenated and padded tensors from the input
    "seq_lens": [3, 4],                    # (Tensor) the original length of each input tensor
}
Collater preserves the shape of the original data. For a tuple of lists, it returns a tuple of batches. For a dict of lists, it returns a dict of batches.
- Parameters:
pad_value (int | None) – When concatenating tensors of different lengths, the value used to pad the shorter tensors.
pad_to_multiple (int) – Always pad to a length that is a multiple of this value.
overrides (Sequence[CollateOptionsOverride] | None) – A list of CollateOptionsOverride that allows overriding pad_value and pad_to_multiple for specific columns.
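Example usage (a minimal sketch; the tensor values mirror the dictionary shown above):
import torch
from fairseq2.data import Collater
collater = Collater(pad_value=0)
batch = collater([torch.tensor([1, 4, 5]), torch.tensor([1, 2, 3, 4])])
# batch["seqs"] holds the padded tensors, batch["seq_lens"] the original
# lengths, and batch["is_ragged"] is True because padding was needed.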
- class fairseq2.data.data_pipeline.CollateOptionsOverride(selector, pad_value=None, pad_to_multiple=1)[source]¶
Bases:
object
Overrides how the collater should create the batch for a particular column.
Useful if not all columns should use the same pad value or padding multiple. See Collater for details.
- Parameters:
selector (str) – The columns this override applies to. See Column Selection for details on how to specify columns.
- final class fairseq2.data.data_pipeline.FileMapper(root_dir=None, cached_fd_count=None)[source]¶
Bases:
object
For a given file name, returns the file content as bytes.
The file name can also specify a slice of the file in bytes: FileMapper("big_file.txt:1024:48") will read 48 bytes at offset 1024.
- Parameters:
root_dir (Path | None) – Root directory for looking up relative file names. Warning: this is not enforced; FileMapper will happily read any file on the system.
cached_fd_count (int | None) – Enables an LRU cache on the last cached_fd_count files read. FileMapper will memory-map all cached files, so this is especially useful for reading several slices of the same file.
- __call__(pathname)[source]¶
Parses the pathname and returns the file bytes.
- Returns:
A dict with the following keys:
{ "path": "the/path.txt" # the relative path of the file "data": MemoryBlock # a memory block with the content of the file. You can use `bytes` to get a regular python object. }
- Return type:
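Example usage (a minimal sketch; the root directory and file name are hypothetical):
from pathlib import Path
from fairseq2.data.data_pipeline import FileMapper
mapper = FileMapper(root_dir=Path("/data"), cached_fd_count=100)
result = mapper("audio/clip.wav")
raw = bytes(result["data"])  # convert the MemoryBlock to a regular bytes object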
Functions¶
- fairseq2.data.data_pipeline.create_bucket_sizes(*, max_num_elements, max_seq_len, min_seq_len=1, num_seqs_multiple_of=1)[source]¶
Create optimal bucket sizes for DataPipelineBuilder.bucket_by_length().
- Parameters:
- Return type:
- fairseq2.data.data_pipeline.list_files(path, pattern=None)[source]¶
Recursively list all files under path that match pattern.
- Parameters:
- Return type:
- fairseq2.data.data_pipeline.read_sequence(seq)[source]¶
Read every element in seq.
- Parameters:
- Return type:
- fairseq2.data.data_pipeline.read_zipped_records(path)[source]¶
Read each file in a zip archive.
- Return type:
- fairseq2.data.data_pipeline.read_iterator(iterator, reset_fn, infinite, skip_pickling_check=False)[source]¶
Read each element of iterator.
Iterators must be pickleable in order for state_dict saving/reloading to work. To avoid surprises, by default, read_iterator performs a test pickle of iterator upon creation to ensure pickleability and raises an error if pickling fails. To skip this check (for example, if you do not need to save/reload the state_dict or want to avoid the cost of pickling on creation), set skip_pickling_check to True.
- Parameters:
- Return type:
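Example usage (a minimal sketch; reset_fn is assumed to return a fresh iterator when the pipeline is reset):
from fairseq2.data.data_pipeline import read_iterator

pipeline = read_iterator(
    iter([1, 2, 3]),
    reset_fn=lambda it: iter([1, 2, 3]),
    infinite=False,
).and_return()
# yields: 1, 2, 3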
Exceptions¶
- class fairseq2.data.data_pipeline.DataPipelineError[source]¶
Bases:
Exception
Raised when an error occurs while reading from a data pipeline.
Examples¶
Creating a Basic Pipeline¶
from fairseq2.data import read_sequence, DataPipeline
# Create a simple pipeline that processes numbers
pipeline = (
read_sequence([1, 2, 3, 4, 5])
.map(lambda x: x * 2)
.filter(lambda x: x > 5)
.and_return()
)
# Iterate over the results
for item in pipeline:
print(item) # Outputs: 6, 8, 10
Using Column Selection¶
# Process structured data with column selection
data = [
{"text": "Hello", "label": 1},
{"text": "World", "label": 0}
]
pipeline = (
read_sequence(data)
.map(lambda x: x.upper(), selector="text")
.and_return()
)
# Results will have uppercase text but unchanged labels
# [{"text": "HELLO", "label": 1}, {"text": "WORLD", "label": 0}]
Combining Pipelines¶
# Create two pipelines
p1 = read_sequence([1, 2, 3]).and_return()
p2 = read_sequence(['a', 'b', 'c']).and_return()
# Zip them together with names
combined = DataPipeline.zip(
[p1, p2],
names=["numbers", "letters"]
).and_return()
# Results: [
# {"numbers": 1, "letters": 'a'},
# {"numbers": 2, "letters": 'b'},
# {"numbers": 3, "letters": 'c'}
# ]
Using Bucketing¶
from fairseq2.data import create_bucket_sizes
# Create optimal bucket sizes for sequence processing
bucket_sizes = create_bucket_sizes(
max_num_elements=1024,
max_seq_len=128,
min_seq_len=1,
num_seqs_multiple_of=8
)
# Use bucketing in a pipeline
pipeline = (
read_sequence(data)
.bucket_by_length(
bucket_sizes,
selector="text",
drop_remainder=False
)
.and_return()
)
State Management¶
# Save pipeline state
state = pipeline.state_dict()
# Load pipeline state
new_pipeline = create_pipeline() # Create a new pipeline
new_pipeline.load_state_dict(state) # Restore the state
See Also¶
Data Pipeline - Basic introduction to data pipeline