625 : path_{std::move(path)}, offset_{offset}, size_{size} {}
627 file_ = std::move(other.file_);
628 path_ = std::move(other.path_);
629 offset_ = other.offset_;
633 file_position_ = other.file_position_;
635 file_mode_ = other.file_mode_;
636 current_flags_ = other.current_flags_;
637 supported_flags_ = other.supported_flags_;
643 buffers_free_ = std::move(other.buffers_free_);
644 buffers_queued_ = std::move(other.buffers_queued_);
645 buffers_writing_ = other.buffers_writing_;
646 other.buffers_writing_ = 0;
647 buffers_ = std::move(other.buffers_);
648 current_buffer_ = other.current_buffer_;
649 other.current_buffer_ =
nullptr;
650 async_error_ = other.async_error_.load();
652 ioengine_ = other.ioengine_;
653 use_directio_ = other.use_directio_;
654 num_buffers_ = other.num_buffers_;
655 buffer_size_ = other.buffer_size_;
656 iodepth_ = other.iodepth_;
657 offset_align_ = other.offset_align_;
658 mem_align_ = other.mem_align_;
669 }
catch (std::exception& e) {
670 XR_LOGCE(VRS_DISKFILECHUNK,
"Exception on close() during destruction: {}", e.what());
// Creates/opens the chunk file for `newpath` using the settings in `options`.
// Visible steps: reset the recorded async error, parse `options` through
// init_parameters(), try to open with the full supported flag set, and retry without
// O_DIRECT when that fails and O_DIRECT is among supported_flags_.
// NOTE(review): several interior lines of this function are not shown in this listing.
674 int create(
const std::string& newpath,
const std::map<std::string, std::string>& options) {
682 async_error_ = SUCCESS;
// IF_ERROR_RETURN presumably propagates a non-zero result to the caller -- confirm macro.
685 IF_ERROR_RETURN(init_parameters(options));
686 int error = ensureOpenDirect();
// Only attempt the non-direct fallback when the O_DIRECT bit is set in supported_flags_.
687 if (error != 0 && 0 != (O_DIRECT & supported_flags_)) {
688 error = ensureOpenNonDirect();
692 "O_DIRECT appears not to be supported for {}, falling back to non-direct IO",
// Drop O_DIRECT from the supported set so ensureOpenDirect() no longer requests it.
694 supported_flags_ &= ~O_DIRECT;
699#if IS_ANDROID_PLATFORM()
// Android-only: request full buffering (_IOFBF) with a 128 KiB buffer size.
700 const size_t kBufferingSize = 128 * 1024;
701 error = setvbuf(file_.fd_,
nullptr, _IOFBF, kBufferingSize);
// Opens the chunk file: `readOnly` selects mode "rb", otherwise "rb+".
// Resets the recorded async error, parses `options` through init_parameters(), and
// returns the result of opening with the O_DIRECT bit masked out.
// NOTE(review): some interior lines of this function are not shown in this listing.
708 int open(
bool readOnly,
const std::map<std::string, std::string>& options) {
712 async_error_ = SUCCESS;
713 file_mode_ = readOnly ?
"rb" :
"rb+";
715 IF_ERROR_RETURN(init_parameters(options));
716 return ensureOpenNonDirect();
724 int error = flushWriteBuffer();
728 free_write_buffers();
730 int error2 = file_.close();
731 return error != 0 ? error : error2;
742 IF_ERROR_RETURN(flushWriteBuffer());
745 async_error_ = SUCCESS;
// Returns true when the position reported by tell() equals getSize().
// NOTE(review): the declaration of `pos` and the body of the tell()-failure branch are
// not shown in this listing.
750 [[nodiscard]]
bool eof()
const {
752 if (tell(pos) != 0) {
756 return pos == getSize();
760 return file_.isOpened();
// Writes `count` bytes from `buffer`, accumulating the number of bytes accepted in
// `outWrittenSize`. Data is either written synchronously with pwrite() or copied into
// the current AsyncBuffer; a full buffer is then queued (AIO) or written in place (PSync).
// NOTE(review): many interior lines (including the declaration of `towrite`, loop
// headers and closing braces) are not shown in this listing.
763 int write(
const void* buffer,
size_t count,
size_t& outWrittenSize) {
766 }
else if (!isOpened()) {
767 XR_LOGCE(VRS_DISKFILECHUNK,
"DiskFile not opened");
768 return DISKFILE_NOT_OPEN;
// Byte-typed view of the input so the pointer can be advanced by byte counts below.
770 const auto* bbuffer =
static_cast<const char*
>(buffer);
774 return DISKFILE_NOT_OPEN;
783 if (ioengine_ == IoEngine::Sync) {
787 use_directio_ && (current_buffer_ ==
nullptr || current_buffer_->empty()) &&
788 (file_position_ % offset_align_) != 0) {
// Size the synchronous write so it ends on the next offset_align_ boundary (or at count).
790 towrite = std::min<size_t>(count, offset_align_ - (file_position_ % offset_align_));
// Synchronous path: flush buffered data, reopen without O_DIRECT, then pwrite directly.
799 IF_ERROR_RETURN(flushWriteBuffer());
801 IF_ERROR_RETURN(ensureOpenNonDirect());
803 size_t thiswritten = 0;
804 IF_ERROR_RETURN(file_.pwrite(bbuffer, towrite, file_position_, thiswritten));
805 bbuffer += thiswritten;
806 count -= thiswritten;
807 outWrittenSize += thiswritten;
808 file_position_ += thiswritten;
// Remaining data goes through a buffer; fetch one if none is currently being filled.
811 if (count != 0 && current_buffer_ ==
nullptr) {
812 current_buffer_ = get_free_buffer();
813 if (current_buffer_ ==
nullptr) {
// add() reports how many bytes it accepted; a non-positive result is treated as an error.
820 ssize_t additionalBuffered = current_buffer_->
add(bbuffer, count);
821 if (additionalBuffered <= 0) {
822 return DISKFILE_PARTIAL_WRITE_ERROR;
824 bbuffer += additionalBuffered;
825 count -= additionalBuffered;
826 outWrittenSize += additionalBuffered;
// A full buffer is written out with the full supported flag set (O_DIRECT if enabled).
828 if (current_buffer_->full()) {
829 IF_ERROR_RETURN(ensureOpenDirect());
831 towrite = current_buffer_->size();
// AIO: queue the buffer under buffers_mutex_; completion is routed to complete_write().
833 case IoEngine::AIO: {
836 std::unique_lock lock{buffers_mutex_};
837 buffers_queued_.emplace_back(
841 [
this, buffer = current_buffer_](ssize_t io_return,
int io_errno) {
842 this->complete_write(buffer, io_return, io_errno);
844 current_buffer_ =
nullptr;
// The file position is advanced as soon as the buffer is queued, before it completes.
845 file_position_ += towrite;
846 pump_buffers_locked();
// May block on buffer_freed_cv_ until a buffer is returned to the free list.
849 current_buffer_ = get_free_buffer_locked(lock);
850 if (!current_buffer_) {
// PSync: write the buffer immediately with pwrite(), then clear and reuse the same buffer.
856 case IoEngine::PSync: {
857 size_t thiswritten = 0;
858 int err = file_.pwrite(
859 current_buffer_->data(), current_buffer_->size(), file_position_, thiswritten);
862 current_buffer_->clear();
863 file_position_ += thiswritten;
870 XR_LOGCE(VRS_DISKFILECHUNK,
"Unhandled ioengine");
871 return VRSERROR_INTERNAL_ERROR;
878 void setSize(int64_t newSize) {
883 return flushWriteBuffer();
// Flushes pending write buffers, then truncates the underlying file to `newSize`.
// NOTE(review): the remaining lines of this function are not shown in this listing.
886 int truncate(int64_t newSize) {
887 IF_ERROR_RETURN(flushWriteBuffer());
889 IF_ERROR_RETURN(file_.truncate(newSize));
// Reads up to `count` bytes into `buffer` at the tracked file_position_, reporting the
// number of bytes read in `outReadSize` and advancing file_position_ by that amount.
// Pending writes are flushed and the file is reopened without O_DIRECT first.
// NOTE(review): the guard that returns DISKFILE_NOT_OPEN and the final return are not
// fully shown in this listing.
894 int read(
void* buffer,
size_t count,
size_t& outReadSize) {
897 return DISKFILE_NOT_OPEN;
901 IF_ERROR_RETURN(flushWriteBuffer());
902 IF_ERROR_RETURN(ensureOpenNonDirect());
904 int error = file_.read(buffer, count, file_position_, outReadSize);
// Advance by what was actually read, whatever `error` is.
905 file_position_ += outReadSize;
909 [[nodiscard]] int64_t getSize()
const {
// Returns true when `fileOffset` lies in the half-open range [offset_, offset_ + size_).
913 [[nodiscard]]
bool contains(int64_t fileOffset)
const {
914 return fileOffset >= offset_ && fileOffset < offset_ + size_;
// Reports the logical position in `outFilepos`: the tracked file_position_ plus the
// number of bytes held in current_buffer_, if there is one.
// NOTE(review): the return statement is not shown in this listing.
917 int tell(int64_t& outFilepos)
const {
918 outFilepos = file_position_ + (current_buffer_ ? current_buffer_->size() : 0);
// Moves the tracked position to `pos` relative to `origin`.
// Pending write buffers are flushed first. The file is then positioned at the tracked
// file_position_ with SEEK_SET before the requested seek, and file_.seek() writes the
// resulting position back into file_position_.
// NOTE(review): some interior lines of this function are not shown in this listing.
923 int seek(int64_t pos,
int origin) {
926 IF_ERROR_RETURN(flushWriteBuffer());
930 IF_ERROR_RETURN(file_.seek(file_position_, SEEK_SET, file_position_));
931 IF_ERROR_RETURN(file_.seek(pos, origin, file_position_));
936 [[nodiscard]]
const std::string& getPath()
const {
940 void setOffset(int64_t newOffset) {
944 [[nodiscard]] int64_t getOffset()
const {
949 enum class IoEngine {
961 AsyncBuffer::complete_write_callback callback_;
966 AsyncBuffer::complete_write_callback callback)
967 : buffer_(buffer), file_(file), offset_(offset), callback_(std::move(callback)) {}
// Drains all pending writes: waits for queued/in-flight buffers to return to the free
// list, surfaces any recorded async error, then writes out the partially filled
// current buffer with pwrite() and releases it.
// NOTE(review): several interior lines (error returns, closing braces) are not shown.
970 int flushWriteBuffer() {
973 if (!buffers_.empty()) {
974 std::unique_lock lock{buffers_mutex_};
// Every buffer except the one currently being filled must be back on the free list.
975 size_t expected_free = buffers_.size() - (current_buffer_ ? 1 : 0);
976 while (buffers_free_.size() != expected_free) {
978 buffer_freed_cv_.wait(
979 lock, [
this, expected_free] {
return buffers_free_.size() == expected_free; });
// Read and reset the recorded async error in one atomic step.
982 int async_error = async_error_.exchange(0);
983 if (async_error != 0) {
984 XR_LOGCE(VRS_DISKFILECHUNK,
"Returning async error on flush {}", async_error);
989 if (current_buffer_ && !current_buffer_->empty()) {
990 IF_ERROR_RETURN(ensureOpenNonDirect());
// NOTE(review): this loop presumably ends because free_buffer() nulls current_buffer_
// (free_buffer_locked takes the pointer by reference) -- confirm.
992 while (current_buffer_) {
993 size_t towrite = current_buffer_->size();
997 size_t thiswritten = 0;
998 int error = file_.pwrite(current_buffer_->data(), towrite, file_position_, thiswritten);
999 free_buffer(current_buffer_);
1003 file_position_ += thiswritten;
// Release a current buffer that was not handled by the branch above (e.g. an empty one).
1007 if (current_buffer_) {
1008 free_buffer(current_buffer_);
// Ensures the file is open with the supported flags minus the O_DIRECT bit.
1014 int ensureOpenNonDirect() {
1015 return ensureOpen_(supported_flags_ & ~O_DIRECT);
// Ensures the file is open with the full supported flag set (O_DIRECT included when it
// is present in supported_flags_).
1018 int ensureOpenDirect() {
1019 return ensureOpen_(supported_flags_);
// Ensures the file is open with exactly `requested_flags`.
// When the file is already open with those flags there is nothing to re-open; otherwise
// the file is (re)opened via file_.open() using file_mode_ and current_flags_ is updated.
// Returns DISKFILE_NOT_OPEN when no mode has been set (file_mode_ == nullptr).
// NOTE(review): several interior lines (early returns, close/re-open handling, closing
// braces) are not shown in this listing.
1022 int ensureOpen_(
int requested_flags) {
1023 bool no_truncate =
false;
1024 if (file_.isOpened()) {
1025 if (requested_flags == current_flags_) {
1032 const char* mode = file_mode_;
1033 if (mode ==
nullptr) {
1034 return DISKFILE_NOT_OPEN;
// Read-only means an 'r' mode without '+' (e.g. "rb" as set by open(readOnly=true)).
// A 'w' mode is always writable, so it must never be classified as read-only.
1037 bool readOnly = (mode[0] ==
'r') && (strchr(mode,
'+') ==
nullptr);
// A 'w' mode combined with no_truncate gets special handling (body not shown here).
1044 if (mode[0] ==
'w' && no_truncate) {
1049 IF_ERROR_RETURN(alloc_write_buffers());
1052 int error = file_.open(path_, mode, requested_flags);
// Remember the flags the file is now open with, for the fast-path comparison above.
1057 current_flags_ = requested_flags;
1059#if IS_ANDROID_PLATFORM()
// Android-only: request full buffering (_IOFBF) with a 128 KiB buffer size.
1060 const size_t kBufferingSize = 128 * 1024;
1061 IF_ERROR_LOG(setvbuf(newFd,
nullptr, _IOFBF, kBufferingSize));
// Completion handler for a buffer write (bound as the callback queued by write()).
// Classifies the result, records the first error in async_error_, then returns the
// buffer to the free list and starts further queued writes.
// NOTE(review): some interior lines (log calls, else branches) are not shown here.
1067 void complete_write(
AsyncBuffer* buffer, ssize_t io_return,
int io_errno) {
// Full-size write: an accompanying non-zero io_errno is only logged (message below).
1071 if (io_return == buffer->size()) {
1072 if (io_errno != SUCCESS) {
1075 "io_return was the size of the buffer, but io_errno is {}",
// Negative return without an errno is mapped to DISKFILE_INVALID_STATE.
1080 if (io_return < 0) {
1081 if (io_errno == SUCCESS) {
1082 XR_LOGCD(VRS_DISKFILECHUNK,
"io_errno is 0 but io_return < 0");
1083 io_errno = DISKFILE_INVALID_STATE;
1087 io_errno = DISKFILE_PARTIAL_WRITE_ERROR;
// Store io_errno only while async_error_ is still SUCCESS, so the first error is kept.
1091 if (io_errno != SUCCESS) {
1092 int current_error = async_error_.load();
1093 while (current_error == SUCCESS &&
1094 !async_error_.compare_exchange_weak(current_error, io_errno)) {
// Under buffers_mutex_: recycle the buffer, decrement the in-flight count, start more.
1099 std::unique_lock lock{buffers_mutex_};
1100 free_buffer_locked(lock, buffer);
1101 buffers_writing_ -= 1;
1102 pump_buffers_locked();
// Takes a buffer from the back of the free list; `lock` is the caller's lock, used for
// the condition-variable wait. Blocks on buffer_freed_cv_ while the free list is empty.
// NOTE(review): the return statement is not shown in this listing.
1106 AsyncBuffer* get_free_buffer_locked(std::unique_lock<std::mutex>& lock) {
1107 if (buffers_free_.empty()) {
1108 buffer_freed_cv_.wait(lock, [
this] {
return !buffers_free_.empty(); });
1110 assert(!buffers_free_.empty());
1111 auto* buffer = buffers_free_.back();
1112 buffers_free_.pop_back();
// Buffers on the free list are expected to hold no data.
1114 assert(buffer->empty());
1119 std::unique_lock lock{buffers_mutex_};
1120 return get_free_buffer_locked(lock);
1124 std::unique_lock lock{buffers_mutex_};
1125 free_buffer_locked(lock, buffer);
// Returns `buffer` to the free list and wakes one waiter on buffer_freed_cv_.
// The unnamed unique_lock parameter is not used by the visible lines; presumably it
// documents that the caller must hold buffers_mutex_ -- confirm.
// NOTE(review): interior lines (possibly clearing the buffer / nulling the pointer
// passed by reference) are not shown in this listing.
1128 void free_buffer_locked(std::unique_lock<std::mutex>& ,
AsyncBuffer*& buffer) {
1130 buffers_free_.push_back(buffer);
1132 buffer_freed_cv_.notify_one();
// Locking wrapper: acquires buffers_mutex_ and calls pump_buffers_locked().
1135 void pump_buffers() {
1136 std::unique_lock lock{buffers_mutex_};
1137 pump_buffers_locked();
// Starts queued writes until iodepth_ writes are in flight or the queue is empty.
// Called from sites that hold buffers_mutex_ (see pump_buffers / complete_write / write).
// NOTE(review): handling of a failed start_write() is not shown in this listing.
1140 void pump_buffers_locked() {
1144 while (buffers_writing_ < iodepth_ && !buffers_queued_.empty()) {
1145 int result = SUCCESS;
// Start the write for the oldest queued item, handing its callback to the buffer.
1149 auto& item = buffers_queued_.front();
1150 result = item.buffer_->start_write(item.file_, item.offset_, std::move(item.callback_));
1152 buffers_queued_.pop_front();
// Only successfully started writes count towards the in-flight total.
1155 if (result == SUCCESS) {
1156 buffers_writing_ += 1;
// Allocates AsyncBuffers until buffers_ holds num_buffers_ of them, registering each new
// buffer on the free list. Must not be called while writes are in flight (asserted).
// NOTE(review): allocation-failure handling and the return value are not shown here.
1161 int alloc_write_buffers() {
1162 assert(buffers_writing_ == 0);
1163 buffers_free_.reserve(num_buffers_);
1164 buffers_.reserve(num_buffers_);
1165 while (buffers_.size() < num_buffers_) {
1166 auto buffer = std::make_unique<AsyncBuffer>(buffer_size_, mem_align_, offset_align_);
// buffers_ owns the buffer; buffers_free_ holds a non-owning pointer to it.
1170 buffers_free_.push_back(buffer.get());
1171 buffers_.push_back(std::move(buffer));
// Releases the write-buffer bookkeeping. Expects every buffer to be on the free list
// with nothing queued or in flight (asserted), then drops current_buffer_ and empties
// the free list.
// NOTE(review): the remaining lines (e.g. releasing buffers_, return) are not shown.
1176 int free_write_buffers() {
1177 assert(buffers_free_.size() == buffers_.size());
1178 assert(buffers_writing_ == 0);
1179 assert(buffers_queued_.empty());
1180 current_buffer_ =
nullptr;
1181 buffers_free_.clear();
// Parses `options` into the IO configuration members: use_directio_, ioengine_,
// supported_flags_, mem_align_, offset_align_, buffer_size_, num_buffers_ and iodepth_.
// Returns DISKFILE_NOT_OPEN when no alignment information is available and
// DISKFILE_INVALID_STATE when buffer_size_ is not a multiple of both alignments.
// NOTE(review): several interior lines (log macro openings, #else/#endif, early returns,
// closing braces) are not shown in this listing.
1186 int init_parameters(
const std::map<std::string, std::string>& options) {
// Option strings accepted for "ioengine".
1187 static const char* sIoEngineTypes[] = {
"sync",
"aio",
"psync"};
1188 struct IoEngineTypeConverter :
public EnumStringConverter<
1191 COUNT_OF(sIoEngineTypes),
// Direct IO defaults to enabled when neither "direct" nor "directio" is provided.
1198 if (!helpers::getBool(options,
"direct", use_directio_) &&
1199 !helpers::getBool(options,
"directio", use_directio_)) {
1200 use_directio_ =
true;
1203#ifdef VRS_BUILDTYPE_TSAN
1208 ioengine_ = IoEngine::Sync;
// Default engine is AIO unless an "ioengine" option overrides it.
1211 ioengine_ = IoEngine::AIO;
1212 auto it = options.find(
"ioengine");
1213 if (it != options.end()) {
1217 ioengine_ = IoEngineTypeConverter::toEnum(it->second);
// Internal buffers are needed for direct IO or for any engine other than Sync.
1222 bool needBuffers = use_directio_ || (ioengine_ != IoEngine::Sync);
1224 supported_flags_ = 0;
1232 "asyncdiskfile configuration: IO Engine={} DirectIO={} (no internal buffers)",
1233 IoEngineTypeConverter::toString(ioengine_),
1238 if (use_directio_) {
1239 supported_flags_ |= O_DIRECT;
// Default alignments of 4 KiB, possibly refined by statx and by explicit options below.
1242 mem_align_ = 4 * 1024;
1243 offset_align_ = 4 * 1024;
1245#ifdef STATX_DIOALIGN
1249 struct statx statxbuf;
1250 int statErr = ::statx(AT_FDCWD, chunkFilePath.c_str(), 0, STATX_DIOALIGN, &statxbuf);
// The logging macros in this file use "{}" placeholders.
1252 XR_LOGCE(VRS_DISKFILECHUNK,
"statx failed: {}", strerror(errno));
// statx reports memory alignment and file-offset alignment in two separate fields.
1255 mem_align_ = statxbuf.stx_dio_mem_align;
1256 offset_align_ = statxbuf.stx_dio_offset_align;
1260 "statx reports blocksize:{:#x} mem_align:{:#x} offset_align:{:#x}",
1261 statxbuf.stx_blksize,
1266 if (0 == mem_align_ || 0 == offset_align_) {
1267 XR_LOGCE(VRS_DISKFILECHUNK,
"failed to get alignment info");
1268 return DISKFILE_NOT_OPEN;
// Explicit options override the detected/default alignments; both are clamped to [1, 16 KiB].
1275 uint64_t temp_u64 = 0;
1276 mem_align_ = helpers::getByteSize(options,
"mem_align", temp_u64) ? temp_u64 : mem_align_;
1277 mem_align_ = std::clamp<size_t>(mem_align_, 1, 16 * 1024);
1279 helpers::getByteSize(options,
"offset_align", temp_u64) ? temp_u64 : offset_align_;
1280 offset_align_ = std::clamp<size_t>(offset_align_, 1, 16 * 1024);
// Buffer size defaults to 32 MiB, clamped to [512 B, 512 MiB].
1289 helpers::getByteSize(options,
"buffer_size", temp_u64) ? temp_u64 : 32 * 1024 * 1024;
1290 buffer_size_ = std::clamp<size_t>(buffer_size_, 512, 512 * 1024 * 1024);
// Buffer count defaults to 4, clamped to [1, 512].
1291 num_buffers_ = helpers::getUInt64(options,
"buffer_count", temp_u64) ? temp_u64 : 4;
1292 num_buffers_ = std::clamp<size_t>(num_buffers_, 1, 512);
1294 if (ioengine_ == IoEngine::PSync && num_buffers_ > 1) {
1297 "The psync ioengine can only make use of a single buffer, not {}.",
// IO depth defaults to the buffer count, clamped to [1, 512].
1303 iodepth_ = helpers::getUInt64(options,
"iodepth", temp_u64) ? temp_u64 : num_buffers_;
1304 iodepth_ = std::clamp<size_t>(iodepth_, 1, 512);
// The buffer size must be a multiple of both alignments.
1306 if ((buffer_size_ % offset_align_ != 0) || (buffer_size_ % mem_align_ != 0)) {
1309 "buffer_size={} doesn't conform to offset_align={} or mem_align={}",
1310 helpers::humanReadableFileSize(buffer_size_),
1311 helpers::humanReadableFileSize(offset_align_),
1312 helpers::humanReadableFileSize(mem_align_));
1313 return DISKFILE_INVALID_STATE;
1317 "asyncdiskfile configuration: IOEngine={} DirectIO={} iodepth={} buffer_count={} "
1318 "buffer_size={} offset_align={} mem_align={}",
1319 IoEngineTypeConverter::toString(ioengine_),
1323 helpers::humanReadableFileSize(buffer_size_),
1324 helpers::humanReadableFileSize(offset_align_),
1325 helpers::humanReadableFileSize(mem_align_));
// Tracked read/write offset passed to file_.pwrite()/file_.read(); tell() adds the
// bytes held in the current buffer on top of it.
1335 int64_t file_position_ = 0;
// Mode string used by ensureOpen_() when (re)opening; nullptr means no mode was set.
1337 const char* file_mode_ =
nullptr;
// Flags the file is currently open with (compared against requested flags on re-open).
1340 int current_flags_ = 0;
// Flags that may be requested; O_DIRECT is added when direct IO is enabled and removed
// when the direct open fails in create().
1342 int supported_flags_ = 0;
// Locked around accesses to the free list, the queue and the in-flight count.
1347 std::mutex buffers_mutex_;
// Notified when a buffer is returned to the free list.
1349 std::condition_variable buffer_freed_cv_;
// Non-owning pointers to buffers available for filling.
1351 std::vector<AsyncBuffer*> buffers_free_;
// Writes waiting to be started by pump_buffers_locked().
1353 std::deque<QueuedWrite> buffers_queued_;
// Number of writes started and not yet completed.
1358 size_t buffers_writing_ = 0;
// Owning storage for all allocated buffers.
1361 std::vector<std::unique_ptr<AsyncBuffer>> buffers_;
// First error recorded by complete_write(); consumed and reset by flushWriteBuffer().
1368 std::atomic<int> async_error_ = SUCCESS;
// Configuration set by init_parameters().
1372 IoEngine ioengine_ = IoEngine::AIO;
1373 bool use_directio_ =
true;
1375 size_t num_buffers_ = 0;
1377 size_t buffer_size_ = 0;
// Maximum number of writes in flight at once (see pump_buffers_locked()).
1379 size_t iodepth_ = 4;
1381 size_t offset_align_ = 0;
1383 size_t mem_align_ = 0;