blob: 0d9abe2b528ed68d508053762ba1c9828e43b048 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_HBASE_TABLE_SCANNER_H
#define IMPALA_EXEC_HBASE_TABLE_SCANNER_H
#include <boost/scoped_ptr.hpp>
#include <jni.h>
#include <string>
#include <sstream>
#include <vector>
#include "gen-cpp/PlanNodes_types.h"
#include "exec/scan-node.h"
#include "runtime/hbase-table-factory.h"
#include "runtime/hbase-table.h"
namespace impala {
class TupleDescriptor;
class Tuple;
class RuntimeState;
class MemPool;
class Status;
class HBaseScanNode;
/// JNI wrapper class implementing minimal functionality for scanning an HBase table.
/// Caching behavior is tuned by setting hbase.client.Scan.setCaching() and
/// hbase.client.setCacheBlocks().
//
/// hbase.client.setCacheBlocks() is controlled by query option hbase_cache_blocks. When
/// set to true, HBase region server will cache the blocks. Subsequent retrieval of the
/// same data will be faster. If the table is large and the query is doing big scan, it
/// should be set to false to avoid polluting the cache in the hbase region server. On the
/// other hand, if the table is small and will be used several time, set it to true
/// to improve query performance.
//
/// hbase.client.Scan.setCaching() is DEFAULT_ROWS_CACHED by default. This value controls
/// the number of rows batched together when fetching from a HBase region server. Having a
/// high value will put more memory pressure on the HBase region server and having a small
/// value will cause extra round trips to the HBase region server. This value can
/// be overridden by the query option hbase_caching. FE will also suggest a max value such
/// that it won't put too much memory pressure on the region server.
//
/// HBase version compatibility: This code supports HBase 1.0 and HBase 2.0 APIs. It
/// uses the Cell class for result rows rather than the older KeyValue class, which
/// limits support to HBase >= 0.95.2. The code handles some minor incompatibilities
/// across versions:
/// 1. Scan.setCaching() and Scan.setCacheBlocks() tolerate the older void return value
/// as well as the newer Scan return value (See HBASE-10841).
/// 2. ScannerTimeoutException has been removed in HBase 2.0, as HBase has a heartbeat
/// to prevent timeout. HBase 2.0 will reset the scanner and retry rather than
/// throwing an exception. See HBASE-16266 and HBASE-17809.
/// If ScannerTimeoutException does not exist, then HandleResultScannerTimeout()
/// simply checks for an exception and returns any error via status.
/// Both are detected and handled in Init().
//
/// Note: When none of the requested family/qualifiers exist in a particular row,
/// HBase will not return the row at all, leading to "missing" NULL values.
/// TODO: Related to filtering, there is a special filter that allows only selecting the
/// cells. Currently, if only the row key is requested
/// all cells are fetched from HBase (since there is no family/qualifier
/// restriction).
/// TODO: Enable time travel.
class HBaseTableScanner {
public:
/// Initialize all members to NULL, except ScanNode and HTable cache
/// scan_node is the enclosing hbase scan node and its performance counter will be
/// updated.
HBaseTableScanner(HBaseScanNode* scan_node, HBaseTableFactory* htable_factory,
RuntimeState* state);
/// JNI setup. Create global references to classes,
/// and find method ids.
static Status Init() WARN_UNUSED_RESULT;
/// HBase scan range; "" means unbounded
class ScanRange {
public:
ScanRange()
: start_key_(),
stop_key_() {
}
const std::string& start_key() const { return start_key_; }
const std::string& stop_key() const {return stop_key_; }
void set_start_key(const std::string& key) { start_key_ = key; }
void set_stop_key(const std::string& key) { stop_key_ = key; }
/// Write debug string of this ScanRange into out.
void DebugString(int indentation_level, std::stringstream* out);
private:
std::string start_key_;
std::string stop_key_;
};
typedef std::vector<ScanRange> ScanRangeVector;
/// Perform a table scan, retrieving the families/qualifiers referenced in tuple_desc.
/// If start_/stop_key is not empty, is used for the corresponding role in the scan.
/// Note: scan_range_vector cannot be modified for the duration of the scan.
Status StartScan(JNIEnv* env, const TupleDescriptor* tuple_desc,
const ScanRangeVector& scan_range_vector,
const std::vector<THBaseFilter>& filters) WARN_UNUSED_RESULT;
/// Position cursor to next row. Sets has_next to true if more rows exist, false
/// otherwise.
/// Returns non-ok status if an error occurred.
Status Next(JNIEnv* env, bool* has_next) WARN_UNUSED_RESULT;
/// Get the current HBase row key.
Status GetRowKey(JNIEnv* env, void** key, int* key_length) WARN_UNUSED_RESULT;
/// Write the current HBase row key into the tuple slot.
/// This is used for retrieving binary encoded data directly into the tuple.
Status GetRowKey(
JNIEnv* env, const SlotDescriptor* slot_desc, Tuple* tuple) WARN_UNUSED_RESULT;
/// Used to fetch HBase values in order of family/qualifier.
/// Fetch the next value matching family and qualifier into value/value_length.
/// If there is no match, value is set to NULL and value_length to 0.
Status GetValue(JNIEnv* env, const std::string& family, const std::string& qualifier,
void** value, int* value_length) WARN_UNUSED_RESULT;
/// Used to fetch HBase values in order of family/qualifier.
/// Fetch the next value matching family and qualifier into the tuple slot.
/// If there is no match, the tuple slot is set to null.
/// This is used for retrieving binary encoded data directly into the tuple.
Status GetValue(JNIEnv* env, const std::string& family, const std::string& qualifier,
const SlotDescriptor* slot_desc, Tuple* tuple) WARN_UNUSED_RESULT;
/// Close HTable and ResultScanner.
void Close(JNIEnv* env);
void set_num_requested_cells(int num_requested_cells) {
num_requested_cells_ = num_requested_cells;
}
private:
static const int DEFAULT_ROWS_CACHED = 1024;
/// The enclosing HBaseScanNode.
HBaseScanNode* scan_node_;
RuntimeState* state_;
/// Global class references created with JniUtil.
static jclass scan_cl_;
static jclass resultscanner_cl_;
static jclass result_cl_;
static jclass cell_cl_;
static jclass hconstants_cl_;
static jclass filter_list_cl_;
static jclass filter_list_op_cl_;
static jclass single_column_value_filter_cl_;
static jclass compare_op_cl_;
/// Exception thrown when a ResultScanner times out. ScannerTimeoutException was
/// removed in HBase 2.0. In this case, scanner_timeout_ex_cl_ is null and HBase
/// will not throw this exception.
static jclass scanner_timeout_ex_cl_;
static jmethodID scan_ctor_;
static jmethodID scan_set_max_versions_id_;
static jmethodID scan_set_caching_id_;
static jmethodID scan_set_cache_blocks_id_;
static jmethodID scan_add_column_id_;
static jmethodID scan_set_filter_id_;
static jmethodID scan_set_start_row_id_;
static jmethodID scan_set_stop_row_id_;
static jmethodID resultscanner_next_id_;
static jmethodID resultscanner_close_id_;
static jmethodID result_isempty_id_;
static jmethodID result_raw_cells_id_;
static jmethodID cell_get_row_array_;
static jmethodID cell_get_family_array_;
static jmethodID cell_get_qualifier_array_;
static jmethodID cell_get_value_array_;
static jmethodID cell_get_family_offset_id_;
static jmethodID cell_get_family_length_id_;
static jmethodID cell_get_qualifier_offset_id_;
static jmethodID cell_get_qualifier_length_id_;
static jmethodID cell_get_row_offset_id_;
static jmethodID cell_get_row_length_id_;
static jmethodID cell_get_value_offset_id_;
static jmethodID cell_get_value_length_id_;
static jmethodID filter_list_ctor_;
static jmethodID filter_list_add_filter_id_;
static jmethodID single_column_value_filter_ctor_;
static jobject empty_row_;
static jobject must_pass_all_op_;
static jobjectArray compare_ops_;
/// HBase Table factory from runtime state.
HBaseTableFactory* htable_factory_;
/// Vector of ScanRange
const ScanRangeVector* scan_range_vector_;
int current_scan_range_idx_; // the index of the current scan range
/// C++ wrapper for HTable
boost::scoped_ptr<HBaseTable> htable_;
/// Instances related to scanning a table. Set in StartScan(). They are global references
/// because they cannot be automatically garbage collected by the JVM.
jobject scan_; // Java type Scan
jobject resultscanner_; // Java type ResultScanner
/// Helper members for retrieving results from a scan. Updated in Next() and
/// used by GetRowKey() and GetValue(). Result of resultscanner_.next().raw()
/// Java type Cell[] or KeyValue[] depending on HBase version.
jobjectArray cells_;
/// Current position in cells_. Incremented in NextValue(). Reset in Next().
int cell_index_;
/// Number of requested cells (i.e., the number of added family/qualifier pairs).
/// Set in StartScan().
int num_requested_cells_;
/// number of cols requested in addition to num_requested_cells_, to work around
/// hbase bug
int num_addl_requested_cols_;
/// Number of cells returned from last result_.raw().
int num_cells_;
/// Indicates whether all requested cells are present in the current cells_.
/// If set to true, all family/qualifier comparisons are avoided in NextValue().
bool all_cells_present_;
/// Pool for allocating keys/values retrieved from HBase.
/// Memory allocated from this pool is valid until the following Next().
boost::scoped_ptr<MemPool> value_pool_;
/// Number of rows for caching that will be passed to scanners.
/// Set in the HBase call Scan.setCaching();
int rows_cached_;
/// True if the scanner should set Scan.setCacheBlocks to true.
bool cache_blocks_;
/// HBase specific counters
RuntimeProfile::Counter* scan_setup_timer_;
/// Checks for and handles a ScannerTimeoutException which is thrown if the
/// ResultScanner times out. If a timeout occurs, the ResultScanner is re-created
/// (with the scan range adjusted if some results have already been returned) and
/// the exception is cleared. If any other exception is thrown, the error message
/// is returned in the status. In HBase 2.0, ScannerTimeoutException no longer
/// exists and the error message is returned in the status.
/// 'timeout' is true if a ScannerTimeoutException was thrown, false otherwise.
Status HandleResultScannerTimeout(JNIEnv* env, bool* timeout) WARN_UNUSED_RESULT;
/// Lexicographically compares s with the string in data having given length.
/// Returns a value > 0 if s is greater, a value < 0 if s is smaller,
/// and 0 if they are equal.
int CompareStrings(const std::string& s, void* data, int length);
/// Turn strings into Java byte array.
Status CreateByteArray(
JNIEnv* env, const std::string& s, jbyteArray* bytes) WARN_UNUSED_RESULT;
/// First time scanning the table, do some setup
Status ScanSetup(JNIEnv* env, const TupleDescriptor* tuple_desc,
const std::vector<THBaseFilter>& filters) WARN_UNUSED_RESULT;
/// Initialize the scan to the given range
Status InitScanRange(JNIEnv* env, const ScanRange& scan_range) WARN_UNUSED_RESULT;
/// Initialize the scan range to the scan range specified by the start and end byte
/// arrays
Status InitScanRange(
JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes) WARN_UNUSED_RESULT;
/// Copies the row key of cell into value_pool_ and returns it via *data and *length.
/// Returns error status if memory limit is exceeded.
inline Status GetRowKey(
JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
/// Copies the column family of cell into value_pool_ and returns it
/// via *data and *length. Returns error status if memory limit is exceeded.
inline Status GetFamily(
JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
/// Copies the column qualifier of cell into value_pool_ and returns it
/// via *data and *length. Returns error status if memory limit is exceeded.
inline Status GetQualifier(
JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
/// Copies the value of cell into value_pool_ and returns it via *data and *length.
/// Returns error status if memory limit is exceeded.
inline Status GetValue(
JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
/// Returns the current value of cells_[cell_index_] in *data and *length
/// if its family/qualifier match the given family/qualifier.
/// Otherwise, sets *is_null to true indicating a mismatch in family or qualifier.
inline Status GetCurrentValue(JNIEnv* env, const std::string& family,
const std::string& qualifier, void** data, int* length,
bool* is_null) WARN_UNUSED_RESULT;
/// Write to a tuple slot with the given hbase binary formatted data, which is in
/// big endian.
/// Only boolean, tinyint, smallint, int, bigint, float and double should have binary
/// formatted data.
inline void WriteTupleSlot(const SlotDescriptor* slot_desc, Tuple* tuple, void* data);
};
} // namespace impala
#endif