VRS
A file format for sensor data.
Loading...
Searching...
No Matches
AsyncDiskFileChunk.hpp
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <cassert>
20#include <cstdio>
21
22#ifdef _WIN32
23#include <windows.h>
24#else
25#include <aio.h>
26#include <fcntl.h>
27#include <unistd.h>
28#endif
29
30#include <atomic>
31#include <condition_variable>
32#include <deque>
33#include <functional>
34#include <map>
35#include <memory>
36#include <string>
37
38#include <logging/Log.h>
39
40#include <vrs/ErrorCode.h>
41#include <vrs/helpers/EnumStringConverter.h>
42#include <vrs/helpers/FileMacros.h>
43#include <vrs/helpers/Strings.h>
44#include <vrs/os/Platform.h>
45#include <vrs/os/Utils.h>
46
47#define VRS_DISKFILECHUNK "AsyncDiskFileChunk"
48
49namespace vrs {
50
51#ifdef _WIN32
52// Windows doesn't normally define these.
53using ssize_t = int64_t;
54#define O_DIRECT 0x80000000U
55
/// Win32 implementation of the async file handle used by AsyncDiskFileChunk.
/// Wraps a HANDLE opened with FILE_FLAG_OVERLAPPED and emulates the POSIX
/// fd-based API (open/close/pread/pwrite/seek/truncate) of the non-Windows
/// implementation below. Error codes are Win32 GetLastError() values or VRS
/// error codes; 0/SUCCESS means OK.
struct AsyncWindowsHandle {
  AsyncWindowsHandle() : h_(INVALID_HANDLE_VALUE) {}
  AsyncWindowsHandle(HANDLE h) : h_(h) {}
  // Move: steals the handle so only one wrapper will CloseHandle() it.
  AsyncWindowsHandle(AsyncWindowsHandle&& rhs) : h_(rhs.h_) {
    rhs.h_ = INVALID_HANDLE_VALUE;
  }
  // NOTE(review): this copy constructor aliases the raw HANDLE without
  // duplicating it; both wrappers would close the same handle. Confirm callers
  // never close both copies.
  AsyncWindowsHandle(AsyncWindowsHandle& rhs) : h_(rhs.h_) {}
  // NOTE(review): move-assignment does not close a handle already held by
  // *this; presumably only ever assigned into an empty wrapper — confirm.
  AsyncWindowsHandle& operator=(AsyncWindowsHandle&& rhs) {
    h_ = rhs.h_;
    rhs.h_ = INVALID_HANDLE_VALUE;
    return *this;
  }

  /// @return true while a valid Win32 handle is held.
  bool isOpened() const {
    return h_ != INVALID_HANDLE_VALUE;
  }

  /// Open `path` with fopen-style `modes` ("r"/"w"/"a" + optional "b"/"+")
  /// plus O_* `flags` (only O_DIRECT is meaningful here).
  /// @return SUCCESS, INVALID_PARAMETER on a bad mode string, or GetLastError().
  int open(const std::string& path, const char* modes, int flags) {
    // O_DIRECT is roughly equivalent to (FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH)
    // We always open with FILE_FLAG_OVERLAPPED

    DWORD dwDesiredAccess = 0;

    bool badmode = false;
    if (modes[0] != 0) {
      // Scan the mode modifiers after the leading r/w/a character.
      for (size_t i = 1; modes[i] != 0; ++i) {
        switch (modes[i]) {
          case 'b':
            // do nothing: binary mode is the only mode available
            break;
          case '+':
            dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
            break;

          default:
            badmode = true;
        }
      }
    }

    DWORD dwCreationDisposition = 0;
    DWORD dwShareMode = 0;
    int whence = SEEK_SET;
    switch (modes[0]) {
      case 'r':
        // "r" requires the file to exist; "r+" may create it.
        dwCreationDisposition = dwDesiredAccess == 0 ? OPEN_EXISTING : OPEN_ALWAYS;
        dwDesiredAccess |= GENERIC_READ;
        dwShareMode = FILE_SHARE_READ;
        break;
      case 'w':
        dwCreationDisposition = CREATE_ALWAYS;
        dwDesiredAccess |= GENERIC_WRITE;
        break;
      case 'a':
        dwCreationDisposition = OPEN_ALWAYS;
        dwDesiredAccess |= GENERIC_WRITE;
        dwShareMode = FILE_SHARE_READ;
        // Append: position at end after opening.
        whence = SEEK_END;
        break;
      default:
        badmode = true;
    }

    if (badmode) {
      XR_LOGCE(VRS_DISKFILECHUNK, "Unsupported open mode: '%s'", modes);
      return INVALID_PARAMETER;
    }

    DWORD dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED;
    if (flags & O_DIRECT) {
      dwFlagsAndAttributes |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
    }

    h_ = CreateFileA(
        path.c_str(),
        dwDesiredAccess,
        dwShareMode,
        NULL,
        dwCreationDisposition,
        dwFlagsAndAttributes,
        NULL);

    if (h_ == INVALID_HANDLE_VALUE) {
      return GetLastError();
    }

    int64_t pos;
    return seek(0, whence, pos);
  }

  /// Close the handle (idempotent). @return SUCCESS or GetLastError().
  int close() {
    if (!isOpened()) {
      return SUCCESS;
    }
    HANDLE h = h_;
    h_ = INVALID_HANDLE_VALUE;
    int error = CloseHandle(h) ? SUCCESS : GetLastError();

    return error;
  }

  /// Synchronous positional write (despite the overlapped handle); see _readwrite.
  int pwrite(const void* buf, size_t count, int64_t offset, size_t& outWriteSize) {
    return _readwrite(false, (void*)buf, count, offset, outWriteSize);
  }
  /// Synchronous positional read; see _readwrite.
  int read(void* buf, size_t count, int64_t offset, size_t& outReadSize) {
    return _readwrite(true, buf, count, offset, outReadSize);
  }

  /// Shared implementation of read()/pwrite(): issues one overlapped transfer
  /// and blocks until it completes. `outSize` receives the bytes transferred.
  int _readwrite(bool readNotWrite, void* buf, size_t count, int64_t offset, size_t& outSize) {
    // This assumes that the file is opened with FILE_FLAG_OVERLAPPED

    outSize = 0;

    // TODO: do we ever need to support larger-than 4GB accesses?
    DWORD dwToXfer = count;
    // Round-trip check: detect counts that don't fit in a DWORD.
    if ((decltype(count))dwToXfer != count) {
      return readNotWrite ? DISKFILE_NOT_ENOUGH_DATA : DISKFILE_PARTIAL_WRITE_ERROR;
    }

    // N.B. this does not create an hEvent for the OVERLAPPED structure, instead using the file
    // handle. This is only a valid thing to do if there are NO other IO operations occuring during
    // this one. The calls to flushWriteBuffer() before calling this ensures this is the case.
    OVERLAPPED ov = {};
    ov.Offset = (DWORD)offset;
    ov.OffsetHigh = (DWORD)(offset >> 32);

    DWORD dwNumberOfBytesTransferred = 0;
    bool success = false;
    if (readNotWrite) {
      success = ReadFile(h_, buf, dwToXfer, &dwNumberOfBytesTransferred, &ov);
    } else {
      success = WriteFile(h_, buf, dwToXfer, &dwNumberOfBytesTransferred, &ov);
    }

    if (!success) {
      int error = GetLastError();
      if (error != ERROR_IO_PENDING) {
        return error;
      }

      // Block until the overlapped operation finishes.
      if (!GetOverlappedResult(h_, &ov, &dwNumberOfBytesTransferred, TRUE)) {
        return GetLastError();
      }
    }

    outSize = dwNumberOfBytesTransferred;
    if (dwNumberOfBytesTransferred != count) {
      return readNotWrite ? DISKFILE_NOT_ENOUGH_DATA : DISKFILE_PARTIAL_WRITE_ERROR;
    }
    return SUCCESS;
  }

  /// Truncate (grow) the file to `newSize`, preserving the current file
  /// pointer. Refuses to shrink below the current position.
  int truncate(int64_t newSize) {
    LARGE_INTEGER distanceToMove, currentFilePointer;
    // Get current filepointer
    distanceToMove.QuadPart = 0;
    if (!SetFilePointerEx(h_, distanceToMove, &currentFilePointer, FILE_CURRENT)) {
      return GetLastError();
    }

    // Shrinking past the current position would invalidate it.
    if (currentFilePointer.QuadPart > newSize) {
      return DISKFILE_INVALID_STATE;
    }

    distanceToMove.QuadPart = newSize;
    if (!SetFilePointerEx(h_, distanceToMove, nullptr, FILE_BEGIN)) {
      return GetLastError();
    }

    if (!SetEndOfFile(h_)) {
      return GetLastError();
    }

    // Restore the original file pointer.
    if (!SetFilePointerEx(h_, currentFilePointer, nullptr, FILE_BEGIN)) {
      return GetLastError();
    }

    return SUCCESS;
  }

  /// Move the file pointer; `outFilepos` receives the new absolute position.
  int seek(int64_t pos, int origin, int64_t& outFilepos) {
    LARGE_INTEGER liPos, liNewPos;
    liPos.QuadPart = pos;
    outFilepos = 0;
    // SEEK_* constants map 1:1 onto the Win32 FILE_* move methods.
    static_assert(SEEK_SET == FILE_BEGIN);
    static_assert(SEEK_END == FILE_END);
    static_assert(SEEK_CUR == FILE_CURRENT);
    if (!SetFilePointerEx(h_, liPos, &liNewPos, origin)) {
      return GetLastError();
    } else {
      outFilepos = liNewPos.QuadPart;
      return SUCCESS;
    }
  }

  HANDLE h_ = INVALID_HANDLE_VALUE;
  std::mutex mtx_;
};
using AsyncHandle = AsyncWindowsHandle;
254using AsyncHandle = AsyncWindowsHandle;
255#else
257 static constexpr int INVALID_FILE_DESCRIPTOR = -1;
258
259 AsyncFileDescriptor() = default;
260 explicit AsyncFileDescriptor(int fd) : fd_(fd) {}
261 AsyncFileDescriptor(AsyncFileDescriptor&& rhs) noexcept : fd_(rhs.fd_) {
262 rhs.fd_ = INVALID_FILE_DESCRIPTOR;
263 }
264 AsyncFileDescriptor(const AsyncFileDescriptor& rhs) noexcept = delete;
265 AsyncFileDescriptor& operator=(AsyncFileDescriptor&& rhs) noexcept {
266 fd_ = rhs.fd_;
267 rhs.fd_ = INVALID_FILE_DESCRIPTOR;
268 return *this;
269 }
270 AsyncFileDescriptor& operator=(const AsyncFileDescriptor& rhs) = delete;
271
272 bool operator==(int fd) const {
273 return fd_ == fd;
274 }
275
  /// Open `path` with fopen-style `modes` ("r"/"w"/"a" + optional "b"/"+")
  /// combined with raw O_* `flags`, emulating fopen() semantics over ::open().
  /// @return SUCCESS, INVALID_PARAMETER for a bad mode string, or errno.
  int open(const std::string& path, const char* modes, int flags) {
    assert(!isOpened());

    int permissions = 0666; // rw- for all, subject to the process umask

    bool badmode = false;
    bool rdwr = false;
    if (modes[0] != 0) {
      // Scan the mode modifiers after the leading r/w/a character.
      for (size_t i = 1; modes[i] != 0; ++i) {
        switch (modes[i]) {
          case 'b':
            // Linux has no O_BINARY
            break;
          case '+':
            rdwr = true;
            break;

          default:
            badmode = true;
        }
      }
    }

    int whence = SEEK_SET;
    switch (modes[0]) {
      case 'r':
        flags |= rdwr ? O_RDWR : O_RDONLY;
        break;
      case 'w':
        flags |= O_CREAT | O_TRUNC;
        flags |= rdwr ? O_RDWR : O_WRONLY;
        break;
      case 'a':
        flags |= rdwr ? O_RDWR : O_WRONLY;
        flags |= O_CREAT | O_APPEND;
        // "a+" positions reads at EOF; plain "a" reads from the start
        // (writes always append because of O_APPEND).
        whence = rdwr ? SEEK_END : SEEK_SET;
        break;
      default:
        badmode = true;
    }

    if (badmode) {
      XR_LOGCE(VRS_DISKFILECHUNK, "Unsupported open mode: '%s'", modes);
      return INVALID_PARAMETER;
    }
    int newFd = ::open(path.c_str(), flags, permissions);
    if (newFd < 0) {
      return errno;
    }
    // Position the descriptor to match fopen() semantics for this mode.
    if (::lseek64(newFd, 0, whence) < 0) {
      ::close(newFd);
      return errno;
    }
    fd_ = newFd;
    return SUCCESS;
  }
332
333 [[nodiscard]] bool isOpened() const {
334 return fd_ >= 0;
335 }
336
337 int read(void* ptr, size_t bufferSize, size_t offset, size_t& outReadSize) {
338 ssize_t ret = ::pread(fd_, ptr, bufferSize, offset);
339 if (ret < 0) {
340 outReadSize = 0;
341 return errno;
342 }
343
344 outReadSize = ret;
345 if (outReadSize != bufferSize) {
346 return DISKFILE_NOT_ENOUGH_DATA;
347 }
348 return SUCCESS;
349 }
350
  /// Truncate (or extend) the file to `newSize` bytes.
  /// @return 0 on success, otherwise the platform error code.
  int truncate(int64_t newSize) {
#if IS_WINDOWS_PLATFORM()
    return ::_chsize_s(fd_, newSize);
#elif (defined(__ANDROID_API__) && __ANDROID_API__ >= 21)
    // 64-bit variant so >2GB files work on 32-bit Android builds.
    return ::ftruncate64(fd_, static_cast<off64_t>(newSize));
#else
    return ::ftruncate(fd_, newSize);
#endif
  }
360
361 int seek(int64_t pos, int origin, int64_t& outFilepos) {
362 off64_t result = ::lseek64(fd_, pos, origin);
363 if (result < 0) {
364 outFilepos = 0;
365 return errno;
366 } else {
367 outFilepos = result;
368 return 0;
369 }
370 }
371
372 int pwrite(const void* buf, size_t count, off_t offset, size_t& written) {
373 ssize_t result = ::pwrite(fd_, buf, count, offset);
374 written = result;
375 if (result != count) {
376 if (result < 0) {
377 written = 0;
378 return errno;
379 }
380 return DISKFILE_PARTIAL_WRITE_ERROR;
381 }
382 return SUCCESS;
383 }
384
385 int close() {
386 if (fd_ < 0) {
387 return SUCCESS;
388 }
389 int fd = fd_;
390 fd_ = INVALID_FILE_DESCRIPTOR;
391 return ::close(fd);
392 }
393
394 int fd_ = INVALID_FILE_DESCRIPTOR;
395};
397#endif
398
400 private:
401 void* aligned_buffer_ = nullptr;
402 size_t capacity_ = 0;
403 size_t size_ = 0;
404
405 public:
  /// Allocate `size` bytes aligned to `memalign` bytes. When `lenalign` is
  /// non-zero it must evenly divide the capacity (O_DIRECT length-granularity
  /// requirement). @throws std::runtime_error on bad alignment or OOM.
  AlignedBuffer(size_t size, size_t memalign, size_t lenalign) : capacity_(size) {
    if (lenalign && 0 != (capacity_ % lenalign)) {
      throw std::runtime_error("Capacity is not a multiple of lenalign");
    }
#ifdef _WIN32
    aligned_buffer_ = _aligned_malloc(capacity_, memalign);
#else
    // posix_memalign leaves the pointer unspecified on failure; normalize.
    if (0 != posix_memalign(&aligned_buffer_, memalign, capacity_)) {
      aligned_buffer_ = nullptr;
    }
#endif
    if (aligned_buffer_ == nullptr) {
      throw std::runtime_error("Failed to allocate aligned buffer");
    }
  }
