Data pipelines in fairseq2 provide an efficient way to process and transform data for machine learning tasks.
The implementation leverages multiple threads to work around Python’s Global Interpreter Lock (GIL) limitations,
resulting in better performance than pure Python dataloaders.
```mermaid
graph TD
    A[Input Dictionary] --> B[foo]
    A --> C[bar: 6]
    B --> D[List Index 0]
    B --> E[List Index 1]
    D --> F[x: 1]
    D --> G[y: 2]
    E --> H[x: 3]
    E --> I[y: 4]
    E --> J[z: 5]
    style I fill:#f96,stroke:#333
    style C fill:#f96,stroke:#333
```
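The highlighted nodes correspond to a selector such as `foo[1].y,bar`. As an illustrative sketch only (fairseq2 resolves selectors natively in C++, and this helper is not part of its API), a selector path over such a nested structure can be resolved like this:

```python
import re

def resolve(example, path):
    """Resolve a selector path like "foo[1].y" against a nested structure.

    Hypothetical helper for illustration; not fairseq2's implementation.
    """
    node = example
    # Split "foo[1].y" into dict keys ("foo", "y") and list indices ("[1]").
    for part in re.findall(r"[^.\[\]]+|\[\d+\]", path):
        if part.startswith("["):
            node = node[int(part[1:-1])]  # list index
        else:
            node = node[part]             # dict key
    return node

example = {"foo": [{"x": 1, "y": 2}, {"x": 3, "y": 4, "z": 5}], "bar": 6}

# A comma-separated selector picks several fields at once.
selected = [resolve(example, p) for p in "foo[1].y,bar".split(",")]
print(selected)  # [4, 6]
```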
- **Finite Pipelines**: Standard pipelines that terminate after processing all data
- **Pseudo-infinite Pipelines**: Created using `DataPipeline.count` or `DataPipeline.constant`
- **Infinite Pipelines**: Created using `DataPipelineBuilder.repeat` without arguments
```mermaid
graph TD
    subgraph Finite
        A[read_sequence] --> B[End]
    end
    subgraph Pseudo-infinite
        C[constant/count] --> D[Stops with other pipelines]
    end
    subgraph Infinite
        E[repeat] --> F[Never ends]
    end
```
fairseq2 provides flexible shuffling capabilities through the `shuffle` operator:
```python
from fairseq2.data import read_sequence

# Basic shuffling with a window size
pipeline = (
    read_sequence(data)
    .shuffle(shuffle_window=1000)  # Shuffle using a 1000-example buffer
    .and_return()
)

# Shuffle between epochs
for epoch in range(3):
    pipeline.reset()  # By default, this re-shuffles data
    for item in pipeline:
        process(item)

# Disable shuffling between epochs
pipeline.reset(reset_rng=True)  # Keep the same order
```
The shuffle operator maintains a buffer of the specified size.
When requesting the next example, it randomly samples from this buffer and replaces the selected example with a new one from the source.
Setting `shuffle_window=0` loads all examples into memory for full shuffling.
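The buffer-and-sample behavior described above can be sketched in plain Python (a simplified model for illustration, not fairseq2's actual multithreaded implementation):

```python
import random

def windowed_shuffle(source, shuffle_window, seed=0):
    """Sketch of buffered shuffling: sample from a fixed-size buffer,
    replacing each emitted example with a new one from the source."""
    rng = random.Random(seed)
    if shuffle_window == 0:
        # Full shuffle: load every example into memory.
        buffer = list(source)
        rng.shuffle(buffer)
        yield from buffer
        return
    buffer = []
    for example in source:
        if len(buffer) < shuffle_window:
            buffer.append(example)  # still filling the buffer
        else:
            idx = rng.randrange(shuffle_window)
            yield buffer[idx]       # emit a random buffered example...
            buffer[idx] = example   # ...and replace it with the new one
    rng.shuffle(buffer)             # drain what is left at the end
    yield from buffer

items = list(windowed_shuffle(range(10), shuffle_window=4))
assert sorted(items) == list(range(10))  # every example appears exactly once
```

Note that with a small window the output is only locally shuffled: early examples can never appear at the very end of the stream.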
Bucketing helps handle variable-length sequences efficiently. There are several bucketing strategies:
**Fixed-size Bucketing**: Combine a fixed number of examples
```python
pipeline = (
    read_sequence(data)
    .bucket(bucket_size=32, drop_remainder=True)  # Combine 32 examples into one bucket
    .and_return()
)
```
**Length-based Bucketing**: Group sequences of similar lengths
```python
from fairseq2.data import create_bucket_sizes

# Create optimal bucket sizes
bucket_sizes = create_bucket_sizes(
    max_num_elements=1024,   # Max elements per bucket
    max_seq_len=128,         # Max sequence length
    min_seq_len=1,           # Min sequence length
    num_seqs_multiple_of=8,  # Ensure bucket sizes are multiples of 8
)

# Use bucketing in pipeline
pipeline = (
    read_sequence(data)
    .bucket_by_length(
        bucket_sizes,
        selector="length",             # Column containing sequence lengths
        skip_above_max_examples=True,  # Skip sequences longer than max_seq_len
        drop_remainder=False,          # Keep partial buckets
    )
    .and_return()
)
```
**Dynamic Bucketing**: Combine examples based on a cost function
```python
def sequence_cost(example):
    return len(example["text"])

pipeline = (
    read_sequence(data)
    .dynamic_bucket(
        threshold=1024,         # Target bucket size
        cost_fn=sequence_cost,  # Function to compute example cost
        min_num_examples=16,    # Min examples per bucket
        max_num_examples=64,    # Max examples per bucket
        drop_remainder=False,   # Keep partial buckets
    )
    .and_return()
)
```
This approach efficiently handles variable-length sequences while maintaining appropriate batch sizes for training.
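The greedy cost-accumulation behind dynamic bucketing can be sketched in plain Python (a simplified model of the behavior, not fairseq2's implementation):

```python
def dynamic_bucket(examples, threshold, cost_fn, min_num_examples=1,
                   max_num_examples=None, drop_remainder=False):
    """Group examples into buckets whose accumulated cost reaches `threshold`."""
    bucket, cost = [], 0
    for example in examples:
        bucket.append(example)
        cost += cost_fn(example)
        # Close the bucket once the cost threshold or the size cap is reached.
        full = cost >= threshold and len(bucket) >= min_num_examples
        capped = max_num_examples is not None and len(bucket) >= max_num_examples
        if full or capped:
            yield bucket
            bucket, cost = [], 0
    if bucket and not drop_remainder:
        yield bucket  # partial final bucket

texts = ["aaaa", "bb", "ccc", "d", "eeeee"]
buckets = list(dynamic_bucket(texts, threshold=5, cost_fn=len))
print(buckets)  # [['aaaa', 'bb'], ['ccc', 'd', 'eeeee']]
```

Because each bucket closes as soon as its total cost crosses the threshold, buckets of long sequences hold fewer examples than buckets of short ones, which keeps the per-batch workload roughly constant.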
fairseq2’s data pipelines offer additional features:

- **Prefetching**: Load data ahead of time for better performance
- **State Management**: Save and restore pipeline state for resumable processing
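The save/restore pattern behind resumable processing can be modeled with a plain Python iterator; the `state_dict`/`load_state_dict` method names below mirror the common PyTorch-style convention, but this toy class is purely illustrative:

```python
class ResumableSequence:
    """Toy resumable iterator illustrating state save/restore semantics."""

    def __init__(self, data):
        self.data = data
        self.position = 0

    def __iter__(self):
        while self.position < len(self.data):
            item = self.data[self.position]
            self.position += 1
            yield item

    def state_dict(self):
        return {"position": self.position}  # everything needed to resume

    def load_state_dict(self, state):
        self.position = state["position"]

pipeline = ResumableSequence([10, 20, 30, 40])
it = iter(pipeline)
first_two = [next(it), next(it)]    # [10, 20]
checkpoint = pipeline.state_dict()  # e.g. saved alongside a model checkpoint

restored = ResumableSequence([10, 20, 30, 40])
restored.load_state_dict(checkpoint)
rest = list(restored)               # [30, 40] -- resumes where we left off
```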
> **Note**: When combining pseudo-infinite pipelines with finite ones, the pseudo-infinite pipeline will stop when the finite pipeline ends. For truly infinite behavior, use `repeat()` without arguments.
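This stopping behavior mirrors ordinary Python iterator semantics: zipping an unbounded counter with finite data stops at the finite end, while a truly repeating source never ends on its own. A plain-Python analogue, using `itertools` as a stand-in for the pipeline operators:

```python
import itertools

data = ["a", "b", "c"]

# Pseudo-infinite: the counter could run forever, but zip stops
# as soon as the finite side is exhausted.
pairs = list(zip(itertools.count(start=0), data))
print(pairs)  # [(0, 'a'), (1, 'b'), (2, 'c')]

# Truly infinite: cycling (like repeat() without arguments) never ends,
# so consumption must be bounded by the caller.
repeated = list(itertools.islice(itertools.cycle(data), 7))
print(repeated)  # ['a', 'b', 'c', 'a', 'b', 'c', 'a']
```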