IMPALA-4192: Move static state from DataSink into a DataSinkConfig

This patch adds a new class called DataSinkConfig which contains a
subset of the static state of their corresponding DataSink, of
which there is one instance per fragment. DataSink contains the
runtime state and there can be up to MT_DOP instances of it per
fragment.
Eventually all static state including codegened function pointers
would be moved to the PlanNodes.

Testing:
Ran exhaustive tests successfully.

Change-Id: I8d5b4226f6cec5305b0ec9a25c5f18b5521c8dd2
Reviewed-on: http://gerrit.cloudera.org:8080/14941
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index c34a968..fcff3fd 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -33,8 +33,8 @@
 namespace impala {
 
 BlockingPlanRootSink::BlockingPlanRootSink(
-    TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
-  : PlanRootSink(sink_id, row_desc, state) {}
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+  : PlanRootSink(sink_id, sink_config, state) {}
 
 Status BlockingPlanRootSink::Prepare(
     RuntimeState* state, MemTracker* parent_mem_tracker) {
diff --git a/be/src/exec/blocking-plan-root-sink.h b/be/src/exec/blocking-plan-root-sink.h
index 38e4439..0a5b849 100644
--- a/be/src/exec/blocking-plan-root-sink.h
+++ b/be/src/exec/blocking-plan-root-sink.h
@@ -48,7 +48,7 @@
 class BlockingPlanRootSink : public PlanRootSink {
  public:
   BlockingPlanRootSink(
-      TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
 
   /// TODO: Currently, this does nothing, it just calls DataSink::Prepare. However, adding
   /// it is necessary because BufferedPlanRootSink needs to use PlanRootSink::Prepare.
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 6427bf6..c365f69 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -29,10 +29,10 @@
 const int FETCH_NUM_BATCHES = 10;
 
 BufferedPlanRootSink::BufferedPlanRootSink(TDataSinkId sink_id,
-    const RowDescriptor* row_desc, RuntimeState* state,
-    const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
-  : PlanRootSink(sink_id, row_desc, state),
-    resource_profile_(resource_profile),
+    const DataSinkConfig& sink_config, RuntimeState* state,
+    const TDebugOptions& debug_options)
+  : PlanRootSink(sink_id, sink_config, state),
+    resource_profile_(sink_config.tsink_->plan_root_sink.resource_profile),
     debug_options_(debug_options) {}
 
 Status BufferedPlanRootSink::Prepare(
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index c908cda..7be257e 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -43,9 +43,8 @@
 /// synchronize access to the queue.
 class BufferedPlanRootSink : public PlanRootSink {
  public:
-  BufferedPlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      RuntimeState* state, const TBackendResourceProfile& resource_profile,
-      const TDebugOptions& debug_options);
+  BufferedPlanRootSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+      RuntimeState* state, const TDebugOptions& debug_options);
 
   /// Initializes the row_batches_get_wait_timer_ and row_batches_send_wait_timer_
   /// counters.
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index ed3820f..b50d33e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -39,20 +39,81 @@
 
 #include "common/names.h"
 
-DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
-    "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
-    "can accumulate before the row batch is sent over the wire.");
-
 using strings::Substitute;
 
 namespace impala {
 
+Status DataSinkConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  tsink_ = &tsink;
+  input_row_desc_ = input_row_desc;
+  return ScalarExpr::Create(tsink.output_exprs, *input_row_desc_, state, &output_exprs_);
+}
+
+Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
+    const RowDescriptor* row_desc, RuntimeState* state, const DataSinkConfig** sink) {
+  ObjectPool* pool = state->obj_pool();
+  DataSinkConfig* data_sink = nullptr;
+  switch (thrift_sink.type) {
+    case TDataSinkType::DATA_STREAM_SINK:
+      if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
+      // TODO: figure out good buffer size based on size of output row
+      data_sink = pool->Add(new KrpcDataStreamSenderConfig());
+      break;
+    case TDataSinkType::TABLE_SINK:
+      if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
+      switch (thrift_sink.table_sink.type) {
+        case TTableSinkType::HDFS:
+          data_sink = pool->Add(new HdfsTableSinkConfig());
+          break;
+        case TTableSinkType::KUDU:
+          RETURN_IF_ERROR(CheckKuduAvailability());
+          data_sink = pool->Add(new KuduTableSinkConfig());
+          break;
+        case TTableSinkType::HBASE:
+          data_sink = pool->Add(new HBaseTableSinkConfig());
+          break;
+        default:
+          stringstream error_msg;
+          map<int, const char*>::const_iterator i =
+              _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
+          const char* str = i != _TTableSinkType_VALUES_TO_NAMES.end() ?
+              i->second :
+              "Unknown table sink";
+          error_msg << str << " not implemented.";
+          return Status(error_msg.str());
+      }
+      break;
+    case TDataSinkType::PLAN_ROOT_SINK:
+      data_sink = pool->Add(new PlanRootSinkConfig());
+      break;
+    case TDataSinkType::JOIN_BUILD_SINK:
+    // IMPALA-4224 - join build sink not supported in backend execution.
+    default:
+      stringstream error_msg;
+      map<int, const char*>::const_iterator i =
+          _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
+      const char* str = i != _TDataSinkType_VALUES_TO_NAMES.end() ?
+          i->second :
+          "Unknown data sink type ";
+      error_msg << str << " not implemented.";
+      return Status(error_msg.str());
+  }
+  RETURN_IF_ERROR(data_sink->Init(thrift_sink, row_desc, state));
+  *sink = data_sink;
+  return Status::OK();
+}
+
 // Empty string
 const char* const DataSink::ROOT_PARTITION_KEY = "";
 
-DataSink::DataSink(TDataSinkId sink_id, const RowDescriptor* row_desc, const string& name,
-    RuntimeState* state)
-  : closed_(false), row_desc_(row_desc), name_(name) {
+DataSink::DataSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+    const string& name, RuntimeState* state)
+  : sink_config_(sink_config),
+    closed_(false),
+    row_desc_(sink_config.input_row_desc_),
+    name_(name),
+    output_exprs_(sink_config.output_exprs_) {
   profile_ = RuntimeProfile::Create(state->obj_pool(), name);
   if (sink_id != -1) {
     // There is one sink per fragment so we can use the fragment index as a unique
@@ -65,77 +126,6 @@
   DCHECK(closed_);
 }
 
-Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor* row_desc,
-    RuntimeState* state, DataSink** sink) {
-  const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink;
-  const vector<TExpr>& thrift_output_exprs = thrift_sink.output_exprs;
-  ObjectPool* pool = state->obj_pool();
-  // We have one fragment per sink, so we can use the fragment index as the sink ID.
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
-  switch (thrift_sink.type) {
-    case TDataSinkType::DATA_STREAM_SINK:
-      if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
-      // TODO: figure out good buffer size based on size of output row
-      *sink = pool->Add(new KrpcDataStreamSender(sink_id,
-          fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink,
-          fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
-      break;
-    case TDataSinkType::TABLE_SINK:
-      if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
-      switch (thrift_sink.table_sink.type) {
-        case TTableSinkType::HDFS:
-          *sink =
-              pool->Add(new HdfsTableSink(sink_id, row_desc, thrift_sink, state));
-          break;
-        case TTableSinkType::HBASE:
-          *sink =
-              pool->Add(new HBaseTableSink(sink_id, row_desc, thrift_sink, state));
-          break;
-        case TTableSinkType::KUDU:
-          RETURN_IF_ERROR(CheckKuduAvailability());
-          *sink =
-              pool->Add(new KuduTableSink(sink_id, row_desc, thrift_sink, state));
-          break;
-        default:
-          stringstream error_msg;
-          map<int, const char*>::const_iterator i =
-              _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
-          const char* str = i != _TTableSinkType_VALUES_TO_NAMES.end() ?
-              i->second : "Unknown table sink";
-          error_msg << str << " not implemented.";
-          return Status(error_msg.str());
-      }
-      break;
-    case TDataSinkType::PLAN_ROOT_SINK:
-      if (state->query_options().spool_query_results) {
-        *sink = pool->Add(new BufferedPlanRootSink(sink_id, row_desc, state,
-            fragment_ctx.fragment.output_sink.plan_root_sink.resource_profile,
-            fragment_instance_ctx.debug_options));
-      } else {
-        *sink = pool->Add(new BlockingPlanRootSink(sink_id, row_desc, state));
-      }
-      break;
-    case TDataSinkType::JOIN_BUILD_SINK:
-      // IMPALA-4224 - join build sink not supported in backend execution.
-    default:
-      stringstream error_msg;
-      map<int, const char*>::const_iterator i =
-          _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
-      const char* str = i != _TDataSinkType_VALUES_TO_NAMES.end() ?
-          i->second :  "Unknown data sink type ";
-      error_msg << str << " not implemented.";
-      return Status(error_msg.str());
-  }
-  RETURN_IF_ERROR((*sink)->Init(thrift_output_exprs, thrift_sink, state));
-  return Status::OK();
-}
-
-Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, &output_exprs_);
-}
-
 Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   DCHECK(parent_mem_tracker != nullptr);
   DCHECK(profile_ != nullptr);
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 60968c4..1f06625 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -29,6 +29,7 @@
 
 namespace impala {
 
+class DataSink;
 class MemPool;
 class ObjectPool;
 class RowBatch;
@@ -43,6 +44,46 @@
 class TPlanFragmentInstanceCtx;
 class TInsertStats;
 
+/// Configuration class for creating DataSink objects. It contains a subset of the static
+/// state of their corresponding DataSink, of which there is one instance per fragment.
+/// DataSink contains the runtime state and there can be up to MT_DOP instances of it per
+/// fragment.
+class DataSinkConfig {
+ public:
+  DataSinkConfig() = default;
+  virtual ~DataSinkConfig() {}
+
+  /// Create its corresponding DataSink. Place the sink in state->obj_pool().
+  virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const = 0;
+
+  /// Pointer to the thrift data sink struct associated with this sink. Set in Init() and
+  /// owned by QueryState.
+  const TDataSink* tsink_ = nullptr;
+
+  /// The row descriptor for the rows consumed by the sink. Owned by root plan node of
+  /// plan tree, which feeds into this sink. Set in Init().
+  const RowDescriptor* input_row_desc_ = nullptr;
+
+  /// Output expressions to convert row batches onto output values.
+  /// Not used in some sub-classes.
+  std::vector<ScalarExpr*> output_exprs_;
+
+  /// Creates a new data sink config, allocated in state->obj_pool() and returned through
+  /// *sink, from the thrift sink object in fragment_ctx.
+  static Status CreateConfig(const TDataSink& thrift_sink, const RowDescriptor* row_desc,
+      RuntimeState* state, const DataSinkConfig** sink);
+
+ protected:
+  /// Sets reference to TDataSink and initializes the expressions. Returns error status on
+  /// failure. If overridden in subclass, must first call superclass's Init().
+  virtual Status Init(
+      const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state);
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(DataSinkConfig);
+};
+
 /// A data sink is an abstract interface for data sinks that consume RowBatches. E.g.
 /// a sink may write a HDFS table, send data across the network, or build hash tables
 /// for a join.
@@ -57,7 +98,7 @@
   /// If this is the sink at the root of a fragment, 'sink_id' must be a unique ID for
   /// the sink for use in runtime profiles and other purposes. Otherwise this is a join
   /// build sink owned by an ExecNode and 'sink_id' must be -1.
-  DataSink(TDataSinkId sink_id, const RowDescriptor* row_desc, const string& name,
+  DataSink(TDataSinkId sink_id, const DataSinkConfig& sink_config, const string& name,
       RuntimeState* state);
   virtual ~DataSink();
 
@@ -87,12 +128,6 @@
   /// Must be idempotent.
   virtual void Close(RuntimeState* state);
 
-  /// Creates a new data sink, allocated in pool and returned through *sink, from
-  /// thrift_sink.
-  static Status Create(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
-
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
   const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
@@ -104,6 +139,9 @@
   static const char* const ROOT_PARTITION_KEY;
 
  protected:
+  /// Reference to the sink configuration shared across fragment instances.
+  const DataSinkConfig& sink_config_;
+
   /// Set to true after Close() has been called. Subclasses should check and set this in
   /// Close().
   bool closed_;
@@ -137,10 +175,6 @@
   /// Not used in some sub-classes.
   std::vector<ScalarExpr*> output_exprs_;
   std::vector<ScalarExprEvaluator*> output_expr_evals_;
-
-  /// Initialize the expressions in the data sink and return error status on failure.
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state);
 };
 } // namespace impala
 #endif
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index c907bcc..129ba25 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -31,15 +31,19 @@
 
 namespace impala {
 
-HBaseTableSink::HBaseTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-    const TDataSink& tsink, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "HBaseTableSink", state),
-    table_id_(tsink.table_sink.target_table_id),
-    table_desc_(NULL),
-    hbase_table_writer_(NULL) {
-  DCHECK(tsink.__isset.table_sink);
+DataSink* HBaseTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(new HBaseTableSink(sink_id, *this, state));
 }
 
+HBaseTableSink::HBaseTableSink(
+      TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "HBaseTableSink", state),
+    table_id_(sink_config.tsink_->table_sink.target_table_id),
+    table_desc_(NULL),
+    hbase_table_writer_(NULL) {}
+
 Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   SCOPED_TIMER(profile()->total_time_counter());
diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h
index d973ad4..4d702bd 100644
--- a/be/src/exec/hbase-table-sink.h
+++ b/be/src/exec/hbase-table-sink.h
@@ -31,12 +31,21 @@
 
 namespace impala {
 
+class HBaseTableSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  ~HBaseTableSinkConfig() override {}
+};
+
 /// Class to take row batches and send them to the HBaseTableWriter to
 /// eventually be written into an HBase table.
 class HBaseTableSink : public DataSink {
  public:
-  HBaseTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      const TDataSink& tsink, RuntimeState* state);
+  HBaseTableSink(
+      TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
   virtual Status Send(RuntimeState* state, RowBatch* batch);
   virtual Status FlushFinal(RuntimeState* state);
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 3e695b4..2674e72 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -53,23 +53,39 @@
 
 namespace impala {
 
-HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-    const TDataSink& tsink, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "HdfsTableSink", state),
+Status HdfsTableSinkConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+  DCHECK(tsink_->__isset.table_sink);
+  DCHECK(tsink_->table_sink.__isset.hdfs_table_sink);
+  RETURN_IF_ERROR(
+      ScalarExpr::Create(tsink_->table_sink.hdfs_table_sink.partition_key_exprs,
+          *input_row_desc_, state, &partition_key_exprs_));
+  return Status::OK();
+}
+
+DataSink* HdfsTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(
+      new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state));
+}
+
+HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
+    const THdfsTableSink& hdfs_sink, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "HdfsTableSink", state),
     table_desc_(nullptr),
     prototype_partition_(nullptr),
