blob: bdc3eb006ef357da035c5a4b5676521547a492ef [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_NESTED_LOOP_JOIN_BUILDER_H
#define IMPALA_EXEC_NESTED_LOOP_JOIN_BUILDER_H
#include "exec/blocking-join-node.h"
#include "exec/data-sink.h"
#include "exec/row-batch-cache.h"
#include "exec/row-batch-list.h"
#include "runtime/descriptors.h"
namespace impala {
/// Placeholder sink configuration that exists only so an NljBuilder (a DataSink) can be
/// constructed. It is not meant to create sinks itself: CreateSink() is unsupported.
class NljBuilderConfig : public DataSinkConfig {
 public:
  ~NljBuilderConfig() override = default;

  /// Unsupported. Trips DCHECK(false) with a "Not Implemented" message and returns
  /// nullptr.
  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
      RuntimeState* state) const override {
    DCHECK(false) << "Not Implemented";
    return nullptr;
  }
};
/// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join.
/// Implements the DataSink interface but also exposes some methods for direct use by
/// NestedLoopJoinNode.
///
/// The builder will operate in one of two modes depending on the memory ownership of
/// row batches pulled from the child node on the build side. If the row batches own all
/// tuple memory, the non-copying mode is used and row batches are simply accumulated in
/// the builder. If the batches reference tuple data they do not own, the copying mode
/// is used and all data is deep copied into memory owned by the builder.
class NljBuilder : public DataSink {
public:
/// To be used by the NestedLoopJoinNode to create an instance of this sink.
static NljBuilder* CreateSink(const RowDescriptor* row_desc, RuntimeState* state);
/// Implementations of DataSink interface methods.
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
virtual Status Open(RuntimeState* state) override;
virtual Status Send(RuntimeState* state, RowBatch* batch) override;
virtual Status FlushFinal(RuntimeState* state) override;
virtual void Close(RuntimeState* state) override;
/// Reset the builder to the same state as it was in after calling Open().
void Reset();
/// Returns the next build batch that should be filled and passed to AddBuildBatch().
/// Exposed so that NestedLoopJoinNode can bypass the DataSink interface for efficiency.
inline RowBatch* GetNextEmptyBatch() {
return build_batch_cache_.GetNextBatch(mem_tracker_.get());
}
/// Add a batch to the build side. Does not copy the data, so either resources must
/// be owned by the batch (or a later batch), or DeepCopyBuildBatches() must be called
/// before the referenced resources are released.
/// Exposed so that NestedLoopJoinNode can bypass the DataSink interface for efficiency.
inline void AddBuildBatch(RowBatch* batch) { input_build_batches_.AddRowBatch(batch); }
/// Return a pointer to the final list of build batches.
/// Only valid to call after FlushFinal() has been called.
RowBatchList* GetFinalBuildBatches() {
if (copied_build_batches_.total_num_rows() > 0) {
DCHECK_EQ(input_build_batches_.total_num_rows(), 0);
return &copied_build_batches_;
} else {
return &input_build_batches_;
}
}
inline RowBatchList* input_build_batches() { return &input_build_batches_; }
inline RowBatchList* copied_build_batches() { return &copied_build_batches_; }
private:
NljBuilder(const DataSinkConfig& sink_config, RuntimeState* state);
/// Deep copy all build batches in 'input_build_batches_' to 'copied_build_batches_'.
/// Resets all the source batches and clears 'input_build_batches_'.
/// If the memory limit is exceeded while copying batches, returns a
/// MEM_LIMIT_EXCEEDED status, sets the query status to MEM_LIMIT_EXCEEDED and leave
/// the row batches to be cleaned up later when the node is closed.
Status DeepCopyBuildBatches(RuntimeState* state);
/// Creates and caches RowBatches for the build side. The RowBatch objects are owned
/// by this cache. The cache helps to avoid creating new RowBatches after a Reset().
RowBatchCache build_batch_cache_;
/// List of the input build batches we obtained from the child, which may reference
/// memory that is owned by the child node.
RowBatchList input_build_batches_;
/// List of build batches that were deep copied from 'input_build_batches_' and are
/// backed by each row batch's pool.
RowBatchList copied_build_batches_;
};
}
#endif