VRS
A file format for sensor data.
Loading...
Searching...
No Matches
RecordFileWriter.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <atomic>
20#include <functional>
21#include <memory>
22#include <set>
23#include <thread>
24
25#include <vrs/Compressor.h>
26#include <vrs/DiskFile.h>
27#include <vrs/FileFormat.h>
28#include <vrs/IndexRecord.h>
29#include <vrs/NewChunkHandler.h>
30#include <vrs/Record.h>
31#include <vrs/Recordable.h>
32#include <vrs/WriteFileHandler.h>
33
34namespace vrs {
35
36namespace test {
37struct RecordFileWriterTester;
38} // namespace test
39
40using std::function;
41using std::map;
42using std::pair;
43using std::set;
44using std::string;
45
46class Record;
47class Recordable;
48
50namespace RecordFileWriter_ {
51struct WriterThreadData;
52struct PurgeThreadData;
53struct CompressionThreadsData;
54struct RecordWriterData;
55} // namespace RecordFileWriter_
56
/// Thread types that are created with the RecordFileWriter interface.
/// Passed as the second argument of InitCreatedThreadCallback.
58enum class ThreadRole { Writer, Purge, Compression };
59
/// Callback signature: receives a std::thread by reference, a ThreadRole, and an int.
/// Registered on a writer with RecordFileWriter::setInitCreatedThreadCallback().
/// NOTE(review): the int presumably distinguishes threads sharing the same role
/// (e.g. a compression thread index) — confirm in RecordFileWriter.cpp.
62using InitCreatedThreadCallback = std::function<void(std::thread&, ThreadRole, int)>;
63
91 public:
93 RecordFileWriter(const RecordFileWriter&) = delete;
95 virtual ~RecordFileWriter() noexcept;
96
97 RecordFileWriter& operator=(const RecordFileWriter&) = delete;
98 RecordFileWriter& operator=(RecordFileWriter&&) = delete;
99
104 void addRecordable(Recordable* recordable);
105
108 vector<Recordable*> getRecordables() const;
109
114 void setCompressionThreadPoolSize(size_t size = kMaxThreadPoolSizeForHW);
115 static constexpr size_t kMaxThreadPoolSizeForHW = UINT32_MAX; // one-per-CPU core
116
123 void setInitCreatedThreadCallback(const InitCreatedThreadCallback& initCreatedThreadCallback);
124
130 int writeToFile(const string& filePath);
131
137 void purgeOldRecords(double maxTimestamp, bool recycleBuffers = true);
138
143 virtual int createFileAsync(const string& filePath);
144
164 const string& filePath,
165 size_t maxChunkSizeMB = 0,
166 unique_ptr<NewChunkHandler>&& chunkHandler = nullptr);
167
172 void setMaxChunkSizeMB(size_t maxChunkSizeMB);
173
182 int preallocateIndex(unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex);
183
188 int writeRecordsAsync(double maxTimestamp);
189
195 int autoWriteRecordsAsync(const function<double()>& maxTimestampProvider, double delay);
196
204 int autoPurgeRecords(const function<double()>& maxTimestampProvider, double delay);
205
/// Tell whether the write file handler currently reports an open file.
/// @return The result of file_->isOpened().
/// NOTE(review): file_ is dereferenced unconditionally; this assumes file_ is never null —
/// confirm against the constructor and setWriteFileHandler().
208 bool isWriting() const {
209 return file_->isOpened();
210 }
211
223
226 int closeFileAsync();
227
230 virtual int waitForFileClosed();
231
239 void setTag(const string& tagName, const string& tagValue);
242 void addTags(const map<string, string>& newTags);
/// Get the file tags currently held by this writer.
/// @return A const reference to the internal tag map (fileTags_); no copy is made.
245 const map<string, string>& getTags() const {
246 return fileTags_;
247 }
248
255 int setWriteFileHandler(unique_ptr<WriteFileHandler> writeFileHandler);
256
263
/// Pairs a record pointer with the id of the stream it belongs to, and defines the ordering
/// used for SortedRecords. The struct holds a plain pointer and declares no destructor, so it
/// never deletes the record itself.
266 struct SortRecord {
/// Store the record pointer and stream id as given.
267 SortRecord(Record* recordIn, StreamId streamIdIn) : record(recordIn), streamId(streamIdIn) {}
268
/// We are sorting records primarily by timestamp, but this order is a total order:
/// ties on timestamp are broken by stream id, then by the record's creation order.
/// @param rhs The record to compare against.
/// @return true if this record sorts strictly before rhs.
270 bool operator<(const SortRecord& rhs) const {
// The `<=` below is only evaluated after the `<` on the same timestamps was false,
// so it effectively tests for equal timestamps. If either timestamp is NaN, both
// comparisons are false and the result is false.
271 return this->record->getTimestamp() < rhs.record->getTimestamp() ||
272 (this->record->getTimestamp() <= rhs.record->getTimestamp() &&
273 (this->streamId < rhs.streamId ||
274 // Records have a unique creation order within a particular device
275 (this->streamId == rhs.streamId &&
276 this->record->getCreationOrder() < rhs.record->getCreationOrder())));
277 }
278
279 Record* record;
280 StreamId streamId;
281 };
283 using RecordBatch = vector<pair<StreamId, list<Record*>>>;
285 using RecordBatches = vector<unique_ptr<RecordBatch>>;
287 using SortedRecords = deque<SortRecord>;
288
289 private:
291 uint64_t collectOldRecords(RecordBatch& batch, double maxTimestamp);
292 bool autoCollectRecords(bool checkTime);
293 static uint64_t addRecordBatchesToSortedRecords(
294 const RecordBatches& batch,
295 SortedRecords& inOutSortedRecords);
296 int createFileAsync(const string& filePath, bool splitHead);
297 int createFile(const string& filePath, bool splitHead);
298 int writeRecords(SortedRecords& records, int lastError);
299 int writeRecordsSingleThread(SortedRecords& records, int lastError);
300 int writeRecordsMultiThread(
301 RecordFileWriter_::CompressionThreadsData& compressionThreadsData,
302 SortedRecords& records,
303 int lastError);
304 void writeOneRecord(
306 Record* record,
307 StreamId id,
308 Compressor& compressor,
309 uint32_t compressedSize);
310 int completeAndCloseFile();
311
312 void backgroundWriteCollectedRecord();
313 bool addRecordsReadyToWrite(SortedRecords& inOutRecordsToWrite);
314
316 set<Recordable*> recordables_;
317 mutable std::mutex recordablesMutex_;
318
319 protected:
321 unique_ptr<WriteFileHandler> file_;
322 uint64_t maxChunkSize_{};
323 unique_ptr<NewChunkHandler> newChunkHandler_;
324 FileFormat::FileHeader fileHeader_;
325 uint32_t lastRecordSize_{};
326 bool sortRecords_{true}; // should records be sorted? Disable only to get the best IO throughput.
327 bool skipFinalizeIndexRecords_ = false; // for unit testing only!
328 unique_ptr<deque<IndexRecord::DiskRecordInfo>> preliminaryIndex_;
329 IndexRecord::Writer indexRecordWriter_;
330 map<string, string> fileTags_;
331 size_t compressionThreadPoolSize_{};
332
335 std::unique_ptr<std::atomic<uint64_t>> queueByteSize_; // background thread's queue byte size
336
339
340 InitCreatedThreadCallback initCreatedThreadCallback_;
341
342 friend struct ::vrs::test::RecordFileWriterTester; // for tests ONLY
343};
344
345} // namespace vrs
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
The class to create VRS files.
Definition RecordFileWriter.h:90
void purgeOldRecords(double maxTimestamp, bool recycleBuffers=true)
Definition RecordFileWriter.cpp:381
void backgroundWriterThreadActivity()
Definition RecordFileWriter.cpp:391
void setTag(const string &tagName, const string &tagValue)
Definition RecordFileWriter.cpp:660
vector< unique_ptr< RecordBatch > > RecordBatches
Series of record batches collected.
Definition RecordFileWriter.h:285
void setInitCreatedThreadCallback(const InitCreatedThreadCallback &initCreatedThreadCallback)
Definition RecordFileWriter.cpp:348
int autoWriteRecordsAsync(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:577
void setMaxChunkSizeMB(size_t maxChunkSizeMB)
Definition RecordFileWriter.cpp:538
const map< string, string > & getTags() const
Definition RecordFileWriter.h:245
vector< Recordable * > getRecordables() const
Definition RecordFileWriter.cpp:328
virtual int waitForFileClosed()
Definition RecordFileWriter.cpp:640
vector< pair< StreamId, list< Record * > > > RecordBatch
Batch of records collected at one point in time, for each recordable.
Definition RecordFileWriter.h:283
RecordFileWriter_::PurgeThreadData * purgeThreadData_
when a purge thread is active
Definition RecordFileWriter.h:338
RecordFileWriter_::WriterThreadData * writerThreadData_
when a background thread is active
Definition RecordFileWriter.h:334
void setCompressionThreadPoolSize(size_t size=kMaxThreadPoolSizeForHW)
Definition RecordFileWriter.cpp:340
int createChunkedFile(const string &filePath, size_t maxChunkSizeMB=0, unique_ptr< NewChunkHandler > &&chunkHandler=nullptr)
Definition RecordFileWriter.cpp:529
int autoPurgeRecords(const function< double()> &maxTimestampProvider, double delay)
Definition RecordFileWriter.cpp:592
int preallocateIndex(unique_ptr< deque< IndexRecord::DiskRecordInfo > > preliminaryIndex)
Definition RecordFileWriter.cpp:548
unique_ptr< WriteFileHandler > file_
data members valid while a file is being worked on
Definition RecordFileWriter.h:321
void backgroundPurgeThreadActivity()
Definition RecordFileWriter.cpp:464
void addTags(const map< string, string > &newTags)
Definition RecordFileWriter.cpp:671
int writeRecordsAsync(double maxTimestamp)
Definition RecordFileWriter.cpp:557
uint64_t getBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:618
int setWriteFileHandler(unique_ptr< WriteFileHandler > writeFileHandler)
Definition RecordFileWriter.cpp:684
int closeFileAsync()
Definition RecordFileWriter.cpp:622
int writeToFile(const string &filePath)
Definition RecordFileWriter.cpp:353
bool isWriting() const
Definition RecordFileWriter.h:208
void trackBackgroundThreadQueueByteSize()
Definition RecordFileWriter.cpp:612
void addRecordable(Recordable *recordable)
Definition RecordFileWriter.cpp:296
virtual int createFileAsync(const string &filePath)
Definition RecordFileWriter.cpp:525
deque< SortRecord > SortedRecords
List of records, sorted by time.
Definition RecordFileWriter.h:287
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:81
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:135
Class to override to implement a record producing device, or virtual device.
Definition Recordable.h:50
VRS stream identifier class.
Definition StreamId.h:249
Definition Compressor.cpp:113
std::function< void(std::thread &, ThreadRole, int)> InitCreatedThreadCallback
Definition RecordFileWriter.h:62
ThreadRole
Thread types that are created with the RecordFileWriter interface.
Definition RecordFileWriter.h:58
Every file starts with this header, which may grow but not shrink!
Definition FileFormat.h:60
Definition RecordFileWriter.h:266
bool operator<(const SortRecord &rhs) const
we are sorting records primarily by timestamp, but this order is a total order
Definition RecordFileWriter.h:270
Definition RecordFileWriter.cpp:129
Definition RecordFileWriter.cpp:210
Definition RecordFileWriter.cpp:919
Definition RecordFileWriter.cpp:162