blob: 35bd2c11e72335f6e61242fce71e8234cc8615c2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/kudu-scan-node-base.h"
#include <boost/algorithm/string.hpp>
#include <kudu/client/row_result.h>
#include <kudu/client/schema.h>
#include <kudu/client/value.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <vector>
#include "exec/kudu-scanner.h"
#include "exec/kudu-util.h"
#include "exprs/expr.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/query-state.h"
#include "runtime/runtime-state.h"
#include "runtime/row-batch.h"
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using kudu::client::KuduClient;
using kudu::client::KuduTable;
namespace impala {
const string KuduScanNodeBase::KUDU_ROUND_TRIPS = "TotalKuduScanRoundTrips";
const string KuduScanNodeBase::KUDU_REMOTE_TOKENS = "KuduRemoteScanTokens";
/// KuduClientTime - total amount of time scanner threads spent in the Kudu
/// client, either waiting for data from Kudu or processing data.
const string KuduScanNodeBase::KUDU_CLIENT_TIME = "KuduClientTime";
KuduScanNodeBase::KuduScanNodeBase(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
tuple_id_(tnode.kudu_scan_node.tuple_id),
count_star_slot_offset_(
tnode.kudu_scan_node.__isset.count_star_slot_offset ?
tnode.kudu_scan_node.count_star_slot_offset : -1) {
DCHECK(KuduIsAvailable());
}
KuduScanNodeBase::~KuduScanNodeBase() {
DCHECK(is_closed());
}
Status KuduScanNodeBase::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::Prepare(state));
scan_ranges_complete_counter_ =
ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
kudu_client_time_ = ADD_TIMER(runtime_profile(), KUDU_CLIENT_TIME);
DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
// Initialize the list of scan tokens to process from the TScanRangeParams.
DCHECK(scan_range_params_ != NULL);
int num_remote_tokens = 0;
for (const TScanRangeParams& params: *scan_range_params_) {
if (params.__isset.is_remote && params.is_remote) ++num_remote_tokens;
scan_tokens_.push_back(params.scan_range.kudu_scan_token);
}
COUNTER_SET(kudu_remote_tokens_, num_remote_tokens);
return Status::OK();
}
Status KuduScanNodeBase::Open(RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::Open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
SCOPED_TIMER(runtime_profile_->total_time_counter());
const KuduTableDescriptor* table_desc =
static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
table_desc->kudu_master_addresses(), &client_));
uint64_t latest_ts = static_cast<uint64_t>(
max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));
VLOG_RPC << "Latest observed Kudu timestamp: " << latest_ts;
if (latest_ts > 0) client_->SetLatestObservedTimestamp(latest_ts);
KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
"Unable to open Kudu table");
return Status::OK();
}
void KuduScanNodeBase::DebugString(int indentation_level, stringstream* out) const {
string indent(indentation_level * 2, ' ');
*out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")";
}
bool KuduScanNodeBase::HasScanToken() {
return (next_scan_token_idx_ < scan_tokens_.size());
}
const string* KuduScanNodeBase::GetNextScanToken() {
if (!HasScanToken()) return nullptr;
const string* token = &scan_tokens_[next_scan_token_idx_++];
return token;
}
} // namespace impala