21#if VRS_ASYNC_DISKFILE_SUPPORTED()
36#include <condition_variable>
43#include <vrs/ErrorCode.h>
44#include <vrs/os/Platform.h>
46#define VRS_DISKFILECHUNK "AsyncDiskFileChunk"
52using ssize_t = int64_t;
53#define O_DIRECT 0x80000000U
// Thin wrapper that stores a Windows HANDLE in h_ and declares file operations on it.
// NOTE(review): this listing omits several original lines (closing braces, parts of
// method bodies); the comments below describe only what is visible here.
55struct AsyncWindowsHandle {
// Default state: no handle (h_ is INVALID_HANDLE_VALUE).
56 AsyncWindowsHandle() : h_(INVALID_HANDLE_VALUE) {}
// Wraps an existing HANDLE. Not `explicit`, so a HANDLE converts implicitly.
57 AsyncWindowsHandle(HANDLE h) : h_(h) {}
// Move constructor: takes rhs's handle value and resets rhs to INVALID_HANDLE_VALUE.
58 AsyncWindowsHandle(AsyncWindowsHandle&& rhs) : h_(rhs.h_) {
59 rhs.h_ = INVALID_HANDLE_VALUE;
// Copy from a non-const lvalue: afterwards both objects hold the same HANDLE value.
// NOTE(review): no destructor is visible here, so who closes the shared handle cannot
// be determined from this listing — confirm before relying on copies.
61 AsyncWindowsHandle(AsyncWindowsHandle& rhs) : h_(rhs.h_) {}
// Move assignment: the visible statement resets rhs to INVALID_HANDLE_VALUE
// (the remaining body lines are not shown).
62 AsyncWindowsHandle& operator=(AsyncWindowsHandle&& rhs) {
64 rhs.h_ = INVALID_HANDLE_VALUE;
// Presumably reports whether h_ differs from INVALID_HANDLE_VALUE — implementation not shown.
68 bool isOpened()
const;
// Opens `path`. The meaning of `modes` and `flags` is defined by the implementation
// (not shown). Returns an int status code — presumably 0 on success, TODO confirm.
69 int open(
const std::string& path,
const char* modes,
int flags);
// Positional write of `count` bytes from `buf` at `offset`; the number of bytes
// written is reported through `outWriteSize` (per the parameter name — confirm).
71 int pwrite(
const void* buf,
size_t count, int64_t offset,
size_t& outWriteSize);
// Positional read of up to `count` bytes into `buf` at `offset`; the number of bytes
// read is reported through `outReadSize` (per the parameter name — confirm).
72 int read(
void* buf,
size_t count, int64_t offset,
size_t& outReadSize);
// Presumably resizes the file to `newSize` bytes — implementation not shown.
73 int truncate(int64_t newSize);
// Seeks to `pos` relative to `origin`; the resulting position is returned via `outFilepos`.
// NOTE(review): `origin` presumably uses SEEK_SET/SEEK_CUR/SEEK_END values — confirm.
74 int seek(int64_t pos,
int origin, int64_t& outFilepos);
// Helper whose signature covers both directions: `readNotWrite` selects read vs. write.
// Presumably the shared implementation behind read() and pwrite() — confirm.
77 int _readwrite(
bool readNotWrite,
void* buf,
size_t count, int64_t offset,
size_t& outSize);
// The wrapped handle; INVALID_HANDLE_VALUE when nothing is held.
80 HANDLE h_ = INVALID_HANDLE_VALUE;
// Handle type used by the rest of this header (e.g. AsyncBuffer::start_write).
// NOTE(review): a second AsyncHandle alias exists for AsyncFileDescriptor; the
// platform #if/#else selecting between them is not visible in this listing.
83using AsyncHandle = AsyncWindowsHandle;
// Move-only wrapper around an integer file descriptor stored in fd_.
// NOTE(review): this listing omits several original lines (closing braces, parts of
// method bodies); the comments below describe only what is visible here.
85struct AsyncFileDescriptor {
// Sentinel value meaning "no descriptor held".
86 static constexpr int INVALID_FILE_DESCRIPTOR = -1;
// Defaulted: fd_ takes its in-class initializer, INVALID_FILE_DESCRIPTOR.
88 AsyncFileDescriptor() =
default;
// Wraps an existing descriptor; `explicit` prevents implicit conversion from int.
89 explicit AsyncFileDescriptor(
int fd) : fd_(fd) {}
// Move constructor: takes rhs's descriptor and resets rhs to INVALID_FILE_DESCRIPTOR.
90 AsyncFileDescriptor(AsyncFileDescriptor&& rhs) noexcept : fd_(rhs.fd_) {
91 rhs.fd_ = INVALID_FILE_DESCRIPTOR;
// Copy construction is disabled.
93 AsyncFileDescriptor(
const AsyncFileDescriptor& rhs)
noexcept =
delete;
// Move assignment: the visible statement resets rhs to INVALID_FILE_DESCRIPTOR
// (the remaining body lines are not shown).
94 AsyncFileDescriptor& operator=(AsyncFileDescriptor&& rhs)
noexcept {
96 rhs.fd_ = INVALID_FILE_DESCRIPTOR;
// Copy assignment is disabled.
99 AsyncFileDescriptor& operator=(
const AsyncFileDescriptor& rhs) =
delete;
// Comparison against a raw descriptor value; body not shown
// (presumably `fd_ == fd` — confirm).
101 bool operator==(
int fd)
const {
// Opens `path`. The meaning of `modes` and `flags` is defined by the implementation
// (not shown). Returns an int status code — presumably 0 on success, TODO confirm.
105 int open(
const std::string& path,
const char* modes,
int flags);
// Presumably reports whether fd_ differs from INVALID_FILE_DESCRIPTOR — implementation not shown.
106 [[nodiscard]]
bool isOpened()
const;
// Reads up to `bufferSize` bytes into `ptr` at `offset`; the byte count is reported
// through `outReadSize` (per the parameter name — confirm).
// NOTE(review): `offset` is size_t here but off_t in pwrite() below, and int64_t in
// the AsyncWindowsHandle counterpart — worth unifying.
107 int read(
void* ptr,
size_t bufferSize,
size_t offset,
size_t& outReadSize);
// Presumably resizes the file to `newSize` bytes — implementation not shown.
108 int truncate(int64_t newSize);
// Seeks to `pos` relative to `origin`; the resulting position is returned via `outFilepos`.
109 int seek(int64_t pos,
int origin, int64_t& outFilepos);
// Positional write of `count` bytes from `buf` at `offset`; the byte count is
// reported through `written` (per the parameter name — confirm).
110 int pwrite(
const void* buf,
size_t count, off_t offset,
size_t& written);
// The wrapped descriptor; INVALID_FILE_DESCRIPTOR when nothing is held.
113 int fd_ = INVALID_FILE_DESCRIPTOR;
// Handle type used by the rest of this header (e.g. AsyncBuffer::start_write).
// NOTE(review): a second AsyncHandle alias exists for AsyncWindowsHandle; the
// platform #if/#else selecting between them is not visible in this listing.
115using AsyncHandle = AsyncFileDescriptor;
// Members of AlignedBuffer (the class header line and several accessor bodies are
// not present in this listing; AsyncBuffer below derives from this class).
// Start of the buffer storage; nullptr until set by the implementation (not shown).
120 void* aligned_buffer_ =
nullptr;
// Capacity in bytes (presumably — units are not established in this listing).
121 size_t capacity_ = 0;
// Constructs a buffer from a size and two alignment parameters.
// NOTE(review): `memalign` presumably is the alignment of the memory address and
// `lenalign` the required multiple for the length — confirm against the implementation.
125 AlignedBuffer(
size_t size,
size_t memalign,
size_t lenalign);
// Virtual so derived classes (AsyncBuffer) can be destroyed through a base pointer.
126 virtual ~AlignedBuffer();
// Amount of data currently held (body not shown).
128 [[nodiscard]]
inline size_t size()
const {
// Maximum amount of data the buffer can hold (body not shown; presumably capacity_).
131 [[nodiscard]]
inline size_t capacity()
const {
// Body not shown; presumably true when size() is 0.
134 [[nodiscard]]
inline bool empty()
const {
// True when the buffer holds exactly capacity() bytes.
137 [[nodiscard]]
inline bool full()
const {
138 return size() == capacity();
// Untyped pointer to the start of the buffer storage.
143 [[nodiscard]]
inline void* data()
const {
144 return aligned_buffer_;
// Same pointer as data(), typed as char* for byte-wise arithmetic.
146 [[nodiscard]]
inline char* bdata()
const {
147 return reinterpret_cast<char*
>(aligned_buffer_);
// Adds up to `size` bytes from `buffer`. Returns a signed count.
// NOTE(review): presumably the number of bytes actually copied, with a negative
// value signalling an error — confirm against the implementation.
149 [[nodiscard]] ssize_t add(
const void* buffer,
size_t size);
154struct AsyncOVERLAPPED {
// An AlignedBuffer that can be written to a file asynchronously and reports the
// outcome through a stored completion callback.
// NOTE(review): this listing omits several original lines (access specifiers,
// platform #if branches, closing brace); comments describe only what is visible.
161class AsyncBuffer :
public AlignedBuffer {
// Completion callback: receives the I/O return value and the associated errno value.
163 using complete_write_callback = std::function<void(ssize_t io_return,
int io_errno)>;
// Forwards all three parameters unchanged to AlignedBuffer.
165 AsyncBuffer(
size_t size,
size_t memalign,
size_t lenalign)
166 : AlignedBuffer(size, memalign, lenalign) {}
167 ~AsyncBuffer()
override =
default;
// Presumably invoked when a write finishes, and presumably forwards to
// on_complete_ — implementation not shown.
169 void complete_write(ssize_t io_return,
int io_errno);
// Begins a write of this buffer to `file` at `offset`; `on_complete` is the callback
// to run on completion.
// NOTE(review): the return type is missing from this listing (the declaration
// starts on a line that was dropped).
171 start_write(
const AsyncHandle& file, int64_t offset, complete_write_callback on_complete);
// Completion routine with the Win32 overlapped-I/O signature (error code, byte
// count, OVERLAPPED pointer). Presumably Windows-only — the guarding #if is not shown.
176 static void CompletedWriteRoutine(DWORD dwErr, DWORD cbBytesWritten, LPOVERLAPPED lpOverlapped);
// POSIX AIO control block, value-initialized. Presumably non-Windows only —
// the guarding #if is not shown.
178 struct aiocb aiocb_ {};
// Notification function taking a `union sigval`, i.e. the shape used for
// SIGEV_THREAD-style notification — presumably registered via aiocb_; confirm.
179 static void SigEvNotifyFunction(
union sigval val);
// Callback for the write in flight; nullptr when none is set.
181 complete_write_callback on_complete_ =
nullptr;
// One chunk of a disk file, identified by a path, an offset and a size, with
// buffered asynchronous writing support.
// NOTE(review): this listing omits many original lines (access specifiers, enum
// values, the nested QueuedWrite header, some members, closing braces); comments
// describe only what is visible and hedge anything inferred from names.
184class AsyncDiskFileChunk {
186 AsyncDiskFileChunk() =
default;
// Stores the chunk's path (moved in), offset and size; does not open anything here.
187 AsyncDiskFileChunk(std::string path, int64_t offset, int64_t size)
188 : path_{std::move(path)}, offset_{offset}, size_{size} {}
// Move-constructible only: copy construction, copy assignment and move assignment
// are all deleted below.
189 AsyncDiskFileChunk(AsyncDiskFileChunk&& other)
noexcept;
192 AsyncDiskFileChunk(
const AsyncDiskFileChunk& other)
noexcept =
delete;
193 AsyncDiskFileChunk& operator=(
const AsyncDiskFileChunk& other)
noexcept =
delete;
194 AsyncDiskFileChunk& operator=(AsyncDiskFileChunk&& rhs)
noexcept =
delete;
196 ~AsyncDiskFileChunk();
// Creates the chunk at `newpath`. `options` is a string-to-string map, the same type
// init_parameters() accepts — presumably forwarded there; confirm.
198 int create(
const std::string& newpath,
const std::map<std::string, std::string>& options);
// Opens the chunk, read-only when `readOnly` is true. `options` as for create().
199 int open(
bool readOnly,
const std::map<std::string, std::string>& options);
// Presumably true when the read position has reached the end of the chunk —
// implementation not shown.
202 [[nodiscard]]
bool eof()
const;
// Writes `count` bytes from `buffer`; the amount accepted is reported through
// `outWrittenSize` (per the parameter name — confirm). Returns an int status code.
204 int write(
const void* buffer,
size_t count,
size_t& outWrittenSize);
205 void setSize(int64_t newSize);
// NOTE(review): unlike setSize(), this returns a status code — presumably it also
// resizes the file on disk; confirm against the implementation.
207 int truncate(int64_t newSize);
// Reads up to `count` bytes into `buffer`; the amount read is reported through
// `outReadSize` (per the parameter name — confirm). Returns an int status code.
208 int read(
void* buffer,
size_t count,
size_t& outReadSize);
209 [[nodiscard]] int64_t getSize()
const;
// Presumably whether `fileOffset` lies within [offset, offset + size) of this
// chunk — implementation not shown.
210 [[nodiscard]]
bool contains(int64_t fileOffset)
const;
// Reports the current position through `outFilepos`; returns an int status code.
211 int tell(int64_t& outFilepos)
const;
// Seeks to `pos` relative to `origin` (presumably SEEK_SET/SEEK_CUR/SEEK_END — confirm).
212 int seek(int64_t pos,
int origin);
// Returns a reference to the stored path; valid only while this object is alive.
213 [[nodiscard]]
const std::string& getPath()
const;
214 void setOffset(int64_t newOffset);
215 [[nodiscard]] int64_t getOffset()
const;
// I/O backend selector; the enumerators are not present in this listing
// (IoEngine::AIO is used as the default for ioengine_ below).
217 enum class IoEngine {
// The next five lines are fragments of a nested type — presumably QueuedWrite, the
// element type of buffers_queued_ below; its header line is not shown.
// Non-owning pointer to the buffer to be written (presumably owned via buffers_).
225 AsyncBuffer* buffer_;
// Reference member: the referenced handle must outlive this object.
228 const AsyncHandle& file_;
230 AsyncBuffer::complete_write_callback callback_;
// Tail of the constructor: stores each argument; the callback is moved in.
235 AsyncBuffer::complete_write_callback callback)
236 : buffer_(buffer), file_(file), offset_(offset), callback_(std::move(callback)) {}
// Private helpers (implementations not shown; descriptions inferred from names are hedged).
239 int flushWriteBuffer();
// Presumably (re)open the file without / with direct I/O, both via ensureOpen_() — confirm.
240 int ensureOpenNonDirect();
241 int ensureOpenDirect();
242 int ensureOpen_(
int requested_flags);
// Per-buffer completion handler; same (io_return, io_errno) pair as
// AsyncBuffer::complete_write_callback, plus the buffer concerned.
243 void complete_write(AsyncBuffer* buffer, ssize_t io_return,
int io_errno);
// The *_locked variants take the caller's unique_lock on a std::mutex — presumably
// buffers_mutex_, which must already be held; confirm.
244 AsyncBuffer* get_free_buffer_locked(std::unique_lock<std::mutex>& lock);
245 AsyncBuffer* get_free_buffer();
// Takes the pointer by reference — presumably to null the caller's pointer once
// the buffer is returned; confirm.
246 void free_buffer(AsyncBuffer*& buffer);
247 void free_buffer_locked(std::unique_lock<std::mutex>& lock, AsyncBuffer*& buffer);
// Presumably starts queued writes while capacity allows; expects the mutex held (per name).
249 void pump_buffers_locked();
250 int alloc_write_buffers();
251 int free_write_buffers();
// Presumably parses `options` into the tuning members at the end of this class — confirm.
252 int init_parameters(
const std::map<std::string, std::string>& options);
// Current position within the file; starts at 0.
260 int64_t file_position_ = 0;
// Mode string for opening the file; nullptr until set.
262 const char* file_mode_ =
nullptr;
// Flags currently in effect / flags that may be used (presumably open flags such
// as O_DIRECT — confirm).
265 int current_flags_ = 0;
267 int supported_flags_ = 0;
// Synchronization for the buffer bookkeeping below (presumably guards
// buffers_free_, buffers_queued_ and buffers_writing_ — confirm).
272 std::mutex buffers_mutex_;
// Presumably signalled when a buffer is returned to buffers_free_ — confirm.
274 std::condition_variable buffer_freed_cv_;
// Non-owning pointers to buffers available for use (presumably into buffers_).
276 std::vector<AsyncBuffer*> buffers_free_;
// Writes waiting to be started, in FIFO-capable storage.
278 std::deque<QueuedWrite> buffers_queued_;
// Count of writes currently in flight (per name — confirm).
283 size_t buffers_writing_ = 0;
// Owning storage for all AsyncBuffer objects.
286 std::vector<std::unique_ptr<AsyncBuffer>> buffers_;
// Buffer currently being filled by write(), if any (per name — confirm).
289 AsyncBuffer* current_buffer_ =
nullptr;
// Atomic error status, initially SUCCESS; atomic presumably because completion
// callbacks may set it from another thread — confirm.
293 std::atomic<int> async_error_ = SUCCESS;
// Tuning parameters and their defaults.
297 IoEngine ioengine_ = IoEngine::AIO;
298 bool use_directio_ =
true;
300 size_t num_buffers_ = 0;
302 size_t buffer_size_ = 0;
// Alignment requirements (presumably for file offsets and memory addresses under
// direct I/O; 0 until initialized — confirm).
306 size_t offset_align_ = 0;
308 size_t mem_align_ = 0;
int read(const string &cacheFile, set< StreamId > &outStreamIds, map< string, string > &outFileTags, map< StreamId, StreamTags > &outStreamTags, vector< IndexRecord::RecordInfo > &outRecordIndex, bool &outFileHasIndex)
Definition FileDetailsCache.cpp:268
int write(const string &cacheFile, const set< StreamId > &streamIds, const map< string, string > &fileTags, const map< StreamId, StreamTags > &streamTags, const vector< IndexRecord::RecordInfo > &recordIndex, bool fileHasIndex)
Definition FileDetailsCache.cpp:226
Definition Compressor.cpp:113