spdl.dataloader.DataLoader

class DataLoader(src: Iterable[Source] | AsyncIterable[Source], *, preprocessor: Callable[[Source], T] | Callable[[Source], Iterable[T]] | Callable[[Source], Awaitable[T]] | Callable[[Source], AsyncIterable[T]] | None = None, batch_size: int | None = None, drop_last: bool = False, aggregator: Callable[[list[T]], Output] | Callable[[list[T]], Awaitable[Output]] | None = None, transfer_fn: Callable[[Output], Output] | None = None, buffer_size: int = 3, num_threads: int = 8, timeout: float | None = None, output_order: str = 'completion')[source]

A data preprocessing pipeline composed of source, preprocessing and aggregation.

It generates source items, preprocesses them concurrently, aggregates them, and stores the results in a buffer.

┌────────┐
│ Source │
└─┬──────┘
  │
┌─▼─────────────────────┐
│ Preprocessing         │─┐
│                       │ │─┐
│ fn: Source -> T       │ │ │
│                       │ │ │
└──┬────────────────────┘ │ │
  │└──┬───────────────────┘ │
  │   └─────────────────────┘
  │
┌─▼─────────────────────┐
│ Aggregate             │─┐
│                       │ │─┐
│ fn: list[T] -> Output │ │ │
│                       │ │ │
└──┬────────────────────┘ │ │
  │└──┬───────────────────┘ │
  │   └─────────────────────┘
  │
┌─▼──────────────────────┐
│ Transfer               │
│                        │  * No concurrency, as GPUs do not support
│ fn: Output -> Output   │    transferring multiple data concurrently.
│                        │
└─┬──────────────────────┘
  │
┌─▼──────┐
│ Buffer │
└────────┘
Parameters:
  • src – Data source. An object that implements the Iterable or AsyncIterable interface. Typically, a generator that yields file paths or URLs. To iterate over an object that implements the Mapping protocol, optionally with sampling, use MapIterator.

  • preprocessor – An [async] function or [async] generator that processes individual source items. Typically, it loads data into array format.

  • aggregator – An [async] function that takes a list of preprocessed items and returns one item. Typically used for batching and GPU transfer.

  • batch_size – The number of items to aggregate before they are passed to the aggregator.

  • drop_last – If True and the number of source items is not divisible by batch_size, the remainder is dropped.

  • transfer_fn – A function applied to the output of the aggregator function. It is intended for transferring data to GPU devices. Since GPUs do not support concurrent data transfers, this function is executed in a single thread.

  • buffer_size – The number of aggregated items to buffer.

  • num_threads – The number of worker threads.

  • timeout – The maximum time to wait for the next item to become available. By default, it waits indefinitely.

  • output_order – If ‘completion’ (default), items processed by the preprocessor are passed to the aggregator in order of completion. If ‘input’, then they are passed to the aggregator in the order of the source input.
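
As an illustration of the batch_size and drop_last semantics described above, here is a minimal pure-Python sketch (not SPDL code; the `group` helper is hypothetical) of how source items are grouped before reaching the aggregator:

```python
from collections.abc import Iterable, Iterator


def group(items: Iterable, batch_size: int, drop_last: bool = False) -> Iterator[list]:
    """Group source items into lists of ``batch_size`` (illustrative sketch only)."""
    buf: list = []
    for item in items:
        buf.append(item)
        if len(buf) == batch_size:
            yield buf
            buf = []
    # The final, incomplete batch is kept unless drop_last=True.
    if buf and not drop_last:
        yield buf


print(list(group(range(7), 3)))                  # [[0, 1, 2], [3, 4, 5], [6]]
print(list(group(range(7), 3, drop_last=True)))  # [[0, 1, 2], [3, 4, 5]]
```

Note that the incomplete final batch is only produced when drop_last is False, which matters when downstream code assumes a fixed batch dimension.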

Examples:
>>> import glob
>>> from collections.abc import Iterable
>>>
>>> import spdl.io
>>> from spdl.io import CPUBuffer, CUDABuffer, ImageFrames
>>>
>>> import torch
>>> from torch import Tensor
>>>
>>> ##################################################################
>>> # Source
>>> ##################################################################
>>> def source(root_dir: str) -> Iterable[str]:
...     # Iterate the directory and find images.
...     yield from glob.iglob(f"{root_dir}/**/*.JPEG", recursive=True)
>>>
>>>
>>> ##################################################################
>>> # Preprocessor
>>> ##################################################################
>>> width, height, batch_size = 224, 224, 32
>>>
>>> # Filter description that scales the image and convert to RGB
>>> filter_desc = spdl.io.get_filter_desc(
...     scale_width=width,
...     scale_height=height,
...     pix_fmt="rgb24"
... )
>>>
>>> def decode_image(path: str) -> ImageFrames:
...     # Decode image and resize
...     packets = spdl.io.demux_image(path)
...     return spdl.io.decode_packets(packets, filter_desc=filter_desc)
...
>>>
>>> ##################################################################
>>> # Aggregator
>>> ##################################################################
>>> size = width * height * batch_size * 3
>>> storage = spdl.io.cpu_storage(size, pin_memory=True)
>>>
>>> def batchify(data: list[ImageFrames]) -> Tensor:
...     # Merge the decoded frames into the pre-allocated pinned-memory.
...     return spdl.io.convert_frames(data, storage=storage)
...
>>>
>>> ##################################################################
>>> # Transfer
>>> ##################################################################
>>> cuda_device_index = 0
>>> stream = torch.cuda.Stream(device=cuda_device_index)
>>> cuda_config = spdl.io.cuda_config(
...     device_index=cuda_device_index,
...     stream=stream.cuda_stream,
... )
>>>
>>> def transfer(cpu_buffer: CPUBuffer) -> CUDABuffer:
...     # Send to CUDA in a separate stream.
...     cuda_buffer = spdl.io.transfer_buffer(cpu_buffer, cuda_config=cuda_config)
...     # Cast to Torch Tensor type.
...     return spdl.io.to_torch(cuda_buffer)
...
>>>
>>> dataloader = DataLoader(
...     src=source(root_dir),
...     preprocessor=decode_image,
...     batch_size=batch_size,
...     aggregator=batchify,
...     transfer_fn=transfer,
... )
>>>
>>> for batch in dataloader:
...     ...
>>>


__iter__() Iterable[Output][source]

Run the data loading pipeline in the background.

Yields:

Items processed by the preprocessor and aggregator.
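
To illustrate the two output_order modes that govern the order of yielded items, here is a small pure-Python sketch using concurrent.futures (not SPDL code). Collecting futures with as_completed corresponds to 'completion' order, while iterating them in submission order corresponds to 'input' order:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(x: int) -> int:
    # Items with larger values finish sooner, so completion order
    # differs from input order.
    time.sleep(0.05 * (3 - x))
    return x


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(work, x) for x in (0, 1, 2)]
    # 'completion'-like: yield each result as soon as it is ready.
    completion_order = [f.result() for f in as_completed(futures)]
    # 'input'-like: yield results in the order items were submitted.
    input_order = [f.result() for f in futures]

print(completion_order)  # [2, 1, 0] -- fastest first
print(input_order)       # [0, 1, 2] -- source order preserved
```

'completion' order minimizes latency when item processing times vary, while 'input' order preserves the ordering of the source at the cost of head-of-line waiting.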