19#include <vrs/DiskFile.h>
21#if VRS_ASYNC_DISKFILE_SUPPORTED()
29#define POSIX_AIO_SUPPORTED() (IS_MAC_PLATFORM() || IS_LINUX_PLATFORM())
31#if IS_WINDOWS_PLATFORM()
34#elif POSIX_AIO_SUPPORTED()
44#include <condition_variable>
51#include <vrs/ErrorCode.h>
52#include <vrs/os/Platform.h>
54#define VRS_DISKFILECHUNK "AsyncDiskFileChunk"
58#if IS_WINDOWS_PLATFORM()
60using ssize_t = int64_t;
61#define O_DIRECT 0x80000000U
// Windows-side file handle wrapper (this section follows `#if IS_WINDOWS_PLATFORM()`).
// It holds a single HANDLE in h_ and declares the file operations performed on it.
// The member functions below are only declared here; their definitions live elsewhere.
63struct AsyncWindowsHandle {
// Default state: no handle, h_ is INVALID_HANDLE_VALUE.
64 AsyncWindowsHandle() : h_(INVALID_HANDLE_VALUE) {}
// Wraps an existing handle value. Not `explicit`, so a HANDLE converts implicitly.
65 AsyncWindowsHandle(HANDLE h) : h_(h) {}
// Move construction: takes rhs's handle value and leaves rhs holding INVALID_HANDLE_VALUE.
66 AsyncWindowsHandle(AsyncWindowsHandle&& rhs) : h_(rhs.h_) {
67 rhs.h_ = INVALID_HANDLE_VALUE;
// NOTE(review): this copy constructor takes a non-const lvalue reference and copies the raw
// handle value without resetting rhs, so both objects then hold the same HANDLE.
// Confirm that this aliasing is intended.
69 AsyncWindowsHandle(AsyncWindowsHandle& rhs) : h_(rhs.h_) {}
// Move assignment: rhs is left holding INVALID_HANDLE_VALUE.
70 AsyncWindowsHandle& operator=(AsyncWindowsHandle&& rhs) {
72 rhs.h_ = INVALID_HANDLE_VALUE;
// The operations below return int, presumably a status code (0 on success) - TODO confirm
// against the implementation. Sizes and positions are reported through reference out-parameters.
76 bool isOpened()
const;
77 int open(
const std::string& path,
const char* modes,
int flags);
// Positional write: `offset` is passed explicitly; the byte count is reported in outWriteSize.
79 int pwrite(
const void* buf,
size_t count, int64_t offset,
size_t& outWriteSize);
// Positional read: `offset` is passed explicitly; the byte count is reported in outReadSize.
80 int read(
void* buf,
size_t count, int64_t offset,
size_t& outReadSize);
81 int truncate(int64_t newSize);
// The resulting file position is reported in outFilepos.
82 int seek(int64_t pos,
int origin, int64_t& outFilepos);
// Helper selecting the direction with `readNotWrite`; presumably shared by read() and
// pwrite() - TODO confirm against the implementation.
85 int _readwrite(
bool readNotWrite,
void* buf,
size_t count, int64_t offset,
size_t& outSize);
// The wrapped handle; INVALID_HANDLE_VALUE unless set by a constructor or assignment.
88 HANDLE h_ = INVALID_HANDLE_VALUE;
// On this platform branch, AsyncHandle names the Windows wrapper.
91using AsyncHandle = AsyncWindowsHandle;
// Move-only wrapper around an integer file descriptor (fd_).
// Copy construction and copy assignment are deleted; the move operations reset the source
// to INVALID_FILE_DESCRIPTOR. The file operations are only declared here.
93struct AsyncFileDescriptor {
// Sentinel value stored in fd_ when no descriptor is held.
94 static constexpr int INVALID_FILE_DESCRIPTOR = -1;
// Default state: fd_ keeps its in-class initializer, INVALID_FILE_DESCRIPTOR.
96 AsyncFileDescriptor() =
default;
// Wraps an existing descriptor value; `explicit` prevents implicit conversion from int.
97 explicit AsyncFileDescriptor(
int fd) : fd_(fd) {}
// Move construction: takes rhs's descriptor and leaves rhs holding INVALID_FILE_DESCRIPTOR.
98 AsyncFileDescriptor(AsyncFileDescriptor&& rhs) noexcept : fd_(rhs.fd_) {
99 rhs.fd_ = INVALID_FILE_DESCRIPTOR;
// Not copyable.
101 AsyncFileDescriptor(
const AsyncFileDescriptor& rhs)
noexcept =
delete;
// Move assignment: rhs is left holding INVALID_FILE_DESCRIPTOR.
102 AsyncFileDescriptor& operator=(AsyncFileDescriptor&& rhs)
noexcept {
104 rhs.fd_ = INVALID_FILE_DESCRIPTOR;
// Not copy-assignable.
107 AsyncFileDescriptor& operator=(
const AsyncFileDescriptor& rhs) =
delete;
// Comparison against a raw descriptor value; presumably compares with fd_ - TODO confirm.
109 bool operator==(
int fd)
const {
// The operations below return int, presumably a status code (0 on success) - TODO confirm
// against the implementation. Sizes and positions are reported through reference out-parameters.
113 int open(
const std::string& path,
const char* modes,
int flags);
114 [[nodiscard]]
bool isOpened()
const;
// Positional read: `offset` is passed explicitly; the byte count is reported in outReadSize.
115 int read(
void* ptr,
size_t bufferSize,
size_t offset,
size_t& outReadSize);
116 int truncate(int64_t newSize);
// The resulting file position is reported in outFilepos.
117 int seek(int64_t pos,
int origin, int64_t& outFilepos);
// Positional write: `offset` is passed explicitly; the byte count is reported in `written`.
118 int pwrite(
const void* buf,
size_t count, off_t offset,
size_t& written);
// The wrapped descriptor; INVALID_FILE_DESCRIPTOR unless set by a constructor or assignment.
121 int fd_ = INVALID_FILE_DESCRIPTOR;
// On this platform branch, AsyncHandle names the file-descriptor wrapper.
123using AsyncHandle = AsyncFileDescriptor;
// Members of AlignedBuffer (the class opening line is not part of this span).
// Raw pointer to the buffer storage; null until set.
128 void* aligned_buffer_ =
nullptr;
// Buffer capacity, zero-initialized; presumably in bytes - TODO confirm.
129 size_t capacity_ = 0;
// Takes a size plus two alignment values; presumably `memalign` is the alignment of the
// memory address and `lenalign` the alignment of the length - TODO confirm.
133 AlignedBuffer(
size_t size,
size_t memalign,
size_t lenalign);
// Virtual destructor: the class is used as a base (AsyncBuffer derives from it).
134 virtual ~AlignedBuffer();
136 [[nodiscard]]
inline size_t size()
const {
139 [[nodiscard]]
inline size_t capacity()
const {
142 [[nodiscard]]
inline bool empty()
const {
// True when size() has reached capacity().
145 [[nodiscard]]
inline bool full()
const {
146 return size() == capacity();
// Returns the buffer storage as an untyped pointer.
151 [[nodiscard]]
inline void* data()
const {
152 return aligned_buffer_;
// Returns the same storage pointer as data(), typed as char*.
154 [[nodiscard]]
inline char* bdata()
const {
155 return reinterpret_cast<char*
>(aligned_buffer_);
// Presumably appends up to `size` bytes from `buffer` and returns the number of bytes
// taken, or a negative value on error - TODO confirm against the implementation.
157 [[nodiscard]] ssize_t add(
const void* buffer,
size_t size);
161#if IS_WINDOWS_PLATFORM()
// Windows-only type (declared under `#if IS_WINDOWS_PLATFORM()`).
// Presumably pairs an OVERLAPPED with per-write state for the completion routine - TODO confirm.
162struct AsyncOVERLAPPED {
// An AlignedBuffer that can be written to a file asynchronously, with a completion callback.
// The platform-specific members are selected by the preprocessor branches below.
169class AsyncBuffer :
public AlignedBuffer {
// Completion callback signature: receives the I/O return value and an errno-style code.
171 using complete_write_callback = std::function<void(ssize_t io_return,
int io_errno)>;
// Forwards all three arguments unchanged to the AlignedBuffer constructor.
173 AsyncBuffer(
size_t size,
size_t memalign,
size_t lenalign)
174 : AlignedBuffer(size, memalign, lenalign) {}
175 ~AsyncBuffer()
override =
default;
// Reports the outcome of a write; presumably invokes on_complete_ - TODO confirm.
177 void complete_write(ssize_t io_return,
int io_errno);
// Starts a write of this buffer to `file` at `offset`; `on_complete` is the callback to use
// for completion. NOTE(review): the return type is not visible on this line.
179 start_write(
const AsyncHandle& file, int64_t offset, complete_write_callback on_complete);
// Windows branch: static completion routine taking an error code, a byte count and the
// LPOVERLAPPED of the operation.
182#if IS_WINDOWS_PLATFORM()
184 static void CompletedWriteRoutine(DWORD dwErr, DWORD cbBytesWritten, LPOVERLAPPED lpOverlapped);
// POSIX AIO branch: a value-initialized aiocb control block and a static notification
// function taking a sigval.
185#elif POSIX_AIO_SUPPORTED()
186 struct aiocb aiocb_{};
187 static void SigEvNotifyFunction(
union sigval val);
// Stored completion callback; empty (nullptr) until one is set.
189 complete_write_callback on_complete_ =
nullptr;
// A file chunk identified by a path, an offset and a size, with read/write/seek operations
// and a pool of AsyncBuffer objects for writes. Movable by construction only: copy
// construction, copy assignment and move assignment are all deleted.
192class AsyncDiskFileChunk {
194 AsyncDiskFileChunk() =
default;
// Stores the path (moved in), offset and size; no file operation is performed here.
195 AsyncDiskFileChunk(std::string path, int64_t offset, int64_t size)
196 : path_{std::move(path)}, offset_{offset}, size_{size} {}
// Move constructor, defined elsewhere.
197 AsyncDiskFileChunk(AsyncDiskFileChunk&& other)
noexcept;
// Not copyable.
200 AsyncDiskFileChunk(
const AsyncDiskFileChunk& other)
noexcept =
delete;
// Not copy-assignable.
201 AsyncDiskFileChunk& operator=(
const AsyncDiskFileChunk& other)
noexcept =
delete;
// Not move-assignable either.
202 AsyncDiskFileChunk& operator=(AsyncDiskFileChunk&& rhs)
noexcept =
delete;
204 ~AsyncDiskFileChunk();
// The int-returning operations below presumably return a status code (0 on success) -
// TODO confirm against the implementation.
// `options` carries FileSpec extras; presumably parsed by init_parameters() - TODO confirm.
206 int create(
const std::string& newpath,
const FileSpec::Extras& options);
207 int open(
bool readOnly,
const FileSpec::Extras& options);
210 [[nodiscard]]
bool eof()
const;
// The number of bytes accepted is reported in outWrittenSize.
212 int write(
const void* buffer,
size_t count,
size_t& outWrittenSize);
213 void setSize(int64_t newSize);
215 int truncate(int64_t newSize);
// The number of bytes read is reported in outReadSize.
216 int read(
void* buffer,
size_t count,
size_t& outReadSize);
217 [[nodiscard]] int64_t getSize()
const;
// Presumably tests whether fileOffset lies within [offset, offset + size) - TODO confirm.
218 [[nodiscard]]
bool contains(int64_t fileOffset)
const;
// The current position is reported in outFilepos.
219 int tell(int64_t& outFilepos)
const;
220 int seek(int64_t pos,
int origin);
221 [[nodiscard]]
const std::string& getPath()
const;
222 void setOffset(int64_t newOffset);
223 [[nodiscard]] int64_t getOffset()
const;
// Selects the I/O engine; ioengine_ below defaults to IoEngine::AIO.
225 enum class IoEngine {
// The next members and constructor presumably belong to the QueuedWrite record stored in
// buffers_queued_ - TODO confirm. Note that file_ is held by reference, so the referenced
// AsyncHandle must outlive the record.
233 AsyncBuffer* buffer_;
236 const AsyncHandle& file_;
238 AsyncBuffer::complete_write_callback callback_;
243 AsyncBuffer::complete_write_callback callback)
244 : buffer_(buffer), file_(file), offset_(offset), callback_(std::move(callback)) {}
// Internal helpers, declared only.
247 int flushWriteBuffer();
248 int ensureOpenNonDirect();
249 int ensureOpenDirect();
250 int ensureOpen_(
int requested_flags);
// Completion entry point for a given buffer; same result arguments as
// AsyncBuffer::complete_write_callback.
251 void complete_write(AsyncBuffer* buffer, ssize_t io_return,
int io_errno);
// The `_locked` variants take the caller's unique_lock; presumably it must already hold
// buffers_mutex_ - TODO confirm.
252 AsyncBuffer* get_free_buffer_locked(std::unique_lock<std::mutex>& lock);
253 AsyncBuffer* get_free_buffer();
// Takes the pointer by reference; presumably clears the caller's pointer - TODO confirm.
254 void free_buffer(AsyncBuffer*& buffer);
255 void free_buffer_locked(std::unique_lock<std::mutex>& lock, AsyncBuffer*& buffer);
257 void pump_buffers_locked();
258 int alloc_write_buffers();
259 int free_write_buffers();
260 int init_parameters(
const FileSpec::Extras& options);
// Current file position, starting at 0.
268 int64_t file_position_ = 0;
// Mode string; null until set.
270 const char* file_mode_ =
nullptr;
// Flag words, both starting at 0; presumably open flags such as O_DIRECT - TODO confirm.
273 int current_flags_ = 0;
275 int supported_flags_ = 0;
// Mutex and condition variable; presumably guard and signal the buffer containers below -
// TODO confirm.
280 std::mutex buffers_mutex_;
282 std::condition_variable buffer_freed_cv_;
// Non-owning pointers to buffers that are currently free.
284 std::vector<AsyncBuffer*> buffers_free_;
// Writes waiting in a queue.
286 std::deque<QueuedWrite> buffers_queued_;
// Count of writes in flight, starting at 0.
291 size_t buffers_writing_ = 0;
// Owning storage for the AsyncBuffer objects (unique_ptr).
294 std::vector<std::unique_ptr<AsyncBuffer>> buffers_;
// Buffer currently in use; null until set.
297 AsyncBuffer* current_buffer_ =
nullptr;
// Atomic error code initialized to SUCCESS; presumably set from completion callbacks -
// TODO confirm.
301 std::atomic<int> async_error_ = SUCCESS;
// Configuration defaults: AIO engine, direct I/O enabled; counts, sizes and alignments
// start at 0.
305 IoEngine ioengine_ = IoEngine::AIO;
306 bool use_directio_ =
true;
308 size_t num_buffers_ = 0;
310 size_t buffer_size_ = 0;
314 size_t offset_align_ = 0;
316 size_t mem_align_ = 0;
int read(const string &cacheFile, set< StreamId > &outStreamIds, map< string, string > &outFileTags, map< StreamId, StreamTags > &outStreamTags, vector< IndexRecord::RecordInfo > &outRecordIndex, bool &outFileHasIndex)
Definition FileDetailsCache.cpp:266
int write(const string &cacheFile, const set< StreamId > &streamIds, const map< string, string > &fileTags, const map< StreamId, StreamTags > &streamTags, const vector< IndexRecord::RecordInfo > &recordIndex, bool fileHasIndex)
Definition FileDetailsCache.cpp:225
Definition Compressor.cpp:113