Aria Data Provider Code Snippets
This section introduces the Python and C++ APIs for accessing sensor data in Project Aria VRS files (projectaria_tools/main/core/data_provider).
Open a VRS file
- Python
- C++
from projectaria_tools.core import data_provider
from projectaria_tools.core.sensor_data import TimeDomain, TimeQueryOptions
from projectaria_tools.core.stream_id import RecordableTypeId, StreamId
vrsfile = "example.vrs"
provider = data_provider.create_vrs_data_provider(vrsfile)
assert provider is not None, "Cannot open file"
#include <dataprovider/VrsDataProvider.h>
using namespace projectaria::tools::data_provider;
std::string vrsfile = "example.vrs";
auto maybeProvider = createVrsDataProvider(vrsfile);
XR_CHECK(maybeProvider, "Cannot open file");
VrsDataProvider& provider = *maybeProvider;
Mapping between labels and stream ids
- Python
- C++
Stream IDs can be mapped from labels by using get_stream_id_from_label:
stream_id = provider.get_stream_id_from_label("camera-slam-left")
Conversely, you can retrieve a label from a stream ID by using get_label_from_stream_id:
label = provider.get_label_from_stream_id(StreamId("1201-1"))
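The string "1201-1" passed to StreamId above is a numeric stream name: a recordable type ID followed by an instance ID, joined by a hyphen. The following is a minimal sketch of that naming scheme in plain Python, without projectaria_tools. The helper parse_numeric_name is hypothetical and is not part of the library.

```python
def parse_numeric_name(name):
    """Split a numeric stream name such as "1201-1" into (type_id, instance_id).

    Hypothetical helper for illustration only; not a projectaria_tools API.
    """
    type_part, sep, instance_part = name.partition("-")
    if not sep or not type_part.isdigit() or not instance_part.isdigit():
        raise ValueError(f"not a numeric stream name: {name!r}")
    return int(type_part), int(instance_part)


type_id, instance_id = parse_numeric_name("1201-1")
print(type_id, instance_id)
```

A label such as "camera-slam-left" is not a numeric name, so the sketch rejects it with a ValueError rather than guessing.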
Stream IDs can be mapped from labels by using getStreamIdFromLabel:
auto streamId = provider.getStreamIdFromLabel("camera-slam-left");
Conversely, you can retrieve a label from a stream ID by using getLabelFromStreamId:
auto label = provider.getLabelFromStreamId(StreamId::fromNumericalName("1201-1"));
Random access data by index
- Python
- C++
for stream_id in provider.get_all_streams():
    for i in range(0, provider.get_num_data(stream_id)):
        sensor_data = provider.get_sensor_data_by_index(stream_id, i)
for (const auto& streamId : provider.getAllStreams()) {
  for (size_t i = 0; i < provider.getNumData(streamId); ++i) {
    auto sensorData = provider.getSensorDataByIndex(streamId, i);
  }
}
Random access data by timestamp
Project Aria data has four kinds of TimeDomain entries. We strongly recommend always working with DEVICE_TIME when using single-device Aria data. The TIME_CODE TimeDomain is used when synchronizing time across multiple devices. Go to Timestamps in Aria VRS Files for more information.
- Python
- C++
- TimeDomain.RECORD_TIME
- TimeDomain.DEVICE_TIME - recommended
- TimeDomain.HOST_TIME
- TimeDomain.TIME_CODE - for multiple devices
You can also search using three different time query options:
- TimeQueryOptions.BEFORE (default): last data with t <= t_query
- TimeQueryOptions.AFTER: first data with t >= t_query
- TimeQueryOptions.CLOSEST: the data where |t - t_query| is smallest
for stream_id in provider.get_all_streams():
    t_first = provider.get_first_time_ns(stream_id, TimeDomain.DEVICE_TIME)
    t_last = provider.get_last_time_ns(stream_id, TimeDomain.DEVICE_TIME)
    query_timestamp = (t_first + t_last) // 2  # example query timestamp
    sensor_data = provider.get_sensor_data_by_time_ns(stream_id, query_timestamp, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST)
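To make the three query options concrete, here is a minimal sketch of their selection rules applied to a sorted list of timestamps, in plain Python without projectaria_tools. The function query_index and its string options are hypothetical stand-ins for illustration. Returning -1 when no sample qualifies, and breaking CLOSEST ties toward the earlier sample, are assumptions of this sketch and not documented library behavior.

```python
from bisect import bisect_left, bisect_right


def query_index(timestamps, t_query, option="BEFORE"):
    """Return the index chosen by a time query, or -1 if no sample qualifies.

    `timestamps` must be sorted in ascending order. `option` is one of
    "BEFORE", "AFTER", "CLOSEST" (hypothetical stand-ins for TimeQueryOptions).
    """
    if not timestamps:
        return -1
    if option == "BEFORE":
        # Last index with t <= t_query; -1 if every sample is later.
        return bisect_right(timestamps, t_query) - 1
    if option == "AFTER":
        # First index with t >= t_query; -1 if every sample is earlier.
        i = bisect_left(timestamps, t_query)
        return i if i < len(timestamps) else -1
    if option == "CLOSEST":
        i = bisect_left(timestamps, t_query)
        if i == 0:
            return 0
        if i == len(timestamps):
            return len(timestamps) - 1
        # Compare the two neighbours; a tie goes to the earlier sample (assumption).
        before, after = timestamps[i - 1], timestamps[i]
        return i - 1 if t_query - before <= after - t_query else i
    raise ValueError(f"unknown option: {option!r}")


timestamps_ns = [100, 200, 300]
for option in ("BEFORE", "AFTER", "CLOSEST"):
    print(option, query_index(timestamps_ns, 240, option))
```

With a query of 240, BEFORE selects the sample at 200, AFTER selects the sample at 300, and CLOSEST selects the sample at 200 because it is 40 ns away rather than 60 ns.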
- TimeDomain::RecordTime
- TimeDomain::DeviceTime - recommended
- TimeDomain::HostTime
- TimeDomain::TimeCode - for multiple devices
You can also search using three different time query options:
- TimeQueryOptions::Before: last data with t <= t_query
- TimeQueryOptions::After: first data with t >= t_query
- TimeQueryOptions::Closest: the data where |t - t_query| is smallest
for (const auto& streamId : provider.getAllStreams()) {
  int64_t tFirst = provider.getFirstTimeNs(streamId, TimeDomain::DeviceTime);
  int64_t tLast = provider.getLastTimeNs(streamId, TimeDomain::DeviceTime);
  auto queryTimestamp = (tFirst + tLast) / 2; // example query timestamp
  auto sensorData = provider.getSensorDataByTimeNs(streamId, queryTimestamp, TimeDomain::DeviceTime, TimeQueryOptions::Closest);
}
Deliver all sensor data in VRS
- Python
- C++
Async iterator to deliver sensor data for all streams in device time order:
for data in provider.deliver_queued_sensor_data():
    print(data.get_time_ns(TimeDomain.DEVICE_TIME))
Alternatively, you can use iterator-type syntax:
seq = provider.deliver_queued_sensor_data()
obj = next(seq)
while True:
    print(obj.get_time_ns(TimeDomain.DEVICE_TIME))
    try:
        obj = next(seq)
    except StopIteration:
        break
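Delivering data "in device time order" means interleaving several per-stream sequences into one time-sorted sequence. The following is a minimal sketch of that idea using heapq.merge from the Python standard library. It does not show how projectaria_tools implements its queue; merge_streams_by_time, the "imu-right" label, and the timestamps are made up for illustration.

```python
import heapq


def merge_streams_by_time(streams):
    """Yield (timestamp_ns, label) pairs across all streams in ascending time order.

    `streams` maps a label to a list of timestamps that is already sorted
    ascending, as the samples within a single stream would be.
    """
    tagged = (
        [(t, label) for t in timestamps]
        for label, timestamps in streams.items()
    )
    # heapq.merge lazily interleaves the already-sorted inputs.
    yield from heapq.merge(*tagged)


example_streams = {
    "camera-slam-left": [10, 30, 50],
    "imu-right": [5, 20, 30],
}
for timestamp_ns, label in merge_streams_by_time(example_streams):
    print(timestamp_ns, label)
```

Because each stream is already sorted, the merge only needs to compare the head of each stream at every step and never has to load or sort all samples at once.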
Deliver with sub-stream selection, time truncation, and frame rate sub-sampling:
# Start from the default options, which activate all sensors
deliver_option = provider.get_default_deliver_queued_options()
# Only play data from two cameras, and reduce the frame rate to half of the VRS rate
deliver_option.deactivate_stream_all()
for label in ["camera-slam-left", "camera-slam-right"]:
    stream_id = provider.get_stream_id_from_label(label)
    deliver_option.activate_stream(stream_id)
    deliver_option.set_subsample_rate(stream_id, 2)
# Skip the first 100 ns
deliver_option.set_truncate_first_device_time_ns(100)
# Pass the options to the iterator; without them, all streams are delivered
for data in provider.deliver_queued_sensor_data(deliver_option):
    print(data.get_time_ns(TimeDomain.DEVICE_TIME))
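The combined effect of time truncation and frame rate sub-sampling on one stream can be sketched in plain Python. This is an illustration under stated assumptions, not the library's implementation: it assumes truncation is measured from the first timestamp of the list, and that a sub-sample rate of N keeps every Nth remaining sample starting with the first. The function select_frames is hypothetical.

```python
def select_frames(timestamps, subsample_rate=1, truncate_first_ns=0):
    """Return the timestamps kept after truncation and sub-sampling.

    Assumptions of this sketch: `timestamps` is sorted ascending, truncation
    is relative to the first timestamp, and sub-sampling is applied after
    truncation.
    """
    if subsample_rate < 1:
        raise ValueError("subsample_rate must be >= 1")
    if not timestamps:
        return []
    start = timestamps[0] + truncate_first_ns
    kept = [t for t in timestamps if t >= start]
    # Keep every `subsample_rate`-th sample of what remains.
    return kept[::subsample_rate]


frames_ns = [0, 50, 100, 150, 200, 250]
print(select_frames(frames_ns, subsample_rate=2, truncate_first_ns=100))
```

In this example the samples at 0 and 50 fall inside the truncated interval, and half of the remaining four samples are kept.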
Async iterator to deliver sensor data for all streams in device time order:
for (const SensorData& data : provider.deliverQueuedSensorData()) {
  std::cout << data.getTimeNs(TimeDomain::DeviceTime) << std::endl;
}
Alternatively, you can use iterator-type syntax:
auto seq = provider.deliverQueuedSensorData();
// The iterator is advanced in the loop, so it must be a non-const copy
for (auto it = seq.begin(); it != seq.end(); ++it) {
  SensorData data = *it;
  std::cout << data.getTimeNs(TimeDomain::DeviceTime) << std::endl;
}
Deliver with sub-stream selection, time truncation, and frame rate sub-sampling:
// Start from the default options, which activate all sensors
auto deliverOption = provider.getDefaultDeliverQueuedOptions();
deliverOption.deactivateStreamAll();
// Only play data from two cameras, and reduce the frame rate to half of the VRS rate
for (const auto& label : {"camera-slam-left", "camera-slam-right"}) {
  std::optional<vrs::StreamId> maybeStreamId = provider.getStreamIdFromLabel(label);
  if (maybeStreamId) {
    deliverOption.activateStream(maybeStreamId.value());
    deliverOption.setSubsampleRate(maybeStreamId.value(), 2);
  }
}
// Skip the first 100 ns
deliverOption.setTruncateFirstDeviceTimeNs(100);
for (const SensorData& data : provider.deliverQueuedSensorData(deliverOption)) {
  std::cout << data.getTimeNs(TimeDomain::DeviceTime) << std::endl;
}