VRS
A file format for sensor data.
vrs::RecordFileWriter Class Reference

The class to create VRS files.

#include <RecordFileWriter.h>

Classes

struct  SortRecord
 

Public Types

using RecordBatch = vector< pair< StreamId, list< Record * > > >
 Batch of records collected at one point in time, for each recordable.
 
using RecordBatches = vector< unique_ptr< RecordBatch > >
 Series of record batches collected.
 
using SortedRecords = deque< SortRecord >
 List of records, sorted by time.
 

Public Member Functions

 RecordFileWriter (const RecordFileWriter &)=delete
 
RecordFileWriter & operator= (const RecordFileWriter &)=delete
 
void addRecordable (Recordable *recordable)
 
vector< Recordable * > getRecordables () const
 
void setCompressionThreadPoolSize (size_t size=kMaxThreadPoolSizeForHW)
 
void setInitCreatedThreadCallback (const InitCreatedThreadCallback &initCreatedThreadCallback)
 
int writeToFile (const string &filePath)
 
void purgeOldRecords (double maxTimestamp, bool recycleBuffers=true)
 
virtual int createFileAsync (const string &filePath)
 
int createChunkedFile (const string &filePath, size_t maxChunkSizeMB=0, unique_ptr< NewChunkHandler > &&chunkHandler=nullptr)
 
void setMaxChunkSizeMB (size_t maxChunkSizeMB)
 
int preallocateIndex (unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex)
 
int writeRecordsAsync (double maxTimestamp)
 
int autoWriteRecordsAsync (const function< double()> &maxTimestampProvider, double delay)
 
int autoPurgeRecords (const function< double()> &maxTimestampProvider, double delay)
 
bool isWriting () const
 
void trackBackgroundThreadQueueByteSize ()
 
uint64_t getBackgroundThreadQueueByteSize ()
 
int closeFileAsync ()
 
virtual int waitForFileClosed ()
 
void setTag (const string &tagName, const string &tagValue)
 
void addTags (const map< string, string > &newTags)
 
const map< string, string > & getTags () const
 
int setWriteFileHandler (unique_ptr< WriteFileHandler > writeFileHandler)
 
void backgroundWriterThreadActivity ()
 
void backgroundPurgeThreadActivity ()
 

Static Public Attributes

static constexpr size_t kMaxThreadPoolSizeForHW = UINT32_MAX
 

Protected Attributes

unique_ptr< WriteFileHandler > file_
 data members valid while a file is being worked on
 
uint64_t maxChunkSize_ {}
 
unique_ptr< NewChunkHandler > newChunkHandler_
 
FileFormat::FileHeader fileHeader_
 
uint32_t lastRecordSize_ {}
 
bool sortRecords_ {true}
 
bool skipFinalizeIndexRecords_ = false
 
unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex_
 
IndexRecord::Writer indexRecordWriter_
 
map< string, string > fileTags_
 
size_t compressionThreadPoolSize_ {}
 
RecordFileWriter_::WriterThreadData * writerThreadData_
 when a background thread is active
 
std::unique_ptr< std::atomic< uint64_t > > queueByteSize_
 
RecordFileWriter_::PurgeThreadData * purgeThreadData_
 when a purge thread is active
 
InitCreatedThreadCallback initCreatedThreadCallback_
 

Friends

struct ::vrs::test::RecordFileWriterTester
 

Detailed Description

The class to create VRS files.

There are different strategies to write a VRS file:

Write all the data of one or more recordables to a file synchronously in one shot:
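A minimal sketch of this strategy, using only addRecordable() and writeToFile() as documented on this page. OneShotWriterStub and RecordableStub are hypothetical stand-ins, not VRS types; they exist only so the sketch compiles without the VRS headers. With VRS available, the same helper should accept a vrs::RecordFileWriter and Recordable pointers.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins (not part of VRS): they expose only the two
// documented methods used below.
struct RecordableStub {};
struct OneShotWriterStub {
  std::vector<RecordableStub*> recordables;
  std::string writtenPath;
  void addRecordable(RecordableStub* recordable) { recordables.push_back(recordable); }
  int writeToFile(const std::string& filePath) {
    writtenPath = filePath;
    return 0;
  }
};

// Register every recordable, then write all their records in one blocking call.
// Returns the status code of writeToFile(): 0 means no error occurred.
template <class Writer, class Recordable>
int writeInOneShot(Writer& writer, const std::vector<Recordable*>& recordables,
                   const std::string& filePath) {
  for (Recordable* recordable : recordables) {
    assert(recordable != nullptr);  // the writer does not take ownership
    writer.addRecordable(recordable);
  }
  return writer.writeToFile(filePath);
}
```

Because ownership is not transferred, the recordables must outlive the writer.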

To write the data of one or more recordables, progressively, while records are being generated, using a background thread:
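A minimal sketch of this strategy, assuming the recordables were already added with addRecordable(). It calls createFileAsync(), writeRecordsAsync() and waitForFileClosed() as documented on this page. AsyncWriterStub is a hypothetical stand-in that only logs calls, and the `timestamps` vector stands in for a periodic timer in a real application.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in (not part of VRS) that logs calls so the sketch runs
// without the VRS headers.
struct AsyncWriterStub {
  std::vector<std::string> calls;
  int createFileAsync(const std::string& filePath) {
    calls.push_back("create:" + filePath);
    return 0;
  }
  int writeRecordsAsync(double /*maxTimestamp*/) {
    calls.push_back("write");
    return 0;
  }
  int waitForFileClosed() {
    calls.push_back("close");
    return 0;
  }
};

// Create the file, send records to the background thread once per tick, then
// flush and close. Each entry of `timestamps` is "now" at one tick, in seconds;
// `lag` keeps the newest records in memory until a later tick.
template <class Writer>
int recordProgressively(Writer& writer, const std::string& filePath,
                        const std::vector<double>& timestamps, double lag) {
  assert(lag >= 0);
  int status = writer.createFileAsync(filePath);
  if (status != 0) {
    return status;
  }
  for (double now : timestamps) {
    status = writer.writeRecordsAsync(now - lag);
    if (status != 0) {
      break;  // stop sending records, but still close the file below
    }
  }
  const int closeStatus = writer.waitForFileClosed();
  return status != 0 ? status : closeStatus;
}
```

The first non-zero status is returned, so a write error is not masked by a successful close.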

Member Function Documentation

◆ addRecordable()

void vrs::RecordFileWriter::addRecordable ( Recordable * recordable)

A record file holds data from various recordables, registered using this method. The ownership of the recordable is not transferred, and the caller is responsible for deleting the recordables after the RecordFileWriter is deleted.

Parameters
recordable: Recordable object for the device to record.

◆ addTags()

void vrs::RecordFileWriter::addTags ( const map< string, string > &  newTags)

Add file tags in bulk.

Parameters
newTags: A map of string name/value pairs.

◆ autoPurgeRecords()

int vrs::RecordFileWriter::autoPurgeRecords ( const function< double()> &  maxTimestampProvider,
double  delay 
)

To purge old records automatically, when no file is being written. Note: while writing a VRS file asynchronously, record purging will automatically be disabled when the file is created, and re-enabled when the file is closed.

Parameters
maxTimestampProvider: Function providing the timestamp of the newest record to be purged.
delay: Number of seconds between automated calls to purge records.
Returns
A status code: 0 if no error occurred, a file system error code otherwise.

◆ autoWriteRecordsAsync()

int vrs::RecordFileWriter::autoWriteRecordsAsync ( const function< double()> &  maxTimestampProvider,
double  delay 
)

To collect & write new records automatically after opening the file.

Parameters
maxTimestampProvider: Function providing the timestamp of the newest record to be sent to the background thread(s) writing records to disk. All records sent to disk are older.
delay: Number of seconds between automated calls to send records for writing.
Returns
A status code: 0 if no error occurred, a file system error code otherwise.
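A maxTimestampProvider is any callable returning a double. One possible shape, sketched below, trails a clock by a fixed lag so that the most recent records stay in memory a little longer. The helper name makeLaggingProvider and the `clock` parameter are hypothetical, not part of VRS; the clock must use the same time domain as the record timestamps.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Build a provider that returns the clock's current value minus lagSeconds.
// `clock` is a caller-supplied time source (hypothetical, for illustration).
inline std::function<double()> makeLaggingProvider(std::function<double()> clock,
                                                   double lagSeconds) {
  assert(clock && lagSeconds >= 0);
  return [clock = std::move(clock), lagSeconds]() { return clock() - lagSeconds; };
}
```

Given the signature above, the result could be passed as `writer.autoWriteRecordsAsync(makeLaggingProvider(clock, 1.0), 0.5)`. The same kind of callable fits autoPurgeRecords().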

◆ backgroundPurgeThreadActivity()

void vrs::RecordFileWriter::backgroundPurgeThreadActivity ( )

Background threads implementation: do not call!

◆ backgroundWriterThreadActivity()

void vrs::RecordFileWriter::backgroundWriterThreadActivity ( )

Background threads implementation: do not call!

◆ closeFileAsync()

int vrs::RecordFileWriter::closeFileAsync ( )

Request to close the file, when all data has been written, but don't wait for that.

Returns
A status code: 0 if no error occurred, a file system error code otherwise.

◆ createChunkedFile()

int vrs::RecordFileWriter::createChunkedFile ( const string &  filePath,
size_t  maxChunkSizeMB = 0,
unique_ptr< NewChunkHandler > &&  chunkHandler = nullptr 
)

Create a VRS file to write to in a background thread, with a separate head file that will contain the file's header, and the description and index records only. All the user records will be written in one or more following chunks, that will be only written going forward, which makes them streaming friendly. The file's head will always be written using a DiskFile, and will be using updates/overwrites, which is not compatible with streaming, unless you can upload the file's head at the end, and prepend it to the previously uploaded user record chunk(s). A VRS file created this way will be efficient to stream when reading records in timestamp order with no seek backward or forward.

Parameters
filePath: Path, relative or absolute, of the VRS file to create. The user record chunks will be named using the name provided, with "_1", "_2", etc. as suffix. Note: the file's and recordables' tags will be written to disk before the call returns. Make sure to call this method only after you've added all your recordables and set all the tags.
maxChunkSizeMB: Max size of a chunk, in MB. Note that the last chunk may actually be larger. If maxChunkSizeMB is 0, at least two chunks will be created: one for the file's records, one for the file's header and index.
chunkHandler: Optional listener to be notified each time a chunk is complete, and when chunk handling should be completed (for instance, to finalize uploads). See NewChunkCallback's documentation for details.
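The chunk naming rule described for filePath can be sketched as a small helper. This assumes the suffix is appended directly to the full path given to createChunkedFile(), extension included, which is one reading of the description above. The function name userRecordChunkPath is hypothetical and not part of VRS.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Path of the n-th user record chunk (1-based), assuming the "_<n>" suffix is
// appended to the path passed to createChunkedFile().
inline std::string userRecordChunkPath(const std::string& filePath, std::size_t chunkIndex) {
  assert(chunkIndex >= 1);  // chunk numbering starts at "_1"
  return filePath + "_" + std::to_string(chunkIndex);
}
```

A helper like this can be handy in an upload step that needs to locate the chunks that follow the head file.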

◆ createFileAsync()

int vrs::RecordFileWriter::createFileAsync ( const string &  filePath)
virtual

Create a VRS file to write to in a background thread.

Parameters
filePath: Path, relative or absolute, of the VRS file to create. Note: the file's and recordables' tags will be written to disk before the call returns. Make sure to call this method only after you've added all your recordables and set all the tags.

◆ getBackgroundThreadQueueByteSize()

uint64_t vrs::RecordFileWriter::getBackgroundThreadQueueByteSize ( )

Get how many record-bytes have been passed to the background thread, but not yet processed, which correlates with how much memory is being used by the queue.

Returns
The number of record bytes waiting to be processed by the background thread.

◆ getRecordables()

vector< Recordable * > vrs::RecordFileWriter::getRecordables ( ) const

Get the recordables attached to this writer.

Returns
A vector of Recordable pointers.

◆ getTags()

const map< string, string > & vrs::RecordFileWriter::getTags ( ) const
inline

Get all the file tags at once.

Returns
A map of all the tags associated with the file itself.

◆ isWriting()

bool vrs::RecordFileWriter::isWriting ( ) const
inline

Tell if a disk file is being written.

Returns
True if a file is being written by this RecordFileWriter instance.

◆ preallocateIndex()

int vrs::RecordFileWriter::preallocateIndex ( unique_ptr< deque< IndexRecord::DiskRecordInfo > >  preliminaryIndex)

Pre-allocate space for an index similar to the one provided. Must be called before the file is created, but after all the recordables have been attached. Call this method just before creating the file using createFileAsync(). The index should be as similar to the real thing as possible, as it will be used to guess the size of the actual index, compressed. This method is meant to be used for copy operations, so that a compressed index can be pre-allocated based on the index of the source file(s).

Parameters
preliminaryIndex: An index that resembles the index of the final file.
Returns
0 if the request is accepted.

◆ purgeOldRecords()

void vrs::RecordFileWriter::purgeOldRecords ( double  maxTimestamp,
bool  recycleBuffers = true 
)

Delete all records older than a certain time (useful to trim a live buffer).

Parameters
maxTimestamp: Timestamp cutoff for the records to purge; all purged records were created before maxTimestamp.
recycleBuffers: Whether the buffers should be recycled, in which case the memory occupied by the records might not be released immediately, or the memory should be freed immediately.

◆ setCompressionThreadPoolSize()

void vrs::RecordFileWriter::setCompressionThreadPoolSize ( size_t  size = kMaxThreadPoolSizeForHW)

Set number of threads to use for background compression, or none will be used.

Parameters
size: Number of threads to compress records in parallel. The default value will make RecordFileWriter use as many threads as there are cores in the system. If you do not set any value, RecordFileWriter will use only a single thread.

◆ setInitCreatedThreadCallback()

void vrs::RecordFileWriter::setInitCreatedThreadCallback ( const InitCreatedThreadCallback & initCreatedThreadCallback)

Sets a callback that will be called when a thread is created by this interface. This gives the user the opportunity to set the thread's priority, name, etc. The callback is invoked from the new thread, so functions like gettid() can be used. This is an optional call, but it must be made before calling createFileAsync().

Parameters
initCreatedThreadCallback: Callback that receives a reference to the thread created, the ThreadRole, and the thread index (only used for compression threads).

◆ setMaxChunkSizeMB()

void vrs::RecordFileWriter::setMaxChunkSizeMB ( size_t  maxChunkSizeMB)

Set the maximum chunk size, as a number of MB, 0 meaning no chunking (infinite limit). Must be called after calling createFileAsync(), if it wasn't set then.

Parameters
maxChunkSizeMB: Max number of MB per chunk. Actual chunks might be a bit smaller, to avoid splitting records.

◆ setTag()

void vrs::RecordFileWriter::setTag ( const string &  tagName,
const string &  tagValue 
)

Set a tag value. Note: tags are written when the file is created! Changes made later will not be saved! Tags are written early, so that if the app crashes, or we run out of disk space, we don't lose them! An index can be rebuilt if it's missing in a truncated file, but tags need to be safe, or they're useless!

Parameters
tagName: The name of the tag.
tagValue: The value of the tag. Can be any string value, including a JSON message.
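The ordering constraint on tags can be sketched as follows: set every tag first, then create the file. TagWriterStub is a hypothetical stand-in, not a VRS type. It snapshots the tags at file creation to mimic the behavior described above, and the helper createFileWithTags is likewise illustrative only.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in (not part of VRS): tags are captured when the file is
// created, so later changes never reach the file.
struct TagWriterStub {
  std::map<std::string, std::string> tags;       // current in-memory tags
  std::map<std::string, std::string> savedTags;  // tags that reached the file
  void setTag(const std::string& tagName, const std::string& tagValue) {
    tags[tagName] = tagValue;
  }
  void addTags(const std::map<std::string, std::string>& newTags) {
    for (const auto& tag : newTags) {
      tags[tag.first] = tag.second;
    }
  }
  int createFileAsync(const std::string& /*filePath*/) {
    savedTags = tags;
    return 0;
  }
};

// Set all the tags, then create the file, so that every tag is saved.
template <class Writer>
int createFileWithTags(Writer& writer, const std::map<std::string, std::string>& tags,
                       const std::string& filePath) {
  assert(!filePath.empty());
  writer.addTags(tags);
  return writer.createFileAsync(filePath);
}
```

With the stub, a setTag() call made after createFileWithTags() changes `tags` but not `savedTags`, which mirrors the warning above.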

◆ setWriteFileHandler()

int vrs::RecordFileWriter::setWriteFileHandler ( unique_ptr< WriteFileHandler > writeFileHandler)

To use a different type of WriteFileHandler to generate the file. WriteFileHandler uses DiskFile by default, but you can use a different implementation if you need to. You might want to stream the data to the cloud, or override DiskFile to tweak file creation and initialize the file objects differently.

Parameters
writeFileHandler: A specialized WriteFileHandler object.
Returns
An error status, 0 meaning success.

◆ trackBackgroundThreadQueueByteSize()

void vrs::RecordFileWriter::trackBackgroundThreadQueueByteSize ( )

It can be useful or necessary to track how much buffer memory is used by the background threads, so as to report errors, stop recording, or simply wait until enough data has been processed. To avoid threading/cache invalidation costs, this feature needs to be enabled by calling this method before you start writing to disk. To avoid race conditions, call it before the background thread is active. To be safe, call it before creating the file.
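One way to "wait until enough data has been processed" is to poll getBackgroundThreadQueueByteSize() until the queue drops below a limit. The sketch below assumes tracking was enabled before the file was created. QueueStub is a hypothetical stand-in whose queue drains by a fixed amount per poll, and waitForQueueBelow is an illustrative helper, not a VRS function.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical stand-in (not part of VRS): each poll reports the current queue
// size, then simulates the background thread draining up to 400 bytes.
struct QueueStub {
  std::uint64_t queued = 0;
  std::uint64_t getBackgroundThreadQueueByteSize() {
    const std::uint64_t current = queued;
    queued -= (queued < 400 ? queued : 400);
    return current;
  }
};

// Poll until the queue holds at most maxBytes, giving up after maxPolls polls.
// Returns true if the queue got under the limit, false on timeout.
template <class Writer>
bool waitForQueueBelow(Writer& writer, std::uint64_t maxBytes, int maxPolls,
                       std::chrono::milliseconds pollInterval) {
  assert(maxPolls > 0);
  for (int poll = 0; poll < maxPolls; ++poll) {
    if (writer.getBackgroundThreadQueueByteSize() <= maxBytes) {
      return true;
    }
    std::this_thread::sleep_for(pollInterval);
  }
  return false;
}
```

A false return is the point where an application might report an error or stop recording rather than keep buffering.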

◆ waitForFileClosed()

int vrs::RecordFileWriter::waitForFileClosed ( )
virtual

Start writing all the pending records, and wait for the file to be written & closed.

Returns
A status code: 0 if no error occurred, a file system error code otherwise.

◆ writeRecordsAsync()

int vrs::RecordFileWriter::writeRecordsAsync ( double  maxTimestamp)

Send records older than the timestamp provided to be written to disk in a background thread.

Parameters
maxTimestamp: Largest timestamp of the records sent to be written to disk, i.e. all records to be written are older.
Returns
A status code: 0 if no error occurred, a file system error code otherwise.

◆ writeToFile()

int vrs::RecordFileWriter::writeToFile ( const string &  filePath)

Take all the records of all the registered and active recordables, and write them all to disk. All synchronous, won't return until the file is closed. On error, a (partial) file may exist.

Parameters
filePath: Path, relative or absolute, where to write the VRS file.
Returns
A status code: 0 if no error occurred, a file system error code otherwise.
