Data Pipelines

The dataloader will be able to leverage several threads, working around Python Global Interpreter Lock limitations, and also providing better performance than a pure Python dataloader.

Building a data pipeline looks like this:

data = (
    text.read_text("file.tsv")
    .map(lambda x: str(x.split("\t")[1]).lower())
    .filter(lambda x: len(x) < 10)
)

Column syntax

The data items going through the pipeline don’t have to be flat tensors, but can be tuples, or python dictionaries. Several operators have a syntax to specify a specific column of the input data. Notably the DataPipelineBuilder.map operator has a selector argument to choose the column to apply the function to.

If the data item is a tuple, then the selector "[3]" selects the third column. If the data item is a dictionary, then "foo" will select the value corresponding to the key "foo". You can nest selectors using . to separate key selectors, following a python-like syntax. For a data item {"foo": [{"x": 1, "y": 2}, {"x": 3, "y": 4, "z": 5}], "bar": 6}, the selector "foo[1].y" referes to the value 4.

Functions that accepts several selectors, accept them as a comma separated list of selectors. For example .map(lambda x: x * 10, selector="foo[1].y,bar") will multiply the values 4 and 6 by 10, but leave others unmodified.

Pseudo-infinite and Infinite Pipelines

The DataPipeline.count and DataPipeline.constant static methods create pseudo-infinite pipelines. When used with operators that combine multiple pipelines (e.g. DataPipeline.sample, DataPipeline.round_robin, DataPipeline.zip), they will only yield examples as long as the other pipelines yield examples.

For example:

from fairseq2.data import DataPipeline, read_sequence

pipeline1 = DataPipeline.constant(0).and_return()
pipeline2 = read_sequence([1, 2, 3]).and_return()

for example in DataPipeline.round_robin(pipeline1, pipeline2).and_return():
    print(example)

only produces 0, 1, 0, 2, 0, 3.

Infinite pipelines (pipelines created through DataPipelineBuilder.repeat with no arguments) do not exhibit this behavior; they will yield examples indefinitely even when combined with other pipelines.

For example:

from fairseq2.data import DataPipeline, read_sequence

pipeline1 = read_sequence([0]).repeat().and_return()
pipeline2 = read_sequence([1, 2, 3]).and_return()

for example in DataPipeline.round_robin(pipeline1, pipeline2).and_return():
    print(example)

produces 0, 1, 0, 2, 0, 3, 0, 1, 0, 2, 0, 3… indefinitely.