421
  /// Releases the aligned allocation (see free()).
  virtual ~AlignedBuffer() {
    free();
  }
425 [[nodiscard]] inline size_t size() const {
426 return size_;
427 }
428 [[nodiscard]] inline size_t capacity() const {
429 return capacity_;
430 }
431 [[nodiscard]] inline bool empty() const {
432 return !size();
433 }
434
435 [[nodiscard]] inline bool full() const {
436 return size() == capacity();
437 }
438
  /// Release the allocation and reset all bookkeeping (idempotent).
  void free() {
    if (aligned_buffer_ == nullptr) {
      return;
    }
#if defined(_WIN32)
    // Must pair _aligned_malloc with _aligned_free on Windows.
    _aligned_free(aligned_buffer_);
#else
    ::free(aligned_buffer_);
#endif
    aligned_buffer_ = nullptr;
    capacity_ = 0;
    size_ = 0;
  }
452
453 void clear() {
454 size_ = 0;
455 }
456
457 [[nodiscard]] inline void* data() const {
458 return aligned_buffer_;
459 }
460 [[nodiscard]] inline char* bdata() const {
461 return reinterpret_cast<char*>(aligned_buffer_);
462 }
463
467 [[nodiscard]] ssize_t add(const void* buffer, size_t size) {
468 assert(size);
469
470 size_t capacity = this->capacity();
471 if (capacity == 0) {
472 return -1;
473 }
474 if (size_ >= capacity) {
475 throw std::runtime_error("buffer is already at capacity");
476 }
477 size_t tocopy = std::min<size_t>(size, capacity - size_);
478 memcpy(bdata() + size_, buffer, tocopy);
479 size_ += tocopy;
480
481 return tocopy;
482 }
483};
484
485class AsyncBuffer;
486#ifdef _WIN32
487struct AsyncOVERLAPPED {
488 OVERLAPPED ov;
489 // Allows the completion routine to recover a pointer to the containing AsyncBuffer
490 AsyncBuffer* self;
491};
492#endif
493
495 public:
496 using complete_write_callback = std::function<void(ssize_t io_return, int io_errno)>;
497
498 AsyncBuffer(size_t size, size_t memalign, size_t lenalign)
499 : AlignedBuffer(size, memalign, lenalign) {}
500 ~AsyncBuffer() override = default;
501
502 void complete_write(ssize_t io_return, int io_errno) {
503 on_complete_(io_return, io_errno);
504 }
505
506 [[nodiscard]] int
507 start_write(const AsyncHandle& file, int64_t offset, complete_write_callback on_complete) {
508 ssize_t io_return = 0;
509 int io_errno = SUCCESS;
510
511#ifdef _WIN32
512 ov_.self = this;
513 ov_.ov = {};
514 ov_.ov.Offset = (DWORD)offset;
515 ov_.ov.OffsetHigh = (DWORD)(offset >> 32);
516 if (!WriteFileEx(file.h_, AlignedBuffer::data(), size(), &ov_.ov, CompletedWriteRoutine)) {
517 io_return = -1;
518 io_errno = GetLastError();
519 if (io_errno == 0) {
520 io_errno = ERROR_GEN_FAILURE;
521 }
522 }
523
524#else
525 aiocb_ = {};
526 aiocb_.aio_fildes = file.fd_;
527 aiocb_.aio_offset = offset;
528 aiocb_.aio_buf = AlignedBuffer::data();
529 aiocb_.aio_nbytes = size();
530 aiocb_.aio_reqprio = 0;
531 aiocb_.aio_sigevent.sigev_notify = SIGEV_THREAD;
532 aiocb_.aio_sigevent.sigev_notify_function = SigEvNotifyFunction;
533 aiocb_.aio_sigevent.sigev_value.sival_ptr = this;
534 aiocb_.aio_sigevent.sigev_notify_attributes = nullptr;
535 aiocb_.aio_lio_opcode = 0; // used for lio_lioio only, unused
536 on_complete_ = std::move(on_complete);
537
538 if (aio_write(&aiocb_) != 0) {
539 io_return = -1;
540 io_errno = errno;
541 if (io_errno == 0) {
542 XR_LOGCD(VRS_DISKFILECHUNK, "aio_write failed, errno is 0");
543 io_errno = -1;
544 }
545 }
546 // The submission failed, call the cleanup function.
547 // Note that the return value of aio_write is a subset of the aio_return (which is what a normal
548 // completion calls). `aio_write` will either return -1 and set perror (same as aio_return), or
549 // will return 0
550#endif
551
552 if (io_return != 0) {
553 // If aio_write failed call the completion callback immediately to free the buffer
554 on_complete_(io_return, io_errno);
555 }
556 return io_return;
557 }
558
559 private:
560#ifdef _WIN32
561 AsyncOVERLAPPED ov_;
562
563 static void CompletedWriteRoutine(DWORD dwErr, DWORD cbBytesWritten, LPOVERLAPPED lpOverlapped) {
564 auto self = reinterpret_cast<AsyncOVERLAPPED*>(
565 reinterpret_cast<char*>(lpOverlapped) - offsetof(AsyncOVERLAPPED, ov))
566 ->self;
567 ssize_t io_return;
568 int io_errno;
569
570 if (dwErr == 0) {
571 io_errno = 0;
572 io_return = cbBytesWritten;
573 } else {
574 io_return = -1;
575 io_errno = 0;
576 }
577
578 self->complete_write(io_return, io_errno);
579 }
580#else
581 struct aiocb aiocb_ {};
582
583 static void SigEvNotifyFunction(union sigval val) {
584 auto* self = reinterpret_cast<AsyncBuffer*>(val.sival_ptr);
585
586 ssize_t io_return = 0;
587 int io_errno = 0;
588
589 io_errno = aio_error(&self->aiocb_);
590 if (io_errno == 0) {
591 io_return = aio_return(&self->aiocb_);
592 if (io_return < 0) {
593 throw std::runtime_error(
594 "aio_return returned a negative number despire aio_error indicating success");
595 }
596 } else if (io_errno == EINPROGRESS) {
597 throw std::runtime_error("aio_error()==EINPROGRESS on a completed aio_write");
598 } else if (io_errno == ECANCELED) {
599 // If canceled, aio_return will give -1
600 io_return = aio_return(&self->aiocb_);
601 if (io_return >= 0) {
602 throw std::runtime_error(
603 "aio_error() signaled cancellation, but aio_return indicated success");
604 }
605 } else if (io_errno > 0) {
606 io_return = aio_return(&self->aiocb_);
607 if (io_return >= 0) {
608 throw std::runtime_error("aio_error() signaled an error, but aio_return indicated success");
609 }
610 } else {
611 throw std::runtime_error("aio_error() returned an unexpected negative number");
612 }
613
614 self->complete_write(io_return, io_errno);
615 }
616
617#endif
618 complete_write_callback on_complete_ = nullptr;
619};
620
622 public:
  /// Default-constructed chunk: no file open, all parameters at defaults.
  AsyncDiskFileChunk() = default;
  /// Describe an existing chunk of `path` covering [offset, offset + size).
  AsyncDiskFileChunk(std::string path, int64_t offset, int64_t size)
      : path_{std::move(path)}, offset_{offset}, size_{size} {}
  /// Move constructor: steals the file handle, buffer pool, and all IO
  /// parameters, leaving `other` with no buffers in flight.
  /// NOTE(review): `other` is presumably flushed/idle before being moved from
  /// (the mutex and CV are not movable) — confirm against callers.
  AsyncDiskFileChunk(AsyncDiskFileChunk&& other) noexcept {
    file_ = std::move(other.file_);
    path_ = std::move(other.path_);
    offset_ = other.offset_;
    size_ = other.size_;

    // Keeps track of the current read/write position in the file of the current buffer.
    file_position_ = other.file_position_;

    file_mode_ = other.file_mode_;
    current_flags_ = other.current_flags_;
    supported_flags_ = other.supported_flags_;

    // Note that these members are not movable
    // buffers_mutex_
    // buffer_freed_cv_

    buffers_free_ = std::move(other.buffers_free_);
    buffers_queued_ = std::move(other.buffers_queued_);
    buffers_writing_ = other.buffers_writing_;
    other.buffers_writing_ = 0;
    buffers_ = std::move(other.buffers_);
    current_buffer_ = other.current_buffer_;
    other.current_buffer_ = nullptr;
    // std::atomic is not movable: copy the current value.
    async_error_ = other.async_error_.load();

    ioengine_ = other.ioengine_;
    use_directio_ = other.use_directio_;
    num_buffers_ = other.num_buffers_;
    buffer_size_ = other.buffer_size_;
    iodepth_ = other.iodepth_;
    offset_align_ = other.offset_align_;
    mem_align_ = other.mem_align_;
  }