-    table_id_(tsink.table_sink.target_table_id),
+    table_id_(sink_config.tsink_->table_sink.target_table_id),
     skip_header_line_count_(
-        tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count ?
-            tsink.table_sink.hdfs_table_sink.skip_header_line_count :
-            0),
-    overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
-    input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered),
-    sort_columns_(tsink.table_sink.hdfs_table_sink.sort_columns),
-    current_clustered_partition_(nullptr) {
-  DCHECK(tsink.__isset.table_sink);
-  if (tsink.table_sink.hdfs_table_sink.__isset.write_id) {
-    write_id_ = tsink.table_sink.hdfs_table_sink.write_id;
+        hdfs_sink.__isset.skip_header_line_count ? hdfs_sink.skip_header_line_count : 0),
+    overwrite_(hdfs_sink.overwrite),
+    input_is_clustered_(hdfs_sink.input_is_clustered),
+    sort_columns_(hdfs_sink.sort_columns),
+    current_clustered_partition_(nullptr),
+    partition_key_exprs_(sink_config.partition_key_exprs_) {
+  if (hdfs_sink.__isset.write_id) {
+    write_id_ = hdfs_sink.write_id;
     DCHECK_GT(write_id_, 0);
   }
 }
@@ -82,15 +98,6 @@
     partition_descriptor(nullptr),
     block_size(0) {}
 
-Status HdfsTableSink::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  RETURN_IF_ERROR(DataSink::Init(thrift_output_exprs, tsink, state));
-  DCHECK(tsink.__isset.table_sink);
-  RETURN_IF_ERROR(ScalarExpr::Create(tsink.table_sink.hdfs_table_sink.partition_key_exprs,
-      *row_desc_, state, &partition_key_exprs_));
-  return Status::OK();
-}
-
 Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 81d5d40..2e790fd 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -94,6 +94,22 @@
   OutputPartition();
 };
 
