blob: 9ec520141ccadbce88a68cb63283c091c881d522 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/kudu-scanner.h"
#include <kudu/client/row_result.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <vector>
#include <string>
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "exec/kudu-util.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/runtime-state.h"
#include "runtime/row-batch.h"
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "gutil/gscoped_ptr.h"
#include "gutil/strings/substitute.h"
#include "util/jni-util.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using kudu::client::KuduClient;
using kudu::client::KuduScanBatch;
using kudu::client::KuduSchema;
using kudu::client::KuduTable;
DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
"Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Invalid values "
"result in using READ_LATEST.");
DEFINE_bool(pick_only_leaders_for_tests, false,
"Whether to pick only leader replicas, for tests purposes only.");
DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
"The period at which Kudu Scanners should send keep-alive requests to the tablet "
"server to ensure that scanners do not time out.");
DECLARE_int32(kudu_operation_timeout_ms);
namespace impala {
const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
: scan_node_(scan_node),
state_(state),
cur_kudu_batch_num_read_(0),
last_alive_time_micros_(0) {
}
Status KuduScanner::Open() {
return scan_node_->GetConjunctCtxs(&conjunct_ctxs_);
}
void KuduScanner::KeepKuduScannerAlive() {
if (scanner_ == NULL) return;
int64_t now = MonotonicMicros();
int64_t keepalive_us = FLAGS_kudu_scanner_keep_alive_period_sec * 1e6;
if (now < last_alive_time_micros_ + keepalive_us) {
return;
}
// If we fail to send a keepalive, it isn't a big deal. The Kudu
// client code doesn't handle cross-replica failover or retries when
// the server is busy, so it's better to just ignore errors here. In
// the worst case, we will just fail next time we try to fetch a batch
// if the scan is unrecoverable.
kudu::Status s = scanner_->KeepAlive();
if (!s.ok()) {
VLOG(1) << "Unable to keep the Kudu scanner alive: " << s.ToString();
return;
}
last_alive_time_micros_ = now;
}
Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
int64_t tuple_buffer_size;
uint8_t* tuple_buffer;
RETURN_IF_ERROR(
row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_buffer));
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
// Main scan loop:
// Tries to fill 'row_batch' with rows from cur_kudu_batch_.
// If there are no rows to decode, tries to get the next row batch from kudu.
// If this scanner has no more rows, the scanner is closed and eos is returned.
while (!*eos) {
RETURN_IF_CANCELLED(state_);
if (cur_kudu_batch_num_read_ < cur_kudu_batch_.NumRows()) {
RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple));
if (row_batch->AtCapacity()) break;
}
if (scanner_->HasMoreRows() && !scan_node_->ReachedLimit()) {
RETURN_IF_ERROR(GetNextScannerBatch());
continue;
}
CloseCurrentClientScanner();
*eos = true;
}
return Status::OK();
}
void KuduScanner::Close() {
if (scanner_) CloseCurrentClientScanner();
Expr::Close(conjunct_ctxs_, state_);
}
Status KuduScanner::OpenNextScanToken(const string& scan_token) {
DCHECK(scanner_ == NULL);
kudu::client::KuduScanner* scanner;
KUDU_RETURN_IF_ERROR(kudu::client::KuduScanToken::DeserializeIntoScanner(
scan_node_->kudu_client(), scan_token, &scanner),
"Unable to deserialize scan token");
scanner_.reset(scanner);
if (UNLIKELY(FLAGS_pick_only_leaders_for_tests)) {
KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY),
"Could not set replica selection.");
}
kudu::client::KuduScanner::ReadMode mode =
MODE_READ_AT_SNAPSHOT == FLAGS_kudu_read_mode ?
kudu::client::KuduScanner::READ_AT_SNAPSHOT :
kudu::client::KuduScanner::READ_LATEST;
KUDU_RETURN_IF_ERROR(scanner_->SetReadMode(mode), "Could not set scanner ReadMode");
KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),
"Could not set scanner timeout");
VLOG_ROW << "Starting KuduScanner with ReadMode=" << mode << " timeout=" <<
FLAGS_kudu_operation_timeout_ms;
{
SCOPED_TIMER(state_->total_storage_wait_timer());
KUDU_RETURN_IF_ERROR(scanner_->Open(), "Unable to open scanner");
}
return Status::OK();
}
void KuduScanner::CloseCurrentClientScanner() {
DCHECK_NOTNULL(scanner_.get());
scanner_->Close();
scanner_.reset();
}
Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch) {
int num_rows_remaining = cur_kudu_batch_.NumRows() - cur_kudu_batch_num_read_;
int rows_to_add = std::min(row_batch->capacity() - row_batch->num_rows(),
num_rows_remaining);
cur_kudu_batch_num_read_ += rows_to_add;
row_batch->CommitRows(rows_to_add);
return Status::OK();
}
Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem) {
// Short-circuit the count(*) case.
if (scan_node_->tuple_desc_->slots().empty()) {
return HandleEmptyProjection(row_batch);
}
// Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into
// 'row_batch'.
bool has_conjuncts = !conjunct_ctxs_.empty();
int num_rows = cur_kudu_batch_.NumRows();
for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) {
// Evaluate the conjuncts that haven't been pushed down to Kudu. Conjunct evaluation
// is performed directly on the Kudu tuple because its memory layout is identical to
// Impala's. We only copy the surviving tuples to Impala's output row batch.
KuduScanBatch::RowPtr krow = cur_kudu_batch_.Row(krow_idx);
Tuple* kudu_tuple = reinterpret_cast<Tuple*>(const_cast<void*>(krow.cell(0)));
++cur_kudu_batch_num_read_;
if (has_conjuncts && !ExecNode::EvalConjuncts(&conjunct_ctxs_[0],
conjunct_ctxs_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) {
continue;
}
// Deep copy the tuple, set it in a new row, and commit the row.
kudu_tuple->DeepCopy(*tuple_mem, *scan_node_->tuple_desc(),
row_batch->tuple_data_pool());
TupleRow* row = row_batch->GetRow(row_batch->AddRow());
row->SetTuple(0, *tuple_mem);
row_batch->CommitLastRow();
// If we've reached the capacity, or the LIMIT for the scan, return.
if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
// Move to the next tuple in the tuple buffer.
*tuple_mem = next_tuple(*tuple_mem);
}
ExprContext::FreeLocalAllocations(conjunct_ctxs_);
// Check the status in case an error status was set during conjunct evaluation.
return state_->GetQueryStatus();
}
Status KuduScanner::GetNextScannerBatch() {
SCOPED_TIMER(state_->total_storage_wait_timer());
int64_t now = MonotonicMicros();
KUDU_RETURN_IF_ERROR(scanner_->NextBatch(&cur_kudu_batch_), "Unable to advance iterator");
COUNTER_ADD(scan_node_->kudu_round_trips(), 1);
cur_kudu_batch_num_read_ = 0;
COUNTER_ADD(scan_node_->rows_read_counter(), cur_kudu_batch_.NumRows());
last_alive_time_micros_ = now;
return Status::OK();
}
} // namespace impala