VRS
A file format for sensor data.
Loading...
Searching...
No Matches
IndexRecord.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <deque>
20#include <set>
21
22#include <vrs/Compressor.h>
23#include <vrs/DiskFile.h>
24#include <vrs/FileFormat.h>
25#include <vrs/ForwardDefinitions.h>
26#include <vrs/NewChunkHandler.h>
27#include <vrs/Record.h>
28
29namespace vrs {
30
31using std::deque;
32using std::set;
33using std::unique_ptr;
34using std::vector;
35
36class ProgressLogger;
37
38struct DiskRecordIndexStruct;
39
41namespace IndexRecord {
42
43enum {
48};
49
50#pragma pack(push, 1)
51
/// Default constructor: the stored type id is RecordableTypeId::Undefined (as an int32_t)
/// and the instance id is 0.
54 DiskStreamId() : typeId(static_cast<int32_t>(RecordableTypeId::Undefined)), instanceId(0) {}
/// Build the on-disk representation of a StreamId.
/// The type id is stored as an int32_t, the instance id is copied as returned by getInstanceId().
/// @param streamId: the stream id to convert.
55 explicit DiskStreamId(StreamId streamId)
56 : typeId(static_cast<int32_t>(streamId.getTypeId())), instanceId(streamId.getInstanceId()) {}
57
58 int32_t typeId;
59 uint16_t instanceId;
60
/// Get the stream's type id as a RecordableTypeId.
/// The stored int32_t is converted by FileFormat::readRecordableTypeId(), which is defined
/// elsewhere. NOTE(review): presumably that helper handles on-disk encoding/validation — confirm.
/// @return The type id of the stream.
61 inline RecordableTypeId getTypeId() const {
62 return FileFormat::readRecordableTypeId(typeId);
63 }
64
/// Get the stream's instance id.
/// The field is already declared as uint16_t, so the cast does not change the value.
/// @return The instance id of the stream.
65 inline uint16_t getInstanceId() const {
66 return static_cast<uint16_t>(instanceId);
67 }
68
/// Rebuild an in-memory StreamId from the stored type id and instance id.
/// @return A StreamId built from getTypeId() and getInstanceId().
69 inline StreamId getStreamId() const {
70 return {getTypeId(), getInstanceId()};
71 }
72};
73
/// Default constructor: every field relies on its in-class `{}` initializer,
/// so numeric fields are zero and streamId is a default-constructed DiskStreamId.
76 DiskRecordInfo() = default;
78 double timestampIn,
79 uint32_t recordSizeIn,
80 StreamId streamIdIn,
81 Record::Type recordTypeIn)
82 : timestamp(timestampIn),
83 recordSize(recordSizeIn),
84 recordType(static_cast<uint8_t>(recordTypeIn)),
85 streamId(streamIdIn) {}
/// Build the on-disk description of a record from the record itself.
/// @param streamIdIn: id of the stream the record belongs to.
/// @param record: the record to describe. It is dereferenced without a null check,
/// so it must not be null.
/// The record size is narrowed to uint32_t with a static_cast, and the record type
/// is stored as a uint8_t.
86 DiskRecordInfo(StreamId streamIdIn, Record* record)
87 : timestamp(record->getTimestamp()),
88 recordSize(static_cast<uint32_t>(record->getSize())),
89 recordType(static_cast<uint8_t>(record->getRecordType())),
90 streamId(streamIdIn) {}
91
92 double timestamp{};
93 uint32_t recordSize{};
94 uint8_t recordType{};
95 DiskStreamId streamId{};
96
/// Get the record type, converting the stored uint8_t back to a Record::Type.
/// The cast is unchecked: no validation of the stored value is done here.
/// @return The record's type.
97 inline Record::Type getRecordType() const {
98 return static_cast<Record::Type>(recordType);
99 }
100
/// Get the in-memory StreamId of the record, by converting the stored DiskStreamId.
/// @return The id of the stream the record belongs to.
101 inline StreamId getStreamId() const {
102 return streamId.getStreamId();
103 }
104};
105
106#pragma pack(pop)
107
/// Default constructor: timestamp and fileOffset rely on their in-class `{}` initializers
/// and are therefore zero.
110 RecordInfo() = default;
112 double timestampIn,
113 int64_t fileOffsetIn,
114 StreamId streamIdIn,
115 Record::Type recordTypeIn)
116 : timestamp(timestampIn),
117 fileOffset(fileOffsetIn),
118 streamId(streamIdIn),
119 recordType(recordTypeIn) {}
120
121 double timestamp{};
122 int64_t fileOffset{};
125
/// Ordering used to sort records: by timestamp first, then by stream id, then by file offset.
/// The record type does not take part in the ordering.
/// The `<=` test is only evaluated once `timestamp < rhs.timestamp` is known to be false,
/// so it effectively acts as an equality test on the timestamps.
/// @param rhs: the record to compare against.
/// @return True if this record sorts strictly before rhs.
126 bool operator<(const RecordInfo& rhs) const {
127 return this->timestamp < rhs.timestamp ||
128 (this->timestamp <= rhs.timestamp &&
129 (this->streamId < rhs.streamId ||
130 (this->streamId == rhs.streamId && this->fileOffset < rhs.fileOffset)));
131 }
132
/// Equality: all four fields (timestamp, file offset, stream id and record type) must match.
/// Timestamps are compared with exact floating point equality.
/// Unlike operator<, the record type is taken into account here.
/// @param rhs: the record to compare against.
/// @return True if both records have identical fields.
133 bool operator==(const RecordInfo& rhs) const {
134 return this->timestamp == rhs.timestamp && this->fileOffset == rhs.fileOffset &&
135 this->streamId == rhs.streamId && this->recordType == rhs.recordType;
136 }
137};
138
/// Helper class to write VRS index records.
/// The writer keeps a reference to the file's header, the set of stream ids registered with
/// addStream(), and a list of DiskRecordInfo entries for the written records.
/// Two families of operations are declared: "classic" index records
/// (preallocate/use/finalize) and "split" index records (create/append/complete/finalize).
/// Only the small inline helpers are defined here; the other methods are defined out of line.
140class Writer {
141 public:
 /// @param fileHeader: the file header this writer works with. Only a reference is kept,
 /// so the header must outlive the writer.
142 explicit Writer(FileFormat::FileHeader& fileHeader) : fileHeader_{fileHeader} {}
143
 /// Forget the registered stream ids and written records, zero the partial index counters,
 /// and release the split head file, if any.
 /// The preallocated index record size, the split index record header and the compressor
 /// are not touched.
144 void reset() {
145 streamIds_.clear();
146 writtenRecords_.clear();
147 writtenBytesCount_ = 0;
148 writtenIndexCount_ = 0;
149 splitHeadFile_.reset();
150 }
151
 /// Create a new DiskFile owned by this writer, replacing any previous split head file.
 /// @return A reference to the newly created DiskFile, valid until the next call to
 /// initSplitHead() or reset(), or until the writer is destroyed.
152 DiskFile& initSplitHead() {
153 splitHeadFile_ = std::make_unique<DiskFile>();
154 return *splitHeadFile_;
155 }
156
 /// @return The owning pointer to the split head file. The pointer is null if
 /// initSplitHead() was never called, or if reset() was called since.
157 const std::unique_ptr<DiskFile>& getSplitHead() const {
158 return splitHeadFile_;
159 }
160
 /// Register a stream id. Ids are kept in a set, so adding the same id again has no effect.
161 void addStream(StreamId id) {
162 streamIds_.insert(id);
163 }
164
 /// Declared here, defined out of line.
 /// NOTE(review): the int returned by this and the methods below is presumably a status
 /// code (0 on success) — confirm against the implementation.
165 int addRecord(double timestamp, uint32_t size, StreamId id, Record::Type recordType);
166
 /// Declared here, defined out of line.
 /// @param file: file to write to.
 /// @param preliminaryIndex: index entries known before the records are written.
 /// @param outLastRecordSize: output parameter, set by the implementation.
167 int preallocateClassicIndexRecord(
168 WriteFileHandler& file,
169 const deque<DiskRecordInfo>& preliminaryIndex,
170 uint32_t& outLastRecordSize);
 /// Set the preallocated index record size to 0.
171 void useClassicIndexRecord() {
172 preallocatedIndexRecordSize_ = 0;
173 }
 /// Declared here, defined out of line.
174 int finalizeClassicIndexRecord(
175 WriteFileHandler& file,
176 int64_t endOfRecordsOffset,
177 uint32_t& outLastRecordSize);
178
 /// Split index record operations. Declared here, defined out of line.
179 int createSplitIndexRecord(uint32_t& outLastRecordSize);
180 int finalizeSplitIndexRecord(const unique_ptr<NewChunkHandler>& chunkHandler);
181
182 protected:
 /// Split index record helpers. Declared here, defined out of line.
183 int appendToSplitIndexRecord();
184 int completeSplitIndexRecord();
185
186 private:
187 std::unique_ptr<DiskFile> splitHeadFile_; // When the file head is split from the user records
 // Not owned: reference to the header passed to the constructor.
188 FileFormat::FileHeader& fileHeader_;
189 FileFormat::RecordHeader splitIndexRecordHeader_;
 // Zero by default, and set back to zero by useClassicIndexRecord().
190 uint32_t preallocatedIndexRecordSize_{};
191 Compressor compressor_;
 // Stream ids registered with addStream().
192 set<StreamId> streamIds_;
193 deque<IndexRecord::DiskRecordInfo> writtenRecords_;
194 size_t writtenBytesCount_{}; // how many bytes have been written in a partial index
195 size_t writtenIndexCount_{}; // how many index entries have been written in the partial index
196};
197
/// Helper class to read VRS index records.
/// The reader works on a file and a file header owned by the caller, and fills a set of
/// stream ids and a vector of RecordInfo, both also owned by the caller and held by reference.
/// Only isIndexComplete() is defined here; the other methods are defined out of line.
199class Reader {
200 public:
 /// @param file: the file to read from. Held by reference.
 /// @param fileHeader: the file's header. Held by reference.
 /// @param progressLogger: progress listener. Held as a raw, non-owning pointer.
 /// NOTE(review): whether nullptr is accepted is not visible here — confirm.
 /// @param outStreamIds: set of stream ids, held by reference.
 /// @param outIndex: record index, held by reference.
 /// All the referenced objects must outlive the reader.
201 Reader(
202 FileHandler& file,
203 FileFormat::FileHeader& fileHeader,
204 ProgressLogger* progressLogger,
205 set<StreamId>& outStreamIds,
206 vector<RecordInfo>& outIndex);
207
 /// @return The value of the "index complete" flag, which is false by default.
208 bool isIndexComplete() const {
209 return indexComplete_;
210 }
211
 /// Declared here, defined out of line.
 /// NOTE(review): the int returned by this and the methods below is presumably a status
 /// code (0 on success) — confirm against the implementation.
 /// @param firstUserRecordOffset: file offset of the first user record.
 /// @param outUsedFileSize: output parameter, set by the implementation.
212 int readRecord(int64_t firstUserRecordOffset, int64_t& outUsedFileSize);
213
 /// Rebuild the index. Declared here, defined out of line.
 /// @param writeFixedIndex: NOTE(review): presumably requests that the rebuilt index be
 /// written back to the file — confirm against the implementation.
218 int rebuildIndex(bool writeFixedIndex);
219
220 private:
 /// Private overload taking the offset of the index record explicitly.
221 int readRecord(
222 int64_t indexRecordOffset,
223 int64_t firstUserRecordOffset,
224 int64_t& outUsedFileSize);
 /// Format-specific readers for classic and split index records, and a helper that
 /// fills a vector of DiskRecordInfo. Declared here, defined out of line.
225 int readClassicIndexRecord(
226 size_t indexRecordPayloadSize,
227 size_t uncompressedSize,
228 int64_t firstUserRecordOffset,
229 int64_t& outUsedFileSize);
230 int readSplitIndexRecord(size_t indexByteSize, size_t uncompressedSize, int64_t& outUsedFileSize);
231 int readDiskInfo(vector<DiskRecordInfo>& outRecords);
232
233 private:
 // Not owned: file, header, logger, stream id set and index all belong to the caller.
234 FileHandler& file_;
 // Const member: can only be set in the constructor's initializer list.
235 const int64_t totalFileSize_;
236 FileFormat::FileHeader& fileHeader_;
237 ProgressLogger* progressLogger_;
238 set<StreamId>& streamIds_;
239 vector<RecordInfo>& index_;
240 unique_ptr<deque<IndexRecord::DiskRecordInfo>> diskIndex_; // only when rewriting the index
 // Flags and counters below are all zero-initialized.
241 bool indexComplete_{};
242 bool hasSplitHeadChunk_{};
243 int32_t sortErrorCount_{};
244 int32_t droppedRecordCount_{};
245};
246
249 StreamId streamId;
250 Record::Type recordType;
251
/// Ordering of record signatures: by record type first, then by stream id.
/// @param rhs: the signature to compare against.
/// @return True if this signature sorts strictly before rhs.
252 bool operator<(const RecordSignature& rhs) const {
253 return this->recordType < rhs.recordType ||
254 (this->recordType == rhs.recordType && this->streamId < rhs.streamId);
255 }
256};
257
258} // namespace IndexRecord
259
260} // namespace vrs
Helper class to compress data using lz4 or zstd presets.
Definition Compressor.h:82
FileHandler implementation for disk files, with chunked file support.
Definition DiskFile.h:33
Class to abstract VRS file system operations, to enable support for alternate storage methods,...
Definition FileHandler.h:75
Helper class to read VRS index records.
Definition IndexRecord.h:199
int rebuildIndex(bool writeFixedIndex)
Definition IndexRecord.cpp:593
Helper class to write VRS index records.
Definition IndexRecord.h:140
ProgressLogger class to be notified of some process' progress.
Definition ProgressLogger.h:31
Essential VRS class holding a record's details and payload in memory during creation.
Definition Record.h:78
Type getRecordType() const
Get the record's record type.
Definition Record.h:146
Type
Definition Record.h:87
double getTimestamp() const
Get the record's timestamp.
Definition Record.h:132
size_t getSize() const
Get the record's payload size, uncompressed.
Definition Record.h:141
VRS stream identifier class.
Definition StreamId.h:245
The WriteFileHandler interface adds write operations to the FileHandler interface.
Definition WriteFileHandler.h:45
@ kSplitIndexFormatVersion
Definition IndexRecord.h:47
@ kClassicIndexFormatVersion
Definition IndexRecord.h:44
Definition Compressor.cpp:112
RecordableTypeId
VRS stream type or class identifier enum.
Definition StreamId.h:49
@ Undefined
Value used for default initializations and marking undefined situations.
Every file starts with this header, which may grow but not shrink!
Definition FileFormat.h:60
Every record starts with this header, and is followed by a raw data blob, whose semantics are private t...
Definition FileFormat.h:111
Helper class to store details about a single VRS record on disk.
Definition IndexRecord.h:75
Helper class to store StreamID objects on disk.
Definition IndexRecord.h:53
Helper class to hold the details about a single VRS record in memory.
Definition IndexRecord.h:109
Record::Type recordType
type of record
Definition IndexRecord.h:124
double timestamp
timestamp of the record
Definition IndexRecord.h:121
StreamId streamId
creator of the record
Definition IndexRecord.h:123
int64_t fileOffset
absolute byte offset of the record in the whole file
Definition IndexRecord.h:122
This is used to count records of different kinds.
Definition IndexRecord.h:248