660
661 // Prevent copying
662 AsyncDiskFileChunk(const AsyncDiskFileChunk& other) noexcept = delete;
663 AsyncDiskFileChunk& operator=(const AsyncDiskFileChunk& other) noexcept = delete;
664 AsyncDiskFileChunk& operator=(AsyncDiskFileChunk&& rhs) noexcept = delete;
665
667 try {
668 close();
669 } catch (std::exception& e) {
670 XR_LOGCE(VRS_DISKFILECHUNK, "Exception on close() during destruction: {}", e.what());
671 }
672 }
673
  /// Create/truncate a new chunk at `newpath` for writing ("wb+"), applying
  /// `options` (IO engine, buffer count/size, alignment, direct IO).
  /// Tries O_DIRECT first and transparently falls back to buffered IO when the
  /// filesystem rejects it. @return 0 on success, or an error code.
  int create(const std::string& newpath, const std::map<std::string, std::string>& options) {
    close();

    path_ = newpath;
    offset_ = 0;
    size_ = 0;

    file_position_ = 0;
    async_error_ = SUCCESS;
    file_mode_ = "wb+";

    IF_ERROR_RETURN(init_parameters(options));
    int error = ensureOpenDirect();
    if (error != 0 && 0 != (O_DIRECT & supported_flags_)) {
      // O_DIRECT open failed: retry buffered, and stop advertising O_DIRECT.
      error = ensureOpenNonDirect();
      if (error == 0) {
        XR_LOGCW(
            VRS_DISKFILECHUNK,
            "O_DIRECT appears not to be supported for {}, falling back to non-direct IO",
            newpath);
        supported_flags_ &= ~O_DIRECT;
      }
    }

    if (error == 0) {
#if IS_ANDROID_PLATFORM()
      // NOTE(review): setvbuf() expects a FILE*, but file_.fd_ is a raw int
      // descriptor — this Android-only call looks like it cannot compile as
      // written; confirm against an Android build.
      const size_t kBufferingSize = 128 * 1024;
      error = setvbuf(file_.fd_, nullptr, _IOFBF, kBufferingSize);
#endif
    }

    return error;
  }
