Tutorial 7: Loading and Visualizing MPS Output Data
Introduction
This tutorial demonstrates how to access and visualize Machine Perception Services (MPS) results. MPS provides cloud-based processing of Aria data to generate high-quality 3D reconstructions, SLAM trajectories, and other perception outputs.
What you'll learn:
- How to load and access MPS SLAM trajectory data (open loop and closed loop)
- How to load and visualize MPS semi-dense point clouds and observations
- How to create 3D visualizations of MPS SLAM results
Prerequisites
- Complete Tutorial 1 (VrsDataProvider Basics) to understand basic data provider concepts
- Complete Tutorial 2 (Device Calibration) to understand how to properly use calibration in Aria data.
- Download the Aria Gen2 sample data: the VRS file and MPS output zip file.
Note on Visualization
If a visualization window does not show up, this is due to a caching issue in the Rerun library. Simply re-run the specific code cell.
import os
from projectaria_tools.core import data_provider, mps
# Set up paths to your MPS data
mps_folder_path = "path/to/your/mps/folder/"
vrs_file_path = "path/to/your/recording.vrs"
# Load VRS data provider for additional context
vrs_data_provider = data_provider.create_vrs_data_provider(vrs_file_path)
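Before loading anything, it can help to verify that the expected SLAM output files are present under the MPS folder. Below is a minimal sketch; the file names follow the layout used throughout this tutorial.
# Check that the expected MPS SLAM outputs exist under the MPS folder
expected_files = [
    os.path.join(mps_folder_path, "slam", "closed_loop_trajectory.csv"),
    os.path.join(mps_folder_path, "slam", "open_loop_trajectory.csv"),
    os.path.join(mps_folder_path, "slam", "semidense_points.csv.gz"),
    os.path.join(mps_folder_path, "slam", "semidense_observations.csv.gz"),
]
for file_path in expected_files:
    status = "found" if os.path.exists(file_path) else "MISSING"
    print(f"{status}: {file_path}")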
MPS SLAM Trajectories
Understanding Open Loop vs Closed Loop Trajectories
The MPS SLAM algorithm outputs two trajectory files (see the wiki page for data type definitions):
- Open loop trajectory: High-frequency (1kHz) odometry from visual-inertial odometry (VIO), accurate over short periods but drifts over time and distance.
- Closed loop trajectory: High-frequency (1kHz) pose from mapping with loop closure corrections, reducing drift but possibly less accurate locally over short spans.
Loading Closed Loop Trajectory
from projectaria_tools.core.mps.utils import (
filter_points_from_confidence,
get_nearest_pose,
)
print("=== MPS - Closed loop trajectory ===")
# Load MPS closed-loop trajectory data
closed_loop_trajectory_file = os.path.join(
mps_folder_path, "slam", "closed_loop_trajectory.csv"
)
closed_loop_trajectory = mps.read_closed_loop_trajectory(closed_loop_trajectory_file)
# Print out the content of the first sample in closed_loop_trajectory
if closed_loop_trajectory:
sample = closed_loop_trajectory[0]
print("ClosedLoopTrajectoryPose sample:")
print(f" tracking_timestamp: {int(sample.tracking_timestamp.total_seconds() * 1e6)} us")
print(f" utc_timestamp: {int(sample.utc_timestamp.total_seconds() * 1e6)} us")
print(f" transform_world_device:\n{sample.transform_world_device}")
print(f" device_linear_velocity_device: {sample.device_linear_velocity_device}")
print(f" angular_velocity_device: {sample.angular_velocity_device}")
print(f" quality_score: {sample.quality_score}")
print(f" gravity_world: {sample.gravity_world}")
print(f" graph_uid: {sample.graph_uid}")
else:
print("closed_loop_trajectory is empty.")
Loading Open Loop Trajectory
print("=== MPS - Open loop trajectory ===")
# Load MPS open-loop trajectory data
open_loop_trajectory_file = os.path.join(
mps_folder_path, "slam", "open_loop_trajectory.csv"
)
open_loop_trajectory = mps.read_open_loop_trajectory(open_loop_trajectory_file)
# Print out the content of the first sample in open_loop_trajectory
if open_loop_trajectory:
sample = open_loop_trajectory[0]
print("OpenLoopTrajectoryPose sample:")
print(f" tracking_timestamp: {int(sample.tracking_timestamp.total_seconds() * 1e6)} us")
print(f" utc_timestamp: {int(sample.utc_timestamp.total_seconds() * 1e6)} us")
print(f" transform_odometry_device:\n{sample.transform_odometry_device}")
print(f" device_linear_velocity_odometry: {sample.device_linear_velocity_odometry}")
print(f" angular_velocity_device: {sample.angular_velocity_device}")
print(f" quality_score: {sample.quality_score}")
print(f" gravity_odometry: {sample.gravity_odometry}")
print(f" session_uid: {sample.session_uid}")
else:
print("open_loop_trajectory is empty.")
MPS Semi-dense Point Cloud and Observations
Understanding Point Cloud Data
The MPS SLAM algorithm outputs two files related to the semi-dense point cloud (see the wiki page for data type definitions):
- semidense_points.csv.gz: Global points in the world coordinate frame.
- semidense_observations.csv.gz: Point observations for each camera, at each timestamp.
Note that semi-dense point files are typically large, so loading them may take some time.
Loading Semi-dense Point Cloud
print("=== MPS - Semi-dense Point Cloud ===")
# Load MPS semi-dense point cloud data
semidense_points_file = os.path.join(
mps_folder_path, "slam", "semidense_points.csv.gz"
)
semidense_points = mps.read_global_point_cloud(semidense_points_file)
# Print out the content of the first sample in semidense_points
if semidense_points:
sample = semidense_points[0]
print("GlobalPointPosition sample:")
print(f" uid: {sample.uid}")
print(f" graph_uid: {sample.graph_uid}")
print(f" position_world: {sample.position_world}")
print(f" inverse_distance_std: {sample.inverse_distance_std}")
print(f" distance_std: {sample.distance_std}")
print(f"Total number of semi-dense points: {len(semidense_points)}")
else:
print("semidense_points is empty.")
# Filter semi-dense points by uncertainty (inverse distance std and distance std).
# The filter KEEPS points whose uncertainty is below the given thresholds.
filtered_semidense_points = filter_points_from_confidence(raw_points=semidense_points, threshold_invdep=1e-3, threshold_dep=5e-2)
print(f"Filtering semidense points from a total of {len(semidense_points)} points down to {len(filtered_semidense_points)}")
Loading Point Observations
print("=== MPS - Semi-dense Point Observations ===")
# Load MPS semi-dense point observations data
semidense_observations_file = os.path.join(
mps_folder_path, "slam", "semidense_observations.csv.gz"
)
semidense_observations = mps.read_point_observations(semidense_observations_file)
# Print out the content of the first sample in semidense_observations
if semidense_observations:
sample = semidense_observations[0]
print("PointObservation sample:")
print(f" point_uid: {sample.point_uid}")
print(f" frame_capture_timestamp: {int(sample.frame_capture_timestamp.total_seconds() * 1e6)} us")
print(f" camera_serial: {sample.camera_serial}")
print(f" uv: {sample.uv}")
print(f"Total number of point observations: {len(semidense_observations)}")
else:
print("semidense_observations is empty.")
Visualizing MPS SLAM Results
In the following code snippets, we demonstrate how to visualize the MPS SLAM results in a 3D view.
We first prepare a short trajectory segment, then extract the semi-dense point positions, along with timestamp-mapped observations, for visualization. Finally, we plot everything in Rerun.
Color Mapping Helper Function
from collections import defaultdict
import numpy as np
# A helper coloring function
def color_from_zdepth(z_depth_m: float) -> np.ndarray:
"""
Map z-depth (meters, along the camera's forward axis) to a bright Viridis-like RGB color.
- If z_depth_m <= 0 (point is behind the camera), return black [0, 0, 0].
- Near (0.2 m) -> yellow, Far (5.0 m) -> purple.
Returns an array of shape (3,) with dtype=uint8.
"""
if not np.isfinite(z_depth_m) or z_depth_m <= 0.0:
return np.array([0, 0, 0], dtype=np.uint8)
NEAR_METERS, FAR_METERS = 0.2, 5.0
# Normalize to [0,1], then flip so near → bright (yellow), far → dark (purple)
clamped = min(max(float(z_depth_m), NEAR_METERS), FAR_METERS)
normalized_position = (clamped - NEAR_METERS) / (FAR_METERS - NEAR_METERS + 1e-12)
gradient_position = 1.0 - normalized_position
# Viridis-like anchor colors: purple → blue → teal → green → yellow
color_stops = [
(68, 1, 84),
(59, 82, 139),
(33, 145, 140),
(94, 201, 98),
(253, 231, 37),
]
# Locate segment and blend between its endpoints
segment_count = len(color_stops) - 1
continuous_index = gradient_position * segment_count
lower_segment_index = int(continuous_index)
if lower_segment_index >= segment_count:
red, green, blue = color_stops[-1]
else:
segment_fraction = continuous_index - lower_segment_index
r0, g0, b0 = color_stops[lower_segment_index]
r1, g1, b1 = color_stops[lower_segment_index + 1]
red = r0 + segment_fraction * (r1 - r0)
green = g0 + segment_fraction * (g1 - g0)
blue = b0 + segment_fraction * (b1 - b0)
return np.array([int(red), int(green), int(blue)], dtype=np.uint8)
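As a quick sanity check of the helper, you can print the colors it produces for a few representative depths: a point behind the camera should map to black, near points to yellow-ish, and far points to purple-ish.
# Sanity check the color mapping at a few z-depths
for depth_m in [-1.0, 0.3, 2.0, 5.0]:
    print(f"z-depth {depth_m:4.1f} m -> RGB {color_from_zdepth(depth_m)}")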
Preparing Data for Visualization
print("=== Preparing MPS SLAM results for visualization ===")
# Check if we have valid SLAM data to visualize
if not closed_loop_trajectory or not semidense_points:
raise RuntimeError("Warning: This tutorial requires valid MPS SLAM data to run.")
# --
# Prepare Trajectory data
# --
# Select a short segment of the trajectory (first 50,000 samples, subsampled by a factor of 50)
segment_length = min(50000, len(closed_loop_trajectory))
trajectory_segment = closed_loop_trajectory[:segment_length:50]
timestamp_to_pose = {
pose.tracking_timestamp: pose for pose in trajectory_segment
}
print(f"Finished preparing a trajectory of length {len(trajectory_segment)}... ")
# -----------
# Prepare Semidense point data
# -----------
# Filter the semi-dense point cloud by confidence (using the default thresholds), and extract the point positions
filtered_semidense_point_cloud_data = filter_points_from_confidence(semidense_points)
points_positions = np.array(
[
point.position_world for point in filtered_semidense_point_cloud_data
]
)
print(f"Finished preparing filtered semidense points cloud of {len(filtered_semidense_point_cloud_data)} points... ")
# -----------
# Prepare Semidense observation data
# -----------
# Based on slam-front-left camera observations, create a per-timestamp list of point positions, and color each point according to its z-depth from the camera
point_uid_to_position = {
point.uid: np.array(point.position_world) for point in filtered_semidense_point_cloud_data
}
# Look up the slam-front-left camera's serial number, then build easier-to-query mappings from timestamp to its observations
slam_1_serial = vrs_data_provider.get_device_calibration().get_camera_calib("slam-front-left").get_serial_number()
timestamp_to_point_positions = defaultdict(list)  # timestamp -> [position, position, ...]
timestamp_to_point_colors = defaultdict(list)  # timestamp -> [color, color, ...]
for obs in semidense_observations:
    # Only keep observations from the slam-front-left camera whose timestamp falls within the chosen trajectory segment
if (
obs.camera_serial == slam_1_serial and
obs.frame_capture_timestamp in timestamp_to_pose and
obs.point_uid in point_uid_to_position):
# Insert point position
obs_timestamp = obs.frame_capture_timestamp
point_position = point_uid_to_position[obs.point_uid]
timestamp_to_point_positions[obs_timestamp].append(point_position)
# Insert point color
T_world_device = timestamp_to_pose[obs_timestamp].transform_world_device
point_in_device = T_world_device.inverse() @ point_position
point_z_depth = point_in_device.squeeze()[2]
point_color = color_from_zdepth(point_z_depth)
timestamp_to_point_colors[obs_timestamp].append(point_color)
from itertools import islice
print(f"Finished preparing semidense points observations: ")
for timestamp, points in islice(timestamp_to_point_positions.items(), 5):
print(f"\t timestamp {int(timestamp.total_seconds() * 1e9)} ns has {len(points)} observed points in slam-front-left view. ")
print(f"\t ...")
3D Visualization with Rerun
import rerun as rr
import numpy as np
from projectaria_tools.utils.rerun_helpers import (
AriaGlassesOutline,
ToTransform3D,
ToBox3D,
)
from projectaria_tools.core.mps.utils import (
filter_points_from_confidence,
get_nearest_pose,
)
print("=== Visualizing MPS SLAM Results in 3D ===")
# Initialize Rerun
rr.init("MPS SLAM Visualization")
# Set up the 3D scene
rr.log("world", rr.ViewCoordinates.RIGHT_HAND_Z_UP, static=True)
# Log point cloud
rr.log(
"world/semidense_points",
rr.Points3D(
positions=points_positions,
colors=[255, 255, 255, 125],
radii=0.001
),
static=True
)
# Aria glass outline for visualization purpose
device_calib = vrs_data_provider.get_device_calibration()
aria_glasses_point_outline = AriaGlassesOutline(
device_calib, use_cad_calib=True
)
# Plot Closed loop trajectory
closed_loop_traj_cached_full = []
observation_points_cached = None
observation_colors_cached = None
for closed_loop_pose in trajectory_segment:
capture_timestamp_ns = int(closed_loop_pose.tracking_timestamp.total_seconds() * 1e9)
rr.set_time_nanos("device_time", capture_timestamp_ns)
T_world_device = closed_loop_pose.transform_world_device
# Log device pose as a coordinate frame
rr.log(
"world/device",
ToTransform3D(
T_world_device,
axis_length=0.05,
),
)
# Plot Aria glass outline
rr.log(
"world/device/glasses_outline",
rr.LineStrips3D(
aria_glasses_point_outline,
colors=[150,200,40],
radii=5e-3,
),
)
# Plot gravity direction vector
rr.log(
"world/vio_gravity",
rr.Arrows3D(
origins=[T_world_device.translation()[0]],
vectors=[
closed_loop_pose.gravity_world * 1e-2
], # length converted from 9.8 meter -> 10 cm
colors=[101,67,33],
radii=5e-3,
),
static=False,
)
    # Update the cached observation results. Caching is needed because observations arrive at a much lower frequency than the high-frequency trajectory.
if closed_loop_pose.tracking_timestamp in timestamp_to_point_positions.keys():
observation_points_cached = timestamp_to_point_positions[closed_loop_pose.tracking_timestamp]
observation_colors_cached = timestamp_to_point_colors[closed_loop_pose.tracking_timestamp]
if observation_points_cached is not None:
rr.log(
"world/semidense_observations",
rr.Points3D(
positions = observation_points_cached,
colors = observation_colors_cached,
radii=0.01
),
static = False
)
    # Plot the closed-loop trajectory cached so far
closed_loop_traj_cached_full.append(T_world_device.translation()[0])
rr.log(
"world/vio_trajectory",
rr.LineStrips3D(
closed_loop_traj_cached_full,
colors=[173, 216, 255],
radii=5e-3,
),
static=False,
)
rr.notebook_show()
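If you are running this outside a notebook, or want to keep the result for later inspection, Rerun can also write the visualization to an .rrd file that can be opened with the standalone Rerun viewer. Below is a minimal sketch; the output filename is arbitrary, and depending on your Rerun version you may prefer to call rr.save() right after rr.init() so that all subsequent logs stream to the file.
# Save the recording to disk; open it later with the Rerun viewer (e.g., `rerun mps_slam_visualization.rrd`)
rr.save("mps_slam_visualization.rrd")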
Understanding MPS Data Structures
Trajectory Data Types
ClosedLoopTrajectoryPose
- tracking_timestamp: Device timestamp when pose was computed
- utc_timestamp: UTC timestamp
- transform_world_device: 6DOF pose in world coordinate frame
- device_linear_velocity_device: Linear velocity in device frame
- angular_velocity_device: Angular velocity in device frame
- quality_score: Pose estimation quality (higher = better)
- gravity_world: Gravity vector in world frame
- graph_uid: Unique identifier for the pose graph
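These fields are enough to compute simple summary statistics of a recording, such as the total distance traveled and the duration covered by the closed-loop trajectory. Below is a minimal sketch.
import numpy as np
# Total path length and duration of the closed-loop trajectory
positions = np.array([pose.transform_world_device.translation()[0] for pose in closed_loop_trajectory])
path_length_m = float(np.sum(np.linalg.norm(np.diff(positions, axis=0), axis=1)))
duration_s = (closed_loop_trajectory[-1].tracking_timestamp - closed_loop_trajectory[0].tracking_timestamp).total_seconds()
print(f"Closed loop trajectory: {path_length_m:.2f} m traveled over {duration_s:.1f} s")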
OpenLoopTrajectoryPose
- tracking_timestamp: Device timestamp when pose was computed
- utc_timestamp: UTC timestamp
- transform_odometry_device: 6DOF pose in odometry coordinate frame
- device_linear_velocity_odometry: Linear velocity in odometry frame
- angular_velocity_device: Angular velocity in device frame
- quality_score: Pose estimation quality (higher = better)
- gravity_odometry: Gravity vector in odometry frame
- session_uid: Unique identifier for the session
Point Cloud Data Types
GlobalPointPosition
- uid: Unique identifier for the 3D point
- graph_uid: Identifier linking point to pose graph
- position_world: 3D position in world coordinate frame
- inverse_distance_std: Inverse distance standard deviation (quality metric)
- distance_std: Distance standard deviation (quality metric)
PointObservation
- point_uid: Links observation to 3D point
- frame_capture_timestamp: When the observation was captured
- camera_serial: Serial number of the observing camera
- uv: 2D pixel coordinates of the observation
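Since point_uid links each observation back to a 3D point, you can also compute how many times each point was observed (its track length), a rough indicator of point quality. Below is a minimal sketch.
from collections import Counter
import numpy as np
# Track length: number of observations per 3D point
track_lengths = Counter(obs.point_uid for obs in semidense_observations)
lengths = np.array(list(track_lengths.values()))
print(f"{len(track_lengths)} points observed; median track length = {np.median(lengths):.0f}, max = {lengths.max()}")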
MPS vs On-Device Comparisons
Key Differences
| Aspect | On-Device (VIO/SLAM) | MPS SLAM |
|---|---|---|
| Processing | Real-time during recording | Cloud-based post-processing |
| Accuracy | Good for real-time use | Higher accuracy with global optimization |
| Frequency | 20Hz (VIO), 800Hz (high-freq) | 1kHz (both open/closed loop) |
| Drift | Accumulates over time | Minimized with loop closure |
| Point Cloud | Not available | Semi-dense reconstruction |
| Coordinate Frame | Odometry frame | Global world frame |
Use Cases
- On-Device Data: Real-time applications, live feedback, immediate processing
- MPS Data: High-quality reconstruction, research analysis, detailed mapping
Summary
This tutorial covered the essential aspects of working with MPS data:
- Trajectory Access: Loading both open loop and closed loop trajectories
- Point Cloud Data: Accessing semi-dense 3D reconstructions and observations
- Data Filtering: Using confidence thresholds to improve point cloud quality
- 3D Visualization: Creating comprehensive visualizations with trajectories and point clouds
- Data Structures: Understanding the comprehensive MPS data formats
MPS provides high-quality, globally consistent 3D reconstructions that are ideal for:
- Research Applications: Detailed spatial analysis and mapping
- 3D Reconstruction: High-fidelity environmental modeling
- Motion Analysis: Accurate trajectory analysis without drift
- Multi-modal Studies: Combining precise 3D data with sensor information
- Benchmarking: Comparing against ground truth for algorithm development