blob: 48718361185eb46c3de85aaf83ea4823cc8b0eb9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/closure_guard.h>
#include <brpc/controller.h>
#include <bthread/bthread.h>
#include <bthread/types.h>
#include <butil/errno.h>
#include <butil/iobuf.h>
#include <gen_cpp/Data_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gtest/gtest.h>
#include "pipeline/dependency.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_runtime_state.h"
#include "vec/sink/varrow_flight_result_writer.h"
namespace doris::vectorized {
class ArrowResultBlockBufferTest : public ::testing::Test {
public:
ArrowResultBlockBufferTest() = default;
~ArrowResultBlockBufferTest() = default;
};
class MockGetArrowResultBatchCtx : public GetArrowResultBatchCtx {
public:
ENABLE_FACTORY_CREATOR(MockGetArrowResultBatchCtx)
MockGetArrowResultBatchCtx(std::function<void()> fail_cb, std::function<void()> close_cb,
std::function<void()> data_cb, PFetchArrowDataResult* result)
: GetArrowResultBatchCtx(result),
_fail_cb(fail_cb),
_close_cb(close_cb),
_data_cb(data_cb) {}
~MockGetArrowResultBatchCtx() override = default;
void on_failure(const Status& status) override { _fail_cb(); }
void on_close(int64_t packet_seq, int64_t returned_rows = 0) override { _close_cb(); }
Status on_data(const std::shared_ptr<vectorized::Block>& t_result, int64_t packet_seq,
ResultBlockBufferBase* buffer) override {
_data_cb();
return GetArrowResultBatchCtx::on_data(t_result, packet_seq, buffer);
}
private:
std::function<void()> _fail_cb;
std::function<void()> _close_cb;
std::function<void()> _data_cb;
};
TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) {
MockRuntimeState state;
state.batsh_size = 1;
int buffer_size = 16;
auto dep = pipeline::Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
bool fail = false;
bool close = false;
bool data = false;
std::shared_ptr<arrow::Schema> schema;
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
[&]() -> void { data = true; }, &presult);
{
auto num_rows = 2;
auto in_block = std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt64>({1, 2}));
EXPECT_TRUE(buffer.add_batch(&state, in_block).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], num_rows);
EXPECT_EQ(buffer._instance_rows_in_queue.back()[ins_id], num_rows);
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 1);
EXPECT_FALSE(dep->ready());
in_block = std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt64>({1, 2}));
EXPECT_TRUE(buffer.add_batch(&state, in_block).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], num_rows * 2);
EXPECT_EQ(buffer._instance_rows_in_queue.back()[ins_id], num_rows * 2);
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 1);
EXPECT_FALSE(dep->ready());
}
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 1);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_TRUE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
EXPECT_EQ(presult.empty_batch(), false);
EXPECT_EQ(presult.packet_seq(), 0);
EXPECT_EQ(presult.eos(), false);
EXPECT_EQ(presult.status().status_code(), ErrorCode::OK);
data = false;
}
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 1);
EXPECT_EQ(buffer._packet_num, 1);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
}
{
auto in_block = std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt64>({1, 2}));
EXPECT_TRUE(buffer.add_batch(&state, in_block).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 2);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
EXPECT_TRUE(dep->ready());
EXPECT_EQ(presult.empty_batch(), false);
EXPECT_EQ(presult.packet_seq(), 1);
EXPECT_EQ(presult.eos(), false);
EXPECT_EQ(presult.status().status_code(), ErrorCode::OK);
data = false;
}
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 1);
EXPECT_EQ(buffer._packet_num, 2);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
}
{
EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
EXPECT_EQ(buffer._packet_num, 2);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_FALSE(data);
EXPECT_TRUE(close);
EXPECT_FALSE(fail);
close = false;
}
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
EXPECT_EQ(buffer._packet_num, 2);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_FALSE(data);
EXPECT_TRUE(close);
EXPECT_FALSE(fail);
}
}
TEST_F(ArrowResultBlockBufferTest, TestCancelArrowResultBlockBuffer) {
MockRuntimeState state;
state.batsh_size = 1;
int buffer_size = 16;
auto dep = pipeline::Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
bool fail = false;
bool close = false;
bool data = false;
std::shared_ptr<arrow::Schema> schema;
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
[&]() -> void { data = true; }, &presult);
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 1);
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
}
{
auto cancel_status = Status::InternalError("");
buffer.cancel(cancel_status);
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_EQ(buffer._status.code(), ErrorCode::INTERNAL_ERROR);
EXPECT_TRUE(dep->ready());
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_TRUE(fail);
fail = false;
}
{
EXPECT_EQ(buffer.get_batch(ctx).code(), ErrorCode::INTERNAL_ERROR);
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_TRUE(fail);
EXPECT_TRUE(dep->ready());
fail = false;
}
{
auto in_block = std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt64>({1, 2}));
EXPECT_FALSE(buffer.add_batch(&state, in_block).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
EXPECT_TRUE(dep->ready());
}
}
TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
MockRuntimeState state;
state.batsh_size = 1;
int buffer_size = 16;
auto dep = pipeline::Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
bool fail = false;
bool close = false;
bool data = false;
std::shared_ptr<arrow::Schema> schema;
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
[&]() -> void { data = true; }, &presult);
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 1);
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
}
{
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
ErrorCode::INTERNAL_ERROR);
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_TRUE(buffer._result_sink_dependencies.empty());
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_TRUE(fail);
fail = false;
}
{
auto new_ins_id = TUniqueId();
new_ins_id.lo = 1;
auto new_dep = pipeline::Dependency::create_shared(0, 0, "Test", true);
buffer.set_dependency(new_ins_id, new_dep);
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
ErrorCode::INTERNAL_ERROR);
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
}
}
TEST_F(ArrowResultBlockBufferTest, TestArrowResultSerializeFailure) {
MockRuntimeState state;
state.batsh_size = 1;
int buffer_size = 16;
auto dep = pipeline::Dependency::create_shared(0, 0, "Test", true);
auto ins_id = TUniqueId();
bool fail = false;
bool close = false;
bool data = false;
std::shared_ptr<arrow::Schema> schema;
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
std::shared_ptr<GetArrowResultBatchCtx> ctx = MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
[&]() -> void { data = true; }, &presult);
{
auto num_rows = 2;
auto in_block = std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt64>({1, 2}));
EXPECT_TRUE(buffer.add_batch(&state, in_block).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], num_rows);
EXPECT_EQ(buffer._instance_rows_in_queue.back()[ins_id], num_rows);
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 1);
EXPECT_FALSE(dep->ready());
in_block = std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt64>({1, 2}));
EXPECT_TRUE(buffer.add_batch(&state, in_block).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], num_rows * 2);
EXPECT_EQ(buffer._instance_rows_in_queue.back()[ins_id], num_rows * 2);
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 0);
EXPECT_EQ(buffer._result_batch_queue.size(), 1);
EXPECT_FALSE(dep->ready());
}
{
ctx->_max_msg_size = 0;
EXPECT_TRUE(buffer.get_batch(ctx).ok());
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_TRUE(buffer._waiting_rpc.empty());
EXPECT_EQ(buffer._packet_num, 1);
EXPECT_EQ(buffer._result_batch_queue.size(), 0);
EXPECT_TRUE(dep->ready());
EXPECT_TRUE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
EXPECT_EQ(presult.empty_batch(), false);
EXPECT_EQ(presult.packet_seq(), 0);
EXPECT_EQ(presult.eos(), false);
EXPECT_EQ(presult.status().status_code(), ErrorCode::INTERNAL_ERROR);
data = false;
}
}
} // namespace doris::vectorized