707
  /// Open the existing chunk at path_ for reading ("rb") or update ("rb+").
  /// Opens without O_DIRECT, since general reads need arbitrary alignment.
  int open(bool readOnly, const std::map<std::string, std::string>& options) {
    close();

    file_position_ = 0;
    async_error_ = SUCCESS;
    file_mode_ = readOnly ? "rb" : "rb+";

    IF_ERROR_RETURN(init_parameters(options));
    return ensureOpenNonDirect();
  }
718
719 int close() {
720 if (!isOpened()) {
721 return SUCCESS;
722 }
723
724 int error = flushWriteBuffer();
725
726 // Release the write buffers, if any. File chunking is a rare enough event that it's not worth
727 // trying to move these to the next currentChunk.
728 free_write_buffers();
729
730 int error2 = file_.close();
731 return error != 0 ? error : error2;
732 }
733
734 int rewind() {
735 // Normally rewind can't return an error, but this may be the only spot we have to return a
736 // deferred error If we can't return errors here, we'll need to remember that we must flush on
737 // the next operation, whatever it is, and THEN reset the file_position_. Pain.
738 //
739 // TODO: DiskFile doesn't currently check the return value of this, why would it? In a world in
740 // which writes may be failing asynchronously, what should DiskFile or RecordFileWriter do to
741 // recover, if anything?
742 IF_ERROR_RETURN(flushWriteBuffer());
743
744 file_position_ = 0;
745 async_error_ = SUCCESS;
746
747 return SUCCESS;
748 }
749
750 [[nodiscard]] bool eof() const {
751 int64_t pos = 0;
752 if (tell(pos) != 0) {
753 return false;
754 }
755
756 return pos == getSize();
757 }
758
759 bool isOpened() {
760 return file_.isOpened();
761 }
762
763 int write(const void* buffer, size_t count, size_t& outWrittenSize) {
764 if (count == 0) {
765 return SUCCESS;
766 } else if (!isOpened()) {
767 XR_LOGCE(VRS_DISKFILECHUNK, "DiskFile not opened");
768 return DISKFILE_NOT_OPEN;
769 }
770 const auto* bbuffer = static_cast<const char*>(buffer);
771 outWrittenSize = 0;
772
773 if (!isOpened()) {
774 return DISKFILE_NOT_OPEN;
775 }
776
777 if (count == 0) {
778 return SUCCESS;
779 }
780
781 // compute the number of bytes to write synchronously, if any.
782 size_t towrite = 0;
783 if (ioengine_ == IoEngine::Sync) {
784 // Write the entire buffer synchronously
785 towrite = count;
786 } else if (
787 use_directio_ && (current_buffer_ == nullptr || current_buffer_->empty()) &&
788 (file_position_ % offset_align_) != 0) {
789 // Write as much as we need to in order to hit offset_align_, then fill the buffers
790 towrite = std::min<size_t>(count, offset_align_ - (file_position_ % offset_align_));
791 } // Otherwise writes can be aligned to anything, write nothing synchronously here
792
793 if (towrite != 0) {
794 // Rather than read-modify-write lenalign chunks of the file, and deal with all of the
795 // corner cases of "do we overlap the end of the file or not, previously written data or
796 // not, etc", we just close/reopen the file here to do the handful of partial writes
797 // required by the library.
798
799 IF_ERROR_RETURN(flushWriteBuffer());
800
801 IF_ERROR_RETURN(ensureOpenNonDirect());
802
803 size_t thiswritten = 0;
804 IF_ERROR_RETURN(file_.pwrite(bbuffer, towrite, file_position_, thiswritten));
805 bbuffer += thiswritten;
806 count -= thiswritten;
807 outWrittenSize += thiswritten;
808 file_position_ += thiswritten;
809 }
810
811 if (count != 0 && current_buffer_ == nullptr) {
812 current_buffer_ = get_free_buffer();
813 if (current_buffer_ == nullptr) {
814 return ENOMEM;
815 }
816 }
817
818 while (count != 0) {
819 // This data is aligned to lenalign, so cache it in the current_buffer_
820 ssize_t additionalBuffered = current_buffer_->add(bbuffer, count);
821 if (additionalBuffered <= 0) {
822 return DISKFILE_PARTIAL_WRITE_ERROR;
823 }
824 bbuffer += additionalBuffered;
825 count -= additionalBuffered;
826 outWrittenSize += additionalBuffered;
827
828 if (current_buffer_->full()) {
829 IF_ERROR_RETURN(ensureOpenDirect());
830
831 towrite = current_buffer_->size();
832 switch (ioengine_) {
833 case IoEngine::AIO: {
834 // Other async IO engines like uring or libaio would go here in the fugure, and the
835 // `start_write` call would dispatch it
836 std::unique_lock lock{buffers_mutex_};
837 buffers_queued_.emplace_back(
838 current_buffer_,
839 file_,
840 file_position_,
841 [this, buffer = current_buffer_](ssize_t io_return, int io_errno) {
842 this->complete_write(buffer, io_return, io_errno);
843 });
844 current_buffer_ = nullptr;
845 file_position_ += towrite;
846 pump_buffers_locked();
847
848 if (count != 0) {
849 current_buffer_ = get_free_buffer_locked(lock);
850 if (!current_buffer_) {
851 return ENOMEM;
852 }
853 }
854 break;
855 }
856 case IoEngine::PSync: {
857 size_t thiswritten = 0;
858 int err = file_.pwrite(
859 current_buffer_->data(), current_buffer_->size(), file_position_, thiswritten);
860 // There's no need to release this buffer, as we've already written it. Save a fetch
861 // later
862 current_buffer_->clear();
863 file_position_ += thiswritten;
864 if (err) {
865 return err;
866 }
867 break;
868 }
869 default:
870 XR_LOGCE(VRS_DISKFILECHUNK, "Unhandled ioengine");
871 return VRSERROR_INTERNAL_ERROR;
872 }
873 }
874 }
875 return SUCCESS;
876 }
877
878 void setSize(int64_t newSize) {
879 size_ = newSize;
880 }
881
882 int flush() {
883 return flushWriteBuffer();
884 }
885
886 int truncate(int64_t newSize) {
887 IF_ERROR_RETURN(flushWriteBuffer());
888
889 IF_ERROR_RETURN(file_.truncate(newSize));
890 size_ = newSize;
891 return SUCCESS;
892 }
893
894 int read(void* buffer, size_t count, size_t& outReadSize) {
895 outReadSize = 0;
896 if (!isOpened()) {
897 return DISKFILE_NOT_OPEN;
898 }
899
900 // Finish writes in case we'll be reading data from pending writes
901 IF_ERROR_RETURN(flushWriteBuffer());
902 IF_ERROR_RETURN(ensureOpenNonDirect());
903
904 int error = file_.read(buffer, count, file_position_, outReadSize);
905 file_position_ += outReadSize;
906 return error;
907 }
908
909 [[nodiscard]] int64_t getSize() const {
910 return size_;
911 }
912
913 [[nodiscard]] bool contains(int64_t fileOffset) const {
914 return fileOffset >= offset_ && fileOffset < offset_ + size_;
915 }
916
917 int tell(int64_t& outFilepos) const {
918 outFilepos = file_position_ + (current_buffer_ ? current_buffer_->size() : 0);
919
920 return SUCCESS;
921 }
922
  /// Seek to `pos` relative to `origin` (SEEK_SET/CUR/END).
  int seek(int64_t pos, int origin) {
    // We don't know if we'll be reading or overwriting existing data, flush the buffers, and return
    // any errors that may surface from the completing operations
    IF_ERROR_RETURN(flushWriteBuffer());

    // We track the file offset ourselves, but let file_.seek() do the actual work to compute the
    // final position, as our own `size_` member may not reflect the current size of the chunk.
    // The first seek re-syncs the OS file pointer with file_position_ so a relative
    // SEEK_CUR resolves against the position callers observe via tell().
    IF_ERROR_RETURN(file_.seek(file_position_, SEEK_SET, file_position_));
    IF_ERROR_RETURN(file_.seek(pos, origin, file_position_));

    return SUCCESS;
  }
