VRS
A file format for sensor data.
Loading...
Searching...
No Matches
RecordFileWriter.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <atomic>
20#include <functional>
21#include <memory>
22#include <set>
23#include <thread>
24
25#include "Compressor.h"
26#include "DiskFile.h"
27#include "FileFormat.h"
28#include "IndexRecord.h"
29#include "NewChunkHandler.h"
30#include "Record.h"
31#include "Recordable.h"
32#include "WriteFileHandler.h"
33
34namespace vrs {
35
36namespace test {
37struct RecordFileWriterTester;
38} // namespace test
39
40using std::function;
41using std::map;
42using std::multiset;
43using std::pair;
44using std::set;
45using std::string;
46
47class Record;
48class Recordable;
49
/// Forward declarations of RecordFileWriter's internal bookkeeping structs.
/// They are only used through pointers/references here; their definitions live in the .cpp file.
namespace RecordFileWriter_ {
struct CompressionThreadsData;
struct PurgeThreadData;
struct RecordWriterData;
struct WriterThreadData;
} // namespace RecordFileWriter_

/// Thread types that are created with the RecordFileWriter interface.
enum class ThreadRole {
  Writer,
  Purge,
  Compression,
};

/// Hook invoked for threads created by RecordFileWriter (see setInitCreatedThreadCallback()).
/// Arguments: the newly created thread, the role it plays, and an int.
/// NOTE(review): the int's meaning is not visible in this header; presumably the index of the
/// thread within its role (e.g. among compression threads) — confirm against the .cpp.
using InitCreatedThreadCallback = std::function<void(std::thread&, ThreadRole, int)>;
64
92 public:
94 RecordFileWriter(const RecordFileWriter&) = delete;
95 RecordFileWriter& operator=(const RecordFileWriter&) = delete;
96 virtual ~RecordFileWriter();
97
102 void addRecordable(Recordable* recordable);
103
106 vector<Recordable*> getRecordables() const;
107
112 void setCompressionThreadPoolSize(size_t size = kMaxThreadPoolSizeForHW);
113 static constexpr size_t kMaxThreadPoolSizeForHW = UINT32_MAX; // one-per-CPU core
114
121 void setInitCreatedThreadCallback(const InitCreatedThreadCallback& initCreatedThreadCallback);
122
128 int writeToFile(const string& filePath);
129
135 void purgeOldRecords(double maxTimestamp, bool recycleBuffers = true);
136
141 virtual int createFileAsync(const string& filePath);
142
162 const string& filePath,
163 size_t maxChunkSizeMB = 0,
164 unique_ptr<NewChunkHandler>&& chunkHandler = nullptr);
165
170 void setMaxChunkSizeMB(size_t maxChunkSizeMB);
171
180 int preallocateIndex(unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex);
181
186 int writeRecordsAsync(double maxTimestamp);
187
193 int autoWriteRecordsAsync(const function<double()>& maxTimestampProvider, double delay);
194
202 int autoPurgeRecords(const function<double()>& maxTimestampProvider, double delay);
203
206 bool isWriting() const {
207 return file_->isOpened();
208 }
209
221
224 int closeFileAsync();
225
228 virtual int waitForFileClosed();
229
237 void setTag(const string& tagName, const string& tagValue);
240 void addTags(const map<string, string>& newTags);
243 const map<string, string>& getTags() const {
244 return fileTags_;
245 }
246
253 int setWriteFileHandler(unique_ptr<WriteFileHandler> writeFileHandler);
254
261
264 struct SortRecord {
265 SortRecord(Record* record, StreamId streamId) : record(record), streamId(streamId) {}
266
268 bool operator<(const SortRecord& rhs) const {
269 return this->record->getTimestamp() < rhs.record->getTimestamp() ||
270 (this->record->getTimestamp() <= rhs.record->getTimestamp() &&
271 (this->streamId < rhs.streamId ||
272 // Records have a unique creation order within a particular device
273 (this->streamId == rhs.streamId &&
274 this->record->getCreationOrder() < rhs.record->getCreationOrder())));
275 }
276
277 Record* record;
278 StreamId streamId;
279 };
281 using RecordBatch = vector<pair<StreamId, list<Record*>>>;
283 using RecordBatches = vector<unique_ptr<RecordBatch>>;
285 using SortedRecords = deque<SortRecord>;
286
287 private:
289 uint64_t collectOldRecords(RecordBatch& batch, double maxTimestamp);
290 bool autoCollectRecords(bool checkTime);
291 static uint64_t addRecordBatchesToSortedRecords(
292 const RecordBatches& batch,
293 SortedRecords& inOutSortedRecords);
294 int createFileAsync(const string& filePath, bool splitHead);
295 int createFile(const string& filePath, bool splitHead);
296 int writeRecords(SortedRecords& records, int lastError);
297 int writeRecordsSingleThread(SortedRecords& records, int lastError);
298 int writeRecordsMultiThread(
299 RecordFileWriter_::CompressionThreadsData& compressionThreadsData,
300 SortedRecords& records,
301 int lastError);
302 void writeOneRecord(
304 Record* record,
305 StreamId id,
306 Compressor& compressor,
307 uint32_t compressedSize);
308 int completeAndCloseFile();
309
310 void backgroundWriteCollectedRecord();
311 bool addRecordsReadyToWrite(SortedRecords& inOutRecordsToWrite);
312
314 set<Recordable*> recordables_;
315 mutable std::mutex recordablesMutex_;
316
317 protected:
319 unique_ptr<WriteFileHandler> file_;
320 uint64_t maxChunkSize_{};
321 unique_ptr<NewChunkHandler> newChunkHandler_;
322 FileFormat::FileHeader fileHeader_;
323 uint32_t lastRecordSize_{};
324 bool sortRecords_{true}; // should records be sorted? Don't only for best IO throughput.
325 bool skipFinalizeIndexRecords_ = false; // for unit testing only!
326 unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex_;
327 IndexRecord::Writer indexRecordWriter_;
328 map<string, string> fileTags_;
329 size_t compressionThreadPoolSize_{};
330
333 std::unique_ptr<std::atomic<uint64_t>> queueByteSize_; // background thread's queue byte size
334
337
338 InitCreatedThreadCallback initCreatedThreadCallback_;
339
340 friend struct ::vrs::test::RecordFileWriterTester; // for tests ONLY
341};
342
343} // namespace vrs
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
The class to create VRS files.
Definition RecordFileWriter.h:91
void purgeOldRecords(double maxTimestamp, bool recycleBuffers=true)
Definition RecordFileWriter.cpp:357
void backgroundWriterThreadActivity()
Definition RecordFileWriter.cpp:367
void setTag(const string &tagName, const string &tagValue)
Definition RecordFileWriter.cpp:636
vector< unique_ptr< RecordBatch > > RecordBatches
Series of record batches collected.
Definition RecordFileWriter.h:283
void setInitCreatedThreadCallback(const InitCreatedThreadCallback &initCreatedThreadCallback)
Definition RecordFileWriter.cpp:324
int autoWriteRecordsAsync(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:553
void setMaxChunkSizeMB(size_t maxChunkSizeMB)
Definition RecordFileWriter.cpp:514
const map< string, string > & getTags() const
Definition RecordFileWriter.h:243
vector< Recordable * > getRecordables() const
Definition RecordFileWriter.cpp:304
virtual int waitForFileClosed()
Definition RecordFileWriter.cpp:616
vector< pair< StreamId, list< Record * > > > RecordBatch
Batch of records collected at one point in time, for each recordable.
Definition RecordFileWriter.h:281
RecordFileWriter_::PurgeThreadData * purgeThreadData_
when a purge thread is active
Definition RecordFileWriter.h:336
RecordFileWriter_::WriterThreadData * writerThreadData_
when a background thread is active
Definition RecordFileWriter.h:332
void setCompressionThreadPoolSize(size_t size=kMaxThreadPoolSizeForHW)
Definition RecordFileWriter.cpp:316
int createChunkedFile(const string &filePath, size_t maxChunkSizeMB=0, unique_ptr< NewChunkHandler > &&chunkHandler=nullptr)
Definition RecordFileWriter.cpp:505
int autoPurgeRecords(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:568
int preallocateIndex(unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex)
Definition RecordFileWriter.cpp:524
unique_ptr< WriteFileHandler > file_
data members valid while a file is being worked on
Definition RecordFileWriter.h:319
void backgroundPurgeThreadActivity()
Definition RecordFileWriter.cpp:440
void addTags(const map< string, string > &newTags)
Definition RecordFileWriter.cpp:647
int writeRecordsAsync(double maxTimestamp)
Definition RecordFileWriter.cpp:533
uint64_t getBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:594
int setWriteFileHandler(unique_ptr< WriteFileHandler > writeFileHandler)
Definition RecordFileWriter.cpp:660
int closeFileAsync()
Definition RecordFileWriter.cpp:598
int writeToFile(const string &filePath)
Definition RecordFileWriter.cpp:329
bool isWriting() const
Definition RecordFileWriter.h:206
void trackBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:588
void addRecordable(Recordable *recordable)
Definition RecordFileWriter.cpp:272
virtual int createFileAsync(const string &filePath)
Definition RecordFileWriter.cpp:501
deque< SortRecord > SortedRecords
List of records, sorted by time.
Definition RecordFileWriter.h:285
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:79
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:133
Class to override to implement a record producing device, or virtual device.
Definition Recordable.h:50
VRS stream identifier class.
Definition StreamId.h:242
Definition AsyncDiskFileChunk.hpp:49
std::function< void(std::thread &, ThreadRole, int)> InitCreatedThreadCallback
Definition RecordFileWriter.h:63
ThreadRole
Thread types that are created with the RecordFileWriter interface.
Definition RecordFileWriter.h:59
Every file starts with this header, which may grow but not shrink!
Definition FileFormat.h:89
Definition RecordFileWriter.h:264
bool operator<(const SortRecord &rhs) const
we are sorting records primarily by timestamp, but this order is a total order
Definition RecordFileWriter.h:268
Definition RecordFileWriter.cpp:121
Definition RecordFileWriter.cpp:191
Definition RecordFileWriter.cpp:887
Definition RecordFileWriter.cpp:148