VRS
A file format for sensor data.
Loading...
Searching...
No Matches
AsyncDiskFileChunk.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include "DiskFile.h"
20
21#if VRS_ASYNC_DISKFILE_SUPPORTED()
22
23#include <cassert>
24#include <cstdio>
25
26#ifdef _WIN32
27#define VC_EXTRALEAN
28#include <Windows.h>
29#else
30#include <aio.h>
31#include <fcntl.h>
32#include <unistd.h>
33#endif
34
35#include <atomic>
36#include <condition_variable>
37#include <deque>
38#include <functional>
39#include <map>
40#include <memory>
41#include <string>
42
43#include <vrs/ErrorCode.h>
44#include <vrs/os/Platform.h>
45
46#define VRS_DISKFILECHUNK "AsyncDiskFileChunk"
47
48namespace vrs {
49
50#ifdef _WIN32
51// Windows doesn't normally define these.
52using ssize_t = int64_t;
53#define O_DIRECT 0x80000000U
54
55struct AsyncWindowsHandle {
56 AsyncWindowsHandle() : h_(INVALID_HANDLE_VALUE) {}
57 AsyncWindowsHandle(HANDLE h) : h_(h) {}
58 AsyncWindowsHandle(AsyncWindowsHandle&& rhs) : h_(rhs.h_) {
59 rhs.h_ = INVALID_HANDLE_VALUE;
60 }
61 AsyncWindowsHandle(AsyncWindowsHandle& rhs) : h_(rhs.h_) {}
62 AsyncWindowsHandle& operator=(AsyncWindowsHandle&& rhs) {
63 h_ = rhs.h_;
64 rhs.h_ = INVALID_HANDLE_VALUE;
65 return *this;
66 }
67
68 bool isOpened() const;
69 int open(const std::string& path, const char* modes, int flags);
70 int close();
71 int pwrite(const void* buf, size_t count, int64_t offset, size_t& outWriteSize);
72 int read(void* buf, size_t count, int64_t offset, size_t& outReadSize);
73 int truncate(int64_t newSize);
74 int seek(int64_t pos, int origin, int64_t& outFilepos);
75
76 private:
77 int _readwrite(bool readNotWrite, void* buf, size_t count, int64_t offset, size_t& outSize);
78
79 public:
80 HANDLE h_ = INVALID_HANDLE_VALUE;
81 std::mutex mtx_;
82};
83using AsyncHandle = AsyncWindowsHandle;
84#else
/// Move-only wrapper around a POSIX file descriptor.
///
/// No destructor is declared, so the descriptor is not released automatically when the
/// wrapper goes out of scope: call close() explicitly.
struct AsyncFileDescriptor {
  static constexpr int INVALID_FILE_DESCRIPTOR = -1;

  AsyncFileDescriptor() = default;
  explicit AsyncFileDescriptor(int fd) : fd_(fd) {}
  /// Takes over rhs's descriptor and leaves rhs holding INVALID_FILE_DESCRIPTOR.
  AsyncFileDescriptor(AsyncFileDescriptor&& rhs) noexcept : fd_(rhs.fd_) {
    rhs.fd_ = INVALID_FILE_DESCRIPTOR;
  }
  AsyncFileDescriptor(const AsyncFileDescriptor& rhs) noexcept = delete;
  /// Takes over rhs's descriptor and leaves rhs holding INVALID_FILE_DESCRIPTOR.
  AsyncFileDescriptor& operator=(AsyncFileDescriptor&& rhs) noexcept {
    // Guard against self-assignment: without it, `fd = std::move(fd)` would overwrite the
    // descriptor with INVALID_FILE_DESCRIPTOR and leak the opened file.
    if (this != &rhs) {
      fd_ = rhs.fd_;
      rhs.fd_ = INVALID_FILE_DESCRIPTOR;
    }
    return *this;
  }
  AsyncFileDescriptor& operator=(const AsyncFileDescriptor& rhs) = delete;

  /// Compares the wrapped descriptor against a raw descriptor value.
  bool operator==(int fd) const {
    return fd_ == fd;
  }

  int open(const std::string& path, const char* modes, int flags);
  [[nodiscard]] bool isOpened() const;
  int read(void* ptr, size_t bufferSize, size_t offset, size_t& outReadSize);
  int truncate(int64_t newSize);
  int seek(int64_t pos, int origin, int64_t& outFilepos);
  int pwrite(const void* buf, size_t count, off_t offset, size_t& written);
  int close();

  int fd_ = INVALID_FILE_DESCRIPTOR;
};
115using AsyncHandle = AsyncFileDescriptor;
116#endif
117
/// Fixed-capacity byte buffer with caller-specified alignment parameters (`memalign` for
/// the memory address, `lenalign` for the length), as needed by aligned/direct I/O.
///
/// The constructor, destructor, free(), clear() and add() are defined out of line.
class AlignedBuffer {
 private:
  void* aligned_buffer_ = nullptr;
  size_t capacity_ = 0;
  size_t size_ = 0;

 public:
  AlignedBuffer(size_t size, size_t memalign, size_t lenalign);
  virtual ~AlignedBuffer();

  // The object holds a raw pointer to its storage and has a user-declared destructor, so a
  // member-wise copy would leave two objects referring to the same memory. Make the class
  // explicitly non-copyable (and therefore non-movable) instead.
  AlignedBuffer(const AlignedBuffer&) = delete;
  AlignedBuffer& operator=(const AlignedBuffer&) = delete;

  /// @return the number of bytes currently recorded in the buffer.
  [[nodiscard]] size_t size() const {
    return size_;
  }
  /// @return the total number of bytes the buffer can hold.
  [[nodiscard]] size_t capacity() const {
    return capacity_;
  }
  /// @return true when size() is zero.
  [[nodiscard]] bool empty() const {
    return size_ == 0;
  }
  /// @return true when size() has reached capacity().
  [[nodiscard]] bool full() const {
    return size_ == capacity_;
  }

  // NOTE(review): presumably free() releases the storage and clear() only resets size() --
  // confirm against the implementation file.
  void free();
  void clear();
  /// @return the start of the storage as an untyped pointer.
  [[nodiscard]] void* data() const {
    return aligned_buffer_;
  }
  /// @return the start of the storage as a byte pointer.
  [[nodiscard]] char* bdata() const {
    return static_cast<char*>(aligned_buffer_);
  }
  // NOTE(review): presumably appends up to `size` bytes from `buffer` and returns the number
  // of bytes taken (or a negative value on error) -- confirm against the implementation.
  [[nodiscard]] ssize_t add(const void* buffer, size_t size);
};
151
152class AsyncBuffer;
153#ifdef _WIN32
154struct AsyncOVERLAPPED {
155 OVERLAPPED ov;
156 // Allows the completion routine to recover a pointer to the containing AsyncBuffer
157 AsyncBuffer* self;
158};
159#endif
160
161class AsyncBuffer : public AlignedBuffer {
162 public:
163 using complete_write_callback = std::function<void(ssize_t io_return, int io_errno)>;
164
165 AsyncBuffer(size_t size, size_t memalign, size_t lenalign)
166 : AlignedBuffer(size, memalign, lenalign) {}
167 ~AsyncBuffer() override = default;
168
169 void complete_write(ssize_t io_return, int io_errno);
170 [[nodiscard]] int
171 start_write(const AsyncHandle& file, int64_t offset, complete_write_callback on_complete);
172
173 private:
174#ifdef _WIN32
175 AsyncOVERLAPPED ov_;
176 static void CompletedWriteRoutine(DWORD dwErr, DWORD cbBytesWritten, LPOVERLAPPED lpOverlapped);
177#else
178 struct aiocb aiocb_ {};
179 static void SigEvNotifyFunction(union sigval val);
180#endif
181 complete_write_callback on_complete_ = nullptr;
182};
183
184class AsyncDiskFileChunk {
185 public:
186 AsyncDiskFileChunk() = default;
187 AsyncDiskFileChunk(std::string path, int64_t offset, int64_t size)
188 : path_{std::move(path)}, offset_{offset}, size_{size} {}
189 AsyncDiskFileChunk(AsyncDiskFileChunk&& other) noexcept;
190
191 // Prevent copying
192 AsyncDiskFileChunk(const AsyncDiskFileChunk& other) noexcept = delete;
193 AsyncDiskFileChunk& operator=(const AsyncDiskFileChunk& other) noexcept = delete;
194 AsyncDiskFileChunk& operator=(AsyncDiskFileChunk&& rhs) noexcept = delete;
195
196 ~AsyncDiskFileChunk();
197
198 int create(const std::string& newpath, const std::map<std::string, std::string>& options);
199 int open(bool readOnly, const std::map<std::string, std::string>& options);
200 int close();
201 int rewind();
202 [[nodiscard]] bool eof() const;
203 bool isOpened();
204 int write(const void* buffer, size_t count, size_t& outWrittenSize);
205 void setSize(int64_t newSize);
206 int flush();
207 int truncate(int64_t newSize);
208 int read(void* buffer, size_t count, size_t& outReadSize);
209 [[nodiscard]] int64_t getSize() const;
210 [[nodiscard]] bool contains(int64_t fileOffset) const;
211 int tell(int64_t& outFilepos) const;
212 int seek(int64_t pos, int origin);
213 [[nodiscard]] const std::string& getPath() const;
214 void setOffset(int64_t newOffset);
215 [[nodiscard]] int64_t getOffset() const;
216
217 enum class IoEngine {
218 Sync,
219 AIO,
220 PSync,
221 };
222
223 private:
224 struct QueuedWrite {
225 AsyncBuffer* buffer_;
226 // N.B. QueuedWrite's are guaranteed to be flushed before the associated file descriptor is
227 // close, so storing this via reference is safe.
228 const AsyncHandle& file_;
229 off_t offset_;
230 AsyncBuffer::complete_write_callback callback_;
231 QueuedWrite(
232 AsyncBuffer* buffer,
233 AsyncHandle& file,
234 off_t offset,
235 AsyncBuffer::complete_write_callback callback)
236 : buffer_(buffer), file_(file), offset_(offset), callback_(std::move(callback)) {}
237 };
238
239 int flushWriteBuffer();
240 int ensureOpenNonDirect();
241 int ensureOpenDirect();
242 int ensureOpen_(int requested_flags);
243 void complete_write(AsyncBuffer* buffer, ssize_t io_return, int io_errno);
244 AsyncBuffer* get_free_buffer_locked(std::unique_lock<std::mutex>& lock);
245 AsyncBuffer* get_free_buffer();
246 void free_buffer(AsyncBuffer*& buffer);
247 void free_buffer_locked(std::unique_lock<std::mutex>& lock, AsyncBuffer*& buffer);
248 void pump_buffers();
249 void pump_buffers_locked();
250 int alloc_write_buffers();
251 int free_write_buffers();
252 int init_parameters(const std::map<std::string, std::string>& options);
253
254 AsyncHandle file_{};
255 std::string path_; // path of this chunk
256 int64_t offset_{}; // offset of this chunk in the file
257 int64_t size_{}; // size of the chunk
258
259 // Keeps track of the current read/write position in the file of the current buffer.
260 int64_t file_position_ = 0;
261
262 const char* file_mode_ = nullptr;
263 // Keeps track of the flags currently in force for the opened fd_. Typically a subset of the
264 // supported_flags_
265 int current_flags_ = 0;
266 // The flags supported by the underlying path_ file
267 int supported_flags_ = 0;
268
269 // Protects the following members from the writing thread as well as the asyncio callback
270 // thread(s). Note that this lock is not really required on Windows, as the callbacks are
271 // delivered on the dispatching thread when it's in an alertable state.
272 std::mutex buffers_mutex_;
273 // Used to notify a waiting writing thread that a buffer was freed.
274 std::condition_variable buffer_freed_cv_;
275 // The list of free buffers
276 std::vector<AsyncBuffer*> buffers_free_;
277 // The list of buffers to be written. Drained by pump_buffers()
278 std::deque<QueuedWrite> buffers_queued_;
279 // A count of the number of buffers waiting on async completions
280 //
281 // This could be a std::atomic<size_t>, but the current implementation has to take the lock
282 // anyway to manage the list of buffers_free_, so don't bother.
283 size_t buffers_writing_ = 0;
284 // A list of all the buffers to keep them alive when they are being written (no longer in any
285 // other queue)
286 std::vector<std::unique_ptr<AsyncBuffer>> buffers_;
287 // The current buffer (if any) being filled by calls to `write()`. It will either be queued
288 // for async write by `write()`, or written out by `flushWriteBuffer()`
289 AsyncBuffer* current_buffer_ = nullptr;
290 // If != SUCCESS, represents errors that were signaled by async writes completing. Typically
291 // returned to the caller as the result of another, later operation (e.g. another write after
292 // the failure, or a call to flushWriteBuffer(), etc)
293 std::atomic<int> async_error_ = SUCCESS;
294
295 // Operational parameters initialized from the FileSpec extra params/options at create/open
296 // time. These can be tuned by the user via uri parameeters.
297 IoEngine ioengine_ = IoEngine::AIO;
298 bool use_directio_ = true;
299 // How many asyncio buffers to allocate and fill
300 size_t num_buffers_ = 0;
301 // The size of each individual buffer
302 size_t buffer_size_ = 0;
303 // The maximum number of simultaneous async_write operations allowed
304 size_t iodepth_ = 4;
305 // The requested alignment of buffer lengths and file offsets
306 size_t offset_align_ = 0;
307 // The requested length of memory alignment
308 size_t mem_align_ = 0;
309};
310
311} // namespace vrs
312
313#endif
int read(const string &cacheFile, set< StreamId > &outStreamIds, map< string, string > &outFileTags, map< StreamId, StreamTags > &outStreamTags, vector< IndexRecord::RecordInfo > &outRecordIndex, bool &outFileHasIndex)
Definition FileDetailsCache.cpp:268
int write(const string &cacheFile, const set< StreamId > &streamIds, const map< string, string > &fileTags, const map< StreamId, StreamTags > &streamTags, const vector< IndexRecord::RecordInfo > &recordIndex, bool fileHasIndex)
Definition FileDetailsCache.cpp:226
Definition Compressor.cpp:113