VRS
A file format for sensor data.
Loading...
Searching...
No Matches
RecordFileWriter.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <atomic>
20#include <functional>
21#include <memory>
22#include <set>
23#include <thread>
24
25#include "Compressor.h"
26#include "DiskFile.h"
27#include "FileFormat.h"
28#include "IndexRecord.h"
29#include "NewChunkHandler.h"
30#include "Record.h"
31#include "Recordable.h"
32#include "WriteFileHandler.h"
33
34namespace vrs {
35
36namespace test {
37struct RecordFileWriterTester;
38} // namespace test
39
40using std::function;
41using std::map;
42using std::multiset;
43using std::pair;
44using std::set;
45using std::string;
46
47class Record;
48class Recordable;
49
51namespace RecordFileWriter_ {
52struct WriterThreadData;
53struct PurgeThreadData;
54struct CompressionThreadsData;
55struct RecordWriterData;
56} // namespace RecordFileWriter_
57
// Roles of the threads that are created through the RecordFileWriter interface.
59enum class ThreadRole { Writer, Purge, Compression };
60
// Callback signature: receives a reference to a std::thread and the ThreadRole of that thread.
// NOTE(review): the meaning of the trailing int is not visible in this listing
// (presumably a per-role thread index) -- confirm against the implementation.
63using InitCreatedThreadCallback = std::function<void(std::thread&, ThreadRole, int)>;
64
92 public:
94 RecordFileWriter(const RecordFileWriter&) = delete;
96 virtual ~RecordFileWriter();
97
98 RecordFileWriter& operator=(const RecordFileWriter&) = delete;
99 RecordFileWriter& operator=(RecordFileWriter&&) = delete;
100
105 void addRecordable(Recordable* recordable);
106
109 vector<Recordable*> getRecordables() const;
110
115 void setCompressionThreadPoolSize(size_t size = kMaxThreadPoolSizeForHW);
116 static constexpr size_t kMaxThreadPoolSizeForHW = UINT32_MAX; // one-per-CPU core
117
124 void setInitCreatedThreadCallback(const InitCreatedThreadCallback& initCreatedThreadCallback);
125
131 int writeToFile(const string& filePath);
132
138 void purgeOldRecords(double maxTimestamp, bool recycleBuffers = true);
139
144 virtual int createFileAsync(const string& filePath);
145
165 const string& filePath,
166 size_t maxChunkSizeMB = 0,
167 unique_ptr<NewChunkHandler>&& chunkHandler = nullptr);
168
173 void setMaxChunkSizeMB(size_t maxChunkSizeMB);
174
183 int preallocateIndex(unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex);
184
189 int writeRecordsAsync(double maxTimestamp);
190
196 int autoWriteRecordsAsync(const function<double()>& maxTimestampProvider, double delay);
197
205 int autoPurgeRecords(const function<double()>& maxTimestampProvider, double delay);
206
// Tells whether a file is currently being written, by asking the file handler if it is opened.
// NOTE(review): file_ is dereferenced without a null check; presumably it is always set
// for the lifetime of the object -- confirm against the constructor and setWriteFileHandler().
209 bool isWriting() const {
210 return file_->isOpened();
211 }
212
224
227 int closeFileAsync();
228
231 virtual int waitForFileClosed();
232
240 void setTag(const string& tagName, const string& tagValue);
243 void addTags(const map<string, string>& newTags);
// Returns a const reference to the writer's file tags (fileTags_); no copy is made.
246 const map<string, string>& getTags() const {
247 return fileTags_;
248 }
249
256 int setWriteFileHandler(unique_ptr<WriteFileHandler> writeFileHandler);
257
264
// Pairs a record pointer with the id of the stream it belongs to, and defines the
// ordering used to sort records. The struct stores the raw pointer as given; it
// does not delete the record.
267 struct SortRecord {
// Stores the record pointer and stream id as provided.
268 SortRecord(Record* record, StreamId streamId) : record(record), streamId(streamId) {}
269
// Ordering: primarily by timestamp, then by stream id, then by the record's creation order.
// Why "<=" on the second line: the strict "<" case already returned true through the first
// clause, so when the second clause is evaluated "<=" can only hold for equal timestamps.
271 bool operator<(const SortRecord& rhs) const {
272 return this->record->getTimestamp() < rhs.record->getTimestamp() ||
273 (this->record->getTimestamp() <= rhs.record->getTimestamp() &&
274 (this->streamId < rhs.streamId ||
275 // Records have a unique creation order within a particular device
276 (this->streamId == rhs.streamId &&
277 this->record->getCreationOrder() < rhs.record->getCreationOrder())));
278 }
279
// The record being sorted, and the id of its stream.
280 Record* record;
281 StreamId streamId;
282 };
// Batch of records collected at one point in time, for each recordable.
284 using RecordBatch = vector<pair<StreamId, list<Record*>>>;
// Series of record batches collected.
286 using RecordBatches = vector<unique_ptr<RecordBatch>>;
// List of records, sorted by time.
288 using SortedRecords = deque<SortRecord>;
289
290 private:
292 uint64_t collectOldRecords(RecordBatch& batch, double maxTimestamp);
293 bool autoCollectRecords(bool checkTime);
294 static uint64_t addRecordBatchesToSortedRecords(
295 const RecordBatches& batch,
296 SortedRecords& inOutSortedRecords);
297 int createFileAsync(const string& filePath, bool splitHead);
298 int createFile(const string& filePath, bool splitHead);
299 int writeRecords(SortedRecords& records, int lastError);
300 int writeRecordsSingleThread(SortedRecords& records, int lastError);
301 int writeRecordsMultiThread(
302 RecordFileWriter_::CompressionThreadsData& compressionThreadsData,
303 SortedRecords& records,
304 int lastError);
305 void writeOneRecord(
307 Record* record,
308 StreamId id,
309 Compressor& compressor,
310 uint32_t compressedSize);
311 int completeAndCloseFile();
312
313 void backgroundWriteCollectedRecord();
314 bool addRecordsReadyToWrite(SortedRecords& inOutRecordsToWrite);
315
317 set<Recordable*> recordables_;
318 mutable std::mutex recordablesMutex_;
319
320 protected:
// Data members valid while a file is being worked on.
322 unique_ptr<WriteFileHandler> file_;
323 uint64_t maxChunkSize_{};
324 unique_ptr<NewChunkHandler> newChunkHandler_;
325 FileFormat::FileHeader fileHeader_;
326 uint32_t lastRecordSize_{};
327 bool sortRecords_{true}; // Whether records are sorted before writing. Disable only to get the best IO throughput.
328 bool skipFinalizeIndexRecords_ = false; // for unit testing only!
329 unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex_;
330 IndexRecord::Writer indexRecordWriter_;
331 map<string, string> fileTags_;
332 size_t compressionThreadPoolSize_{};
333
336 std::unique_ptr<std::atomic<uint64_t>> queueByteSize_; // background thread's queue byte size
337
340
341 InitCreatedThreadCallback initCreatedThreadCallback_;
342
343 friend struct ::vrs::test::RecordFileWriterTester; // for tests ONLY
344};
345
346} // namespace vrs
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
The class to create VRS files.
Definition RecordFileWriter.h:91
void purgeOldRecords(double maxTimestamp, bool recycleBuffers=true)
Definition RecordFileWriter.cpp:382
void backgroundWriterThreadActivity()
Definition RecordFileWriter.cpp:392
void setTag(const string &tagName, const string &tagValue)
Definition RecordFileWriter.cpp:661
vector< unique_ptr< RecordBatch > > RecordBatches
Series of record batches collected.
Definition RecordFileWriter.h:286
void setInitCreatedThreadCallback(const InitCreatedThreadCallback &initCreatedThreadCallback)
Definition RecordFileWriter.cpp:349
int autoWriteRecordsAsync(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:578
void setMaxChunkSizeMB(size_t maxChunkSizeMB)
Definition RecordFileWriter.cpp:539
const map< string, string > & getTags() const
Definition RecordFileWriter.h:246
vector< Recordable * > getRecordables() const
Definition RecordFileWriter.cpp:329
virtual int waitForFileClosed()
Definition RecordFileWriter.cpp:641
vector< pair< StreamId, list< Record * > > > RecordBatch
Batch of records collected at one point in time, for each recordable.
Definition RecordFileWriter.h:284
RecordFileWriter_::PurgeThreadData * purgeThreadData_
when a purge thread is active
Definition RecordFileWriter.h:339
RecordFileWriter_::WriterThreadData * writerThreadData_
when a background thread is active
Definition RecordFileWriter.h:335
void setCompressionThreadPoolSize(size_t size=kMaxThreadPoolSizeForHW)
Definition RecordFileWriter.cpp:341
int createChunkedFile(const string &filePath, size_t maxChunkSizeMB=0, unique_ptr< NewChunkHandler > &&chunkHandler=nullptr)
Definition RecordFileWriter.cpp:530
int autoPurgeRecords(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:593
int preallocateIndex(unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex)
Definition RecordFileWriter.cpp:549
unique_ptr< WriteFileHandler > file_
data members valid while a file is being worked on
Definition RecordFileWriter.h:322
void backgroundPurgeThreadActivity()
Definition RecordFileWriter.cpp:465
void addTags(const map< string, string > &newTags)
Definition RecordFileWriter.cpp:672
int writeRecordsAsync(double maxTimestamp)
Definition RecordFileWriter.cpp:558
uint64_t getBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:619
int setWriteFileHandler(unique_ptr< WriteFileHandler > writeFileHandler)
Definition RecordFileWriter.cpp:685
int closeFileAsync()
Definition RecordFileWriter.cpp:623
int writeToFile(const string &filePath)
Definition RecordFileWriter.cpp:354
bool isWriting() const
Definition RecordFileWriter.h:209
void trackBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:613
void addRecordable(Recordable *recordable)
Definition RecordFileWriter.cpp:297
virtual int createFileAsync(const string &filePath)
Definition RecordFileWriter.cpp:526
deque< SortRecord > SortedRecords
List of records, sorted by time.
Definition RecordFileWriter.h:288
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:79
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:133
Class to override to implement a record producing device, or virtual device.
Definition Recordable.h:50
VRS stream identifier class.
Definition StreamId.h:243
Definition Compressor.cpp:113
std::function< void(std::thread &, ThreadRole, int)> InitCreatedThreadCallback
Definition RecordFileWriter.h:63
ThreadRole
Thread types that are created with the RecordFileWriter interface.
Definition RecordFileWriter.h:59
Every file starts with this header, which may grow but not shrink!
Definition FileFormat.h:89
Definition RecordFileWriter.h:267
bool operator<(const SortRecord &rhs) const
Records are sorted primarily by timestamp; ties are broken by stream id, then by creation order, so the order is a total order.
Definition RecordFileWriter.h:271
Definition RecordFileWriter.cpp:130
Definition RecordFileWriter.cpp:211
Definition RecordFileWriter.cpp:916
Definition RecordFileWriter.cpp:163