
Tutorial 3: Sequentially Accessing Multi-sensor Data


Introduction

This tutorial shows how to use the unified queued API in vrs_data_provider to efficiently stream multi-sensor data from Aria VRS files.

We will learn how to use the unified SensorData interface, access time-ordered sensor data queues, and customize stream control and time windowing for efficient processing.

In this tutorial, we will learn how to:

  1. Use the basic queued API to iterate through all sensor data.
  2. Explore the unified SensorData interface.
  3. Customize stream selection and time windowing in the queued API.
  4. Apply frame-rate subsampling for efficient processing.

Prerequisites

  • Complete Tutorial 1 (VrsDataProvider Basics) to understand basic data provider concepts
  • Download Aria Gen2 sample data from link

Note on Visualization: If the visualization window does not show up, this is due to a caching issue in the Rerun library. Simply rerun the specific code cell.

from projectaria_tools.core import data_provider

# Load local VRS file
vrs_file_path = "path/to/your/recording.vrs"
vrs_data_provider = data_provider.create_vrs_data_provider(vrs_file_path)

Basic Sequential Data Access API

The deliver_queued_sensor_data() method in vrs_data_provider provides a unified way to iterate through all sensor data in timestamp order. This is the primary API for sequential access to multi-sensor data.

Key features of the queued API:

  • Returns data from ALL streams in the VRS file
  • Orders data by device timestamp (chronological order)
  • Customizable via stream selection, per-stream subsampling, etc.
  • Returned data can be converted to each concrete sensor type via a unified interface.

Here is a simple example that queries the first K data samples from the VRS file and inspects each sample's properties:

print("\n=== Basic Sequential Data Access ===")
print("Processing all sensor data in timestamp order...")

# Variables to count how many data samples have arrived for each sensor stream
data_count = 0
per_stream_data_counts = {}
total_num_samples = 5000

# Call the deliver-queued-sensor-data API to obtain "streamed" data, in sorted timestamp order.
# The iterator returns a unified SensorData instance.
print(f"Start inspecting the first {total_num_samples} data samples in the VRS")
for sensor_data in vrs_data_provider.deliver_queued_sensor_data():
    # Which stream does this sensor data belong to?
    stream_id = sensor_data.stream_id()
    stream_label = vrs_data_provider.get_label_from_stream_id(stream_id)

    # Aggregate the data count for this stream
    data_count += 1
    if stream_label not in per_stream_data_counts:
        per_stream_data_counts[stream_label] = 0
    per_stream_data_counts[stream_label] += 1

    # Limit output for demonstration
    if data_count >= total_num_samples:
        print(f"Stopping after {total_num_samples} samples for demonstration...")
        break


# Print data counts for each sensor stream
print(f"\nTotal processed: {data_count} sensor data samples")
print("Data count per stream:")
for stream_label, count in per_stream_data_counts.items():
    print(f"\t{stream_label}: {count}")
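The per-stream counts collected above can also be turned into rough sample-rate estimates by dividing by the covered time span. The sketch below is standalone and uses hypothetical counts and an assumed 2-second window (illustrative numbers, not taken from the sample recording):

```python
# Hypothetical per-stream sample counts over an assumed 2-second window
# (illustrative values only, not from an actual recording)
per_stream_data_counts = {
    "camera-rgb": 60,
    "imu-right": 2000,
    "camera-slam-left": 60,
}
window_s = 2.0

# Approximate rate = sample count / window duration
approx_rates_hz = {
    label: count / window_s for label, count in per_stream_data_counts.items()
}

for label, rate in approx_rates_hz.items():
    print(f"{label}: ~{rate:.1f} Hz")  # e.g. camera-rgb: ~30.0 Hz
```

In a real run, you would use the counts from the loop above together with the device-time span of the first and last samples instead of these assumed values.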

Understanding the Unified SensorData Interface

The queued API returns data using a unified SensorData interface. This allows you to handle different types of sensor data (images, IMU, audio, etc.) in a consistent way, regardless of the sensor type.

Each SensorData object provides:

  • Stream ID and stream label for identification
  • Sensor data type (IMAGE, IMU, AUDIO, etc.)
  • Timestamps in different time domains
  • Access to the actual sensor data (images, IMU, audio, etc.)

from projectaria_tools.core.sensor_data import SensorDataType, TimeDomain, TimeQueryOptions

print("\n=== Exploring the SensorData Interface ===")

# Get a few samples to examine their properties
data_count = 0
for sensor_data in vrs_data_provider.deliver_queued_sensor_data():
    if data_count >= 5:
        break

    # Inspect where this sensor data comes from, and what its data type is
    stream_id = sensor_data.stream_id()
    stream_label = vrs_data_provider.get_label_from_stream_id(stream_id)
    data_type = sensor_data.sensor_data_type()

    # Inspect the device timestamp of this sensor data
    device_time = sensor_data.get_time_ns(TimeDomain.DEVICE_TIME)

    print(f"\nSample {data_count + 1}:")
    print(f"  Stream: {stream_label} (Stream ID: {stream_id})")
    print(f"  Type: {data_type}")
    print(f"  Device Time: {device_time / 1e9:.6f}s")

    # Map the sensor data to its specific type, and inspect its actual content.
    # Here we use image and IMU data as examples.
    if data_type == SensorDataType.IMAGE:
        image_data = sensor_data.image_data_and_record()[0]
        print(f"  Image size: {image_data.get_width()} x {image_data.get_height()}")
        print(f"  Pixel format: {image_data.get_pixel_format()}")

    elif data_type == SensorDataType.IMU:
        imu_data = sensor_data.imu_data()
        accel = imu_data.accel_msec2
        gyro = imu_data.gyro_radsec
        print(f"  IMU Accel: [{accel[0]:.3f}, {accel[1]:.3f}, {accel[2]:.3f}] m/s²")
        print(f"  IMU Gyro: [{gyro[0]:.3f}, {gyro[1]:.3f}, {gyro[2]:.3f}] rad/s")

    data_count += 1

Customizing Data Access with DeliverQueuedOptions

The real power of the queued API comes from customization options. The DeliverQueuedOptions class allows you to:

  1. Apply time windowing - Process only specific time ranges.
  2. Select specific streams - Choose which sensors to include.
  3. Subsample data - Reduce frame rates for specific streams.

Together, these options provide flexible control over the sensor queue, letting you focus on specific time periods or sensor modalities, or customize data rates for different analysis needs.
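As a quick intuition for the subsampling option: subsampling at rate N keeps every N-th sample, so a stream's effective rate is its nominal rate divided by N. A standalone sketch of that arithmetic (the 30 Hz nominal rate below is an assumed example value, not read from a recording):

```python
def effective_rate_hz(nominal_rate_hz: float, subsample_rate: int) -> float:
    """Effective rate after keeping every N-th sample of a stream."""
    if subsample_rate < 1:
        raise ValueError("subsample rate must be >= 1")
    return nominal_rate_hz / subsample_rate


# Assumed example: a 30 Hz camera stream subsampled at rate 3 yields 10 Hz
print(effective_rate_hz(30.0, 3))  # 10.0
```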

import rerun as rr

print("\n=== Customizing Data Access with DeliverQueuedOptions ===")

customized_deliver_options = vrs_data_provider.get_default_deliver_queued_options()

# -----------------
# 1. Stream selection feature - only select RGB, 1 SLAM camera, and 1 ET camera data.
# -----------------
rgb_to_select = vrs_data_provider.get_stream_id_from_label("camera-rgb")
slam_to_select = vrs_data_provider.get_stream_id_from_label("slam-front-right")
et_to_select = vrs_data_provider.get_stream_id_from_label("camera-et-right")

# First deactivate all streams, then add back only the selected streams
customized_deliver_options.deactivate_stream_all()
for selected_stream_id in [rgb_to_select, slam_to_select, et_to_select]:
    customized_deliver_options.activate_stream(selected_stream_id)

# -----------------
# 2. Time windowing feature - Skip first 2 seconds, and play for 3 seconds, if possible
# -----------------
first_time_ns = vrs_data_provider.get_first_time_ns_all_streams(TimeDomain.DEVICE_TIME)
last_time_ns = vrs_data_provider.get_last_time_ns_all_streams(TimeDomain.DEVICE_TIME)
total_length_ns = last_time_ns - first_time_ns
skip_begin_ns = int(2 * 1e9)  # 2 seconds
duration_ns = int(3 * 1e9)  # 3 seconds
skip_end_ns = max(total_length_ns - skip_begin_ns - duration_ns, 0)
customized_deliver_options.set_truncate_first_device_time_ns(skip_begin_ns)
customized_deliver_options.set_truncate_last_device_time_ns(skip_end_ns)

# -----------------
# 3. Per-stream sub-sampling feature - subsample slam camera at rate of 3
# -----------------
slam_subsample_rate = 3
customized_deliver_options.set_subsample_rate(stream_id=slam_to_select, rate=slam_subsample_rate)

# -----------------
# 4. Deliver customized data queue, and visualize
# -----------------
print("Start visualizing the customized sensor data queue")

rr.init("rerun_viz_customized_sensor_data_queue")

for sensor_data in vrs_data_provider.deliver_queued_sensor_data(customized_deliver_options):
    stream_id = sensor_data.stream_id()
    stream_label = vrs_data_provider.get_label_from_stream_id(stream_id)
    device_time_ns = sensor_data.get_time_ns(TimeDomain.DEVICE_TIME)

    # All selected streams are cameras, so each sample carries image data
    image_data_and_record = sensor_data.image_data_and_record()

    # Visualize
    rr.set_time_nanos("device_time", device_time_ns)
    rr.log(stream_label, rr.Image(image_data_and_record[0].to_numpy_array()))

rr.notebook_show()
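The truncation values used in step 2 can be sanity-checked standalone: skipping skip_begin_ns at the start and truncating skip_end_ns at the end leaves a window of total - begin - end nanoseconds. Below is a sketch of that arithmetic with an assumed 10-second recording (the recording length is an illustrative value, not taken from the sample data):

```python
def compute_skip_end_ns(total_length_ns: int, skip_begin_ns: int, duration_ns: int) -> int:
    # Mirror the tutorial's formula: truncate from the end whatever exceeds
    # skip_begin + desired duration, clamped at zero for short recordings.
    return max(total_length_ns - skip_begin_ns - duration_ns, 0)


# Assumed example: a 10 s recording, skip the first 2 s, keep 3 s
total_ns = int(10 * 1e9)
skip_begin_ns = int(2 * 1e9)
duration_ns = int(3 * 1e9)
skip_end_ns = compute_skip_end_ns(total_ns, skip_begin_ns, duration_ns)

# The remaining window is exactly the requested duration
remaining_window_ns = total_ns - skip_begin_ns - skip_end_ns
print(remaining_window_ns / 1e9)  # 3.0
```

If the recording is shorter than skip_begin + duration, the clamp at zero means nothing is truncated from the end and the window simply runs to the end of the recording.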