+class HdfsTableSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  /// Expressions for computing the target partitions to which a row is written.
+  std::vector<ScalarExpr*> partition_key_exprs_;
+
+  ~HdfsTableSinkConfig() override {}
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      RuntimeState* state) override;
+};
+
 /// The sink consumes all row batches of its child execution tree, and writes the
 /// evaluated output_exprs into temporary Hdfs files. The query coordinator moves the
 /// temporary files into their final locations after the sinks have finished executing.
@@ -134,8 +150,8 @@
 /// <table base dir>/<partition dirs>/<ACID base or delta directory>
 class HdfsTableSink : public DataSink {
  public:
-  HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      const TDataSink& tsink, RuntimeState* state);
+  HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
+    const THdfsTableSink& hdfs_sink, RuntimeState* state);
 
   /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
@@ -167,10 +183,6 @@
 
   std::string DebugString() const;
 
- protected:
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state) WARN_UNUSED_RESULT;
-
  private:
   /// Initialises the filenames of a given output partition, and opens the temporary file.
   /// The partition key is derived from 'row'. If the partition will not have any rows
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index d4113e6..1372476 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -66,14 +66,20 @@
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
-KuduTableSink::KuduTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-    const TDataSink& tsink, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "KuduTableSink", state),
-    table_id_(tsink.table_sink.target_table_id),
-    sink_action_(tsink.table_sink.action),
-    kudu_table_sink_(tsink.table_sink.kudu_table_sink),
+DataSink* KuduTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(
+      new KuduTableSink(sink_id, *this, tsink_->table_sink, state));
+}
+
+KuduTableSink::KuduTableSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+      const TTableSink& table_sink, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "KuduTableSink", state),
+    table_id_(table_sink.target_table_id),
+    sink_action_(table_sink.action),
+    kudu_table_sink_(table_sink.kudu_table_sink),
     client_tracked_bytes_(0) {
-  DCHECK(tsink.__isset.table_sink);
   DCHECK(KuduIsAvailable());
 }
 
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 2496640..5cde878 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -30,6 +30,15 @@
 
 namespace impala {
 
+class KuduTableSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  ~KuduTableSinkConfig() override {}
+};
+
 /// Sink that takes RowBatches and writes them into a Kudu table.
 ///
 /// The data is added to Kudu in Send(). The Kudu client is configured to automatically
