Parquet is a popular binary columnar file format optimized for data storage and large-scale distributed data processing.
Note
Consider using the Parquet format when dealing with complex or nested data structures, such as embeddings, tokens, audio files, bytes, etc., as it enables their efficient and self-contained representation.
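For illustration, here is a minimal PyArrow schema sketch for such nested columns; the column names below are hypothetical and only meant to show the idea.

import pyarrow as pa

# Hypothetical schema mixing nested and binary columns (for illustration only).
schema = pa.schema(
    [
        ("id", pa.string()),
        ("embedding", pa.list_(pa.float32())),  # variable-length float vectors
        ("tokens", pa.list_(pa.int64())),       # token id sequences
        ("audio_bytes", pa.binary()),           # raw audio payloads
    ]
)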
A Parquet dataset is a collection of Parquet files that can be partitioned or not. Each Parquet file consists of row groups. Roughly speaking, a row group is the smallest piece of a Parquet file that can be read into memory. Since Parquet is a columnar format, we can flexibly and efficiently read only a subset of columns from a row group.
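For example, a small PyArrow sketch (the file path and column name are illustrative) that reads a single row group and only one column from it:

import pyarrow.parquet as pq

pf = pq.ParquetFile("/path/to/dataset/part-0.parquet")
print(pf.metadata.num_row_groups)                     # number of row groups in the file
first_group = pf.read_row_group(0, columns=["text"])  # read one row group, one column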
Note
The row group size, when writing a Parquet file, should be chosen carefully to balance the trade-off between memory usage, read performance, and shuffling quality. As a rule of thumb, adjust the row group size so that each row group is between 50MB and 500MB.
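Note that in PyArrow the row group size is specified as a number of rows at write time, so a hedged sketch is to pick a row count whose serialized size lands in that range (the values below are illustrative):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"text": ["example"] * 100_000, "label": [0] * 100_000})

# row_group_size is a row count; choose it so that each row group's
# on-disk size falls roughly within the 50MB-500MB range for your data.
pq.write_table(table, "/tmp/example.parquet", row_group_size=50_000)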
This module provides an efficient and flexible data loading pipeline for Apache Parquet datasets in fairseq2.
The tooling is general purpose and can be combined with various downstream workflows for large-scale machine learning workloads, with features such as sharding, filtering, column selection, and dynamic batching.
Requirements: Install the Arrow dependencies with pip install fairseq2[arrow], since we rely on the pyarrow library to interface with Parquet files.
from fairseq2.data.parquet.fragment_streaming import (
    FragmentStreamingConfig,
    ParquetFragmentStreamer,
)

# Simple configuration
config = FragmentStreamingConfig(
    parquet_path="/path/to/dataset/",
    nb_epochs=2,
    split_to_row_groups=True,  # Work with row groups instead of files; setting this to False makes a fragment correspond to a whole file
    fragment_shuffle_window=100,  # Shuffle within a window of 100 fragments
    seed=42,  # For reproducibility
)

# Create the streamer
streamer = ParquetFragmentStreamer(config=config)

# Build a pipeline for a specific rank/world_size (for distributed training)
fragment_pipeline = streamer.build_pipeline(rank=0, world_size=1).and_return()
# No shuffling
config = FragmentStreamingConfig(..., fragment_shuffle_window=0)

# Global shuffling (requires loading all metadata up front)
config = FragmentStreamingConfig(..., fragment_shuffle_window=-1)

# Local shuffling within a window (faster start time)
config = FragmentStreamingConfig(..., fragment_shuffle_window=100)

# Circular shift for distributed training
config = FragmentStreamingConfig(
    ...,
    files_circular_shift=True,  # Different ranks start at different positions
    fragment_shuffle_window=100,
)
Note
How shuffling works:
For a positive fragment_shuffle_window value, all dataset files are first shuffled globally (and this shuffling differs from one epoch to the next).
Next, each file is split into row groups, which are shuffled locally within fragment_shuffle_window.
Note that global shuffling requires all Parquet files' metadata upfront, which can be expensive for large remote datasets.
However, if fragment_shuffle_window is set to a small value (e.g. ~ average number of fragments per file * 5), the time to the first batch will be shorter; the metadata is then fetched on the fly.
Also note that the shuffling behavior is made fully deterministic by the seed parameter: resetting a pipeline with the same seed value reproduces exactly the same shuffling.
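As a minimal sketch of this determinism (the dataset path is illustrative and make_pipeline is just a local helper), two pipelines built from identically seeded configs yield fragments in the same order:

def make_pipeline(seed: int):
    # Local helper for this sketch only.
    config = FragmentStreamingConfig(
        parquet_path="/path/to/dataset/",
        nb_epochs=1,
        fragment_shuffle_window=100,
        seed=seed,
    )
    streamer = ParquetFragmentStreamer(config=config)
    return streamer.build_pipeline(rank=0, world_size=1).and_return()

# Both runs are expected to produce the same fragment order.
fragments_a = list(make_pipeline(seed=42))
fragments_b = list(make_pipeline(seed=42))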
We can shard a dataset at the fragment level using the rank and world_size parameters of build_pipeline:
# Create a pipeline for a specific rank in distributed training
pipeline = streamer.build_pipeline(rank=rank, world_size=world_size)
This sharding is typically uneven: different ranks may receive different numbers of rows.
We therefore recommend using nb_epochs=None to iterate indefinitely in large training runs.
Alternatively, if Parquet dataloading is not the bottleneck, you can stream all fragments without sharding, load them into memory, and only then shard at the row level to obtain a more uniform split.
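A minimal sketch of that row-level alternative, assuming a fully loaded PyArrow table in memory (shard_rows is a hypothetical helper, not a fairseq2 API):

import pyarrow as pa

def shard_rows(table: pa.Table, rank: int, world_size: int) -> pa.Table:
    # Hypothetical helper: keep every world_size-th row, starting at this rank's offset.
    indices = list(range(rank, table.num_rows, world_size))
    return table.take(indices)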
Make sure that partition filters reference only partition columns.
To filter on non-partition columns, apply the filters during the loading step (via FragmentLoadingConfig) instead.
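As a sketch, a partition-column filter goes into the streaming config while a non-partition-column filter goes into the loading config; the column names "split" and "lang" are illustrative.

# Partition-column filter: evaluated when selecting fragments.
stream_config = FragmentStreamingConfig(
    parquet_path="/path/to/dataset/",
    partition_filters='pc.field("split") == "train"',  # "split" is a partition column
)

# Non-partition-column filter: evaluated while loading the data.
loading_config = FragmentLoadingConfig(
    columns=None,
    filters='pc.field("lang") == "en"',  # "lang" is an ordinary column
)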
Use the NamedColumns class to define which columns to load and how to rename them:
from dataclasses import dataclass, field
from typing import List

from fairseq2.data.parquet.fragment_loading import (
    FragmentLoadingConfig,
    NamedColumns,
    ParquetFragmentLoader,
)


@dataclass
class MyColumns(NamedColumns):
    # Format: new_name: original_column_name
    text: str = "source_text"
    label: str = "target_label"
    extra_columns: List[str] = field(default_factory=lambda: ["metadata", "timestamp"])


# Create the loading config
loading_config = FragmentLoadingConfig(
    columns=MyColumns(),
    add_fragment_traces=True,  # Add tracking columns
    drop_null=True,  # Drop rows with null values
    nb_prefetch=2,  # Prefetch 2 fragments
    num_parallel_fragments=4,  # Process 4 fragments in parallel
)

# Build the loading pipeline
loader = ParquetFragmentLoader(config=loading_config).build_pipeline(fragment_pipeline)
The TableBucketingConfig controls how tables are combined and batched:
from fairseq2.data.parquet.table_bucketing import (
    TableBucketingConfig,
    TableBucketer,
)

# Create bucketing config
bucketing_config = TableBucketingConfig(
    target_table_size=1000,  # Aim for tables with 1000 rows
    min_fragment_number=2,  # Combine at least 2 fragments
    max_fragment_number=10,  # Combine at most 10 fragments
    shuffle=True,  # Shuffle rows in memory
    batch_size=32,  # Return batches of 32 rows
)

# Apply bucketing
bucketer = TableBucketer(bucketing_config)
final_pipeline = bucketer.apply(loading_pipeline)

# Iterate through batches
for batch in final_pipeline.and_return():
    # batch is a PyArrow Table
    print(batch.column_names)
    print(len(batch))
from fairseq2.data.parquet import (
    BasicDataLoadingConfig,
    build_basic_parquet_data_pipeline,
    FragmentStreamingConfig,
    FragmentLoadingConfig,
    TableBucketingConfig,
)

# Configure the entire pipeline
config = BasicDataLoadingConfig(
    fragment_stream_config=FragmentStreamingConfig(
        parquet_path="/path/to/dataset/",
        partition_filters='pc.field("split") == "train"',
        nb_epochs=None,  # Infinite iterations
        fragment_shuffle_window=100,
    ),
    fragment_load_config=FragmentLoadingConfig(
        columns=MyColumns(),
        nb_prefetch=2,
        num_parallel_fragments=3,
    ),
    table_bucketing_config=TableBucketingConfig(
        target_table_size=1000,
        min_fragment_number=2,
        max_fragment_number=10,
        shuffle=True,
        batch_size=32,
    ),
)

# Create the pipeline
pipeline = build_basic_parquet_data_pipeline(config).and_return()

# Use the pipeline
for batch in pipeline:
    # Process the batch
    pass
import torch.distributed as dist

# Get distributed info
rank = dist.get_rank()
world_size = dist.get_world_size()

config = BasicDataLoadingConfig(
    fragment_stream_config=FragmentStreamingConfig(
        parquet_path="/path/to/dataset/",
        fragment_shuffle_window=100,
        files_circular_shift=True,  # Different ranks start at different positions
    ),
    # ... other configs
)

# Create a pipeline for this rank
pipeline = build_basic_parquet_data_pipeline(
    config, rank=rank, world_size=world_size
).and_return()
PyArrow tables can be converted to various formats:
# Convert to pandas
df = batch.to_pandas()

# Convert to a dictionary
batch_dict = batch.to_pydict()

# Convert to torch tensors
from fairseq2.data.parquet.utils import pyarrow_table_to_torch_dict

tensor_dict = pyarrow_table_to_torch_dict(batch)

# Using Polars (fast, with zero-copy)
import polars as pl

polars_df = pl.from_arrow(batch, rechunk=False)

# Convert to a list of dictionaries (rows)
rows = batch.to_pylist()

# Or using Polars (usually much faster)
rows = pl.from_arrow(batch, rechunk=False).to_dicts()
Note
With Polars, pl.from_arrow(pa_table, rechunk=False) converts a PyArrow table into a Polars dataframe with almost zero memory copy.
pa.Table.to_pylist() or pl.from_arrow(...).to_dicts() (usually much faster) converts a table into a list of dictionaries.
parquet/utils.py:pyarrow_table_to_torch_dict converts a PyArrow table into a dictionary of CPU torch tensors (best effort).
config = FragmentStreamingConfig(
    # Avoid global shuffling, which requires loading all metadata
    fragment_shuffle_window=200,  # Use local shuffling
    split_to_row_groups=True,  # Work with smaller row groups
)

loading_config = FragmentLoadingConfig(
    # Only load needed columns
    columns=MyColumns(text="source", label="target"),
    # Cache data to reduce memory usage with large remote datasets
    cache=True,
    # Parallelize fragment loading
    num_parallel_fragments=4,
    # Prefetch to hide I/O latency
    nb_prefetch=2,
)
loading_config = FragmentLoadingConfig(
    # Enable caching to disk for large tables
    cache=True,
    # Parallelize and prefetch for efficiency
    num_parallel_fragments=2,
    # Column pruning to reduce memory footprint
    columns=MyColumns(text="source"),  # Only load needed columns
)

bucketing_config = TableBucketingConfig(
    # Control memory usage directly (in MB)
    target_table_memory=250,  # Limit each table to 250MB
    # Set boundaries for fragment combining
    min_fragment_number=1,
    max_fragment_number=5,
    # Apply smaller batches for processing
    batch_size=16,
    # Enable memory mapping for large tables
    cache=True,
    # Consider setting combine_chunks=False for very large datasets
    combine_chunks=True,
)
The target_table_memory parameter provides direct control over the memory footprint:
Specified in megabytes (MB)
Controls how many fragments are loaded and concatenated before processing
Adapts to data complexity (variable-length text, lists, etc.)
More predictable memory peaks than row-based approaches
Better handles cases where row count doesn’t correlate linearly with memory usage
As alternatives, you can also use the following (sketched after this list):
target_table_size: Controls the number of rows (instead of memory)
target_total_length: Controls the total token length across all columns
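A hedged sketch of these alternatives (the values are illustrative):

# Bucket by row count instead of memory.
bucketing_config = TableBucketingConfig(target_table_size=2000, batch_size=16)

# Bucket by total token length across all columns.
bucketing_config = TableBucketingConfig(target_total_length=200_000, batch_size=16)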
For maximum memory efficiency, combine with:
Memory mapping: cache=True to store tables on disk
Column pruning: Only load needed columns
Chunk management: Consider combine_chunks=False for very large datasets
You can apply custom transformations to the pipeline:
from fairseq2.data.parquet.arrow_transform import filter_strings_by_length

# Create a custom transformation
def my_transform(table):
    # Apply filtering by text length
    table = filter_strings_by_length(table, "text", min_len=10, max_len=1000)
    return table

# Apply the transformation
final_pipeline = loading_pipeline.map(my_transform)
fairseq2's Parquet dataloader integrates easily with datasets from the Hugging Face Hub that are available in Parquet format. This integration leverages the huggingface_hub package's HfFileSystem to seamlessly access Parquet files stored on the Hub.
from fairseq2.data.parquet.fragment_streaming import (
    FragmentStreamingConfig,
    ParquetFragmentStreamer,
)
from fairseq2.data.parquet.fragment_loading import (
    FragmentLoadingConfig,
    ParquetFragmentLoader,
)

# Initialize the Hugging Face filesystem
from huggingface_hub import HfFileSystem

hf_fs = HfFileSystem()  # FileSystem interface for HF

# Get dataset files from the Hugging Face Hub
source_dataset_glob_path = "datasets/cais/mmlu/*/*.parquet"
all_paths = hf_fs.glob(source_dataset_glob_path)  # Find all parquet files
test_paths = [path for path in all_paths if "test-" in path]  # Optional filtering

# Configure the fragment streaming
fragment_config = FragmentStreamingConfig(
    parquet_path=test_paths,
    nb_epochs=1,
    filesystem=hf_fs,  # Provide the Hugging Face filesystem
    split_to_row_groups=True,
    fragment_shuffle_window=0,  # No shuffling in this example
)
streamer = ParquetFragmentStreamer(config=fragment_config)

# Configure the fragment loading
loading_config = FragmentLoadingConfig(
    columns=None,  # Use all columns
    add_fragment_traces=False,
    drop_null=False,
    nb_prefetch=1,
    num_parallel_fragments=4,
    filters='pc.field("answer") == 0',  # Optional filtering
)

# Build the pipeline
loader = ParquetFragmentLoader(config=loading_config)
fragment_pipeline = streamer.build_pipeline(0, 1)
loading_pipeline = loader.apply(fragment_pipeline)

# Process the results
tables = list(iter(loading_pipeline.and_return()))

# Process tables as needed. Examples:
#   - Convert to pandas: df = table.to_pandas()
#   - Convert to polars (efficient): pl.from_arrow(table)
Benefits of Using Hugging Face Datasets with fairseq2
No Download Required: Access datasets directly from Hugging Face Hub without manually downloading them first
Efficient and Resilient Loading: Only the requested data is loaded, and loading automatically retries (using SafeFragment) when network issues or expired authentication tokens interrupt it
Memory Efficiency: Stream data without loading entire datasets into memory
High Performance: Leverage the optimized data loading pipeline of fairseq2
This integration is particularly useful for large-scale datasets like multilingual text corpora, embedding collections, or multimodal datasets where efficient data handling is crucial.