935
936 [[nodiscard]] const std::string& getPath() const {
937 return path_;
938 }
939
940 void setOffset(int64_t newOffset) {
941 offset_ = newOffset;
942 }
943
944 [[nodiscard]] int64_t getOffset() const {
945 return offset_;
946 }
947
948 private:
  /// Write-path engine selection:
  /// - Sync:  every write() goes straight to a synchronous pwrite().
  /// - AIO:   full aligned buffers are queued to the platform async IO API.
  /// - PSync: full aligned buffers are written with a synchronous pwrite().
  enum class IoEngine {
    Sync,
    AIO,
    PSync,
  };
954
  /// A filled buffer waiting for submission to the IO engine
  /// (drained by pump_buffers_locked, bounded by iodepth_).
  struct QueuedWrite {
    AsyncBuffer* buffer_;
    // N.B. QueuedWrite's are guaranteed to be flushed before the associated file descriptor is
    // closed, so storing this via reference is safe.
    const AsyncHandle& file_;
    off_t offset_; // absolute file offset this buffer's contents belong at
    AsyncBuffer::complete_write_callback callback_;
    QueuedWrite(
        AsyncBuffer* buffer,
        AsyncHandle& file,
        off_t offset,
        AsyncBuffer::complete_write_callback callback)
        : buffer_(buffer), file_(file), offset_(offset), callback_(std::move(callback)) {}
  };