@@ -53,8 +62,8 @@
 /// status. All reported errors (ignored or not) will be logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
-  KuduTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      const TDataSink& tsink, RuntimeState* state);
+  KuduTableSink(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+      const TTableSink& table_sink, RuntimeState* state);
 
   /// Prepares the expressions to be applied and creates a KuduSchema based on the
   /// expressions and KuduTableDescriptor.
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index a521fa9..4932a67 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -27,9 +27,17 @@
 
 using namespace impala;
 
-NljBuilder::NljBuilder(const RowDescriptor* row_desc, RuntimeState* state)
-  : DataSink(-1, row_desc, "Nested Loop Join Builder", state),
-    build_batch_cache_(row_desc, state->batch_size()) {}
+NljBuilder* NljBuilder::CreateSink(const RowDescriptor* row_desc, RuntimeState* state) {
+  ObjectPool* pool = state->obj_pool();
+  DataSinkConfig* sink_config = pool->Add(new NljBuilderConfig());
+  sink_config->tsink_ = pool->Add(new TDataSink());
+  sink_config->input_row_desc_ = row_desc;
+  return pool->Add(new NljBuilder(*sink_config, state));
+}
+
+NljBuilder::NljBuilder(const DataSinkConfig& sink_config, RuntimeState* state)
+  : DataSink(-1, sink_config, "Nested Loop Join Builder", state),
+    build_batch_cache_(row_desc_, state->batch_size()) {}
 
 Status NljBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index 8a03d1f..bdc3eb0 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -26,6 +26,19 @@
 
 namespace impala {
 
+/// Dummy class needed to create an instance of the sink.
+class NljBuilderConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override {
+    DCHECK(false) << "Not Implemented";
+    return nullptr;
+  }
+
+  ~NljBuilderConfig() override {}
+};
+
 /// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join.
 /// Implements the DataSink interface but also exposes some methods for direct use by
 /// NestedLoopJoinNode.
@@ -37,7 +50,9 @@
 /// is used and all data is deep copied into memory owned by the builder.
 class NljBuilder : public DataSink {
  public:
-  NljBuilder(const RowDescriptor* row_desc, RuntimeState* state);
+
+  /// To be used by the NestedLoopJoinNode to create an instance of this sink.
+  static NljBuilder* CreateSink(const RowDescriptor* row_desc, RuntimeState* state);
 
   /// Implementations of DataSink interface methods.
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
@@ -76,6 +91,8 @@
   inline RowBatchList* copied_build_batches() { return &copied_build_batches_; }
 
  private:
+  NljBuilder(const DataSinkConfig& sink_config, RuntimeState* state);
+
   /// Deep copy all build batches in 'input_build_batches_' to 'copied_build_batches_'.
   /// Resets all the source batches and clears 'input_build_batches_'.
   /// If the memory limit is exceeded while copying batches, returns a
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 6be017e..c4d8379 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -95,8 +95,7 @@
     DCHECK_EQ(builder_->copied_build_batches()->total_num_rows(), 0);
     build_batches_ = builder_->input_build_batches();
   } else {
-    RETURN_IF_ERROR(
-        BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+    RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
     build_batches_ = builder_->GetFinalBuildBatches();
     if (matching_build_rows_ != NULL) {
       RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows()));
@@ -112,7 +111,7 @@
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
       pool_, expr_perm_pool(), expr_results_pool(), &join_conjunct_evals_));
-  builder_.reset(new NljBuilder(child(1)->row_desc(), state));
+  builder_ = NljBuilder::CreateSink(child(1)->row_desc(), state);
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
 
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index e3cf2b3..d117ff8 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -66,8 +66,8 @@
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
-  /// The build side rows of the join.
-  boost::scoped_ptr<NljBuilder> builder_;
+  /// The build side rows of the join. Created in Prepare() and owned by runtime state.
+  NljBuilder* builder_;
 
   /// Pointer to the RowBatchList (owned by 'builder_') that contains the batches to
   /// use during the probe phase.
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index a865e8f..6df580d 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -49,29 +49,45 @@
 using namespace impala;
 using strings::Substitute;
 
