// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/impala-http-handler.h"
#include "testutil/gtest-util.h"
#include "common/names.h"
#include <rapidjson/schema.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
using namespace impala;
using namespace std;
using namespace rapidjson;
namespace impala {
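// Test fixture with helpers for building synthetic TPlanFragment / TExecSummary
// structures and for validating the JSON plan representation produced by
// PlanToJson() against a JSON schema.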
class ImpalaHttpHandlerTest : public testing::Test {
static TPlanNodeId node_id;
static const string PLAN_SCHEMA_JSON;
public:
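// Validates 'plan_document' against PLAN_SCHEMA_JSON and fails the current test
// with diagnostic output if the document does not conform to the schema.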
static void validatePlanSchema(const Document& plan_document) {
Document sd;
ASSERT_FALSE(sd.Parse(PLAN_SCHEMA_JSON).HasParseError());
SchemaDocument schema(sd);
SchemaValidator validator(schema);
if (!plan_document.Accept(validator)) {
StringBuffer sb;
validator.GetInvalidSchemaPointer().StringifyUriFragment(sb);
printf("Invalid schema: %s\n", sb.GetString());
printf("Invalid keyword: %s\n", validator.GetInvalidSchemaKeyword());
sb.Clear();
validator.GetInvalidDocumentPointer().StringifyUriFragment(sb);
printf("Invalid document: %s\n", sb.GetString());
FAIL();
}
}
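// Builds a TPlanNode with the given id, labels and number of children; all other
// fields are left unset.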
static TPlanNode createTPlanNode(TPlanNodeId id, const string& label,
const string& label_detail, size_t num_children) {
TPlanNode node;
node.__set_node_id(id);
node.__set_label(label);
node.__set_label_detail(label_detail);
node.__set_num_children(num_children);
return node;
}
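// Builds an otherwise empty TPlanFragment with the given fragment index.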
static TPlanFragment createTPlanFragment(TFragmentIdx id) {
TPlanFragment plan_fragment;
plan_fragment.__set_idx(id);
return plan_fragment;
}
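// Appends one per-instance TExecStats entry to 'summary', with cardinality and
// latency derived from 'baseline'.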
static void addStatsToSummary(int baseline, TPlanNodeExecSummary* summary) {
TExecStats stats;
// Set some dummy values as stats.
stats.__set_cardinality((baseline + 1) * 2);
stats.__set_latency_ns((baseline + 1) * 2000);
summary->exec_stats.push_back(stats);
}
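// Builds an exec summary for node 'id' with one instance of dummy stats (two
// instances if 'id' is divisible by 3) and marks nodes with an even id as broadcast.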
static TPlanNodeExecSummary createTPlanNodeExecSummary(TPlanNodeId id) {
TPlanNodeExecSummary node_summary;
node_summary.__set_node_id(id);
// Set some dummy values for the summary.
if (id % 2 == 0) {
node_summary.__set_is_broadcast(true);
}
addStatsToSummary(id + 1, &node_summary);
if (id % 3 == 0) {
addStatsToSummary(id + 2, &node_summary);
}
return node_summary;
}
// Must be kept in sync with the logic in createTPlanNodeExecSummary.
static void checkPlanNodeStats(const Value& plan_node, TPlanNodeId id) {
ASSERT_TRUE(plan_node.IsObject());
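// Reconstruct the expected output cardinality from the dummy stats created by
// createTPlanNodeExecSummary(): one instance with cardinality (id + 2) * 2, plus a
// second instance with cardinality (id + 3) * 2 when id is divisible by 3. The test
// expects the maximum over instances for broadcast nodes (even id) and the sum
// otherwise.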
int output_card = (id + 2) * 2;
if (id % 3 == 0) {
if (id % 2 == 0) {
output_card = (id + 3) * 2;
} else {
output_card += (id + 3) * 2;
}
}
EXPECT_EQ(plan_node["output_card"].GetInt(), output_card);
if (id % 3 == 0) {
EXPECT_EQ(plan_node["num_instances"].GetInt(), 2);
} else {
EXPECT_EQ(plan_node["num_instances"].GetInt(), 1);
}
if (id % 2 == 0) {
EXPECT_TRUE(plan_node["is_broadcast"].GetBool());
} else {
EXPECT_FALSE(plan_node.HasMember("is_broadcast"));
}
int max_time = id % 3 == 0 ? (id + 3) * 2000 : (id + 2) * 2000;
EXPECT_EQ(plan_node["max_time_val"].GetInt(), max_time);
EXPECT_EQ(
plan_node["max_time"].GetString(), std::to_string(max_time / 1000) + ".000us");
EXPECT_TRUE(plan_node["children"].IsArray());
}
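// Appends a plan node with the next available node_id to 'fragment' (creating the
// fragment's TPlan on first use) and records a matching exec summary entry in
// 'summary'.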
static void addNodeToFragment(const string& label, const string& label_detail,
size_t num_children, TPlanFragment* fragment, TExecSummary* summary) {
if (fragment->__isset.plan) {
fragment->plan.nodes.push_back(
createTPlanNode(node_id, label, label_detail, num_children));
} else {
TPlan plan;
plan.nodes.push_back(createTPlanNode(node_id, label, label_detail, num_children));
fragment->__set_plan(plan);
}
summary->nodes.push_back(createTPlanNodeExecSummary(node_id));
node_id++;
}
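// Builds a DATA_STREAM_SINK labelled "EXCHANGE SENDER" that sends to the exchange
// node with id 'dest_node_id'.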
static TDataSink createStreamSink(TPlanNodeId dest_node_id) {
TDataSink sink;
TDataStreamSink stream_sink;
stream_sink.__set_dest_node_id(dest_node_id);
sink.__set_label("EXCHANGE SENDER");
sink.__set_type(TDataSinkType::DATA_STREAM_SINK);
sink.__set_stream_sink(stream_sink);
return sink;
}
static TDataSink createPlanRootSink() {
TDataSink sink;
TPlanRootSink plan_root_sink;
sink.__set_label("PLAN-ROOT SINK");
sink.__set_type(TDataSinkType::PLAN_ROOT_SINK);
sink.__set_plan_root_sink(plan_root_sink);
return sink;
}
static TDataSink createHdfsTableSink() {
TDataSink sink;
TTableSink table_sink;
THdfsTableSink hdfs_table_sink;
table_sink.__set_action(TSinkAction::INSERT);
table_sink.__set_type(TTableSinkType::HDFS);
table_sink.__set_hdfs_table_sink(hdfs_table_sink);
sink.__set_label("HDFS WRITER");
sink.__set_type(TDataSinkType::TABLE_SINK);
sink.__set_table_sink(table_sink);
return sink;
}
static TDataSink createIcebergDeleteTableSink() {
TDataSink sink;
TTableSink table_sink;
TIcebergDeleteSink iceberg_delete_sink;
table_sink.__set_action(TSinkAction::DELETE);
table_sink.__set_type(TTableSinkType::HDFS);
table_sink.__set_iceberg_delete_sink(iceberg_delete_sink);
sink.__set_label("ICEBERG BUFFERED DELETER");
sink.__set_type(TDataSinkType::TABLE_SINK);
sink.__set_table_sink(table_sink);
return sink;
}
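// Builds a MULTI_DATA_SINK wrapping 'child_sinks', as used for the Iceberg UPDATE
// plan in these tests.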
static TDataSink createMultiDataSink(const vector<TDataSink>& child_sinks) {
TDataSink sink;
sink.__set_type(TDataSinkType::MULTI_DATA_SINK);
sink.__set_label("MULTI DATA SINK");
sink.__set_child_data_sinks(child_sinks);
return sink;
}
static TDataSink createMergeSink(const vector<TDataSink>& child_sinks) {
TDataSink sink;
sink.__set_type(TDataSinkType::MERGE_SINK);
sink.__set_label("MERGE SINK");
sink.__set_child_data_sinks(child_sinks);
return sink;
}
static void addSinkToFragment(
const TDataSink& sink, TPlanFragment* fragment, TExecSummary* summary) {
fragment->__set_output_sink(sink);
// Sinks are stored in the summary with a node_id of -1.
summary->nodes.push_back(createTPlanNodeExecSummary(-1));
}
ImpalaHttpHandlerTest() { node_id = 0; }
};
TPlanNodeId ImpalaHttpHandlerTest::node_id;
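// JSON schema for the plan representation produced by PlanToJson(). The "children"
// property references "plan_nodes", so nested nodes are validated recursively.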
const string ImpalaHttpHandlerTest::PLAN_SCHEMA_JSON = R"({
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"properties": {
"plan_nodes": {
"type": "array",
"items": {
"type": "object",
"properties": {
"label": {
"type": "string"
},
"label_detail": {
"type": "string"
},
"output_card": {
"type": "integer"
},
"num_instances": {
"type": "integer"
},
"is_broadcast" : {
"const": "true"
},
"max_time": {
"type": "string"
},
"max_time_val": {
"type": "integer"
},
"avg_time": {
"type": "string"
},
"children": {
"$ref": "#/properties/plan_nodes"
},
"data_stream_target": {
"type": "string"
},
"join_build_target": {
"type": "string"
}
},
"required": [
"label",
"label_detail",
"output_card",
"num_instances",
"max_time",
"max_time_val",
"avg_time",
"children"
],
"additionalProperties": false
}
}
},
"required": [ "plan_nodes" ],
"additionalProperties": false
})";
// Prepares a query plan for a select statement with two fragments, each with one node.
// The first fragment has an exchange node and the second fragment has a scan node.
//
// F01:PLAN FRAGMENT
// |
// PLAN-ROOT SINK
// |
// 01:EXCHANGE
// |
// F00:PLAN FRAGMENT
// 00:SCAN HDFS
//
// Expected JSON output for the plan will look like this:
// {
// "plan_nodes": [
// {
// "label": "01:EXCHANGE",
// "label_detail": "UNPARTITIONED",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// },
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table",
// "output_card": 6,
// "num_instances": 1,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [],
// "data_stream_target": "01:EXCHANGE"
// }
// ]
// }
static void prepareSelectStatement(const vector<string>& labels,
const vector<string>& label_details, vector<TPlanFragment>* fragments,
TExecSummary* summary) {
ASSERT_EQ(labels.size(), 2);
ASSERT_EQ(label_details.size(), 2);
// F01:PLAN FRAGMENT
TPlanFragment fragment01 = ImpalaHttpHandlerTest::createTPlanFragment(0);
TDataSink plan_root_sink = ImpalaHttpHandlerTest::createPlanRootSink();
ImpalaHttpHandlerTest::addSinkToFragment(plan_root_sink, &fragment01, summary);
// 01:EXCHANGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(0), label_details.at(0), 0, &fragment01, summary);
fragments->push_back(fragment01);
// F00:PLAN FRAGMENT
TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(1);
TDataSink stream_sink =
ImpalaHttpHandlerTest::createStreamSink(fragment01.plan.nodes[0].node_id);
ImpalaHttpHandlerTest::addSinkToFragment(stream_sink, &fragment00, summary);
// 00:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(1), label_details.at(1), 0, &fragment00, summary);
fragments->push_back(fragment00);
}
// Prepares a query plan for a CTAS statement with two fragments, each with one node.
// Very similar to prepareSelectStatement(), but the first fragment has a table sink.
//
// F01:PLAN FRAGMENT
// |
// WRITE TO HDFS
// |
// 01:EXCHANGE
// |
// F00:PLAN FRAGMENT
// 00:SCAN HDFS
//
// Expected JSON output for the plan:
// {
// "plan_nodes": [
// {
// "label": "HDFS WRITER",
// "label_detail": "INSERT",
// "output_card": 2,
// "num_instances": 1,
// "max_time": "2.000us",
// "max_time_val": 2000,
// "avg_time": "2.000us",
// "children": [
// {
// "label": "01:EXCHANGE",
// "label_detail": "UNPARTITIONED",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// }
// ]
// },
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table",
// "output_card": 6,
// "num_instances": 1,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [],
// "data_stream_target": "01:EXCHANGE"
// }
// ]
// }
static void prepareCreateTableAsSelectStatement(const vector<string>& labels,
const vector<string>& label_details, vector<TPlanFragment>* fragments,
TExecSummary* summary) {
ASSERT_EQ(labels.size(), 2);
ASSERT_EQ(label_details.size(), 2);
// F01:PLAN FRAGMENT
TPlanFragment fragment01 = ImpalaHttpHandlerTest::createTPlanFragment(0);
TDataSink table_sink = ImpalaHttpHandlerTest::createHdfsTableSink();
ImpalaHttpHandlerTest::addSinkToFragment(table_sink, &fragment01, summary);
// 01:EXCHANGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(0), label_details.at(0), 0, &fragment01, summary);
fragments->push_back(fragment01);
// F00:PLAN FRAGMENT
TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(1);
TDataSink stream_sink =
ImpalaHttpHandlerTest::createStreamSink(fragment01.plan.nodes[0].node_id);
ImpalaHttpHandlerTest::addSinkToFragment(stream_sink, &fragment00, summary);
// 00:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(1), label_details.at(1), 0, &fragment00, summary);
fragments->push_back(fragment00);
}
// Prepares a query plan for a join statement with three fragments.
//
// F02:PLAN FRAGMENT
// |
// PLAN-ROOT SINK
// |
// 04:EXCHANGE
// |
// F00:PLAN FRAGMENT
// 02:HASH JOIN
// |
// |--03:EXCHANGE
// | |
// | F01:PLAN FRAGMENT
// | 00:SCAN HDFS
// |
// 01:SCAN HDFS
//
// Expected JSON output for the plan:
//
// {
// "plan_nodes": [
// {
// "label": "04:EXCHANGE",
// "label_detail": "UNPARTITIONED",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// },
// {
// "label": "02:HASH_JOIN",
// "label_detail": "INNER JOIN, BROADCAST",
// "output_card": 6,
// "num_instances": 1,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [
// {
// "label": "01:SCAN HDFS",
// "label_detail": "default.table1 t1",
// "output_card": 8,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "8.000us",
// "max_time_val": 8000,
// "avg_time": "8.000us",
// "children": []
// },
// {
// "label": "03:EXCHANGE",
// "label_detail": "BROADCAST",
// "output_card": 22,
// "num_instances": 2,
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "11.000us",
// "children": []
// }
// ],
// "data_stream_target": "04:EXCHANGE"
// },
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table2 t2",
// "output_card": 12,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "12.000us",
// "children": [],
// "data_stream_target": "03:EXCHANGE"
// }
// ]
// }
static void prepareJoinStatement(const vector<string>& labels,
const vector<string>& label_details, vector<TPlanFragment>* fragments,
TExecSummary* summary) {
ASSERT_EQ(labels.size(), 5);
ASSERT_EQ(label_details.size(), 5);
// F02:PLAN FRAGMENT
TPlanFragment fragment02 = ImpalaHttpHandlerTest::createTPlanFragment(2);
// 04:EXCHANGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(0), label_details.at(0), 0, &fragment02, summary);
TDataSink plan_root_sink = ImpalaHttpHandlerTest::createPlanRootSink();
ImpalaHttpHandlerTest::addSinkToFragment(plan_root_sink, &fragment02, summary);
fragments->push_back(fragment02);
// F00:PLAN FRAGMENT
TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0);
TDataSink stream_sink1 =
ImpalaHttpHandlerTest::createStreamSink(fragment02.plan.nodes[0].node_id);
ImpalaHttpHandlerTest::addSinkToFragment(stream_sink1, &fragment00, summary);
// 02:HASH_JOIN
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(1), label_details.at(1), 2, &fragment00, summary);
// 01:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(2), label_details.at(2), 0, &fragment00, summary);
// 03:EXCHANGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(3), label_details.at(3), 0, &fragment00, summary);
fragments->push_back(fragment00);
// F01:PLAN FRAGMENT
TPlanFragment fragment01 = ImpalaHttpHandlerTest::createTPlanFragment(1);
// data stream sink pointing to the 03:EXCHANGE node
TDataSink stream_sink2 =
ImpalaHttpHandlerTest::createStreamSink(fragment00.plan.nodes[2].node_id);
ImpalaHttpHandlerTest::addSinkToFragment(stream_sink2, &fragment01, summary);
// 00:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(4), label_details.at(4), 0, &fragment01, summary);
fragments->push_back(fragment01);
}
// Prepares a query plan for an Iceberg update statement. The root sink is a multi data
// sink consisting of an HDFS table sink and an Iceberg delete table sink.
//
// F00:PLAN FRAGMENT
// |
// MULTI DATA SINK
// |->WRITE TO HDFS
// |
// |->BUFFERED DELETE FROM ICEBERG
// |
// 00:SCAN HDFS
//
// Expected JSON output for the plan will look like this:
// {
// "plan_nodes": [
// {
// "label": "MULTI DATA SINK",
// "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER",
// "output_card": 2,
// "num_instances": 1,
// "max_time": "2.000us",
// "max_time_val": 2000,
// "avg_time": "2.000us",
// "children": [
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// }
// ]
// }
// ]
// }
static void prepareUpdateStatement(const vector<string>& labels,
const vector<string>& label_details, vector<TPlanFragment>* fragments,
TExecSummary* summary) {
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(label_details.size(), 1);
// F00:PLAN FRAGMENT
TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0);
TDataSink multi_data_sink = ImpalaHttpHandlerTest::createMultiDataSink(
{ImpalaHttpHandlerTest::createHdfsTableSink(),
ImpalaHttpHandlerTest::createIcebergDeleteTableSink()});
ImpalaHttpHandlerTest::addSinkToFragment(multi_data_sink, &fragment00, summary);
// 00:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(0), label_details.at(0), 0, &fragment00, summary);
fragments->push_back(fragment00);
}
// Prepares a query plan for an Iceberg merge statement. The root sink is a merge sink
// consisting of an HDFS writer and an Iceberg buffered delete sink.
//
// F00:PLAN FRAGMENT
// |
// MERGE SINK
// |->WRITE TO HDFS
// |
// |->BUFFERED DELETE FROM ICEBERG
// |
// 03:MERGE
// |
// 02:HASH JOIN
// |
// |--04:EXCHANGE
// | |
// | F01:PLAN FRAGMENT
// | 01:SCAN HDFS
// |
// 00:SCAN HDFS [default.target, RANDOM]
//
// Expected JSON output for the plan will look like this:
// {
// "plan_nodes": [
// {
// "label": "MERGE SINK",
// "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER",
// "output_card": 2,
// "num_instances": 1,
// "max_time": "2.000us",
// "max_time_val": 2000,
// "avg_time": "2.000us",
// "children": [
// {
// "label": "03:MERGE",
// "label_detail": "",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": [
// {
// "label": "02:HASH JOIN",
// "label_detail": "INNER JOIN, BROADCAST",
// "output_card": 6,
// "num_instances": 1,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [
// {
// "label": "04:EXCHANGE",
// "label_detail": "BROADCAST",
// "output_card": 8,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "8.000us",
// "max_time_val": 8000,
// "avg_time": "8.000us",
// "children": []
// },
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.target",
// "output_card": 22,
// "num_instances": 2,
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "11.000us",
// "children": []
// }
// ]
// }
// ]
// }
// ]
// },
// {
// "label": "01:SCAN HDFS",
// "label_detail": "default.source",
// "output_card": 12,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "12.000us",
// "children": [],
// "data_stream_target": "04:EXCHANGE"
// }
// ]
// }
static void prepareMergeStatement(const vector<string>& labels,
const vector<string>& label_details, vector<TPlanFragment>* fragments,
TExecSummary* summary) {
ASSERT_EQ(labels.size(), 5);
ASSERT_EQ(label_details.size(), 5);
// F00:PLAN FRAGMENT
TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0);
TDataSink merge_sink = ImpalaHttpHandlerTest::createMergeSink(
{ImpalaHttpHandlerTest::createHdfsTableSink(),
ImpalaHttpHandlerTest::createIcebergDeleteTableSink()});
ImpalaHttpHandlerTest::addSinkToFragment(merge_sink, &fragment00, summary);
// 03:MERGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(0), label_details.at(0), 1, &fragment00, summary);
// 02:HASH JOIN
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(1), label_details.at(1), 2, &fragment00, summary);
// 04:EXCHANGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(2), label_details.at(2), 0, &fragment00, summary);
// 00:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(3), label_details.at(3), 0, &fragment00, summary);
fragments->push_back(fragment00);
// F01:PLAN FRAGMENT
TPlanFragment fragment01 = ImpalaHttpHandlerTest::createTPlanFragment(1);
TDataSink stream_sink =
ImpalaHttpHandlerTest::createStreamSink(fragment00.plan.nodes[2].node_id);
ImpalaHttpHandlerTest::addSinkToFragment(stream_sink, &fragment01, summary);
// 01:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(4), label_details.at(4), 0, &fragment01, summary);
fragments->push_back(fragment01);
}
} // namespace impala
TEST_F(ImpalaHttpHandlerTest, SelectStatement) {
const vector<string> SELECT_LABELS = {"01:EXCHANGE", "00:SCAN HDFS"};
const vector<string> SELECT_LABEL_DETAILS = {"UNPARTITIONED", "default.table"};
vector<TPlanFragment> fragments;
TExecSummary summary;
prepareSelectStatement(SELECT_LABELS, SELECT_LABEL_DETAILS, &fragments, &summary);
Document document;
Value value(kObjectType);
PlanToJson(fragments, summary, &document, &value);
document.CopyFrom(value, document.GetAllocator());
validatePlanSchema(document);
// Check the two nodes in the plan
auto array = document["plan_nodes"].GetArray();
ASSERT_EQ(array.Size(), 2);
for (size_t i = 0; i < array.Size(); ++i) {
EXPECT_EQ(array[i]["label"].GetString(), SELECT_LABELS.at(i));
EXPECT_EQ(array[i]["label_detail"].GetString(), SELECT_LABEL_DETAILS.at(i));
checkPlanNodeStats(array[i], i);
EXPECT_EQ(array[i]["children"].Size(), 0);
}
// The data_stream_target of 00:SCAN HDFS should point to the 01:EXCHANGE node
ASSERT_TRUE(array[1].HasMember("data_stream_target"));
EXPECT_EQ(array[1]["data_stream_target"].GetString(), SELECT_LABELS[0]);
}
TEST_F(ImpalaHttpHandlerTest, CreateTableAsSelectStatement) {
const vector<string> CTAS_LABELS = {"01:EXCHANGE", "00:SCAN HDFS"};
const vector<string> CTAS_LABEL_DETAILS = {"UNPARTITIONED", "default.table"};
vector<TPlanFragment> fragments;
TExecSummary summary;
prepareCreateTableAsSelectStatement(
CTAS_LABELS, CTAS_LABEL_DETAILS, &fragments, &summary);
Document document;
Value value(kObjectType);
PlanToJson(fragments, summary, &document, &value);
document.CopyFrom(value, document.GetAllocator());
validatePlanSchema(document);
auto array = document["plan_nodes"].GetArray();
EXPECT_EQ(array.Size(), 2);
// Check the HDFS WRITER node
ASSERT_GE(array.Size(), 1);
auto& hdfs_writer_node = array[0];
EXPECT_STREQ(hdfs_writer_node["label"].GetString(), "HDFS WRITER");
EXPECT_STREQ(hdfs_writer_node["label_detail"].GetString(), "INSERT");
checkPlanNodeStats(hdfs_writer_node, -1);
// Check 01:EXCHANGE, which is the child of HDFS WRITER node
EXPECT_EQ(hdfs_writer_node["children"].Size(), 1);
auto& exch01_node = hdfs_writer_node["children"][0];
EXPECT_EQ(exch01_node["label"].GetString(), CTAS_LABELS[0]);
EXPECT_EQ(exch01_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[0]);
checkPlanNodeStats(exch01_node, 0);
EXPECT_EQ(exch01_node["children"].Size(), 0);
// Check the 00:SCAN HDFS node
ASSERT_GE(array.Size(), 2);
auto& scan_node = array[1];
EXPECT_EQ(scan_node["label"].GetString(), CTAS_LABELS[1]);
EXPECT_EQ(scan_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[1]);
checkPlanNodeStats(scan_node, 1);
EXPECT_EQ(scan_node["children"].Size(), 0);
// The data_stream_target should point to the 01:EXCHANGE node
ASSERT_TRUE(scan_node.HasMember("data_stream_target"));
EXPECT_EQ(scan_node["data_stream_target"].GetString(), CTAS_LABELS[0]);
}
TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
const vector<string> JOIN_LABELS = {
"04:EXCHANGE", "02:HASH_JOIN", "01:SCAN HDFS", "03:EXCHANGE", "00:SCAN HDFS"};
const vector<string> JOIN_LABEL_DETAILS = {"UNPARTITIONED", "INNER JOIN, BROADCAST",
"default.table1 t1", "BROADCAST", "default.table2 t2"};
vector<TPlanFragment> fragments;
TExecSummary summary;
prepareJoinStatement(JOIN_LABELS, JOIN_LABEL_DETAILS, &fragments, &summary);
Document document;
Value value(kObjectType);
PlanToJson(fragments, summary, &document, &value);
document.CopyFrom(value, document.GetAllocator());
validatePlanSchema(document);
auto array = document["plan_nodes"].GetArray();
EXPECT_EQ(array.Size(), 3);
// Check the 04:EXCHANGE node
ASSERT_GE(array.Size(), 1);
auto& exch04_node = array[0];
EXPECT_EQ(exch04_node["label"].GetString(), JOIN_LABELS[0]);
EXPECT_EQ(exch04_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[0]);
checkPlanNodeStats(exch04_node, 0);
EXPECT_EQ(exch04_node["children"].Size(), 0);
// Check the 02:HASH JOIN node
ASSERT_GE(array.Size(), 2);
auto& join_node = array[1];
EXPECT_EQ(join_node["label"].GetString(), JOIN_LABELS[1]);
EXPECT_EQ(join_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[1]);
checkPlanNodeStats(join_node, 1);
// The data_stream_target of 02:HASH JOIN should point to the 04:EXCHANGE node
ASSERT_TRUE(join_node.HasMember("data_stream_target"));
EXPECT_EQ(join_node["data_stream_target"], JOIN_LABELS[0]);
// Check the two children of 02:HASH JOIN node
auto children = join_node["children"].GetArray();
ASSERT_EQ(children.Size(), 2);
for (size_t i = 0; i < children.Size(); ++i) {
EXPECT_EQ(children[i]["label"].GetString(), JOIN_LABELS.at(i + 2));
EXPECT_EQ(children[i]["label_detail"].GetString(), JOIN_LABEL_DETAILS.at(i + 2));
checkPlanNodeStats(children[i], i + 2);
EXPECT_EQ(children[i]["children"].Size(), 0);
}
// Check the 00:SCAN HDFS node
ASSERT_GE(array.Size(), 3);
auto& scan_node = array[2];
EXPECT_EQ(scan_node["label"].GetString(), JOIN_LABELS[4]);
EXPECT_EQ(scan_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[4]);
checkPlanNodeStats(scan_node, 4);
EXPECT_EQ(scan_node["children"].Size(), 0);
// The data_stream_target of 00:SCAN HDFS should point to the 03:EXCHANGE node
ASSERT_TRUE(scan_node.HasMember("data_stream_target"));
EXPECT_EQ(scan_node["data_stream_target"].GetString(), JOIN_LABELS[3]);
}
TEST_F(ImpalaHttpHandlerTest, UpdateStatement) {
const vector<string> UPDATE_LABELS = {"00:SCAN HDFS"};
const vector<string> UPDATE_LABEL_DETAILS = {"default.table"};
vector<TPlanFragment> fragments;
TExecSummary summary;
prepareUpdateStatement(UPDATE_LABELS, UPDATE_LABEL_DETAILS, &fragments, &summary);
Document document;
Value value(kObjectType);
PlanToJson(fragments, summary, &document, &value);
document.CopyFrom(value, document.GetAllocator());
validatePlanSchema(document);
auto array = document["plan_nodes"].GetArray();
// Check the MULTI DATA SINK node
ASSERT_EQ(array.Size(), 1);
auto& multi_data_sink_node = array[0];
EXPECT_STREQ(multi_data_sink_node["label"].GetString(), "MULTI DATA SINK");
EXPECT_STREQ(multi_data_sink_node["label_detail"].GetString(),
"HDFS WRITER, ICEBERG BUFFERED DELETER");
checkPlanNodeStats(multi_data_sink_node, -1);
// Check 00:SCAN HDFS node, which is the child of MULTI DATA SINK node
ASSERT_EQ(multi_data_sink_node["children"].Size(), 1);
auto& scan_node = multi_data_sink_node["children"][0];
EXPECT_EQ(scan_node["label"].GetString(), UPDATE_LABELS[0]);
EXPECT_EQ(scan_node["label_detail"].GetString(), UPDATE_LABEL_DETAILS[0]);
checkPlanNodeStats(scan_node, 0);
EXPECT_EQ(scan_node["children"].Size(), 0);
}
TEST_F(ImpalaHttpHandlerTest, MergeStatement) {
const vector<string> MERGE_LABELS = {
"03:MERGE", "02:HASH JOIN", "04:EXCHANGE", "00:SCAN HDFS", "01:SCAN HDFS"};
const vector<string> MERGE_LABEL_DETAILS = {
"", "INNER JOIN, BROADCAST", "BROADCAST", "default.target", "default.source"};
vector<TPlanFragment> fragments;
TExecSummary summary;
prepareMergeStatement(MERGE_LABELS, MERGE_LABEL_DETAILS, &fragments, &summary);
Document document;
Value value(kObjectType);
PlanToJson(fragments, summary, &document, &value);
document.CopyFrom(value, document.GetAllocator());
validatePlanSchema(document);
auto array = document["plan_nodes"].GetArray();
EXPECT_EQ(array.Size(), 2);
// Check the MERGE SINK node
ASSERT_GE(array.Size(), 1);
auto& merge_sink_node = array[0];
EXPECT_STREQ(merge_sink_node["label"].GetString(), "MERGE SINK");
EXPECT_STREQ(merge_sink_node["label_detail"].GetString(),
"HDFS WRITER, ICEBERG BUFFERED DELETER");
checkPlanNodeStats(merge_sink_node, -1);
// Check 03:MERGE node, which is the child of the MERGE SINK node
ASSERT_EQ(merge_sink_node["children"].Size(), 1);
auto& merge_node = merge_sink_node["children"][0];
EXPECT_EQ(merge_node["label"].GetString(), MERGE_LABELS[0]);
EXPECT_EQ(merge_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[0]);
checkPlanNodeStats(merge_node, 0);
// Check the 02:HASH JOIN node, which is a child of the 03:MERGE node
ASSERT_EQ(merge_node["children"].Size(), 1);
auto& hash_join_node = merge_node["children"][0];
EXPECT_EQ(hash_join_node["label"].GetString(), MERGE_LABELS[1]);
EXPECT_EQ(hash_join_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[1]);
checkPlanNodeStats(hash_join_node, 1);
// Check the two children of the 02:HASH JOIN node
auto children = hash_join_node["children"].GetArray();
ASSERT_EQ(children.Size(), 2);
for (size_t i = 0; i < children.Size(); ++i) {
EXPECT_EQ(children[i]["label"].GetString(), MERGE_LABELS.at(i + 2));
EXPECT_EQ(children[i]["label_detail"].GetString(), MERGE_LABEL_DETAILS.at(i + 2));
checkPlanNodeStats(children[i], i + 2);
EXPECT_EQ(children[i]["children"].Size(), 0);
}
// Check the 01:SCAN HDFS node
ASSERT_EQ(array.Size(), 2);
auto& scan_node = array[1];
EXPECT_EQ(scan_node["label"].GetString(), MERGE_LABELS[4]);
EXPECT_EQ(scan_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[4]);
checkPlanNodeStats(scan_node, 4);
EXPECT_EQ(scan_node["children"].Size(), 0);
// The data_stream_target of 01:SCAN HDFS should point to the 04:EXCHANGE node
ASSERT_TRUE(scan_node.HasMember("data_stream_target"));
EXPECT_EQ(scan_node["data_stream_target"].GetString(), MERGE_LABELS[2]);
}