969
  /// Drain all queued/in-flight async writes, then synchronously write out any
  /// partially filled current buffer. Surfaces (and clears) the sticky
  /// async_error_ recorded by completion callbacks.
  int flushWriteBuffer() {
    // Allow any other aio writes to complete. Note that `buffers_` may be empty because of
    // default construction and swapping.
    if (!buffers_.empty()) {
      std::unique_lock lock{buffers_mutex_};
      // Every pooled buffer except the one we may still be filling must be
      // back on the free list before the drain is complete.
      size_t expected_free = buffers_.size() - (current_buffer_ ? 1 : 0);
      while (buffers_free_.size() != expected_free) {
        // N.B. as buffers are freed they pump the queue to completion.
        buffer_freed_cv_.wait(
            lock, [this, expected_free] { return buffers_free_.size() == expected_free; });
      }

      // Atomically consume any error recorded by the async completions.
      int async_error = async_error_.exchange(0);
      if (async_error != 0) {
        XR_LOGCE(VRS_DISKFILECHUNK, "Returning async error on flush {}", async_error);
        return async_error;
      }
    }

    if (current_buffer_ && !current_buffer_->empty()) {
      IF_ERROR_RETURN(ensureOpenNonDirect());

      // NOTE(review): free_buffer() nulls current_buffer_, so this loop runs at
      // most once; kept as written to preserve behavior.
      while (current_buffer_) {
        size_t towrite = current_buffer_->size();

        // if we've gotten here we're flushing, so just pwrite() the contents, don't bother being
        // fast about it
        size_t thiswritten = 0;
        int error = file_.pwrite(current_buffer_->data(), towrite, file_position_, thiswritten);
        free_buffer(current_buffer_);
        if (error != 0) {
          return error;
        }
        file_position_ += thiswritten;
      }
    }

    // An empty current buffer still needs returning to the pool.
    if (current_buffer_) {
      free_buffer(current_buffer_);
    }

    return SUCCESS;
  }
1013
1014 int ensureOpenNonDirect() {
1015 return ensureOpen_(supported_flags_ & ~O_DIRECT);
1016 }
1017
1018 int ensureOpenDirect() {
1019 return ensureOpen_(supported_flags_);
1020 }
1021
1022 int ensureOpen_(int requested_flags) {
1023 bool no_truncate = false;
1024 if (file_.isOpened()) {
1025 if (requested_flags == current_flags_) {
1026 return SUCCESS;
1027 }
1028 no_truncate = true;
1029 file_.close();
1030 }
1031
1032 const char* mode = file_mode_;
1033 if (mode == nullptr) {
1034 return DISKFILE_NOT_OPEN;
1035 }
1036
1037 bool readOnly = (mode[0] == 'w') && (strchr(mode, '+') == nullptr);
1038
1039 // When re-opening a file here, we must convert 'w' modes to 'r+' modes to ensure that we do not
1040 // truncate the file. This could fail if we don't have read permissions on the drive. If so,
1041 // we'd need to refactor so that we can provide the `O_TRUNC` or not flag to `open()`
1042 //
1043 // We assume that all VRS modes are binary here to avoid more string manipulation.
1044 if (mode[0] == 'w' && no_truncate) {
1045 mode = "rb+";
1046 }
1047
1048 if (!readOnly) {
1049 IF_ERROR_RETURN(alloc_write_buffers());
1050 }
1051
1052 int error = file_.open(path_, mode, requested_flags);
1053 if (error != 0) {
1054 close();
1055 return error;
1056 }
1057 current_flags_ = requested_flags;
1058
1059#if IS_ANDROID_PLATFORM()
1060 const size_t kBufferingSize = 128 * 1024;
1061 IF_ERROR_LOG(setvbuf(newFd, nullptr, _IOFBF, kBufferingSize));
1062#endif
1063
1064 return SUCCESS;
1065 }
1066
  /// Completion callback for one buffer's async write. Normalizes the
  /// (io_return, io_errno) pair into a single error code, records the FIRST
  /// failure in async_error_, recycles the buffer, and pumps the queue.
  void complete_write(AsyncBuffer* buffer, ssize_t io_return, int io_errno) {
    // N.B. this is called asynchronously by the write completion "thread" from the kernel, it
    // must be thread-safe

    if (io_return == buffer->size()) {
      // Full write: any stray errno is noise; treat as success.
      if (io_errno != SUCCESS) {
        XR_LOGCD(
            VRS_DISKFILECHUNK,
            "io_return was the size of the buffer, but io_errno is {}",
            io_errno);
        io_errno = SUCCESS;
      }
    } else {
      if (io_return < 0) {
        // Hard failure: make sure we have a non-zero error code to record.
        if (io_errno == SUCCESS) {
          XR_LOGCD(VRS_DISKFILECHUNK, "io_errno is 0 but io_return < 0");
          io_errno = DISKFILE_INVALID_STATE;
        }
      } else {
        // this was a partial write. Ignore io_errno, and signal it ourselves
        io_errno = DISKFILE_PARTIAL_WRITE_ERROR;
      }
    }

    if (io_errno != SUCCESS) {
      // Record only the first async error (CAS loop: keep retrying while the
      // slot is still SUCCESS); later errors are dropped.
      int current_error = async_error_.load();
      while (current_error == SUCCESS &&
             !async_error_.compare_exchange_weak(current_error, io_errno)) {
      }
    }

    {
      std::unique_lock lock{buffers_mutex_};
      free_buffer_locked(lock, buffer);
      buffers_writing_ -= 1;
      // A write slot just opened: submit the next queued buffer, if any.
      pump_buffers_locked();
    }
  }