-const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
+Status PhjBuilderConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  return Status("Not Implemented.");
+}
 
-PhjBuilder::PhjBuilder(int join_node_id, const string& join_node_label,
-    TJoinOp::type join_op, const RowDescriptor* build_row_desc, RuntimeState* state,
-    BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
-    int64_t max_row_buffer_size)
-  : DataSink(-1, build_row_desc,
-        Substitute("Hash Join Builder (join_node_id=$0)", join_node_id), state),
-    runtime_state_(state),
-    join_node_id_(join_node_id),
-    join_node_label_(join_node_label),
-    join_op_(join_op),
-    buffer_pool_client_(buffer_pool_client),
-    spillable_buffer_size_(spillable_buffer_size),
-    max_row_buffer_size_(max_row_buffer_size) {}
+DataSink* PhjBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  DCHECK(false) << "Not Implemented";
+  return nullptr;
+}
 
-Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
+PhjBuilder* PhjBuilderConfig::CreateSink(BufferPool::ClientHandle* buffer_pool_client,
+    const std::string& join_node_label, int64_t spillable_buffer_size,
+    int64_t max_row_buffer_size, RuntimeState* state) const {
+  ObjectPool* pool = state->obj_pool();
+  return pool->Add(new PhjBuilder(*this, buffer_pool_client, join_node_label,
+      spillable_buffer_size, max_row_buffer_size, state));
+}
+
+Status PhjBuilderConfig::CreateConfig(RuntimeState* state, int join_node_id,
+    TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+    const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+    const std::vector<TRuntimeFilterDesc>& filters, const PhjBuilderConfig** sink) {
+  ObjectPool* pool = state->obj_pool();
+  TDataSink* tsink = pool->Add(new TDataSink()); // just a dummy object.
+  PhjBuilderConfig* data_sink = pool->Add(new PhjBuilderConfig());
+  RETURN_IF_ERROR(data_sink->Init(
+      state, *tsink, join_node_id, join_op, build_row_desc, eq_join_conjuncts, filters));
+  *sink = data_sink;
+  return Status::OK();
+}
+
+Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
     const vector<TEqJoinCondition>& eq_join_conjuncts,
     const vector<TRuntimeFilterDesc>& filter_descs) {
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
     ScalarExpr* build_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(eq_join_conjunct.right, *row_desc_, state, &build_expr));
+        ScalarExpr::Create(eq_join_conjunct.right, *input_row_desc_, state, &build_expr));
     build_exprs_.push_back(build_expr);
     is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from);
   }
@@ -92,17 +108,47 @@
     if (it == filters_produced.end()) continue;
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, &filter_expr));
+        ScalarExpr::Create(filter_desc.src_expr, *input_row_desc_, state, &filter_expr));
     filter_exprs_.push_back(filter_expr);
-
-    // TODO: Move to Prepare().
-    filter_ctxs_.emplace_back();
-    // TODO: IMPALA-4400 - implement local aggregation of runtime filters.
-    filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true);
+    filter_descs_.push_back(filter_desc);
   }
   return Status::OK();
 }
 
