// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/query-result-set.h"
#include <sstream>
#include <boost/scoped_ptr.hpp>
#include "exprs/scalar-expr-evaluator.h"
#include "rpc/thrift-util.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/types.h"
#include "service/hs2-util.h"
#include "util/bit-util.h"
#include "common/names.h"
using ThriftTColumn = apache::hive::service::cli::thrift::TColumn;
using ThriftTColumnValue = apache::hive::service::cli::thrift::TColumnValue;
using apache::hive::service::cli::thrift::TProtocolVersion;
using apache::hive::service::cli::thrift::TRow;
using apache::hive::service::cli::thrift::TRowSet;

namespace {
/// ASCII output precision for double/float. 16 significant digits distinguishes most
/// doubles while avoiding noise in the trailing digits (a lossless round-trip can
/// need up to 17).
constexpr int ASCII_PRECISION = 16;
} // anonymous namespace

namespace impala {

/// ASCII result set for Beeswax. Rows are returned in ASCII text encoding, using
/// "\t" as the column delimiter.
class AsciiQueryResultSet : public QueryResultSet {
public:
/// Rows are added into 'rowset'.
AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
: metadata_(metadata), result_set_(rowset) {}
virtual ~AsciiQueryResultSet() {}
  /// Evaluate 'expr_evals' over rows in 'batch', convert them to ASCII using "\t" as
  /// the column delimiter, and store them in this result set.
/// TODO: Handle complex types.
virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
int start_idx, int num_rows) override;
/// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this
/// result set.
virtual Status AddOneRow(const TResultRow& row) override;
virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
virtual int64_t ByteSize(int start_idx, int num_rows) override;
virtual size_t size() override { return result_set_->size(); }
private:
/// Metadata of the result set
const TResultSetMetadata& metadata_;
/// Points to the result set to be filled. Not owned by this object.
vector<string>* result_set_;
};
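
/// Example (sketch; names are illustrative, this is not the exact Beeswax fetch
/// path): the caller owns the string vector and each added row becomes one
/// tab-delimited entry:
///
///   vector<string> rows;
///   boost::scoped_ptr<QueryResultSet> ascii(
///       QueryResultSet::CreateAsciiQueryResultSet(metadata, &rows));
///   RETURN_IF_ERROR(ascii->AddRows(expr_evals, batch, 0, batch->num_rows()));
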
/// Result set container for Hive protocol versions >= V6, where results are returned
/// in columnar format.
class HS2ColumnarResultSet : public QueryResultSet {
public:
HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset);
virtual ~HS2ColumnarResultSet() {}
/// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 columnar
/// representation.
virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
int start_idx, int num_rows) override;
/// Add a row from a TResultRow
virtual Status AddOneRow(const TResultRow& row) override;
/// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
/// from 'other' into this result set
virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
virtual int64_t ByteSize(int start_idx, int num_rows) override;
virtual size_t size() override { return num_rows_; }
private:
/// Metadata of the result set
const TResultSetMetadata& metadata_;
  /// Points to the TRowSet to be filled. The row set this points to may be owned by
  /// this object, in which case owned_result_set_ is set.
TRowSet* result_set_;
  /// Owns the TRowSet if one was not supplied at construction; result_set_ points to
  /// it in that case.
  boost::scoped_ptr<TRowSet> owned_result_set_;
  /// Number of rows in the result set.
  int64_t num_rows_;
void InitColumns();
};

/// Row-oriented result set for HiveServer2, used to serve HS2 requests with protocol
/// version <= V5.
class HS2RowOrientedResultSet : public QueryResultSet {
public:
  /// Rows are added into 'rowset'.
HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset);
virtual ~HS2RowOrientedResultSet() {}
/// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 row-oriented
/// representation of TRows stored in a TRowSet.
virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch,
int start_idx, int num_rows) override;
/// Convert TResultRow to HS2 TRow and store it in a TRowSet
virtual Status AddOneRow(const TResultRow& row) override;
virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override;
virtual int64_t ByteSize(int start_idx, int num_rows) override;
virtual size_t size() override { return result_set_->rows.size(); }
private:
/// Metadata of the result set
const TResultSetMetadata& metadata_;
  /// Points to the TRowSet to be filled. The row set this points to may be owned by
  /// this object, in which case owned_result_set_ is set.
TRowSet* result_set_;
  /// Owns the TRowSet if one was not supplied at construction; result_set_ points to
  /// it in that case.
  boost::scoped_ptr<TRowSet> owned_result_set_;
};
QueryResultSet* QueryResultSet::CreateAsciiQueryResultSet(
const TResultSetMetadata& metadata, vector<string>* rowset) {
return new AsciiQueryResultSet(metadata, rowset);
}
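
/// Columnar result sets (TColumn-based TRowSets) were introduced in HiveServer2
/// protocol V6; older clients must be served row-oriented TRows. Example use
/// (sketch; 'session_version' and 'fetch_result' are illustrative names):
///
///   boost::scoped_ptr<QueryResultSet> hs2_rows(QueryResultSet::CreateHS2ResultSet(
///       session_version, metadata, &fetch_result.results));
///   RETURN_IF_ERROR(hs2_rows->AddRows(expr_evals, batch, 0, batch->num_rows()));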
QueryResultSet* QueryResultSet::CreateHS2ResultSet(
TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) {
if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
return new HS2RowOrientedResultSet(metadata, rowset);
} else {
return new HS2ColumnarResultSet(metadata, rowset);
}
}

//////////////////////////////////////////////////////////////////////////////////////////

Status AsciiQueryResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
RowBatch* batch, int start_idx, int num_rows) {
DCHECK_GE(batch->num_rows(), start_idx + num_rows);
int num_col = expr_evals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
vector<int> scales;
scales.reserve(num_col);
for (ScalarExprEvaluator* expr_eval : expr_evals) {
scales.push_back(expr_eval->output_scale());
}
// Round up to power-of-two to avoid accidentally quadratic behaviour from repeated
// small increases in size.
  result_set_->reserve(BitUtil::RoundUpToPowerOfTwo(result_set_->size() + num_rows));
stringstream out_stream;
out_stream.precision(ASCII_PRECISION);
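  // FOREACH_ROW_LIMIT (runtime/row-batch.h) visits at most 'num_rows' rows of 'batch'
  // starting at row 'start_idx'.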
FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
for (int i = 0; i < num_col; ++i) {
// ODBC-187 - ODBC can only take "\t" as the delimiter
out_stream << (i > 0 ? "\t" : "");
DCHECK_EQ(1, metadata_.columns[i].columnType.types.size());
RawValue::PrintValue(expr_evals[i]->GetValue(it.Get()),
ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i],
&out_stream);
}
result_set_->push_back(out_stream.str());
out_stream.str("");
}
return Status::OK();
}

Status AsciiQueryResultSet::AddOneRow(const TResultRow& row) {
int num_col = row.colVals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
stringstream out_stream;
out_stream.precision(ASCII_PRECISION);
for (int i = 0; i < num_col; ++i) {
// ODBC-187 - ODBC can only take "\t" as the delimiter
if (i > 0) out_stream << '\t';
PrintTColumnValue(out_stream, row.colVals[i]);
}
result_set_->push_back(out_stream.str());
return Status::OK();
}
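
// Used when rows must be copied between result sets of the same kind, e.g. when a
// client restarts a fetch that is served from the server-side results cache.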
int AsciiQueryResultSet::AddRows(
const QueryResultSet* other, int start_idx, int num_rows) {
const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other);
if (start_idx >= o->result_set_->size()) return 0;
const int rows_added =
min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx);
result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx,
o->result_set_->begin() + start_idx + rows_added);
return rows_added;
}

int64_t AsciiQueryResultSet::ByteSize(int start_idx, int num_rows) {
  int64_t bytes = 0;
  const int rows_counted =
      min(static_cast<size_t>(num_rows), result_set_->size() - start_idx);
  for (int i = start_idx; i < start_idx + rows_counted; ++i) {
    const string& row = (*result_set_)[i];
    bytes += sizeof(row) + row.capacity();
  }
  return bytes;
}

////////////////////////////////////////////////////////////////////////////////

namespace {
// Utility functions for computing the size of HS2 Thrift structs in bytes.
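// These sizes are approximations: beyond the in-struct size, only the string payload
// (the single out-of-line allocation in a TColumnValue) is counted.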
inline int64_t ByteSize(const ThriftTColumnValue& val) {
return sizeof(val) + val.stringVal.value.capacity();
}

int64_t ByteSize(const TRow& row) {
int64_t bytes = sizeof(row);
for (const ThriftTColumnValue& c : row.colVals) {
bytes += ByteSize(c);
}
return bytes;
}

// Returns the size, in bytes, of a Hive TColumn structure, only taking into account those
// values in the range [start_idx, end_idx).
uint32_t TColumnByteSize(const ThriftTColumn& col, uint32_t start_idx, uint32_t end_idx) {
DCHECK_LE(start_idx, end_idx);
uint32_t num_rows = end_idx - start_idx;
if (num_rows == 0) return 0L;
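  // In the HS2 columnar format, 'nulls' is a bit-packed byte string holding one bit
  // per value, so its size() is already a byte count; the whole buffer is charged to
  // the range, a slight overestimate. vector<bool> values are bit-packed too, so
  // num_rows * sizeof(bool) likewise overestimates the boolVal payload.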
if (col.__isset.boolVal) return (num_rows * sizeof(bool)) + col.boolVal.nulls.size();
if (col.__isset.byteVal) return num_rows + col.byteVal.nulls.size();
if (col.__isset.i16Val) return (num_rows * sizeof(int16_t)) + col.i16Val.nulls.size();
if (col.__isset.i32Val) return (num_rows * sizeof(int32_t)) + col.i32Val.nulls.size();
if (col.__isset.i64Val) return (num_rows * sizeof(int64_t)) + col.i64Val.nulls.size();
if (col.__isset.doubleVal) {
return (num_rows * sizeof(double)) + col.doubleVal.nulls.size();
}
if (col.__isset.stringVal) {
uint32_t bytes = 0;
for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size();
return bytes + col.stringVal.nulls.size();
}
return 0;
}
} // anonymous namespace

// Result set container for Hive protocol versions >= V6, where results are returned
// in columnar format.
HS2ColumnarResultSet::HS2ColumnarResultSet(
const TResultSetMetadata& metadata, TRowSet* rowset)
: metadata_(metadata), result_set_(rowset), num_rows_(0) {
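  // When no TRowSet is supplied, allocate and own one; this lets the result set be
  // used standalone, e.g. as intermediate storage that is later copied out via
  // AddRows(other, ...).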
if (rowset == NULL) {
owned_result_set_.reset(new TRowSet());
result_set_ = owned_result_set_.get();
}
InitColumns();
}

Status HS2ColumnarResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
RowBatch* batch, int start_idx, int num_rows) {
DCHECK_GE(batch->num_rows(), start_idx + num_rows);
int num_col = expr_evals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
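  // Append each evaluator's output for the selected rows to the matching output
  // column. 'num_rows_' tells the converter the output row offset, which it needs to
  // position the bit-packed null indicators.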
for (int i = 0; i < num_col; ++i) {
const TColumnType& type = metadata_.columns[i].columnType;
ScalarExprEvaluator* expr_eval = expr_evals[i];
ExprValuesToHS2TColumn(expr_eval, type, batch, start_idx, num_rows, num_rows_,
&(result_set_->columns[i]));
}
num_rows_ += num_rows;
return Status::OK();
}

// Add a row from a TResultRow
Status HS2ColumnarResultSet::AddOneRow(const TResultRow& row) {
int num_col = row.colVals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
for (int i = 0; i < num_col; ++i) {
TColumnValueToHS2TColumn(row.colVals[i], metadata_.columns[i].columnType, num_rows_,
&(result_set_->columns[i]));
}
++num_rows_;
return Status::OK();
}

// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
// from 'other' into this result set
int HS2ColumnarResultSet::AddRows(
const QueryResultSet* other, int start_idx, int num_rows) {
const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other);
DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size());
if (start_idx >= o->num_rows_) return 0;
const int rows_added = min<int64_t>(num_rows, o->num_rows_ - start_idx);
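  // HS2 null indicators are bit-packed, so a sub-range starting at an arbitrary row
  // cannot be appended byte-wise; StitchNulls() (see hs2-util.h) shifts the source
  // bits from 'start_idx' onward and appends them at bit offset 'num_rows_' in the
  // destination. The value vectors, by contrast, allow direct range insertion.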
for (int j = 0; j < metadata_.columns.size(); ++j) {
ThriftTColumn* from = &o->result_set_->columns[j];
ThriftTColumn* to = &result_set_->columns[j];
switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
case TPrimitiveType::NULL_TYPE:
case TPrimitiveType::BOOLEAN:
StitchNulls(
num_rows_, rows_added, start_idx, from->boolVal.nulls, &(to->boolVal.nulls));
to->boolVal.values.insert(to->boolVal.values.end(),
from->boolVal.values.begin() + start_idx,
from->boolVal.values.begin() + start_idx + rows_added);
break;
case TPrimitiveType::TINYINT:
StitchNulls(
num_rows_, rows_added, start_idx, from->byteVal.nulls, &(to->byteVal.nulls));
to->byteVal.values.insert(to->byteVal.values.end(),
from->byteVal.values.begin() + start_idx,
from->byteVal.values.begin() + start_idx + rows_added);
break;
case TPrimitiveType::SMALLINT:
StitchNulls(
num_rows_, rows_added, start_idx, from->i16Val.nulls, &(to->i16Val.nulls));
to->i16Val.values.insert(to->i16Val.values.end(),
from->i16Val.values.begin() + start_idx,
from->i16Val.values.begin() + start_idx + rows_added);
break;
case TPrimitiveType::INT:
StitchNulls(
num_rows_, rows_added, start_idx, from->i32Val.nulls, &(to->i32Val.nulls));
to->i32Val.values.insert(to->i32Val.values.end(),
from->i32Val.values.begin() + start_idx,
from->i32Val.values.begin() + start_idx + rows_added);
break;
case TPrimitiveType::BIGINT:
StitchNulls(
num_rows_, rows_added, start_idx, from->i64Val.nulls, &(to->i64Val.nulls));
to->i64Val.values.insert(to->i64Val.values.end(),
from->i64Val.values.begin() + start_idx,
from->i64Val.values.begin() + start_idx + rows_added);
break;
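      // HS2 has no 4-byte float column, so FLOAT values are widened into doubleVal.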
case TPrimitiveType::FLOAT:
case TPrimitiveType::DOUBLE:
StitchNulls(num_rows_, rows_added, start_idx, from->doubleVal.nulls,
&(to->doubleVal.nulls));
to->doubleVal.values.insert(to->doubleVal.values.end(),
from->doubleVal.values.begin() + start_idx,
from->doubleVal.values.begin() + start_idx + rows_added);
break;
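      // Timestamps, dates, decimals and the char types are all carried as strings in
      // the HS2 protocol.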
case TPrimitiveType::TIMESTAMP:
case TPrimitiveType::DATE:
case TPrimitiveType::DECIMAL:
case TPrimitiveType::STRING:
case TPrimitiveType::VARCHAR:
case TPrimitiveType::CHAR:
StitchNulls(num_rows_, rows_added, start_idx, from->stringVal.nulls,
&(to->stringVal.nulls));
to->stringVal.values.insert(to->stringVal.values.end(),
from->stringVal.values.begin() + start_idx,
from->stringVal.values.begin() + start_idx + rows_added);
break;
default:
DCHECK(false) << "Unsupported type: "
<< TypeToString(ThriftToType(
metadata_.columns[j].columnType.types[0].scalar_type.type));
break;
}
}
num_rows_ += rows_added;
return rows_added;
}

int64_t HS2ColumnarResultSet::ByteSize(int start_idx, int num_rows) {
  const int end = min(start_idx + num_rows, static_cast<int>(size()));
int64_t bytes = 0L;
for (const ThriftTColumn& c : result_set_->columns) {
bytes += TColumnByteSize(c, start_idx, end);
}
return bytes;
}
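
/// Appends one TColumn per result-set column to the row set, marking the matching
/// field of the TColumn union as set. NULL_TYPE is carried in the bool column.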
void HS2ColumnarResultSet::InitColumns() {
result_set_->__isset.columns = true;
for (const TColumn& col : metadata_.columns) {
DCHECK(col.columnType.types.size() == 1)
<< "Structured columns unsupported in HS2 interface";
ThriftTColumn column;
switch (col.columnType.types[0].scalar_type.type) {
case TPrimitiveType::NULL_TYPE:
case TPrimitiveType::BOOLEAN:
column.__isset.boolVal = true;
break;
case TPrimitiveType::TINYINT:
column.__isset.byteVal = true;
break;
case TPrimitiveType::SMALLINT:
column.__isset.i16Val = true;
break;
case TPrimitiveType::INT:
column.__isset.i32Val = true;
break;
case TPrimitiveType::BIGINT:
column.__isset.i64Val = true;
break;
case TPrimitiveType::FLOAT:
case TPrimitiveType::DOUBLE:
column.__isset.doubleVal = true;
break;
case TPrimitiveType::TIMESTAMP:
case TPrimitiveType::DATE:
case TPrimitiveType::DECIMAL:
case TPrimitiveType::VARCHAR:
case TPrimitiveType::CHAR:
case TPrimitiveType::STRING:
column.__isset.stringVal = true;
break;
default:
DCHECK(false) << "Unhandled column type: "
<< TypeToString(
ThriftToType(col.columnType.types[0].scalar_type.type));
}
result_set_->columns.push_back(column);
}
}

HS2RowOrientedResultSet::HS2RowOrientedResultSet(
const TResultSetMetadata& metadata, TRowSet* rowset)
: metadata_(metadata), result_set_(rowset) {
if (rowset == NULL) {
owned_result_set_.reset(new TRowSet());
result_set_ = owned_result_set_.get();
}
}

Status HS2RowOrientedResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals,
RowBatch* batch, int start_idx, int num_rows) {
DCHECK_GE(batch->num_rows(), start_idx + num_rows);
int num_col = expr_evals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
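  // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated
  // small increases in size.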
  result_set_->rows.reserve(
      BitUtil::RoundUpToPowerOfTwo(result_set_->rows.size() + num_rows));
FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
result_set_->rows.push_back(TRow());
TRow& trow = result_set_->rows.back();
trow.colVals.resize(num_col);
for (int i = 0; i < num_col; ++i) {
ExprValueToHS2TColumnValue(expr_evals[i]->GetValue(it.Get()),
metadata_.columns[i].columnType, &(trow.colVals[i]));
}
}
return Status::OK();
}

Status HS2RowOrientedResultSet::AddOneRow(const TResultRow& row) {
int num_col = row.colVals.size();
DCHECK_EQ(num_col, metadata_.columns.size());
result_set_->rows.push_back(TRow());
TRow& trow = result_set_->rows.back();
trow.colVals.resize(num_col);
for (int i = 0; i < num_col; ++i) {
TColumnValueToHS2TColumnValue(
row.colVals[i], metadata_.columns[i].columnType, &(trow.colVals[i]));
}
return Status::OK();
}

int HS2RowOrientedResultSet::AddRows(
const QueryResultSet* other, int start_idx, int num_rows) {
const HS2RowOrientedResultSet* o = static_cast<const HS2RowOrientedResultSet*>(other);
if (start_idx >= o->result_set_->rows.size()) return 0;
const int rows_added =
min(static_cast<size_t>(num_rows), o->result_set_->rows.size() - start_idx);
for (int i = start_idx; i < start_idx + rows_added; ++i) {
result_set_->rows.push_back(o->result_set_->rows[i]);
}
return rows_added;
}

int64_t HS2RowOrientedResultSet::ByteSize(int start_idx, int num_rows) {
int64_t bytes = 0;
  const int rows_counted =
      min(static_cast<size_t>(num_rows), result_set_->rows.size() - start_idx);
  for (int i = start_idx; i < start_idx + rows_counted; ++i) {
bytes += impala::ByteSize(result_set_->rows[i]);
}
return bytes;
}

} // namespace impala