blob: d25eb6a839cd0b81f70870e833200dd3fff48d41 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "stream_load_pipe.h"
#include <glog/logging.h>
#include <algorithm>
#include <ostream>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "vec/common/custom_allocator.h"
namespace doris {
namespace io {
struct IOContext;
StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
int64_t total_length, bool use_proto)
: _buffered_bytes(0),
_proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
_total_length(total_length),
_use_proto(use_proto) {}
StreamLoadPipe::~StreamLoadPipe() {
while (!_buf_queue.empty()) {
_buf_queue.pop_front();
}
}
Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
*bytes_read = 0;
size_t bytes_req = result.size;
char* to = result.data;
if (UNLIKELY(bytes_req == 0)) {
return Status::OK();
}
while (*bytes_read < bytes_req) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
}
// cancelled
if (_cancelled) {
return Status::Cancelled("cancelled: {}", _cancelled_reason);
}
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
// break the while loop
bytes_req = *bytes_read;
return Status::OK();
}
auto buf = _buf_queue.front();
int64_t copy_size = std::min(bytes_req - *bytes_read, buf->remaining());
buf->get_bytes(to + *bytes_read, copy_size);
*bytes_read += copy_size;
if (!buf->has_remaining()) {
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
_put_cond.notify_one();
}
}
DCHECK(*bytes_read == bytes_req)
<< "*bytes_read=" << *bytes_read << ", bytes_req=" << bytes_req;
return Status::OK();
}
// If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request,
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
Status StreamLoadPipe::read_one_message(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) {
if (_total_length < -1) {
return Status::InternalError("invalid, _total_length is: {}", _total_length);
} else if (_total_length == 0) {
// no data
*length = 0;
return Status::OK();
}
if (_total_length == -1) {
return _read_next_buffer(data, length);
}
// _total_length > 0, read the entire data
*data = make_unique_buffer<uint8_t>(_total_length);
Slice result(data->get(), _total_length);
Status st = read_at(0, result, length);
return st;
}
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
RETURN_IF_ERROR(ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1), &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
}
Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
PDataRow* row_ptr = row.get();
{
std::unique_lock<std::mutex> l(_lock);
_data_row_ptrs.emplace_back(std::move(row));
}
return append_and_flush(reinterpret_cast<char*>(&row_ptr), sizeof(row_ptr),
sizeof(PDataRow*) + row_ptr->ByteSizeLong());
}
Status StreamLoadPipe::append(const char* data, size_t size) {
size_t pos = 0;
if (_write_buf != nullptr) {
if (size < _write_buf->remaining()) {
_write_buf->put_bytes(data, size);
return Status::OK();
} else {
pos = _write_buf->remaining();
_write_buf->put_bytes(data, pos);
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
_write_buf.reset();
}
}
// need to allocate a new chunk, min chunk is 64k
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
if (_write_buf != nullptr) {
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
_write_buf.reset();
}
return _append(buf);
}
// read the next buffer from _buf_queue
Status StreamLoadPipe::_read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
}
// cancelled
if (_cancelled) {
return Status::Cancelled("cancelled: {}", _cancelled_reason);
}
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
data->reset();
*length = 0;
return Status::OK();
}
auto buf = _buf_queue.front();
*length = buf->remaining();
*data = make_unique_buffer<uint8_t>(*length);
buf->get_bytes((char*)(data->get()), *length);
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
if (_use_proto) {
auto row_ptr = std::move(_data_row_ptrs.front());
_proto_buffered_bytes -= (sizeof(PDataRow*) + row_ptr->GetCachedSize());
_data_row_ptrs.pop_front();
// PlainBinaryLineReader will hold the PDataRow
row_ptr.release();
}
_put_cond.notify_one();
return Status::OK();
}
Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size) {
{
std::unique_lock<std::mutex> l(_lock);
// if _buf_queue is empty, we append this buf without size check
if (_use_proto) {
while (!_cancelled && !_buf_queue.empty() &&
(_proto_buffered_bytes + proto_byte_size > _max_buffered_bytes)) {
_put_cond.wait(l);
}
} else {
while (!_cancelled && !_buf_queue.empty() &&
_buffered_bytes + buf->remaining() > _max_buffered_bytes) {
_put_cond.wait(l);
}
}
if (_cancelled) {
return Status::Cancelled("cancelled: {}", _cancelled_reason);
}
_buf_queue.push_back(buf);
if (_use_proto) {
_proto_buffered_bytes += proto_byte_size;
} else {
_buffered_bytes += buf->remaining();
}
}
_get_cond.notify_one();
return Status::OK();
}
// called when producer finished
Status StreamLoadPipe::finish() {
if (_write_buf != nullptr) {
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
_write_buf.reset();
}
{
std::lock_guard<std::mutex> l(_lock);
_finished = true;
}
_get_cond.notify_all();
return Status::OK();
}
// called when producer/consumer failed
void StreamLoadPipe::cancel(const std::string& reason) {
{
std::lock_guard<std::mutex> l(_lock);
_cancelled = true;
_cancelled_reason = reason;
}
_get_cond.notify_all();
_put_cond.notify_all();
}
TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id) {
TUniqueId pipe_id;
pipe_id.lo = query_id.lo + fragment_id;
pipe_id.hi = query_id.hi;
return pipe_id;
}
size_t StreamLoadPipe::current_capacity() {
std::unique_lock<std::mutex> l(_lock);
if (_use_proto) {
return _proto_buffered_bytes;
} else {
return _buffered_bytes;
}
}
} // namespace io
} // namespace doris