25#include "Compressor.h"
27#include "FileFormat.h"
28#include "IndexRecord.h"
29#include "NewChunkHandler.h"
31#include "Recordable.h"
32#include "WriteFileHandler.h"
37struct RecordFileWriterTester;
51namespace RecordFileWriter_ {
52struct WriterThreadData;
53struct PurgeThreadData;
54struct CompressionThreadsData;
55struct RecordWriterData;
116 static constexpr size_t kMaxThreadPoolSizeForHW = UINT32_MAX;
165 const string& filePath,
166 size_t maxChunkSizeMB = 0,
167 unique_ptr<NewChunkHandler>&& chunkHandler =
nullptr);
183 int preallocateIndex(unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex);
205 int autoPurgeRecords(
const function<
double()>& maxTimestampProvider,
double delay);
210 return file_->isOpened();
240 void setTag(
const string& tagName,
const string& tagValue);
243 void addTags(
const map<string, string>& newTags);
274 (this->streamId < rhs.streamId ||
276 (this->streamId == rhs.streamId &&
277 this->record->getCreationOrder() < rhs.record->getCreationOrder())));
292 uint64_t collectOldRecords(
RecordBatch& batch,
double maxTimestamp);
293 bool autoCollectRecords(
bool checkTime);
294 static uint64_t addRecordBatchesToSortedRecords(
298 int createFile(
const string& filePath,
bool splitHead);
300 int writeRecordsSingleThread(
SortedRecords& records,
int lastError);
301 int writeRecordsMultiThread(
310 uint32_t compressedSize);
311 int completeAndCloseFile();
313 void backgroundWriteCollectedRecord();
314 bool addRecordsReadyToWrite(
SortedRecords& inOutRecordsToWrite);
317 set<Recordable*> recordables_;
318 mutable std::mutex recordablesMutex_;
323 uint64_t maxChunkSize_{};
324 unique_ptr<NewChunkHandler> newChunkHandler_;
326 uint32_t lastRecordSize_{};
327 bool sortRecords_{
true};
328 bool skipFinalizeIndexRecords_ =
false;
329 unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex_;
330 IndexRecord::Writer indexRecordWriter_;
331 map<string, string> fileTags_;
332 size_t compressionThreadPoolSize_{};
336 std::unique_ptr<std::atomic<uint64_t>> queueByteSize_;
343 friend struct ::vrs::test::RecordFileWriterTester;
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
The class to create VRS files.
Definition RecordFileWriter.h:91
void purgeOldRecords(double maxTimestamp, bool recycleBuffers=true)
Definition RecordFileWriter.cpp:382
void backgroundWriterThreadActivity()
Definition RecordFileWriter.cpp:392
void setTag(const string &tagName, const string &tagValue)
Definition RecordFileWriter.cpp:661
vector< unique_ptr< RecordBatch > > RecordBatches
Series of record batches collected.
Definition RecordFileWriter.h:286
void setInitCreatedThreadCallback(const InitCreatedThreadCallback &initCreatedThreadCallback)
Definition RecordFileWriter.cpp:349
int autoWriteRecordsAsync(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:578
void setMaxChunkSizeMB(size_t maxChunkSizeMB)
Definition RecordFileWriter.cpp:539
const map< string, string > & getTags() const
Definition RecordFileWriter.h:246
vector< Recordable * > getRecordables() const
Definition RecordFileWriter.cpp:329
virtual int waitForFileClosed()
Definition RecordFileWriter.cpp:641
vector< pair< StreamId, list< Record * > > > RecordBatch
Batch of records collected at one point in time, for each recordable.
Definition RecordFileWriter.h:284
RecordFileWriter_::PurgeThreadData * purgeThreadData_
when a purge thread is active
Definition RecordFileWriter.h:339
RecordFileWriter_::WriterThreadData * writerThreadData_
when a background thread is active
Definition RecordFileWriter.h:335
void setCompressionThreadPoolSize(size_t size=kMaxThreadPoolSizeForHW)
Definition RecordFileWriter.cpp:341
int createChunkedFile(const string &filePath, size_t maxChunkSizeMB=0, unique_ptr< NewChunkHandler > &&chunkHandler=nullptr)
Definition RecordFileWriter.cpp:530
int autoPurgeRecords(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:593
int preallocateIndex(unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex)
Definition RecordFileWriter.cpp:549
unique_ptr< WriteFileHandler > file_
data members valid while a file is being worked on
Definition RecordFileWriter.h:322
void backgroundPurgeThreadActivity()
Definition RecordFileWriter.cpp:465
void addTags(const map< string, string > &newTags)
Definition RecordFileWriter.cpp:672
int writeRecordsAsync(double maxTimestamp)
Definition RecordFileWriter.cpp:558
uint64_t getBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:619
int setWriteFileHandler(unique_ptr< WriteFileHandler > writeFileHandler)
Definition RecordFileWriter.cpp:685
int closeFileAsync()
Definition RecordFileWriter.cpp:623
int writeToFile(const string &filePath)
Definition RecordFileWriter.cpp:354
bool isWriting() const
Definition RecordFileWriter.h:209
void trackBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:613
void addRecordable(Recordable *recordable)
Definition RecordFileWriter.cpp:297
virtual int createFileAsync(const string &filePath)
Definition RecordFileWriter.cpp:526
deque< SortRecord > SortedRecords
List of records, sorted by time.
Definition RecordFileWriter.h:288
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:79
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:133
Class to override to implement a record producing device, or virtual device.
Definition Recordable.h:50
VRS stream identifier class.
Definition StreamId.h:243
Definition Compressor.cpp:113
std::function< void(std::thread &, ThreadRole, int)> InitCreatedThreadCallback
Definition RecordFileWriter.h:63
ThreadRole
Thread types that are created with the RecordFileWriter interface.
Definition RecordFileWriter.h:59
Definition RecordFileWriter.h:267
bool operator<(const SortRecord &rhs) const
we are sorting records primarily by timestamp, but this order is a total order
Definition RecordFileWriter.h:271
Definition RecordFileWriter.cpp:130
Definition RecordFileWriter.cpp:211
Definition RecordFileWriter.cpp:916
Definition RecordFileWriter.cpp:163