Data Pipeline

Data pipelines in fairseq2 provide an efficient way to process and transform data for machine learning tasks. The implementation leverages multiple threads to work around Python’s Global Interpreter Lock (GIL) limitations, resulting in better performance than pure Python dataloaders.
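For example, an expensive per-example transform can be spread across several worker threads with the num_parallel_calls option of map. A minimal sketch (expensive_transform is a made-up stand-in for a costly operation):

from fairseq2.data import read_sequence

def expensive_transform(x: int) -> int:
    # Stand-in for a costly per-example operation (tokenization, decoding, ...).
    return x * x

pipeline = (
    read_sequence(list(range(100)))
    .map(expensive_transform, num_parallel_calls=4)  # Run on 4 worker threads
    .and_return()
)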

Basic Pipeline Structure

A data pipeline consists of a series of operations that transform data. Here’s a basic example:

from fairseq2.data import text

pipeline = (
    text.read_text("file.tsv")
    .map(lambda x: str(x.split("\t")[1]).lower())
    .filter(lambda x: len(x) < 10)
    .and_return()
)

The same pipeline as a diagram:

    graph LR
        A[read_text] --> B[map]
        B --> C[filter]
        style A fill:#f9f,stroke:#333
        style B fill:#bbf,stroke:#333
        style C fill:#bfb,stroke:#333

[Figure: a more complex pipeline that can be built with fairseq2]

Column Selection

Data items in the pipeline can be tuples or Python dictionaries. Many operators support a selector argument to specify which column to process:

  • For tuples: "[3]" selects the fourth element (0-based indexing)

  • For dictionaries: "foo" selects the value for key "foo"

  • Nested selectors: Use . to traverse nested structures (e.g., "foo[1].y")

Example with nested data:

data = {"foo": [{"x": 1, "y": 2}, {"x": 3, "y": 4, "z": 5}], "bar": 6}
# "foo[1].y" selects 4
# "bar" selects 6

The selection paths as a diagram, with the selected values highlighted:

    graph TD
        A[Input Dictionary] --> B[foo]
        A --> C[bar: 6]
        B --> D[List Index 0]
        B --> E[List Index 1]
        D --> F[x: 1]
        D --> G[y: 2]
        E --> H[x: 3]
        E --> I[y: 4]
        E --> J[z: 5]
        style I fill:#f96,stroke:#333
        style C fill:#f96,stroke:#333
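Selectors are usually combined with an operator such as map so that only the chosen column is transformed. A minimal sketch (the dictionaries are made-up illustration data):

from fairseq2.data import read_sequence

data = [
    {"foo": [{"x": 1, "y": 2}, {"x": 3, "y": 4, "z": 5}], "bar": 6},
    {"foo": [{"x": 7, "y": 8}, {"x": 9, "y": 10}], "bar": 11},
]

# Only the element addressed by "foo[1].y" is doubled; everything else
# passes through unchanged.
pipeline = (
    read_sequence(data)
    .map(lambda v: v * 2, selector="foo[1].y")
    .and_return()
)

for example in pipeline:
    print(example["foo"][1]["y"])  # 8, then 20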

Pipeline Types

fairseq2 supports three types of pipelines:

  1. Finite Pipelines: Standard pipelines that terminate after processing all data

  2. Pseudo-infinite Pipelines: Created using DataPipeline.count or DataPipeline.constant

  3. Infinite Pipelines: Created using DataPipelineBuilder.repeat without arguments

    graph TD
        subgraph Finite
            A[read_sequence] --> B[End]
        end
        subgraph Pseudo-infinite
            C[constant/count] --> D[Stops with other pipelines]
        end
        subgraph Infinite
            E[repeat] --> F[Never ends]
        end
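A minimal sketch constructing one pipeline of each type:

from fairseq2.data import DataPipeline, read_sequence

# Finite: ends after the three examples.
finite = read_sequence([1, 2, 3]).and_return()

# Pseudo-infinite: yields 0 forever on its own, but stops early when
# combined with a finite pipeline (see the note at the end of this page).
pseudo_infinite = DataPipeline.constant(0).and_return()

# Infinite: cycles over the sequence indefinitely.
infinite = read_sequence([1, 2, 3]).repeat().and_return()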

Combining Pipelines

fairseq2 provides several ways to combine pipelines (the snippets below assume DataPipeline and read_sequence are imported from fairseq2.data):

  1. Round Robin: Alternates between pipelines:

    pipeline1 = DataPipeline.constant(0).and_return()
    pipeline2 = read_sequence([1, 2, 3]).and_return()
    
    for example in DataPipeline.round_robin(pipeline1, pipeline2).and_return():
        print(example)
    
    # round_robin yields: 0, 1, 0, 2, 0, 3
    
  2. Zip: Combines examples from multiple pipelines:

    pipeline1 = read_sequence([0]).repeat().and_return()
    pipeline2 = read_sequence([1, 2, 3]).and_return()
    
    for example in DataPipeline.zip(pipeline1, pipeline2, names=["a", "b"]).and_return():
        print(example)
    
    # Yields: {"a": 0, "b": 1}, {"a": 0, "b": 2}, {"a": 0, "b": 3}
    
  3. Sample: Randomly samples from pipelines based on weights:

    pipeline1 = read_sequence([0]).repeat().and_return()
    pipeline2 = read_sequence([1, 2, 3]).and_return()
    
    for example in DataPipeline.sample(pipeline1, pipeline2, weights=[0.5, 0.5]).and_return():
        print(example)
    
The three combination strategies as a diagram:

    graph TD
        subgraph Round Robin
            A1[Pipeline 1] --> C1{Alternate}
            B1[Pipeline 2] --> C1
            C1 --> D1[Output]
        end
        subgraph Zip
            A2[Pipeline 1] --> C2((Combine))
            B2[Pipeline 2] --> C2
            C2 --> D2[Output]
        end
        subgraph Sample
            A3[Pipeline 1] --> C3{Random Select}
            B3[Pipeline 2] --> C3
            C3 --> D3[Output]
        end

More Features

Shuffling

fairseq2 provides flexible shuffling capabilities through the shuffle operator:

# Basic shuffling with a window size
pipeline = (
    read_sequence(data)
    .shuffle(shuffle_window=1000)  # Shuffle using a 1000-example buffer
    .and_return()
)

# Shuffle between epochs
for epoch in range(3):
    pipeline.reset()  # By default, this re-shuffles data
    for item in pipeline:
        process(item)

# Use the same shuffle order in every epoch
pipeline.reset(reset_rng=True)  # Resets the RNG to its initial seed

The shuffle operator maintains a buffer of the specified size. When requesting the next example, it randomly samples from this buffer and replaces the selected example with a new one from the source. Setting shuffle_window=0 loads all examples into memory for full shuffling.
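
Conceptually, the windowed shuffle behaves like the following pure-Python sketch (an illustration of the algorithm, not fairseq2's actual implementation):

import random

def windowed_shuffle(source, window_size, rng):
    """Yield items from source in randomized order using a fixed-size buffer."""
    buffer = []
    for item in source:
        if len(buffer) < window_size:
            buffer.append(item)  # Fill the buffer first
            continue
        # Yield a random buffered item and replace it with the new one.
        i = rng.randrange(window_size)
        yield buffer[i]
        buffer[i] = item
    # Source exhausted: drain the remaining buffer in random order.
    rng.shuffle(buffer)
    yield from buffer

rng = random.Random(42)
print(list(windowed_shuffle(range(10), window_size=4, rng=rng)))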

Bucketing

Bucketing helps handle variable-length sequences efficiently. There are several bucketing strategies:

  1. Fixed-size Bucketing: Combine a fixed number of examples

pipeline = (
    read_sequence(data)
    .bucket(bucket_size=32, drop_remainder=True)  # Combine 32 examples into one bucket
    .and_return()
)
  2. Length-based Bucketing: Group sequences of similar lengths

from fairseq2.data import create_bucket_sizes

# Create optimal bucket sizes
bucket_sizes = create_bucket_sizes(
    max_num_elements=1024,   # Max elements per bucket
    max_seq_len=128,         # Max sequence length
    min_seq_len=1,           # Min sequence length
    num_seqs_multiple_of=8   # Ensure bucket sizes are multiples of 8
)

# Use bucketing in pipeline
pipeline = (
    read_sequence(data)
    .bucket_by_length(
        bucket_sizes,
        selector="length",             # Column containing sequence lengths
        skip_above_max_examples=True,  # Skip sequences longer than max_seq_len
        drop_remainder=False           # Keep partial buckets
    )
    .and_return()
)
  3. Dynamic Bucketing: Combine examples based on a cost function

def sequence_cost(example):
    return len(example["text"])

pipeline = (
    read_sequence(data)
    .dynamic_bucket(
        threshold=1024,        # Target bucket size
        cost_fn=sequence_cost, # Function to compute example cost
        min_num_examples=16,   # Min examples per bucket
        max_num_examples=64,   # Max examples per bucket
        drop_remainder=False   # Keep partial buckets
    )
    .and_return()
)

This approach efficiently handles variable-length sequences while maintaining appropriate batch sizes for training.
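
As a quick illustration of the output format, each bucket arrives as a single pipeline example that is a list of the combined source examples (a minimal sketch):

from fairseq2.data import read_sequence

pipeline = (
    read_sequence(list(range(10)))
    .bucket(bucket_size=4)  # drop_remainder defaults to False
    .and_return()
)

print(list(pipeline))
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]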

There are more features in fairseq2’s data pipeline:

  • Prefetching: Load data ahead of time for better performance

  • State Management: Save and restore pipeline state for resumable processing (both features are sketched below)
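
A minimal sketch of both features:

from fairseq2.data import read_sequence

pipeline = (
    read_sequence(list(range(1000)))
    .prefetch(8)  # Fetch up to 8 examples ahead in the background
    .and_return()
)

it = iter(pipeline)
for _ in range(100):
    next(it)

# Snapshot the position after 100 examples...
state = pipeline.state_dict()

# ...and later restore it to resume where we left off.
pipeline.load_state_dict(state)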

Note

When combining pseudo-infinite pipelines with finite ones, the pseudo-infinite pipeline will stop when the finite pipeline ends. For truly infinite behavior, use repeat() without arguments.

For more technical details, see fairseq2.data.data_pipeline.