25#include <vrs/Compressor.h>
26#include <vrs/DiskFile.h>
27#include <vrs/FileFormat.h>
28#include <vrs/IndexRecord.h>
29#include <vrs/NewChunkHandler.h>
30#include <vrs/Record.h>
31#include <vrs/Recordable.h>
32#include <vrs/WriteFileHandler.h>
37struct RecordFileWriterTester;
50namespace RecordFileWriter_ {
51struct WriterThreadData;
52struct PurgeThreadData;
53struct CompressionThreadsData;
54struct RecordWriterData;
115 static constexpr size_t kMaxThreadPoolSizeForHW = UINT32_MAX;
164 const string& filePath,
165 size_t maxChunkSizeMB = 0,
166 unique_ptr<NewChunkHandler>&& chunkHandler =
nullptr);
182 int preallocateIndex(unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex);
204 int autoPurgeRecords(
const function<
double()>& maxTimestampProvider,
double delay);
209 return file_->isOpened();
239 void setTag(
const string& tagName,
const string& tagValue);
242 void addTags(
const map<string, string>& newTags);
273 (this->streamId < rhs.streamId ||
275 (this->streamId == rhs.streamId &&
276 this->record->getCreationOrder() < rhs.record->getCreationOrder())));
291 uint64_t collectOldRecords(
RecordBatch& batch,
double maxTimestamp);
292 bool autoCollectRecords(
bool checkTime);
293 static uint64_t addRecordBatchesToSortedRecords(
297 int createFile(
const string& filePath,
bool splitHead);
299 int writeRecordsSingleThread(
SortedRecords& records,
int lastError);
300 int writeRecordsMultiThread(
309 uint32_t compressedSize);
310 int completeAndCloseFile();
312 void backgroundWriteCollectedRecord();
313 bool addRecordsReadyToWrite(
SortedRecords& inOutRecordsToWrite);
316 set<Recordable*> recordables_;
317 mutable std::mutex recordablesMutex_;
322 uint64_t maxChunkSize_{};
323 unique_ptr<NewChunkHandler> newChunkHandler_;
325 uint32_t lastRecordSize_{};
326 bool sortRecords_{
true};
327 bool skipFinalizeIndexRecords_ =
false;
328 unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex_;
329 IndexRecord::Writer indexRecordWriter_;
330 map<string, string> fileTags_;
331 size_t compressionThreadPoolSize_{};
335 std::unique_ptr<std::atomic<uint64_t>> queueByteSize_;
342 friend struct ::vrs::test::RecordFileWriterTester;
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
The class to create VRS files.
Definition RecordFileWriter.h:90
void purgeOldRecords(double maxTimestamp, bool recycleBuffers=true)
Definition RecordFileWriter.cpp:381
void backgroundWriterThreadActivity()
Definition RecordFileWriter.cpp:391
void setTag(const string &tagName, const string &tagValue)
Definition RecordFileWriter.cpp:660
vector< unique_ptr< RecordBatch > > RecordBatches
Series of record batches collected.
Definition RecordFileWriter.h:285
void setInitCreatedThreadCallback(const InitCreatedThreadCallback &initCreatedThreadCallback)
Definition RecordFileWriter.cpp:348
int autoWriteRecordsAsync(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:577
void setMaxChunkSizeMB(size_t maxChunkSizeMB)
Definition RecordFileWriter.cpp:538
const map< string, string > & getTags() const
Definition RecordFileWriter.h:245
vector< Recordable * > getRecordables() const
Definition RecordFileWriter.cpp:328
virtual int waitForFileClosed()
Definition RecordFileWriter.cpp:640
vector< pair< StreamId, list< Record * > > > RecordBatch
Batch of records collected at one point in time, for each recordable.
Definition RecordFileWriter.h:283
RecordFileWriter_::PurgeThreadData * purgeThreadData_
when a purge thread is active
Definition RecordFileWriter.h:338
RecordFileWriter_::WriterThreadData * writerThreadData_
when a background thread is active
Definition RecordFileWriter.h:334
void setCompressionThreadPoolSize(size_t size=kMaxThreadPoolSizeForHW)
Definition RecordFileWriter.cpp:340
int createChunkedFile(const string &filePath, size_t maxChunkSizeMB=0, unique_ptr< NewChunkHandler > &&chunkHandler=nullptr)
Definition RecordFileWriter.cpp:529
int autoPurgeRecords(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:592
int preallocateIndex(unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex)
Definition RecordFileWriter.cpp:548
unique_ptr< WriteFileHandler > file_
data members valid while a file is being worked on
Definition RecordFileWriter.h:321
void backgroundPurgeThreadActivity()
Definition RecordFileWriter.cpp:464
void addTags(const map< string, string > &newTags)
Definition RecordFileWriter.cpp:671
int writeRecordsAsync(double maxTimestamp)
Definition RecordFileWriter.cpp:557
uint64_t getBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:618
int setWriteFileHandler(unique_ptr< WriteFileHandler > writeFileHandler)
Definition RecordFileWriter.cpp:684
int closeFileAsync()
Definition RecordFileWriter.cpp:622
int writeToFile(const string &filePath)
Definition RecordFileWriter.cpp:353
bool isWriting() const
Definition RecordFileWriter.h:208
void trackBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:612
void addRecordable(Recordable *recordable)
Definition RecordFileWriter.cpp:296
virtual int createFileAsync(const string &filePath)
Definition RecordFileWriter.cpp:525
deque< SortRecord > SortedRecords
List of records, sorted by time.
Definition RecordFileWriter.h:287
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:81
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:135
Class to override to implement a record producing device, or virtual device.
Definition Recordable.h:50
VRS stream identifier class.
Definition StreamId.h:249
Definition Compressor.cpp:113
std::function< void(std::thread &, ThreadRole, int)> InitCreatedThreadCallback
Definition RecordFileWriter.h:62
ThreadRole
Thread types that are created with the RecordFileWriter interface.
Definition RecordFileWriter.h:58
Definition RecordFileWriter.h:266
bool operator<(const SortRecord &rhs) const
Records are sorted primarily by timestamp; ties are broken by stream ID and then by record creation order, which makes this a total order.
Definition RecordFileWriter.h:270
Definition RecordFileWriter.cpp:129
Definition RecordFileWriter.cpp:210
Definition RecordFileWriter.cpp:919
Definition RecordFileWriter.cpp:162