+Status PhjBuilderConfig::Init(RuntimeState* state, const TDataSink& tsink,
+    int join_node_id, TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+    const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+    const std::vector<TRuntimeFilterDesc>& filters) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, build_row_desc, state));
+  join_node_id_ = join_node_id;
+  join_op_ = join_op;
+  return InitExprsAndFilters(state, eq_join_conjuncts, filters);
+}
+
+const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
+
+PhjBuilder::PhjBuilder(const PhjBuilderConfig& sink_config,
+    BufferPool::ClientHandle* buffer_pool_client, const std::string& join_node_label,
+    int64_t spillable_buffer_size, int64_t max_row_buffer_size, RuntimeState* state)
+  : DataSink(-1, sink_config,
+        Substitute("Hash Join Builder (join_node_id=$0)", sink_config.join_node_id_),
+        state),
+    runtime_state_(state),
+    join_node_id_(sink_config.join_node_id_),
+    join_node_label_(join_node_label),
+    join_op_(sink_config.join_op_),
+    buffer_pool_client_(buffer_pool_client),
+    spillable_buffer_size_(spillable_buffer_size),
+    max_row_buffer_size_(max_row_buffer_size),
+    build_exprs_(sink_config.build_exprs_),
+    is_not_distinct_from_(sink_config.is_not_distinct_from_),
+    filter_exprs_(sink_config.filter_exprs_) {
+  for (const TRuntimeFilterDesc& filter_desc : sink_config.filter_descs_) {
+    filter_ctxs_.emplace_back();
+    filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true);
+  }
+}
+
 Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   RETURN_IF_ERROR(HashTableCtx::Create(&obj_pool_, state, build_exprs_, build_exprs_,
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 9ac23b1..0dec0b8 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -35,11 +35,76 @@
 
 namespace impala {
 
+class PhjBuilder;
 class RowDescriptor;
 class RuntimeState;
 class ScalarExpr;
 class ScalarExprEvaluator;
 
+/// Partitioned Hash Join Builder Config class. This has a few extra methods to be used
+/// directly by the PartitionedHashJoinPlanNode. Since it is expected to only be created
+/// and used by PartitionedHashJoinPlanNode only, the DataSinkConfig::Init() and
+/// DataSinkConfig::CreateSink() are not implemented for it.
+class PhjBuilderConfig : public DataSinkConfig {
+ public:
+    DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  /// Creates an instance of PhjBuilder data sink in the state's object pool. To be used
+  /// only by PartitionedHashJoinPlanNode.
+  PhjBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client,
+      const std::string& join_node_label, int64_t spillable_buffer_size,
+      int64_t max_row_buffer_size, RuntimeState* state) const;
+
+  /// Creates an instance of this class in the state's object pool. To be used only by
+  /// PartitionedHashJoinPlanNode.
+  static Status CreateConfig(RuntimeState* state, int join_node_id, TJoinOp::type join_op,
+      const RowDescriptor* build_row_desc,
+      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+      const std::vector<TRuntimeFilterDesc>& filters, const PhjBuilderConfig** sink);
+
+  ~PhjBuilderConfig() override {}
+
+  /// The ID of the plan join node this is associated with.
+  int join_node_id_;
+
+  /// The join operation this is building for.
+  TJoinOp::type join_op_;
+
+  /// Expressions over input rows for hash table build.
+  std::vector<ScalarExpr*> build_exprs_;
+
+  /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
+  /// NOT DISTINCT FROM, rather than equality.
+  /// Set in InitExprsAndFilters() and constant thereafter.
+  std::vector<bool> is_not_distinct_from_;
+
+  /// Expressions for evaluating input rows for insertion into runtime filters.
+  /// Only includes exprs for filters produced by this builder.
+  std::vector<ScalarExpr*> filter_exprs_;
+
+  /// The runtime filter descriptors of filters produced by this builder.
+  vector<TRuntimeFilterDesc> filter_descs_;
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      RuntimeState* state) override;
+
+ private:
+  /// Helper method used by CreateConfig()
+  Status Init(RuntimeState* state, const TDataSink& tsink, int join_node_id,
+      TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+      const std::vector<TRuntimeFilterDesc>& filters);
+
+  /// Initializes the build and filter expressions and creates a copy of the filter
+  /// descriptors that will be generated by this sink.
+  Status InitExprsAndFilters(RuntimeState* state,
+      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+      const std::vector<TRuntimeFilterDesc>& filters);
+};
+
 /// See partitioned-hash-join-node.h for explanation of the top-level algorithm and how
 /// these states fit in it.
 enum class HashJoinState {
@@ -114,14 +179,9 @@
 
   using PartitionId = int;
 
-  PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
-      const RowDescriptor* build_row_desc, RuntimeState* state,
-      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
-      int64_t max_row_buffer_size);
-
-  Status InitExprsAndFilters(RuntimeState* state,
-      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
-      const std::vector<TRuntimeFilterDesc>& filters) WARN_UNUSED_RESULT;
+  PhjBuilder(const PhjBuilderConfig& sink_config,
+      BufferPool::ClientHandle* buffer_pool_client, const std::string& join_node_label,
+      int64_t spillable_buffer_size, int64_t max_row_buffer_size, RuntimeState* state);
 
   /// Implementations of DataSink interface methods.
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 737a963..a9fe4e9 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -76,24 +76,34 @@
       full_row_desc, state, &other_join_conjuncts_));
   DCHECK(tnode.hash_join_node.join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
       || eq_join_conjuncts.size() == 1);
+
+  RETURN_IF_ERROR(PhjBuilderConfig::CreateConfig(state, tnode_->node_id,
+      tnode_->hash_join_node.join_op, children_[1]->row_descriptor_, eq_join_conjuncts,
+      tnode_->runtime_filters, &phj_builder_config));
   return Status::OK();
 }
 
 Status PartitionedHashJoinPlanNode::CreateExecNode(
     RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
-  *node = pool->Add(new PartitionedHashJoinNode(pool, *this, state->desc_tbl()));
+  *node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));
   return Status::OK();
 }
 
