VRS
A file format for sensor data.
Loading...
Searching...
No Matches
IndexRecord.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <deque>
20#include <set>
21
22#include "Compressor.h"
23#include "DiskFile.h"
24#include "FileFormat.h"
25#include "ForwardDefinitions.h"
26#include "NewChunkHandler.h"
27#include "Record.h"
28
29namespace vrs {
30
31using std::deque;
32using std::set;
33using std::unique_ptr;
34using std::vector;
35
36class ProgressLogger;
37
38struct DiskRecordIndexStruct;
39
41namespace IndexRecord {
42
/// Anonymous enum of index record constants.
/// NOTE(review): the enumerator lines (listing lines 44-47) are absent from this extract. The
/// page's cross-reference section names kClassicIndexFormatVersion (listing line 44) and
/// kSplitIndexFormatVersion (listing line 47); their values are not visible here -- confirm
/// against the real header.
43enum {
48};
49
50#pragma pack(push, 1)
51
54 DiskStreamId() : typeId(static_cast<int32_t>(RecordableTypeId::Undefined)), instanceId(0) {}
55 explicit DiskStreamId(StreamId streamId)
56 : typeId(static_cast<int32_t>(streamId.getTypeId())), instanceId(streamId.getInstanceId()) {}
57
60
61 RecordableTypeId getTypeId() const {
62 return FileFormat::readRecordableTypeId(typeId);
63 }
64
65 uint16_t getInstanceId() const {
66 return instanceId.get();
67 }
68
69 StreamId getStreamId() const {
70 return {getTypeId(), getInstanceId()};
71 }
72};
73
76 DiskRecordInfo() = default;
77 DiskRecordInfo(double timestamp, uint32_t recordSize, StreamId streamId, Record::Type recordType)
78 : timestamp(timestamp),
79 recordSize(recordSize),
80 recordType(static_cast<uint8_t>(recordType)),
81 streamId(streamId) {}
82 DiskRecordInfo(StreamId streamId, Record* record)
83 : timestamp(record->getTimestamp()),
84 recordSize(static_cast<uint32_t>(record->getSize())),
85 recordType(static_cast<uint8_t>(record->getRecordType())),
86 streamId(streamId) {}
87
/// Stream id of the record, kept in DiskStreamId form; getStreamId() converts it back to a
/// StreamId.
91 DiskStreamId streamId;
92
93 Record::Type getRecordType() const {
94 return static_cast<Record::Type>(recordType.get());
95 }
96
97 StreamId getStreamId() const {
98 return streamId.getStreamId();
99 }
100};
101
102#pragma pack(pop)
103
106 RecordInfo() = default;
109
110 double timestamp{};
111 int64_t fileOffset{};
114
115 bool operator<(const RecordInfo& rhs) const {
116 return this->timestamp < rhs.timestamp ||
117 (this->timestamp <= rhs.timestamp &&
118 (this->streamId < rhs.streamId ||
119 (this->streamId == rhs.streamId && this->fileOffset < rhs.fileOffset)));
120 }
121
122 bool operator==(const RecordInfo& rhs) const {
123 return this->timestamp == rhs.timestamp && this->fileOffset == rhs.fileOffset &&
124 this->streamId == rhs.streamId && this->recordType == rhs.recordType;
125 }
126};
127
129class Writer {
130 public:
131 explicit Writer(FileFormat::FileHeader& fileHeader) : fileHeader_{fileHeader} {}
132
133 void reset() {
134 streamIds_.clear();
135 writtenRecords_.clear();
136 writtenBytesCount_ = 0;
137 writtenIndexCount_ = 0;
138 splitHeadFile_.reset();
139 }
140
141 DiskFile& initSplitHead() {
142 splitHeadFile_ = std::make_unique<DiskFile>();
143 return *splitHeadFile_;
144 }
145
146 const std::unique_ptr<DiskFile>& getSplitHead() const {
147 return splitHeadFile_;
148 }
149
150 void addStream(StreamId id) {
151 streamIds_.insert(id);
152 }
153
154 int addRecord(double timestamp, uint32_t size, StreamId id, Record::Type recordType);
155
156 int preallocateClassicIndexRecord(
157 WriteFileHandler& file,
158 const deque<DiskRecordInfo>& preliminaryIndex,
159 uint32_t& outLastRecordSize);
160 void useClassicIndexRecord() {
161 preallocatedIndexRecordSize_ = 0;
162 }
163 int finalizeClassicIndexRecord(
164 WriteFileHandler& file,
165 int64_t endOfRecordsOffset,
166 uint32_t& outLastRecordSize);
167
168 int createSplitIndexRecord(uint32_t& outLastRecordSize);
169 int finalizeSplitIndexRecord(const unique_ptr<NewChunkHandler>& chunkHandler);
170
171 protected:
172 int appendToSplitIndexRecord();
173 int completeSplitIndexRecord();
174
175 private:
176 std::unique_ptr<DiskFile> splitHeadFile_; // When the file head is split from the user records
177 FileFormat::FileHeader& fileHeader_;
178 FileFormat::RecordHeader splitIndexRecordHeader_;
179 uint32_t preallocatedIndexRecordSize_{};
180 Compressor compressor_;
181 set<StreamId> streamIds_;
182 deque<IndexRecord::DiskRecordInfo> writtenRecords_;
183 size_t writtenBytesCount_{}; // how many bytes have been written in a partial index
184 size_t writtenIndexCount_{}; // how many index entries have been written in the partial index
185};
186
188class Reader {
189 public:
190 Reader(
191 FileHandler& file,
192 FileFormat::FileHeader& fileHeader,
193 ProgressLogger* progressLogger,
194 set<StreamId>& outStreamIds,
195 vector<RecordInfo>& outIndex);
196
197 bool isIndexComplete() const {
198 return indexComplete_;
199 }
200
201 int readRecord(int64_t firstUserRecordOffset, int64_t& outUsedFileSize);
202
207 int rebuildIndex(bool writeFixedIndex);
208
209 private:
210 int readRecord(
211 int64_t indexRecordOffset,
212 int64_t firstUserRecordOffset,
213 int64_t& outUsedFileSize);
214 int readClassicIndexRecord(
215 size_t indexRecordPayloadSize,
216 size_t uncompressedSize,
217 int64_t firstUserRecordOffset,
218 int64_t& outUsedFileSize);
219 int readSplitIndexRecord(size_t indexByteSize, size_t uncompressedSize, int64_t& outUsedFileSize);
220 int readDiskInfo(vector<DiskRecordInfo>& outRecords);
221
222 private:
223 FileHandler& file_;
224 const int64_t totalFileSize_;
225 FileFormat::FileHeader& fileHeader_;
226 ProgressLogger* progressLogger_;
227 set<StreamId>& streamIds_;
228 vector<RecordInfo>& index_;
229 unique_ptr<deque<IndexRecord::DiskRecordInfo>> diskIndex_; // only when rewriting the index
230 bool indexComplete_{};
231 bool hasSplitHeadChunk_{};
232 int32_t sortErrorCount_{};
233 int32_t droppedRecordCount_{};
234};
235
/// Stream id part of the signature; used by operator< only to break ties between equal record
/// types.
238 StreamId streamId;
/// Record type part of the signature; operator< compares it first.
239 Record::Type recordType;
240
241 bool operator<(const RecordSignature& rhs) const {
242 return this->recordType < rhs.recordType ||
243 (this->recordType == rhs.recordType && this->streamId < rhs.streamId);
244 }
245};
246
247} // namespace IndexRecord
248
249} // namespace vrs
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
FileHandler implementation for disk files, with chunked file support.
Definition DiskFile.h:34
Placeholder layer for endianness support, if we ever need it.
Definition FileFormat.h:63
T get() const
Definition FileFormat.h:75
Class to abstract VRS file system operations, to enable support for alternate storage methods,...
Definition FileHandler.h:71
Helper class to read VRS index records.
Definition IndexRecord.h:188
int rebuildIndex(bool writeFixedIndex)
Definition IndexRecord.cpp:584
Helper class to write VRS index records.
Definition IndexRecord.h:129
ProgressLogger class to be notified of some process' progress.
Definition ProgressLogger.h:31
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:79
Type getRecordType() const
Get the record's record type.
Definition Record.h:147
Type
Definition Record.h:88
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:133
size_t getSize() const
Get the record's payload size, uncompressed.
Definition Record.h:142
VRS stream identifier class.
Definition StreamId.h:242
RecordableTypeId getTypeId() const
Definition StreamId.h:251
uint16_t getInstanceId() const
Definition StreamId.h:257
The WriteFileHandler interface adds write operations to the FileHandler interface.
Definition WriteFileHandler.h:45
@ kSplitIndexFormatVersion
Definition IndexRecord.h:47
@ kClassicIndexFormatVersion
Definition IndexRecord.h:44
Definition AsyncDiskFileChunk.hpp:49
RecordableTypeId
VRS stream type or class identifier enum.
Definition StreamId.h:49
@ Undefined
Value used for default initializations and marking undefined situations.
Every file starts with this header, which may grow but not shrink!
Definition FileFormat.h:89
Every record starts with this header, and is followed by a raw data blob, which semantic is private t...
Definition FileFormat.h:140
Helper class to store details about a single VRS record on disk.
Definition IndexRecord.h:75
Helper class to store StreamID objects on disk.
Definition IndexRecord.h:53
Helper class to hold the details about a single VRS record in memory.
Definition IndexRecord.h:105
Record::Type recordType
type of record
Definition IndexRecord.h:113
double timestamp
timestamp of the record
Definition IndexRecord.h:110
StreamId streamId
creator of the record
Definition IndexRecord.h:112
int64_t fileOffset
absolute byte offset of the record in the whole file
Definition IndexRecord.h:111
This is used to count records of different kinds.
Definition IndexRecord.h:237