Streaming Example

This example demonstrates how to programmatically control streaming on your Aria Gen2 device using the Python SDK. You'll learn how to start streaming, implement custom callbacks for real-time data processing, and optionally record streaming data to VRS files.

Quick Start

Run the streaming example script:

python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py

To save streaming data to a VRS file:

python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py --record-to-vrs /path

What This Example Does

The script performs the following operations:

  1. Connects to the device
  2. Configures streaming settings (profile, interface)
  3. Starts streaming on the device
  4. Sets up a streaming receiver with custom callbacks
  5. Processes real-time data from all sensors (cameras, IMU, audio, machine perception)
  6. Optionally records streaming data to a VRS file

Code Walkthrough

Step 1: Import Required Modules

import argparse
import signal
import sys
import aria.sdk_gen2 as sdk_gen2
import aria.stream_receiver as receiver
from projectaria_tools.core.mps import EyeGaze, hand_tracking
from projectaria_tools.core.sensor_data import (
    AudioData,
    AudioDataRecord,
    FrontendOutput,
    ImageData,
    ImageDataRecord,
    MotionData,
)

Key Modules:

  • aria.sdk_gen2: Main SDK for device control
  • aria.stream_receiver: Receives and processes streaming data
  • projectaria_tools.core: Data structures for sensor data and machine perception

Step 2: Parse Command-Line Arguments

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--record-to-vrs",
        dest="record_to_vrs",
        type=str,
        default="",
        required=False,
        help="Output directory to save the received streaming into VRS",
    )
    return parser.parse_args()

Available Options:

  • --record-to-vrs: Optional path to save streaming data as a VRS file
  • If not specified, data is only processed in callbacks (not saved)

Important: Data drops can occur with poor streaming connections. The saved VRS file will reflect any data drops that occurred during streaming.


Step 3: Connect to Device and Start Streaming

# Create device client
device_client = sdk_gen2.DeviceClient()

# Establish connection to the device
config = sdk_gen2.DeviceClientConfig()
device_client.set_client_config(config)
device = device_client.connect()

# Set streaming config with profile name
streaming_config = sdk_gen2.HttpStreamingConfig()
streaming_config.profile_name = "profile9"
device.set_streaming_config(streaming_config)

# Start streaming
device.start_streaming()

Streaming Configuration:

  • profile_name: Use mp_streaming_demo for smooth visualization with all machine perception data (the snippet above uses profile9; see the sketch after this list for switching profiles)
  • streaming_interface: Currently supported:
    • USB_NCM: USB connection (default)
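
For example, to switch to the machine perception profile instead, set the profile name before starting the stream. This is a minimal sketch reusing the objects from the snippet above; available profile names depend on your device:

# Sketch: use the machine perception demo profile instead of profile9
streaming_config = sdk_gen2.HttpStreamingConfig()
streaming_config.profile_name = "mp_streaming_demo"
device.set_streaming_config(streaming_config)
device.start_streaming()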

Step 4: Define Data Callbacks

Callbacks are functions that process data as it arrives from the device. Here's how to implement callbacks for each data type:

Image Callback

Processes all camera streams: RGB (1x), SLAM (4x), and Eye Tracking (2x):

def image_callback(image_data: ImageData, image_record: ImageDataRecord):
    """Called for each image frame from any camera."""
    image_array = image_data.to_numpy_array()
    timestamp_ns = image_record.capture_timestamp_ns

    print(f"Received image: shape={image_array.shape}, timestamp={timestamp_ns} ns")

    # Example: Process the image
    # - image_array is a numpy array you can process with OpenCV, PIL, etc.
    # - image_record contains metadata like timestamp, camera ID, exposure
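
As an illustration of in-callback processing, the hedged variant below logs simple per-frame statistics with NumPy. It assumes numpy is installed; camera_id is the metadata field mentioned in the comment above:

import numpy as np

def image_stats_callback(image_data: ImageData, image_record: ImageDataRecord):
    """Illustrative variant: log simple per-frame statistics."""
    image_array = image_data.to_numpy_array()
    # Mean intensity works for grayscale (SLAM/ET) and RGB frames alike
    mean_intensity = float(np.mean(image_array))
    print(
        f"camera_id={image_record.camera_id}, "
        f"mean_intensity={mean_intensity:.1f}, "
        f"ts={image_record.capture_timestamp_ns} ns"
    )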

Audio Callback

Processes audio data from all 8 microphone channels:

def audio_callback(audio_data: AudioData, audio_record: AudioDataRecord, num_channels: int):
    """Called for each audio data packet."""
    num_samples = len(audio_data.data)
    num_timestamps = len(audio_record.capture_timestamps_ns)

    print(f"Received audio: samples={num_samples}, timestamps={num_timestamps}, channels={num_channels}")

    # Example: Process audio
    # - audio_data.data contains the raw audio samples
    # - audio_record.capture_timestamps_ns contains per-sample timestamps
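
A sketch of simple in-callback audio analysis, assuming the samples in audio_data.data are interleaved across channels (verify the audio layout for your profile before relying on this):

import numpy as np

def audio_rms_callback(audio_data: AudioData, audio_record: AudioDataRecord, num_channels: int):
    """Illustrative variant: per-channel RMS level, assuming interleaved samples."""
    samples = np.asarray(audio_data.data, dtype=np.float64)
    frames = samples.reshape(-1, num_channels)  # (num_frames, num_channels)
    rms_per_channel = np.sqrt(np.mean(frames**2, axis=0))
    print(f"RMS per channel: {np.round(rms_per_channel, 1)}")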

IMU Callback

Processes high-frequency IMU data from both IMU sensors at 800Hz:

def imu_callback(imu_data: MotionData, sensor_label: str):
    """Called for each IMU data sample from imu-left or imu-right."""
    accel = imu_data.accel_msec2  # Acceleration in m/s²
    gyro = imu_data.gyro_radsec  # Gyroscope in rad/s

    print(f"Received {sensor_label}: accel={accel}, gyro={gyro}")

    # Example: Process IMU data
    # - Use for motion tracking.
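
As a toy example of using the accelerometer, the sketch below estimates tilt from vertical while the device is roughly static. It assumes gravity dominates the accel reading and that the sensor z-axis is a reasonable "up" proxy; a real tracker would fuse accel and gyro:

import numpy as np

def imu_tilt_callback(imu_data: MotionData, sensor_label: str):
    """Illustrative variant: rough tilt estimate from the accelerometer."""
    accel = np.asarray(imu_data.accel_msec2)
    # When static, |accel| ~ g, so the angle between accel and the z-axis
    # approximates tilt from vertical (the axis choice is an assumption)
    cos_tilt = abs(accel[2]) / np.linalg.norm(accel)
    tilt_deg = np.degrees(np.arccos(np.clip(cos_tilt, -1.0, 1.0)))
    print(f"{sensor_label}: approx tilt from vertical = {tilt_deg:.1f} deg")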

Eye Gaze Callback

Processes eye tracking data with gaze direction and depth:

def eyegaze_callback(eyegaze_data: EyeGaze):
    """Called for each eye gaze estimate."""
    timestamp_sec = eyegaze_data.tracking_timestamp.total_seconds()
    yaw_rad = eyegaze_data.yaw
    pitch_rad = eyegaze_data.pitch
    depth_m = eyegaze_data.depth

    print(f"Eye Gaze: timestamp={timestamp_sec}s, yaw={yaw_rad:.3f}, pitch={pitch_rad:.3f}, depth={depth_m:.3f}m")

    # Example: Use eye gaze data
    # - Track where user is looking
    # - Estimate focus depth
    # - Build attention maps
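
To turn yaw, pitch, and depth into a 3D gaze point, one common convention builds a direction vector from the tangents of the angles. A hedged helper follows; verify the frame conventions in the Project Aria documentation before using this in earnest:

import numpy as np

def gaze_point_at_depth(yaw_rad: float, pitch_rad: float, depth_m: float) -> np.ndarray:
    """Illustrative helper: 3D gaze point from yaw/pitch/depth.

    Assumes the (tan(yaw), tan(pitch), 1) direction convention with +Z
    forward; check the documented gaze frame before relying on this.
    """
    direction = np.array([np.tan(yaw_rad), np.tan(pitch_rad), 1.0])
    return direction / np.linalg.norm(direction) * depth_m

# Usage inside eyegaze_callback:
# point = gaze_point_at_depth(eyegaze_data.yaw, eyegaze_data.pitch, eyegaze_data.depth)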

Hand Tracking Callback

Processes hand pose estimates for both left and right hands:

def handtracking_callback(handtracking_data: hand_tracking.HandTrackingResult):
    """Called for each hand tracking estimate."""
    timestamp_sec = handtracking_data.tracking_timestamp.total_seconds()
    print(f"Hand Tracking: timestamp={timestamp_sec}s")

    # Process left hand
    if handtracking_data.left_hand is not None:
        left_hand = handtracking_data.left_hand
        print(f"  Left hand confidence: {left_hand.confidence:.3f}")
        print(f"  Left wrist position: {left_hand.get_wrist_position_device()}")
        print(f"  Left palm position: {left_hand.get_palm_position_device()}")
        if left_hand.wrist_and_palm_normal_device is not None:
            normals = left_hand.wrist_and_palm_normal_device
            print(f"  Left wrist normal: {normals.wrist_normal_device}")
            print(f"  Left palm normal: {normals.palm_normal_device}")
    else:
        print("  Left hand: No data")

    # Process right hand (similar to left hand)
    if handtracking_data.right_hand is not None:
        right_hand = handtracking_data.right_hand
        print(f"  Right hand confidence: {right_hand.confidence:.3f}")
        print(f"  Right wrist position: {right_hand.get_wrist_position_device()}")
        print(f"  Right palm position: {right_hand.get_palm_position_device()}")
        if right_hand.wrist_and_palm_normal_device is not None:
            normals = right_hand.wrist_and_palm_normal_device
            print(f"  Right wrist normal: {normals.wrist_normal_device}")
            print(f"  Right palm normal: {normals.palm_normal_device}")
    else:
        print("  Right hand: No data")
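
Building on the callback above, here is a small hedged helper that measures the distance between the two palms. It assumes both positions are 3-vectors expressed in the same device frame:

import numpy as np

def palm_distance_m(handtracking_data: hand_tracking.HandTrackingResult):
    """Illustrative helper: distance between palms, or None if a hand is missing."""
    left, right = handtracking_data.left_hand, handtracking_data.right_hand
    if left is None or right is None:
        return None
    delta = np.asarray(left.get_palm_position_device()) - np.asarray(
        right.get_palm_position_device()
    )
    return float(np.linalg.norm(delta))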

VIO Callback

Processes Visual-Inertial Odometry data (6-DOF pose estimates):

def vio_callback(vio_data: FrontendOutput):
    """Called for each VIO pose estimate."""
    timestamp_ns = vio_data.capture_timestamp_ns
    rotation = vio_data.transform_odometry_bodyimu.rotation().log()
    translation = vio_data.transform_odometry_bodyimu.translation()

    print(f"VIO: timestamp={timestamp_ns}ns, rotation={rotation}, translation={translation}")

    # Example: Use VIO data
    # - Track device position and orientation
    # - Build 3D maps
    # - Enable AR applications
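
As a sketch of position tracking, the variant below accumulates VIO translations and reports incremental motion. It assumes translation() returns a 3-vector in meters:

import numpy as np

trajectory = []  # accumulated (timestamp_ns, translation) samples

def vio_trajectory_callback(vio_data: FrontendOutput):
    """Illustrative variant: accumulate translations to measure motion."""
    translation = np.asarray(vio_data.transform_odometry_bodyimu.translation()).ravel()
    trajectory.append((vio_data.capture_timestamp_ns, translation))
    if len(trajectory) > 1:
        step_m = np.linalg.norm(trajectory[-1][1] - trajectory[-2][1])
        print(f"Moved {step_m * 100:.2f} cm since the previous VIO sample")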

Step 5: Set Up the Streaming Receiver

The streaming receiver listens for data and dispatches it to your callbacks:

def setup_streaming_receiver(device, record_to_vrs):
    """Configure and start the streaming receiver."""
    # Configure server
    config = sdk_gen2.HttpServerConfig()
    config.address = "0.0.0.0"  # Listen on all interfaces
    config.port = 6768  # Default streaming port

    # Create receiver
    stream_receiver = receiver.StreamReceiver()
    stream_receiver.set_server_config(config)

    # Optional: Record to VRS
    if record_to_vrs != "":
        stream_receiver.record_to_vrs(record_to_vrs)

    # Register all callbacks
    stream_receiver.register_slam_callback(image_callback)
    stream_receiver.register_rgb_callback(image_callback)
    stream_receiver.register_audio_callback(audio_callback)
    stream_receiver.register_eye_gaze_callback(eyegaze_callback)
    stream_receiver.register_hand_pose_callback(handtracking_callback)
    stream_receiver.register_vio_callback(vio_callback)

    # Start receiving data
    stream_receiver.start_server()

    return stream_receiver

Important Configuration Notes:

  • Port 6768: Ensure this port is open and not blocked by a firewall (a quick local check follows this list)
  • VPN: Disable VPN to allow streaming data to be received
  • 0.0.0.0: Listens on all network interfaces (required for device to connect)
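
Before starting the receiver, you can sanity-check locally that nothing else is bound to the streaming port. This standard-library sketch only detects local conflicts, not firewall rules:

import socket

def port_is_free(port: int = 6768) -> bool:
    """Return True if we can bind the streaming port on all interfaces."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind(("0.0.0.0", port))
            return True
        except OSError:
            return False

# Example: call before setup_streaming_receiver()
# assert port_is_free(6768), "Port 6768 is already in use"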

Step 6: Main Function

Tie everything together:

if __name__ == "__main__":
    args = parse_args()

    # Connect and start streaming
    device = device_streaming()

    # Set up receiver with callbacks
    stream_receiver = setup_streaming_receiver(device, args.record_to_vrs)

    # Keep running until interrupted
    print("Streaming... Press Ctrl+C to stop")
    try:
        signal.pause()  # Wait for interrupt signal
    except KeyboardInterrupt:
        print("\nStopping streaming...")
        device.stop_streaming()
        print("Streaming stopped")

Complete Example Code

Here's the full streaming script structure:

import argparse
import time

import aria.sdk_gen2 as sdk_gen2
import aria.stream_receiver as receiver

from projectaria_tools.core.mps import EyeGaze, hand_tracking, OpenLoopTrajectoryPose
from projectaria_tools.core.sensor_data import (
    AudioData,
    AudioDataRecord,
    FrontendOutput,
    ImageData,
    ImageDataRecord,
    MotionData,
)

# Set up the device client to initiate connection to the device
device_client = sdk_gen2.DeviceClient()


def device_streaming():
    # Set up the device client config to specify the device to be connected to, e.g. by device serial number.
    # If nothing is specified, the first device in the list of connected devices will be connected to.
    config = sdk_gen2.DeviceClientConfig()
    device_client.set_client_config(config)
    device = device_client.connect()

    # Set streaming config with profile name
    streaming_config = sdk_gen2.HttpStreamingConfig()
    streaming_config.profile_name = "profile9"
    device.set_streaming_config(streaming_config)

    # Start streaming
    device.start_streaming()
    return device


def image_callback(image_data: ImageData, image_record: ImageDataRecord):
    print(
        f"Received image data of size {image_data.to_numpy_array().shape} with timestamp {image_record.capture_timestamp_ns} ns"
    )


def audio_callback(
    audio_data: AudioData, audio_record: AudioDataRecord, num_channels: int
):
    print(
        f"Received audio data with {len(audio_data.data)} samples and {len(audio_record.capture_timestamps_ns)} timestamps and num channels {num_channels}"
    )


def imu_callback(imu_data: MotionData, sensor_label: str):
    print(
        f"Received {sensor_label} accel data {imu_data.accel_msec2} and gyro {imu_data.gyro_radsec}"
    )


def eyegaze_callback(eyegaze_data: EyeGaze):
    print(
        f"Received EyeGaze data at timestamp {eyegaze_data.tracking_timestamp.total_seconds()} sec "
        f"with yaw={eyegaze_data.yaw:.3f} rad, pitch={eyegaze_data.pitch:.3f} rad, "
        f"depth={eyegaze_data.depth:.3f} m"
    )


def handtracking_callback(handtracking_data: hand_tracking.HandTrackingResult):
    print(
        f"Received HandTracking data at timestamp {handtracking_data.tracking_timestamp.total_seconds()} sec"
    )

    # Check left hand data
    if handtracking_data.left_hand is not None:
        left_hand = handtracking_data.left_hand
        print(f"  Left hand confidence: {left_hand.confidence:.3f}")
        print(f"  Left wrist position: {left_hand.get_wrist_position_device()}")
        print(f"  Left palm position: {left_hand.get_palm_position_device()}")
        if left_hand.wrist_and_palm_normal_device is not None:
            normals = left_hand.wrist_and_palm_normal_device
            print(f"  Left wrist normal: {normals.wrist_normal_device}")
            print(f"  Left palm normal: {normals.palm_normal_device}")
    else:
        print("  Left hand: No data")

    # Check right hand data
    if handtracking_data.right_hand is not None:
        right_hand = handtracking_data.right_hand
        print(f"  Right hand confidence: {right_hand.confidence:.3f}")
        print(f"  Right wrist position: {right_hand.get_wrist_position_device()}")
        print(f"  Right palm position: {right_hand.get_palm_position_device()}")
        if right_hand.wrist_and_palm_normal_device is not None:
            normals = right_hand.wrist_and_palm_normal_device
            print(f"  Right wrist normal: {normals.wrist_normal_device}")
            print(f"  Right palm normal: {normals.palm_normal_device}")
    else:
        print("  Right hand: No data")


def vio_callback(vio_data: FrontendOutput):
    print(
        f"Received VIO data at timestamp {vio_data.capture_timestamp_ns} ns with transform_odometry_bodyimu: {vio_data.transform_odometry_bodyimu.rotation().log()} and {vio_data.transform_odometry_bodyimu.translation()}"
    )


def calib_callback(calib_json_str: str):
    print(f"Received calibration: {calib_json_str}")


def setup_streaming_receiver(device, record_to_vrs):
    # setup the server to receive streaming data from the device
    # IP address : 0.0.0.0 means that the server is listening on all available interfaces
    # Port : 6768 is the port number that the server is listening on
    config = sdk_gen2.HttpServerConfig()
    config.address = "0.0.0.0"
    config.port = 6768

    # setup the receiver
    stream_receiver = receiver.StreamReceiver()
    stream_receiver.set_server_config(config)
    if record_to_vrs != "":
        stream_receiver.record_to_vrs(record_to_vrs)

    # register callbacks for each type of data
    stream_receiver.register_slam_callback(image_callback)
    stream_receiver.register_rgb_callback(image_callback)
    stream_receiver.register_audio_callback(audio_callback)
    stream_receiver.register_eye_gaze_callback(eyegaze_callback)
    stream_receiver.register_hand_pose_callback(handtracking_callback)
    stream_receiver.register_vio_callback(vio_callback)

    # start the server
    stream_receiver.start_server()

    time.sleep(10)

    # stop streaming and terminate the server
    device.stop_streaming()

    time.sleep(2)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--record-to-vrs",
        dest="record_to_vrs",
        type=str,
        default="",
        required=False,
        help="Output directory to save the received streaming into VRS",
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    # setup device to start streaming
    device = device_streaming()

    # setup streaming receiver to receive streaming data with callbacks
    setup_streaming_receiver(device, args.record_to_vrs)

Usage Examples

Basic Streaming with Console Output

python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py

What happens:

  • Connects to device and starts streaming
  • Prints real-time data from all sensors to console
  • Data is NOT saved

Streaming and Recording to VRS

python ~/Downloads/projectaria_client_sdk_samples_gen2/device_streaming.py \
--record-to-vrs ~/Downloads/streaming_capture.vrs

What happens:

  • Streams data with real-time callbacks
  • Simultaneously records all data to VRS file
  • VRS file can be played back later with aria_rerun_viewer
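
You can also inspect the recorded file programmatically. A sketch using the projectaria_tools data provider, assuming your projectaria_tools version exposes this API (the path below is a placeholder):

from projectaria_tools.core import data_provider

provider = data_provider.create_vrs_data_provider(
    "/path/to/streaming_capture.vrs"  # placeholder path
)
for stream_id in provider.get_all_streams():
    label = provider.get_label_from_stream_id(stream_id)
    print(f"{stream_id}: {label}, {provider.get_num_data(stream_id)} records")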

Troubleshooting

No Data in Callbacks

Problem: Streaming starts but callbacks are never called.

Solutions:

  1. Check port 6768 is open:

    # On Linux/macOS
    sudo lsof -i :6768
  2. Disable VPN:

    • VPNs can block incoming streaming data
    • Disconnect from VPN/Lighthouse
  3. Check firewall:

    • Ensure firewall allows port 6768
    • Add exception for Python script
  4. Verify server address:

    • Must be 0.0.0.0 to listen on all interfaces
  5. Run aria_doctor:

    aria_doctor

High Data Drop Rate

Problem: VRS file shows many dropped frames.

Solutions:

  • Close other applications consuming bandwidth
  • Check system resources (CPU, memory)
  • Reduce callback processing time (see the sketch after this list)
  • Increase message queue size
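
One way to keep callbacks cheap is to hand data to a background worker instead of processing it inline. A minimal standard-library sketch, reusing the ImageData/ImageDataRecord types from the example above:

import queue
import threading

work_queue = queue.Queue(maxsize=64)  # bounded: drop work rather than stall the stream

def fast_image_callback(image_data, image_record):
    """Return immediately; heavy processing happens on the worker thread."""
    try:
        work_queue.put_nowait(
            (image_data.to_numpy_array(), image_record.capture_timestamp_ns)
        )
    except queue.Full:
        pass  # deliberately drop this frame instead of blocking the receiver

def worker():
    while True:
        image_array, timestamp_ns = work_queue.get()
        # ... expensive processing here ...
        work_queue.task_done()

threading.Thread(target=worker, daemon=True).start()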

Next Steps