Building a Pipeline
Now, we use Pipeline to construct the data loader.
When training models in the cloud, the process of making data available on GPUs typically involves four to five steps:

1. Download raw data
2. Load and preprocess the data
3. Batch the data
4. Preprocess the batch (optional)
5. Transfer the batch to GPUs
When using SPDL’s pipeline abstraction, this process can be written as follows.
from collections.abc import Iterable

from spdl.pipeline import PipelineBuilder
from torch import Tensor

# The source: an iterable that yields the URLs of the dataset
class Dataset(Iterable[str]):
    ...

# Download the raw data for a group of URLs
def download(urls: list[str]) -> list[bytes]:
    ...

# Load a tensor from the raw data, with some additional preprocessing
def preprocess(data: bytes) -> Tensor:
    ...

# Create a batch tensor from individual samples
def collate(samples: list[Tensor]) -> Tensor:
    ...

# Transfer the batch tensor to the GPU
def gpu_transfer(batch: Tensor) -> Tensor:
    ...

# Build the Pipeline
pipeline = (
    PipelineBuilder()
    .add_source(Dataset())
    .aggregate(...)
    .pipe(download, concurrency=...)
    .disaggregate()
    .pipe(preprocess, concurrency=...)
    .aggregate(batch_size)
    .pipe(collate)
    .pipe(gpu_transfer)
    .add_sink()
    .build(num_threads=...)
)

# Run
with pipeline.auto_stop():
    for batch in pipeline.get_iterator(timeout):
        ...
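For concreteness, the placeholders above might be filled in as follows. The numbers here are illustrative assumptions, not recommendations, and would need tuning for a real workload.

# Hypothetical concrete configuration (illustrative values only)
pipeline = (
    PipelineBuilder()
    .add_source(Dataset())
    .aggregate(32)                    # group 32 URLs per network call
    .pipe(download, concurrency=8)    # keep 8 download calls in flight
    .disaggregate()                   # back to individual samples
    .pipe(preprocess, concurrency=4)  # modest CPU-side concurrency
    .aggregate(32)                    # batch_size = 32
    .pipe(collate)
    .pipe(gpu_transfer)
    .add_sink()
    .build(num_threads=8)
)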
Typically, network calls are more efficient when requests are batched, so we aggregate the URLs before making a network call and then disaggregate them afterward.
Since downloading spends most of its time waiting on the network rather than consuming CPU, we make multiple download calls concurrently.
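As an illustration, the download stage could look like the sketch below. urllib is a stand-in assumption for whatever client (HTTP, S3, etc.) the dataset actually requires; the point is that the function receives the group of URLs produced by aggregate(...), and the following disaggregate() re-emits the results one by one.

import urllib.request

# Hypothetical batched download: one opener is shared across the group,
# amortizing connection setup over the whole batch. Most of the time is
# spent waiting on the network, so several of these calls can run
# concurrently without consuming much CPU.
def download(urls: list[str]) -> list[bytes]:
    opener = urllib.request.build_opener()
    results = []
    for url in urls:
        with opener.open(url) as response:
            results.append(response.read())
    return results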
Decoding the raw data and applying preprocessing can be time-consuming and computationally intensive. As previously described, it is recommended to keep total CPU utilization at around 40% to avoid a QPS drop, but we also want to prevent the training process from being starved of data. To balance the two, we apply some concurrency to the preprocessing stage.
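What preprocess does depends entirely on the data format. The sketch below is a placeholder assumption: it treats the raw bytes as uint8 values and normalizes them, where a real pipeline would decode an image, audio clip, or similar payload.

import torch
from torch import Tensor

# Hypothetical preprocessing: interpret the raw bytes as uint8 values and
# normalize to [0, 1]. Replace this with real decoding for your format.
def preprocess(data: bytes) -> Tensor:
    tensor = torch.frombuffer(bytearray(data), dtype=torch.uint8)
    return tensor.to(torch.float32) / 255.0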
The data are then batched and transferred to the GPU. These stages typically do not require concurrency; in particular, concurrent GPU data transfer is not feasible due to hardware constraints.
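A minimal sketch of the last two stages, assuming equally-shaped sample tensors and a single CUDA device:

import torch
from torch import Tensor

# Hypothetical collate: stack equally-shaped samples into one batch tensor.
def collate(samples: list[Tensor]) -> Tensor:
    return torch.stack(samples)

# Hypothetical transfer: move the batch to the GPU. non_blocking=True lets
# the copy overlap with compute when the source tensor is in pinned memory.
def gpu_transfer(batch: Tensor) -> Tensor:
    return batch.to("cuda", non_blocking=True)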