blob: c76afb594465353fa0723ea4798e4318d0c9905b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/thread/thread.hpp>
#include "testutil/gtest-util.h"
#include "common/init.h"
#include "common/logging.h"
#include "common/status.h"
#include "codegen/llvm-codegen.h"
#include "exprs/slot-ref.h"
#include "rpc/auth-provider.h"
#include "rpc/thrift-server.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/data-stream-sender.h"
#include "runtime/data-stream-recvr.h"
#include "runtime/descriptors.h"
#include "runtime/client-cache.h"
#include "runtime/backend-client.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "service/fe-support.h"
#include "util/cpu-info.h"
#include "util/disk-info.h"
#include "util/debug-util.h"
#include "util/thread.h"
#include "util/time.h"
#include "util/mem-info.h"
#include "util/test-info.h"
#include "util/tuple-row-compare.h"
#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/Descriptors_types.h"
#include "service/fe-support.h"
#include <iostream>
#include "common/names.h"
using namespace impala;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
DEFINE_int32(port, 20001, "port on which to run Impala test backend");
DECLARE_string(principal);
DECLARE_int32(datastream_sender_timeout_ms);
// We reserve contiguous memory for senders in SetUp. If a test uses more
// senders, a DCHECK will fail and you should increase this value.
static const int MAX_SENDERS = 16;
static const int MAX_RECEIVERS = 16;
static const PlanNodeId DEST_NODE_ID = 1;
static const int BATCH_CAPACITY = 100; // rows
static const int PER_ROW_DATA = 8;
static const int TOTAL_DATA_SIZE = 8 * 1024;
static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA;
namespace impala {
class ImpalaTestBackend : public ImpalaInternalServiceIf {
public:
ImpalaTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
virtual ~ImpalaTestBackend() {}
virtual void ExecPlanFragment(
TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) {}
virtual void ReportExecStatus(
TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {}
virtual void CancelPlanFragment(
TCancelPlanFragmentResult& return_val, const TCancelPlanFragmentParams& params) {}
virtual void UpdateFilter(
TUpdateFilterResult& return_val, const TUpdateFilterParams& params) {}
virtual void PublishFilter(
TPublishFilterResult& return_val, const TPublishFilterParams& params) {}
virtual void TransmitData(
TTransmitDataResult& return_val, const TTransmitDataParams& params) {
if (!params.eos) {
mgr_->AddData(params.dest_fragment_instance_id, params.dest_node_id,
params.row_batch, params.sender_id).SetTStatus(&return_val);
} else {
mgr_->CloseSender(params.dest_fragment_instance_id, params.dest_node_id,
params.sender_id).SetTStatus(&return_val);
}
}
private:
DataStreamMgr* mgr_;
};
class DataStreamTest : public testing::Test {
protected:
DataStreamTest()
: runtime_state_(TQueryCtx(), &exec_env_),
next_val_(0) {
// Initialize Mem trackers for use by the data stream receiver.
exec_env_.InitForFeTests();
runtime_state_.InitMemTrackers(NULL, -1);
// Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
FLAGS_datastream_sender_timeout_ms = 250;
}
virtual void SetUp() {
CreateRowDesc();
CreateTupleComparator();
next_instance_id_.lo = 0;
next_instance_id_.hi = 0;
stream_mgr_ = new DataStreamMgr(new MetricGroup(""));
broadcast_sink_.dest_node_id = DEST_NODE_ID;
broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;
random_sink_.dest_node_id = DEST_NODE_ID;
random_sink_.output_partition.type = TPartitionType::RANDOM;
hash_sink_.dest_node_id = DEST_NODE_ID;
hash_sink_.output_partition.type = TPartitionType::HASH_PARTITIONED;
// there's only one column to partition on
TExprNode expr_node;
expr_node.node_type = TExprNodeType::SLOT_REF;
expr_node.type.types.push_back(TTypeNode());
expr_node.type.types.back().__isset.scalar_type = true;
expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT;
expr_node.num_children = 0;
TSlotRef slot_ref;
slot_ref.slot_id = 0;
expr_node.__set_slot_ref(slot_ref);
TExpr expr;
expr.nodes.push_back(expr_node);
hash_sink_.output_partition.__isset.partition_exprs = true;
hash_sink_.output_partition.partition_exprs.push_back(expr);
// Ensure that individual sender info addresses don't change
sender_info_.reserve(MAX_SENDERS);
receiver_info_.reserve(MAX_RECEIVERS);
StartBackend();
}
const TDataStreamSink& GetSink(TPartitionType::type partition_type) {
switch (partition_type) {
case TPartitionType::UNPARTITIONED: return broadcast_sink_;
case TPartitionType::RANDOM: return random_sink_;
case TPartitionType::HASH_PARTITIONED: return hash_sink_;
default: EXPECT_TRUE(false) << "Unhandled sink type: " << partition_type;
}
// Should never reach this.
return broadcast_sink_;
}
virtual void TearDown() {
lhs_slot_ctx_->Close(NULL);
rhs_slot_ctx_->Close(NULL);
exec_env_.impalad_client_cache()->TestShutdown();
StopBackend();
}
void Reset() {
sender_info_.clear();
receiver_info_.clear();
dest_.clear();
}
ObjectPool obj_pool_;
MemTracker tracker_;
DescriptorTbl* desc_tbl_;
const RowDescriptor* row_desc_;
TupleRowComparator* less_than_;
ExecEnv exec_env_;
RuntimeState runtime_state_;
TUniqueId next_instance_id_;
string stmt_;
// RowBatch generation
scoped_ptr<RowBatch> batch_;
int next_val_;
int64_t* tuple_mem_;
// receiving node
DataStreamMgr* stream_mgr_;
ThriftServer* server_;
// sending node(s)
TDataStreamSink broadcast_sink_;
TDataStreamSink random_sink_;
TDataStreamSink hash_sink_;
vector<TPlanFragmentDestination> dest_;
struct SenderInfo {
thread* thread_handle;
Status status;
int num_bytes_sent;
SenderInfo(): thread_handle(NULL), num_bytes_sent(0) {}
};
vector<SenderInfo> sender_info_;
struct ReceiverInfo {
TPartitionType::type stream_type;
int num_senders;
int receiver_num;
thread* thread_handle;
shared_ptr<DataStreamRecvr> stream_recvr;
Status status;
int num_rows_received;
multiset<int64_t> data_values;
ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num)
: stream_type(stream_type),
num_senders(num_senders),
receiver_num(receiver_num),
thread_handle(NULL),
num_rows_received(0) {}
~ReceiverInfo() {
delete thread_handle;
stream_recvr.reset();
}
};
vector<ReceiverInfo> receiver_info_;
// Create an instance id and add it to dest_
void GetNextInstanceId(TUniqueId* instance_id) {
dest_.push_back(TPlanFragmentDestination());
TPlanFragmentDestination& dest = dest_.back();
dest.fragment_instance_id = next_instance_id_;
dest.server.hostname = "127.0.0.1";
dest.server.port = FLAGS_port;
*instance_id = next_instance_id_;
++next_instance_id_.lo;
}
// RowDescriptor to mimic "select bigint_col from alltypesagg", except the slot
// isn't nullable
void CreateRowDesc() {
// create DescriptorTbl
TTupleDescriptor tuple_desc;
tuple_desc.__set_id(0);
tuple_desc.__set_byteSize(8);
tuple_desc.__set_numNullBytes(0);
TDescriptorTable thrift_desc_tbl;
thrift_desc_tbl.tupleDescriptors.push_back(tuple_desc);
TSlotDescriptor slot_desc;
slot_desc.__set_id(0);
slot_desc.__set_parent(0);
ColumnType type(TYPE_BIGINT);
slot_desc.__set_slotType(type.ToThrift());
slot_desc.__set_materializedPath(vector<int>(1, 0));
slot_desc.__set_byteOffset(0);
slot_desc.__set_nullIndicatorByte(0);
slot_desc.__set_nullIndicatorBit(-1);
slot_desc.__set_slotIdx(0);
thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_));
runtime_state_.set_desc_tbl(desc_tbl_);
vector<TTupleId> row_tids;
row_tids.push_back(0);
vector<bool> nullable_tuples;
nullable_tuples.push_back(false);
row_desc_ = obj_pool_.Add(new RowDescriptor(*desc_tbl_, row_tids, nullable_tuples));
}
// Create a tuple comparator to sort in ascending order on the single bigint column.
void CreateTupleComparator() {
SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
lhs_slot_ctx_ = obj_pool_.Add(new ExprContext(lhs_slot));
SlotRef* rhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
rhs_slot_ctx_ = obj_pool_.Add(new ExprContext(rhs_slot));
lhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
rhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
lhs_slot_ctx_->Open(NULL);
rhs_slot_ctx_->Open(NULL);
SortExecExprs* sort_exprs = obj_pool_.Add(new SortExecExprs());
sort_exprs->Init(
vector<ExprContext*>(1, lhs_slot_ctx_), vector<ExprContext*>(1, rhs_slot_ctx_));
less_than_ = obj_pool_.Add(new TupleRowComparator(
*sort_exprs, vector<bool>(1, true), vector<bool>(1, false)));
}
// Create batch_, but don't fill it with data yet. Assumes we created row_desc_.
RowBatch* CreateRowBatch() {
RowBatch* batch = new RowBatch(*row_desc_, BATCH_CAPACITY, &tracker_);
int64_t* tuple_mem = reinterpret_cast<int64_t*>(
batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * 8));
bzero(tuple_mem, BATCH_CAPACITY * 8);
for (int i = 0; i < BATCH_CAPACITY; ++i) {
int idx = batch->AddRow();
TupleRow* row = batch->GetRow(idx);
row->SetTuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i]));
batch->CommitLastRow();
}
return batch;
}
void GetNextBatch(RowBatch* batch, int* next_val) {
for (int i = 0; i < BATCH_CAPACITY; ++i) {
TupleRow* row = batch->GetRow(i);
int64_t* val = reinterpret_cast<int64_t*>(row->GetTuple(0)->GetSlot(0));
*val = (*next_val)++;
}
}
// Start receiver (expecting given number of senders) in separate thread.
void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
int buffer_size, bool is_merging, TUniqueId* out_id = NULL) {
VLOG_QUERY << "start receiver";
RuntimeProfile* profile =
obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver"));
TUniqueId instance_id;
GetNextInstanceId(&instance_id);
receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
ReceiverInfo& info = receiver_info_.back();
info.stream_recvr =
stream_mgr_->CreateRecvr(&runtime_state_,
*row_desc_, instance_id, DEST_NODE_ID, num_senders, buffer_size, profile,
is_merging);
if (!is_merging) {
info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
} else {
info.thread_handle = new thread(&DataStreamTest::ReadStreamMerging, this, &info,
profile);
}
if (out_id != NULL) *out_id = instance_id;
}
void JoinReceivers() {
VLOG_QUERY << "join receiver\n";
for (int i = 0; i < receiver_info_.size(); ++i) {
receiver_info_[i].thread_handle->join();
receiver_info_[i].stream_recvr->Close();
}
}
// Deplete stream and print batches
void ReadStream(ReceiverInfo* info) {
RowBatch* batch;
VLOG_QUERY << "start reading";
while (!(info->status = info->stream_recvr->GetBatch(&batch)).IsCancelled() &&
(batch != NULL)) {
VLOG_QUERY << "read batch #rows=" << batch->num_rows();
for (int i = 0; i < batch->num_rows(); ++i) {
TupleRow* row = batch->GetRow(i);
info->data_values.insert(*static_cast<int64_t*>(row->GetTuple(0)->GetSlot(0)));
}
SleepForMs(100); // slow down receiver to exercise buffering logic
}
if (info->status.IsCancelled()) VLOG_QUERY << "reader is cancelled";
VLOG_QUERY << "done reading";
}
void ReadStreamMerging(ReceiverInfo* info, RuntimeProfile* profile) {
info->status = info->stream_recvr->CreateMerger(*less_than_);
if (info->status.IsCancelled()) return;
RowBatch batch(*row_desc_, 1024, &tracker_);
VLOG_QUERY << "start reading merging";
bool eos;
while (!(info->status = info->stream_recvr->GetNext(&batch, &eos)).IsCancelled()) {
VLOG_QUERY << "read batch #rows=" << batch.num_rows();
for (int i = 0; i < batch.num_rows(); ++i) {
TupleRow* row = batch.GetRow(i);
info->data_values.insert(*static_cast<int64_t*>(row->GetTuple(0)->GetSlot(0)));
}
SleepForMs(100);
batch.Reset();
if (eos) break;
}
if (info->status.IsCancelled()) VLOG_QUERY << "reader is cancelled";
VLOG_QUERY << "done reading";
}
// Verify correctness of receivers' data values.
void CheckReceivers(TPartitionType::type stream_type, int num_senders) {
int64_t total = 0;
multiset<int64_t> all_data_values;
for (int i = 0; i < receiver_info_.size(); ++i) {
ReceiverInfo& info = receiver_info_[i];
EXPECT_OK(info.status);
total += info.data_values.size();
ASSERT_EQ(info.stream_type, stream_type);
ASSERT_EQ(info.num_senders, num_senders);
if (stream_type == TPartitionType::UNPARTITIONED) {
EXPECT_EQ(
NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size());
}
all_data_values.insert(info.data_values.begin(), info.data_values.end());
int k = 0;
for (multiset<int64_t>::iterator j = info.data_values.begin();
j != info.data_values.end(); ++j, ++k) {
if (stream_type == TPartitionType::UNPARTITIONED) {
// unpartitioned streams contain all values as many times as there are
// senders
EXPECT_EQ(k / num_senders, *j);
} else if (stream_type == TPartitionType::HASH_PARTITIONED) {
// hash-partitioned streams send values to the right partition
int64_t value = *j;
uint32_t hash_val =
RawValue::GetHashValueFnv(&value, TYPE_BIGINT, HashUtil::FNV_SEED);
EXPECT_EQ(hash_val % receiver_info_.size(), info.receiver_num);
}
}
}
if (stream_type == TPartitionType::HASH_PARTITIONED) {
EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total);
int k = 0;
for (multiset<int64_t>::iterator j = all_data_values.begin();
j != all_data_values.end(); ++j, ++k) {
// each sender sent all values
EXPECT_EQ(k / num_senders, *j);
if (k/num_senders != *j) break;
}
}
}
void CheckSenders() {
for (int i = 0; i < sender_info_.size(); ++i) {
EXPECT_OK(sender_info_[i].status);
EXPECT_GT(sender_info_[i].num_bytes_sent, 0);
}
}
// Start backend in separate thread.
void StartBackend() {
boost::shared_ptr<ImpalaTestBackend> handler(new ImpalaTestBackend(stream_mgr_));
boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
server_ = new ThriftServer("DataStreamTest backend", processor, FLAGS_port, NULL);
server_->Start();
}
void StopBackend() {
VLOG_QUERY << "stop backend\n";
server_->StopForTesting();
delete server_;
}
void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
int channel_buffer_size = 1024) {
VLOG_QUERY << "start sender";
int num_senders = sender_info_.size();
ASSERT_LT(num_senders, MAX_SENDERS);
sender_info_.push_back(SenderInfo());
SenderInfo& info = sender_info_.back();
info.thread_handle =
new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
partition_type);
}
void JoinSenders() {
VLOG_QUERY << "join senders\n";
for (int i = 0; i < sender_info_.size(); ++i) {
sender_info_[i].thread_handle->join();
}
}
void Sender(int sender_num, int channel_buffer_size,
TPartitionType::type partition_type) {
RuntimeState state(TQueryCtx(), &exec_env_);
state.set_desc_tbl(desc_tbl_);
state.InitMemTrackers(NULL, -1);
VLOG_QUERY << "create sender " << sender_num;
const TDataStreamSink& sink = GetSink(partition_type);
DataStreamSender sender(
&obj_pool_, sender_num, *row_desc_, sink, dest_, channel_buffer_size);
EXPECT_OK(sender.Prepare(&state, &tracker_));
EXPECT_OK(sender.Open(&state));
scoped_ptr<RowBatch> batch(CreateRowBatch());
SenderInfo& info = sender_info_[sender_num];
int next_val = 0;
for (int i = 0; i < NUM_BATCHES; ++i) {
GetNextBatch(batch.get(), &next_val);
VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
info.status = sender.Send(&state, batch.get());
if (!info.status.ok()) break;
}
VLOG_QUERY << "closing sender" << sender_num;
sender.FlushFinal(&state);
sender.Close(&state);
info.num_bytes_sent = sender.GetNumDataBytesSent();
batch->Reset();
}
void TestStream(TPartitionType::type stream_type, int num_senders,
int num_receivers, int buffer_size, bool is_merging) {
VLOG_QUERY << "Testing stream=" << stream_type << " #senders=" << num_senders
<< " #receivers=" << num_receivers << " buffer_size=" << buffer_size
<< " is_merging=" << is_merging;
Reset();
for (int i = 0; i < num_receivers; ++i) {
StartReceiver(stream_type, num_senders, i, buffer_size, is_merging);
}
for (int i = 0; i < num_senders; ++i) {
StartSender(stream_type, buffer_size);
}
JoinSenders();
CheckSenders();
JoinReceivers();
CheckReceivers(stream_type, num_senders);
}
private:
ExprContext* lhs_slot_ctx_;
ExprContext* rhs_slot_ctx_;
};
TEST_F(DataStreamTest, UnknownSenderSmallResult) {
// starting a sender w/o a corresponding receiver results in an error. No bytes should
// be sent.
// case 1: entire query result fits in single buffer
TUniqueId dummy_id;
GetNextInstanceId(&dummy_id);
StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
JoinSenders();
EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
}
TEST_F(DataStreamTest, UnknownSenderLargeResult) {
// case 2: query result requires multiple buffers
TUniqueId dummy_id;
GetNextInstanceId(&dummy_id);
StartSender();
JoinSenders();
EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
}
TEST_F(DataStreamTest, Cancel) {
TUniqueId instance_id;
StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
stream_mgr_->Cancel(instance_id);
StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id);
stream_mgr_->Cancel(instance_id);
JoinReceivers();
EXPECT_TRUE(receiver_info_[0].status.IsCancelled());
EXPECT_TRUE(receiver_info_[1].status.IsCancelled());
}
TEST_F(DataStreamTest, BasicTest) {
// TODO: also test that all client connections have been returned
TPartitionType::type stream_types[] =
{TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
TPartitionType::HASH_PARTITIONED};
int sender_nums[] = {1, 4};
int receiver_nums[] = {1, 4};
int buffer_sizes[] = {1024, 1024 * 1024};
bool merging[] = {false, true};
for (int i = 0; i < sizeof(stream_types) / sizeof(*stream_types); ++i) {
for (int j = 0; j < sizeof(sender_nums) / sizeof(int); ++j) {
for (int k = 0; k < sizeof(receiver_nums) / sizeof(int); ++k) {
for (int l = 0; l < sizeof(buffer_sizes) / sizeof(int); ++l) {
for (int m = 0; m < sizeof(merging) / sizeof(bool); ++m) {
TestStream(stream_types[i], sender_nums[j], receiver_nums[k],
buffer_sizes[l], merging[m]);
}
}
}
}
}
}
// This test checks for the avoidance of IMPALA-2931, which is a crash that would occur if
// the parent memtracker of a DataStreamRecvr's memtracker was deleted before the
// DataStreamRecvr was destroyed. The fix was to move decoupling the child tracker from
// the parent into DataStreamRecvr::Close() which should always be called before the
// parent is destroyed. In practice the parent is a member of the query's runtime state.
//
// TODO: Make lifecycle requirements more explicit.
TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
runtime_state->InitMemTrackers(NULL, -1);
scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));
// Start just one receiver.
TUniqueId instance_id;
GetNextInstanceId(&instance_id);
shared_ptr<DataStreamRecvr> stream_recvr = stream_mgr_->CreateRecvr(runtime_state.get(),
*row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(), false);
// Perform tear down, but keep a reference to the receiver so that it is deleted last
// (to confirm that the destructor does not access invalid state after tear-down).
stream_recvr->Close();
// Force deletion of the parent memtracker by destroying it's owning runtime state.
runtime_state.reset();
// Send an eos RPC to the receiver. Not required for tear-down, but confirms that the
// RPC does not cause an error (the receiver will still be called, since it is only
// Close()'d, not deleted from the data stream manager).
Status rpc_status;
ImpalaBackendConnection client(exec_env_.impalad_client_cache(),
MakeNetworkAddress("localhost", FLAGS_port), &rpc_status);
EXPECT_OK(rpc_status);
TTransmitDataParams params;
params.protocol_version = ImpalaInternalServiceVersion::V1;
params.__set_eos(true);
params.__set_dest_fragment_instance_id(instance_id);
params.__set_dest_node_id(DEST_NODE_ID);
TUniqueId dummy_id;
params.__set_sender_id(0);
TTransmitDataResult result;
rpc_status = client.DoRpc(&ImpalaBackendClient::TransmitData, params, &result);
// Finally, stream_recvr destructor happens here. Before fix for IMPALA-2931, this
// would have resulted in a crash.
stream_recvr.reset();
}
// TODO: more tests:
// - test case for transmission error in last batch
// - receivers getting created concurrently
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
InitFeSupport();
impala::LlvmCodeGen::InitializeLlvm();
return RUN_ALL_TESTS();
}