Aria Gen2 Pilot Dataset Tutorial - VRS Data Loading
This tutorial demonstrates how to load and visualize VRS data from the Aria Gen2 Pilot Dataset using the AriaGen2PilotDataProvider.
What You'll Learn
- Initialize the AriaGen2PilotDataProvider.
- Discover available sensor data streams.
- Query sensor data by index and timestamp.
- Understand time domains and query options for temporal synchronization.
- Understand the on-device machine perception data streams: eye gaze, hand tracking, VIO, and VIO high frequency.
Import Required Libraries
The following libraries are required for this tutorial:
# Standard library imports
import numpy as np
import os
from pathlib import Path
from datetime import timedelta
# Project Aria Tools imports
from projectaria_tools.core.stream_id import StreamId
from projectaria_tools.core.sensor_data import TimeDomain, TimeQueryOptions, VioStatus, TrackingQuality
from projectaria_tools.core.mps import get_unit_vector_from_yaw_pitch
from projectaria_tools.utils.rerun_helpers import (
create_hand_skeleton_from_landmarks,
AriaGlassesOutline,
ToTransform3D
)
# Aria Gen2 Pilot Dataset imports
from aria_gen2_pilot_dataset import AriaGen2PilotDataProvider
# Visualization library
import rerun as rr
Initialize Data Provider
The AriaGen2PilotDataProvider is the main interface for accessing data from the Aria Gen2 Pilot Dataset. It provides methods to query sensor data, discover available streams, and access device calibration information.
⚠️ Important: Update the sequence_path below to point to your downloaded Aria Gen2 Pilot Dataset sequence folder.
# Replace with the actual path to your downloaded sequence folder
sequence_path = "path/to/your/sequence_folder"
# Initialize the data provider
pilot_data_provider = AriaGen2PilotDataProvider(sequence_path)
Check Available Data Modalities
Each Aria Gen2 Pilot dataset sequence contains the following data modalities:
Core Data Types
- VRS - Raw sensor data
- MPS (Machine Perception Service) - SLAM and hand tracking data.
Algorithm Outputs
- Foundation Stereo
- Heart Rate
- Hand-Object Interaction
- Diarization
- Egocentric Voxel Lifting
Please refer to the algorithm introduction here. For detailed instructions on loading and visualizing the algorithm output, see [tutorial_3_algorithm_data_loading.ipynb](TODO: add link).
Important Notes:
- VRS data is mandatory for creating a pilot data provider
- All other modalities are optional and depend on the specific sequence
- The availability of optional modalities varies between sequences
# Check what data types are available in this sequence
print("Data Modalities Available in This Sequence:")
print("=" * 60)
print(f"Raw VRS Sensors: ✅ (Mandatory)")
print(f"MPS Algorithms: {'✅' if pilot_data_provider.has_mps_data() else '❌'}")
print(f"Foundation Stereo: {'✅' if pilot_data_provider.has_stereo_depth_data() else '❌'}")
print(f"Heart Rate Monitoring: {'✅' if pilot_data_provider.has_heart_rate_data() else '❌'}")
print(f"Hand-Object Interaction: {'✅' if pilot_data_provider.has_hand_object_interaction_data() else '❌'}")
print(f"Diarization: {'✅' if pilot_data_provider.has_diarization_data() else '❌'}")
print(f"Egocentric Voxel Lifting: {'✅' if pilot_data_provider.has_egocentric_voxel_lifting_data() else '❌'}")
print("=" * 60)
VRS - Explore Available Streams
Stream Discovery and Navigation
A VRS file contains multiple streams, each storing data from a specific sensor or on-device algorithm result.
Each VRS stream is identified by a unique StreamId (e.g. 1201-1), which consists of a RecordableTypeId (the sensor type; e.g. 1201 stands for "SLAM camera") and an instance id (to distinguish multiple sensors of the same type; the trailing 1 in 1201-1 means "instance #1 of this sensor type"). Below are some common RecordableTypeId values in Aria recordings. Full definitions of all Recordable Types are given in the Project Aria Tools documentation, or refer to the `StreamId.h` file in the VRS repo.
| RecordableTypeId | Description |
|---|---|
| 214 | RGB camera stream |
| 1201 | SLAM camera stream |
| 211 | EyeTracking camera stream |
| 1202 | IMU sensor stream |
| 231 | Audio sensor stream |
| 373 | EyeGaze data stream from on-device EyeTracking algorithm |
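As a quick sketch of how StreamId works in practice, you can construct one directly from its string form and map it back to a sensor label through the data provider (this sketch assumes the sequence contains an RGB camera stream with id 214-1):
# Construct a StreamId directly from its "<RecordableTypeId>-<instance_id>" string form
example_stream_id = StreamId("214-1")  # RGB camera, instance #1
# Map it back to a human-readable sensor label through the data provider
example_label = pilot_data_provider.get_vrs_label_from_stream_id(example_stream_id)
print(f"StreamId {example_stream_id} corresponds to sensor label: {example_label}")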
# Get all available streams
all_streams = pilot_data_provider.get_vrs_all_streams()
print(f"Found {len(all_streams)} streams in the VRS file:")
# Print out each stream ID and their corresponding sensor label
for stream_id in all_streams:
label = pilot_data_provider.get_vrs_label_from_stream_id(stream_id)
print(f" --- Data stream {stream_id}'s label is: {label}")
# Find a specific stream's StreamId by sensor label
print("Seeking RGB data stream...")
rgb_stream_id = pilot_data_provider.get_vrs_stream_id_from_label("camera-rgb")
if rgb_stream_id is not None:
print(f"Found camera-rgb stream in VRS file: {rgb_stream_id}")
else:
print("Cannot find camera-rgb stream in VRS file.")
VRS - Device Calibration
Device calibration information is essential for accurate sensor data processing and 3D reconstruction. The calibration data includes camera intrinsics, extrinsics, and sensor mounting information.
For detailed usage of device calibration, please refer to the Project Aria Tools documentation.
# Get device calibration information
device_calibration = pilot_data_provider.get_vrs_device_calibration()
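As an illustrative sketch (assuming the sequence contains a camera-rgb stream), the calibration object can be used to look up a single camera's calibration and project a 3D point from the Device frame into that camera's image:
# Look up the RGB camera's calibration and project a hypothetical 3D point into its image
rgb_calib_example = device_calibration.get_camera_calib("camera-rgb")
# A hypothetical 3D point expressed in the Device frame (1 m along the Device frame's Z axis)
point_in_device = np.array([0.0, 0.0, 1.0])
# Transform the point from the Device frame into the camera frame, then project it
point_in_camera = rgb_calib_example.get_transform_device_camera().inverse() @ point_in_device
maybe_pixel = rgb_calib_example.project(point_in_camera)
print(f"Projected pixel (None if the point falls outside the image): {maybe_pixel}")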
Sensor Data Query APIs
The AriaGen2PilotDataProvider offers two main approaches for querying sensor data:
Query by Index
The query-by-index API allows you to retrieve the k-th data sample from a specific stream using the following syntax:
get_vrs_{SENSOR}_data_by_index(stream_id, index)
where {SENSOR} can be replaced by any sensor data type available in Aria VRS. See the full list of supported sensors.
This API is commonly used for:
- Sequential processing (iterating through all frames of a single stream)
- When you know the exact frame number you want to query within a specific stream
Important Note: The indices in each stream are independent and not correlated across different sensor streams. For example, the i-th RGB image does not necessarily correspond to the i-th SLAM image. This is because different sensors may operate at different frequencies or have missing frames, so their data streams are not synchronized by index.
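To make this concrete, the following sketch (assuming both camera-rgb and slam-front-left streams exist) prints the capture timestamps of the same index from two different streams; the values will generally not match:
# Illustrative sketch: the i-th samples of two different streams are generally NOT time-aligned
rgb_id_example = pilot_data_provider.get_vrs_stream_id_from_label("camera-rgb")
slam_id_example = pilot_data_provider.get_vrs_stream_id_from_label("slam-front-left")
if rgb_id_example is not None and slam_id_example is not None:
    sample_index = 5
    _, rgb_rec = pilot_data_provider.get_vrs_image_data_by_index(rgb_id_example, sample_index)
    _, slam_rec = pilot_data_provider.get_vrs_image_data_by_index(slam_id_example, sample_index)
    print(f"RGB frame #{sample_index} timestamp:  {rgb_rec.capture_timestamp_ns} ns")
    print(f"SLAM frame #{sample_index} timestamp: {slam_rec.capture_timestamp_ns} ns")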
# Initialize Rerun for visualization
rr.init("rerun_viz_query_by_index")
# Get number of samples in the RGB stream
num_samples = pilot_data_provider.get_vrs_num_data(rgb_stream_id)
print(f"RGB stream has a total of {num_samples} frames\n")
# Access frames sequentially and plot the first few frames
first_few = min(10, num_samples)
print(f"Printing the capture timestamps from the first {first_few} frames")
for i in range(first_few): # First 10 frames
image_data, image_record = pilot_data_provider.get_vrs_image_data_by_index(
rgb_stream_id, i
)
# Access image properties
print(f"Frame {i}: timestamp = {image_record.capture_timestamp_ns}")
# Process and visualize image data
if image_data.is_valid():
rr.set_time_nanos("device_time", image_record.capture_timestamp_ns)
rr.log("camera_rgb", rr.Image(image_data.to_numpy_array()))
# Display the visualization
rr.notebook_show()
Query by Timestamp: TimeDomain and TimeQueryOptions
A key feature of Aria devices is the ability to capture time-synchronized, multi-modal sensor data. To help you access this data with precise temporal control, projectaria_tools provides a comprehensive suite of time-based APIs.
The most commonly used is the timestamp-based query:
get_vrs_{SENSOR}_data_by_time_ns(stream_id, time_ns, time_domain, time_query_options)
where {SENSOR} can be replaced by any sensor data type available in Aria VRS. See here for the full list of supported sensors.
This API is often used to:
- Synchronize data across multiple sensor streams
- Fetch sensor data at specific timestamps
- Perform temporal analysis
TimeDomain and TimeQueryOptions
When querying sensor data by timestamp, two important concepts are:
- TimeDomain: Specifies the time reference for your query.
- TimeQueryOptions: Controls how the API selects data relative to your requested timestamp.
Below are all available options for each:
TimeDomain Options
| Name | Description | Typical Use Case |
|---|---|---|
| RECORD_TIME | Timestamp stored directly in the VRS index. Fast access, but time domain may vary. | Quick access, not recommended for sync. |
| DEVICE_TIME | Accurate device capture time. All sensors on the same Aria device share this domain. | Recommended for single-device data. |
| HOST_TIME | Arrival time in the host computer’s domain. May not be accurate. | Debugging, host-side analysis. |
| SubGhz | TimeSync server’s domain using SubGhz signals, accurate for multi-device capture. | Multi-device synchronization. |
| Utc | UTC time domain, only seconds-level accuracy. | Coarse, global time reference. |
TimeQueryOptions
| Name | Description |
|---|---|
| Before | Returns the last valid data with timestamp <= t_query. |
| After | Returns the first valid data with timestamp >= t_query. |
| Closest | Returns the data sample closest to t_query. If two are equally close, returns the one before the query. |
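To illustrate the difference between the three TimeQueryOptions, the sketch below queries the same timestamp with each option (it reuses the camera-rgb stream found earlier; the exact timestamps returned depend on your sequence):
# Illustrative sketch: compare BEFORE / AFTER / CLOSEST for the same query timestamp
rgb_timestamps = pilot_data_provider.get_vrs_timestamps_ns(rgb_stream_id, TimeDomain.DEVICE_TIME)
query_ns = rgb_timestamps[0] + int(0.5e9)  # 0.5 seconds after the first RGB frame
for option in (TimeQueryOptions.BEFORE, TimeQueryOptions.AFTER, TimeQueryOptions.CLOSEST):
    _, record = pilot_data_provider.get_vrs_image_data_by_time_ns(
        rgb_stream_id, query_ns, TimeDomain.DEVICE_TIME, option
    )
    print(f"{option}: returned frame at {record.capture_timestamp_ns} ns (query was {query_ns} ns)")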
from projectaria_tools.core.sensor_data import TimeDomain, TimeQueryOptions
# Initialize Rerun for visualization
rr.init("rerun_viz_query_by_timestamp")
# Get time bounds for RGB images
first_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(rgb_stream_id, TimeDomain.DEVICE_TIME)[0]
last_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(rgb_stream_id, TimeDomain.DEVICE_TIME)[-1]
# Query specific timestamp
target_time_ns = first_timestamp_ns + 1000000000 # 1 second later
image_data, image_record = pilot_data_provider.get_vrs_image_data_by_time_ns(
rgb_stream_id,
target_time_ns,
TimeDomain.DEVICE_TIME,
TimeQueryOptions.CLOSEST
)
actual_time_ns = image_record.capture_timestamp_ns
print(f"Requested RGB data that is closest to: {target_time_ns} ns, Got closest sample at: {actual_time_ns} ns")
# Plot RGB and SLAM images at approximately 1 Hz
camera_label_list = ["camera-rgb", "slam-front-left", "slam-front-right", "slam-side-left", "slam-side-right"]
camera_stream_ids = [pilot_data_provider.get_vrs_stream_id_from_label(camera_label) for camera_label in camera_label_list]
query_timestamp_ns = first_timestamp_ns
for _ in range(10):
for label, stream_id in zip(camera_label_list, camera_stream_ids):
# Query each camera's data according to query timestamp
image_data, image_record = pilot_data_provider.get_vrs_image_data_by_time_ns(
stream_id,
query_timestamp_ns,
TimeDomain.DEVICE_TIME,
TimeQueryOptions.CLOSEST)
# Note: the actual timestamp of the image data is stored within image_record.
# It can be different from the query_time.
capture_time_ns = image_record.capture_timestamp_ns
# Visualize with Rerun
if image_data.is_valid():
rr.set_time_nanos("device_time", capture_time_ns)
rr.log(label, rr.Image(image_data.to_numpy_array()))
query_timestamp_ns = query_timestamp_ns + int(1e9) # 1 second
# Display the visualization
rr.notebook_show()
On-Device Eye Tracking and Hand Tracking
Aria Gen2 glasses feature on-device machine perception algorithms that run during recording. This section covers how to access and visualize eye tracking and hand tracking data from VRS files.
Eye Tracking Data
Eye tracking data provides information about where the user is looking, including gaze direction, depth estimation, and eye movement patterns.
EyeGaze Data Structure
The EyeGaze data type represents on-device eye tracking results. Importantly, it directly reuses the EyeGaze data structure from MPS (Machine Perception Services), providing guaranteed compatibility across VRS and MPS.
Key EyeGaze fields:
| Field Name | Description |
|---|---|
| session_uid | Unique ID for the eye tracking session |
| tracking_timestamp | Timestamp of the eye tracking camera frame in the device time domain, in microseconds |
| vergence.t[x,y,z]_[left,right]_eye | Translation of each eye's origin in the CPF frame |
| yaw, vergence.[left,right]_yaw | Eye gaze yaw angle (horizontal) in radians, in the CPF frame |
| pitch, vergence.[left,right]_pitch (Gen2-only) | Eye gaze pitch angle (vertical) in radians, in the CPF frame. The left and right pitch are assumed to be the same in Aria Gen1. |
| depth | Depth in meters of the 3D eye gaze point in the CPF frame (0 = unavailable) |
| yaw_low, yaw_high, pitch_low, pitch_high | Confidence interval bounds for the yaw and pitch angles |
| Aria-Gen2 specific fields | |
| combined_gaze_origin_in_cpf | Combined gaze origin in the CPF frame (Gen2 only) |
| spatial_gaze_point_in_cpf | 3D spatial gaze point in the CPF frame |
| vergence.[left,right]_entrance_pupil_position_meter | Entrance pupil position for each eye |
| vergence.[left,right]_pupil_diameter_meter | Entrance pupil diameter for each eye |
| vergence.[left,right]_blink | Blink detection for the left and right eyes |
| *_valid | Boolean flags indicating whether the corresponding EyeGaze data field is valid |
EyeGaze API Reference
In AriaGen2PilotDataProvider, EyeGaze is treated the same way as any other sensor data, and shares similar query APIs:
- get_vrs_eye_gaze_data_by_index(stream_id, index): Query by index.
- get_vrs_eye_gaze_data_by_time_ns(stream_id, timestamp, time_domain, query_options): Query by timestamp.
# First, let's check if eye tracking data is available
eyegaze_label = "eyegaze"
eyegaze_stream_id = pilot_data_provider.get_vrs_stream_id_from_label(eyegaze_label)
if eyegaze_stream_id is None:
print(f"❌ {eyegaze_label} data stream does not exist in this VRS file.")
print("Please use a VRS file that contains valid eyegaze data for this tutorial.")
else:
print(f"✅ Found {eyegaze_label} data stream: {eyegaze_stream_id}")
# Get number of eye tracking samples
num_eyegaze_samples = pilot_data_provider.get_vrs_num_data(eyegaze_stream_id)
print(f"Total eye tracking samples: {num_eyegaze_samples}")
# Sample a few eye tracking data points
print("\n=== EyeGaze Data Sample ===")
selected_index = min(5, num_eyegaze_samples - 1)
eyegaze_data = pilot_data_provider.get_vrs_eye_gaze_data_by_index(eyegaze_stream_id, selected_index)
if eyegaze_data is not None:
# The eyegaze timestamp is a datetime.timedelta (microsecond resolution); convert it to integer nanoseconds
from datetime import timedelta
eyegaze_timestamp_ns = (eyegaze_data.tracking_timestamp // timedelta(microseconds=1)) * 1000
print(f"Sample {selected_index}:")
print(f"\tTracking timestamp: {eyegaze_timestamp_ns}")
# Check if combined gaze is valid, if so, print out the gaze direction
print(f"\tCombined gaze valid: {eyegaze_data.combined_gaze_valid}")
if eyegaze_data.combined_gaze_valid:
print(f"\tYaw: {eyegaze_data.yaw:.3f} rad")
print(f"\tPitch: {eyegaze_data.pitch:.3f} rad")
print(f"\tDepth: {eyegaze_data.depth:.3f} m")
# Convert gaze direction to unit vector
from projectaria_tools.core.mps import get_unit_vector_from_yaw_pitch
gaze_direction_in_unit_vec = get_unit_vector_from_yaw_pitch(eyegaze_data.yaw, eyegaze_data.pitch)
print(f"\tGaze direction in unit vec [xyz]: {gaze_direction_in_unit_vec}")
# Check if spatial gaze point is valid
print(f"\tSpatial gaze point valid: {eyegaze_data.spatial_gaze_point_valid}")
if eyegaze_data.spatial_gaze_point_valid:
print(f"\tSpatial gaze point in CPF: {eyegaze_data.spatial_gaze_point_in_cpf}")
else:
print("No eye tracking data available at the selected index.")
Eye Tracking Visualization
To visualize eye tracking results in camera images, you need to project the gaze data into the camera coordinate system using device calibration.
Important Coordinate System Note:
All eye tracking results in Aria are stored in a reference coordinate system called the Central Pupil Frame (CPF), which sits approximately at the center between the user's two eyes. This CPF frame is DIFFERENT from the Device frame used in device calibration; the latter is essentially anchored to the slam-front-left camera. To transform between CPF and Device, use the device calibration API:
device_calibration.get_transform_device_cpf()
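As a rough sketch of how these pieces fit together, a gaze direction given by yaw and pitch can be turned into an approximate 3D point and expressed in the Device frame (this approximation assumes the gaze ray originates at the CPF origin; the on-device results instead provide spatial_gaze_point_in_cpf directly):
# Illustrative sketch: approximate 3D gaze point from yaw/pitch/depth, moved from CPF into the Device frame
T_device_cpf_example = device_calibration.get_transform_device_cpf()
example_yaw, example_pitch, example_depth = 0.1, -0.2, 1.0  # hypothetical values (rad, rad, m)
gaze_dir_in_cpf = get_unit_vector_from_yaw_pitch(example_yaw, example_pitch)
approx_gaze_point_in_cpf = np.array(gaze_dir_in_cpf) * example_depth  # assumes the ray starts at the CPF origin
approx_gaze_point_in_device = T_device_cpf_example @ approx_gaze_point_in_cpf
print(f"Approximate gaze point in the Device frame: {approx_gaze_point_in_device}")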
# Eye tracking visualization helper function
def plot_eyegaze_in_camera(eyegaze_data, camera_label, camera_calib, T_device_cpf):
"""
A helper function to plot eyegaze's spatial gaze point into a camera image
"""
# Skip if eyegaze data is invalid
if not (eyegaze_data.spatial_gaze_point_valid and eyegaze_data.combined_gaze_valid):
return
# First, transform spatial gaze point from CPF -> Device -> Camera frame
spatial_gaze_point_in_cpf = eyegaze_data.spatial_gaze_point_in_cpf
spatial_gaze_point_in_device = T_device_cpf @ spatial_gaze_point_in_cpf
spatial_gaze_point_in_camera = (
camera_calib.get_transform_device_camera().inverse()
@ spatial_gaze_point_in_device
)
# Project into camera and plot 2D gaze location
maybe_pixel = camera_calib.project(spatial_gaze_point_in_camera)
if maybe_pixel is not None:
rr.log(
f"{camera_label}",
rr.Points2D(
positions=[maybe_pixel],
colors=[255, 64, 255],
radii = [30.0]
),
)
# Visualize eye tracking in camera images (if eye tracking data is available)
if eyegaze_stream_id is not None:
print("\n=== Visualizing on-device eye tracking in camera images ===")
# Get device calibration and CPF transform
device_calib = pilot_data_provider.get_vrs_device_calibration()
T_device_cpf = device_calib.get_transform_device_cpf()
# Get RGB camera stream
rgb_camera_label = "camera-rgb"
rgb_stream_id = pilot_data_provider.get_vrs_stream_id_from_label(rgb_camera_label)
rgb_camera_calib = device_calib.get_camera_calib(rgb_camera_label)
# Initialize Rerun for visualization
rr.init("rerun_viz_et_in_cameras")
rr.notebook_show()
# Get time bounds and sample data
first_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(rgb_stream_id, TimeDomain.DEVICE_TIME)[0]
last_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(rgb_stream_id, TimeDomain.DEVICE_TIME)[-1]
# Sample a few frames for visualization
sample_timestamps = []
for i in range(0, min(10, pilot_data_provider.get_vrs_num_data(rgb_stream_id)), 2):
rgb_data, rgb_record = pilot_data_provider.get_vrs_image_data_by_index(rgb_stream_id, i)
sample_timestamps.append(rgb_record.capture_timestamp_ns)
# Visualize RGB images with eye tracking overlay
for timestamp_ns in sample_timestamps:
# Get RGB image
rgb_data, rgb_record = pilot_data_provider.get_vrs_image_data_by_time_ns(
rgb_stream_id, timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
if rgb_data.is_valid():
# Visualize the RGB image
rr.set_time_nanos("device_time", rgb_record.capture_timestamp_ns)
rr.log(rgb_camera_label, rr.Image(rgb_data.to_numpy_array()))
# Get eye tracking data for this timestamp
eyegaze_data = pilot_data_provider.get_vrs_eye_gaze_data_by_time_ns(
eyegaze_stream_id, timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
if eyegaze_data is not None:
# Plot eye tracking overlay
plot_eyegaze_in_camera(
eyegaze_data=eyegaze_data,
camera_label=rgb_camera_label,
camera_calib=rgb_camera_calib,
T_device_cpf=T_device_cpf
)
else:
print("Skipping eye tracking visualization - no eye tracking data available.")
Hand Tracking Data
Hand tracking data provides comprehensive 3D hand pose information including landmark positions, confidence scores, and hand gestures.
HandTracking Data Structure
HandTracking data contains comprehensive 3D hand pose information. Importantly, it directly reuses the HandTrackingResults data structure from MPS (Machine Perception Services), providing guaranteed compatibility across VRS and MPS.
Key Fields in HandTrackingResults:
| Field Name | Description |
|---|---|
| tracking_timestamp | Timestamp of the hand-tracking estimate in the device time domain. |
| left_hand | Left-hand pose, or None if no valid pose is found for the timestamp. |
| right_hand | Right-hand pose, or None if no valid pose is found for the timestamp. |
Single Hand fields (left or right):
| Field Name | Description |
|---|---|
| confidence | Tracking confidence score for this hand. |
| landmark_positions_device | List of 21 hand-landmark positions in the Device frame (3D points). See the wiki page for landmark definitions. |
| transform_device_wrist | Full SE3 transform of the wrist in the Device frame. |
| wrist_and_palm_normal_device | Normal vectors for the wrist and palm joints in the Device frame. |
HandTracking Coordinate System
All HandTracking results in Aria are stored in the Device coordinate frame, which is the same as device calibration. This makes it easier to work with compared to eye tracking data.
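For example, because hand landmarks are already expressed in the Device frame, a single transform chain is enough to bring one into a camera's frame and project it (a minimal sketch reusing the calibration objects introduced earlier):
# Illustrative sketch: project one Device-frame hand landmark into a camera image
def project_landmark_to_camera(landmark_in_device, camera_calib):
    # Device -> Camera, then project (returns None if the point falls outside the image)
    landmark_in_camera = camera_calib.get_transform_device_camera().inverse() @ landmark_in_device
    return camera_calib.project(landmark_in_camera)
The visualization helpers later in this section apply the same transform to all 21 landmarks and draw the hand skeleton.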
HandTracking API Reference
In AriaGen2PilotDataProvider, HandTracking shares similar query APIs:
- get_vrs_hand_pose_data_by_index(stream_id, index): Query by index.
- get_vrs_hand_pose_data_by_time_ns(stream_id, timestamp, time_domain, query_options): Query by timestamp.
# Check if hand tracking data is available
handtracking_label = "handtracking"
handtracking_stream_id = pilot_data_provider.get_vrs_stream_id_from_label(handtracking_label)
if handtracking_stream_id is None:
print(f"❌ {handtracking_label} data stream does not exist in this VRS file.")
print("Please use a VRS file that contains valid handtracking data for this tutorial.")
else:
print(f"✅ Found {handtracking_label} data stream: {handtracking_stream_id}")
# Get number of hand tracking samples
num_handtracking_samples = pilot_data_provider.get_vrs_num_data(handtracking_stream_id)
print(f"Total hand tracking samples: {num_handtracking_samples}")
# Sample a few hand tracking data points
print("\n=== HandTracking Data Sample ===")
selected_index = min(5, num_handtracking_samples - 1)
hand_data = pilot_data_provider.get_vrs_hand_pose_data_by_index(handtracking_stream_id, selected_index)
if hand_data is not None:
print(f"Sample {selected_index}:")
print(f"\tTracking timestamp: {hand_data.tracking_timestamp}")
# Print the content of left and right hand if valid
if hand_data.left_hand is not None:
print("\tLeft hand detected")
print(f"\t\tConfidence: {hand_data.left_hand.confidence:.3f}")
print(f"\t\tLandmarks shape: {len(hand_data.left_hand.landmark_positions_device)}")
print(f"\t\tWrist location: {hand_data.left_hand.get_wrist_position_device()}")
else:
print("\tLeft hand: Not detected")
if hand_data.right_hand is not None:
print("\tRight hand detected")
print(f"\t\tConfidence: {hand_data.right_hand.confidence:.3f}")
print(f"\t\tLandmarks shape: {len(hand_data.right_hand.landmark_positions_device)}")
print(f"\t\tWrist location: {hand_data.right_hand.get_wrist_position_device()}")
else:
print("\tRight hand: Not detected")
else:
print("No hand tracking data available at the selected index.")
Interpolated Hand Tracking Results
Context:
In Aria-Gen2 glasses, the on-device hand-tracking results are computed from the SLAM cameras, not the RGB camera. Meanwhile, the SLAM cameras and the RGB camera often run at different sampling frequencies, and their triggering is not aligned either. As a result, hand-tracking timestamps often do not line up with RGB frame timestamps, which makes it harder to accurately visualize hand-tracking results in RGB images.
API to query interpolated handtracking results
To resolve this, AriaGen2PilotDataProvider enables a special query API for handtracking results:
get_vrs_interpolated_hand_pose_data(stream_id, timestamp_ns, time_domain)
which returns an interpolated hand-tracking result for any timestamp within the valid time range of the VRS file.
Handtracking Interpolation Implementation
- Find the two nearest hand-tracking results before and after the target timestamp.
- If the time delta between these two results is larger than 100 ms, interpolation is considered unreliable and None is returned.
- Otherwise, interpolate each hand separately:
  a. For the left or right hand, perform interpolation only if both the "before" and "after" samples contain a valid result for that hand.
  b. If either sample is missing, the interpolated result for that hand will be None.
- Single-hand interpolation is calculated as:
  a. Apply linear interpolation to the 3D hand landmark positions.
  b. Apply SE3 interpolation to the T_Device_Wrist 3D pose.
  c. Re-calculate the wrist and palm normal vectors.
  d. Take the min of the two confidence values.
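The sketch below mirrors this logic for the landmark positions only; SE3 pose interpolation and normal re-computation are omitted, and the helper name is hypothetical rather than the provider's actual implementation:
# Illustrative sketch: linear interpolation of hand landmarks between two bracketing samples
def interpolate_landmarks(ts_before_ns, landmarks_before, ts_after_ns, landmarks_after, query_ts_ns, max_gap_ns=int(100e6)):
    # If the two bracketing samples are more than 100 ms apart, interpolation is unreliable
    if ts_after_ns - ts_before_ns > max_gap_ns:
        return None
    if ts_after_ns == ts_before_ns:
        return [np.asarray(p) for p in landmarks_before]
    # Linear interpolation factor in [0, 1] between the "before" and "after" samples
    alpha = (query_ts_ns - ts_before_ns) / float(ts_after_ns - ts_before_ns)
    return [
        (1.0 - alpha) * np.asarray(p_before) + alpha * np.asarray(p_after)
        for p_before, p_after in zip(landmarks_before, landmarks_after)
    ]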
# Demonstrate interpolated hand tracking (if hand tracking data is available)
if handtracking_stream_id is not None:
print("\n=== Demonstrating query interpolated hand tracking results ===")
# Get SLAM and RGB camera streams for comparison
slam_stream_id = pilot_data_provider.get_vrs_stream_id_from_label("slam-front-left")
rgb_stream_id = pilot_data_provider.get_vrs_stream_id_from_label("camera-rgb")
if slam_stream_id is not None and rgb_stream_id is not None:
# Retrieve a SLAM frame, use its timestamp as query
slam_sample_index = min(10, pilot_data_provider.get_vrs_num_data(slam_stream_id) - 1)
slam_data, slam_record = pilot_data_provider.get_vrs_image_data_by_index(slam_stream_id, slam_sample_index)
slam_timestamp_ns = slam_record.capture_timestamp_ns
# Retrieve the closest RGB frame
rgb_data, rgb_record = pilot_data_provider.get_vrs_image_data_by_time_ns(
rgb_stream_id, slam_timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
rgb_timestamp_ns = rgb_record.capture_timestamp_ns
# Retrieve the closest hand tracking data sample
raw_ht_data = pilot_data_provider.get_vrs_hand_pose_data_by_time_ns(
handtracking_stream_id, slam_timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
if raw_ht_data is not None:
from datetime import timedelta
raw_ht_timestamp_ns = (raw_ht_data.tracking_timestamp // timedelta(microseconds=1)) * 1000
# Check if hand tracking aligns with RGB or SLAM data
print(f"SLAM timestamp: {slam_timestamp_ns}")
print(f"RGB timestamp: {rgb_timestamp_ns}")
print(f"Hand tracking timestamp: {raw_ht_timestamp_ns}")
print(f"Hand tracking-SLAM time diff: {abs(raw_ht_timestamp_ns - slam_timestamp_ns) / 1e6:.2f} ms")
print(f"Hand tracking-RGB time diff: {abs(raw_ht_timestamp_ns - rgb_timestamp_ns) / 1e6:.2f} ms")
# Now, query interpolated hand tracking data sample using RGB timestamp
interpolated_ht_data = pilot_data_provider.get_vrs_interpolated_hand_pose_data(
handtracking_stream_id, rgb_timestamp_ns, TimeDomain.DEVICE_TIME
)
# Check that interpolated hand tracking now aligns with RGB data
if interpolated_ht_data is not None:
interpolated_ht_timestamp_ns = (interpolated_ht_data.tracking_timestamp // timedelta(microseconds=1)) * 1000
print(f"Interpolated hand tracking timestamp: {interpolated_ht_timestamp_ns}")
print(f"Interpolated hand tracking-RGB time diff: {abs(interpolated_ht_timestamp_ns - rgb_timestamp_ns) / 1e6:.2f} ms")
else:
print("Interpolated hand tracking data is None - interpolation failed")
else:
print("No raw hand tracking data found for comparison")
else:
print("Required camera streams not found for interpolation demonstration")
else:
print("Skipping hand tracking interpolation demonstration - no hand tracking data available.")
Hand Tracking Visualization
To visualize hand tracking results in camera images, you need to project the hand landmarks and skeleton into the camera coordinate system using device calibration. Since hand tracking data is already in the Device frame, the transformation is more straightforward than eye tracking.
# Hand tracking visualization helper functions
def plot_single_hand_in_camera(hand_joints_in_device, camera_label, camera_calib, hand_label):
"""
A helper function to plot a single hand data in 2D camera view
"""
# Setting different marker plot sizes for RGB and SLAM since they have different resolutions
plot_ratio = 3.0 if camera_label == "camera-rgb" else 1.0
marker_color = [255,64,0] if hand_label == "left" else [255, 255, 0]
# Project into camera frame, and also create line segments
hand_joints_in_camera = []
for pt_in_device in hand_joints_in_device:
pt_in_camera = (
camera_calib.get_transform_device_camera().inverse() @ pt_in_device
)
pixel = camera_calib.project(pt_in_camera)
hand_joints_in_camera.append(pixel)
# Create hand skeleton in 2D image space
from projectaria_tools.utils.rerun_helpers import create_hand_skeleton_from_landmarks
hand_skeleton = create_hand_skeleton_from_landmarks(hand_joints_in_camera)
# Remove "None" markers from hand joints in camera. This is intentionally done AFTER the hand skeleton creation
hand_joints_in_camera = list(
filter(lambda x: x is not None, hand_joints_in_camera)
)
rr.log(
f"{camera_label}/{hand_label}/landmarks",
rr.Points2D(
positions=hand_joints_in_camera,
colors= marker_color,
radii= [3.0 * plot_ratio]
),
)
rr.log(
f"{camera_label}/{hand_label}/skeleton",
rr.LineStrips2D(
hand_skeleton,
colors=[0, 255, 0],
radii= [0.5 * plot_ratio],
),
)
def plot_handpose_in_camera(hand_pose, camera_label, camera_calib):
"""
A helper function to plot hand tracking results into a camera image
"""
# Plot both hands
if hand_pose.left_hand is not None:
plot_single_hand_in_camera(
hand_joints_in_device=hand_pose.left_hand.landmark_positions_device,
camera_label=camera_label,
camera_calib = camera_calib,
hand_label="left")
if hand_pose.right_hand is not None:
plot_single_hand_in_camera(
hand_joints_in_device=hand_pose.right_hand.landmark_positions_device,
camera_label=camera_label,
camera_calib = camera_calib,
hand_label="right")
# Visualize hand tracking in camera images (if hand tracking data is available)
if handtracking_stream_id is not None:
print("\n=== Visualizing on-device hand tracking in camera images ===")
# Get device calibration
device_calib = pilot_data_provider.get_vrs_device_calibration()
# Get RGB camera stream
rgb_camera_label = "camera-rgb"
rgb_stream_id = pilot_data_provider.get_vrs_stream_id_from_label(rgb_camera_label)
rgb_camera_calib = device_calib.get_camera_calib(rgb_camera_label)
# Initialize Rerun for visualization
rr.init("rerun_viz_ht_in_cameras")
rr.notebook_show()
# Get time bounds and sample data
first_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(rgb_stream_id, TimeDomain.DEVICE_TIME)[0]
# Sample a few frames for visualization
sample_timestamps = []
for i in range(0, min(10, pilot_data_provider.get_vrs_num_data(rgb_stream_id)), 2):
rgb_data, rgb_record = pilot_data_provider.get_vrs_image_data_by_index(rgb_stream_id, i)
sample_timestamps.append(rgb_record.capture_timestamp_ns)
# Visualize RGB images with hand tracking overlay
for timestamp_ns in sample_timestamps:
# Get RGB image
rgb_data, rgb_record = pilot_data_provider.get_vrs_image_data_by_time_ns(
rgb_stream_id, timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
if rgb_data.is_valid():
# Visualize the RGB image
rr.set_time_nanos("device_time", rgb_record.capture_timestamp_ns)
rr.log(rgb_camera_label, rr.Image(rgb_data.to_numpy_array()))
# Query interpolated hand tracking data for this timestamp
interpolated_hand_pose = pilot_data_provider.get_vrs_interpolated_hand_pose_data(
handtracking_stream_id, timestamp_ns, TimeDomain.DEVICE_TIME
)
if interpolated_hand_pose is not None:
# Plot hand tracking overlay
plot_handpose_in_camera(
hand_pose=interpolated_hand_pose,
camera_label=rgb_camera_label,
camera_calib=rgb_camera_calib
)
else:
print("Skipping hand tracking visualization - no hand tracking data available.")
On-Device VIO (Visual Inertial Odometry)
VIO (Visual Inertial Odometry) combines camera images and IMU (Inertial Measurement Unit) data to estimate device pose and motion in real time. VIO tracks the device's position, orientation, and velocity by combining visual tracking, IMU integration, and sensor fusion, making it the foundation for spatial tracking and understanding.
In Aria-Gen2 devices, the VIO algorithm runs on-device to produce 2 types of tracking results as part of the VRS file: VIO and VIO High Frequency.
VIO Data Structure
Data Type: FrontendOutput
This is a new data type introduced to store the results from the VIO system, containing the following fields:
| Field Name | Description |
|---|---|
| frontend_session_uid | Session identifier (resets on VIO restart) |
| frame_id | Frame set identifier |
| capture_timestamp_ns | Center capture time in nanoseconds |
| unix_timestamp_ns | Unix timestamp in nanoseconds |
| status | VIO status (VALID/INVALID) |
| pose_quality | Pose quality (GOOD/BAD/UNKNOWN) |
| visual_tracking_quality | Visual-only tracking quality |
| online_calib | Online calibration estimates for SLAM cameras and IMUs |
| gravity_in_odometry | Gravity vector in the odometry frame |
| transform_odometry_bodyimu | Body IMU's pose in the odometry reference frame |
| transform_bodyimu_device | Transform from the body IMU frame to the Device frame |
| linear_velocity_in_odometry | Linear velocity in the odometry frame, in m/s |
| angular_velocity_in_bodyimu | Angular velocity in the body IMU frame, in rad/s |
Here, body IMU is the IMU that is picked as the reference for motion tracking. For Aria-Gen2's on-device VIO algorithm, this is often imu-left.
Important Note: Always check status == VioStatus.VALID and pose_quality == TrackingQuality.GOOD for VIO data validity!
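A minimal sketch of both points (the validity check, and chaining the two transforms to obtain the device pose in the odometry frame) could look like this:
# Illustrative sketch: validity check plus device pose in the odometry frame for a VIO sample
def get_valid_device_pose_in_odometry(vio_data):
    """Return T_Odometry_Device if the VIO sample is usable, otherwise None."""
    if vio_data is None:
        return None
    if vio_data.status != VioStatus.VALID or vio_data.pose_quality != TrackingQuality.GOOD:
        return None
    # Chain the transforms: odometry <- body IMU <- device
    return vio_data.transform_odometry_bodyimu @ vio_data.transform_bodyimu_device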
VIO High Frequency Data Structure
VIO High Frequency results are generated directly from the on-device VIO results by performing IMU integration between VIO poses, hence providing a much higher data rate at approximately 800Hz.
Data Type: OpenLoopTrajectoryPose
The VioHighFrequency stream re-uses the OpenLoopTrajectoryPose data structure defined in MPS.
| Field Name | Description |
|---|---|
| tracking_timestamp | Timestamp in the device time domain, in microseconds |
| transform_odometry_device | Transformation from the device frame to the odometry frame, represented as an SE3 instance |
| device_linear_velocity_odometry | Translational velocity of the device in the odometry frame, in m/s |
| angular_velocity_device | Angular velocity of the device in the device frame, in rad/s |
| quality_score | Quality of the pose estimate (higher = better) |
| gravity_odometry | Earth gravity vector in the odometry frame |
| session_uid | Unique identifier for the VIO tracking session |
Important Note: Due to the high-frequency nature of this data (~800 Hz), consider subsampling for visualization to maintain performance.
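For example, a simple way to subsample is to keep roughly one sample out of every N, derived from the nominal stream rate (a sketch; the ~800 Hz figure is nominal and actual rates may vary per sequence):
# Illustrative sketch: subsample high-frequency timestamps down to a target visualization rate
def subsample_timestamps(timestamps_ns, source_hz=800.0, target_hz=60.0):
    stride = max(1, int(round(source_hz / target_hz)))
    return timestamps_ns[::stride]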
VIO API Reference
In AriaGen2PilotDataProvider, VIO data shares similar query APIs:
- get_vrs_vio_data_by_index(stream_id, index): Query VIO data by index.
- get_vrs_vio_data_by_time_ns(stream_id, timestamp, time_domain, query_options): Query VIO data by timestamp.
- get_vrs_vio_high_freq_data_by_index(stream_id, index): Query VIO high frequency data by index.
- get_vrs_vio_high_freq_data_by_time_ns(stream_id, timestamp, time_domain, query_options): Query VIO high frequency data by timestamp.
# Check if VIO data is available
vio_label = "vio"
vio_stream_id = pilot_data_provider.get_vrs_stream_id_from_label(vio_label)
vio_high_freq_label = "vio_high_frequency"
vio_high_freq_stream_id = pilot_data_provider.get_vrs_stream_id_from_label(vio_high_freq_label)
print("=== VIO Data Availability ===")
if vio_stream_id is None:
print(f"❌ {vio_label} data stream does not exist in this VRS file.")
else:
print(f"✅ Found {vio_label} data stream: {vio_stream_id}")
if vio_high_freq_stream_id is None:
print(f"❌ {vio_high_freq_label} data stream does not exist in this VRS file.")
else:
print(f"✅ Found {vio_high_freq_label} data stream: {vio_high_freq_stream_id}")
# Sample VIO data (if available)
if vio_stream_id is not None:
print("\n=== VIO Data Sample ===")
# Search a window of samples (indices 100-109) for the first valid VIO data sample
num_vio_samples = pilot_data_provider.get_vrs_num_data(vio_stream_id)
first_valid_index = None
for idx in range(100, 100 + min(10, num_vio_samples)):
vio_data = pilot_data_provider.get_vrs_vio_data_by_index(vio_stream_id, idx)
if vio_data is not None:
# Check if VIO data is valid (we'll import the status enums)
from projectaria_tools.core.sensor_data import VioStatus, TrackingQuality
if (vio_data.status == VioStatus.VALID and vio_data.pose_quality == TrackingQuality.GOOD):
first_valid_index = idx
break
if first_valid_index is not None:
vio_data = pilot_data_provider.get_vrs_vio_data_by_index(vio_stream_id, first_valid_index)
print("=" * 50)
print(f"First VALID VIO Data Sample (Index: {first_valid_index})")
print("=" * 50)
# Session Information
print(f"Session UID: {vio_data.frontend_session_uid}")
print(f"Frame ID: {vio_data.frame_id}")
# Timestamps
print(f"Capture Time: {vio_data.capture_timestamp_ns} ns")
print(f"Unix Time: {vio_data.unix_timestamp_ns} ns")
# Quality Status
print(f"Status: {vio_data.status}")
print(f"Pose Quality: {vio_data.pose_quality}")
print(f"Visual Quality: {vio_data.visual_tracking_quality}")
# Transforms
print(f"Transform Odometry → Body IMU:\n{vio_data.transform_odometry_bodyimu.to_matrix()}")
print(f"Transform Body IMU → Device:\n{vio_data.transform_bodyimu_device.to_matrix()}")
# Motion
print(f"Linear Velocity: {vio_data.linear_velocity_in_odometry}")
print(f"Angular Velocity: {vio_data.angular_velocity_in_bodyimu}")
print(f"Gravity Vector: {vio_data.gravity_in_odometry}")
else:
print("⚠️ No valid VIO sample found")
# Sample VIO High Frequency data (if available)
if vio_high_freq_stream_id is not None:
print("\n=== VIO High-Frequency Data Sample ===")
# Find the first VIO high_frequency data sample with high quality value
num_vio_high_freq_samples = pilot_data_provider.get_vrs_num_data(vio_high_freq_stream_id)
first_valid_index = None
for idx in range(min(10, num_vio_high_freq_samples)):
vio_high_freq_data = pilot_data_provider.get_vrs_vio_high_freq_data_by_index(vio_high_freq_stream_id, idx)
if vio_high_freq_data is not None and vio_high_freq_data.quality_score > 0.5:
first_valid_index = idx
break
if first_valid_index is not None:
vio_high_freq_data = pilot_data_provider.get_vrs_vio_high_freq_data_by_index(vio_high_freq_stream_id, first_valid_index)
print("=" * 50)
print(f"First VIO High Freq Data Sample with good quality score (Index: {first_valid_index})")
print("=" * 50)
# Timestamps, convert timedelta to nanoseconds
capture_timestamp_ns = int(vio_high_freq_data.tracking_timestamp.total_seconds() * 1e9)
# Session Information
print(f"Session UID: {vio_high_freq_data.session_uid}")
# Timestamps
print(f"Tracking Time: {capture_timestamp_ns} ns")
# Quality
print(f"Quality Score: {vio_high_freq_data.quality_score:.3f}")
# Transform
print(f"Transform Odometry → Device:\n{vio_high_freq_data.transform_odometry_device.to_matrix()}")
# Motion
print(f"Linear Velocity: {vio_high_freq_data.device_linear_velocity_odometry}")
print(f"Angular Velocity: {vio_high_freq_data.angular_velocity_device}")
print(f"Gravity Vector: {vio_high_freq_data.gravity_odometry}")
else:
print("⚠️ No valid VIO high frequency sample found")
VIO Trajectory Visualization
The following code demonstrates how to visualize a VIO trajectory in a 3D view, showing the device's movement through space over time.
# VIO trajectory visualization using sequential data access (matching Tutorial 5 approach)
if vio_stream_id is not None:
print("\n=== Visualizing on-device VIO trajectory in 3D view ===")
# Initialize Rerun for 3D visualization
rr.init("rerun_viz_vio_trajectory")
rr.notebook_show()
# Get device calibration for glasses outline
device_calib = pilot_data_provider.get_vrs_device_calibration()
# Get time bounds for VIO data
first_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(vio_stream_id, TimeDomain.DEVICE_TIME)[0]
last_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(vio_stream_id, TimeDomain.DEVICE_TIME)[-1]
# Play for only 3 seconds
total_length_ns = last_timestamp_ns - first_timestamp_ns
skip_begin_ns = int(15 * 1e9) # Skip 15 seconds
duration_ns = int(3 * 1e9) # 3 seconds
skip_end_ns = max(total_length_ns - skip_begin_ns - duration_ns, 0)
start_time_ns = first_timestamp_ns + skip_begin_ns
end_time_ns = start_time_ns + duration_ns
print(f"Visualizing VIO trajectory from {start_time_ns} to {end_time_ns} ns")
# Plot VIO trajectory in 3D view using sequential approach
# Need to keep a cache to store already-loaded trajectory
vio_traj_cached_full = []
valid_vio_count = 0
# Get VIO timestamps in the time window
all_vio_timestamps = pilot_data_provider.get_vrs_timestamps_ns(vio_stream_id, TimeDomain.DEVICE_TIME)
window_timestamps = [ts for ts in all_vio_timestamps if start_time_ns <= ts <= end_time_ns]
print(f"Processing {len(window_timestamps)} VIO samples in time window")
for timestamp_ns in window_timestamps:
# Query VIO data by timestamp
vio_data = pilot_data_provider.get_vrs_vio_data_by_time_ns(
vio_stream_id, timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
if vio_data is not None:
# Check VIO data validity, only plot for valid data
if (vio_data.status == VioStatus.VALID and vio_data.pose_quality == TrackingQuality.GOOD):
# Set timestamp
rr.set_time_nanos("device_time", vio_data.capture_timestamp_ns)
# Set and plot the Device pose for the current timestamp, as a RGB axis
T_World_Device = (
vio_data.transform_odometry_bodyimu @ vio_data.transform_bodyimu_device
)
# Plot device pose as coordinate frame
rr.log(
"world/device",
ToTransform3D(
T_World_Device,
axis_length=0.05,
),
)
# Also plot Aria glass outline for visualization
aria_glasses_point_outline = AriaGlassesOutline(
device_calib, use_cad_calib=True
)
rr.log(
"world/device/glasses_outline",
rr.LineStrips3D(
aria_glasses_point_outline,
colors=[200,200,200],
radii=5e-4,
),
)
# Plot gravity direction vector
rr.log(
"world/vio_gravity",
rr.Arrows3D(
origins=[T_World_Device.translation()[0]],
vectors=[
vio_data.gravity_in_odometry * 1e-2
], # length converted from 9.8 meter -> 10 cm
colors=[101,67,33],
radii=1.5e-3,
),
static=False,
)
# Plot VIO trajectory that are cached so far
vio_traj_cached_full.append(T_World_Device.translation()[0])
rr.log(
"world/vio_trajectory",
rr.LineStrips3D(
vio_traj_cached_full,
colors=[173, 216, 255],
radii=1.5e-3,
),
static=False,
)
valid_vio_count += 1
else:
print(f"VIO data is invalid for timestamp {timestamp_ns}")
print(f"Visualized {valid_vio_count} valid VIO poses out of {len(window_timestamps)} samples")
else:
print("Skipping VIO trajectory visualization - no VIO data available.")
VIO High Frequency Visualization
VIO High Frequency data provides much higher temporal resolution (~800Hz) compared to regular VIO data. This section demonstrates how to visualize the high-frequency trajectory data.
# VIO High Frequency trajectory visualization (if available)
if vio_high_freq_stream_id is not None:
print("\n=== Visualizing VIO High Frequency trajectory ===")
# Initialize Rerun for high frequency visualization
rr.init("rerun_viz_vio_high_freq_trajectory")
rr.notebook_show()
# Get device calibration for glasses outline
device_calib = pilot_data_provider.get_vrs_device_calibration()
# Get time bounds for VIO high frequency data
first_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(vio_high_freq_stream_id, TimeDomain.DEVICE_TIME)[0]
last_timestamp_ns = pilot_data_provider.get_vrs_timestamps_ns(vio_high_freq_stream_id, TimeDomain.DEVICE_TIME)[-1]
# Play for only 2 seconds (shorter duration due to high frequency)
total_length_ns = last_timestamp_ns - first_timestamp_ns
skip_begin_ns = int(15 * 1e9) # Skip 15 seconds
duration_ns = int(2 * 1e9) # 2 seconds
skip_end_ns = max(total_length_ns - skip_begin_ns - duration_ns, 0)
start_time_ns = first_timestamp_ns + skip_begin_ns
end_time_ns = start_time_ns + duration_ns
print(f"Visualizing VIO High Frequency trajectory from {start_time_ns} to {end_time_ns} ns")
# Get VIO high frequency timestamps in the time window
all_vio_hf_timestamps = pilot_data_provider.get_vrs_timestamps_ns(vio_high_freq_stream_id, TimeDomain.DEVICE_TIME)
window_timestamps = [ts for ts in all_vio_hf_timestamps if start_time_ns <= ts <= end_time_ns]
# Subsample for performance (every 10th sample due to high frequency)
subsampled_timestamps = window_timestamps[::10]
print(f"Processing {len(subsampled_timestamps)} VIO High Frequency samples (subsampled from {len(window_timestamps)})")
# Plot VIO High Frequency trajectory in 3D view
vio_hf_traj_cached_full = []
valid_vio_hf_count = 0
for timestamp_ns in subsampled_timestamps:
# Query VIO high frequency data by timestamp
vio_hf_data = pilot_data_provider.get_vrs_vio_high_freq_data_by_time_ns(
vio_high_freq_stream_id, timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
if vio_hf_data is not None and vio_hf_data.quality_score > 0.5:
# Set timestamp
rr.set_time_nanos("device_time", int(vio_hf_data.tracking_timestamp.total_seconds() * 1e9))
# Plot device pose as coordinate frame
T_World_Device = vio_hf_data.transform_odometry_device
rr.log(
"world/device",
ToTransform3D(
T_World_Device,
axis_length=0.03, # Smaller axis for high frequency data
),
)
# Also plot Aria glass outline for visualization
aria_glasses_point_outline = AriaGlassesOutline(
device_calib, use_cad_calib=True
)
rr.log(
"world/device/glasses_outline",
rr.LineStrips3D(
aria_glasses_point_outline,
colors=[150,150,150], # Slightly different color for high frequency
radii=3e-4,
),
)
# Plot gravity direction vector
rr.log(
"world/vio_hf_gravity",
rr.Arrows3D(
origins=[T_World_Device.translation()[0]],
vectors=[
vio_hf_data.gravity_odometry * 1e-2
], # length converted from 9.8 meter -> 10 cm
colors=[67,101,33], # Different color for high frequency
radii=1e-3,
),
static=False,
)
# Plot VIO High Frequency trajectory that are cached so far
vio_hf_traj_cached_full.append(T_World_Device.translation()[0])
rr.log(
"world/vio_hf_trajectory",
rr.LineStrips3D(
vio_hf_traj_cached_full,
colors=[255, 173, 216], # Different color for high frequency
radii=1e-3,
),
static=False,
)
valid_vio_hf_count += 1
print(f"Visualized {valid_vio_hf_count} valid VIO High Frequency poses out of {len(subsampled_timestamps)} samples")
else:
print("Skipping VIO High Frequency trajectory visualization - no VIO High Frequency data available.")