1105
1106 AsyncBuffer* get_free_buffer_locked(std::unique_lock<std::mutex>& lock) {
1107 if (buffers_free_.empty()) {
1108 buffer_freed_cv_.wait(lock, [this] { return !buffers_free_.empty(); });
1109 }
1110 assert(!buffers_free_.empty());
1111 auto* buffer = buffers_free_.back();
1112 buffers_free_.pop_back();
1113 assert(buffer);
1114 assert(buffer->empty());
1115 return buffer;
1116 }
1117
1118 AsyncBuffer* get_free_buffer() {
1119 std::unique_lock lock{buffers_mutex_};
1120 return get_free_buffer_locked(lock);
1121 }
1122
1123 void free_buffer(AsyncBuffer*& buffer) {
1124 std::unique_lock lock{buffers_mutex_};
1125 free_buffer_locked(lock, buffer);
1126 }
1127
1128 void free_buffer_locked(std::unique_lock<std::mutex>& /* lock */, AsyncBuffer*& buffer) {
1129 buffer->clear();
1130 buffers_free_.push_back(buffer);
1131 buffer = nullptr;
1132 buffer_freed_cv_.notify_one();
1133 }
1134
1135 void pump_buffers() {
1136 std::unique_lock lock{buffers_mutex_};
1137 pump_buffers_locked();
1138 }
1139
  /// Submit queued buffers to the IO engine until iodepth_ writes are in
  /// flight or the queue is empty. Caller must hold buffers_mutex_.
  void pump_buffers_locked() {
    // You must own a lock on buffers_mutex_ to call this

    // Move as many queued buffers as we can to the writing state
    while (buffers_writing_ < iodepth_ && !buffers_queued_.empty()) {
      int result = SUCCESS;
      {
        // N.B. item's storage is in buffers_queued_, don't `pop_front()` until we're done using
        // it
        auto& item = buffers_queued_.front();
        result = item.buffer_->start_write(item.file_, item.offset_, std::move(item.callback_));
      }
      buffers_queued_.pop_front();
      // Note that `async_error_` is set by the completion routine on a start_write failure, no need
      // to modify it here
      if (result == SUCCESS) {
        // Only successful submissions occupy an iodepth slot; failures already
        // ran their completion callback synchronously.
        buffers_writing_ += 1;
      }
    }
  }