-PartitionedHashJoinNode::PartitionedHashJoinNode(ObjectPool* pool,
+PartitionedHashJoinNode::PartitionedHashJoinNode(RuntimeState* state,
     const PartitionedHashJoinPlanNode& pnode, const DescriptorTbl& descs)
   : BlockingJoinNode("PartitionedHashJoinNode", pnode.tnode_->hash_join_node.join_op,
-        pool, pnode, descs) {
+        state->obj_pool(), pnode, descs),
+    build_exprs_(pnode.build_exprs_),
+    probe_exprs_(pnode.probe_exprs_),
+    other_join_conjuncts_(pnode.other_join_conjuncts_) {
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
-  build_exprs_ = pnode.build_exprs_;
-  probe_exprs_ = pnode.probe_exprs_;
-  other_join_conjuncts_ = pnode.other_join_conjuncts_;
+  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
+  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
+  // being separated out further.
+  builder_ = pnode.phj_builder_config->CreateSink(buffer_pool_client(), label(),
+      resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size,
+      state);
 }
 
 PartitionedHashJoinNode::~PartitionedHashJoinNode() {
@@ -106,21 +116,6 @@
 
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   runtime_state_ = state;
-  const vector<TEqJoinCondition>& eq_join_conjuncts =
-      plan_node_.tnode_->hash_join_node.eq_join_conjuncts;
-  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
-  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
-  // being separated out further.
-  // TODO: Move the builder creation on Expr Init into the constructor once the PlanRoot
-  // Equivalent of a sink is implemented. build_exprs_ and filter_exprs can be passed
-  // directly from those generated in Phj node, only thing left to do is register the
-  // filters.
-  builder_.reset(new PhjBuilder(id(), label(), join_op_, child(1)->row_desc(), state,
-      buffer_pool_client(), resource_profile_.spillable_buffer_size,
-      resource_profile_.max_row_buffer_size));
-  RETURN_IF_ERROR(builder_->InitExprsAndFilters(
-      state, eq_join_conjuncts, plan_node_.tnode_->runtime_filters));
-
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
 
@@ -186,7 +181,7 @@
   // are cleared in QueryMaintenance().
   probe_expr_results_pool_->Clear();
 
-  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
 
   build_hash_partitions_ = builder_->BeginInitialProbe(buffer_pool_client());
   RETURN_IF_ERROR(PrepareForPartitionedProbe());
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 38c274f..9c6f16b 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -52,6 +52,10 @@
 
   /// Non-equi-join conjuncts from the ON clause.
   std::vector<ScalarExpr*> other_join_conjuncts_;
+
+  /// Data sink config object for creating a phj builder that will be eventually used by
+  /// the exec node.
+  const PhjBuilderConfig* phj_builder_config;
 };
 
 /// Operator to perform partitioned hash join, spilling to disk as necessary. This
@@ -128,7 +132,7 @@
 
 class PartitionedHashJoinNode : public BlockingJoinNode {
  public:
-  PartitionedHashJoinNode(ObjectPool* pool, const PartitionedHashJoinPlanNode& pnode,
+  PartitionedHashJoinNode(RuntimeState* state, const PartitionedHashJoinPlanNode& pnode,
       const DescriptorTbl& descs);
   virtual ~PartitionedHashJoinNode();
 
@@ -552,8 +556,8 @@
   /// State of the probing algorithm. Used to drive the state machine in GetNext().
   ProbeState probe_state_ = ProbeState::PROBE_COMPLETE;
 
-  /// The build-side of the join. Initialized in Prepare().
-  boost::scoped_ptr<PhjBuilder> builder_;
+  /// The build-side of the join. Initialized in constructor and owned by runtime state.
+  PhjBuilder* builder_;
 
   /// Last set of hash partitions obtained from builder_. Only valid when the
   /// builder's state is PARTITIONING_PROBE or REPARTITIONING_PROBE.
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 45368f9..bce6324 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -17,6 +17,8 @@
 
 #include "exec/plan-root-sink.h"
 
+#include "exec/buffered-plan-root-sink.h"
+#include "exec/blocking-plan-root-sink.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/row-batch.h"
@@ -33,9 +35,21 @@
 
 namespace impala {
 
+DataSink* PlanRootSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  ObjectPool* pool = state->obj_pool();
+  if (state->query_options().spool_query_results) {
+    return pool->Add(new BufferedPlanRootSink(
+        sink_id, *this, state, fragment_instance_ctx.debug_options));
+  } else {
+    return pool->Add(new BlockingPlanRootSink(sink_id, *this, state));
+  }
+}
+
 PlanRootSink::PlanRootSink(
-    TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state),
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state)
+  : DataSink(sink_id, sink_config, "PLAN_ROOT_SINK", state),
     num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {}
 
 PlanRootSink::~PlanRootSink() {}
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index ad4dc3e..ecc03aa 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -27,6 +27,15 @@
 class QueryResultSet;
 class ScalarExprEvaluator;
 
+class PlanRootSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  ~PlanRootSinkConfig() override {}
+};
+
 /// Sink which manages the handoff between a 'sender' (a fragment instance) that produces
 /// batches by calling Send(), and a 'consumer' (e.g. the coordinator) which consumes rows
 /// formed by computing a set of output expressions over the input batches, by calling
@@ -54,7 +63,8 @@
 /// ensures that this outlives any calls to Send() and GetNext(), respectively.
 class PlanRootSink : public DataSink {
  public:
-  PlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+  PlanRootSink(
+    TDataSinkId sink_id, const DataSinkConfig& sink_config, RuntimeState* state);
   virtual ~PlanRootSink();
 
   /// Called before Send(), Open(), or Close(). Performs any additional setup necessary,
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 3121ee9..90bbf48 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -526,20 +526,16 @@
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
+    const DataSinkConfig* data_sink = nullptr;
+    EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &state, &data_sink));
 
     // We create an object of the base class DataSink and cast to the appropriate sender
     // according to the 'is_thrift' option.
     scoped_ptr<DataSink> sender;
 
-    TExprNode expr_node;
-    expr_node.node_type = TExprNodeType::SLOT_REF;
-    TExpr output_exprs;
-    output_exprs.nodes.push_back(expr_node);
-
-    sender.reset(new KrpcDataStreamSender(-1,
-        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
-    EXPECT_OK(static_cast<KrpcDataStreamSender*>(
-        sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
+    sender.reset(new KrpcDataStreamSender(-1, sender_num,
+        *(static_cast<const KrpcDataStreamSenderConfig*>(data_sink)),
+        data_sink->tsink_->stream_sink, dest_, channel_buffer_size, &state));
     EXPECT_OK(sender->Prepare(&state, &tracker_));
     EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 02bc9cf..bc34d17 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -215,8 +215,10 @@
 
   // prepare sink_
   DCHECK(fragment_ctx_.fragment.__isset.output_sink);
-  RETURN_IF_ERROR(DataSink::Create(fragment_ctx_, instance_ctx_, exec_tree_->row_desc(),
-      runtime_state_, &sink_));
+  const TDataSink& thrift_sink = fragment_ctx_.fragment.output_sink;
+  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
+      thrift_sink, plan_tree_->row_descriptor_, runtime_state_, &sink_config_));
+  sink_ = sink_config_->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
   RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != nullptr) profile()->AddChild(sink_profile);
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 7b059c6..69e081b 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -51,6 +51,7 @@
 class PlanRootSink;
 class Thread;
 class DataSink;
+class DataSinkConfig;
 class RuntimeState;
 
 /// FragmentInstanceState handles all aspects of the execution of a single plan fragment
@@ -154,6 +155,7 @@
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
   /// Lives in obj_pool(). Not mutated after being initialized.
   const PlanNode* plan_tree_ = nullptr;
+  const DataSinkConfig* sink_config_ = nullptr;
 
   /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
   /// There should be only one thread doing status report at the same time.
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index f1a9951..91cac2b 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -55,6 +55,10 @@
 
 #include "common/names.h"
 
+DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
+    "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
+    "can accumulate before the row batch is sent over the wire.");
+
 using std::condition_variable_any;
 using namespace apache::thrift;
 using kudu::rpc::RpcController;
