Streaming Example
This example demonstrates how to programmatically control streaming on your Aria Gen2 device using the Python SDK. You'll learn how to start streaming, implement custom callbacks for real-time data processing, and optionally record streaming data to VRS files.
Quick Start
Run the streaming example script:
python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py
To save streaming data to a VRS file:
python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py --record-to-vrs /path
What This Example Does
The script performs the following operations:
- Connects to the device
- Configures streaming settings (profile, interface)
- Starts streaming on the device
- Sets up a streaming receiver with custom callbacks
- Processes real-time data from all sensors (cameras, IMU, audio, machine perception)
- Optionally records streaming data to a VRS file
Code Walkthrough
Step 1: Import Required Modules
import argparse
import signal
import sys
import aria.sdk_gen2 as sdk_gen2
import aria.stream_receiver as receiver
from projectaria_tools.core.mps import EyeGaze, hand_tracking
from projectaria_tools.core.sensor_data import (
    AudioData, AudioDataRecord, FrontendOutput, ImageData, ImageDataRecord, MotionData,
)
Key Modules:
- aria.sdk_gen2: Main SDK for device control
- aria.stream_receiver: Receives and processes streaming data
- projectaria_tools.core: Data structures for sensor data and machine perception
Step 2: Parse Command-Line Arguments
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--record-to-vrs",
        dest="record_to_vrs",
        type=str,
        default="",
        required=False,
        help="Output path to save the received streaming data to a VRS file",
    )
    return parser.parse_args()
Available Options:
- --record-to-vrs: Optional path to save streaming data as a VRS file. If not specified, data is only processed in callbacks (not saved).
Important: Data drops can occur with poor streaming connections. The saved VRS file will reflect any data drops that occurred during streaming.
Step 3: Connect to Device and Start Streaming
# Create device client
device_client = sdk_gen2.DeviceClient()
# Establish connection to the device
config = sdk_gen2.DeviceClientConfig()
device_client.set_client_config(config)
device = device_client.connect()
# Set streaming config with profile name
streaming_config = sdk_gen2.HttpStreamingConfig()
streaming_config.profile_name = "profile9"
device.set_streaming_config(streaming_config)
# Start streaming
device.start_streaming()
Streaming Configuration:
- profile_name: Use mp_streaming_demo for smooth visualization with all machine perception data
- streaming_interface: Currently supported: USB_NCM (USB connection, default)
Step 4: Define Data Callbacks
Callbacks are functions that process data as it arrives from the device. Here's how to implement callbacks for each data type:
Image Callback
Processes all camera streams: RGB (1x), SLAM (4x), and Eye Tracking (2x):
def image_callback(image_data: ImageData, image_record: ImageDataRecord):
    """Called for each image frame from any camera."""
    image_array = image_data.to_numpy_array()
    timestamp_ns = image_record.capture_timestamp_ns
    print(f"Received image: shape={image_array.shape}, timestamp={timestamp_ns} ns")
    # Example: Process the image
    # - image_array is a numpy array you can process with OpenCV, PIL, etc.
    # - image_record contains metadata like timestamp, camera ID, exposure
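For instance, you could thin the stream and write the occasional frame to disk for offline inspection. Below is a minimal sketch, assuming OpenCV (cv2) is installed and reusing the ImageData and ImageDataRecord types imported in Step 1; saving_image_callback and the frame counter are illustrative names, not part of the SDK:
import cv2

frame_state = {"count": 0}

def saving_image_callback(image_data: ImageData, image_record: ImageDataRecord):
    """Save every 30th frame to disk for offline inspection."""
    frame_state["count"] += 1
    if frame_state["count"] % 30 != 0:
        return
    image_array = image_data.to_numpy_array()
    # cv2.imwrite infers the format from the extension and accepts grayscale or color arrays
    cv2.imwrite(f"frame_{image_record.capture_timestamp_ns}.png", image_array)
Register it with stream_receiver.register_rgb_callback(saving_image_callback) in place of the default image_callback if you only want RGB frames saved.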
Audio Callback
Processes audio data from all 8 microphone channels:
def audio_callback(audio_data: AudioData, audio_record: AudioDataRecord, num_channels: int):
    """Called for each audio data packet."""
    num_samples = len(audio_data.data)
    num_timestamps = len(audio_record.capture_timestamps_ns)
    print(f"Received audio: samples={num_samples}, timestamps={num_timestamps}, channels={num_channels}")
    # Example: Process audio
    # - audio_data.data contains the raw audio samples
    # - audio_record.capture_timestamps_ns contains per-sample timestamps
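As a concrete example, a simple level meter can reshape each packet into frames and compute a per-channel RMS. This sketch assumes the samples in audio_data.data are interleaved across channels, which you should verify for your SDK version; level_meter_callback is an illustrative name:
import numpy as np

def level_meter_callback(audio_data: AudioData, audio_record: AudioDataRecord, num_channels: int):
    """Print a per-channel RMS level for each audio packet."""
    samples = np.asarray(audio_data.data, dtype=np.float64)
    if num_channels == 0 or samples.size % num_channels != 0:
        return  # Unexpected packet shape; skip rather than crash
    frames = samples.reshape(-1, num_channels)  # Assumption: channel-interleaved samples
    rms = np.sqrt(np.mean(frames ** 2, axis=0))
    print(f"RMS per channel: {np.round(rms, 1)}")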
IMU Callback
Processes high-frequency IMU data from both IMU sensors at 800 Hz:
def imu_callback(imu_data: MotionData, sensor_label: str):
    """Called for each IMU data sample from imu-left or imu-right."""
    accel = imu_data.accel_msec2  # Acceleration in m/s²
    gyro = imu_data.gyro_radsec  # Gyroscope in rad/s
    print(f"Received {sensor_label}: accel={accel}, gyro={gyro}")
    # Example: Process IMU data
    # - Use for motion tracking
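As a small worked example, the accelerometer magnitude can be low-pass filtered to estimate gravity: with the device at rest the smoothed value should settle near 9.81 m/s². GravityEstimator is an illustrative helper, not an SDK API; it only assumes accel_msec2 is a 3-vector, as the field name suggests:
import numpy as np

class GravityEstimator:
    """Exponential moving average of the accelerometer magnitude."""

    def __init__(self, alpha: float = 0.01):
        self.alpha = alpha  # Smoothing factor; smaller = smoother
        self.estimate = 9.81

    def update(self, accel_msec2) -> float:
        self.estimate += self.alpha * (float(np.linalg.norm(accel_msec2)) - self.estimate)
        return self.estimate

gravity = GravityEstimator()

def smoothing_imu_callback(imu_data: MotionData, sensor_label: str):
    print(f"{sensor_label}: smoothed |accel| = {gravity.update(imu_data.accel_msec2):.2f} m/s^2")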
Eye Gaze Callback
Processes eye tracking data with gaze direction and depth:
def eyegaze_callback(eyegaze_data: EyeGaze):
    """Called for each eye gaze estimate."""
    timestamp_sec = eyegaze_data.tracking_timestamp.total_seconds()
    yaw_rad = eyegaze_data.yaw
    pitch_rad = eyegaze_data.pitch
    depth_m = eyegaze_data.depth
    print(f"Eye Gaze: timestamp={timestamp_sec}s, yaw={yaw_rad:.3f}, pitch={pitch_rad:.3f}, depth={depth_m:.3f}m")
    # Example: Use eye gaze data
    # - Track where user is looking
    # - Estimate focus depth
    # - Build attention maps
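To turn yaw, pitch, and depth into a 3D point, one option is the tangent-plane convention where the gaze ray direction is [tan(yaw), tan(pitch), 1]. This is a sketch of that convention only; check projectaria_tools' MPS utilities for the authoritative conversion and frame definitions:
import numpy as np

def gaze_point_at_depth(yaw_rad: float, pitch_rad: float, depth_m: float) -> np.ndarray:
    """Scale the (assumed) gaze ray direction to the estimated depth."""
    direction = np.array([np.tan(yaw_rad), np.tan(pitch_rad), 1.0])
    return depth_m * direction / np.linalg.norm(direction)

def gaze_point_callback(eyegaze_data: EyeGaze):
    point = gaze_point_at_depth(eyegaze_data.yaw, eyegaze_data.pitch, eyegaze_data.depth)
    print(f"Gaze point: {np.round(point, 3)} m")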
Hand Tracking Callback
Processes hand pose estimates for both left and right hands:
def handtracking_callback(handtracking_data: hand_tracking.HandTrackingResult):
    """Called for each hand tracking estimate."""
    timestamp_sec = handtracking_data.tracking_timestamp.total_seconds()
    print(f"Hand Tracking: timestamp={timestamp_sec}s")
    # Process left hand
    if handtracking_data.left_hand is not None:
        left_hand = handtracking_data.left_hand
        print(f"  Left hand confidence: {left_hand.confidence:.3f}")
        print(f"  Left wrist position: {left_hand.get_wrist_position_device()}")
        print(f"  Left palm position: {left_hand.get_palm_position_device()}")
        if left_hand.wrist_and_palm_normal_device is not None:
            normals = left_hand.wrist_and_palm_normal_device
            print(f"  Left wrist normal: {normals.wrist_normal_device}")
            print(f"  Left palm normal: {normals.palm_normal_device}")
    else:
        print("  Left hand: No data")
    # Process right hand (similar to left hand)
    if handtracking_data.right_hand is not None:
        right_hand = handtracking_data.right_hand
        print(f"  Right hand confidence: {right_hand.confidence:.3f}")
        print(f"  Right wrist position: {right_hand.get_wrist_position_device()}")
        print(f"  Right palm position: {right_hand.get_palm_position_device()}")
        if right_hand.wrist_and_palm_normal_device is not None:
            normals = right_hand.wrist_and_palm_normal_device
            print(f"  Right wrist normal: {normals.wrist_normal_device}")
            print(f"  Right palm normal: {normals.palm_normal_device}")
    else:
        print("  Right hand: No data")
VIO Callback
Processes Visual-Inertial Odometry data (6-DOF pose estimates):
def vio_callback(vio_data: FrontendOutput):
    """Called for each VIO pose estimate."""
    timestamp_ns = vio_data.capture_timestamp_ns
    rotation = vio_data.transform_odometry_bodyimu.rotation().log()
    translation = vio_data.transform_odometry_bodyimu.translation()
    print(f"VIO: timestamp={timestamp_ns}ns, rotation={rotation}, translation={translation}")
    # Example: Use VIO data
    # - Track device position and orientation
    # - Build 3D maps
    # - Enable AR applications
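For example, the translations can be accumulated into a running path length. This sketch assumes translation() returns a 3-element vector (possibly 3x1) in meters, as used in the print above; vio_state is an illustrative module-level holder:
import numpy as np

vio_state = {"prev": None, "length": 0.0}

def path_length_callback(vio_data: FrontendOutput):
    """Keep a running total of distance traveled in the odometry frame."""
    pos = np.asarray(vio_data.transform_odometry_bodyimu.translation()).reshape(3)
    if vio_state["prev"] is not None:
        vio_state["length"] += float(np.linalg.norm(pos - vio_state["prev"]))
    vio_state["prev"] = pos
    print(f"Path length so far: {vio_state['length']:.2f} m")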
Step 5: Set Up the Streaming Receiver
The streaming receiver listens for data and dispatches it to your callbacks:
def setup_streaming_receiver(device, record_to_vrs):
    """Configure and start the streaming receiver."""
    # Configure server
    config = sdk_gen2.HttpServerConfig()
    config.address = "0.0.0.0"  # Listen on all interfaces
    config.port = 6768  # Default streaming port
    # Create receiver
    stream_receiver = receiver.StreamReceiver()
    stream_receiver.set_server_config(config)
    # Optional: Record to VRS
    if record_to_vrs != "":
        stream_receiver.record_to_vrs(record_to_vrs)
    # Register all callbacks
    stream_receiver.register_slam_callback(image_callback)
    stream_receiver.register_rgb_callback(image_callback)
    stream_receiver.register_audio_callback(audio_callback)
    stream_receiver.register_eye_gaze_callback(eyegaze_callback)
    stream_receiver.register_hand_pose_callback(handtracking_callback)
    stream_receiver.register_vio_callback(vio_callback)
    # Start receiving data
    stream_receiver.start_server()
    return stream_receiver
Important Configuration Notes:
- Port 6768: Ensure this port is open and not blocked by a firewall (a quick preflight check is sketched below)
- VPN: Disable VPN to allow streaming data to be received
- 0.0.0.0: Listens on all network interfaces (required for the device to connect)
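A quick preflight check with the standard library can confirm that nothing else is bound to the port before the server starts; port_is_free is an illustrative helper, not part of the SDK:
import socket

def port_is_free(port: int = 6768) -> bool:
    """Return True if we can bind the streaming port, i.e. nothing else holds it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind(("0.0.0.0", port))
            return True
        except OSError:
            return False

if not port_is_free():
    raise RuntimeError("Port 6768 is already in use; close the other process first")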
Step 6: Main Function
Tie everything together:
if __name__ == "__main__":
    args = parse_args()
    # Connect and start streaming
    device = device_streaming()
    # Set up receiver with callbacks
    stream_receiver = setup_streaming_receiver(device, args.record_to_vrs)
    # Keep running until interrupted
    print("Streaming... Press Ctrl+C to stop")
    try:
        signal.pause()  # Wait for interrupt signal
    except KeyboardInterrupt:
        print("\nStopping streaming...")
        device.stop_streaming()
        print("Streaming stopped")
Complete Example Code
Here's the full streaming script structure:
import argparse
import time
import aria.sdk_gen2 as sdk_gen2
import aria.stream_receiver as receiver
from projectaria_tools.core.mps import EyeGaze, hand_tracking
from projectaria_tools.core.sensor_data import (
    AudioData,
    AudioDataRecord,
    FrontendOutput,
    ImageData,
    ImageDataRecord,
    MotionData,
)
# Set up the device client to initiate connection to the device
device_client = sdk_gen2.DeviceClient()
def device_streaming():
    # Set up the device client config to specify the device to connect to, e.g. by device serial number.
    # If nothing is specified, the first device in the list of connected devices will be used
    config = sdk_gen2.DeviceClientConfig()
    device_client.set_client_config(config)
    device = device_client.connect()
    # Set streaming config with profile name
    streaming_config = sdk_gen2.HttpStreamingConfig()
    streaming_config.profile_name = "profile9"
    device.set_streaming_config(streaming_config)
    # Start streaming
    device.start_streaming()
    return device
def image_callback(image_data: ImageData, image_record: ImageDataRecord):
    print(
        f"Received image data of size {image_data.to_numpy_array().shape} with timestamp {image_record.capture_timestamp_ns} ns"
    )
def audio_callback(
    audio_data: AudioData, audio_record: AudioDataRecord, num_channels: int
):
    print(
        f"Received audio data with {len(audio_data.data)} samples and {len(audio_record.capture_timestamps_ns)} timestamps and num channels {num_channels}"
    )
def imu_callback(imu_data: MotionData, sensor_label: str):
    print(
        f"Received {sensor_label} accel data {imu_data.accel_msec2} and gyro {imu_data.gyro_radsec}"
    )
def eyegaze_callback(eyegaze_data: EyeGaze):
    print(
        f"Received EyeGaze data at timestamp {eyegaze_data.tracking_timestamp.total_seconds()} sec "
        f"with yaw={eyegaze_data.yaw:.3f} rad, pitch={eyegaze_data.pitch:.3f} rad, "
        f"depth={eyegaze_data.depth:.3f} m"
    )
def handtracking_callback(handtracking_data: hand_tracking.HandTrackingResult):
    print(
        f"Received HandTracking data at timestamp {handtracking_data.tracking_timestamp.total_seconds()} sec"
    )
    # Check left hand data
    if handtracking_data.left_hand is not None:
        left_hand = handtracking_data.left_hand
        print(f"  Left hand confidence: {left_hand.confidence:.3f}")
        print(f"  Left wrist position: {left_hand.get_wrist_position_device()}")
        print(f"  Left palm position: {left_hand.get_palm_position_device()}")
        if left_hand.wrist_and_palm_normal_device is not None:
            normals = left_hand.wrist_and_palm_normal_device
            print(f"  Left wrist normal: {normals.wrist_normal_device}")
            print(f"  Left palm normal: {normals.palm_normal_device}")
    else:
        print("  Left hand: No data")
    # Check right hand data
    if handtracking_data.right_hand is not None:
        right_hand = handtracking_data.right_hand
        print(f"  Right hand confidence: {right_hand.confidence:.3f}")
        print(f"  Right wrist position: {right_hand.get_wrist_position_device()}")
        print(f"  Right palm position: {right_hand.get_palm_position_device()}")
        if right_hand.wrist_and_palm_normal_device is not None:
            normals = right_hand.wrist_and_palm_normal_device
            print(f"  Right wrist normal: {normals.wrist_normal_device}")
            print(f"  Right palm normal: {normals.palm_normal_device}")
    else:
        print("  Right hand: No data")
def vio_callback(vio_data: FrontendOutput):
    print(
        f"Received VIO data at timestamp {vio_data.capture_timestamp_ns} ns with transform_odometry_bodyimu rotation {vio_data.transform_odometry_bodyimu.rotation().log()} and translation {vio_data.transform_odometry_bodyimu.translation()}"
    )
def calib_callback(calib_json_str: str):
    print(f"Received calibration: {calib_json_str}")
def setup_streaming_receiver(device, record_to_vrs):
    # Set up the server to receive streaming data from the device
    # IP address: 0.0.0.0 means the server listens on all available interfaces
    # Port: 6768 is the port the server listens on
    config = sdk_gen2.HttpServerConfig()
    config.address = "0.0.0.0"
    config.port = 6768
    # Set up the receiver
    stream_receiver = receiver.StreamReceiver()
    stream_receiver.set_server_config(config)
    if record_to_vrs != "":
        stream_receiver.record_to_vrs(record_to_vrs)
    # Register callbacks for each type of data
    stream_receiver.register_slam_callback(image_callback)
    stream_receiver.register_rgb_callback(image_callback)
    stream_receiver.register_audio_callback(audio_callback)
    stream_receiver.register_eye_gaze_callback(eyegaze_callback)
    stream_receiver.register_hand_pose_callback(handtracking_callback)
    stream_receiver.register_vio_callback(vio_callback)
    # Start the server
    stream_receiver.start_server()
    time.sleep(10)  # Let the receiver run for 10 seconds
    # Stop streaming and terminate the server
    device.stop_streaming()
    time.sleep(2)
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--record-to-vrs",
        dest="record_to_vrs",
        type=str,
        default="",
        required=False,
        help="Output path to save the received streaming data to a VRS file",
    )
    return parser.parse_args()
if __name__ == "__main__":
    args = parse_args()
    # Set up device to start streaming
    device = device_streaming()
    # Set up streaming receiver to receive streaming data with callbacks
    setup_streaming_receiver(device, args.record_to_vrs)
Usage Examples
Basic Streaming with Console Output
python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py
What happens:
- Connects to device and starts streaming
- Prints real-time data from all sensors to console
- Data is NOT saved
Streaming and Recording to VRS
python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py \
--record-to-vrs ~/Downloads/streaming_capture.vrs
What happens:
- Streams data with real-time callbacks
- Simultaneously records all data to VRS file
- VRS file can be played back later with aria_rerun_viewer
Troubleshooting
No Data in Callbacks
Problem: Streaming starts but callbacks are never called.
Solutions:
- Check port 6768 is open:
  # On Linux/macOS
  sudo lsof -i :6768
- Disable VPN:
  - VPNs block streaming data
  - Disconnect from VPN/Lighthouse
- Check firewall:
  - Ensure the firewall allows port 6768
  - Add an exception for the Python script
- Verify server address: must be 0.0.0.0 to listen on all interfaces
- Run aria_doctor:
  aria_doctor
High Data Drop Rate
Problem: VRS file shows many dropped frames.
Solutions:
- Close other applications consuming bandwidth
- Check system resources (CPU, memory)
- Reduce callback processing time (see the sketch after this list)
- Increase message queue size
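One common way to keep callbacks cheap is to hand each packet to a worker thread and return immediately, dropping data when the queue backs up instead of stalling the receiver. A minimal sketch using only the standard library; the queue size and worker body are illustrative:
import queue
import threading

work_queue = queue.Queue(maxsize=256)

def fast_image_callback(image_data, image_record):
    """Enqueue the frame and return immediately; drop it if the queue is full."""
    try:
        work_queue.put_nowait((image_data.to_numpy_array(), image_record.capture_timestamp_ns))
    except queue.Full:
        pass  # Dropping one frame here is cheaper than stalling the receiver

def worker():
    while True:
        image_array, timestamp_ns = work_queue.get()
        # ... expensive processing happens off the receiver thread ...
        work_queue.task_done()

threading.Thread(target=worker, daemon=True).start()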
Next Steps
- Learn about text-to-speech commands
- Review all Python SDK examples
- Explore the Streaming Guide for CLI streaming
- Check Troubleshooting for common issues