spdl.dataloader.DataLoader¶
- class DataLoader(src: Iterable[Source] | AsyncIterable[Source], *, preprocessor: Callable[[Source], T] | Callable[[Source], Iterable[T]] | Callable[[Source], Awaitable[T]] | Callable[[Source], AsyncIterable[T]] | None = None, batch_size: int | None = None, drop_last: bool = False, aggregator: Callable[[list[T]], Output] | Callable[[list[T]], Awaitable[Output]] | None = None, transfer_fn: Callable[[Output], Output] | None = None, buffer_size: int = 3, num_threads: int = 8, timeout: float | None = None, output_order: str = 'completion')[source]¶
A data preprocessing pipeline composed of source, preprocessing and aggregation.
It generates source items, preprocesses them concurrently, aggregates them, and stores the results in a buffer.
┌────────┐
│ Source │
└─┬──────┘
  │
┌─▼─────────────────────┐
│ Preprocessing         │─┐
│                       │ │─┐
│ fn: Source -> T       │ │ │
│                       │ │ │
└──┬────────────────────┘ │ │
   │└──┬───────────────────┘ │
   │   └─────────────────────┘
┌─▼─────────────────────┐
│ Aggregate             │─┐
│                       │ │─┐
│ fn: list[T] -> Output │ │ │
│                       │ │ │
└──┬────────────────────┘ │ │
   │└──┬───────────────────┘ │
   │   └─────────────────────┘
┌─▼──────────────────────┐
│ Transfer               │
│                        │ * No concurrency, as GPUs do not support
│ fn: Output -> Output   │   transferring multiple data concurrently.
│                        │
└─┬──────────────────────┘
  │
┌─▼──────┐
│ Buffer │
└────────┘
- Parameters:
  - src – Data source. An object that implements the Iterable or AsyncIterable interface. Typically, a generator that yields file paths or URLs. To iterate over an object that implements the Mapping protocol, optionally with sampling, use MapIterator.
  - preprocessor – A [async] function or [async] generator that processes the individual source items. Often it loads data into array format.
  - aggregator – A [async] function that takes a list of preprocessed items and returns one item. Typically performs batching and GPU transfer.
  - batch_size – The number of items to aggregate before they are passed to the aggregator.
  - drop_last – If True and the number of source items is not divisible by batch_size, drop the remainder.
  - transfer_fn – A function applied to the output of the aggregator. It is intended for transferring data to GPU devices. Since GPU device transfer does not support concurrent transfers, this function is executed in a single thread.
  - buffer_size – The number of aggregated items to buffer.
  - num_threads – The number of worker threads.
  - timeout – The timeout until the next item becomes available. Default behavior is to wait indefinitely.
  - output_order – If 'completion' (default), items processed by the preprocessor are passed to the aggregator in order of completion. If 'input', they are passed to the aggregator in the order of the source input.
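The interplay of batch_size, drop_last and the aggregator can be illustrated with a plain-Python reference (this is not spdl code; the real DataLoader runs the preprocessor concurrently on a thread pool, while this sketch is sequential):

```python
def simulate(src, preprocessor, batch_size, aggregator, drop_last=False):
    # Sequential reference of the stage semantics:
    # source -> preprocessor -> batches of `batch_size` -> aggregator.
    batch = []
    for item in src:
        batch.append(preprocessor(item))
        if len(batch) == batch_size:
            yield aggregator(batch)
            batch = []
    # A trailing partial batch is kept unless drop_last is True.
    if batch and not drop_last:
        yield aggregator(batch)

# 7 items, batch_size=3: the trailing 1-item batch is dropped.
out = list(simulate(range(7), lambda x: x * x, 3, sum, drop_last=True))
```

Here `out` contains one aggregated value per full batch; with `drop_last=False` a third, smaller batch would also be aggregated.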
- Examples:
>>> import glob
>>> import spdl.io
>>> from spdl.io import CPUBuffer, CUDABuffer, ImageFrames
>>>
>>> import torch
>>> from torch import Tensor
>>>
>>> ##################################################################
>>> # Source
>>> ##################################################################
>>> def source(root_dir: str) -> Iterable[str]:
...     # Iterate the directory and find images.
...     yield from glob.iglob(f"{root_dir}/**/*.JPEG", recursive=True)
>>>
>>>
>>> ##################################################################
>>> # Preprocessor
>>> ##################################################################
>>> width, height, batch_size = 224, 224, 32
>>>
>>> # Filter description that scales the image and converts to RGB
>>> filter_desc = spdl.io.get_filter_desc(
...     scale_width=width,
...     scale_height=height,
...     pix_fmt="rgb24",
... )
>>>
>>> def decode_image(path: str) -> ImageFrames:
...     # Decode image and resize
...     packets = spdl.io.demux_image(path)
...     return spdl.io.decode_packets(packets, filter_desc=filter_desc)
...
>>>
>>> ##################################################################
>>> # Aggregator
>>> ##################################################################
>>> size = width * height * batch_size * 3
>>> storage = spdl.io.cpu_storage(size, pin_memory=True)
>>>
>>> def batchify(data: list[ImageFrames]) -> Tensor:
...     # Merge the decoded frames into the pre-allocated pinned memory.
...     return spdl.io.convert_frames(data, storage=storage)
...
>>>
>>> ##################################################################
>>> # Transfer
>>> ##################################################################
>>> cuda_device_index = 0
>>> stream = torch.cuda.Stream(device=cuda_device_index)
>>> cuda_config = spdl.io.cuda_config(
...     device_index=cuda_device_index,
...     stream=stream.cuda_stream,
... )
>>>
>>> def transfer(cpu_buffer: CPUBuffer) -> CUDABuffer:
...     # Send to CUDA in a separate stream.
...     cuda_buffer = spdl.io.transfer_buffer(cpu_buffer, cuda_config=cuda_config)
...     # Cast to Torch Tensor type.
...     return spdl.io.to_torch(cuda_buffer)
...
>>>
>>> dataloader = DataLoader(
...     src=source(root_dir),
...     preprocessor=decode_image,
...     batch_size=batch_size,
...     aggregator=batchify,
...     transfer_fn=transfer,
... )
>>>
>>> for batch in dataloader:
...     ...
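The preprocessor above is a plain function, but as the parameter description notes, it may also be an [async] generator, in which case one source item can expand into multiple preprocessed items. A minimal sketch of that shape (the `crops` preprocessor and its variants are hypothetical, not part of the spdl API; the driver is a sequential stand-in for the DataLoader's own concurrent scheduling):

```python
import asyncio


async def crops(path: str):
    # Hypothetical async-generator preprocessor: one source item
    # (an image path) yields multiple preprocessed items, each of
    # which the DataLoader would hand to the aggregator individually.
    for variant in ("center", "flipped"):
        yield (path, variant)


async def collect(paths):
    # Sequential driver for illustration only.
    items = []
    for p in paths:
        async for item in crops(p):
            items.append(item)
    return items


result = asyncio.run(collect(["a.jpg", "b.jpg"]))
```

With `output_order='completion'` the expanded items may interleave across source items depending on which finishes first; with `output_order='input'` they follow the source order, as in this sequential sketch.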
See also
- spdl.pipeline.Pipeline: The abstraction used for executing the logic.
- spdl.io.demux_image(), spdl.io.decode_packets(): Decoding images.
- spdl.io.cpu_storage(): Allocating page-locked memory.
- spdl.io.convert_frames(): Merging the decoded frames into pre-allocated memory without creating intermediate arrays.
- spdl.io.transfer_buffer(): Sending the data to GPU.
- spdl.io.to_torch(), spdl.io.to_numba(), spdl.io.to_jax(): Casting the memory buffer to array types.