@@ -71,6 +75,29 @@
 const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStreamSender";
 const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
 
+Status KrpcDataStreamSenderConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+  DCHECK(tsink_->__isset.stream_sink);
+  auto& partition_type = tsink_->stream_sink.output_partition.type;
+  if (partition_type == TPartitionType::HASH_PARTITIONED
+      || partition_type == TPartitionType::KUDU) {
+    RETURN_IF_ERROR(
+        ScalarExpr::Create(tsink_->stream_sink.output_partition.partition_exprs,
+            *input_row_desc_, state, &partition_exprs_));
+  }
+  return Status::OK();
+}
+
+DataSink* KrpcDataStreamSenderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+  // We have one fragment per sink, so we can use the fragment index as the sink ID.
+  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  return state->obj_pool()->Add(new KrpcDataStreamSender(sink_id,
+      fragment_instance_ctx.sender_id, *this, tsink_->stream_sink,
+      fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
+}
+
 // A datastream sender may send row batches to multiple destinations. There is one
 // channel for each destination.
 //
@@ -669,14 +696,15 @@
 }
 
 KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
-    const RowDescriptor* row_desc, const TDataStreamSink& sink,
-    const vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size,
-    RuntimeState* state)
-  : DataSink(sink_id, row_desc,
+    const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
+    const std::vector<TPlanFragmentDestination>& destinations,
+    int per_channel_buffer_size, RuntimeState* state)
+  : DataSink(sink_id, sink_config,
         Substitute("KrpcDataStreamSender (dst_id=$0)", sink.dest_node_id), state),
     sender_id_(sender_id),
     partition_type_(sink.output_partition.type),
     per_channel_buffer_size_(per_channel_buffer_size),
+    partition_exprs_(sink_config.partition_exprs_),
     dest_node_id_(sink.dest_node_id),
     next_unknown_partition_(0) {
   DCHECK_GT(destinations.size(), 0);
@@ -687,16 +715,17 @@
 
   for (int i = 0; i < destinations.size(); ++i) {
     channels_.push_back(
-        new Channel(this, row_desc, destinations[i].thrift_backend.hostname,
+        new Channel(this, row_desc_, destinations[i].thrift_backend.hostname,
             destinations[i].krpc_backend, destinations[i].fragment_instance_id,
             sink.dest_node_id, per_channel_buffer_size));
   }
 
-  if (partition_type_ == TPartitionType::UNPARTITIONED ||
-      partition_type_ == TPartitionType::RANDOM) {
+  if (partition_type_ == TPartitionType::UNPARTITIONED
+      || partition_type_ == TPartitionType::RANDOM) {
     // Randomize the order we open/transmit to channels to avoid thundering herd problems.
     random_shuffle(channels_.begin(), channels_.end());
   }
+
 }
 
 KrpcDataStreamSender::~KrpcDataStreamSender() {
@@ -707,18 +736,6 @@
   }
 }
 
-Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  SCOPED_TIMER(profile_->total_time_counter());
-  DCHECK(tsink.__isset.stream_sink);
-  if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
-      partition_type_ == TPartitionType::KUDU) {
-    RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
-        *row_desc_, state, &partition_exprs_));
-  }
-  return Status::OK();
-}
-
 Status KrpcDataStreamSender::Prepare(
     RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index d0f9848..0e5d6d6 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -39,6 +39,23 @@
 class TNetworkAddress;
 class TPlanFragmentDestination;
 
+class KrpcDataStreamSenderConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      RuntimeState* state) const override;
+
+  /// Expressions of partition keys. It's used to compute the
+  /// per-row partition values for shuffling exchange;
+  std::vector<ScalarExpr*> partition_exprs_;
+
+  ~KrpcDataStreamSenderConfig() override {}
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      RuntimeState* state) override;
+};
+
 /// Single sender of an m:n data stream.
 ///
 /// Row batch data is routed to destinations based on the provided partitioning
@@ -47,21 +64,19 @@
 ///
 /// TODO: capture stats that describe distribution of rows/data volume
 /// across channels.
-/// TODO: create a PlanNode equivalent class for DataSink.
 class KrpcDataStreamSender : public DataSink {
  public:
-  /// Constructs a sender according to the output specification (tsink), sending to the
+  /// Constructs a sender according to the config (sink_config), sending to the
   /// given destinations:
   /// 'sender_id' identifies this sender instance, and is unique within a fragment.
-  /// 'row_desc' is the descriptor of the tuple row. It must out-live the sink.
   /// 'destinations' are the receivers' network addresses. There is one channel for each
   /// destination.
   /// 'per_channel_buffer_size' is the soft limit in bytes of the buffering into the
   /// per-channel's accumulating row batch before it will be sent.
   /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
   /// and RANDOM.
-  KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, const RowDescriptor* row_desc,
-      const TDataStreamSink& tsink,
+  KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
+      const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
       const std::vector<TPlanFragmentDestination>& destinations,
       int per_channel_buffer_size, RuntimeState* state);
 
@@ -102,11 +117,6 @@
  protected:
   friend class DataStreamTest;
 
-  /// Initializes any partitioning expressions based on 'thrift_output_exprs' and stores
-  /// them in 'partition_exprs_'. Returns error status if the initialization failed.
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state) override;
-
   /// Returns total number of bytes sent. If batches are broadcast to multiple receivers,
   /// they are counted once per receiver.
   int64_t GetNumDataBytesSent() const;