blob: efe53dd622ab9b92c14b555c1f0c32458ab73bae [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <crc32c/crc32c.h>
#include <condition_variable>
#include <cstdint>
#include <fstream>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include "common/status.h"
#include "io/cache/file_block.h"
#include "util/slice.h"
#include "util/threadpool.h"
namespace doris {
namespace io {
// Tag describing the kind of a FileBuffer: DownloadFileBuffer passes DOWNLOAD and
// UploadFileBuffer passes UPLOAD to the FileBuffer constructor.
enum class BufferType : uint32_t { DOWNLOAD, UPLOAD };
// Uniquely-owning pointer to a FileBlocksHolder (declared in io/cache/file_block.h).
using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
/**
 * Tracks the outcome of one asynchronous buffer operation through two
 * caller-supplied callbacks: one that receives the final Status and one that
 * reports cancellation.
 */
struct OperationState {
    OperationState(std::function<bool(Status)> sync_after_complete_task,
                   std::function<bool()> is_cancelled)
            : _sync_after_complete_task(std::move(sync_after_complete_task)),
              _is_cancelled(std::move(is_cancelled)) {}

    /**
     * Report the result of the operation. Only the first call has any effect:
     * it hands `s` to the sync callback (if one is installed) and stores the
     * callback's return value; every later call returns immediately.
     *
     * @param s the execution result, Status::OK() when omitted
     */
    void set_status(Status s = Status::OK()) {
        // the sync callback must never be triggered a second time
        if (!_value_set) [[likely]] {
            if (_sync_after_complete_task) {
                _fail_after_sync = _sync_after_complete_task(std::move(s));
            }
            // flagged only after the callback has returned
            _value_set = true;
        }
    }

    /**
     * Tell whether the operation should be treated as cancelled.
     *
     * @return true if the sync callback returned true earlier, otherwise
     *         whatever the cancellation callback returns
     */
    [[nodiscard]] bool is_cancelled() const {
        DCHECK(nullptr != _is_cancelled);
        // When the sync callback already reported a failure the outer file
        // writer might be destructed, so the cancellation callback is not
        // invoked in that case (short-circuit evaluation).
        return _fail_after_sync || _is_cancelled();
    }

    // Invoked once with the final Status; its return value becomes _fail_after_sync.
    std::function<bool(Status)> _sync_after_complete_task;
    // Asked for the cancellation state as long as _fail_after_sync is false.
    std::function<bool()> _is_cancelled;
    // Becomes true after the first set_status() call.
    bool _value_set = false;
    // Value returned by the sync callback; true forces is_cancelled() to return true.
    bool _fail_after_sync = false;
};
/**
 * Common base of DownloadFileBuffer and UploadFileBuffer. It carries the file
 * offset, the amount of buffered data, the operation state and an opaque
 * storage object (PartData, defined out of line).
 */
struct FileBuffer {
    FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
               OperationState state);
    virtual ~FileBuffer();

    /**
     * submit the corresponding task to async executor
     *
     * @param buf the buffer whose task should be submitted
     */
    static Status submit(std::shared_ptr<FileBuffer> buf);

    /**
     * append data to the inner memory buffer
     *
     * @param s the content to be appended
     */
    virtual Status append_data(const Slice& s) = 0;

    /**
     * run the work of this buffer; DownloadFileBuffer maps it to on_download()
     * and UploadFileBuffer maps it to on_upload()
     */
    virtual void execute_async() = 0;

    /**
     * set the value of its operation state
     *
     * @param s the execution result
     */
    void set_status(Status s) {
        // `s` is a by-value sink parameter: hand it over instead of copying it again
        _state.set_status(std::move(s));
    }

    /**
     * get the start offset of this file buffer
     *
     * @return start offset of this file buffer
     */
    size_t get_file_offset() const { return _offset; }

    /**
     * get the size of the buffered data
     *
     * @return the size of the buffered data
     */
    size_t get_size() const { return _size; }

    /**
     * get the value of _capacity
     * NOTE: the method name is misspelled; it is kept unchanged for existing callers
     *
     * @return the value of _capacity
     */
    size_t get_capacaticy() const { return _capacity; }

    // Defined out of line; presumably a Slice over the buffered bytes -- confirm in the .cpp.
    Slice get_slice() const;

    /**
     * detect whether the execution task is cancelled, delegating to the
     * operation state
     *
     * @return whether the operation state reports cancellation
     */
    bool is_cancelled() const { return _state.is_cancelled(); }

    // Defined out of line; presumably a view over the buffered bytes -- confirm in the .cpp.
    std::string_view get_string_view_data() const;

    BufferType _type;
    // callback producing the file cache block holder
    std::function<FileBlocksHolderPtr()> _alloc_holder;
    // start offset of this buffer inside the file
    size_t _offset;
    // size of the buffered data; defaults to 0 unless the constructor sets it
    size_t _size {0};
    OperationState _state;
    struct PartData;
    std::unique_ptr<PartData> _inner_data;
    // defaults to 0 unless the constructor sets it
    size_t _capacity {0};
};
/**
 * FileBuffer of type BufferType::DOWNLOAD. It stores three callbacks: one that
 * downloads into a Slice, one that writes to the local file cache and one that
 * writes into the user's buffer.
 */
struct DownloadFileBuffer final : public FileBuffer {
    /**
     * @param download callback filling the given Slice with downloaded content
     * @param write_to_cache callback writing a Slice into the local file cache
     * @param write_to_use_buffer callback writing the content into the user's buffer
     * @param state operation state forwarded to FileBuffer
     * @param offset start offset forwarded to FileBuffer
     * @param alloc_holder file cache block holder allocator forwarded to FileBuffer
     */
    DownloadFileBuffer(std::function<Status(Slice&)> download,
                       std::function<void(FileBlocksHolderPtr, Slice)> write_to_cache,
                       std::function<void(Slice, size_t)> write_to_use_buffer, OperationState state,
                       size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
            // by-value sink parameters are moved into the base instead of copied
            : FileBuffer(BufferType::DOWNLOAD, std::move(alloc_holder), offset, std::move(state)),
              _download(std::move(download)),
              _write_to_local_file_cache(std::move(write_to_cache)),
              _write_to_use_buffer(std::move(write_to_use_buffer)) {}
    ~DownloadFileBuffer() override = default;

    /**
     * do the download work, it would write the content into local memory buffer
     */
    void on_download();

    // The asynchronous work of a download buffer is on_download().
    void execute_async() override { on_download(); }

    /**
     * appending is a no-op for a download buffer
     *
     * @param s ignored
     * @return always Status::OK()
     */
    Status append_data(const Slice& s) override { return Status::OK(); }

    std::function<Status(Slice&)> _download;
    std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache;
    std::function<void(Slice, size_t)> _write_to_use_buffer;
};
/**
 * FileBuffer of type BufferType::UPLOAD. It stores the callback that uploads
 * the buffer to remote storage together with file cache bookkeeping.
 */
struct UploadFileBuffer final : public FileBuffer {
    /**
     * @param upload_cb callback that uploads this buffer
     * @param state operation state forwarded to FileBuffer
     * @param offset start offset forwarded to FileBuffer
     * @param alloc_holder file cache block holder allocator forwarded to FileBuffer
     */
    UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
                     size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
            // by-value sink parameters are moved into the base instead of copied
            : FileBuffer(BufferType::UPLOAD, std::move(alloc_holder), offset, std::move(state)),
              _upload_to_remote(std::move(upload_cb)) {}
    ~UploadFileBuffer() override = default;

    /**
     * append data to the inner memory buffer (defined out of line)
     *
     * @param s the content to be appended
     */
    Status append_data(const Slice& s) override;

    /**
     * read the content from local file cache
     * because previously lack of memory buffer
     */
    void read_from_cache();

    /**
     * write the content inside memory buffer into
     * local file cache
     */
    void upload_to_local_file_cache(bool);

    // The asynchronous work of an upload buffer is on_upload().
    void execute_async() override { on_upload(); }

    /**
     * do the upload work
     * 1. read from cache if the data is written to cache first
     * 2. upload content of buffer to S3
     * 3. upload content to file cache if necessary
     * 4. call the finish callback caller specified
     * 5. reclaim self
     */
    void on_upload();

    /**
     *
     * @return the stream representing the inner memory buffer
     */
    std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }

    /**
     * Currently only used for small file to set callback
     *
     * @param cb replaces the upload callback given at construction
     */
    void set_upload_to_remote(std::function<void(UploadFileBuffer&)> cb) {
        _upload_to_remote = std::move(cb);
    }

private:
    std::function<void(UploadFileBuffer&)> _upload_to_remote = nullptr;
    // NOTE(review): the original comment says this points to _buffer.get_data(), but no
    // member named _buffer is visible in this header -- confirm what the stream wraps.
    std::shared_ptr<std::iostream> _stream_ptr;
    bool _is_cache_allocated {false};
    FileBlocksHolderPtr _holder;
    // iterator into _holder->file_blocks
    decltype(_holder->file_blocks.begin()) _cur_file_block;
    size_t _append_offset {0};
    uint32_t _crc_value = 0;
};
struct FileBufferBuilder {
FileBufferBuilder() = default;
~FileBufferBuilder() = default;
/**
* build one file buffer using previously set properties
* @return the file buffer's base shared pointer
*/
Status build(std::shared_ptr<FileBuffer>* buf);
/**
* set the file buffer type
*
* @param type enum class for buffer type
*/
FileBufferBuilder& set_type(BufferType type);
/**
* set the download callback which would download the content on cloud into file buffer
*
* @param cb
*/
FileBufferBuilder& set_download_callback(std::function<Status(Slice&)> cb) {
_download = std::move(cb);
return *this;
}
/**
* set the upload callback which would upload the content inside buffer into remote storage
*
* @param cb
*/
FileBufferBuilder& set_upload_callback(std::function<void(UploadFileBuffer& buf)> cb);
/**
* set the callback which would do task sync for the caller
*
* @param cb
*/
FileBufferBuilder& set_sync_after_complete_task(std::function<bool(Status)> cb);
/**
* set the callback which detect whether the task is done
*
* @param cb
*/
FileBufferBuilder& set_is_cancelled(std::function<bool()> cb) {
_is_cancelled = std::move(cb);
return *this;
}
/**
* set the callback which allocate file cache block holder
* **Notice**: Because the load file cache workload coule be done
* asynchronously so you must make sure all the dependencies of this
* cb could last until this cb is invoked
* @param cb
*/
FileBufferBuilder& set_allocate_file_blocks_holder(std::function<FileBlocksHolderPtr()> cb);
/**
* set the file offset of the file buffer
*
* @param cb
*/
FileBufferBuilder& set_file_offset(size_t offset) {
_offset = offset;
return *this;
}
/**
* set the callback which write the content into local file cache
*
* @param cb
*/
FileBufferBuilder& set_write_to_local_file_cache(
std::function<void(FileBlocksHolderPtr, Slice)> cb) {
_write_to_local_file_cache = std::move(cb);
return *this;
}
/**
* set the callback which would write the downloaded content into user's buffer
*
* @param cb
*/
FileBufferBuilder& set_write_to_use_buffer(std::function<void(Slice, size_t)> cb) {
_write_to_use_buffer = std::move(cb);
return *this;
}
BufferType _type;
std::function<void(UploadFileBuffer& buf)> _upload_cb = nullptr;
std::function<bool(Status)> _sync_after_complete_task = nullptr;
std::function<FileBlocksHolderPtr()> _alloc_holder_cb = nullptr;
std::function<bool()> _is_cancelled = nullptr;
std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache;
std::function<Status(Slice&)> _download;
std::function<void(Slice, size_t)> _write_to_use_buffer;
size_t _offset;
};
} // namespace io
} // namespace doris