Building High-Performance DataLoaders ===================================== The :py:class:`~spdl.pipeline.Pipeline` operates on iterables and applies a series of functions to process data efficiently. While the ``Pipeline`` class implements the ``Iterable`` protocol and can be used directly in ``for batch in pipeline:`` loops, many ML practitioners prefer a PyTorch-style DataLoader interface that they are familiar with. This guide explains how to build high-performance data loading solutions using SPDL pipelines, and highlights the key conceptual differences from PyTorch's approach. Understanding the Paradigm Shift --------------------------------- The fundamental difference between SPDL and PyTorch's data loading approach lies in how data processing is structured and executed. PyTorch DataLoader Architecture ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ In PyTorch, the typical data loading pattern uses three components: 1. **Sampler** - Generates indices for data access 2. **Dataset** - Maps an index to processed data (typically a Tensor) 3. **DataLoader** - Handles batching, multiprocessing, and prefetching The following diagram illustrates the PyTorch DataLoader structure: .. mermaid:: flowchart TB subgraph P[PyTorch DataLoader] direction TB Sampler --> DataSet[Dataset: Map int → Tensor] DataSet --> Buffer[Prefetch Buffer] end In this model, the Dataset encapsulates all data processing logic—from loading raw data to producing final Tensors. This monolithic approach has limitations: - All processing happens in a single ``__getitem__`` call - Different types of operations (I/O, CPU processing, memory operations) are not distinguished - Limited opportunities for fine-grained concurrency control - Difficult to optimize individual processing stages SPDL Pipeline Architecture ~~~~~~~~~~~~~~~~~~~~~~~~~~~ SPDL takes a different approach by decomposing data processing into discrete stages: .. mermaid:: flowchart TB subgraph S[SPDL Pipeline] direction TB src[Source: Iterator] subgraph stages[Processing Stages] direction TB p1[Stage 1: I/O Operations
e.g., Load from storage] p2[Stage 2: Decoding
e.g., Decode image/video] p3[Stage 3: Preprocessing
e.g., Resize, normalize] p1 --> p2 --> p3 end src --> stages stages --> sink[Sink Buffer] end This decomposition enables: - **Separation of concerns**: Different operations can be configured independently - **Optimized concurrency**: I/O-bound and CPU-bound operations can use different execution strategies - **Better resource utilization**: Each stage can be tuned for its specific workload - **Reduced memory overhead**: Data stays in efficient formats until batching The Key Insight: Split I/O and CPU Work ---------------------------------------- To achieve high performance with SPDL, you must split your data loading logic into stages based on their resource requirements: **I/O-Bound Operations** (Network, Disk) - Fetching data from remote storage - Reading files from disk - Database queries - Best executed with high concurrency (many threads or async operations) **CPU-Bound Operations** (Computation) - Image/video decoding - Data transformations - Preprocessing and augmentation - Best executed with moderate concurrency (matching CPU cores) **Memory Operations** (Data Movement) - Batching individual items - Converting to contiguous memory - Device transfers (CPU → GPU) - Often best executed serially or with low concurrency Restructuring PyTorch Datasets for SPDL ---------------------------------------- Consider a typical PyTorch Dataset: .. code-block:: python class MyDataset: def __init__(self, data_urls: list[str]): self.data_urls = data_urls def __len__(self) -> int: return len(self.data_urls) def __getitem__(self, index: int) -> torch.Tensor: # Everything happens here - I/O, decoding, preprocessing url = self.data_urls[index] raw_data = download(url) # I/O-bound decoded = decode_image(raw_data) # CPU-bound processed = resize_image(decoded) # CPU-bound return torch.tensor(processed) # Memory operation To use this with SPDL effectively, decompose it into a pipeline builder function: .. code-block:: python from typing import Any, Callable from collections.abc import Iterable from spdl.pipeline import PipelineBuilder from spdl.source import DistributedRandomSampler import spdl.io import torch import numpy as np def build_data_pipeline( catalog: list[str], sampler: Iterable[int], *, load_fn: Callable[[int], Any], preprocess_fn: Callable[[Any], Any] | None = None, collate_fn: Callable[[list[Any]], torch.Tensor], batch_size: int, drop_last: bool = False, num_io_workers: int = 16, num_cpu_workers: int = 8, buffer_size: int = 10, ) -> PipelineBuilder: """Build a pipeline that processes data in stages. Args: catalog: List of data identifiers (e.g., URLs, file paths, database keys) sampler: An iterable that yields indices (e.g., DistributedRandomSampler) load_fn: Function to load data given an index preprocess_fn: Optional function to preprocess loaded data collate_fn: Function to collate a list of items into a batch batch_size: Number of items per batch drop_last: Whether to drop the last incomplete batch num_io_workers: Concurrency level for I/O operations num_cpu_workers: Concurrency level for CPU processing buffer_size: Size of the prefetch buffer Returns: A configured PipelineBuilder ready to build pipelines """ # Build the pipeline with the provided sampler builder = ( PipelineBuilder() .add_source(sampler) # Stage 1: I/O operations (high concurrency) .pipe( lambda idx: load_fn(catalog[idx]), concurrency=num_io_workers, output_order="completion", # Don't wait for slow requests ) ) # Stage 2: CPU processing (moderate concurrency, if provided) if preprocess_fn is not None: builder.pipe( preprocess_fn, concurrency=num_cpu_workers, output_order="input", # Maintain order ) # Stage 3: Batching and collation builder.aggregate(batch_size, drop_last=drop_last) builder.pipe(collate_fn) # Stage 4: GPU transfer (serial execution) builder.pipe(spdl.io.transfer_tensor) # Prefetch buffer builder.add_sink(buffer_size) return builder .. note:: The :py:func:`spdl.io.transfer_tensor` function combines and encapsulates multiple operations required to transfer data from CPU to GPU in the background without interrupting model computation in the default CUDA stream. This includes the "pin memory" operation, which moves data to page-locked memory regions for faster transfer. Unlike PyTorch's DataLoader, there is no separate ``pin_memory`` parameter—this optimization is built into ``transfer_tensor``. Now we can use this function to implement the equivalent of ``MyDataset``: .. code-block:: python # Define the processing functions that match MyDataset's logic def load_data(url: str) -> bytes: return download(url) # I/O-bound def process_data(raw_data: bytes) -> np.ndarray: decoded = decode_image(raw_data) # CPU-bound processed = resize_image(decoded) # CPU-bound return processed def collate_batch(items: list[np.ndarray]) -> torch.Tensor: return torch.stack([torch.tensor(item) for item in items]) # Build the pipeline with a sampler catalog = ["http://example.com/image1.jpg", "http://example.com/image2.jpg", ...] sampler = DistributedRandomSampler(len(catalog)) builder = build_data_pipeline( catalog, sampler, load_fn=load_data, preprocess_fn=process_data, collate_fn=collate_batch, batch_size=32, num_io_workers=16, num_cpu_workers=8, ) # Create and use the pipeline pipeline = builder.build(num_threads=1) with pipeline.auto_stop(): for batch in pipeline: # batch is a torch.Tensor on GPU, ready for training train_step(batch) Building a DataLoader-Style Interface -------------------------------------- Now that we've seen how to build a pipeline from a PyTorch Dataset, we can wrap this pattern in a reusable DataLoader class. This class follows the same structure as the ``build_data_pipeline`` function above, but provides a familiar PyTorch-style interface. .. note:: The DataLoader uses :py:class:`~spdl.source.DistributedRandomSampler` as the data source, which provides built-in support for distributed training. This sampler automatically handles data partitioning across multiple processes/nodes, ensuring each worker processes a unique subset of the data. Here's a complete implementation: .. code-block:: python from collections.abc import Iterator from typing import Callable, TypeVar from spdl.pipeline import PipelineBuilder from spdl.source import DistributedRandomSampler import spdl.io T = TypeVar('T') class DataLoader: """A PyTorch-style DataLoader built on SPDL Pipeline. This implementation follows the same staged approach as the build_data_pipeline function, with separate stages for I/O, CPU processing, batching, and GPU transfer. """ def __init__( self, data_source: list, *, # Data processing functions load_fn: Callable[[int], T], preprocess_fn: Callable[[T], T] | None = None, collate_fn: Callable[[list[T]], torch.Tensor], # Batching batch_size: int, drop_last: bool = False, # Concurrency num_io_workers: int = 8, num_cpu_workers: int = 4, # Buffering buffer_size: int = 10, ): self.batch_size = batch_size self.drop_last = drop_last # Create sampler once for distributed training support self._sampler = DistributedRandomSampler(len(data_source)) # Compute and cache the number of batches num_samples = len(self._sampler) if drop_last: self._num_batches = num_samples // batch_size else: self._num_batches = (num_samples + batch_size - 1) // batch_size # Build the pipeline builder using build_data_pipeline self._builder = build_data_pipeline( data_source, self._sampler, load_fn=load_fn, preprocess_fn=preprocess_fn, collate_fn=collate_fn, batch_size=batch_size, drop_last=drop_last, num_io_workers=num_io_workers, num_cpu_workers=num_cpu_workers, buffer_size=buffer_size, ) def __len__(self) -> int: """Return the number of batches in the dataloader.""" return self._num_batches def __iter__(self) -> Iterator: # Build a fresh pipeline for each iteration pipeline = self._builder.build(num_threads=1) with pipeline.auto_stop(): yield from pipeline Usage example: .. code-block:: python # Define your processing functions def load_image(url: str) -> bytes: # I/O operation return download_from_url(url) def preprocess_image(data: bytes) -> np.ndarray: # CPU operations img = decode_image(data) img = resize(img, (224, 224)) return img def collate_images(images: list[np.ndarray]) -> torch.Tensor: return torch.stack([torch.from_numpy(img) for img in images]) # Create the dataloader dataloader = DataLoader( data_source=image_urls, load_fn=load_image, preprocess_fn=preprocess_image, collate_fn=collate_images, batch_size=32, num_io_workers=16, num_cpu_workers=8, ) # Use it like PyTorch DataLoader for batch in dataloader: # batch is a torch.Tensor of shape (32, 224, 224, 3) on GPU train_step(batch) Best Practices -------------- 1. **Profile Your Pipeline** SPDL provides powerful profiling tools to identify bottlenecks in your data loading pipeline. Use :py:func:`spdl.pipeline.profile_pipeline` to get detailed performance metrics for each stage: .. code-block:: python from spdl.pipeline import profile_pipeline # Profile the pipeline to see stage-by-stage performance profile_pipeline( builder.get_config(), num_iterations=100, # Number of batches to profile ) The :py:func:`~spdl.pipeline.profile_pipeline` function executes your pipeline for a specified number of iterations and reports detailed statistics including: - Throughput (items/second) for each stage - Time spent in each stage - Queue utilization between stages - Bottleneck identification For continuous monitoring during training, you can also enable runtime statistics: .. code-block:: python pipeline = builder.build( num_threads=1, report_stats_interval=5.0, # Report every 5 seconds ) For more details on performance analysis, see :doc:`../optimization_guide/analysis`. 2. **Tune Concurrency Levels** - Start with high concurrency for I/O operations (16-32 workers) - Use moderate concurrency for CPU operations (4-8 workers, matching CPU cores) - Keep CPU-to-GPU data transfer serial (no concurrency) because the underlying hardware does not support concurrent data transfer in one direction, and you want to avoid creating too many CUDA streams 3. **Choose Appropriate Output Order** - Use ``output_order="completion"`` for I/O stages to avoid head-of-line blocking - Use ``output_order="input"`` for preprocessing to maintain deterministic ordering 4. **Understanding Buffer Size** The ``buffer_size`` parameter in SPDL is somewhat analogous to PyTorch DataLoader's ``prefetch_factor``, but with important differences: **Key Differences:** - **Scaling behavior**: In PyTorch, the total number of prefetched batches is ``prefetch_factor × num_workers``. In SPDL, ``buffer_size`` is independent of the number of workers—it simply sets the sink buffer capacity. - **Performance impact**: In PyTorch, ``prefetch_factor`` directly affects data loading performance by controlling how many batches are prepared ahead of time. In SPDL, the pipeline continuously tries to fill all queues between stages, and the stage concurrency parameters act as the effective prefetch at each stage. The ``buffer_size`` does **not** affect pipeline performance—if the pipeline is fast enough, the buffer stays filled; if it's slow, the buffer remains mostly empty regardless of its size. **Recommendation**: Set ``buffer_size`` based on memory constraints rather than performance tuning. A value of 2-10 is typically sufficient. The actual prefetching happens at each pipeline stage based on the ``concurrency`` parameter. 5. **Consider Using Async Operations** For I/O-bound operations, async functions can be more efficient than synchronous alternatives. SPDL natively supports async functions—simply pass them directly to the pipeline: .. code-block:: python async def async_load(url: str) -> bytes: async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.read() builder.pipe( async_load, concurrency=32, # Can handle many concurrent requests ) .. important:: **Do not wrap async functions in synchronous wrappers.** It's common to see async functions converted to sync by launching and shutting down an event loop at each invocation (e.g., using ``asyncio.run()``). This is inefficient and unnecessary in SPDL. The pipeline manages the event loop internally, so you should pass async functions as-is. SPDL will execute them efficiently using a shared event loop. Comparison with PyTorch DataLoader ----------------------------------- .. list-table:: :header-rows: 1 :widths: 30 35 35 * - Aspect - PyTorch DataLoader - SPDL Pipeline * - Processing Model - Monolithic Dataset - Staged Pipeline * - Concurrency - Process-based (multiprocessing) - Multi-threading, multiprocessing, async, and sub-interpreters (Python 3.14+) * - Initialization Overhead - High (dataset copied to each worker process) - Low * - Memory Overhead - High (each worker process holds a copy of the dataset) - Low * - Configurability - Limited (global num_workers) - Fine-grained (per-stage) * - I/O Optimization - Limited - Granular control with native asyncio support for high throughput * - Learning Curve - Familiar to PyTorch users - Requires understanding stages When to Use SPDL ---------------- SPDL pipelines are particularly beneficial when: - Your data loading involves significant I/O operations (network, remote storage) - You need fine-grained control over different processing stages - Memory efficiency is important (large batches, limited RAM) - You want to optimize for throughput in production environments - Your preprocessing involves mixed I/O and CPU operations For simple datasets with minimal I/O and preprocessing, PyTorch's DataLoader may be sufficient. However, as your data loading becomes more complex, SPDL's staged approach provides better performance and flexibility. Next Steps ---------- - See :doc:`../migration/pytorch` for detailed migration examples - Explore :doc:`../case_studies/index` for real-world use cases - Read :doc:`../optimization_guide/index` for performance tuning