25#include "Compressor.h"
27#include "FileFormat.h"
28#include "IndexRecord.h"
29#include "NewChunkHandler.h"
31#include "Recordable.h"
32#include "WriteFileHandler.h"
37struct RecordFileWriterTester;
51namespace RecordFileWriter_ {
52struct WriterThreadData;
53struct PurgeThreadData;
54struct CompressionThreadsData;
55struct RecordWriterData;
113 static constexpr size_t kMaxThreadPoolSizeForHW = UINT32_MAX;
162 const string& filePath,
163 size_t maxChunkSizeMB = 0,
164 unique_ptr<NewChunkHandler>&& chunkHandler =
nullptr);
180 int preallocateIndex(unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex);
202 int autoPurgeRecords(
const function<
double()>& maxTimestampProvider,
double delay);
207 return file_->isOpened();
237 void setTag(
const string& tagName,
const string& tagValue);
240 void addTags(
const map<string, string>& newTags);
271 (this->streamId < rhs.streamId ||
273 (this->streamId == rhs.streamId &&
274 this->record->getCreationOrder() < rhs.record->getCreationOrder())));
289 uint64_t collectOldRecords(
RecordBatch& batch,
double maxTimestamp);
290 bool autoCollectRecords(
bool checkTime);
291 static uint64_t addRecordBatchesToSortedRecords(
295 int createFile(
const string& filePath,
bool splitHead);
297 int writeRecordsSingleThread(
SortedRecords& records,
int lastError);
298 int writeRecordsMultiThread(
307 uint32_t compressedSize);
308 int completeAndCloseFile();
310 void backgroundWriteCollectedRecord();
311 bool addRecordsReadyToWrite(
SortedRecords& inOutRecordsToWrite);
314 set<Recordable*> recordables_;
315 mutable std::mutex recordablesMutex_;
320 uint64_t maxChunkSize_{};
321 unique_ptr<NewChunkHandler> newChunkHandler_;
323 uint32_t lastRecordSize_{};
324 bool sortRecords_{
true};
325 bool skipFinalizeIndexRecords_ =
false;
326 unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex_;
327 IndexRecord::Writer indexRecordWriter_;
328 map<string, string> fileTags_;
329 size_t compressionThreadPoolSize_{};
333 std::unique_ptr<std::atomic<uint64_t>> queueByteSize_;
340 friend struct ::vrs::test::RecordFileWriterTester;
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
The class to create VRS files.
Definition RecordFileWriter.h:91
void purgeOldRecords(double maxTimestamp, bool recycleBuffers=true)
Definition RecordFileWriter.cpp:357
void backgroundWriterThreadActivity()
Definition RecordFileWriter.cpp:367
void setTag(const string &tagName, const string &tagValue)
Definition RecordFileWriter.cpp:636
vector< unique_ptr< RecordBatch > > RecordBatches
Series of record batches collected.
Definition RecordFileWriter.h:283
void setInitCreatedThreadCallback(const InitCreatedThreadCallback &initCreatedThreadCallback)
Definition RecordFileWriter.cpp:324
int autoWriteRecordsAsync(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:553
void setMaxChunkSizeMB(size_t maxChunkSizeMB)
Definition RecordFileWriter.cpp:514
const map< string, string > & getTags() const
Definition RecordFileWriter.h:243
vector< Recordable * > getRecordables() const
Definition RecordFileWriter.cpp:304
virtual int waitForFileClosed()
Definition RecordFileWriter.cpp:616
vector< pair< StreamId, list< Record * > > > RecordBatch
Batch of records collected at one point in time, for each recordable.
Definition RecordFileWriter.h:281
RecordFileWriter_::PurgeThreadData * purgeThreadData_
Data for the purge thread, used when a purge thread is active.
Definition RecordFileWriter.h:336
RecordFileWriter_::WriterThreadData * writerThreadData_
Data for the background writer thread, used when a background thread is active.
Definition RecordFileWriter.h:332
void setCompressionThreadPoolSize(size_t size=kMaxThreadPoolSizeForHW)
Definition RecordFileWriter.cpp:316
int createChunkedFile(const string &filePath, size_t maxChunkSizeMB=0, unique_ptr< NewChunkHandler > &&chunkHandler=nullptr)
Definition RecordFileWriter.cpp:505
int autoPurgeRecords(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:568
int preallocateIndex(unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex)
Definition RecordFileWriter.cpp:524
unique_ptr< WriteFileHandler > file_
data members valid while a file is being worked on
Definition RecordFileWriter.h:319
void backgroundPurgeThreadActivity()
Definition RecordFileWriter.cpp:440
void addTags(const map< string, string > &newTags)
Definition RecordFileWriter.cpp:647
int writeRecordsAsync(double maxTimestamp)
Definition RecordFileWriter.cpp:533
uint64_t getBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:594
int setWriteFileHandler(unique_ptr< WriteFileHandler > writeFileHandler)
Definition RecordFileWriter.cpp:660
int closeFileAsync()
Definition RecordFileWriter.cpp:598
int writeToFile(const string &filePath)
Definition RecordFileWriter.cpp:329
bool isWriting() const
Definition RecordFileWriter.h:206
void trackBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:588
void addRecordable(Recordable *recordable)
Definition RecordFileWriter.cpp:272
virtual int createFileAsync(const string &filePath)
Definition RecordFileWriter.cpp:501
deque< SortRecord > SortedRecords
List of records, sorted by time.
Definition RecordFileWriter.h:285
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:79
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:133
Class to override to implement a record producing device, or virtual device.
Definition Recordable.h:50
VRS stream identifier class.
Definition StreamId.h:242
Definition AsyncDiskFileChunk.hpp:49
std::function< void(std::thread &, ThreadRole, int)> InitCreatedThreadCallback
Definition RecordFileWriter.h:63
ThreadRole
Thread types that are created with the RecordFileWriter interface.
Definition RecordFileWriter.h:59
Definition RecordFileWriter.h:264
bool operator<(const SortRecord &rhs) const
Records are sorted primarily by timestamp; ties are broken by stream id, then by record creation order, which makes this a total order.
Definition RecordFileWriter.h:268
Definition RecordFileWriter.cpp:121
Definition RecordFileWriter.cpp:191
Definition RecordFileWriter.cpp:887
Definition RecordFileWriter.cpp:148