blob: 8913af457031b7b406d778e33293674377ee0742 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hbase-table-scanner.h"
#include "exec/hbase-scan-node.h"
#include <cstring>
#include <algorithm>
#include "util/bit-util.h"
#include "util/jni-util.h"
#include "runtime/descriptors.h"
#include "runtime/runtime-state.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/tuple.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using namespace impala;
using namespace strings;
// Definitions of the static JNI handles shared by all HBaseTableScanner instances.
// Everything below starts out NULL and is populated by HBaseTableScanner::Init().

// Global references to the HBase Java classes used by the scanner.
jclass HBaseTableScanner::scan_cl_ = NULL;
jclass HBaseTableScanner::resultscanner_cl_ = NULL;
jclass HBaseTableScanner::result_cl_ = NULL;
jclass HBaseTableScanner::cell_cl_ = NULL;
jclass HBaseTableScanner::hconstants_cl_ = NULL;
jclass HBaseTableScanner::filter_list_cl_ = NULL;
jclass HBaseTableScanner::filter_list_op_cl_ = NULL;
jclass HBaseTableScanner::single_column_value_filter_cl_ = NULL;
jclass HBaseTableScanner::compare_op_cl_ = NULL;
// Remains NULL when ScannerTimeoutException is not on the classpath (see Init()).
jclass HBaseTableScanner::scanner_timeout_ex_cl_ = NULL;
// Scan constructor and setter method ids.
jmethodID HBaseTableScanner::scan_ctor_ = NULL;
jmethodID HBaseTableScanner::scan_set_max_versions_id_ = NULL;
jmethodID HBaseTableScanner::scan_set_caching_id_ = NULL;
jmethodID HBaseTableScanner::scan_set_cache_blocks_id_ = NULL;
jmethodID HBaseTableScanner::scan_add_column_id_ = NULL;
jmethodID HBaseTableScanner::scan_set_filter_id_ = NULL;
jmethodID HBaseTableScanner::scan_set_start_row_id_ = NULL;
jmethodID HBaseTableScanner::scan_set_stop_row_id_ = NULL;
// ResultScanner method ids.
jmethodID HBaseTableScanner::resultscanner_next_id_ = NULL;
jmethodID HBaseTableScanner::resultscanner_close_id_ = NULL;
// Result method ids.
jmethodID HBaseTableScanner::result_isempty_id_ = NULL;
jmethodID HBaseTableScanner::result_raw_cells_id_ = NULL;
// Cell method ids returning the byte arrays that back each portion of a cell.
jmethodID HBaseTableScanner::cell_get_row_array_ = NULL;
jmethodID HBaseTableScanner::cell_get_family_array_ = NULL;
jmethodID HBaseTableScanner::cell_get_qualifier_array_ = NULL;
jmethodID HBaseTableScanner::cell_get_value_array_ = NULL;
// Cell method ids returning the offset/length of each portion within its array.
jmethodID HBaseTableScanner::cell_get_family_offset_id_ = NULL;
jmethodID HBaseTableScanner::cell_get_family_length_id_ = NULL;
jmethodID HBaseTableScanner::cell_get_qualifier_offset_id_ = NULL;
jmethodID HBaseTableScanner::cell_get_qualifier_length_id_ = NULL;
jmethodID HBaseTableScanner::cell_get_row_offset_id_ = NULL;
jmethodID HBaseTableScanner::cell_get_row_length_id_ = NULL;
jmethodID HBaseTableScanner::cell_get_value_offset_id_ = NULL;
jmethodID HBaseTableScanner::cell_get_value_length_id_ = NULL;
// FilterList / SingleColumnValueFilter method ids.
jmethodID HBaseTableScanner::filter_list_ctor_ = NULL;
jmethodID HBaseTableScanner::filter_list_add_filter_id_ = NULL;
jmethodID HBaseTableScanner::single_column_value_filter_ctor_ = NULL;
// Global reference to HConstants.EMPTY_START_ROW; used for empty keys.
jobject HBaseTableScanner::empty_row_ = NULL;
// Global reference to FilterList.Operator.MUST_PASS_ALL.
jobject HBaseTableScanner::must_pass_all_op_ = NULL;
// Global reference to the array returned by CompareFilter.CompareOp.values().
jobjectArray HBaseTableScanner::compare_ops_ = NULL;
// Substitute() template for allocation failures: $0 is the name of the failing
// function, $1 the number of bytes requested and $2 a description of the buffer.
const string HBASE_MEM_LIMIT_EXCEEDED = "HBaseTableScanner::$0() failed to "
"allocate $1 bytes for $2.";
void HBaseTableScanner::ScanRange::DebugString(int indentation_level,
stringstream* out) {
*out << string(indentation_level * 2, ' ');
if (!start_key_.empty()) {
*out << " start_key=" << start_key_;
}
if (!stop_key_.empty()) {
*out << " stop_key=" << stop_key_;
}
}
// Creates a scanner bound to 'scan_node'. No JNI objects are created here; the
// constructor only resets state and derives the caching settings from the query
// options carried by 'state'.
HBaseTableScanner::HBaseTableScanner(
    HBaseScanNode* scan_node, HBaseTableFactory* htable_factory, RuntimeState* state)
  : scan_node_(scan_node),
    state_(state),
    htable_factory_(htable_factory),
    htable_(NULL),
    scan_(NULL),
    resultscanner_(NULL),
    cells_(NULL),
    cell_index_(0),
    num_requested_cells_(0),
    num_addl_requested_cols_(0),
    num_cells_(0),
    all_cells_present_(false),
    value_pool_(new MemPool(scan_node_->mem_tracker(), true)),
    scan_setup_timer_(ADD_TIMER(scan_node_->runtime_profile(),
        "HBaseTableScanner.ScanSetup")) {
  const TQueryOptions& opts = state->query_options();

  // An explicit, positive hbase_caching query option wins. Otherwise use the scan
  // node's suggestion when it is positive and below DEFAULT_ROWS_CACHED, and fall
  // back to DEFAULT_ROWS_CACHED in every other case.
  const bool caching_overridden = opts.__isset.hbase_caching && opts.hbase_caching > 0;
  if (caching_overridden) {
    rows_cached_ = opts.hbase_caching;
  } else {
    const int suggested = scan_node_->suggested_max_caching();
    const bool use_suggestion = suggested > 0 && suggested < DEFAULT_ROWS_CACHED;
    rows_cached_ = use_suggestion ? suggested : DEFAULT_ROWS_CACHED;
  }

  // Block caching is enabled only when the option is both set and true.
  cache_blocks_ = opts.__isset.hbase_cache_blocks && opts.hbase_cache_blocks;
}
// Resolves and caches, in the static members of this class, every Java class,
// method id and constant object the scanner needs. Returns a non-OK status as soon
// as the JVM is unavailable or any lookup fails.
Status HBaseTableScanner::Init() {
// Get the JNIEnv* corresponding to current thread.
JNIEnv* env = JniUtil::GetJNIEnv();
if (env == NULL) {
return Status("Failed to get/create JVM");
}
// Global class references:
// Scan, ResultScanner, Result, HConstants, FilterList, FilterList.Operator,
// SingleColumnValueFilter, CompareFilter.CompareOp, ScannerTimeoutException
// (optional) and Cell.
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/client/Scan", &scan_cl_));
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/client/ResultScanner",
&resultscanner_cl_));
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/client/Result",
&result_cl_));
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/HConstants",
&hconstants_cl_));
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/filter/FilterList",
&filter_list_cl_));
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env,
"org/apache/hadoop/hbase/filter/FilterList$Operator", &filter_list_op_cl_));
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env,
"org/apache/hadoop/hbase/filter/SingleColumnValueFilter",
&single_column_value_filter_cl_));
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env,
"org/apache/hadoop/hbase/filter/CompareFilter$CompareOp",
&compare_op_cl_));
// ScannerTimeoutException is removed in HBase 2.0. Leave this as null if
// ScannerTimeoutException does not exist.
if (JniUtil::ClassExists(env,
"org/apache/hadoop/hbase/client/ScannerTimeoutException")) {
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
"org/apache/hadoop/hbase/client/ScannerTimeoutException",
&scanner_timeout_ex_cl_));
}
RETURN_IF_ERROR(
JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/Cell", &cell_cl_));
// Scan method ids.
scan_ctor_ = env->GetMethodID(scan_cl_, "<init>", "()V");
RETURN_ERROR_IF_EXC(env);
scan_set_max_versions_id_ = env->GetMethodID(scan_cl_, "setMaxVersions",
"(I)Lorg/apache/hadoop/hbase/client/Scan;");
RETURN_ERROR_IF_EXC(env);
// The signature of setCaching and setCacheBlocks returns either void (old behavior)
// or a Scan object (new behavior). Since the code doesn't use the return value,
// tolerate either. However, the two are expected to be consistent.
if (JniUtil::MethodExists(env, scan_cl_, "setCaching", "(I)V")) {
scan_set_caching_id_ = env->GetMethodID(scan_cl_, "setCaching", "(I)V");
RETURN_ERROR_IF_EXC(env);
scan_set_cache_blocks_id_ = env->GetMethodID(scan_cl_, "setCacheBlocks", "(Z)V");
RETURN_ERROR_IF_EXC(env);
} else {
scan_set_caching_id_ = env->GetMethodID(scan_cl_, "setCaching",
"(I)Lorg/apache/hadoop/hbase/client/Scan;");
RETURN_ERROR_IF_EXC(env);
scan_set_cache_blocks_id_ = env->GetMethodID(scan_cl_, "setCacheBlocks",
"(Z)Lorg/apache/hadoop/hbase/client/Scan;");
RETURN_ERROR_IF_EXC(env);
}
scan_add_column_id_ = env->GetMethodID(scan_cl_, "addColumn",
"([B[B)Lorg/apache/hadoop/hbase/client/Scan;");
RETURN_ERROR_IF_EXC(env);
scan_set_filter_id_ = env->GetMethodID(scan_cl_, "setFilter",
"(Lorg/apache/hadoop/hbase/filter/Filter;)Lorg/apache/hadoop/hbase/client/Scan;");
RETURN_ERROR_IF_EXC(env);
// TODO: IMPALA-5584: In HBase 2.0, setStartRow() and setStopRow() are deprecated.
scan_set_start_row_id_ = env->GetMethodID(scan_cl_, "setStartRow",
"([B)Lorg/apache/hadoop/hbase/client/Scan;");
RETURN_ERROR_IF_EXC(env);
scan_set_stop_row_id_ = env->GetMethodID(scan_cl_, "setStopRow",
"([B)Lorg/apache/hadoop/hbase/client/Scan;");
RETURN_ERROR_IF_EXC(env);
// ResultScanner method ids.
resultscanner_next_id_ = env->GetMethodID(resultscanner_cl_, "next",
"()Lorg/apache/hadoop/hbase/client/Result;");
RETURN_ERROR_IF_EXC(env);
resultscanner_close_id_ = env->GetMethodID(resultscanner_cl_, "close", "()V");
RETURN_ERROR_IF_EXC(env);
// Result method ids.
result_raw_cells_id_ = env->GetMethodID(result_cl_, "rawCells",
"()[Lorg/apache/hadoop/hbase/Cell;");
RETURN_ERROR_IF_EXC(env);
result_isempty_id_ = env->GetMethodID(result_cl_, "isEmpty", "()Z");
RETURN_ERROR_IF_EXC(env);
// Cell method ids to retrieve buffers backing different portions of row data.
cell_get_row_array_ = env->GetMethodID(cell_cl_, "getRowArray", "()[B");
RETURN_ERROR_IF_EXC(env);
cell_get_family_array_ = env->GetMethodID(cell_cl_, "getFamilyArray", "()[B");
RETURN_ERROR_IF_EXC(env);
cell_get_qualifier_array_ = env->GetMethodID(cell_cl_, "getQualifierArray", "()[B");
RETURN_ERROR_IF_EXC(env);
cell_get_value_array_ = env->GetMethodID(cell_cl_, "getValueArray", "()[B");
RETURN_ERROR_IF_EXC(env);
// Cell method ids for retrieving lengths and offsets into buffers backing different
// portions of row data.
// Note the differing return types: family length is a byte ("()B"), row length is
// a short ("()S"), and every other length/offset is an int ("()I").
cell_get_family_offset_id_ = env->GetMethodID(cell_cl_, "getFamilyOffset", "()I");
RETURN_ERROR_IF_EXC(env);
cell_get_family_length_id_ = env->GetMethodID(cell_cl_, "getFamilyLength", "()B");
RETURN_ERROR_IF_EXC(env);
cell_get_qualifier_offset_id_ =
env->GetMethodID(cell_cl_, "getQualifierOffset", "()I");
RETURN_ERROR_IF_EXC(env);
cell_get_qualifier_length_id_ =
env->GetMethodID(cell_cl_, "getQualifierLength", "()I");
RETURN_ERROR_IF_EXC(env);
cell_get_row_offset_id_ = env->GetMethodID(cell_cl_, "getRowOffset", "()I");
RETURN_ERROR_IF_EXC(env);
cell_get_row_length_id_ = env->GetMethodID(cell_cl_, "getRowLength", "()S");
RETURN_ERROR_IF_EXC(env);
cell_get_value_offset_id_ = env->GetMethodID(cell_cl_, "getValueOffset", "()I");
RETURN_ERROR_IF_EXC(env);
cell_get_value_length_id_ = env->GetMethodID(cell_cl_, "getValueLength", "()I");
RETURN_ERROR_IF_EXC(env);
// HConstants fields.
jfieldID empty_start_row_id =
env->GetStaticFieldID(hconstants_cl_, "EMPTY_START_ROW", "[B");
RETURN_ERROR_IF_EXC(env);
// empty_row_ first holds the value returned by GetStaticObjectField() and is then
// overwritten with the result of LocalToGlobalRef().
empty_row_ = env->GetStaticObjectField(hconstants_cl_, empty_start_row_id);
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, empty_row_, &empty_row_));
// FilterList method ids.
filter_list_ctor_ = env->GetMethodID(filter_list_cl_, "<init>",
"(Lorg/apache/hadoop/hbase/filter/FilterList$Operator;)V");
RETURN_ERROR_IF_EXC(env);
filter_list_add_filter_id_ = env->GetMethodID(filter_list_cl_, "addFilter",
"(Lorg/apache/hadoop/hbase/filter/Filter;)V");
RETURN_ERROR_IF_EXC(env);
// FilterList.Operator fields.
jfieldID must_pass_all_id = env->GetStaticFieldID(filter_list_op_cl_, "MUST_PASS_ALL",
"Lorg/apache/hadoop/hbase/filter/FilterList$Operator;");
RETURN_ERROR_IF_EXC(env);
must_pass_all_op_ = env->GetStaticObjectField(filter_list_op_cl_, must_pass_all_id);
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, must_pass_all_op_, &must_pass_all_op_));
// SingleColumnValueFilter method ids.
single_column_value_filter_ctor_ =
env->GetMethodID(single_column_value_filter_cl_, "<init>",
"([B[BLorg/apache/hadoop/hbase/filter/CompareFilter$CompareOp;[B)V");
RETURN_ERROR_IF_EXC(env);
// Get op array from CompareFilter.CompareOp.
// ScanSetup() indexes this array with THBaseFilter::op_ordinal.
jmethodID compare_op_values = env->GetStaticMethodID(compare_op_cl_, "values",
"()[Lorg/apache/hadoop/hbase/filter/CompareFilter$CompareOp;");
RETURN_ERROR_IF_EXC(env);
compare_ops_ =
(jobjectArray) env->CallStaticObjectMethod(compare_op_cl_, compare_op_values);
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, reinterpret_cast<jobject>(compare_ops_),
reinterpret_cast<jobject*>(&compare_ops_)));
return Status::OK();
}
// Builds the Java Scan object (scan_) for the table described by 'tuple_desc',
// without a row range:
//  - obtains the HTable from the factory,
//  - applies max-versions, row caching and block caching settings,
//  - restricts the scan to the family/qualifier of every slot with a non-empty
//    qualifier,
//  - additionally requests every column referenced by 'filters' that is not already
//    covered by a slot (counted in num_addl_requested_cols_),
//  - attaches a MUST_PASS_ALL FilterList holding one SingleColumnValueFilter per
//    entry in 'filters'.
// Returns a non-OK status if any JNI call raises an exception.
Status HBaseTableScanner::ScanSetup(JNIEnv* env, const TupleDescriptor* tuple_desc,
    const vector<THBaseFilter>& filters) {
  SCOPED_TIMER(scan_setup_timer_);
  JniLocalFrame jni_frame;
  RETURN_IF_ERROR(jni_frame.push(env));
  const HBaseTableDescriptor* hbase_table =
      static_cast<const HBaseTableDescriptor*>(tuple_desc->table_desc());
  // Use global cache of HTables.
  RETURN_IF_ERROR(htable_factory_->GetTable(hbase_table->table_name(), &htable_));

  // Setup a Scan object without the range.
  // scan_ = new Scan();
  DCHECK(scan_ == NULL);
  jobject local_scan = env->NewObject(scan_cl_, scan_ctor_);
  RETURN_ERROR_IF_EXC(env);
  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_scan, &scan_));

  // scan_.setMaxVersions(1);
  env->CallObjectMethod(scan_, scan_set_max_versions_id_, 1);
  RETURN_ERROR_IF_EXC(env);
  // scan_.setCaching(rows_cached_);
  env->CallObjectMethod(scan_, scan_set_caching_id_, rows_cached_);
  RETURN_ERROR_IF_EXC(env);
  // scan_.setCacheBlocks(cache_blocks_);
  env->CallObjectMethod(scan_, scan_set_cache_blocks_id_, cache_blocks_);
  RETURN_ERROR_IF_EXC(env);

  const vector<SlotDescriptor*>& slots = tuple_desc->slots();
  // Restrict scan to materialized families/qualifiers.
  for (const SlotDescriptor* slot : slots) {
    const auto& col = hbase_table->cols()[slot->col_pos()];
    // The row key has an empty qualifier.
    if (col.qualifier.empty()) continue;
    // Release the byte arrays created for this column at the end of the iteration.
    JniLocalFrame column_frame;
    RETURN_IF_ERROR(column_frame.push(env));
    jbyteArray family_bytes;
    RETURN_IF_ERROR(CreateByteArray(env, col.family, &family_bytes));
    jbyteArray qualifier_bytes;
    RETURN_IF_ERROR(CreateByteArray(env, col.qualifier, &qualifier_bytes));
    // scan_.addColumn(family_bytes, qualifier_bytes);
    env->CallObjectMethod(scan_, scan_add_column_id_, family_bytes, qualifier_bytes);
    RETURN_ERROR_IF_EXC(env);
  }

  // circumvent hbase bug: make sure to select all cols that have filters,
  // otherwise the filter may not get applied;
  // see HBASE-4364 (https://issues.apache.org/jira/browse/HBASE-4364)
  num_addl_requested_cols_ = 0;
  for (const THBaseFilter& filter_desc : filters) {
    bool requested = false;
    for (const SlotDescriptor* slot : slots) {
      const auto& col = hbase_table->cols()[slot->col_pos()];
      if (col.family == filter_desc.family && col.qualifier == filter_desc.qualifier) {
        requested = true;
        break;
      }
    }
    if (requested) continue;
    JniLocalFrame column_frame;
    RETURN_IF_ERROR(column_frame.push(env));
    jbyteArray family_bytes;
    RETURN_IF_ERROR(CreateByteArray(env, filter_desc.family, &family_bytes));
    jbyteArray qualifier_bytes;
    RETURN_IF_ERROR(CreateByteArray(env, filter_desc.qualifier, &qualifier_bytes));
    // scan_.addColumn(family_bytes, qualifier_bytes);
    env->CallObjectMethod(scan_, scan_add_column_id_, family_bytes, qualifier_bytes);
    RETURN_ERROR_IF_EXC(env);
    ++num_addl_requested_cols_;
  }

  // Add HBase Filters.
  if (!filters.empty()) {
    // filter_list = new FilterList(Operator.MUST_PASS_ALL);
    jobject filter_list =
        env->NewObject(filter_list_cl_, filter_list_ctor_, must_pass_all_op_);
    RETURN_ERROR_IF_EXC(env);
    for (const THBaseFilter& filter_desc : filters) {
      JniLocalFrame filter_frame;
      RETURN_IF_ERROR(filter_frame.push(env));
      // hbase_op = CompareFilter.CompareOp.values()[filter_desc.op_ordinal];
      jobject hbase_op =
          env->GetObjectArrayElement(compare_ops_, filter_desc.op_ordinal);
      RETURN_ERROR_IF_EXC(env);
      jbyteArray family_bytes;
      RETURN_IF_ERROR(CreateByteArray(env, filter_desc.family, &family_bytes));
      jbyteArray qualifier_bytes;
      RETURN_IF_ERROR(CreateByteArray(env, filter_desc.qualifier, &qualifier_bytes));
      jbyteArray value_bytes;
      RETURN_IF_ERROR(CreateByteArray(env, filter_desc.filter_constant, &value_bytes));
      // filter = new SingleColumnValueFilter(family_bytes, qualifier_bytes, hbase_op,
      //     value_bytes);
      jobject filter = env->NewObject(single_column_value_filter_cl_,
          single_column_value_filter_ctor_, family_bytes, qualifier_bytes, hbase_op,
          value_bytes);
      RETURN_ERROR_IF_EXC(env);
      // filter_list.addFilter(filter);
      // addFilter() is looked up with a void ("...)V") signature in Init(), so it
      // has to be invoked through CallVoidMethod.
      env->CallVoidMethod(filter_list, filter_list_add_filter_id_, filter);
      RETURN_ERROR_IF_EXC(env);
    }
    // scan_.setFilter(filter_list);
    env->CallObjectMethod(scan_, scan_set_filter_id_, filter_list);
    RETURN_ERROR_IF_EXC(env);
  }
  return Status::OK();
}
// Inspects the pending JNI exception, if any, after a ResultScanner call.
//  - No pending exception: '*timeout' is false and OK is returned.
//  - Pending exception that is not a ScannerTimeoutException (or that class is
//    unavailable): '*timeout' is false and the exception is returned as an error.
//  - ScannerTimeoutException: '*timeout' is true and the ResultScanner is
//    re-created; the returned status is that of the re-creation.
Status HBaseTableScanner::HandleResultScannerTimeout(JNIEnv* env, bool* timeout) {
  *timeout = false;
  jthrowable pending = env->ExceptionOccurred();
  if (pending == NULL) return Status::OK();

  // GetJniExceptionMsg gets the error message and clears the exception status (which
  // is necessary). For HBase 2.0, ScannerTimeoutException does not exist, so the
  // error is simply returned. The same happens for any exception that is not a
  // ScannerTimeoutException.
  Status exc_status = JniUtil::GetJniExceptionMsg(env, false);
  const bool is_scanner_timeout = scanner_timeout_ex_cl_ != nullptr
      && env->IsInstanceOf(pending, scanner_timeout_ex_cl_) == JNI_TRUE;
  if (!is_scanner_timeout) return exc_status;

  *timeout = true;
  const ScanRange& current_range = (*scan_range_vector_)[current_scan_range_idx_];
  // A NULL cells_ means the ResultScanner timed out before it ever produced a row,
  // so it can be re-created over the unchanged range.
  if (cells_ == NULL) return InitScanRange(env, current_range);

  JniLocalFrame local_frame;
  RETURN_IF_ERROR(local_frame.push(env));
  DCHECK_LT(cell_index_, num_cells_);
  jobject current_cell = env->GetObjectArrayElement(cells_, cell_index_);
  RETURN_ERROR_IF_EXC(env);
  // Some rows were already read, so the new start key is taken from the current
  // cell's row array instead of from the range's start key.
  jbyteArray resume_bytes =
      (jbyteArray) env->CallObjectMethod(current_cell, cell_get_row_array_);
  RETURN_ERROR_IF_EXC(env);
  jbyteArray stop_bytes;
  RETURN_IF_ERROR(CreateByteArray(env, current_range.stop_key(), &stop_bytes));
  return InitScanRange(env, resume_bytes, stop_bytes);
}
// Converts the start and stop keys of 'scan_range' into Java byte arrays and
// delegates to the byte-array overload of InitScanRange().
Status HBaseTableScanner::InitScanRange(JNIEnv* env, const ScanRange& scan_range) {
  // The frame releases the two temporary byte arrays when this function returns.
  JniLocalFrame local_frame;
  RETURN_IF_ERROR(local_frame.push(env));

  jbyteArray first_row;
  jbyteArray last_row;
  RETURN_IF_ERROR(CreateByteArray(env, scan_range.start_key(), &first_row));
  RETURN_IF_ERROR(CreateByteArray(env, scan_range.stop_key(), &last_row));
  return InitScanRange(env, first_row, last_row);
}
// Points scan_ at the row range ['start_bytes', 'end_bytes'), closes and releases
// any existing ResultScanner, and opens a new one that is stored in resultscanner_
// as a global reference. Returns a non-OK status if any JNI call raises an
// exception or the new scanner cannot be obtained.
Status HBaseTableScanner::InitScanRange(JNIEnv* env, jbyteArray start_bytes,
    jbyteArray end_bytes) {
  // scan_.setStartRow(start_bytes);
  env->CallObjectMethod(scan_, scan_set_start_row_id_, start_bytes);
  RETURN_ERROR_IF_EXC(env);
  // scan_.setStopRow(end_bytes);
  env->CallObjectMethod(scan_, scan_set_stop_row_id_, end_bytes);
  RETURN_ERROR_IF_EXC(env);

  if (resultscanner_ != NULL) {
    // resultscanner_.close();
    // close() is looked up with the "()V" signature in Init(), so it has to be
    // invoked through CallVoidMethod rather than CallObjectMethod.
    env->CallVoidMethod(resultscanner_, resultscanner_close_id_);
    RETURN_ERROR_IF_EXC(env);
    env->DeleteGlobalRef(resultscanner_);
    resultscanner_ = NULL;
  }

  // resultscanner_ = htable_.getScanner(scan_);
  jobject local_resultscanner;
  RETURN_IF_ERROR(htable_->GetResultScanner(scan_, &local_resultscanner));
  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_resultscanner, &resultscanner_));
  return Status::OK();
}
// Prepares the Scan object via ScanSetup(), remembers 'scan_range_vector' (by
// pointer, so the caller's vector must stay alive while scanning) and opens a
// ResultScanner over its first range. 'scan_range_vector' must not be empty.
Status HBaseTableScanner::StartScan(JNIEnv* env, const TupleDescriptor* tuple_desc,
    const ScanRangeVector& scan_range_vector, const vector<THBaseFilter>& filters) {
  DCHECK(scan_range_vector.size() > 0);

  // Configure everything except the row range.
  RETURN_IF_ERROR(ScanSetup(env, tuple_desc, filters));

  // Remember the ranges and begin at the first one.
  scan_range_vector_ = &scan_range_vector;
  current_scan_range_idx_ = 0;

  // resultscanner_ is still NULL here and is created by InitScanRange(), so there
  // is no existing scanner that could have timed out.
  DCHECK(resultscanner_ == NULL);
  const ScanRange& first_range = (*scan_range_vector_)[current_scan_range_idx_];
  return InitScanRange(env, first_range);
}
// Stores in '*bytes' a Java byte array holding the contents of 's'. For an empty
// 's' the shared empty_row_ object is used instead of allocating a new array.
// Returns an error if the JVM cannot allocate the array.
Status HBaseTableScanner::CreateByteArray(JNIEnv* env, const string& s,
    jbyteArray* bytes) {
  if (s.empty()) {
    *bytes = reinterpret_cast<jbyteArray>(empty_row_);
    return Status::OK();
  }

  *bytes = env->NewByteArray(s.size());
  if (*bytes == NULL) {
    return Status("Couldn't construct java byte array for key " + s);
  }
  const jbyte* src = reinterpret_cast<const jbyte*>(s.data());
  env->SetByteArrayRegion(*bytes, 0, s.size(), src);
  return Status::OK();
}
// Advances to the next non-empty row, moving on to the following scan range when
// the current one is exhausted. On success '*has_next' tells whether a row is
// available; if so, cells_, num_cells_, all_cells_present_ and cell_index_ describe
// it and value_pool_ has been cleared. Returns a non-OK status on JNI errors or if
// the row holds more cells than were requested.
Status HBaseTableScanner::Next(JNIEnv* env, bool* has_next) {
  JniLocalFrame jni_frame;
  RETURN_IF_ERROR(jni_frame.push(env));
  jobject result = NULL;
  {
    SCOPED_TIMER(scan_node_->hbase_read_timer());
    while (true) {
      DCHECK(resultscanner_ != NULL);
      // result = resultscanner_.next();
      result = env->CallObjectMethod(resultscanner_, resultscanner_next_id_);
      // Deliberately no RETURN_ERROR_IF_EXC here: it would return before
      // HandleResultScannerTimeout() gets to look at the pending exception, and
      // scanner timeouts need to be recognised and handled specially. If a timeout
      // occurred, HandleResultScannerTimeout() re-creates the ResultScanner so we
      // can try again; any other exception is returned as an error.
      bool timeout;
      RETURN_IF_ERROR(HandleResultScannerTimeout(env, &timeout));
      if (timeout) {
        result = env->CallObjectMethod(resultscanner_, resultscanner_next_id_);
        // There shouldn't be a timeout now, so we will just return any errors.
        RETURN_ERROR_IF_EXC(env);
      }
      // jump to the next region when finished with the current region.
      if (result == NULL && current_scan_range_idx_ + 1 < scan_range_vector_->size()) {
        ++current_scan_range_idx_;
        RETURN_IF_ERROR(InitScanRange(env,
            (*scan_range_vector_)[current_scan_range_idx_]));
        continue;
      }
      // Ignore empty rows
      if (result != NULL) {
        bool isEmpty = JNI_TRUE == env->CallBooleanMethod(result, result_isempty_id_);
        RETURN_ERROR_IF_EXC(env);
        if (isEmpty) {
          continue;
        }
      }
      break;
    }
  }
  if (result == NULL) {
    *has_next = false;
    return Status::OK();
  }

  if (cells_ != NULL) {
    env->DeleteGlobalRef(cells_);
    // Do not keep a dangling reference around: if one of the calls below fails,
    // Close() must not release the same global reference a second time.
    cells_ = NULL;
  }
  // cells_ = result.rawCells();
  jobject local_cells = reinterpret_cast<jobjectArray>(
      env->CallObjectMethod(result, result_raw_cells_id_));
  RETURN_ERROR_IF_EXC(env);
  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_cells, &cells_));
  num_cells_ = env->GetArrayLength(cells_);
  // Check that rawCells() didn't return more cells than expected.
  // If num_requested_cells_ is 0 then only row key is asked for and this check
  // should pass.
  if (num_cells_ > num_requested_cells_ + num_addl_requested_cols_
      && num_requested_cells_ + num_addl_requested_cols_ != 0) {
    *has_next = false;
    return Status("Encountered more cells than expected.");
  }
  // If all requested columns are present, and we didn't ask for any extra ones to work
  // around an hbase bug, we avoid family-/qualifier comparisons in GetCurrentValue().
  all_cells_present_ =
      (num_cells_ == num_requested_cells_ && num_addl_requested_cols_ == 0);
  cell_index_ = 0;
  value_pool_->Clear();
  *has_next = true;
  return Status::OK();
}
// Writes 'data' into the slot of 'tuple' described by 'slot_desc' using
// BitUtil::ByteSwap, for as many bytes as the slot's type occupies.
inline void HBaseTableScanner::WriteTupleSlot(const SlotDescriptor* slot_desc,
    Tuple* tuple, void* data) {
  BitUtil::ByteSwap(tuple->GetSlot(slot_desc->tuple_offset()), data,
      slot_desc->type().GetByteSize());
}
// Copies the row key of 'cell' into memory allocated from value_pool_. On success
// '*data' points at the copy and '*length' is its size in bytes. Returns a
// mem-limit-exceeded status if the allocation fails, or a JNI error status.
inline Status HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell,
    void** data, int* length) {
  JniLocalFrame local_frame;
  RETURN_IF_ERROR(local_frame.push(env));

  // Locate the key inside the cell's backing row array.
  const int row_offset = env->CallIntMethod(cell, cell_get_row_offset_id_);
  RETURN_ERROR_IF_EXC(env);
  *length = env->CallShortMethod(cell, cell_get_row_length_id_);
  RETURN_ERROR_IF_EXC(env);
  jbyteArray row_array =
      (jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_);
  RETURN_ERROR_IF_EXC(env);

  // Copy the key out of the JVM.
  *data = value_pool_->TryAllocate(*length);
  if (UNLIKELY(*data == NULL)) {
    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetRowKey",
        *length, "row array");
    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
  }
  jbyte* dest = reinterpret_cast<jbyte*>(*data);
  env->GetByteArrayRegion(row_array, row_offset, *length, dest);
  RETURN_ERROR_IF_EXC(env);
  COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
  return Status::OK();
}
// Copies the column family of 'cell' into memory allocated from value_pool_. On
// success '*data' points at the copy and '*length' is its size in bytes. Returns a
// mem-limit-exceeded status if the allocation fails, or a JNI error status.
inline Status HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell,
    void** data, int* length) {
  int offset = env->CallIntMethod(cell, cell_get_family_offset_id_);
  RETURN_ERROR_IF_EXC(env);
  // getFamilyLength() is looked up with the "()B" signature in Init(), i.e. it
  // returns a Java byte, so it has to be invoked through CallByteMethod.
  *length = env->CallByteMethod(cell, cell_get_family_length_id_);
  RETURN_ERROR_IF_EXC(env);
  jbyteArray jdata =
      (jbyteArray) env->CallObjectMethod(cell, cell_get_family_array_);
  RETURN_ERROR_IF_EXC(env);
  *data = value_pool_->TryAllocate(*length);
  if (UNLIKELY(*data == NULL)) {
    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetFamily",
        *length, "family array");
    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
  }
  env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
  RETURN_ERROR_IF_EXC(env);
  COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
  return Status::OK();
}
// Copies the column qualifier of 'cell' into memory allocated from value_pool_. On
// success '*data' points at the copy and '*length' is its size in bytes. Returns a
// mem-limit-exceeded status if the allocation fails, or a JNI error status.
inline Status HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell,
    void** data, int* length) {
  // Locate the qualifier inside the cell's backing qualifier array.
  const int qualifier_offset = env->CallIntMethod(cell, cell_get_qualifier_offset_id_);
  RETURN_ERROR_IF_EXC(env);
  *length = env->CallIntMethod(cell, cell_get_qualifier_length_id_);
  RETURN_ERROR_IF_EXC(env);
  jbyteArray qualifier_array =
      (jbyteArray) env->CallObjectMethod(cell, cell_get_qualifier_array_);
  RETURN_ERROR_IF_EXC(env);

  // Copy the qualifier out of the JVM.
  *data = value_pool_->TryAllocate(*length);
  if (UNLIKELY(*data == NULL)) {
    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetQualifier",
        *length, "qualifier array");
    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
  }
  jbyte* dest = reinterpret_cast<jbyte*>(*data);
  env->GetByteArrayRegion(qualifier_array, qualifier_offset, *length, dest);
  RETURN_ERROR_IF_EXC(env);
  COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
  return Status::OK();
}
// Copies the value of 'cell' into memory allocated from value_pool_. On success
// '*data' points at the copy and '*length' is its size in bytes. Returns a
// mem-limit-exceeded status if the allocation fails, or a JNI error status.
inline Status HBaseTableScanner::GetValue(JNIEnv* env, jobject cell,
    void** data, int* length) {
  // Locate the value inside the cell's backing value array.
  const int value_offset = env->CallIntMethod(cell, cell_get_value_offset_id_);
  RETURN_ERROR_IF_EXC(env);
  *length = env->CallIntMethod(cell, cell_get_value_length_id_);
  RETURN_ERROR_IF_EXC(env);
  jbyteArray value_array =
      (jbyteArray) env->CallObjectMethod(cell, cell_get_value_array_);
  RETURN_ERROR_IF_EXC(env);

  // Copy the value out of the JVM.
  *data = value_pool_->TryAllocate(*length);
  if (UNLIKELY(*data == NULL)) {
    string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetValue",
        *length, "value array");
    return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length);
  }
  jbyte* dest = reinterpret_cast<jbyte*>(*data);
  env->GetByteArrayRegion(value_array, value_offset, *length, dest);
  RETURN_ERROR_IF_EXC(env);
  COUNTER_ADD(scan_node_->bytes_read_counter(), *length);
  return Status::OK();
}
// Returns the row key of the current row in '*key' / '*key_length'. The key is read
// from the first cell in cells_.
Status HBaseTableScanner::GetRowKey(JNIEnv* env, void** key, int* key_length) {
  JniLocalFrame local_frame;
  RETURN_IF_ERROR(local_frame.push(env));

  jobject first_cell = env->GetObjectArrayElement(cells_, 0);
  RETURN_ERROR_IF_EXC(env);
  RETURN_IF_ERROR(GetRowKey(env, first_cell, key, key_length));
  RETURN_ERROR_IF_EXC(env);
  return Status::OK();
}
// Reads the row key of the current row from the first cell in cells_ and writes it
// into the slot of 'tuple' described by 'slot_desc'. The key is expected to be
// exactly as long as the slot's type.
Status HBaseTableScanner::GetRowKey(JNIEnv* env, const SlotDescriptor* slot_desc,
    Tuple* tuple) {
  JniLocalFrame local_frame;
  RETURN_IF_ERROR(local_frame.push(env));

  jobject first_cell = env->GetObjectArrayElement(cells_, 0);
  RETURN_ERROR_IF_EXC(env);

  void* key_bytes;
  int key_size;
  RETURN_IF_ERROR(GetRowKey(env, first_cell, &key_bytes, &key_size));
  DCHECK_EQ(key_size, slot_desc->type().GetByteSize());
  WriteTupleSlot(slot_desc, tuple, reinterpret_cast<char*>(key_bytes));
  RETURN_ERROR_IF_EXC(env);
  return Status::OK();
}
// Fetches the value of the cell at cell_index_ if that cell belongs to
// 'family':'qualifier'. '*is_null' is set to true when the row has no cells left or
// the current cell belongs to a different column; otherwise it is set to false and
// '*data' / '*length' describe the value. cell_index_ is not advanced here.
Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family,
    const string& qualifier, void** data, int* length, bool* is_null) {
  // Current row doesn't have any more cells. All remaining values are NULL.
  if (cell_index_ >= num_cells_) {
    *is_null = true;
    return Status::OK();
  }

  JniLocalFrame local_frame;
  RETURN_IF_ERROR(local_frame.push(env));
  jobject current_cell = env->GetObjectArrayElement(cells_, cell_index_);
  RETURN_ERROR_IF_EXC(env);

  // The family/qualifier comparison is skipped when every requested cell is known
  // to be present.
  if (!all_cells_present_) {
    // A family mismatch means the requested column is NULL in this row.
    void* cell_family;
    int cell_family_len;
    RETURN_IF_ERROR(GetFamily(env, current_cell, &cell_family, &cell_family_len));
    const bool same_family = CompareStrings(family, cell_family, cell_family_len) == 0;
    if (!same_family) {
      *is_null = true;
      return Status::OK();
    }

    // Likewise for a qualifier mismatch.
    void* cell_qualifier;
    int cell_qualifier_len;
    RETURN_IF_ERROR(
        GetQualifier(env, current_cell, &cell_qualifier, &cell_qualifier_len));
    const bool same_qualifier =
        CompareStrings(qualifier, cell_qualifier, cell_qualifier_len) == 0;
    if (!same_qualifier) {
      *is_null = true;
      return Status::OK();
    }
  }

  RETURN_IF_ERROR(GetValue(env, current_cell, data, length));
  *is_null = false;
  return Status::OK();
}
// Returns the value of column 'family':'qualifier' for the current row. When the
// column is NULL, '*value' is NULL and '*value_length' is 0 and the cell cursor is
// left untouched; otherwise the cursor advances to the next cell.
Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family,
    const string& qualifier, void** value, int* value_length) {
  bool column_is_null;
  RETURN_IF_ERROR(
      GetCurrentValue(env, family, qualifier, value, value_length, &column_is_null));
  RETURN_ERROR_IF_EXC(env);

  if (!column_is_null) {
    ++cell_index_;
  } else {
    *value = NULL;
    *value_length = 0;
  }
  return Status::OK();
}
// Materializes column 'family':'qualifier' of the current row into the slot of
// 'tuple' described by 'slot_desc'. A NULL column sets the slot's null indicator
// and leaves the cell cursor untouched; otherwise the value is written to the slot
// and the cursor advances to the next cell.
Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family,
    const string& qualifier, const SlotDescriptor* slot_desc, Tuple* tuple) {
  void* cell_value;
  int cell_value_len;
  bool column_is_null;
  RETURN_IF_ERROR(GetCurrentValue(
      env, family, qualifier, &cell_value, &cell_value_len, &column_is_null));
  RETURN_ERROR_IF_EXC(env);

  if (column_is_null) {
    tuple->SetNull(slot_desc->null_indicator_offset());
    return Status::OK();
  }

  // The value is expected to be exactly as long as the slot's type.
  DCHECK_EQ(cell_value_len, slot_desc->type().GetByteSize());
  WriteTupleSlot(slot_desc, tuple, reinterpret_cast<char*>(cell_value));
  ++cell_index_;
  return Status::OK();
}
// Compares 's' with the 'length' bytes at 'data'. Returns 0 when they are equal, a
// negative value when 's' orders first and a positive value when 'data' orders
// first. Bytes are compared with memcmp over the common prefix; if the prefix is
// equal the shorter side orders first.
int HBaseTableScanner::CompareStrings(const string& s, void* data, int length) {
  const int s_len = static_cast<int>(s.length());

  // Handle empty operands without touching 'data'.
  if (s_len == 0 && length == 0) return 0;
  if (length == 0) return 1;
  if (s_len == 0) return -1;

  const int prefix_len = min(s_len, length);
  const int prefix_cmp = memcmp(s.data(), reinterpret_cast<char*>(data), prefix_len);
  // A differing prefix, or equal lengths, settles the comparison.
  if (prefix_cmp != 0 || s_len == length) return prefix_cmp;
  return s_len < length ? -1 : 1;
}
// Releases everything the scanner holds: closes the ResultScanner (logging, but not
// propagating, any exception it throws), deletes the global references to the
// ResultScanner, Scan and cell array, closes the HTable and frees value_pool_.
void HBaseTableScanner::Close(JNIEnv* env) {
  if (resultscanner_ != NULL) {
    // resultscanner_.close();
    // close() is looked up with the "()V" signature in Init(), so it has to be
    // invoked through CallVoidMethod rather than CallObjectMethod.
    env->CallVoidMethod(resultscanner_, resultscanner_close_id_);
    // Manually check if the ResultScanner timed out so that we can log a less scary
    // and more specific message.
    jthrowable exc = env->ExceptionOccurred();
    if (exc != NULL) {
      // Init() leaves scanner_timeout_ex_cl_ NULL when ScannerTimeoutException does
      // not exist (HBase 2.0), and IsInstanceOf() must not be given a NULL class.
      const bool timed_out = scanner_timeout_ex_cl_ != nullptr
          && env->IsInstanceOf(exc, scanner_timeout_ex_cl_) == JNI_TRUE;
      if (timed_out) {
        env->ExceptionClear();
        LOG(INFO) << "ResultScanner timed out before it was closed "
                  << "(this does not necessarily indicate a problem)";
      } else {
        // GetJniExceptionMsg will clear the exception status and log
        Status status = JniUtil::GetJniExceptionMsg(env, true,
            "Unknown error occurred while closing ResultScanner: ");
        if (!status.ok()) LOG(WARNING) << "Error closing ResultScanner()";
      }
    }
    env->DeleteGlobalRef(resultscanner_);
    resultscanner_ = NULL;
  }
  // Reset the handles after releasing them so that a repeated Close() cannot delete
  // the same global reference twice.
  if (scan_ != NULL) {
    env->DeleteGlobalRef(scan_);
    scan_ = NULL;
  }
  if (cells_ != NULL) {
    env->DeleteGlobalRef(cells_);
    cells_ = NULL;
  }
  // Close the HTable so that the connections are not kept around.
  if (htable_.get() != NULL) htable_->Close(state_);
  value_pool_->FreeAll();
}