DataPipelineBuilder

final class fairseq2.data.DataPipelineBuilder

Bases: object

API to create a DataPipeline.

and_return(max_num_warnings=0)

Return a new DataPipeline instance.

Return type:

DataPipeline

bucket(bucket_size, drop_remainder=False)

Combine a number of consecutive examples into a single example.

Parameters:
  • bucket_size (int) – The number of examples to combine.

  • drop_remainder (bool) – If True, drops the last bucket if it has fewer than bucket_size examples.

Return type:

Self
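To make the effect of drop_remainder concrete, here is a minimal pure-Python model of bucketing over a plain iterable. It is a sketch of the documented behavior, not fairseq2's implementation; bucket_sketch is a hypothetical name, and the real pipeline operates on its own examples rather than Python lists.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from typing import Any


def bucket_sketch(
    examples: Iterable[Any], bucket_size: int, drop_remainder: bool = False
) -> Iterator[list[Any]]:
    """Hypothetical model of DataPipelineBuilder.bucket()."""
    if bucket_size <= 0:
        raise ValueError("bucket_size must be positive")
    bucket: list[Any] = []
    for example in examples:
        bucket.append(example)
        # Emit the bucket as soon as it holds bucket_size examples.
        if len(bucket) == bucket_size:
            yield bucket
            bucket = []
    # A partial final bucket is kept unless drop_remainder is True.
    if bucket and not drop_remainder:
        yield bucket


print(list(bucket_sketch(range(5), 2)))  # [[0, 1], [2, 3], [4]]
print(list(bucket_sketch(range(5), 2, drop_remainder=True)))  # [[0, 1], [2, 3]]
```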

bucket_by_length(bucket_sizes, selector=None, min_data_len=1, skip_below_min_examples=False, skip_above_max_examples=False, drop_remainder=False)

Combine examples of similar shape into batches.

Return type:

Self
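The reference above does not describe the parameters, so the following is only a hedged model of length-based bucketing. It assumes that bucket_sizes is a sequence of (batch_size, max_length) pairs, that each example goes to the smallest bucket whose max_length fits it, that out-of-range examples raise an error unless the matching skip_* flag is set, and it replaces the string selector with a hypothetical length_fn callable. None of these details are confirmed by this page.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator, Sequence
from typing import Any


def bucket_by_length_sketch(
    examples: Iterable[Any],
    bucket_sizes: Sequence[tuple[int, int]],  # assumed (batch_size, max_length) pairs
    length_fn: Callable[[Any], int] = len,  # hypothetical stand-in for `selector`
    min_data_len: int = 1,
    skip_below_min_examples: bool = False,
    skip_above_max_examples: bool = False,
    drop_remainder: bool = False,
) -> Iterator[list[Any]]:
    # Sort buckets by max_length so the tightest fitting bucket is found first.
    buckets = sorted(bucket_sizes, key=lambda b: b[1])
    pending: list[list[Any]] = [[] for _ in buckets]
    for example in examples:
        n = length_fn(example)
        if n < min_data_len:
            if skip_below_min_examples:
                continue
            raise ValueError(f"example length {n} is below min_data_len")
        idx = next((i for i, (_, max_len) in enumerate(buckets) if n <= max_len), None)
        if idx is None:
            if skip_above_max_examples:
                continue
            raise ValueError(f"example length {n} exceeds the largest bucket")
        pending[idx].append(example)
        # Emit a batch once its bucket reaches that bucket's batch size.
        if len(pending[idx]) == buckets[idx][0]:
            yield pending[idx]
            pending[idx] = []
    # Partially filled buckets are flushed at the end unless drop_remainder is set.
    if not drop_remainder:
        for batch in pending:
            if batch:
                yield batch
```

Under these assumptions, short examples are batched two at a time while longer ones (up to length 4) are emitted one per batch.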

collate(pad_value=None, pad_to_multiple=1, overrides=None)

Concatenate a list of inputs into a single input.

This is equivalent to calling .map(Collater()). See fairseq2.data.Collater for details.

Return type:

Self
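As a hedged illustration of what pad_value and pad_to_multiple control, the sketch below pads a batch of integer lists to the length of the longest one, rounded up to a multiple of pad_to_multiple. This is an assumption about the padding rule; the real Collater works on tensors and nested structures (see fairseq2.data.Collater), and pad_batch_sketch is a hypothetical name.

```python
from __future__ import annotations

from collections.abc import Sequence


def pad_batch_sketch(
    seqs: Sequence[Sequence[int]], pad_value: int, pad_to_multiple: int = 1
) -> list[list[int]]:
    """Hypothetical model of padding variable-length sequences into one batch."""
    if pad_to_multiple < 1:
        raise ValueError("pad_to_multiple must be >= 1")
    longest = max((len(s) for s in seqs), default=0)
    # Round the target length up to the nearest multiple of pad_to_multiple.
    target = (longest + pad_to_multiple - 1) // pad_to_multiple * pad_to_multiple
    return [list(s) + [pad_value] * (target - len(s)) for s in seqs]


print(pad_batch_sketch([[1, 2, 3], [4]], pad_value=0))  # [[1, 2, 3], [4, 0, 0]]
print(pad_batch_sketch([[1, 2, 3], [4]], pad_value=0, pad_to_multiple=4))  # [[1, 2, 3, 0], [4, 0, 0, 0]]
```

Padding to a multiple is typically used so that batch shapes fall into a small set of sizes, which some hardware kernels handle more efficiently.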

dynamic_bucket(threshold, cost_fn, min_num_examples=None, max_num_examples=None, drop_remainder=False)

Combine a number of consecutive examples into a single example based on the cumulative cost of the examples, as measured by the user-provided cost_fn.

Yields a bucket once the cumulative cost produced by cost_fn meets or exceeds threshold.

Parameters:
  • threshold (float) – Threshold for cumulative cost to trigger bucketing.

  • cost_fn (Callable[[Any], float]) – Cost function that outputs cost for a particular example.

  • min_num_examples (int | None) – Minimum number of examples per bucket.

  • max_num_examples (int | None) – Maximum number of examples per bucket.

  • drop_remainder (bool) – If True, drops the last bucket if it has fewer than min_num_examples examples or its cumulative cost has not yet reached threshold.

Return type:

Self
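The triggering rule described above can be modeled in pure Python as follows. This is a sketch under the assumption that a bucket is emitted as soon as either the cost threshold is met (and min_num_examples, if given, is satisfied) or max_num_examples is reached; dynamic_bucket_sketch is a hypothetical name, not part of fairseq2.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator
from typing import Any


def dynamic_bucket_sketch(
    examples: Iterable[Any],
    threshold: float,
    cost_fn: Callable[[Any], float],
    min_num_examples: int | None = None,
    max_num_examples: int | None = None,
    drop_remainder: bool = False,
) -> Iterator[list[Any]]:
    bucket: list[Any] = []
    cost = 0.0
    for example in examples:
        bucket.append(example)
        cost += cost_fn(example)
        # Cost trigger: threshold met and, if set, enough examples collected.
        reached_cost = cost >= threshold and (
            min_num_examples is None or len(bucket) >= min_num_examples
        )
        # Size trigger: the bucket cannot grow beyond max_num_examples.
        reached_max = max_num_examples is not None and len(bucket) >= max_num_examples
        if reached_cost or reached_max:
            yield bucket
            bucket, cost = [], 0.0
    # Anything left over never triggered; keep it unless drop_remainder is set.
    if bucket and not drop_remainder:
        yield bucket


# Treat each integer as its own cost, e.g. a sequence length.
print(list(dynamic_bucket_sketch([3, 2, 1, 4, 1], threshold=4, cost_fn=lambda x: x)))
# [[3, 2], [1, 4], [1]]
```

A common use of this pattern is token-based batching, where cost_fn returns the length of a sequence so that each batch holds roughly the same number of tokens.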

filter(predicate)

Filter examples from the data pipeline, keeping only those that match predicate.

Parameters:

predicate (Callable[[Any], Any]) – The predicate used to select examples to keep.

Return type:

Self

map(fn, selector=None, num_parallel_calls=1)

Apply fn to each example.

Example usage:

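from fairseq2.data import read_sequence

# Add 10 to every example.
pipeline = read_sequence([2, 5]).map(lambda x: x + 10).and_return()
# yields: 12, 15

# Same results, but up to 8 examples are processed in parallel.
pipeline = read_sequence([2, 5]).map(lambda x: x + 10, num_parallel_calls=8).and_return()

data = [{"a": 2, "b": 1}, {"a": 5, "b": 3}]

# Apply the function to column "a" only.
pipeline = read_sequence(data).map(lambda x: x + 10, selector="a").and_return()
# yields: {"a": 12, "b": 1}, {"a": 15, "b": 3}

# Apply the function to columns "a" and "b".
pipeline = read_sequence(data).map(lambda x: x + 10, selector="a,b").and_return()
# yields: {"a": 12, "b": 11}, {"a": 15, "b": 13}
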
Parameters:
  • fn (Callable[[Any], Any] | Sequence[Callable[[Any], Any]]) – The function to apply. If it is a list of functions, they are chained automatically: .map([f1, f2]) is a more efficient equivalent of .map(f1).map(f2).

  • selector (str | None) – The column to apply the function to. Several columns can be specified by separating them with a “,”. See Column syntax for more details.

  • num_parallel_calls (int) – The number of examples to process in parallel.

Return type:

Self

prefetch(num_examples)

Prefetch examples in the background while the current example is being processed.

Parameters:

num_examples (int) – The number of examples to prefetch.

Return type:

Self
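The idea behind prefetching can be sketched with a producer thread that fills a bounded queue while the consumer processes the current example. This is a simplified model, not fairseq2's implementation: prefetch_sketch is a hypothetical name, and for brevity it does not forward errors raised while reading upstream examples.

```python
from __future__ import annotations

import queue
import threading
from collections.abc import Iterable, Iterator
from typing import Any

_SENTINEL = object()  # Marks the end of the upstream examples.


def prefetch_sketch(examples: Iterable[Any], num_examples: int) -> Iterator[Any]:
    """Hypothetical model: a background thread keeps up to num_examples ready."""
    buf = queue.Queue(maxsize=max(num_examples, 1))

    def producer() -> None:
        try:
            for example in examples:
                buf.put(example)  # Blocks while the buffer is full.
        finally:
            buf.put(_SENTINEL)

    threading.Thread(target=producer, daemon=True).start()
    while (item := buf.get()) is not _SENTINEL:
        yield item


print(list(prefetch_sketch(range(5), num_examples=2)))  # [0, 1, 2, 3, 4]
```

The bounded queue is what caps memory use: the producer can run at most num_examples examples ahead of the consumer.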

repeat(num_repeats=None, reset_rng=False)

Repeat the sequence of pipeline examples num_repeats times.

Parameters:
  • num_repeats (int | None) – The number of times to repeat examples. If None, repeats infinitely.

  • reset_rng (bool) – If True, resets all random number generators in the pipeline on each repeat.

Return type:

Self
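The effect of reset_rng can be illustrated with a hedged pure-Python model in which one pass over the upstream pipeline is a hypothetical make_epoch function that draws from a shared random generator. With reset_rng=True every repeat replays the same random choices (here, the same shuffled order); repeat_sketch and make_epoch are illustrative names, not fairseq2 APIs.

```python
from __future__ import annotations

import itertools
import random
from collections.abc import Callable, Iterable, Iterator
from typing import Any


def repeat_sketch(
    make_epoch: Callable[[random.Random], Iterable[Any]],
    num_repeats: int | None = None,
    reset_rng: bool = False,
    seed: int = 0,
) -> Iterator[Any]:
    """Hypothetical model of repeat(); num_repeats=None repeats forever."""
    rng = random.Random(seed)
    count = 0
    while num_repeats is None or count < num_repeats:
        if reset_rng and count > 0:
            # Restore the generator so this repeat replays the first one exactly.
            rng.seed(seed)
        yield from make_epoch(rng)
        count += 1


def shuffled_epoch(rng: random.Random) -> list[int]:
    # Stand-in for a random pipeline stage such as shuffle().
    return rng.sample(range(5), 5)


epochs = list(repeat_sketch(shuffled_epoch, num_repeats=2, reset_rng=True))
print(epochs[:5] == epochs[5:])  # True: both repeats use the same order

# With num_repeats=None the stream is infinite, so cap it explicitly.
first_twelve = list(itertools.islice(repeat_sketch(shuffled_epoch), 12))
```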

shard(shard_idx, num_shards, allow_uneven=False)

Read only 1/num_shards of the examples in the data pipeline.

Parameters:
  • shard_idx (int) – The shard index.

  • num_shards (int) – The number of shards.

Return type:

Self
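This page does not say how examples are assigned to shards or what allow_uneven does, so the following is only a sketch under stated assumptions: examples are dealt round-robin (example i goes to shard i % num_shards), and with allow_uneven=False an incomplete final round is discarded so that every shard yields the same number of examples. shard_sketch is a hypothetical name.

```python
from __future__ import annotations

import itertools
from collections.abc import Iterable, Iterator
from typing import Any


def shard_sketch(
    examples: Iterable[Any], shard_idx: int, num_shards: int, allow_uneven: bool = False
) -> Iterator[Any]:
    """Hypothetical round-robin model of shard()."""
    if not 0 <= shard_idx < num_shards:
        raise ValueError("shard_idx must be in [0, num_shards)")
    it = iter(examples)
    while True:
        # Read one round of up to num_shards examples.
        chunk = list(itertools.islice(it, num_shards))
        if not chunk:
            return
        # Assumption: an incomplete final round is dropped unless allow_uneven.
        if len(chunk) < num_shards and not allow_uneven:
            return
        if shard_idx < len(chunk):
            yield chunk[shard_idx]


print([list(shard_sketch(range(7), i, 3)) for i in range(3)])
# [[0, 3], [1, 4], [2, 5]]
```

In distributed training, each worker would typically pass its own rank as shard_idx and the world size as num_shards, so that workers read disjoint subsets of the data.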

shuffle(shuffle_window, seed=None)

Shuffle examples using a fixed-size buffer.

Parameters:

shuffle_window (int) – The size of the intermediate buffer used for shuffling. Examples will be randomly sampled from this buffer, and selected examples will be replaced with new examples. If 0, all examples will be loaded into memory for full shuffling.

Return type:

Self
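The buffer mechanism described for shuffle_window can be modeled in pure Python as below: fill a buffer, emit a randomly chosen element, and put the next incoming example in its place; shuffle_window=0 loads everything for a full shuffle. This is a sketch of the technique, not fairseq2's implementation, and the way seed is used here (seeding random.Random) is an assumption; shuffle_sketch is a hypothetical name.

```python
from __future__ import annotations

import itertools
import random
from collections.abc import Iterable, Iterator
from typing import Any


def shuffle_sketch(
    examples: Iterable[Any], shuffle_window: int, seed: int | None = None
) -> Iterator[Any]:
    """Hypothetical model of windowed (buffered) shuffling."""
    rng = random.Random(seed)
    it = iter(examples)
    if shuffle_window == 0:
        # Full shuffle: load every example into memory first.
        buf = list(it)
        rng.shuffle(buf)
        yield from buf
        return
    buf = list(itertools.islice(it, shuffle_window))
    for example in it:
        idx = rng.randrange(len(buf))
        yield buf[idx]
        buf[idx] = example  # Replace the emitted example with a new one.
    # Upstream is exhausted; drain the buffer in random order.
    rng.shuffle(buf)
    yield from buf
```

A small window keeps memory bounded but only mixes examples that arrive close together; with shuffle_window=1 the order is unchanged.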

skip(num_examples)

Skip num_examples examples.

Return type:

Self

take(num_examples)

Return at most num_examples examples.

Return type:

Self
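On a plain Python iterable, skip and take correspond to slicing with itertools.islice, which makes their combination easy to reason about: skipping n examples and then taking m returns examples n through n + m - 1. The helpers below are hypothetical models, not fairseq2 functions.

```python
import itertools
from collections.abc import Iterable, Iterator
from typing import Any


def skip_sketch(examples: Iterable[Any], num_examples: int) -> Iterator[Any]:
    # Drop the first num_examples examples and pass through the rest.
    return itertools.islice(examples, num_examples, None)


def take_sketch(examples: Iterable[Any], num_examples: int) -> Iterator[Any]:
    # Stop after at most num_examples examples.
    return itertools.islice(examples, num_examples)


print(list(take_sketch(skip_sketch(range(10), 3), 4)))  # [3, 4, 5, 6]
```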

yield_from(fn)

Map every example to a data pipeline and yield the examples returned from the mapped data pipelines.

Parameters:

fn (Callable[[Any], DataPipeline]) – The function to map examples to data pipelines.

Return type:

Self
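Conceptually, yield_from is a flat map: each example is expanded into a sub-sequence, and the sub-sequences are concatenated. The sketch below models this with any iterable in place of the DataPipeline that fn returns in fairseq2; yield_from_sketch is a hypothetical name.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import Any


def yield_from_sketch(
    examples: Iterable[Any], fn: Callable[[Any], Iterable[Any]]
) -> Iterator[Any]:
    for example in examples:
        # Exhaust the sub-sequence produced for this example before moving on.
        yield from fn(example)


print(list(yield_from_sketch([1, 2, 3], lambda n: range(n))))  # [0, 0, 1, 0, 1, 2]
```

A typical use is expanding a list of file paths into the records each file contains.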