1160
1161 int alloc_write_buffers() {
1162 assert(buffers_writing_ == 0);
1163 buffers_free_.reserve(num_buffers_);
1164 buffers_.reserve(num_buffers_);
1165 while (buffers_.size() < num_buffers_) {
1166 auto buffer = std::make_unique<AsyncBuffer>(buffer_size_, mem_align_, offset_align_);
1167 if (!buffer) {
1168 return ENOMEM;
1169 }
1170 buffers_free_.push_back(buffer.get());
1171 buffers_.push_back(std::move(buffer));
1172 }
1173 return SUCCESS;
1174 }
1175
1176 int free_write_buffers() {
1177 assert(buffers_free_.size() == buffers_.size());
1178 assert(buffers_writing_ == 0);
1179 assert(buffers_queued_.empty());
1180 current_buffer_ = nullptr;
1181 buffers_free_.clear();
1182 buffers_.clear();
1183 return SUCCESS;
1184 }
1185
1186 int init_parameters(const std::map<std::string, std::string>& options) {
1187 static const char* sIoEngineTypes[] = {"sync", "aio", "psync"};
1188 struct IoEngineTypeConverter : public EnumStringConverter<
1189 IoEngine,
1190 sIoEngineTypes,
1191 COUNT_OF(sIoEngineTypes),
1192 IoEngine::AIO,
1193 IoEngine::AIO,
1194 true> {};
1195
1196 // The VRS_DISKFILECHUNKASYNC_* options are primarily used for running the test suite with
1197 // different default IO configurations
1198 if (!helpers::getBool(options, "direct", use_directio_) &&
1199 !helpers::getBool(options, "directio", use_directio_)) {
1200 use_directio_ = true;
1201 }
1202
1203#ifdef VRS_BUILDTYPE_TSAN
1204 // N.B. The aio_notify completions come in on a thread spawned from glibc that is not
1205 // tsan-instrumented. As a result, the `malloc()` call in the `aio_notify()` (which does go
1206 // through the tsan version) crashes when it tries to access the tsan thread state for tracking
1207 // the allocation. Force the use of the non-aio APIs in this case.
1208 ioengine_ = IoEngine::Sync;
1209#else
1210 {
1211 ioengine_ = IoEngine::AIO; // default, unless overridden
1212 auto it = options.find("ioengine");
1213 if (it != options.end()) {
1214 // ioengine names here have been chosen to correspond to the `fio` program's `ioengine` as
1215 // closely as possible, except `sync`, which acts like the basic DiskFileChunk.hpp, which
1216 // synchronously writes the buffer to disk write away, no buffering in this class.
1217 ioengine_ = IoEngineTypeConverter::toEnum(it->second);
1218 }
1219 }
1220#endif
1221
1222 bool needBuffers = use_directio_ || (ioengine_ != IoEngine::Sync);
1223 if (!needBuffers) {
1224 supported_flags_ = 0;
1225 mem_align_ = 0;
1226 offset_align_ = 0;
1227 buffer_size_ = 0;
1228 num_buffers_ = 0;
1229 iodepth_ = 0;
1230 XR_LOGCI(
1231 VRS_DISKFILECHUNK,
1232 "asyncdiskfile configuration: IO Engine={} DirectIO={} (no internal buffers)",
1233 IoEngineTypeConverter::toString(ioengine_),
1234 use_directio_);
1235 return SUCCESS;
1236 }
1237
1238 if (use_directio_) {
1239 supported_flags_ |= O_DIRECT;
1240 }
1241
1242 mem_align_ = 4 * 1024;
1243 offset_align_ = 4 * 1024;
1244
1245#ifdef STATX_DIOALIGN
1246 // Current kernel versions deployed around Meta don't have statx. Rely on the defaults/users
1247 // to set this up correctly for now.
1248 {
1249 struct statx statxbuf;
1250 int statErr = ::statx(AT_FDCWD, chunkFilePath.c_str(), 0, STATX_DIOALIGN, &statxbuf);
1251 if (statErr != 0) {
1252 XR_LOGCE(VRS_DISKFILECHUNK, "statx failed: %s", strerror(errno));
1253 return errno;
1254 } else {
1255 mem_align_ = statxbuf.stx_dio_mem_align;
1256 offset_align_ = statxbuf.stx_dio_mem_align;
1257
1258 XR_LOGCD(
1259 VRS_DISKFILECHUNK,
1260 "statx reports blocksize:{:#x} mem_align:{:#x} offset_align:{:#x}",
1261 statxbuf.stx_blksize,
1262 mem_align_,
1263 offset_align_);
1264 }
1265
1266 if (0 == mem_align_ || 0 == offset_align_) {
1267 XR_LOGCE(VRS_DISKFILECHUNK, "failed to get alignment info");
1268 return DISKFILE_NOT_OPEN;
1269 }
1270#endif
1271
1272 // Allow overrides, but don't bother checking that they are powers of two or anything, on
1273 // the assumption that the underlying write() calls will fail if they're bad values.
1274
1275 uint64_t temp_u64 = 0;
1276 mem_align_ = helpers::getByteSize(options, "mem_align", temp_u64) ? temp_u64 : mem_align_;
1277 mem_align_ = std::clamp<size_t>(mem_align_, 1, 16 * 1024);
1278 offset_align_ =
1279 helpers::getByteSize(options, "offset_align", temp_u64) ? temp_u64 : offset_align_;
1280 offset_align_ = std::clamp<size_t>(offset_align_, 1, 16 * 1024);
1281
1282 // The defaults below might not be optimal for your rig.
1283 // They can still be overwritten with the parameter names below from the input URI.
1284 // fio testing showed each worker using 32MB buffers for non-pre-allocated disk was pretty
1285 // good. Avoids using more than 128 outstanding IO requests at a time, beyond which IO calls
1286 // were blocking.
1287
1288 buffer_size_ =
1289 helpers::getByteSize(options, "buffer_size", temp_u64) ? temp_u64 : 32 * 1024 * 1024;
1290 buffer_size_ = std::clamp<size_t>(buffer_size_, 512, 512 * 1024 * 1024);
1291 num_buffers_ = helpers::getUInt64(options, "buffer_count", temp_u64) ? temp_u64 : 4;
1292 num_buffers_ = std::clamp<size_t>(num_buffers_, 1, 512);
1293
1294 if (ioengine_ == IoEngine::PSync && num_buffers_ > 1) {
1295 XR_LOGCW(
1296 VRS_DISKFILECHUNK,
1297 "The psync ioengine can only make use of a single buffer, not {}.",
1298 num_buffers_);
1299 num_buffers_ = 1;
1300 }
1301
1302 // fio testing showed that we really only need to keep a couple of these at a time
1303 iodepth_ = helpers::getUInt64(options, "iodepth", temp_u64) ? temp_u64 : num_buffers_;
1304 iodepth_ = std::clamp<size_t>(iodepth_, 1, 512);
1305
1306 if ((buffer_size_ % offset_align_ != 0) || (buffer_size_ % mem_align_ != 0)) {
1307 XR_LOGCE(
1308 VRS_DISKFILECHUNK,
1309 "buffer_size={} doesn't conform to offset_align={} or mem_align={}",
1310 helpers::humanReadableFileSize(buffer_size_),
1311 helpers::humanReadableFileSize(offset_align_),
1312 helpers::humanReadableFileSize(mem_align_));
1313 return DISKFILE_INVALID_STATE;
1314 }
1315 XR_LOGCI(
1316 VRS_DISKFILECHUNK,
1317 "asyncdiskfile configuration: IOEngine={} DirectIO={} iodepth={} buffer_count={} "
1318 "buffer_size={} offset_align={} mem_align={}",
1319 IoEngineTypeConverter::toString(ioengine_),
1320 use_directio_,
1321 iodepth_,
1322 num_buffers_,
1323 helpers::humanReadableFileSize(buffer_size_),
1324 helpers::humanReadableFileSize(offset_align_),
1325 helpers::humanReadableFileSize(mem_align_));
1326 return SUCCESS;
1327 }
1328
  // OS handle for this chunk's opened file (platform-specific AsyncHandle).
  AsyncHandle file_{};
  std::string path_; // path of this chunk
  int64_t offset_{}; // offset of this chunk in the file
  int64_t size_{}; // size of the chunk

  // Keeps track of the current read/write position in the file of the current buffer.
  int64_t file_position_ = 0;

  // Mode the chunk was opened with (presumably an fopen-style string — confirm with callers).
  const char* file_mode_ = nullptr;
  // Keeps track of the flags currently in force for the opened fd_. Typically a subset of the
  // supported_flags_
  int current_flags_ = 0;
  // The flags supported by the underlying path_ file
  int supported_flags_ = 0;

  // Protects the following members from the writing thread as well as the asyncio callback
  // thread(s). Note that this lock is not really required on Windows, as the callbacks are
  // delivered on the dispatching thread when it's in an alertable state.
  std::mutex buffers_mutex_;
  // Used to notify a waiting writing thread that a buffer was freed.
  std::condition_variable buffer_freed_cv_;
  // The list of free buffers (non-owning; storage is owned by buffers_ below)
  std::vector<AsyncBuffer*> buffers_free_;
  // The list of buffers to be written. Drained by pump_buffers()
  std::deque<QueuedWrite> buffers_queued_;
  // A count of the number of buffers waiting on async completions
  //
  // This could be a std::atomic<size_t>, but the current implementation has to take the lock
  // anyway to manage the list of buffers_free_, so don't bother.
  size_t buffers_writing_ = 0;
  // A list of all the buffers to keep them alive when they are being written (no longer in any
  // other queue)
  std::vector<std::unique_ptr<AsyncBuffer>> buffers_;
  // The current buffer (if any) being filled by calls to `write()`. It will either be queued
  // for async write by `write()`, or written out by `flushWriteBuffer()`
  AsyncBuffer* current_buffer_ = nullptr;
  // If != SUCCESS, represents errors that were signaled by async writes completing. Typically
  // returned to the caller as the result of another, later operation (e.g. another write after
  // the failure, or a call to flushWriteBuffer(), etc)
  std::atomic<int> async_error_ = SUCCESS;

  // Operational parameters initialized from the FileSpec extra params/options at create/open
  // time. These can be tuned by the user via uri parameters.
  IoEngine ioengine_ = IoEngine::AIO;
  bool use_directio_ = true;
  // How many asyncio buffers to allocate and fill
  size_t num_buffers_ = 0;
  // The size of each individual buffer
  size_t buffer_size_ = 0;
  // The maximum number of simultaneous async_write operations allowed
  size_t iodepth_ = 4;
  // The requested alignment of buffer lengths and file offsets
  size_t offset_align_ = 0;
  // The requested length of memory alignment
  size_t mem_align_ = 0;
1384 };
1385
1386} // namespace vrs
Definition AsyncDiskFileChunk.hpp:399
ssize_t add(const void *buffer, size_t size)
Definition AsyncDiskFileChunk.hpp:467
Definition AsyncDiskFileChunk.hpp:494
Definition AsyncDiskFileChunk.hpp:621
Definition AsyncDiskFileChunk.hpp:49
Definition AsyncDiskFileChunk.hpp:256