blob: 24966409ecfd3970bbc1480fa27dcdeb0e61c6bd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_KUDU_TABLE_SINK_H
#define IMPALA_EXEC_KUDU_TABLE_SINK_H
#include <boost/scoped_ptr.hpp>
#include <kudu/client/client.h>
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "common/status.h"
#include "exec/kudu-util.h"
#include "exec/data-sink.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
namespace impala {
/// Sink that takes RowBatches and writes them into a Kudu table.
///
/// The data is added to Kudu in Send(). The Kudu client is configured to automatically
/// flush records when enough data has been written (AUTO_FLUSH_BACKGROUND). This
/// requires specifying a mutation buffer size and a buffer flush watermark percentage in
/// the Kudu client. The mutation buffer needs to be large enough to buffer rows sent to
/// all destination nodes because the buffer accounting is not specified per-tablet
/// server (KUDU-1693). Tests showed that 10MB was a good default, and this is
/// configurable via the gflag --kudu_mutation_buffer_size. The buffer flush watermark
/// percentage is set to a value that results in Kudu flushing after 7MB is in a
/// buffer for a particular destination (of the 10MB of the total mutation buffer space)
/// because Kudu currently has some 8MB buffer limits.
///
/// Kudu doesn't have transactions yet, so some rows may fail to write while others are
/// successful. The Kudu client reports errors, some of which are treated as warnings and
/// will not fail the query: PK already exists on INSERT, key not found on UPDATE/DELETE,
/// NULL in a non-nullable column, and PK specifying rows in an uncovered range.
/// The number of rows that cannot be modified due to these errors is reported in the
/// TInsertPartitionStatus report sent by the DataSink to the coordinator.
/// Any other kind of error reported by Kudu results in the sink returning an error
/// status. All reported errors (ignored or not) will be logged via the RuntimeState.
class KuduTableSink : public DataSink {
public:
KuduTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
const TDataSink& tsink, RuntimeState* state);
/// Prepares the expressions to be applied and creates a KuduSchema based on the
/// expressions and KuduTableDescriptor.
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
/// Connects to Kudu and creates the KuduSession to be used for the writes.
virtual Status Open(RuntimeState* state);
/// Transforms 'batch' into Kudu writes and sends them to Kudu.
/// The KuduSession is flushed on each row batch.
virtual Status Send(RuntimeState* state, RowBatch* batch);
/// Forces any remaining buffered operations to be flushed to Kudu.
virtual Status FlushFinal(RuntimeState* state);
/// Closes the KuduSession and the expressions.
virtual void Close(RuntimeState* state);
private:
/// Create a new write operation according to the sink type.
kudu::client::KuduWriteOperation* NewWriteOp();
/// Checks for any errors buffered in the Kudu session, and increments
/// appropriate counters for ignored errors.
//
/// Returns a bad Status if there are non-ignorable errors.
Status CheckForErrors(RuntimeState* state) WARN_UNUSED_RESULT;
/// Used to get the KuduTableDescriptor from the RuntimeState
TableId table_id_;
/// The descriptor of the KuduTable being written to. Set on Prepare().
const KuduTableDescriptor* table_desc_;
/// The Kudu client, owned by the ExecEnv.
kudu::client::KuduClient* client_ = nullptr;
/// The Kudu table and session.
kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
kudu::client::sp::shared_ptr<kudu::client::KuduSession> session_;
/// A cache of the nullability of each Kudu column. The Kudu schema accessor
/// is not inlined and actually creates a copy (see IMPALA-8284).
///
/// Initialized in Open().
std::vector<bool> kudu_column_nullabilities_;
/// Used to specify the type of write operation (INSERT/UPDATE/DELETE).
TSinkAction::type sink_action_;
/// Captures parameters passed down from the frontend
TKuduTableSink kudu_table_sink_;
/// The amount consumed from 'mem_tracker_' to account for the mem used by 'client_'.
int64_t client_tracked_bytes_;
/// Time spent applying Kudu operations. In normal circumstances, Apply() should be
/// negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled.
/// Significant time spent in Apply() may indicate that Kudu cannot buffer and send
/// rows as fast as the sink can write them.
RuntimeProfile::Counter* kudu_apply_timer_;
/// Total number of rows processed, i.e. rows written to Kudu and also rows with
/// errors.
RuntimeProfile::Counter* total_rows_ = nullptr;
/// The number of rows with errors.
RuntimeProfile::Counter* num_row_errors_ = nullptr;
/// Rate at which the sink consumes and processes rows, i.e. writing rows to Kudu or
/// skipping rows that are known to violate nullability constraints.
RuntimeProfile::Counter* rows_processed_rate_ = nullptr;
};
} // namespace impala
#endif // IMPALA_EXEC_KUDU_TABLE_SINK_H