VRS
A file format for sensor data.
Loading...
Searching...
No Matches
AsyncDiskFileChunk.h
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <vrs/DiskFile.h>
20
21#if VRS_ASYNC_DISKFILE_SUPPORTED()
22
23#include <cassert>
24#include <cstdio>
25
// POSIX AIO (aio_write/aio_error/aio_return) is available on Linux and Mac, but not on the
// Android NDK. When POSIX AIO is not available, fall back to PSync (O_DIRECT + pwrite), which
// bypasses the page cache without needing POSIX AIO.
29#define POSIX_AIO_SUPPORTED() (IS_MAC_PLATFORM() || IS_LINUX_PLATFORM())
30
31#if IS_WINDOWS_PLATFORM()
32#define VC_EXTRALEAN
33#include <Windows.h>
34#elif POSIX_AIO_SUPPORTED()
35#include <aio.h>
36#include <fcntl.h>
37#include <unistd.h>
38#else
39#include <fcntl.h>
40#include <unistd.h>
41#endif
42
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
50
51#include <vrs/ErrorCode.h>
52#include <vrs/os/Platform.h>
53
54#define VRS_DISKFILECHUNK "AsyncDiskFileChunk"
55
56namespace vrs {
57
58#if IS_WINDOWS_PLATFORM()
59// Windows doesn't normally define these.
60using ssize_t = int64_t;
61#define O_DIRECT 0x80000000U
62
63struct AsyncWindowsHandle {
64 AsyncWindowsHandle() : h_(INVALID_HANDLE_VALUE) {}
65 AsyncWindowsHandle(HANDLE h) : h_(h) {}
66 AsyncWindowsHandle(AsyncWindowsHandle&& rhs) : h_(rhs.h_) {
67 rhs.h_ = INVALID_HANDLE_VALUE;
68 }
69 AsyncWindowsHandle(AsyncWindowsHandle& rhs) : h_(rhs.h_) {}
70 AsyncWindowsHandle& operator=(AsyncWindowsHandle&& rhs) {
71 h_ = rhs.h_;
72 rhs.h_ = INVALID_HANDLE_VALUE;
73 return *this;
74 }
75
76 bool isOpened() const;
77 int open(const std::string& path, const char* modes, int flags);
78 int close();
79 int pwrite(const void* buf, size_t count, int64_t offset, size_t& outWriteSize);
80 int read(void* buf, size_t count, int64_t offset, size_t& outReadSize);
81 int truncate(int64_t newSize);
82 int seek(int64_t pos, int origin, int64_t& outFilepos);
83
84 private:
85 int _readwrite(bool readNotWrite, void* buf, size_t count, int64_t offset, size_t& outSize);
86
87 public:
88 HANDLE h_ = INVALID_HANDLE_VALUE;
89 std::mutex mtx_;
90};
91using AsyncHandle = AsyncWindowsHandle;
92#else
/// Thin wrapper around a POSIX file descriptor, used as AsyncHandle on non-Windows platforms.
///
/// Move-only: moving transfers the descriptor and leaves the source holding
/// INVALID_FILE_DESCRIPTOR. No destructor is declared, so the descriptor is NOT closed
/// automatically: call close() explicitly.
struct AsyncFileDescriptor {
  static constexpr int INVALID_FILE_DESCRIPTOR = -1;

  AsyncFileDescriptor() = default;
  explicit AsyncFileDescriptor(int fd) : fd_(fd) {}
  AsyncFileDescriptor(AsyncFileDescriptor&& rhs) noexcept : fd_(rhs.fd_) {
    rhs.fd_ = INVALID_FILE_DESCRIPTOR;
  }
  AsyncFileDescriptor(const AsyncFileDescriptor& rhs) noexcept = delete;
  // Transfers rhs's descriptor into *this. A descriptor already held by *this is NOT closed:
  // callers must close() it first. Self-move-assignment is a no-op, so the descriptor is never
  // lost.
  AsyncFileDescriptor& operator=(AsyncFileDescriptor&& rhs) noexcept {
    if (this != &rhs) {
      fd_ = rhs.fd_;
      rhs.fd_ = INVALID_FILE_DESCRIPTOR;
    }
    return *this;
  }
  AsyncFileDescriptor& operator=(const AsyncFileDescriptor& rhs) = delete;

  /// True when this wrapper holds exactly the descriptor `fd`.
  bool operator==(int fd) const {
    return fd_ == fd;
  }

  int open(const std::string& path, const char* modes, int flags);
  [[nodiscard]] bool isOpened() const;
  int read(void* ptr, size_t bufferSize, size_t offset, size_t& outReadSize);
  int truncate(int64_t newSize);
  int seek(int64_t pos, int origin, int64_t& outFilepos);
  int pwrite(const void* buf, size_t count, off_t offset, size_t& written);
  int close();

  int fd_ = INVALID_FILE_DESCRIPTOR;
};
using AsyncHandle = AsyncFileDescriptor;
124#endif
125
/// Fixed-capacity byte buffer backed by a raw aligned allocation (`aligned_buffer_`).
/// The constructor, destructor, free(), clear() and add() are defined out of line.
/// NOTE(review): presumably `memalign` is the alignment of the allocation and `lenalign` the
/// granularity the capacity is rounded to (direct-I/O requirements) — confirm in the .cpp.
///
/// The object owns a raw allocation, so copying is disabled: a member-wise copy would leave two
/// objects pointing at (and presumably both releasing) the same storage.
class AlignedBuffer {
 private:
  void* aligned_buffer_ = nullptr;
  size_t capacity_ = 0; // bytes the storage can hold
  size_t size_ = 0; // bytes currently filled

 public:
  AlignedBuffer(size_t size, size_t memalign, size_t lenalign);
  virtual ~AlignedBuffer();

  AlignedBuffer(const AlignedBuffer&) = delete;
  AlignedBuffer& operator=(const AlignedBuffer&) = delete;

  /// Number of bytes currently stored.
  [[nodiscard]] inline size_t size() const {
    return size_;
  }
  /// Total number of bytes the buffer can hold.
  [[nodiscard]] inline size_t capacity() const {
    return capacity_;
  }
  /// True when no bytes are stored.
  [[nodiscard]] inline bool empty() const {
    return !size();
  }
  /// True when size() has reached capacity().
  [[nodiscard]] inline bool full() const {
    return size() == capacity();
  }

  void free();
  void clear();
  /// Raw pointer to the start of the aligned storage.
  [[nodiscard]] inline void* data() const {
    return aligned_buffer_;
  }
  /// Same as data(), typed as char* for byte arithmetic.
  [[nodiscard]] inline char* bdata() const {
    return static_cast<char*>(aligned_buffer_);
  }
  /// Appends bytes from `buffer` (at most `size` of them).
  /// NOTE(review): presumably returns the number of bytes actually copied (bounded by the
  /// remaining capacity) or a negative value on error — confirm in the .cpp.
  [[nodiscard]] ssize_t add(const void* buffer, size_t size);
};
159
160class AsyncBuffer;
161#if IS_WINDOWS_PLATFORM()
162struct AsyncOVERLAPPED {
163 OVERLAPPED ov;
164 // Allows the completion routine to recover a pointer to the containing AsyncBuffer
165 AsyncBuffer* self;
166};
167#endif
168
169class AsyncBuffer : public AlignedBuffer {
170 public:
171 using complete_write_callback = std::function<void(ssize_t io_return, int io_errno)>;
172
173 AsyncBuffer(size_t size, size_t memalign, size_t lenalign)
174 : AlignedBuffer(size, memalign, lenalign) {}
175 ~AsyncBuffer() override = default;
176
177 void complete_write(ssize_t io_return, int io_errno);
178 [[nodiscard]] int
179 start_write(const AsyncHandle& file, int64_t offset, complete_write_callback on_complete);
180
181 private:
182#if IS_WINDOWS_PLATFORM()
183 AsyncOVERLAPPED ov_;
184 static void CompletedWriteRoutine(DWORD dwErr, DWORD cbBytesWritten, LPOVERLAPPED lpOverlapped);
185#elif POSIX_AIO_SUPPORTED()
186 struct aiocb aiocb_{};
187 static void SigEvNotifyFunction(union sigval val);
188#endif
189 complete_write_callback on_complete_ = nullptr;
190};
191
192class AsyncDiskFileChunk {
193 public:
194 AsyncDiskFileChunk() = default;
195 AsyncDiskFileChunk(std::string path, int64_t offset, int64_t size)
196 : path_{std::move(path)}, offset_{offset}, size_{size} {}
197 AsyncDiskFileChunk(AsyncDiskFileChunk&& other) noexcept;
198
199 // Prevent copying
200 AsyncDiskFileChunk(const AsyncDiskFileChunk& other) noexcept = delete;
201 AsyncDiskFileChunk& operator=(const AsyncDiskFileChunk& other) noexcept = delete;
202 AsyncDiskFileChunk& operator=(AsyncDiskFileChunk&& rhs) noexcept = delete;
203
204 ~AsyncDiskFileChunk();
205
206 int create(const std::string& newpath, const FileSpec::Extras& options);
207 int open(bool readOnly, const FileSpec::Extras& options);
208 int close();
209 int rewind();
210 [[nodiscard]] bool eof() const;
211 bool isOpened();
212 int write(const void* buffer, size_t count, size_t& outWrittenSize);
213 void setSize(int64_t newSize);
214 int flush();
215 int truncate(int64_t newSize);
216 int read(void* buffer, size_t count, size_t& outReadSize);
217 [[nodiscard]] int64_t getSize() const;
218 [[nodiscard]] bool contains(int64_t fileOffset) const;
219 int tell(int64_t& outFilepos) const;
220 int seek(int64_t pos, int origin);
221 [[nodiscard]] const std::string& getPath() const;
222 void setOffset(int64_t newOffset);
223 [[nodiscard]] int64_t getOffset() const;
224
225 enum class IoEngine {
226 Sync,
227 AIO,
228 PSync,
229 };
230
231 private:
232 struct QueuedWrite {
233 AsyncBuffer* buffer_;
234 // N.B. QueuedWrite's are guaranteed to be flushed before the associated file descriptor is
235 // close, so storing this via reference is safe.
236 const AsyncHandle& file_;
237 off_t offset_;
238 AsyncBuffer::complete_write_callback callback_;
239 QueuedWrite(
240 AsyncBuffer* buffer,
241 AsyncHandle& file,
242 off_t offset,
243 AsyncBuffer::complete_write_callback callback)
244 : buffer_(buffer), file_(file), offset_(offset), callback_(std::move(callback)) {}
245 };
246
247 int flushWriteBuffer();
248 int ensureOpenNonDirect();
249 int ensureOpenDirect();
250 int ensureOpen_(int requested_flags);
251 void complete_write(AsyncBuffer* buffer, ssize_t io_return, int io_errno);
252 AsyncBuffer* get_free_buffer_locked(std::unique_lock<std::mutex>& lock);
253 AsyncBuffer* get_free_buffer();
254 void free_buffer(AsyncBuffer*& buffer);
255 void free_buffer_locked(std::unique_lock<std::mutex>& lock, AsyncBuffer*& buffer);
256 void pump_buffers();
257 void pump_buffers_locked();
258 int alloc_write_buffers();
259 int free_write_buffers();
260 int init_parameters(const FileSpec::Extras& options);
261
262 AsyncHandle file_{};
263 std::string path_; // path of this chunk
264 int64_t offset_{}; // offset of this chunk in the file
265 int64_t size_{}; // size of the chunk
266
267 // Keeps track of the current read/write position in the file of the current buffer.
268 int64_t file_position_ = 0;
269
270 const char* file_mode_ = nullptr;
271 // Keeps track of the flags currently in force for the opened fd_. Typically a subset of the
272 // supported_flags_
273 int current_flags_ = 0;
274 // The flags supported by the underlying path_ file
275 int supported_flags_ = 0;
276
277 // Protects the following members from the writing thread as well as the asyncio callback
278 // thread(s). Note that this lock is not really required on Windows, as the callbacks are
279 // delivered on the dispatching thread when it's in an alertable state.
280 std::mutex buffers_mutex_;
281 // Used to notify a waiting writing thread that a buffer was freed.
282 std::condition_variable buffer_freed_cv_;
283 // The list of free buffers
284 std::vector<AsyncBuffer*> buffers_free_;
285 // The list of buffers to be written. Drained by pump_buffers()
286 std::deque<QueuedWrite> buffers_queued_;
287 // A count of the number of buffers waiting on async completions
288 //
289 // This could be a std::atomic<size_t>, but the current implementation has to take the lock
290 // anyway to manage the list of buffers_free_, so don't bother.
291 size_t buffers_writing_ = 0;
292 // A list of all the buffers to keep them alive when they are being written (no longer in any
293 // other queue)
294 std::vector<std::unique_ptr<AsyncBuffer>> buffers_;
295 // The current buffer (if any) being filled by calls to `write()`. It will either be queued
296 // for async write by `write()`, or written out by `flushWriteBuffer()`
297 AsyncBuffer* current_buffer_ = nullptr;
298 // If != SUCCESS, represents errors that were signaled by async writes completing. Typically
299 // returned to the caller as the result of another, later operation (e.g. another write after
300 // the failure, or a call to flushWriteBuffer(), etc)
301 std::atomic<int> async_error_ = SUCCESS;
302
303 // Operational parameters initialized from the FileSpec extra params/options at create/open
304 // time. These can be tuned by the user via uri parameters.
305 IoEngine ioengine_ = IoEngine::AIO;
306 bool use_directio_ = true;
307 // How many asyncio buffers to allocate and fill
308 size_t num_buffers_ = 0;
309 // The size of each individual buffer
310 size_t buffer_size_ = 0;
311 // The maximum number of simultaneous async_write operations allowed
312 size_t iodepth_ = 4;
313 // The requested alignment of buffer lengths and file offsets
314 size_t offset_align_ = 0;
315 // The requested length of memory alignment
316 size_t mem_align_ = 0;
317};
318
319} // namespace vrs
320
321#endif
int read(const string &cacheFile, set< StreamId > &outStreamIds, map< string, string > &outFileTags, map< StreamId, StreamTags > &outStreamTags, vector< IndexRecord::RecordInfo > &outRecordIndex, bool &outFileHasIndex)
Definition FileDetailsCache.cpp:266
int write(const string &cacheFile, const set< StreamId > &streamIds, const map< string, string > &fileTags, const map< StreamId, StreamTags > &streamTags, const vector< IndexRecord::RecordInfo > &recordIndex, bool fileHasIndex)
Definition FileDetailsCache.cpp:225
Definition Compressor.cpp:113