Aria Gen2 Pilot Dataset Tutorial - MPS Data Loading
This tutorial demonstrates how to load and visualize MPS data from the Aria Gen2 Pilot Dataset using the AriaGen2PilotDataProvider.
Introduction
Machine Perception Services (MPS) is a post-processing cloud service that we provide to Aria users. It runs a set of proprietary spatial AI machine perception algorithms designed for Project Aria glasses, and is built to deliver superior accuracy and robustness compared to off-the-shelf open-source algorithms.
Currently, the supported MPS algorithms for Aria Gen2 include:
- SLAM: single-sequence trajectory and semi-dense point cloud generation.
- Hand Tracking: 21 landmarks, wrist-to-device transformation, wrist and palm positions and normals.
This tutorial focuses on demonstrating how to load and visualize the MPS results.
What You'll Learn
- How to load MPS output data and the definitions of its data types
- How to visualize MPS data
- The difference between MPS and on-device perception outputs
Dataset Structure
The Aria Gen2 Pilot Dataset comprises four primary data content types:
- Raw sensor streams acquired directly from Aria Gen2 devices
- Real-time machine perception outputs generated on-device via embedded algorithms during data collection
- Offline machine perception results produced by Machine Perception Services (MPS) during post-processing (focus of this tutorial)
- Outputs from additional offline perception algorithms
Each sequence folder contains an mps/ directory with the following structure:
mps/
├── slam/
│   ├── closed_loop_trajectory.csv
│   ├── open_loop_trajectory.csv
│   ├── semidense_observations.csv.gz
│   ├── semidense_points.csv.gz
│   ├── online_calibration.jsonl
│   └── summary.json
└── hand_tracking/
    ├── hand_tracking_results.csv
    └── summary.json
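Before initializing the data provider, it can help to verify that a downloaded sequence actually contains the files above. The following is a minimal sketch based only on the folder layout shown here; the sequence path is a placeholder you should replace with your own.

from pathlib import Path

# Placeholder path; point this at a downloaded sequence folder.
sequence_root = Path("path/to/your/sequence_folder")

expected_mps_files = [
    "slam/closed_loop_trajectory.csv",
    "slam/open_loop_trajectory.csv",
    "slam/semidense_observations.csv.gz",
    "slam/semidense_points.csv.gz",
    "slam/online_calibration.jsonl",
    "slam/summary.json",
    "hand_tracking/hand_tracking_results.csv",
    "hand_tracking/summary.json",
]

for relative_path in expected_mps_files:
    file_path = sequence_root / "mps" / relative_path
    status = "found" if file_path.exists() else "MISSING"
    print(f"{status:>7}  {file_path}")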
Initialize Data Provider
⚠️ Important: Update the sequence_path below to point to your downloaded Aria Gen2 Pilot Dataset sequence folder.
import numpy as np
from aria_gen2_pilot_dataset import AriaGen2PilotDataProvider
from projectaria_tools.core.sensor_data import TimeDomain
from projectaria_tools.core import mps
from aria_gen2_pilot_dataset.visualization.plot_style import get_plot_style, PlotEntity, PlotStyle
from projectaria_tools.utils.rerun_helpers import (
    create_hand_skeleton_from_landmarks,
    ToTransform3D,
)
import rerun as rr
import rerun.blueprint as rrb
# TODO: Update this path to your dataset location
sequence_path = "path/to/your/sequence_folder"
pilot_data_provider = AriaGen2PilotDataProvider(sequence_path)
MPS - SLAM
[MPS - SLAM] Output Files
MPS output files are organized into sub-folders by algorithm. The SLAM algorithm generates the following files:
- closed_loop_trajectory.csv
- open_loop_trajectory.csv
- semidense_observations.csv.gz
- semidense_points.csv.gz
- online_calibration.jsonl
- summary.json
Please refer to the MPS Wiki page for details of each file.
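If you want to work with these files directly instead of going through the data provider, projectaria_tools also ships standalone MPS readers. The sketch below assumes the mps/slam layout from the Dataset Structure section and uses reader functions from projectaria_tools.core.mps; check your installed version for the exact API.

import os
from projectaria_tools.core import mps

# Assumed layout: <sequence_path>/mps/slam/... (see Dataset Structure above).
slam_dir = os.path.join(sequence_path, "mps", "slam")

closed_loop_traj = mps.read_closed_loop_trajectory(os.path.join(slam_dir, "closed_loop_trajectory.csv"))
open_loop_traj = mps.read_open_loop_trajectory(os.path.join(slam_dir, "open_loop_trajectory.csv"))
global_points = mps.read_global_point_cloud(os.path.join(slam_dir, "semidense_points.csv.gz"))

print(f"Closed loop poses: {len(closed_loop_traj)}")
print(f"Open loop poses: {len(open_loop_traj)}")
print(f"Semi-dense points: {len(global_points)}")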
[MPS - SLAM] Semi-dense Point Cloud and Observations
The MPS SLAM algorithm outputs two files related to the semi-dense point cloud (see the wiki page for data type definitions):
- semidense_points.csv.gz: Global points in the world coordinate frame.
- semidense_observations.csv.gz: Point observations for each camera, at each timestamp.
Note that semi-dense point files are typically large, so loading them may take some time.
Point Cloud Data Types
GlobalPointPosition contains:
- uid: Unique identifier for the 3D point
- graph_uid: Identifier linking the point to a pose graph
- position_world: 3D position in the world coordinate frame
- inverse_distance_std: Inverse distance standard deviation (quality metric)
- distance_std: Distance standard deviation (quality metric)
PointObservation contains:
- point_uid: Links the observation to a 3D point
- frame_capture_timestamp: When the observation was captured
- camera_serial: Serial number of the observing camera
- uv: 2D pixel coordinates of the observation
Filtering Point Cloud by Confidence
You can filter the semi-dense point cloud using confidence thresholds based on inverse_distance_std or distance_std to improve quality. The AriaGen2PilotDataProvider provides a convenient method get_mps_semidense_point_cloud_filtered() for this purpose.
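For reference, the same kind of filtering can also be written by hand from the GlobalPointPosition fields. This is a minimal sketch; the threshold values are illustrative assumptions, not values prescribed by MPS, so tune them for your data.

# Manual confidence filtering sketch; thresholds are illustrative assumptions.
INVERSE_DISTANCE_STD_MAX = 0.005
DISTANCE_STD_MAX = 0.01

def filter_points_by_confidence(points, inv_dist_std_max=INVERSE_DISTANCE_STD_MAX, dist_std_max=DISTANCE_STD_MAX):
    """Keep only points whose uncertainty metrics fall below the given thresholds."""
    return [
        point
        for point in points
        if point.inverse_distance_std < inv_dist_std_max and point.distance_std < dist_std_max
    ]

# Example usage:
# raw_points = pilot_data_provider.get_mps_semidense_point_cloud()
# confident_points = filter_points_by_confidence(raw_points)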
print("=== MPS - Semi-dense Point Cloud ===")
semi_dense_point_cloud = pilot_data_provider.get_mps_semidense_point_cloud()
semi_dense_point_cloud_filtered = pilot_data_provider.get_mps_semidense_point_cloud_filtered(filter_confidence=True, max_point_count=50000)
# Print out the content of the first sample in semidense_points
if semi_dense_point_cloud:
    sample = semi_dense_point_cloud[0]
    print("GlobalPointPosition sample:")
    print(f"  uid: {sample.uid}")
    print(f"  graph_uid: {sample.graph_uid}")
    print(f"  position_world: {sample.position_world}")
    print(f"  inverse_distance_std: {sample.inverse_distance_std}")
    print(f"  distance_std: {sample.distance_std}")
    print(f"Total number of semi-dense points: {len(semi_dense_point_cloud)}")
    print(f"Total number of filtered semi-dense points: {len(semi_dense_point_cloud_filtered)}")
else:
    print("semidense_points is empty.")
[MPS - SLAM] Closed vs Open Loop Trajectory
The MPS SLAM algorithm outputs two trajectory files, open_loop_trajectory.csv and closed_loop_trajectory.csv (see the wiki page for data type definitions):
- Open loop trajectory: High-frequency (1 kHz) odometry from visual-inertial odometry (VIO); accurate over short periods, but drifts over time and distance.
- Closed loop trajectory: High-frequency (1 kHz) poses from mapping with loop closure corrections; drift is reduced, though local accuracy over short spans may be slightly lower. For the closed loop trajectory, an additional interpolation API is provided.
Key Differences
| Aspect | Open Loop (VIO) | Closed Loop (SLAM) |
|---|---|---|
| Coordinate Frame | Odometry frame | Global world frame |
| Drift | Accumulates over time | Minimized with loop closure |
| Accuracy | Good for short periods | Higher global accuracy |
| Use Case | Real-time odometry | High-quality reconstruction |
Data Types
ClosedLoopTrajectoryPose contains:
- tracking_timestamp: Device timestamp when the pose was computed
- transform_world_device: 6DoF pose in the world coordinate frame
- device_linear_velocity_device: Linear velocity in the device frame
- angular_velocity_device: Angular velocity in the device frame
- quality_score: Pose estimation quality (higher = better)
- gravity_world: Gravity vector in the world frame
OpenLoopTrajectoryPose contains:
- tracking_timestamp: Device timestamp when the pose was computed
- transform_odometry_device: 6DoF pose in the odometry coordinate frame
- device_linear_velocity_odometry: Linear velocity in the odometry frame
- angular_velocity_device: Angular velocity in the device frame
- quality_score: Pose estimation quality (higher = better)
- gravity_odometry: Gravity vector in the odometry frame
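Beyond querying individual poses, it is often handy to pull an entire trajectory into numpy arrays for plotting or alignment. The sketch below assumes, as in the visualization code later in this tutorial, that translation() returns a batched (1, 3) array, so [0] unwraps the 3-vector.

def trajectory_to_arrays(closed_loop_trajectory):
    """Extract timestamps (seconds) and world-frame positions from a closed loop trajectory."""
    timestamps_s = np.array(
        [pose.tracking_timestamp.total_seconds() for pose in closed_loop_trajectory]
    )
    # translation()[0] unwraps the single-element batch dimension of the SE3 bindings.
    positions_world = np.array(
        [pose.transform_world_device.translation()[0] for pose in closed_loop_trajectory]
    )
    return timestamps_s, positions_world

# Example usage (with the trajectory queried in the next cell):
# timestamps_s, positions_world = trajectory_to_arrays(mps_closed_loop_trajectory)
# print(positions_world.shape)  # (N, 3)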
print("=== MPS - query whole trajectory ===")
mps_closed_loop_trajectory = pilot_data_provider.get_mps_closed_loop_trajectory()
mps_closed_loop_trajectory_duration = mps_closed_loop_trajectory[-1].tracking_timestamp.total_seconds() - mps_closed_loop_trajectory[0].tracking_timestamp.total_seconds()
print(f"MPS Closed Loop Trajectory duration: {mps_closed_loop_trajectory_duration:.2f} s")
mps_open_loop_trajectory = pilot_data_provider.get_mps_open_loop_trajectory()
mps_open_loop_trajectory_duration = mps_open_loop_trajectory[-1].tracking_timestamp.total_seconds() - mps_open_loop_trajectory[0].tracking_timestamp.total_seconds()
print(f"MPS Open Loop Trajectory duration: {mps_open_loop_trajectory_duration:.2f} s")
print("\n")
print("=== MPS - query pose by timestamp ===")
query_timestamp_ns = int((mps_closed_loop_trajectory[0].tracking_timestamp.total_seconds()+1)*1e9)
print("query timestamp ns:", query_timestamp_ns, '\n')
if mps_closed_loop_trajectory_duration > 1:  # If duration > 1s
    nearest_mps_closed_loop_pose = pilot_data_provider.get_mps_closed_loop_pose(
        timestamp_ns=query_timestamp_ns, time_domain=TimeDomain.DEVICE_TIME
    )
    print("Nearest mps closed loop pose: ", nearest_mps_closed_loop_pose, "\n")
    interpolated_mps_closed_loop_pose = pilot_data_provider.get_mps_interpolated_closed_loop_pose(
        timestamp_ns=query_timestamp_ns, time_domain=TimeDomain.DEVICE_TIME
    )
    print("Interpolated mps closed loop pose: ", interpolated_mps_closed_loop_pose, "\n")

if mps_open_loop_trajectory_duration > 1:  # If duration > 1s
    query_timestamp_ns = int((mps_open_loop_trajectory[0].tracking_timestamp.total_seconds() + 1) * 1e9)
    nearest_mps_open_loop_pose = pilot_data_provider.get_mps_open_loop_pose(
        timestamp_ns=query_timestamp_ns, time_domain=TimeDomain.DEVICE_TIME
    )
    print("Nearest mps open loop pose: ", nearest_mps_open_loop_pose, "\n")
MPS - Hand Tracking
The MPS Hand Tracking algorithm outputs 2 files related to hand tracking: hand_tracking_results.csv and summary.json. See the wiki page for data type definitions.
Hand Tracking Features
MPS Hand Tracking provides the following outputs for each detected hand:
- 21 landmarks: Detailed hand joint positions, expressed in the device frame (see the sketch after this list for converting them to world coordinates)
- Wrist to device transformation: Spatial relationship between wrist and device
- Wrist and palm positions: Key reference points for hand pose
- Palm normals: Surface orientation for interaction analysis
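Because the landmarks are reported in the device frame, placing them in the world frame means combining a hand tracking result with the MPS closed loop pose at the same timestamp. The sketch below assumes transform_world_device.to_matrix() returns a 4x4 homogeneous matrix for a single pose; adapt the unwrapping if your bindings return a batched array.

def landmarks_device_to_world(landmarks_device, T_world_device):
    """Transform hand landmarks from the device frame to the world frame."""
    # Assumption: to_matrix() yields a 4x4 homogeneous matrix for a single pose.
    T = np.asarray(T_world_device.to_matrix()).reshape(4, 4)
    points = np.asarray(landmarks_device)                       # (21, 3)
    points_h = np.hstack([points, np.ones((len(points), 1))])   # (21, 4)
    return (T @ points_h.T).T[:, :3]                            # (21, 3) in world frame

# Example usage (with results queried as shown below):
# result = pilot_data_provider.get_mps_hand_tracking_result(ts_ns, TimeDomain.DEVICE_TIME)
# pose = pilot_data_provider.get_mps_interpolated_closed_loop_pose(ts_ns, TimeDomain.DEVICE_TIME)
# if result is not None and result.left_hand is not None and pose is not None:
#     left_world = landmarks_device_to_world(
#         result.left_hand.landmark_positions_device, pose.transform_world_device
#     )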
Query Methods
The AriaGen2PilotDataProvider offers two query methods for hand tracking data:
- get_mps_hand_tracking_result(): Returns the nearest hand tracking result for a given timestamp
- get_mps_interpolated_hand_tracking_result(): Returns an interpolated hand tracking result for smoother motion analysis
print("=== MPS - query whole hand tracking results ===")
mps_hand_tracking_results = pilot_data_provider.get_mps_hand_tracking_result_list()
mps_hand_tracking_results_duration = mps_hand_tracking_results[-1].tracking_timestamp.total_seconds() - mps_hand_tracking_results[0].tracking_timestamp.total_seconds()
print("MPS hand tracking results duration: " + f"{mps_hand_tracking_results_duration:.2f}", "\n")
print("=== MPS - query hand tracking result by timestamp ===")
if mps_hand_tracking_results_duration > 1:  # If duration > 1s
    query_timestamp_ns = int((mps_hand_tracking_results[0].tracking_timestamp.total_seconds() + 1) * 1e9)
    print("Query timestamp ns:", query_timestamp_ns, "\n")
    nearest_mps_hand_tracking_result = pilot_data_provider.get_mps_hand_tracking_result(
        timestamp_ns=query_timestamp_ns, time_domain=TimeDomain.DEVICE_TIME
    )
    print("Nearest tracking result: ", nearest_mps_hand_tracking_result, "\n")
    interpolated_mps_hand_tracking_result = pilot_data_provider.get_mps_interpolated_hand_tracking_result(
        timestamp_ns=query_timestamp_ns, time_domain=TimeDomain.DEVICE_TIME
    )
    print("Interpolated tracking result: ", interpolated_mps_hand_tracking_result, "\n")
Visualization
The following section shows how to plot:
- Semi-dense point cloud
- Closed loop trajectory
- Hand pose results
Visualization Notes
- Point Cloud Filtering: For better visualization performance, we filter the point cloud by confidence and limit the maximum point count
- Trajectory Caching: We accumulate trajectory points over time to visualize the full path
- Rerun Integration: We use Rerun for interactive 3D visualization with proper coordinate frames
import rerun as rr
def plot_mps_semidense_point_cloud(
    point_cloud_data: list[mps.GlobalPointPosition],
) -> None:
    """Plot the semi-dense point cloud as a static entity in the world frame"""
    if not point_cloud_data:
        return
    points_array = np.array(
        [
            point.position_world
            for point in point_cloud_data
            if hasattr(point, "position_world")
        ]
    )
    plot_style = get_plot_style(PlotEntity.SEMI_DENSE_POINT_CLOUD)
    rr.log(
        f"world/{plot_style.label}",
        rr.Points3D(
            positions=points_array,
            colors=[plot_style.color],
            radii=plot_style.plot_3d_size,
        ),
        static=True,
    )
def plot_closed_loop_pose(
    closed_loop_trajectory_pose: mps.ClosedLoopTrajectoryPose,
) -> None:
    """Plot MPS closed loop trajectory"""
    if not closed_loop_trajectory_pose:
        return
    # Get transform and add to trajectory cache
    T_world_device = closed_loop_trajectory_pose.transform_world_device
    closed_loop_trajectory_pose_cache.append(T_world_device.translation()[0])
    # Plot device pose
    rr.log(
        "world/device",
        ToTransform3D(T_world_device, axis_length=0.05),
    )
    # Plot accumulated trajectory
    if len(closed_loop_trajectory_pose_cache) > 1:
        plot_style = get_plot_style(PlotEntity.TRAJECTORY)
        rr.log(
            f"world/{plot_style.label}",
            rr.LineStrips3D(
                [closed_loop_trajectory_pose_cache],
                colors=[plot_style.color],
                radii=plot_style.plot_3d_size,
            ),
        )
def _get_hand_plot_style(hand_label: str) -> tuple[PlotStyle, PlotStyle]:
    """Return (landmarks_style, skeleton_style) for the given hand label"""
    if hand_label == "left":
        landmarks_plot_entity = PlotEntity.HAND_TRACKING_LEFT_HAND_LANDMARKS
        skeleton_plot_entity = PlotEntity.HAND_TRACKING_LEFT_HAND_SKELETON
    else:
        landmarks_plot_entity = PlotEntity.HAND_TRACKING_RIGHT_HAND_LANDMARKS
        skeleton_plot_entity = PlotEntity.HAND_TRACKING_RIGHT_HAND_SKELETON
    return get_plot_style(landmarks_plot_entity), get_plot_style(skeleton_plot_entity)
def _plot_single_hand_3d(
    hand_joints_in_device: list[np.array],
    hand_label: str,
) -> None:
    """
    Plot single hand landmarks and skeleton in the 3D world view
    """
    if hand_joints_in_device is None:
        return
    landmarks_style, skeleton_style = _get_hand_plot_style(hand_label=hand_label)
    # Plot 3D hand markers and skeleton
    hand_skeleton_3d = create_hand_skeleton_from_landmarks(hand_joints_in_device)
    rr.log(
        f"world/device/hand-tracking/{hand_label}/{landmarks_style.label}",
        rr.Points3D(
            positions=hand_joints_in_device,
            colors=[landmarks_style.color],
            radii=landmarks_style.plot_3d_size,
        ),
    )
    rr.log(
        f"world/device/hand-tracking/{hand_label}/{skeleton_style.label}",
        rr.LineStrips3D(
            hand_skeleton_3d,
            colors=[skeleton_style.color],
            radii=skeleton_style.plot_3d_size,
        ),
    )
def plot_mps_hand_tracking_result_3d(
    hand_pose_data: mps.hand_tracking.HandTrackingResult,
) -> None:
    """
    Plot hand pose data within the 3D world view
    """
    rr.log(
        "world/device/hand-tracking",
        rr.Clear.recursive(),
    )
    if hand_pose_data is None:
        return
    if hand_pose_data.left_hand is not None:
        _plot_single_hand_3d(
            hand_joints_in_device=hand_pose_data.left_hand.landmark_positions_device,
            hand_label="left",
        )
    if hand_pose_data.right_hand is not None:
        _plot_single_hand_3d(
            hand_joints_in_device=hand_pose_data.right_hand.landmark_positions_device,
            hand_label="right",
        )
closed_loop_trajectory_pose_cache = []
open_loop_trajectory_pose_cache = []
def plot_mps():
    rr.init("rerun_viz_mps")
    # Create a Spatial3D view to display the points.
    blueprint = rrb.Blueprint(
        rrb.Spatial3DView(
            origin="/",
            name="3D Scene",
            # Set the background color to black.
            background=[0, 0, 0],
        ),
        collapse_panels=True,
    )
    rr.notebook_show(blueprint=blueprint)

    # plot semi-dense point cloud
    plot_mps_semidense_point_cloud(semi_dense_point_cloud_filtered)

    for hand_tracking_result in mps_hand_tracking_results:
        closed_loop_pose = pilot_data_provider.get_mps_interpolated_closed_loop_pose(
            int(hand_tracking_result.tracking_timestamp.total_seconds() * 1e9),
            TimeDomain.DEVICE_TIME,
        )
        # plot hand tracking result
        plot_mps_hand_tracking_result_3d(hand_tracking_result)
        # plot closed loop pose
        plot_closed_loop_pose(closed_loop_pose)
plot_mps()
Summary
This tutorial covered the essential aspects of working with MPS data in the Aria Gen2 Pilot Dataset:
Key Takeaways
- MPS SLAM Trajectories: Understanding the difference between open loop (VIO-based, with drift) and closed loop (globally optimized, minimal drift) trajectories
- Semi-dense Point Cloud: Accessing high-quality 3D reconstructions with confidence-based filtering
- Hand Tracking: Loading and visualizing detailed hand pose data with 21 landmarks per hand
- Data Querying: Using both nearest-neighbor and interpolated queries for smooth temporal access
- 3D Visualization: Creating interactive visualizations with Rerun
MPS vs On-Device Data
| Aspect | On-Device (VIO/Tracking) | MPS (Post-processing) |
|---|---|---|
| Processing | Real-time during recording | Cloud-based offline processing |
| Accuracy | Good for real-time use | Higher accuracy with global optimization |
| Latency | Immediate availability | Requires post-processing time |
| Drift | Accumulates over time | Minimized with loop closure (SLAM) |
| Point Cloud | Not available | Semi-dense point cloud reconstruction |
| Coordinate Frame | Odometry/device frame | Global world frame |
| Use Cases | Live feedback, real-time apps | Research, high-quality reconstruction, benchmarking |