blob: 82eff9dd289806d6488faa7f98238a423b1e0b43 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/closure_guard.h>
#include <brpc/controller.h>
#include <bthread/bthread.h>
#include <bthread/types.h>
#include <butil/errno.h>
#include <butil/iobuf.h>
#include <gen_cpp/Data_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gtest/gtest.h>

#include <functional>
#include <utility>

#include "vec/sink/varrow_flight_result_writer.h"
#include "vec/sink/vmysql_result_writer.h"
namespace doris::vectorized {
// Fixture for the result-batch context tests below. It holds no state and
// needs no set-up or tear-down, so the compiler-generated special members
// are sufficient.
class GetResultBatchCtxTest : public ::testing::Test {};
// Test double for google::protobuf::Closure: Run() simply invokes the
// callback handed to the constructor, letting a test observe that the
// closure was run.
class MockClosure : public google::protobuf::Closure {
public:
    // Takes ownership of `cb`. `explicit` prevents a callable from being
    // silently converted into a closure; the by-value parameter is moved into
    // the member so the std::function is not copied a second time.
    explicit MockClosure(std::function<void()> cb) : _cb(std::move(cb)) {}

    // Invokes the stored callback.
    void Run() override { _cb(); }

private:
    std::function<void()> _cb;
};
// Drives one GetResultBatchCtx through each of its completion entry points
// (on_failure, on_close, on_data) against a single shared PFetchDataResult.
// Every scenario checks that the `done` closure was run and then re-arms
// `done_flag` so the next scenario starts from a clean state.
TEST_F(GetResultBatchCtxTest, TestGetResultBatchCtx) {
    PFetchDataResult result;
    bool done_flag = false;
    // The closure only records that it has been run.
    MockClosure done([&done_flag]() { done_flag = true; });
    auto ctx = GetResultBatchCtx::create_shared(&result, &done);

    {
        // on_failure: the response is expected to carry the error code.
        Status failure = Status::InternalError("Test");
        ctx->on_failure(failure);
        EXPECT_TRUE(done_flag);
        EXPECT_EQ(result.status().status_code(), ErrorCode::INTERNAL_ERROR);
        done_flag = false;
    }

    {
        // on_close: packet sequence and returned-row count are expected to be
        // reported back through the response.
        const int64_t packet_seq = 3;
        const int64_t returned_rows = 4;
        ctx->on_close(packet_seq, returned_rows);
        EXPECT_TRUE(done_flag);
        EXPECT_EQ(result.query_statistics().returned_rows(), returned_rows);
        EXPECT_EQ(result.packet_seq(), packet_seq);
        done_flag = false;
    }

    {
        // on_data with a batch holding one single-byte row: expected to
        // succeed and yield a non-empty serialized row batch.
        const uint8_t test_byte = 1;
        const int num_rows = 1;
        const int64_t packet_seq = 0;
        auto tresult = std::make_shared<TFetchDataResult>();
        tresult->result_batch.rows.resize(num_rows);
        tresult->result_batch.rows[0].append(reinterpret_cast<const char*>(&test_byte),
                                             sizeof(test_byte));
        EXPECT_TRUE(ctx->on_data(tresult, packet_seq, nullptr).ok());
        EXPECT_TRUE(done_flag);
        EXPECT_FALSE(result.empty_batch());
        EXPECT_EQ(result.packet_seq(), packet_seq);
        EXPECT_EQ(result.status().status_code(), ErrorCode::OK);
        EXPECT_GT(result.row_batch().length(), 0) << result.row_batch();
        done_flag = false;
    }

    {
        // on_data with a null result pointer: expected to succeed and be
        // reported as an empty batch with no row data.
        const int64_t packet_seq = 0;
        std::shared_ptr<TFetchDataResult> tresult;
        EXPECT_TRUE(ctx->on_data(tresult, packet_seq, nullptr).ok());
        EXPECT_TRUE(done_flag);
        EXPECT_TRUE(result.empty_batch());
        EXPECT_EQ(result.packet_seq(), packet_seq);
        EXPECT_EQ(result.status().status_code(), ErrorCode::OK);
        EXPECT_EQ(result.row_batch().length(), 0) << result.row_batch();
        done_flag = false;
    }

    {
        // on_data with the message-size limit forced to 0: the same one-row
        // batch as above is now expected to be rejected. The call itself still
        // returns OK; the failure surfaces as INTERNAL_ERROR in the response,
        // with no row data attached.
        ctx->_max_msg_size = 0;
        const uint8_t test_byte = 1;
        const int num_rows = 1;
        const int64_t packet_seq = 0;
        auto tresult = std::make_shared<TFetchDataResult>();
        tresult->result_batch.rows.resize(num_rows);
        tresult->result_batch.rows[0].append(reinterpret_cast<const char*>(&test_byte),
                                             sizeof(test_byte));
        EXPECT_TRUE(ctx->on_data(tresult, packet_seq, nullptr).ok());
        EXPECT_TRUE(done_flag);
        EXPECT_TRUE(result.empty_batch());
        EXPECT_EQ(result.packet_seq(), packet_seq);
        EXPECT_EQ(result.status().status_code(), ErrorCode::INTERNAL_ERROR);
        EXPECT_EQ(result.row_batch().length(), 0) << result.row_batch();
        done_flag = false;
    }
}
// Checks the failure and close paths of GetArrowResultBatchCtx by inspecting
// the PFetchArrowDataResult it was created with.
TEST_F(GetResultBatchCtxTest, TestGetArrowResultBatchCtx) {
    PFetchArrowDataResult result;
    auto ctx = GetArrowResultBatchCtx::create_shared(&result);

    // Failure path: the response is expected to carry the error code.
    {
        Status injected_error = Status::InternalError("Test");
        ctx->on_failure(injected_error);
        EXPECT_EQ(result.status().status_code(), ErrorCode::INTERNAL_ERROR);
    }

    // Close path: the response is expected to echo the packet sequence and
    // be flagged as end-of-stream.
    {
        int64_t expected_seq = 3;
        int64_t row_count = 4;
        ctx->on_close(expected_seq, row_count);
        EXPECT_EQ(result.packet_seq(), expected_seq);
        EXPECT_TRUE(result.eos());
    }
}
} // namespace doris::vectorized