The class to create VRS files.
#include <RecordFileWriter.h>
|
using | RecordBatch = vector< pair< StreamId, list< Record * > > > |
| Batch of records collected at one point in time, for each recordable.
|
|
using | RecordBatches = vector< unique_ptr< RecordBatch > > |
| Series of record batches collected.
|
|
using | SortedRecords = deque< SortRecord > |
| List of records, sorted by time.
|
|
|
static constexpr size_t | kMaxThreadPoolSizeForHW = UINT32_MAX |
|
|
struct | ::vrs::test::RecordFileWriterTester |
|
The class to create VRS files.
There are different strategies to write a VRS file:
To write all the data of one or more recordables to a file synchronously, in one shot, register the recordables using addRecordable(), then call writeToFile().
To write the data of one or more recordables, progressively, while records are being generated, using a background thread:
- create a RecordFileWriter.
- add the (active) recordables you want to record using addRecordable().
- create the file using createFileAsync(). This creates the file and writes a few bytes, but should be quick.
- optional: call purgeOldRecords() to discard records that were created before what you really want to record.
- call writeRecordsAsync() regularly to write old enough records to disk in the background (very fast call).
- optional: call closeFileAsync() to write the remaining unwritten records in the background (very fast call).
- call waitForFileClosed() to write all the unwritten records and wait for the file to be written and closed (blocking call).
- profit!
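The asynchronous sequence can be sketched as follows. This is only an illustration of the call order: `SketchWriter` is a hypothetical stand-in that logs the calls it receives, so the example compiles without the VRS headers. Its method names mirror the ones documented on this page, but the signatures are simplified, and `recordSession`, `ticks`, and `tickSeconds` are names invented for the sketch. Recordables are assumed to have been registered with addRecordable() beforehand.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for vrs::RecordFileWriter. It only logs the calls it
// receives, so the sequence can be shown without the VRS headers.
struct SketchWriter {
  std::vector<std::string> calls;
  int createFileAsync(const std::string&) { calls.push_back("createFileAsync"); return 0; }
  void purgeOldRecords(double) { calls.push_back("purgeOldRecords"); }
  int writeRecordsAsync(double) { calls.push_back("writeRecordsAsync"); return 0; }
  int closeFileAsync() { calls.push_back("closeFileAsync"); return 0; }
  int waitForFileClosed() { calls.push_back("waitForFileClosed"); return 0; }
};

// Drives the documented sequence: create, purge, write regularly, close, wait.
// Returns the first non-zero status code encountered, or 0 on success.
template <class Writer>
int recordSession(Writer& writer, const std::string& path, double startTime,
                  int ticks, double tickSeconds) {
  assert(ticks >= 0 && tickSeconds > 0);
  int status = writer.createFileAsync(path);
  if (status != 0) {
    return status;
  }
  // Drop records created before the moment recording really starts.
  writer.purgeOldRecords(startTime);
  // In a real application, recordables keep creating records between ticks.
  for (int i = 1; i <= ticks && status == 0; ++i) {
    status = writer.writeRecordsAsync(startTime + i * tickSeconds);
  }
  // Ask for the remaining records to be written, then block until closed.
  writer.closeFileAsync();
  const int closeStatus = writer.waitForFileClosed();
  return status != 0 ? status : closeStatus;
}
```

Because `recordSession` is a template, the same sequence could in principle be driven against any type exposing these five methods.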
◆ addRecordable()
void vrs::RecordFileWriter::addRecordable |
( |
Recordable * |
recordable | ) |
|
A record file holds data from various recordables, registered using this method. The ownership of the recordable is not transferred, and the caller is responsible for deleting the recordables after the RecordFileWriter is deleted.
- Parameters
-
recordable | Recordable object for the device to record. |
◆ addTags()
void vrs::RecordFileWriter::addTags |
( |
const map< string, string > & |
newTags | ) |
|
Add file tags in bulk.
- Parameters
-
newTags | A map of string name/value pairs. |
◆ autoPurgeRecords()
int vrs::RecordFileWriter::autoPurgeRecords |
( |
const function< double()> & |
maxTimestampProvider, |
|
|
double |
delay |
|
) |
| |
Purge old records automatically when no file is being written. Note: while a VRS file is being written asynchronously, record purging is automatically disabled when the file is created, and re-enabled when the file is closed.
- Parameters
-
maxTimestampProvider | Function providing the timestamp of the newest record to be purged. |
delay | Number of seconds between automated calls to purge records. |
- Returns
- A status code: 0 if no error occurred, a file system error code otherwise.
◆ autoWriteRecordsAsync()
int vrs::RecordFileWriter::autoWriteRecordsAsync |
( |
const function< double()> & |
maxTimestampProvider, |
|
|
double |
delay |
|
) |
| |
Collect and write new records automatically after opening the file.
- Parameters
-
maxTimestampProvider | Function providing the timestamp of the newest record to be sent to the background thread(s) writing records to disk. All records sent to disk are older. |
delay | Number of seconds between automated calls to send records for writing. |
- Returns
- A status code: 0 if no error occurred, a file system error code otherwise.
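A timestamp provider of the kind expected by maxTimestampProvider might look like the sketch below. It assumes that records are timestamped in seconds on a monotonic clock, and that only records older than a small safety margin should be sent for writing. Both `nowSeconds` and `makeLaggingProvider` are hypothetical helpers, not part of the VRS API.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <utility>

// Seconds since an arbitrary epoch on a monotonic clock. Assumption: records
// are timestamped with the same clock; substitute your own time source.
inline double nowSeconds() {
  using namespace std::chrono;
  return duration<double>(steady_clock::now().time_since_epoch()).count();
}

// Hypothetical helper: wraps a clock into a provider that always reports a
// time `marginSeconds` in the past, usable wherever a function<double()> is
// expected.
inline std::function<double()> makeLaggingProvider(std::function<double()> clock,
                                                   double marginSeconds) {
  assert(marginSeconds >= 0);
  return [clock = std::move(clock), marginSeconds]() {
    return clock() - marginSeconds;
  };
}
```

A provider built with `makeLaggingProvider(nowSeconds, 0.5)` would then be passed together with the delay between automated calls.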
◆ backgroundPurgeThreadActivity()
void vrs::RecordFileWriter::backgroundPurgeThreadActivity |
( |
| ) |
|
Background threads implementation: do not call!
◆ backgroundWriterThreadActivity()
void vrs::RecordFileWriter::backgroundWriterThreadActivity |
( |
| ) |
|
Background threads implementation: do not call!
◆ closeFileAsync()
int vrs::RecordFileWriter::closeFileAsync |
( |
| ) |
|
Request that the file be closed once all data has been written, without waiting for that to happen.
- Returns
- A status code: 0 if no error occurred, a file system error code otherwise.
◆ createChunkedFile()
int vrs::RecordFileWriter::createChunkedFile |
( |
const string & |
filePath, |
|
|
size_t |
maxChunkSizeMB = 0 , |
|
|
unique_ptr< NewChunkHandler > && |
chunkHandler = nullptr |
|
) |
| |
Create a VRS file to write to in a background thread, with a separate head file that will contain the file's header, and the description and index records only. All the user records will be written in one or more following chunks, that will be only written going forward, which makes them streaming friendly. The file's head will always be written using a DiskFile, and will be using updates/overwrites, which is not compatible with streaming, unless you can upload the file's head at the end, and prepend it to the previously uploaded user record chunk(s). A VRS file created this way will be efficient to stream when reading records in timestamp order with no seek backward or forward.
- Parameters
-
filePath | Path relative or absolute of the VRS file to create. The user record chunks will be named using the name provided, with "_1", "_2", etc as suffix. Note: the file's & recordables' tags will be written to disk before the call returns. Make sure to call this method only after you've added all your recordables and set all the tags. |
maxChunkSizeMB | Max size of a chunk, in MB. Note that the last chunk may actually be larger. If maxChunkSizeMB is 0, at least two chunks will be created: one for the file's records, one for the file's header and index. |
chunkHandler | Optional listener to be notified each time a chunk is complete, and when chunk handling should be completed (for instance, to finalize uploads). See NewChunkHandler's documentation for details. |
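When chunks must be located after the fact (for instance to upload them), their paths can be predicted from the naming rule given for filePath. The helper below is a hypothetical sketch that assumes the suffix is appended to the full name provided, extension included. That reading of the naming rule is an assumption worth checking against actual output.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: predicted paths of the first `count` user record
// chunks, assuming "_1", "_2", ... is appended to the full path provided.
std::vector<std::string> expectedChunkPaths(const std::string& headPath, unsigned count) {
  assert(!headPath.empty());
  std::vector<std::string> paths;
  paths.reserve(count);
  for (unsigned i = 1; i <= count; ++i) {
    paths.push_back(headPath + "_" + std::to_string(i));
  }
  return paths;
}
```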
◆ createFileAsync()
int vrs::RecordFileWriter::createFileAsync |
( |
const string & |
filePath | ) |
|
|
virtual |
Create a VRS file to write to in a background thread.
- Parameters
-
filePath | Path relative or absolute of the VRS file to create. Note: the file's & recordables' tags will be written to disk before the call returns. Make sure to call this method only after you've added all your recordables and set all the tags. |
◆ getBackgroundThreadQueueByteSize()
uint64_t vrs::RecordFileWriter::getBackgroundThreadQueueByteSize |
( |
| ) |
|
Get how many record-bytes have been passed to the background thread, but not yet processed, which correlates with how much memory is being used by the queue.
- Returns
- The number of record bytes waiting to be processed by the background thread.
◆ getRecordables()
vector< Recordable * > vrs::RecordFileWriter::getRecordables |
( |
| ) |
const |
Get the recordables attached to this writer.
- Returns
- A vector of Recordable pointers.
◆ getTags()
const map< string, string > & vrs::RecordFileWriter::getTags |
( |
| ) |
const |
|
inline |
Get all the file tags at once.
- Returns
- A map of all the tags associated with the file itself.
◆ isWriting()
bool vrs::RecordFileWriter::isWriting |
( |
| ) |
const |
|
inline |
Tell if a disk file is being written.
- Returns
- True if a file is being written by this RecordFileWriter instance.
◆ preallocateIndex()
Pre-allocate space for an index similar to the one provided. Must be called before the file is created, but after all the recordables have been attached. Call this method just before creating the file using createFileAsync(). The index should be as similar to the real thing as possible, as it will be used to guess the size of the actual index, compressed. This method is meant to be used for copy operations, so that a compressed index can be pre-allocated based on the index of the source file(s).
- Parameters
-
preliminaryIndex | an index that resembles the index of the final file. |
- Returns
- 0 if the request is accepted.
◆ purgeOldRecords()
void vrs::RecordFileWriter::purgeOldRecords |
( |
double |
maxTimestamp, |
|
|
bool |
recycleBuffers = true |
|
) |
| |
Delete all records older than a certain time (useful to trim a live buffer).
- Parameters
-
maxTimestamp | timestamp cutoff for the records to purge; all purged records have been created before that maxTimestamp. |
recycleBuffers | Tell if the buffers should be recycled, in which case the memory occupied by the records might not be released immediately, or if the memory should be freed immediately. |
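The cutoff semantics can be illustrated on a plain time-sorted buffer. This is a minimal sketch, not the VRS implementation: `SketchRecord` and `purgeOlderThan` are hypothetical, buffer recycling is ignored, and a record stamped exactly at maxTimestamp is assumed to be kept, since purged records are described as created before the cutoff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical record type: only the timestamp matters for this sketch.
struct SketchRecord {
  double timestamp;
};

// Removes every record created strictly before maxTimestamp from a buffer
// sorted by time, and returns how many records were removed.
std::size_t purgeOlderThan(std::deque<SketchRecord>& records, double maxTimestamp) {
  assert(std::is_sorted(records.begin(), records.end(),
                        [](const SketchRecord& a, const SketchRecord& b) {
                          return a.timestamp < b.timestamp;
                        }));
  std::size_t purged = 0;
  while (!records.empty() && records.front().timestamp < maxTimestamp) {
    records.pop_front();
    ++purged;
  }
  return purged;
}
```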
◆ setCompressionThreadPoolSize()
void vrs::RecordFileWriter::setCompressionThreadPoolSize |
( |
size_t |
size = kMaxThreadPoolSizeForHW | ) |
|
Set the number of threads to use for background compression, or none will be used.
- Parameters
-
size | Number of threads used to compress records in parallel. The default argument value makes RecordFileWriter use as many threads as there are cores in the system. If this method is never called, RecordFileWriter uses only a single thread. |
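One plausible way for a sentinel default such as kMaxThreadPoolSizeForHW to translate into "one thread per core" is to cap the requested size at the hardware thread count. The sketch below shows that idea only. `resolvePoolSize` is hypothetical, and the capping behavior is an assumption, not a statement about how RecordFileWriter is implemented.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>

// Same value as the constant listed on this page, redeclared for the sketch.
constexpr std::size_t kMaxThreadPoolSizeForHW = UINT32_MAX;

// Hypothetical: maps a requested pool size to an actual thread count by
// capping it at the number of hardware threads.
std::size_t resolvePoolSize(
    std::size_t requested,
    std::size_t hardwareThreads = std::thread::hardware_concurrency()) {
  // hardware_concurrency() may return 0 when the count is unknown.
  const std::size_t cores = std::max<std::size_t>(hardwareThreads, 1);
  const std::size_t resolved = std::min(requested, cores);
  assert(resolved <= cores);
  return resolved;
}
```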
◆ setInitCreatedThreadCallback()
Sets a callback that will be called when a thread is created by this interface. This gives the user the opportunity to set the thread's priority, name, etc. The callback is invoked by the new thread itself, so functions like gettid() can be used. This is an optional call, but it must be made before calling createFileAsync().
- Parameters
-
initCreatedThreadCallback | Callback that receives a reference to the thread created, the ThreadRole, and the thread index (only used for compression threads) |
◆ setMaxChunkSizeMB()
void vrs::RecordFileWriter::setMaxChunkSizeMB |
( |
size_t |
maxChunkSizeMB | ) |
|
Set the maximum chunk size, as a number of MB, 0 meaning no chunking (infinite limit). Must be called after calling createFileAsync(), if it wasn't set then.
- Parameters
-
maxChunkSizeMB | Max number of MB by chunk. Actual chunks might be a bit smaller, to avoid splitting records. |
◆ setTag()
void vrs::RecordFileWriter::setTag |
( |
const string & |
tagName, |
|
|
const string & |
tagValue |
|
) |
| |
Set a tag value. Note: tags are written when the file is created! Changes made later will not be saved! Tags are written early, so that if the app crashes, or we run out of disk space, we don't lose them! An index can be rebuilt if it's missing in a truncated file, but tags need to be safe, or they're useless!
- Parameters
-
tagName | The name of the tag. |
tagValue | The value of the tag. Can be any string value, including a json message. |
◆ setWriteFileHandler()
int vrs::RecordFileWriter::setWriteFileHandler |
( |
unique_ptr< WriteFileHandler > |
writeFileHandler | ) |
|
To use a different type of WriteFileHandler to generate the file. WriteFileHandler uses DiskFile by default, but you can use a different implementation if you need to. You might want to stream the data to the cloud, or override DiskFile to tweak file creation and initialize the file objects differently.
- Parameters
-
- Returns
- An error status, 0 meaning success.
◆ trackBackgroundThreadQueueByteSize()
void vrs::RecordFileWriter::trackBackgroundThreadQueueByteSize |
( |
| ) |
|
It can be useful or necessary to track how much buffer memory is used by the background threads, so as to report errors, stop recording, or simply wait until enough data has been processed. To avoid threading/cache invalidation costs, this feature needs to be enabled by calling this method before you start writing to disk. To avoid race conditions, you need to call this before the background thread is active. To be safe, call it before creating the file.
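Waiting until enough data has been processed could be done with a simple polling loop over the tracked queue size. The sketch below is hypothetical: `waitForQueueBelow` is not part of the VRS API, and the queue size is supplied through a callable so that the example stays independent of the library.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Polls queuedBytes() until it reports maxBytes or less, or until the timeout
// expires. Returns true if the queue got under budget in time.
bool waitForQueueBelow(const std::function<std::uint64_t()>& queuedBytes,
                       std::uint64_t maxBytes,
                       std::chrono::milliseconds timeout,
                       std::chrono::milliseconds pollInterval = std::chrono::milliseconds(5)) {
  assert(pollInterval.count() > 0);
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (queuedBytes() > maxBytes) {
    if (std::chrono::steady_clock::now() >= deadline) {
      return false;
    }
    std::this_thread::sleep_for(pollInterval);
  }
  return true;
}
```

With a real writer, the callable would presumably wrap getBackgroundThreadQueueByteSize(), after tracking has been enabled before creating the file.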
◆ waitForFileClosed()
int vrs::RecordFileWriter::waitForFileClosed |
( |
| ) |
|
|
virtual |
Start writing all the pending records, and wait for the file to be written & closed.
- Returns
- A status code: 0 if no error occurred, a file system error code otherwise.
◆ writeRecordsAsync()
int vrs::RecordFileWriter::writeRecordsAsync |
( |
double |
maxTimestamp | ) |
|
Send records older than the timestamp provided to be written to disk in a background thread.
- Parameters
-
maxTimestamp | Largest timestamp of the records sent to be written to disk, i.e. all records to be written are older. |
- Returns
- A status code: 0 if no error occurred, a file system error code otherwise.
◆ writeToFile()
int vrs::RecordFileWriter::writeToFile |
( |
const string & |
filePath | ) |
|
Take all the records of all the registered and active recordables, and write them all to disk. The call is fully synchronous and won't return until the file is closed. On error, a (partial) file may exist.
- Parameters
-
filePath | Path relative or absolute where to write the VRS file. |
- Returns
- A status code: 0 if no error occurred, a file system error code otherwise.