IMPALA-4192: Move static state from DataSink into a DataSinkConfig
This patch adds a new class called DataSinkConfig which contains a
subset of the static state of its corresponding DataSink, of
which there is one instance per fragment. DataSink contains the
runtime state and there can be up to MT_DOP instances of it per
fragment.
Eventually, all static state, including codegened function pointers,
will be moved to the PlanNodes.
Testing:
Ran exhaustive tests successfully.
Change-Id: I8d5b4226f6cec5305b0ec9a25c5f18b5521c8dd2
Reviewed-on: http://gerrit.cloudera.org:8080/14941
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index c34a968..fcff3fd 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -33,8 +33,8 @@
namespace impala {
BlockingPlanRootSink::BlockingPlanRootSink(
- TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
- : PlanRootSink(sink_id, row_desc, state) {}
+ TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+ : PlanRootSink(sink_id, sink_config, state) {}
Status BlockingPlanRootSink::Prepare(
RuntimeState* state, MemTracker* parent_mem_tracker) {
diff --git a/be/src/exec/blocking-plan-root-sink.h b/be/src/exec/blocking-plan-root-sink.h
index 38e4439..0a5b849 100644
--- a/be/src/exec/blocking-plan-root-sink.h
+++ b/be/src/exec/blocking-plan-root-sink.h
@@ -48,7 +48,7 @@
class BlockingPlanRootSink : public PlanRootSink {
public:
BlockingPlanRootSink(
- TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+ TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
/// TODO: Currently, this does nothing, it just calls DataSink::Prepare. However, adding
/// it is necessary because BufferedPlanRootSink needs to use PlanRootSink::Prepare.
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 6427bf6..c365f69 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -29,10 +29,10 @@
const int FETCH_NUM_BATCHES = 10;
BufferedPlanRootSink::BufferedPlanRootSink(TDataSinkId sink_id,
- const RowDescriptor* row_desc, RuntimeState* state,
- const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
- : PlanRootSink(sink_id, row_desc, state),
- resource_profile_(resource_profile),
+ const DataSinkConfig& sink_config, RuntimeState* state,
+ const TDebugOptions& debug_options)
+ : PlanRootSink(sink_id, sink_config, state),
+ resource_profile_(sink_config.tsink_->plan_root_sink.resource_profile),
debug_options_(debug_options) {}
Status BufferedPlanRootSink::Prepare(
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index c908cda..7be257e 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -43,9 +43,8 @@
/// synchronize access to the queue.
class BufferedPlanRootSink : public PlanRootSink {
public:
- BufferedPlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
- RuntimeState* state, const TBackendResourceProfile& resource_profile,
- const TDebugOptions& debug_options);
+ BufferedPlanRootSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+ RuntimeState* state, const TDebugOptions& debug_options);
/// Initializes the row_batches_get_wait_timer_ and row_batches_send_wait_timer_
/// counters.
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index ed3820f..b50d33e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -39,20 +39,81 @@
#include "common/names.h"
-DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
- "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
- "can accumulate before the row batch is sent over the wire.");
-
using strings::Substitute;
namespace impala {
+Status DataSinkConfig::Init(
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ tsink_ = &tsink;
+ input_row_desc_ = input_row_desc;
+ return ScalarExpr::Create(tsink.output_exprs, *input_row_desc_, state, &output_exprs_);
+}
+
+Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
+ const RowDescriptor* row_desc, RuntimeState* state, const DataSinkConfig** sink) {
+ ObjectPool* pool = state->obj_pool();
+ DataSinkConfig* data_sink = nullptr;
+ switch (thrift_sink.type) {
+ case TDataSinkType::DATA_STREAM_SINK:
+ if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
+ // TODO: figure out good buffer size based on size of output row
+ data_sink = pool->Add(new KrpcDataStreamSenderConfig());
+ break;
+ case TDataSinkType::TABLE_SINK:
+ if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
+ switch (thrift_sink.table_sink.type) {
+ case TTableSinkType::HDFS:
+ data_sink = pool->Add(new HdfsTableSinkConfig());
+ break;
+ case TTableSinkType::KUDU:
+ RETURN_IF_ERROR(CheckKuduAvailability());
+ data_sink = pool->Add(new KuduTableSinkConfig());
+ break;
+ case TTableSinkType::HBASE:
+ data_sink = pool->Add(new HBaseTableSinkConfig());
+ break;
+ default:
+ stringstream error_msg;
+ map<int, const char*>::const_iterator i =
+ _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
+ const char* str = i != _TTableSinkType_VALUES_TO_NAMES.end() ?
+ i->second :
+ "Unknown table sink";
+ error_msg << str << " not implemented.";
+ return Status(error_msg.str());
+ }
+ break;
+ case TDataSinkType::PLAN_ROOT_SINK:
+ data_sink = pool->Add(new PlanRootSinkConfig());
+ break;
+ case TDataSinkType::JOIN_BUILD_SINK:
+ // IMPALA-4224 - join build sink not supported in backend execution.
+ default:
+ stringstream error_msg;
+ map<int, const char*>::const_iterator i =
+ _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
+ const char* str = i != _TDataSinkType_VALUES_TO_NAMES.end() ?
+ i->second :
+ "Unknown data sink type ";
+ error_msg << str << " not implemented.";
+ return Status(error_msg.str());
+ }
+ RETURN_IF_ERROR(data_sink->Init(thrift_sink, row_desc, state));
+ *sink = data_sink;
+ return Status::OK();
+}
+
// Empty string
const char* const DataSink::ROOT_PARTITION_KEY = "";
-DataSink::DataSink(TDataSinkId sink_id, const RowDescriptor* row_desc, const string& name,
- RuntimeState* state)
- : closed_(false), row_desc_(row_desc), name_(name) {
+DataSink::DataSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+ const string& name, RuntimeState* state)
+ : sink_config_(sink_config),
+ closed_(false),
+ row_desc_(sink_config.input_row_desc_),
+ name_(name),
+ output_exprs_(sink_config.output_exprs_) {
profile_ = RuntimeProfile::Create(state->obj_pool(), name);
if (sink_id != -1) {
// There is one sink per fragment so we can use the fragment index as a unique
@@ -65,77 +126,6 @@
DCHECK(closed_);
}
-Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
- const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor* row_desc,
- RuntimeState* state, DataSink** sink) {
- const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink;
- const vector<TExpr>& thrift_output_exprs = thrift_sink.output_exprs;
- ObjectPool* pool = state->obj_pool();
- // We have one fragment per sink, so we can use the fragment index as the sink ID.
- TDataSinkId sink_id = fragment_ctx.fragment.idx;
- switch (thrift_sink.type) {
- case TDataSinkType::DATA_STREAM_SINK:
- if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
- // TODO: figure out good buffer size based on size of output row
- *sink = pool->Add(new KrpcDataStreamSender(sink_id,
- fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink,
- fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
- break;
- case TDataSinkType::TABLE_SINK:
- if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
- switch (thrift_sink.table_sink.type) {
- case TTableSinkType::HDFS:
- *sink =
- pool->Add(new HdfsTableSink(sink_id, row_desc, thrift_sink, state));
- break;
- case TTableSinkType::HBASE:
- *sink =
- pool->Add(new HBaseTableSink(sink_id, row_desc, thrift_sink, state));
- break;
- case TTableSinkType::KUDU:
- RETURN_IF_ERROR(CheckKuduAvailability());
- *sink =
- pool->Add(new KuduTableSink(sink_id, row_desc, thrift_sink, state));
- break;
- default:
- stringstream error_msg;
- map<int, const char*>::const_iterator i =
- _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
- const char* str = i != _TTableSinkType_VALUES_TO_NAMES.end() ?
- i->second : "Unknown table sink";
- error_msg << str << " not implemented.";
- return Status(error_msg.str());
- }
- break;
- case TDataSinkType::PLAN_ROOT_SINK:
- if (state->query_options().spool_query_results) {
- *sink = pool->Add(new BufferedPlanRootSink(sink_id, row_desc, state,
- fragment_ctx.fragment.output_sink.plan_root_sink.resource_profile,
- fragment_instance_ctx.debug_options));
- } else {
- *sink = pool->Add(new BlockingPlanRootSink(sink_id, row_desc, state));
- }
- break;
- case TDataSinkType::JOIN_BUILD_SINK:
- // IMPALA-4224 - join build sink not supported in backend execution.
- default:
- stringstream error_msg;
- map<int, const char*>::const_iterator i =
- _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
- const char* str = i != _TDataSinkType_VALUES_TO_NAMES.end() ?
- i->second : "Unknown data sink type ";
- error_msg << str << " not implemented.";
- return Status(error_msg.str());
- }
- RETURN_IF_ERROR((*sink)->Init(thrift_output_exprs, thrift_sink, state));
- return Status::OK();
-}
-
-Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
- const TDataSink& tsink, RuntimeState* state) {
- return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, &output_exprs_);
-}
-
Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
DCHECK(parent_mem_tracker != nullptr);
DCHECK(profile_ != nullptr);
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 60968c4..1f06625 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -29,6 +29,7 @@
namespace impala {
+class DataSink;
class MemPool;
class ObjectPool;
class RowBatch;
@@ -43,6 +44,46 @@
class TPlanFragmentInstanceCtx;
class TInsertStats;
+/// Configuration class for creating DataSink objects. It contains a subset of the static
+/// state of their corresponding DataSink, of which there is one instance per fragment.
+/// DataSink contains the runtime state and there can be up to MT_DOP instances of it per
+/// fragment.
+class DataSinkConfig {
+ public:
+ DataSinkConfig() = default;
+ virtual ~DataSinkConfig() {}
+
+ /// Create its corresponding DataSink. Place the sink in state->obj_pool().
+ virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const = 0;
+
+ /// Pointer to the thrift data sink struct associated with this sink. Set in Init() and
+ /// owned by QueryState.
+ const TDataSink* tsink_ = nullptr;
+
+ /// The row descriptor for the rows consumed by the sink. Owned by root plan node of
+ /// plan tree, which feeds into this sink. Set in Init().
+ const RowDescriptor* input_row_desc_ = nullptr;
+
+ /// Output expressions to convert row batches onto output values.
+ /// Not used in some sub-classes.
+ std::vector<ScalarExpr*> output_exprs_;
+
+ /// Creates a new data sink config, allocated in state->obj_pool() and returned through
+ /// *sink, from the thrift sink object in fragment_ctx.
+ static Status CreateConfig(const TDataSink& thrift_sink, const RowDescriptor* row_desc,
+ RuntimeState* state, const DataSinkConfig** sink);
+
+ protected:
+ /// Sets reference to TDataSink and initializes the expressions. Returns error status on
+ /// failure. If overridden in subclass, must first call superclass's Init().
+ virtual Status Init(
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state);
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(DataSinkConfig);
+};
+
/// A data sink is an abstract interface for data sinks that consume RowBatches. E.g.
/// a sink may write a HDFS table, send data across the network, or build hash tables
/// for a join.
@@ -57,7 +98,7 @@
/// If this is the sink at the root of a fragment, 'sink_id' must be a unique ID for
/// the sink for use in runtime profiles and other purposes. Otherwise this is a join
/// build sink owned by an ExecNode and 'sink_id' must be -1.
- DataSink(TDataSinkId sink_id, const RowDescriptor* row_desc, const string& name,
+ DataSink(TDataSinkId sink_id, const DataSinkConfig& sink_config, const string& name,
RuntimeState* state);
virtual ~DataSink();
@@ -87,12 +128,6 @@
/// Must be idempotent.
virtual void Close(RuntimeState* state);
- /// Creates a new data sink, allocated in pool and returned through *sink, from
- /// thrift_sink.
- static Status Create(const TPlanFragmentCtx& fragment_ctx,
- const TPlanFragmentInstanceCtx& fragment_instance_ctx,
- const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
-
MemTracker* mem_tracker() const { return mem_tracker_.get(); }
RuntimeProfile* profile() const { return profile_; }
const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
@@ -104,6 +139,9 @@
static const char* const ROOT_PARTITION_KEY;
protected:
+ /// Reference to the sink configuration shared across fragment instances.
+ const DataSinkConfig& sink_config_;
+
/// Set to true after Close() has been called. Subclasses should check and set this in
/// Close().
bool closed_;
@@ -137,10 +175,6 @@
/// Not used in some sub-classes.
std::vector<ScalarExpr*> output_exprs_;
std::vector<ScalarExprEvaluator*> output_expr_evals_;
-
- /// Initialize the expressions in the data sink and return error status on failure.
- virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
- const TDataSink& tsink, RuntimeState* state);
};
} // namespace impala
#endif
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index c907bcc..129ba25 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -31,15 +31,19 @@
namespace impala {
-HBaseTableSink::HBaseTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
- const TDataSink& tsink, RuntimeState* state)
- : DataSink(sink_id, row_desc, "HBaseTableSink", state),
- table_id_(tsink.table_sink.target_table_id),
- table_desc_(NULL),
- hbase_table_writer_(NULL) {
- DCHECK(tsink.__isset.table_sink);
+DataSink* HBaseTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+ TDataSinkId sink_id = fragment_ctx.fragment.idx;
+ return state->obj_pool()->Add(new HBaseTableSink(sink_id, *this, state));
}
+HBaseTableSink::HBaseTableSink(
+ TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+ : DataSink(sink_id, sink_config, "HBaseTableSink", state),
+ table_id_(sink_config.tsink_->table_sink.target_table_id),
+ table_desc_(NULL),
+ hbase_table_writer_(NULL) {}
+
Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
SCOPED_TIMER(profile()->total_time_counter());
diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h
index d973ad4..4d702bd 100644
--- a/be/src/exec/hbase-table-sink.h
+++ b/be/src/exec/hbase-table-sink.h
@@ -31,12 +31,21 @@
namespace impala {
+class HBaseTableSinkConfig : public DataSinkConfig {
+ public:
+ DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+ RuntimeState* state) const override;
+
+ ~HBaseTableSinkConfig() override {}
+};
+
/// Class to take row batches and send them to the HBaseTableWriter to
/// eventually be written into an HBase table.
class HBaseTableSink : public DataSink {
public:
- HBaseTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
- const TDataSink& tsink, RuntimeState* state);
+ HBaseTableSink(
+ TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
virtual Status Send(RuntimeState* state, RowBatch* batch);
virtual Status FlushFinal(RuntimeState* state);
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 3e695b4..2674e72 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -53,23 +53,39 @@
namespace impala {
-HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
- const TDataSink& tsink, RuntimeState* state)
- : DataSink(sink_id, row_desc, "HdfsTableSink", state),
+Status HdfsTableSinkConfig::Init(
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+ DCHECK(tsink_->__isset.table_sink);
+ DCHECK(tsink_->table_sink.__isset.hdfs_table_sink);
+ RETURN_IF_ERROR(
+ ScalarExpr::Create(tsink_->table_sink.hdfs_table_sink.partition_key_exprs,
+ *input_row_desc_, state, &partition_key_exprs_));
+ return Status::OK();
+}
+
+DataSink* HdfsTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+ TDataSinkId sink_id = fragment_ctx.fragment.idx;
+ return state->obj_pool()->Add(
+ new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state));
+}
+
+HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
+ const THdfsTableSink& hdfs_sink, RuntimeState* state)
+ : DataSink(sink_id, sink_config, "HdfsTableSink", state),
table_desc_(nullptr),
prototype_partition_(nullptr),
- table_id_(tsink.table_sink.target_table_id),
+ table_id_(sink_config.tsink_->table_sink.target_table_id),
skip_header_line_count_(
- tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count ?
- tsink.table_sink.hdfs_table_sink.skip_header_line_count :
- 0),
- overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
- input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered),
- sort_columns_(tsink.table_sink.hdfs_table_sink.sort_columns),
- current_clustered_partition_(nullptr) {
- DCHECK(tsink.__isset.table_sink);
- if (tsink.table_sink.hdfs_table_sink.__isset.write_id) {
- write_id_ = tsink.table_sink.hdfs_table_sink.write_id;
+ hdfs_sink.__isset.skip_header_line_count ? hdfs_sink.skip_header_line_count : 0),
+ overwrite_(hdfs_sink.overwrite),
+ input_is_clustered_(hdfs_sink.input_is_clustered),
+ sort_columns_(hdfs_sink.sort_columns),
+ current_clustered_partition_(nullptr),
+ partition_key_exprs_(sink_config.partition_key_exprs_) {
+ if (hdfs_sink.__isset.write_id) {
+ write_id_ = hdfs_sink.write_id;
DCHECK_GT(write_id_, 0);
}
}
@@ -82,15 +98,6 @@
partition_descriptor(nullptr),
block_size(0) {}
-Status HdfsTableSink::Init(const vector<TExpr>& thrift_output_exprs,
- const TDataSink& tsink, RuntimeState* state) {
- RETURN_IF_ERROR(DataSink::Init(thrift_output_exprs, tsink, state));
- DCHECK(tsink.__isset.table_sink);
- RETURN_IF_ERROR(ScalarExpr::Create(tsink.table_sink.hdfs_table_sink.partition_key_exprs,
- *row_desc_, state, &partition_key_exprs_));
- return Status::OK();
-}
-
Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 81d5d40..2e790fd 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -94,6 +94,22 @@
OutputPartition();
};
+class HdfsTableSinkConfig : public DataSinkConfig {
+ public:
+ DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+ RuntimeState* state) const override;
+
+ /// Expressions for computing the target partitions to which a row is written.
+ std::vector<ScalarExpr*> partition_key_exprs_;
+
+ ~HdfsTableSinkConfig() override {}
+
+ protected:
+ Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+ RuntimeState* state) override;
+};
+
/// The sink consumes all row batches of its child execution tree, and writes the
/// evaluated output_exprs into temporary Hdfs files. The query coordinator moves the
/// temporary files into their final locations after the sinks have finished executing.
@@ -134,8 +150,8 @@
/// <table base dir>/<partition dirs>/<ACID base or delta directory>
class HdfsTableSink : public DataSink {
public:
- HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
- const TDataSink& tsink, RuntimeState* state);
+ HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
+ const THdfsTableSink& hdfs_sink, RuntimeState* state);
/// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
@@ -167,10 +183,6 @@
std::string DebugString() const;
- protected:
- virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
- const TDataSink& tsink, RuntimeState* state) WARN_UNUSED_RESULT;
-
private:
/// Initialises the filenames of a given output partition, and opens the temporary file.
/// The partition key is derived from 'row'. If the partition will not have any rows
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index d4113e6..1372476 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -66,14 +66,20 @@
// Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
-KuduTableSink::KuduTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
- const TDataSink& tsink, RuntimeState* state)
- : DataSink(sink_id, row_desc, "KuduTableSink", state),
- table_id_(tsink.table_sink.target_table_id),
- sink_action_(tsink.table_sink.action),
- kudu_table_sink_(tsink.table_sink.kudu_table_sink),
+DataSink* KuduTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+ TDataSinkId sink_id = fragment_ctx.fragment.idx;
+ return state->obj_pool()->Add(
+ new KuduTableSink(sink_id, *this, tsink_->table_sink, state));
+}
+
+KuduTableSink::KuduTableSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+ const TTableSink& table_sink, RuntimeState* state)
+ : DataSink(sink_id, sink_config, "KuduTableSink", state),
+ table_id_(table_sink.target_table_id),
+ sink_action_(table_sink.action),
+ kudu_table_sink_(table_sink.kudu_table_sink),
client_tracked_bytes_(0) {
- DCHECK(tsink.__isset.table_sink);
DCHECK(KuduIsAvailable());
}
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 2496640..5cde878 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -30,6 +30,15 @@
namespace impala {
+class KuduTableSinkConfig : public DataSinkConfig {
+ public:
+ DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+ RuntimeState* state) const override;
+
+ ~KuduTableSinkConfig() override {}
+};
+
/// Sink that takes RowBatches and writes them into a Kudu table.
///
/// The data is added to Kudu in Send(). The Kudu client is configured to automatically
@@ -53,8 +62,8 @@
/// status. All reported errors (ignored or not) will be logged via the RuntimeState.
class KuduTableSink : public DataSink {
public:
- KuduTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
- const TDataSink& tsink, RuntimeState* state);
+ KuduTableSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+ const TTableSink& table_sink, RuntimeState* state);
/// Prepares the expressions to be applied and creates a KuduSchema based on the
/// expressions and KuduTableDescriptor.
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index a521fa9..4932a67 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -27,9 +27,17 @@
using namespace impala;
-NljBuilder::NljBuilder(const RowDescriptor* row_desc, RuntimeState* state)
- : DataSink(-1, row_desc, "Nested Loop Join Builder", state),
- build_batch_cache_(row_desc, state->batch_size()) {}
+NljBuilder* NljBuilder::CreateSink(const RowDescriptor* row_desc, RuntimeState* state) {
+ ObjectPool* pool = state->obj_pool();
+ DataSinkConfig* sink_config = pool->Add(new NljBuilderConfig());
+ sink_config->tsink_ = pool->Add(new TDataSink());
+ sink_config->input_row_desc_ = row_desc;
+ return pool->Add(new NljBuilder(*sink_config, state));
+}
+
+NljBuilder::NljBuilder(const DataSinkConfig& sink_config, RuntimeState* state)
+ : DataSink(-1, sink_config, "Nested Loop Join Builder", state),
+ build_batch_cache_(row_desc_, state->batch_size()) {}
Status NljBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index 8a03d1f..bdc3eb0 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -26,6 +26,19 @@
namespace impala {
+/// Dummy class needed to create an instance of the sink.
+class NljBuilderConfig : public DataSinkConfig {
+ public:
+ DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+ RuntimeState* state) const override {
+ DCHECK(false) << "Not Implemented";
+ return nullptr;
+ }
+
+ ~NljBuilderConfig() override {}
+};
+
/// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join.
/// Implements the DataSink interface but also exposes some methods for direct use by
/// NestedLoopJoinNode.
@@ -37,7 +50,9 @@
/// is used and all data is deep copied into memory owned by the builder.
class NljBuilder : public DataSink {
public:
- NljBuilder(const RowDescriptor* row_desc, RuntimeState* state);
+
+ /// To be used by the NestedLoopJoinNode to create an instance of this sink.
+ static NljBuilder* CreateSink(const RowDescriptor* row_desc, RuntimeState* state);
/// Implementations of DataSink interface methods.
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
@@ -76,6 +91,8 @@
inline RowBatchList* copied_build_batches() { return &copied_build_batches_; }
private:
+ NljBuilder(const DataSinkConfig& sink_config, RuntimeState* state);
+
/// Deep copy all build batches in 'input_build_batches_' to 'copied_build_batches_'.
/// Resets all the source batches and clears 'input_build_batches_'.
/// If the memory limit is exceeded while copying batches, returns a
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 6be017e..c4d8379 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -95,8 +95,7 @@
DCHECK_EQ(builder_->copied_build_batches()->total_num_rows(), 0);
build_batches_ = builder_->input_build_batches();
} else {
- RETURN_IF_ERROR(
- BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+ RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
build_batches_ = builder_->GetFinalBuildBatches();
if (matching_build_rows_ != NULL) {
RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows()));
@@ -112,7 +111,7 @@
RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
pool_, expr_perm_pool(), expr_results_pool(), &join_conjunct_evals_));
- builder_.reset(new NljBuilder(child(1)->row_desc(), state));
+ builder_ = NljBuilder::CreateSink(child(1)->row_desc(), state);
RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
runtime_profile()->PrependChild(builder_->profile());
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index e3cf2b3..d117ff8 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -66,8 +66,8 @@
/////////////////////////////////////////
/// BEGIN: Members that must be Reset()
- /// The build side rows of the join.
- boost::scoped_ptr<NljBuilder> builder_;
+ /// The build side rows of the join. Created in Prepare() and owned by runtime state.
+ NljBuilder* builder_;
/// Pointer to the RowBatchList (owned by 'builder_') that contains the batches to
/// use during the probe phase.
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index a865e8f..6df580d 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -49,29 +49,45 @@
using namespace impala;
using strings::Substitute;
-const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
+Status PhjBuilderConfig::Init(
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ return Status("Not Implemented.");
+}
-PhjBuilder::PhjBuilder(int join_node_id, const string& join_node_label,
- TJoinOp::type join_op, const RowDescriptor* build_row_desc, RuntimeState* state,
- BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
- int64_t max_row_buffer_size)
- : DataSink(-1, build_row_desc,
- Substitute("Hash Join Builder (join_node_id=$0)", join_node_id), state),
- runtime_state_(state),
- join_node_id_(join_node_id),
- join_node_label_(join_node_label),
- join_op_(join_op),
- buffer_pool_client_(buffer_pool_client),
- spillable_buffer_size_(spillable_buffer_size),
- max_row_buffer_size_(max_row_buffer_size) {}
+DataSink* PhjBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+ DCHECK(false) << "Not Implemented";
+ return nullptr;
+}
-Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
+PhjBuilder* PhjBuilderConfig::CreateSink(BufferPool::ClientHandle* buffer_pool_client,
+ const std::string& join_node_label, int64_t spillable_buffer_size,
+ int64_t max_row_buffer_size, RuntimeState* state) const {
+ ObjectPool* pool = state->obj_pool();
+ return pool->Add(new PhjBuilder(*this, buffer_pool_client, join_node_label,
+ spillable_buffer_size, max_row_buffer_size, state));
+}
+
+Status PhjBuilderConfig::CreateConfig(RuntimeState* state, int join_node_id,
+ TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+ const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+ const std::vector<TRuntimeFilterDesc>& filters, const PhjBuilderConfig** sink) {
+ ObjectPool* pool = state->obj_pool();
+ TDataSink* tsink = pool->Add(new TDataSink()); // just a dummy object.
+ PhjBuilderConfig* data_sink = pool->Add(new PhjBuilderConfig());
+ RETURN_IF_ERROR(data_sink->Init(
+ state, *tsink, join_node_id, join_op, build_row_desc, eq_join_conjuncts, filters));
+ *sink = data_sink;
+ return Status::OK();
+}
+
+Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
const vector<TEqJoinCondition>& eq_join_conjuncts,
const vector<TRuntimeFilterDesc>& filter_descs) {
for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
ScalarExpr* build_expr;
RETURN_IF_ERROR(
- ScalarExpr::Create(eq_join_conjunct.right, *row_desc_, state, &build_expr));
+ ScalarExpr::Create(eq_join_conjunct.right, *input_row_desc_, state, &build_expr));
build_exprs_.push_back(build_expr);
is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from);
}
@@ -92,17 +108,47 @@
if (it == filters_produced.end()) continue;
ScalarExpr* filter_expr;
RETURN_IF_ERROR(
- ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, &filter_expr));
+ ScalarExpr::Create(filter_desc.src_expr, *input_row_desc_, state, &filter_expr));
filter_exprs_.push_back(filter_expr);
-
- // TODO: Move to Prepare().
- filter_ctxs_.emplace_back();
- // TODO: IMPALA-4400 - implement local aggregation of runtime filters.
- filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true);
+ filter_descs_.push_back(filter_desc);
}
return Status::OK();
}
+Status PhjBuilderConfig::Init(RuntimeState* state, const TDataSink& tsink,
+ int join_node_id, TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+ const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+ const std::vector<TRuntimeFilterDesc>& filters) {
+ RETURN_IF_ERROR(DataSinkConfig::Init(tsink, build_row_desc, state));
+ join_node_id_ = join_node_id;
+ join_op_ = join_op;
+ return InitExprsAndFilters(state, eq_join_conjuncts, filters);
+}
+
+const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
+
+PhjBuilder::PhjBuilder(const PhjBuilderConfig& sink_config,
+ BufferPool::ClientHandle* buffer_pool_client, const std::string& join_node_label,
+ int64_t spillable_buffer_size, int64_t max_row_buffer_size, RuntimeState* state)
+ : DataSink(-1, sink_config,
+ Substitute("Hash Join Builder (join_node_id=$0)", sink_config.join_node_id_),
+ state),
+ runtime_state_(state),
+ join_node_id_(sink_config.join_node_id_),
+ join_node_label_(join_node_label),
+ join_op_(sink_config.join_op_),
+ buffer_pool_client_(buffer_pool_client),
+ spillable_buffer_size_(spillable_buffer_size),
+ max_row_buffer_size_(max_row_buffer_size),
+ build_exprs_(sink_config.build_exprs_),
+ is_not_distinct_from_(sink_config.is_not_distinct_from_),
+ filter_exprs_(sink_config.filter_exprs_) {
+ for (const TRuntimeFilterDesc& filter_desc : sink_config.filter_descs_) {
+ filter_ctxs_.emplace_back();
+ filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true);
+ }
+}
+
Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
RETURN_IF_ERROR(HashTableCtx::Create(&obj_pool_, state, build_exprs_, build_exprs_,
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 9ac23b1..0dec0b8 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -35,11 +35,76 @@
namespace impala {
+class PhjBuilder;
class RowDescriptor;
class RuntimeState;
class ScalarExpr;
class ScalarExprEvaluator;
+/// Partitioned Hash Join Builder Config class. This has a few extra methods to be used
+/// directly by the PartitionedHashJoinPlanNode. Since it is expected to be created
+/// and used only by PartitionedHashJoinPlanNode, the DataSinkConfig::Init() and
+/// DataSinkConfig::CreateSink() are not implemented for it.
+class PhjBuilderConfig : public DataSinkConfig {
+ public:
+ DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+ RuntimeState* state) const override;
+
+ /// Creates an instance of PhjBuilder data sink in the state's object pool. To be used
+ /// only by PartitionedHashJoinPlanNode.
+ PhjBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client,
+ const std::string& join_node_label, int64_t spillable_buffer_size,
+ int64_t max_row_buffer_size, RuntimeState* state) const;
+
+ /// Creates an instance of this class in the state's object pool. To be used only by
+ /// PartitionedHashJoinPlanNode.
+ static Status CreateConfig(RuntimeState* state, int join_node_id, TJoinOp::type join_op,
+ const RowDescriptor* build_row_desc,
+ const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+ const std::vector<TRuntimeFilterDesc>& filters, const PhjBuilderConfig** sink);
+
+ ~PhjBuilderConfig() override {}
+
+ /// The ID of the plan join node this is associated with.
+ int join_node_id_;
+
+ /// The join operation this is building for.
+ TJoinOp::type join_op_;
+
+ /// Expressions over input rows for hash table build.
+ std::vector<ScalarExpr*> build_exprs_;
+
+ /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
+ /// NOT DISTINCT FROM, rather than equality.
+ /// Set in InitExprsAndFilters() and constant thereafter.
+ std::vector<bool> is_not_distinct_from_;
+
+ /// Expressions for evaluating input rows for insertion into runtime filters.
+ /// Only includes exprs for filters produced by this builder.
+ std::vector<ScalarExpr*> filter_exprs_;
+
+ /// The runtime filter descriptors of filters produced by this builder.
+ vector<TRuntimeFilterDesc> filter_descs_;
+
+ protected:
+ Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+ RuntimeState* state) override;
+
+ private:
+ /// Helper method used by CreateConfig()
+ Status Init(RuntimeState* state, const TDataSink& tsink, int join_node_id,
+ TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+ const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+ const std::vector<TRuntimeFilterDesc>& filters);
+
+ /// Initializes the build and filter expressions and creates a copy of the filter
+ /// descriptors that will be generated by this sink.
+ Status InitExprsAndFilters(RuntimeState* state,
+ const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+ const std::vector<TRuntimeFilterDesc>& filters);
+};
+
/// See partitioned-hash-join-node.h for explanation of the top-level algorithm and how
/// these states fit in it.
enum class HashJoinState {
@@ -114,14 +179,9 @@
using PartitionId = int;
- PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
- const RowDescriptor* build_row_desc, RuntimeState* state,
- BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
- int64_t max_row_buffer_size);
-
- Status InitExprsAndFilters(RuntimeState* state,
- const std::vector<TEqJoinCondition>& eq_join_conjuncts,
- const std::vector<TRuntimeFilterDesc>& filters) WARN_UNUSED_RESULT;
+ PhjBuilder(const PhjBuilderConfig& sink_config,
+ BufferPool::ClientHandle* buffer_pool_client, const std::string& join_node_label,
+ int64_t spillable_buffer_size, int64_t max_row_buffer_size, RuntimeState* state);
/// Implementations of DataSink interface methods.
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 737a963..a9fe4e9 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -76,24 +76,34 @@
full_row_desc, state, &other_join_conjuncts_));
DCHECK(tnode.hash_join_node.join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
|| eq_join_conjuncts.size() == 1);
+
+ RETURN_IF_ERROR(PhjBuilderConfig::CreateConfig(state, tnode_->node_id,
+ tnode_->hash_join_node.join_op, children_[1]->row_descriptor_, eq_join_conjuncts,
+ tnode_->runtime_filters, &phj_builder_config));
return Status::OK();
}
Status PartitionedHashJoinPlanNode::CreateExecNode(
RuntimeState* state, ExecNode** node) const {
ObjectPool* pool = state->obj_pool();
- *node = pool->Add(new PartitionedHashJoinNode(pool, *this, state->desc_tbl()));
+ *node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));
return Status::OK();
}
-PartitionedHashJoinNode::PartitionedHashJoinNode(ObjectPool* pool,
+PartitionedHashJoinNode::PartitionedHashJoinNode(RuntimeState* state,
const PartitionedHashJoinPlanNode& pnode, const DescriptorTbl& descs)
: BlockingJoinNode("PartitionedHashJoinNode", pnode.tnode_->hash_join_node.join_op,
- pool, pnode, descs) {
+ state->obj_pool(), pnode, descs),
+ build_exprs_(pnode.build_exprs_),
+ probe_exprs_(pnode.probe_exprs_),
+ other_join_conjuncts_(pnode.other_join_conjuncts_) {
memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
- build_exprs_ = pnode.build_exprs_;
- probe_exprs_ = pnode.probe_exprs_;
- other_join_conjuncts_ = pnode.other_join_conjuncts_;
+ // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
+ // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
+ // being separated out further.
+ builder_ = pnode.phj_builder_config->CreateSink(buffer_pool_client(), label(),
+ resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size,
+ state);
}
PartitionedHashJoinNode::~PartitionedHashJoinNode() {
@@ -106,21 +116,6 @@
RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
runtime_state_ = state;
- const vector<TEqJoinCondition>& eq_join_conjuncts =
- plan_node_.tnode_->hash_join_node.eq_join_conjuncts;
- // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
- // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
- // being separated out further.
- // TODO: Move the builder creation on Expr Init into the constructor once the PlanRoot
- // Equivalent of a sink is implemented. build_exprs_ and filter_exprs can be passed
- // directly from those generated in Phj node, only thing left to do is register the
- // filters.
- builder_.reset(new PhjBuilder(id(), label(), join_op_, child(1)->row_desc(), state,
- buffer_pool_client(), resource_profile_.spillable_buffer_size,
- resource_profile_.max_row_buffer_size));
- RETURN_IF_ERROR(builder_->InitExprsAndFilters(
- state, eq_join_conjuncts, plan_node_.tnode_->runtime_filters));
-
RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
runtime_profile()->PrependChild(builder_->profile());
@@ -186,7 +181,7 @@
// are cleared in QueryMaintenance().
probe_expr_results_pool_->Clear();
- RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+ RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
build_hash_partitions_ = builder_->BeginInitialProbe(buffer_pool_client());
RETURN_IF_ERROR(PrepareForPartitionedProbe());
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 38c274f..9c6f16b 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -52,6 +52,10 @@
/// Non-equi-join conjuncts from the ON clause.
std::vector<ScalarExpr*> other_join_conjuncts_;
+
+ /// Data sink config object for creating a PhjBuilder that will eventually be used by
+ /// the exec node.
+ const PhjBuilderConfig* phj_builder_config;
};
/// Operator to perform partitioned hash join, spilling to disk as necessary. This
@@ -128,7 +132,7 @@
class PartitionedHashJoinNode : public BlockingJoinNode {
public:
- PartitionedHashJoinNode(ObjectPool* pool, const PartitionedHashJoinPlanNode& pnode,
+ PartitionedHashJoinNode(RuntimeState* state, const PartitionedHashJoinPlanNode& pnode,
const DescriptorTbl& descs);
virtual ~PartitionedHashJoinNode();
@@ -552,8 +556,8 @@
/// State of the probing algorithm. Used to drive the state machine in GetNext().
ProbeState probe_state_ = ProbeState::PROBE_COMPLETE;
- /// The build-side of the join. Initialized in Prepare().
- boost::scoped_ptr<PhjBuilder> builder_;
+ /// The build-side of the join. Initialized in constructor and owned by runtime state.
+ PhjBuilder* builder_;
/// Last set of hash partitions obtained from builder_. Only valid when the
/// builder's state is PARTITIONING_PROBE or REPARTITIONING_PROBE.
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 45368f9..bce6324 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -17,6 +17,8 @@
#include "exec/plan-root-sink.h"
+#include "exec/buffered-plan-root-sink.h"
+#include "exec/blocking-plan-root-sink.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "runtime/row-batch.h"
@@ -33,9 +35,21 @@
namespace impala {
+DataSink* PlanRootSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+ TDataSinkId sink_id = fragment_ctx.fragment.idx;
+ ObjectPool* pool = state->obj_pool();
+ if (state->query_options().spool_query_results) {
+ return pool->Add(new BufferedPlanRootSink(
+ sink_id, *this, state, fragment_instance_ctx.debug_options));
+ } else {
+ return pool->Add(new BlockingPlanRootSink(sink_id, *this, state));
+ }
+}
+
PlanRootSink::PlanRootSink(
- TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
- : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state),
+ TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+ : DataSink(sink_id, sink_config, "PLAN_ROOT_SINK", state),
num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {}
PlanRootSink::~PlanRootSink() {}
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index ad4dc3e..ecc03aa 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -27,6 +27,15 @@
class QueryResultSet;
class ScalarExprEvaluator;
+class PlanRootSinkConfig : public DataSinkConfig {
+ public:
+ DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+ RuntimeState* state) const override;
+
+ ~PlanRootSinkConfig() override {}
+};
+
/// Sink which manages the handoff between a 'sender' (a fragment instance) that produces
/// batches by calling Send(), and a 'consumer' (e.g. the coordinator) which consumes rows
/// formed by computing a set of output expressions over the input batches, by calling
@@ -54,7 +63,8 @@
/// ensures that this outlives any calls to Send() and GetNext(), respectively.
class PlanRootSink : public DataSink {
public:
- PlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+ PlanRootSink(
+ TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
virtual ~PlanRootSink();
/// Called before Send(), Open(), or Close(). Performs any additional setup necessary,
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 3121ee9..90bbf48 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -526,20 +526,16 @@
RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
VLOG_QUERY << "create sender " << sender_num;
const TDataSink& sink = GetSink(partition_type);
+ const DataSinkConfig* data_sink = nullptr;
+ EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &state, &data_sink));
// We create an object of the base class DataSink and cast to the appropriate sender
// according to the 'is_thrift' option.
scoped_ptr<DataSink> sender;
- TExprNode expr_node;
- expr_node.node_type = TExprNodeType::SLOT_REF;
- TExpr output_exprs;
- output_exprs.nodes.push_back(expr_node);
-
- sender.reset(new KrpcDataStreamSender(-1,
- sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
- EXPECT_OK(static_cast<KrpcDataStreamSender*>(
- sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
+ sender.reset(new KrpcDataStreamSender(-1, sender_num,
+ *(static_cast<const KrpcDataStreamSenderConfig*>(data_sink)),
+ data_sink->tsink_->stream_sink, dest_, channel_buffer_size, &state));
EXPECT_OK(sender->Prepare(&state, &tracker_));
EXPECT_OK(sender->Open(&state));
scoped_ptr<RowBatch> batch(CreateRowBatch());
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 02bc9cf..bc34d17 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -215,8 +215,10 @@
// prepare sink_
DCHECK(fragment_ctx_.fragment.__isset.output_sink);
- RETURN_IF_ERROR(DataSink::Create(fragment_ctx_, instance_ctx_, exec_tree_->row_desc(),
- runtime_state_, &sink_));
+ const TDataSink& thrift_sink = fragment_ctx_.fragment.output_sink;
+ RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
+ thrift_sink, plan_tree_->row_descriptor_, runtime_state_, &sink_config_));
+ sink_ = sink_config_->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
RuntimeProfile* sink_profile = sink_->profile();
if (sink_profile != nullptr) profile()->AddChild(sink_profile);
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 7b059c6..69e081b 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -51,6 +51,7 @@
class PlanRootSink;
class Thread;
class DataSink;
+class DataSinkConfig;
class RuntimeState;
/// FragmentInstanceState handles all aspects of the execution of a single plan fragment
@@ -154,6 +155,7 @@
RuntimeState* runtime_state_ = nullptr; // lives in obj_pool()
/// Lives in obj_pool(). Not mutated after being initialized.
const PlanNode* plan_tree_ = nullptr;
+ const DataSinkConfig* sink_config_ = nullptr;
/// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
/// There should be only one thread doing status report at the same time.
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index f1a9951..91cac2b 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -55,6 +55,10 @@
#include "common/names.h"
+DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
+ "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
+ "can accumulate before the row batch is sent over the wire.");
+
using std::condition_variable_any;
using namespace apache::thrift;
using kudu::rpc::RpcController;
@@ -71,6 +75,29 @@
const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStreamSender";
const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
+Status KrpcDataStreamSenderConfig::Init(
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+ DCHECK(tsink_->__isset.stream_sink);
+ auto& partition_type = tsink_->stream_sink.output_partition.type;
+ if (partition_type == TPartitionType::HASH_PARTITIONED
+ || partition_type == TPartitionType::KUDU) {
+ RETURN_IF_ERROR(
+ ScalarExpr::Create(tsink_->stream_sink.output_partition.partition_exprs,
+ *input_row_desc_, state, &partition_exprs_));
+ }
+ return Status::OK();
+}
+
+DataSink* KrpcDataStreamSenderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+ // We have one sink per fragment, so we can use the fragment index as the sink ID.
+ TDataSinkId sink_id = fragment_ctx.fragment.idx;
+ return state->obj_pool()->Add(new KrpcDataStreamSender(sink_id,
+ fragment_instance_ctx.sender_id, *this, tsink_->stream_sink,
+ fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
+}
+
// A datastream sender may send row batches to multiple destinations. There is one
// channel for each destination.
//
@@ -669,14 +696,15 @@
}
KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
- const RowDescriptor* row_desc, const TDataStreamSink& sink,
- const vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size,
- RuntimeState* state)
- : DataSink(sink_id, row_desc,
+ const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
+ const std::vector<TPlanFragmentDestination>& destinations,
+ int per_channel_buffer_size, RuntimeState* state)
+ : DataSink(sink_id, sink_config,
Substitute("KrpcDataStreamSender (dst_id=$0)", sink.dest_node_id), state),
sender_id_(sender_id),
partition_type_(sink.output_partition.type),
per_channel_buffer_size_(per_channel_buffer_size),
+ partition_exprs_(sink_config.partition_exprs_),
dest_node_id_(sink.dest_node_id),
next_unknown_partition_(0) {
DCHECK_GT(destinations.size(), 0);
@@ -687,16 +715,17 @@
for (int i = 0; i < destinations.size(); ++i) {
channels_.push_back(
- new Channel(this, row_desc, destinations[i].thrift_backend.hostname,
+ new Channel(this, row_desc_, destinations[i].thrift_backend.hostname,
destinations[i].krpc_backend, destinations[i].fragment_instance_id,
sink.dest_node_id, per_channel_buffer_size));
}
- if (partition_type_ == TPartitionType::UNPARTITIONED ||
- partition_type_ == TPartitionType::RANDOM) {
+ if (partition_type_ == TPartitionType::UNPARTITIONED
+ || partition_type_ == TPartitionType::RANDOM) {
// Randomize the order we open/transmit to channels to avoid thundering herd problems.
random_shuffle(channels_.begin(), channels_.end());
}
+
}
KrpcDataStreamSender::~KrpcDataStreamSender() {
@@ -707,18 +736,6 @@
}
}
-Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
- const TDataSink& tsink, RuntimeState* state) {
- SCOPED_TIMER(profile_->total_time_counter());
- DCHECK(tsink.__isset.stream_sink);
- if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
- partition_type_ == TPartitionType::KUDU) {
- RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
- *row_desc_, state, &partition_exprs_));
- }
- return Status::OK();
-}
-
Status KrpcDataStreamSender::Prepare(
RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index d0f9848..0e5d6d6 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -39,6 +39,23 @@
class TNetworkAddress;
class TPlanFragmentDestination;
+class KrpcDataStreamSenderConfig : public DataSinkConfig {
+ public:
+ DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+ RuntimeState* state) const override;
+
+ /// Expressions over the partition keys. They are used to compute the
+ /// per-row partition values for a shuffling exchange.
+ std::vector<ScalarExpr*> partition_exprs_;
+
+ ~KrpcDataStreamSenderConfig() override {}
+
+ protected:
+ Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+ RuntimeState* state) override;
+};
+
/// Single sender of an m:n data stream.
///
/// Row batch data is routed to destinations based on the provided partitioning
@@ -47,21 +64,19 @@
///
/// TODO: capture stats that describe distribution of rows/data volume
/// across channels.
-/// TODO: create a PlanNode equivalent class for DataSink.
class KrpcDataStreamSender : public DataSink {
public:
- /// Constructs a sender according to the output specification (tsink), sending to the
+ /// Constructs a sender according to the config (sink_config), sending to the
/// given destinations:
/// 'sender_id' identifies this sender instance, and is unique within a fragment.
- /// 'row_desc' is the descriptor of the tuple row. It must out-live the sink.
/// 'destinations' are the receivers' network addresses. There is one channel for each
/// destination.
/// 'per_channel_buffer_size' is the soft limit in bytes of the buffering into the
/// per-channel's accumulating row batch before it will be sent.
/// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
/// and RANDOM.
- KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, const RowDescriptor* row_desc,
- const TDataStreamSink& tsink,
+ KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
+ const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, RuntimeState* state);
@@ -102,11 +117,6 @@
protected:
friend class DataStreamTest;
- /// Initializes any partitioning expressions based on 'thrift_output_exprs' and stores
- /// them in 'partition_exprs_'. Returns error status if the initialization failed.
- virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
- const TDataSink& tsink, RuntimeState* state) override;
-
/// Returns total number of bytes sent. If batches are broadcast to multiple receivers,
/// they are counted once per receiver.
int64_t GetNumDataBytesSent() const;