Skip to main content

Aria Data Provider Code Snippets

In this section, we introduce the Python/C++ API to access sensor data in Project Aria VRS files (projectaria_tools/main/core/data_provider).

Open a VRS file

from projectaria_tools.core import data_provider
from projectaria_tools.core.sensor_data import TimeDomain, TimeQueryOptions
from projectaria_tools.core.stream_id import RecordableTypeId, StreamId

vrsfile = "example.vrs"
provider = data_provider.create_vrs_data_provider(vrsfile)
assert provider is not None, "Cannot open file"

Mapping between labels and stream ids

Stream IDs can be mapped from labels by using get_stream_id_from_label:

stream_id = provider.get_stream_id_from_label("camera-slam-left")

Inversely, you can retrieve a label from a stream ID by using get_stream_id_from_label:

label = provider.get_label_from_stream_id(StreamId("1201-1"))

Random access data by index

for stream_id in provider.get_all_streams():
for i in range(0, provider.get_num_data(stream_id)):
sensor_data = provider.get_sensor_data_by_index(stream_id, i)

Random access data by timestamp

Project Aria data has four kinds of TimeDomain entries. We strongly recommend always working with DEVICE_TIME when using single-device Aria data. The TIME_CODE TimeDomain is used when synchronizing time across multiple devices. Go to Timestamps in Aria VRS Files for more information.

  • TimeDomain.RECORD_TIME
  • TimeDomain.DEVICE_TIME - recommended
  • TimeDomain.HOST_TIME
  • TimeDomain.TIME_CODE - for multiple devices

You can also search using three different time query options:

  • TimeQueryOptions.BEFORE (default): last data with t <= t_query
  • TimeQueryOptions.AFTER : first data with t >= t_query
  • TimeQueryOptions.CLOSEST : the data where |t - t_query| is smallest
for stream_id in provider.get_all_streams():
t_first = provider.get_first_time_ns(stream_id, TimeDomain.DEVICE_TIME)
t_last = provider.get_last_time_ns(stream_id, TimeDomain.DEVICE_TIME)
query_timestamp = (t_first + t_last) // 2 # example query timestamp
sensor_data = provider.get_sensor_data_by_time_ns(stream_id, query_timestamp, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST)

Deliver all sensor data in VRS

Async iterator to deliver sensor data for all streams in device time order:

for data in provider.deliver_queued_sensor_data():
print(data.get_time_ns(TimeDomain.DEVICE_TIME))

Alternatively, you can use iterator-type syntax:

seq = provider.deliver_queued_sensor_data()
obj = next(seq)
while True:
print(obj.get_time_ns(TimeDomain.DEVICE_TIME))
try:
obj = next(seq)
except StopIteration:
break

Deliver with sub-stream selection, time truncation, and frame rate sub-sampling:

# Starts by default options which activates all sensors
deliver_option = provider.get_default_deliver_queued_options()

# Only play data from two cameras, also reduce framerate to half of vrs
deliver_option.deactivate_stream_all()
for label in ["camera-slam-left", "camera-slam-right"]:
streamId = provider.get_stream_id_from_label(label)
deliver_option.activate_stream(streamId)
deliver_option.set_subsample_rate(streamId, 2)

# skip first 100ns
deliver_option.set_truncate_first_device_time_ns(100)
for data in provider.deliver_queued_sensor_data() :
print(data.get_time_ns(TimeDomain.DEVICE_TIME))