
Tutorial 7: Loading and Visualizing MPS Output Data

Introduction

This tutorial demonstrates how to access and visualize Machine Perception Services (MPS) results. MPS provides cloud-based processing of Aria data to generate high-quality 3D reconstructions, SLAM trajectories, and other perception outputs.

What you'll learn:

  • How to load and access MPS SLAM trajectory data (open loop and closed loop)
  • How to load and visualize MPS semi-dense point clouds and observations
  • How to load and access MPS hand tracking results
  • How to create 3D visualizations of MPS results

Prerequisites

  • Complete Tutorial 1 (VrsDataProvider Basics) to understand basic data provider concepts
  • Complete Tutorial 2 (Device Calibration) to understand how to properly use calibration in Aria data.
  • Download Aria Gen2 sample data: VRS and MPS output zip file.

Note on Visualization: If the visualization window does not show up, this is due to a caching issue in the Rerun library. Simply re-run the specific code cell.

import os
from projectaria_tools.core import data_provider, mps

# Set up paths to your MPS data
mps_folder_path = "path/to/your/mps/folder/"
vrs_file_path = "path/to/your/recording.vrs"

# Load VRS data provider for additional context
vrs_data_provider = data_provider.create_vrs_data_provider(vrs_file_path)

MPS SLAM Trajectories

Understanding Open Loop vs Closed Loop Trajectories

The MPS SLAM algorithm outputs two trajectory files (see the wiki page for data type definitions):

  • Open loop trajectory: High-frequency (1kHz) odometry from visual-inertial odometry (VIO), accurate over short periods but drifts over time and distance.
  • Closed loop trajectory: High-frequency (1kHz) pose from mapping with loop closure corrections, reducing drift but possibly less accurate locally over short spans.
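
Once loaded (as shown in the following subsections), either trajectory can be queried at an arbitrary device timestamp. Below is a minimal sketch using the get_nearest_pose utility; it assumes closed_loop_trajectory has already been read as in the next subsection, and that the query timestamp is a device time in nanoseconds, as in other projectaria_tools examples.

from projectaria_tools.core.mps.utils import get_nearest_pose

# A minimal sketch: look up the closed-loop pose nearest to an arbitrary device timestamp.
# Assumes `closed_loop_trajectory` has been loaded as in the next subsection.
first_ns = int(closed_loop_trajectory[0].tracking_timestamp.total_seconds() * 1e9)
last_ns = int(closed_loop_trajectory[-1].tracking_timestamp.total_seconds() * 1e9)
query_timestamp_ns = (first_ns + last_ns) // 2  # midpoint of the recording

pose = get_nearest_pose(closed_loop_trajectory, query_timestamp_ns)
if pose is not None:
    print(f"T_world_device near t={query_timestamp_ns} ns:\n{pose.transform_world_device}")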

Loading Closed Loop Trajectory

from projectaria_tools.core.mps.utils import (
    filter_points_from_confidence,
    get_nearest_pose,
)

print("=== MPS - Closed loop trajectory ===")

# Load MPS closed-loop trajectory data
closed_loop_trajectory_file = os.path.join(
    mps_folder_path, "slam", "closed_loop_trajectory.csv"
)
closed_loop_trajectory = mps.read_closed_loop_trajectory(closed_loop_trajectory_file)

# Print out the content of the first sample in closed_loop_trajectory
if closed_loop_trajectory:
    sample = closed_loop_trajectory[0]
    print("ClosedLoopTrajectoryPose sample:")
    print(f" tracking_timestamp: {int(sample.tracking_timestamp.total_seconds() * 1e6)} us")
    print(f" utc_timestamp: {int(sample.utc_timestamp.total_seconds() * 1e6)} us")
    print(f" transform_world_device:\n{sample.transform_world_device}")
    print(f" device_linear_velocity_device: {sample.device_linear_velocity_device}")
    print(f" angular_velocity_device: {sample.angular_velocity_device}")
    print(f" quality_score: {sample.quality_score}")
    print(f" gravity_world: {sample.gravity_world}")
    print(f" graph_uid: {sample.graph_uid}")
else:
    print("closed_loop_trajectory is empty.")

Loading Open Loop Trajectory

print("=== MPS - Open loop trajectory ===")

# Load MPS open-loop trajectory data
open_loop_trajectory_file = os.path.join(
    mps_folder_path, "slam", "open_loop_trajectory.csv"
)
open_loop_trajectory = mps.read_open_loop_trajectory(open_loop_trajectory_file)

# Print out the content of the first sample in open_loop_trajectory
if open_loop_trajectory:
    sample = open_loop_trajectory[0]
    print("OpenLoopTrajectoryPose sample:")
    print(f" tracking_timestamp: {int(sample.tracking_timestamp.total_seconds() * 1e6)} us")
    print(f" utc_timestamp: {int(sample.utc_timestamp.total_seconds() * 1e6)} us")
    print(f" transform_odometry_device:\n{sample.transform_odometry_device}")
    print(f" device_linear_velocity_odometry: {sample.device_linear_velocity_odometry}")
    print(f" angular_velocity_device: {sample.angular_velocity_device}")
    print(f" quality_score: {sample.quality_score}")
    print(f" gravity_odometry: {sample.gravity_odometry}")
    print(f" session_uid: {sample.session_uid}")
else:
    print("open_loop_trajectory is empty.")

MPS Semi-dense Point Cloud and Observations

Understanding Point Cloud Data

The MPS SLAM algorithm outputs two files related to the semi-dense point cloud (see the wiki page for data type definitions):

  • semidense_points.csv.gz: Global points in the world coordinate frame.
  • semidense_observations.csv.gz: Point observations for each camera, at each timestamp.

Note that semi-dense point files are typically large, so loading them may take some time.

Loading Semi-dense Point Cloud

print("=== MPS - Semi-dense Point Cloud ===")

# Load MPS semi-dense point cloud data
semidense_points_file = os.path.join(
    mps_folder_path, "slam", "semidense_points.csv.gz"
)
semidense_points = mps.read_global_point_cloud(semidense_points_file)

# Print out the content of the first sample in semidense_points
if semidense_points:
    sample = semidense_points[0]
    print("GlobalPointPosition sample:")
    print(f" uid: {sample.uid}")
    print(f" graph_uid: {sample.graph_uid}")
    print(f" position_world: {sample.position_world}")
    print(f" inverse_distance_std: {sample.inverse_distance_std}")
    print(f" distance_std: {sample.distance_std}")
    print(f"Total number of semi-dense points: {len(semidense_points)}")
else:
    print("semidense_points is empty.")

# Filter semidense points by inverse-distance and distance standard deviation.
# The filter KEEPS points whose uncertainty values are below the given thresholds.
filtered_semidense_points = filter_points_from_confidence(
    raw_points=semidense_points, threshold_invdep=1e-3, threshold_dep=5e-2
)
print(
    f"Filtering semidense points from a total of {len(semidense_points)} points "
    f"down to {len(filtered_semidense_points)}"
)

Loading Point Observations

print("=== MPS - Semi-dense Point Observations ===")

# Load MPS semi-dense point observations data
semidense_observations_file = os.path.join(
    mps_folder_path, "slam", "semidense_observations.csv.gz"
)
semidense_observations = mps.read_point_observations(semidense_observations_file)

# Print out the content of the first sample in semidense_observations
if semidense_observations:
    sample = semidense_observations[0]
    print("PointObservation sample:")
    print(f" point_uid: {sample.point_uid}")
    print(f" frame_capture_timestamp: {int(sample.frame_capture_timestamp.total_seconds() * 1e6)} us")
    print(f" camera_serial: {sample.camera_serial}")
    print(f" uv: {sample.uv}")
    print(f"Total number of point observations: {len(semidense_observations)}")
else:
    print("semidense_observations is empty.")

MPS Hand Tracking

MPS Hand Tracking augments each sequence with offline hand pose estimates, adding left and right hand tracking results wherever hands are detected.

Hand Tracking Outputs

  • 21 landmarks per detected hand expressed in the device coordinate frame
  • Wrist-to-device transform providing the full 6DoF wrist pose
  • Palm and wrist normals for reasoning about hand orientation
  • Confidence scores indicating tracking quality for each hand
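
Since all of these outputs are expressed in the device frame, placing them in the world frame requires a device pose at (or near) the same timestamp. The sketch below shows one way to do this; it assumes hand_tracking_results is loaded as in the next cell and closed_loop_trajectory as loaded earlier, and the use of get_nearest_pose for time alignment is one illustrative option rather than the only approach.

import numpy as np
from projectaria_tools.core.mps.utils import get_nearest_pose

# A minimal sketch: express left-hand landmarks in the world frame by combining
# device-frame landmarks with the nearest closed-loop device pose.
sample = hand_tracking_results[0]
query_timestamp_ns = int(sample.tracking_timestamp.total_seconds() * 1e9)
pose = get_nearest_pose(closed_loop_trajectory, query_timestamp_ns)

if (
    pose is not None
    and sample.left_hand is not None
    and sample.left_hand.landmark_positions_device is not None
):
    T_world_device = pose.transform_world_device
    landmarks_world = [
        T_world_device @ np.array(landmark)
        for landmark in sample.left_hand.landmark_positions_device
    ]
    print(f"First left-hand landmark in world frame: {landmarks_world[0].squeeze()}")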

Loading Hand Tracking Results

print("=== MPS - Hand Tracking ===")

hand_tracking_results_file = os.path.join(
    mps_folder_path, "hand_tracking", "hand_tracking_results.csv"
)
hand_tracking_results = mps.hand_tracking.read_hand_tracking_results(
    hand_tracking_results_file
)

if hand_tracking_results:
    if len(hand_tracking_results) > 10:
        # Use a later sample for a stable result; the first result in the example VRS may be empty.
        sample = hand_tracking_results[10]
    else:
        sample = hand_tracking_results[0]
    sample_ts_us = int(sample.tracking_timestamp.total_seconds() * 1e6)
    print(f"Sample tracking timestamp: {sample_ts_us} us")
    print(f"Total number of hand tracking results: {len(hand_tracking_results)}")

    for handedness, hand in (("Left", sample.left_hand), ("Right", sample.right_hand)):
        if hand is None:
            print(f" {handedness} hand: not available in this sample")
            continue

        landmarks = hand.landmark_positions_device
        landmark_count = len(landmarks) if landmarks is not None else 0
        wrist_position = hand.get_wrist_position_device()
        palm_position = hand.get_palm_position_device()
        print(f" {handedness} hand confidence: {hand.confidence:.2f}")
        print(f" {handedness} hand landmark count: {landmark_count}")
        print(f" {handedness} wrist position (device frame): {wrist_position}")
        print(f" {handedness} palm position (device frame): {palm_position}")
else:
    print("hand_tracking_results is empty.")

Visualizing MPS Results

In the following snippets, we combine the MPS SLAM outputs and hand tracking results to build an interactive 3D visualization in Rerun.

Color Mapping Helper Function

import numpy as np

# A helper coloring function
def color_from_zdepth(z_depth_m: float) -> np.ndarray:
    """
    Map z-depth (meters, along the camera's forward axis) to a bright Viridis-like RGB color.
    - If z_depth_m <= 0 (point is behind the camera), return black [0, 0, 0].
    - Near (0.2 m) -> yellow, Far (5.0 m) -> purple.
    Returns an array of shape (3,) with dtype=uint8.
    """
    if not np.isfinite(z_depth_m) or z_depth_m <= 0.0:
        return np.array([0, 0, 0], dtype=np.uint8)

    NEAR_METERS, FAR_METERS = 0.2, 5.0

    # Normalize to [0, 1], then flip so near → bright (yellow), far → dark (purple)
    clamped = min(max(float(z_depth_m), NEAR_METERS), FAR_METERS)
    normalized_position = (clamped - NEAR_METERS) / (FAR_METERS - NEAR_METERS + 1e-12)
    gradient_position = 1.0 - normalized_position

    # Viridis-like anchor colors: purple → blue → teal → green → yellow
    color_stops = [
        (68, 1, 84),
        (59, 82, 139),
        (33, 145, 140),
        (94, 201, 98),
        (253, 231, 37),
    ]

    # Locate segment and blend between its endpoints
    segment_count = len(color_stops) - 1
    continuous_index = gradient_position * segment_count
    lower_segment_index = int(continuous_index)

    if lower_segment_index >= segment_count:
        red, green, blue = color_stops[-1]
    else:
        segment_fraction = continuous_index - lower_segment_index
        r0, g0, b0 = color_stops[lower_segment_index]
        r1, g1, b1 = color_stops[lower_segment_index + 1]
        red = r0 + segment_fraction * (r1 - r0)
        green = g0 + segment_fraction * (g1 - g0)
        blue = b0 + segment_fraction * (b1 - b0)

    return np.array([int(red), int(green), int(blue)], dtype=np.uint8)
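
A quick sanity check of the helper: near depths should map to bright yellow, far depths to dark purple, and non-positive depths (behind the camera) to black.

# Sanity-check the color mapping on a few sample z-depths.
for depth_m in (-1.0, 0.2, 1.0, 3.0, 5.0):
    print(f"z-depth {depth_m:>5.1f} m -> RGB {color_from_zdepth(depth_m)}")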

Preparing SLAM Data for Visualization

from collections import defaultdict
from itertools import islice
import numpy as np

print("=== Preparing MPS SLAM results for visualization ===")

# Check if we have valid SLAM data to visualize
if not closed_loop_trajectory or not semidense_points:
    raise RuntimeError("Warning: This tutorial requires valid MPS SLAM data to run.")

# -----------
# Prepare Trajectory data
# -----------
# Select a short segment of trajectory (the first 50,000 samples, subsampled by a factor of 50)
segment_length = min(50000, len(closed_loop_trajectory))
trajectory_segment = closed_loop_trajectory[:segment_length:50]
timestamp_to_pose = {
    pose.tracking_timestamp: pose for pose in trajectory_segment
}
print(f"Finished preparing a trajectory of length {len(trajectory_segment)}... ")

# -----------
# Prepare Semidense point data
# -----------
# Filter the semidense point cloud by confidence, and extract the point positions
filtered_semidense_point_cloud_data = filter_points_from_confidence(semidense_points)
points_positions = np.array(
    [point.position_world for point in filtered_semidense_point_cloud_data]
)
print(f"Finished preparing filtered semidense point cloud of {len(filtered_semidense_point_cloud_data)} points... ")

# -----------
# Prepare Semidense observation data
# -----------
# Based on the slam-front-left camera's observations, create a per-timestamp list of point positions,
# colored by each point's z-depth in the device frame
point_uid_to_position = {
    point.uid: np.array(point.position_world) for point in filtered_semidense_point_cloud_data
}

# Build easier-to-query mappings from frame timestamp to observed point positions and colors
slam_1_serial = vrs_data_provider.get_device_calibration().get_camera_calib("slam-front-left").get_serial_number()
timestamp_to_point_positions = defaultdict(list)  # timestamp -> [position, position, ...]
timestamp_to_point_colors = defaultdict(list)  # timestamp -> [color, color, ...]

for obs in semidense_observations:
    # Only add observations from the slam-front-left camera, and only if the timestamp is within the chosen trajectory segment
    if (
        obs.camera_serial == slam_1_serial
        and obs.frame_capture_timestamp in timestamp_to_pose
        and obs.point_uid in point_uid_to_position
    ):
        # Insert point position
        obs_timestamp = obs.frame_capture_timestamp
        point_position = point_uid_to_position[obs.point_uid]
        timestamp_to_point_positions[obs_timestamp].append(point_position)

        # Insert point color
        T_world_device = timestamp_to_pose[obs_timestamp].transform_world_device
        point_in_device = T_world_device.inverse() @ point_position
        point_z_depth = point_in_device.squeeze()[2]
        point_color = color_from_zdepth(point_z_depth)
        timestamp_to_point_colors[obs_timestamp].append(point_color)

print("Finished preparing semidense point observations: ")
for timestamp, points in islice(timestamp_to_point_positions.items(), 5):
    print(f"\t timestamp {int(timestamp.total_seconds() * 1e9)} ns has {len(points)} observed points in slam-front-left view. ")
print("\t ...")

Preparing Hand Tracking Data

print("=== Preparing MPS Hand Tracking data ===")

hand_tracking_results_segment: list[mps.hand_tracking.HandTrackingResult] = []

if not hand_tracking_results:
    print("No hand tracking results loaded; visualization will skip hands.")
else:
    if trajectory_segment:
        segment_start_ns = int(
            trajectory_segment[0].tracking_timestamp.total_seconds() * 1e9
        )
        segment_end_ns = int(
            trajectory_segment[-1].tracking_timestamp.total_seconds() * 1e9
        )

        hand_tracking_results_segment = [
            result
            for result in hand_tracking_results
            if segment_start_ns
            <= int(result.tracking_timestamp.total_seconds() * 1e9)
            <= segment_end_ns
        ]

    if hand_tracking_results_segment:
        print(
            "Finished preparing hand tracking results: "
            f"{len(hand_tracking_results_segment)} samples overlap with the trajectory segment."
        )
    else:
        hand_tracking_results_segment = hand_tracking_results
        print(
            "Hand tracking results do not overlap with the selected trajectory segment; "
            "visualization will reuse the nearest available hand pose."
        )

3D Visualization with Rerun

import rerun as rr
import numpy as np
from projectaria_tools.utils.rerun_helpers import (
AriaGlassesOutline,
ToTransform3D,
ToBox3D,
create_hand_skeleton_from_landmarks,
)
from projectaria_tools.core.mps.utils import (
filter_points_from_confidence,
get_nearest_pose,
)

HAND_COLORS = {
    "left": np.array([102, 204, 255], dtype=np.uint8),
    "right": np.array([255, 153, 102], dtype=np.uint8),
}
HAND_LANDMARK_RADIUS = 1e-2
HAND_KEYPOINT_RADIUS = 1.5e-2
HAND_SKELETON_RADIUS = 7e-3


def log_hand_tracking_result(hand_result):
    rr.log("world/device/hand-tracking", rr.Clear.recursive())
    if hand_result is None:
        return

    for label in ("left", "right"):
        hand = getattr(hand_result, f"{label}_hand")
        if hand is None or hand.landmark_positions_device is None:
            continue

        landmarks = np.array(hand.landmark_positions_device, dtype=np.float32)
        if landmarks.size == 0:
            continue

        color = HAND_COLORS[label]
        colors = np.repeat(color[np.newaxis, :], landmarks.shape[0], axis=0)

        rr.log(
            f"world/device/hand-tracking/{label}/landmarks",
            rr.Points3D(
                positions=landmarks,
                colors=colors,
                radii=HAND_LANDMARK_RADIUS,
            ),
        )

        skeleton = create_hand_skeleton_from_landmarks(landmarks)
        if skeleton:
            rr.log(
                f"world/device/hand-tracking/{label}/skeleton",
                rr.LineStrips3D(
                    skeleton,
                    colors=[HAND_COLORS[label].tolist()] * len(skeleton),
                    radii=HAND_SKELETON_RADIUS,
                ),
            )

        wrist = hand.get_wrist_position_device()
        palm = hand.get_palm_position_device()
        keypoints = [
            np.array(position, dtype=np.float32)
            for position in (wrist, palm)
            if position is not None
        ]
        if keypoints:
            rr.log(
                f"world/device/hand-tracking/{label}/keypoints",
                rr.Points3D(
                    positions=keypoints,
                    colors=np.repeat(color[np.newaxis, :], len(keypoints), axis=0),
                    radii=HAND_KEYPOINT_RADIUS,
                ),
            )


print("=== Visualizing MPS Results in 3D ===")

# Initialize Rerun
rr.init("MPS Visualization")

# Set up the 3D scene
rr.log("world", rr.ViewCoordinates.RIGHT_HAND_Z_UP, static=True)

# Log point cloud
rr.log(
    "world/semidense_points",
    rr.Points3D(
        positions=points_positions,
        colors=[255, 255, 255, 125],
        radii=0.001,
    ),
    static=True,
)

# Aria glass outline for visualization purpose
device_calib = vrs_data_provider.get_device_calibration()
aria_glasses_point_outline = AriaGlassesOutline(
    device_calib, use_cad_calib=True
)

# Plot Closed loop trajectory
closed_loop_traj_cached_full = []
observation_points_cached = None
observation_colors_cached = None
hand_tracking_timestamps_ns = [
    int(result.tracking_timestamp.total_seconds() * 1e9)
    for result in hand_tracking_results_segment
]
hand_tracking_index = 0
latest_hand_tracking_result = None

for closed_loop_pose in trajectory_segment:
    capture_timestamp_ns = int(closed_loop_pose.tracking_timestamp.total_seconds() * 1e9)
    rr.set_time_nanos("device_time", capture_timestamp_ns)

    if hand_tracking_results_segment:
        while (
            hand_tracking_index < len(hand_tracking_results_segment)
            and hand_tracking_timestamps_ns[hand_tracking_index] <= capture_timestamp_ns
        ):
            latest_hand_tracking_result = hand_tracking_results_segment[hand_tracking_index]
            hand_tracking_index += 1
    else:
        latest_hand_tracking_result = None

    T_world_device = closed_loop_pose.transform_world_device

    # Log device pose as a coordinate frame
    rr.log(
        "world/device",
        ToTransform3D(
            T_world_device,
            axis_length=0.05,
        ),
    )

    log_hand_tracking_result(latest_hand_tracking_result)

    # Plot Aria glasses outline
    rr.log(
        "world/device/glasses_outline",
        rr.LineStrips3D(
            aria_glasses_point_outline,
            colors=[150, 200, 40],
            radii=5e-3,
        ),
    )

    # Plot gravity direction vector
    rr.log(
        "world/vio_gravity",
        rr.Arrows3D(
            origins=[T_world_device.translation()[0]],
            vectors=[
                closed_loop_pose.gravity_world * 1e-2
            ],  # scale gravity (~9.8 m) down to a ~10 cm arrow
            colors=[101, 67, 33],
            radii=5e-3,
        ),
        static=False,
    )

    # Update cached observation results. Caching is needed because observations arrive
    # at a much lower frequency than the high-frequency trajectory.
    if closed_loop_pose.tracking_timestamp in timestamp_to_point_positions:
        observation_points_cached = timestamp_to_point_positions[closed_loop_pose.tracking_timestamp]
        observation_colors_cached = timestamp_to_point_colors[closed_loop_pose.tracking_timestamp]
    if observation_points_cached is not None:
        rr.log(
            "world/semidense_observations",
            rr.Points3D(
                positions=observation_points_cached,
                colors=observation_colors_cached,
                radii=0.01,
            ),
            static=False,
        )

    # Plot the full trajectory cached so far
    closed_loop_traj_cached_full.append(T_world_device.translation()[0])
    rr.log(
        "world/vio_trajectory",
        rr.LineStrips3D(
            closed_loop_traj_cached_full,
            colors=[173, 216, 255],
            radii=5e-3,
        ),
        static=False,
    )

rr.notebook_show()

Understanding MPS Data Structures

Trajectory Data Types

ClosedLoopTrajectoryPose

  • tracking_timestamp: Device timestamp when pose was computed
  • utc_timestamp: UTC timestamp
  • transform_world_device: 6DOF pose in world coordinate frame
  • device_linear_velocity_device: Linear velocity in device frame
  • angular_velocity_device: Angular velocity in device frame
  • quality_score: Pose estimation quality (higher = better)
  • gravity_world: Gravity vector in world frame
  • graph_uid: Unique identifier for the pose graph
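
For downstream analysis it is often handy to pull these fields into numpy arrays; below is a minimal sketch, assuming closed_loop_trajectory has been loaded as earlier in this tutorial.

import numpy as np

# A minimal sketch: collect per-pose timestamps (ns) and world-frame positions.
timestamps_ns = np.array(
    [int(pose.tracking_timestamp.total_seconds() * 1e9) for pose in closed_loop_trajectory]
)
positions_world = np.array(
    [pose.transform_world_device.translation()[0] for pose in closed_loop_trajectory]
)
print(f"timestamps: {timestamps_ns.shape}, positions: {positions_world.shape}")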

OpenLoopTrajectoryPose

  • tracking_timestamp: Device timestamp when pose was computed
  • utc_timestamp: UTC timestamp
  • transform_odometry_device: 6DOF pose in odometry coordinate frame
  • device_linear_velocity_odometry: Linear velocity in odometry frame
  • angular_velocity_device: Angular velocity in device frame
  • quality_score: Pose estimation quality (higher = better)
  • gravity_odometry: Gravity vector in odometry frame
  • session_uid: Unique identifier for the session

Point Cloud Data Types

GlobalPointPosition

  • uid: Unique identifier for the 3D point
  • graph_uid: Identifier linking point to pose graph
  • position_world: 3D position in world coordinate frame
  • inverse_distance_std: Inverse distance standard deviation (quality metric)
  • distance_std: Distance standard deviation (quality metric)
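
These two standard deviations are what filter_points_from_confidence thresholds; a hand-rolled sketch of that filtering is shown below (the exact comparison and default thresholds inside the utility may differ slightly, so treat the values as illustrative).

# A minimal sketch of confidence filtering using the fields above.
# Assumes `semidense_points` is loaded as earlier; thresholds mirror the earlier
# filter_points_from_confidence call, but the utility's exact behavior may differ.
THRESHOLD_INVDEP = 1e-3  # max allowed inverse_distance_std
THRESHOLD_DEP = 5e-2     # max allowed distance_std

manually_filtered_points = [
    point
    for point in semidense_points
    if point.inverse_distance_std < THRESHOLD_INVDEP and point.distance_std < THRESHOLD_DEP
]
print(f"Kept {len(manually_filtered_points)} of {len(semidense_points)} points")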

PointObservation

  • point_uid: Links observation to 3D point
  • frame_capture_timestamp: When the observation was captured
  • camera_serial: Serial number of the observing camera
  • uv: 2D pixel coordinates of the observation
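
As a small example of working with these fields, the sketch below counts observations per camera serial, assuming semidense_observations has been loaded as earlier.

from collections import Counter

# A minimal sketch: count point observations per observing camera serial.
observations_per_camera = Counter(obs.camera_serial for obs in semidense_observations)
for camera_serial, count in observations_per_camera.most_common():
    print(f"camera {camera_serial}: {count} observations")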

MPS vs On-Device Comparisons

Key Differences

| Aspect | On-Device (VIO/SLAM) | MPS SLAM |
| --- | --- | --- |
| Processing | Real-time during recording | Cloud-based post-processing |
| Accuracy | Good for real-time use | Higher accuracy with global optimization |
| Frequency | 20 Hz (VIO), 800 Hz (high-frequency) | 1 kHz (open and closed loop) |
| Drift | Accumulates over time | Minimized with loop closure |
| Point Cloud | Not available | Semi-dense point cloud reconstruction |
| Coordinate Frame | Odometry frame | Global world frame |
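
The 1 kHz rate quoted above can be checked directly from consecutive trajectory timestamps; below is a minimal sketch, assuming closed_loop_trajectory is loaded as earlier.

import numpy as np

# A minimal sketch: estimate the closed-loop trajectory sample rate from
# consecutive tracking timestamps (expected to be roughly 1 kHz).
timestamps_s = np.array(
    [pose.tracking_timestamp.total_seconds() for pose in closed_loop_trajectory]
)
mean_dt_s = float(np.mean(np.diff(timestamps_s)))
print(f"Mean sample spacing: {mean_dt_s * 1e3:.2f} ms (~{1.0 / mean_dt_s:.0f} Hz)")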

Use Cases

  • On-Device Data: Real-time applications, live feedback, immediate processing
  • MPS Data: High-quality reconstruction, research analysis, detailed mapping

Summary

This tutorial covered the essential aspects of working with MPS data:

  • Trajectory Access: Loading both open loop and closed loop trajectories
  • Point Cloud Data: Accessing semi-dense 3D reconstructions and observations
  • Data Filtering: Using confidence thresholds to improve point cloud quality
  • Hand Tracking: Loading offline MPS hand tracking results and inspecting landmarks, wrist poses, and confidences
  • 3D Visualization: Creating combined visualizations with trajectories, point clouds, and hand tracking results
  • Data Structures: Understanding the comprehensive MPS data formats

MPS provides high-quality, globally consistent 3D reconstructions that are ideal for:

  • Research Applications: Detailed spatial analysis and mapping
  • 3D Reconstruction: High-fidelity environmental modeling
  • Motion Analysis: Accurate trajectory analysis without drift
  • Multi-modal Studies: Combining precise 3D data with sensor information
  • Benchmarking: Comparing against ground truth for algorithm development