DataPipelineBuilder

class fairseq2.data.DataPipelineBuilder[source]

Bases: object

API to create a DataPipeline.

and_return(max_num_warnings=0)[source]

Return a new DataPipeline instance.

Return type:

DataPipeline
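
Example usage (a minimal end-to-end sketch, assuming read_sequence from fairseq2.data as the pipeline source):

from fairseq2.data import read_sequence

pipeline = read_sequence([1, 2, 3]).map(lambda x: x * 2).and_return()
list(pipeline)
# [2, 4, 6]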

bucket(bucket_size, drop_remainder=False)[source]

Combine a number of consecutive examples into a single example.

Parameters:
  • bucket_size (int) – The number of examples to combine.

  • drop_remainder (bool) – If True, drops the last bucket if it has fewer than bucket_size examples.

Return type:

Self
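
Example usage (a minimal sketch, assuming read_sequence from fairseq2.data as the source):

from fairseq2.data import read_sequence

pipeline = read_sequence([1, 2, 3, 4, 5]).bucket(2).and_return()
# yields: [1, 2], [3, 4], [5]

pipeline = read_sequence([1, 2, 3, 4, 5]).bucket(2, drop_remainder=True).and_return()
# yields: [1, 2], [3, 4]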

bucket_by_length(bucket_sizes, selector=None, drop_remainder=False)[source]

Combine examples of similar shape into batches.

Return type:

Self
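
Example usage (a hedged sketch: the (max_batch_size, max_seq_len) reading of the bucket_sizes entries below is an assumption, since the parameters are not documented above; verify against the fairseq2 reference before relying on it):

import torch
from fairseq2.data import read_sequence

seqs = [torch.zeros(4), torch.zeros(9), torch.zeros(5), torch.zeros(10)]

pipeline = (
    read_sequence(seqs)
    # Assumed convention: up to 4 examples of length <= 5 per batch,
    # up to 2 examples of length <= 10 per batch.
    .bucket_by_length([(4, 5), (2, 10)])
    .and_return()
)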

collate(pad_value=None, pad_to_multiple=1, overrides=None)[source]

Concatenate a list of inputs into a single input.

This is equivalent to calling .map(Collater()). See fairseq2.data.Collater for details.

Return type:

Self
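
Example usage (a sketch using equal-length tensors, so no pad_value is needed):

import torch
from fairseq2.data import read_sequence

pipeline = (
    read_sequence([torch.tensor([1, 2]), torch.tensor([3, 4])])
    .bucket(2)   # each example becomes a list of two tensors
    .collate()   # each list is stacked into one tensor of shape (2, 2)
    .and_return()
)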

filter(predicate)[source]

Filter examples from the data pipeline, keeping only those that match predicate.

Parameters:

predicate (Callable[[Any], Any]) – The predicate used to select examples to keep.

Return type:

Self
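
Example usage (a minimal sketch, assuming read_sequence from fairseq2.data as the source):

from fairseq2.data import read_sequence

pipeline = read_sequence(list(range(10))).filter(lambda x: x % 2 == 0).and_return()
# yields: 0, 2, 4, 6, 8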

map(fn, selector=None, num_parallel_calls=1)[source]

Apply fn to each example.

Example usage (assuming read_sequence from fairseq2.data as the source):

from fairseq2.data import read_sequence

pipeline = read_sequence([2, 5]).map(lambda x: x + 10).and_return()
# yields: 12, 15

pipeline = read_sequence([2, 5]).map(lambda x: x + 10, num_parallel_calls=8).and_return()
# same results, but uses more cores

data = [{"a": 2, "b": 1}, {"a": 5, "b": 3}]

pipeline = read_sequence(data).map(lambda x: x + 10, selector="a").and_return()
# yields: {"a": 12, "b": 1}, {"a": 15, "b": 3}

pipeline = read_sequence(data).map(lambda x: x + 10, selector="a,b").and_return()
# yields: {"a": 12, "b": 11}, {"a": 15, "b": 13}

Parameters:
  • fn (Callable[[Any], Any] | Sequence[Callable[[Any], Any]]) – The function to apply. If a sequence of functions is given, they are chained automatically: .map([f1, f2]) is a more efficient version of .map(f1).map(f2).

  • selector (str | None) – The column to apply the function to. Several columns can be specified by separating them with a “,”. See Column syntax for more details.

  • num_parallel_calls (int) – The number of examples to process in parallel.

Return type:

Self

prefetch(num_examples)[source]

Prefetch examples in the background while the current example is being processed.

Parameters:

num_examples (int) – The number of examples to prefetch.

Return type:

Self
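
Example usage (a sketch; prefetch is typically placed after expensive per-example work so that work overlaps with consumption):

from fairseq2.data import read_sequence

pipeline = (
    read_sequence(list(range(100)))
    .map(lambda x: x * x)  # stand-in for expensive per-example work
    .prefetch(16)          # keep up to 16 processed examples ready
    .and_return()
)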

shard(shard_idx, num_shards)[source]

Read only 1/num_shards of the examples in the data pipeline.

Parameters:
  • shard_idx (int) – The shard index.

  • num_shards (int) – The number of shards.

Return type:

Self
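
Example usage (a sketch; the round-robin assignment of examples to shards shown in the comment is an assumption):

from fairseq2.data import read_sequence

pipeline = read_sequence(list(range(8))).shard(0, 2).and_return()
# assuming round-robin assignment, yields: 0, 2, 4, 6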

shuffle(shuffle_window, strict=True, enabled=True)[source]

Shuffle examples using a fixed-size buffer.

Parameters:
  • shuffle_window (int) – The size of the intermediate buffer used for shuffling. Examples will be randomly sampled from this buffer, and selected examples will be replaced with new examples. If 0, all examples will be loaded into memory for full shuffling.

  • strict (bool) – If True, the intermediate shuffle buffer will be saved as part of state_dict. This ensures that on preemption no example will be lost, but for large buffers this can significantly increase the state size and the time to restore the data pipeline.

  • enabled (bool) – If False, disables shuffling.

Return type:

Self
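
Example usage (a minimal sketch, assuming read_sequence from fairseq2.data as the source):

from fairseq2.data import read_sequence

# Sample from a sliding 100-example buffer while streaming.
pipeline = read_sequence(list(range(1000))).shuffle(shuffle_window=100).and_return()

# With shuffle_window=0, all examples are loaded into memory and fully shuffled.
pipeline = read_sequence(list(range(1000))).shuffle(shuffle_window=0).and_return()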

skip(num_examples)[source]

Skip the first num_examples examples.

Return type:

Self
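
Example usage (a minimal sketch):

from fairseq2.data import read_sequence

pipeline = read_sequence([0, 1, 2, 3, 4]).skip(2).and_return()
# yields: 2, 3, 4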

take(num_examples)[source]

Return at most num_examples examples.

Return type:

Self
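
Example usage (a minimal sketch):

from fairseq2.data import read_sequence

pipeline = read_sequence([0, 1, 2, 3, 4]).take(3).and_return()
# yields: 0, 1, 2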

yield_from(fn)[source]

Map every example to a data pipeline and yield the examples returned from the mapped data pipelines.

Parameters:

fn (Callable[[Any], DataPipeline]) – The function to map examples to data pipelines.

Return type:

Self
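
Example usage (a minimal sketch; fn must return a finalized DataPipeline, here built with read_sequence):

from fairseq2.data import read_sequence

def repeat(n):
    # Build a sub-pipeline that yields n copies of n.
    return read_sequence([n] * n).and_return()

pipeline = read_sequence([1, 2, 3]).yield_from(repeat).and_return()
# yields: 1, 2, 2, 3, 3, 3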