blob: 5c473a3efb5d8d7eac7f340b1f5fbc93939f1443 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/common/wire_protocol.h"
#include <time.h>
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <ostream>
#include <string>
#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <google/protobuf/map.h>
#include <google/protobuf/stubs/common.h>
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/fixedarray.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/compression/compression.pb.h"
#include "kudu/util/faststring.h"
#include "kudu/util/hash.pb.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/safe_math.h"
#include "kudu/util/slice.h"
using google::protobuf::Map;
using google::protobuf::RepeatedPtrField;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using std::map;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
// Serializes 'status' into 'pb', discarding whatever 'pb' held before.
//
// An OK status is encoded as the OK code alone. Every other status carries
// its code, its message and, if one is attached, its posix code. A status
// kind with no AppStatusPB counterpart is sent as UNKNOWN_ERROR, with the
// stringified original code prepended to the message.
void StatusToPB(const Status& status, AppStatusPB* pb) {
  pb->Clear();

  if (status.ok()) {
    // OK statuses don't have any message or posix code.
    pb->set_code(AppStatusPB::OK);
    return;
  }

  bool recognized = true;
  if (status.IsNotFound()) {
    pb->set_code(AppStatusPB::NOT_FOUND);
  } else if (status.IsCorruption()) {
    pb->set_code(AppStatusPB::CORRUPTION);
  } else if (status.IsNotSupported()) {
    pb->set_code(AppStatusPB::NOT_SUPPORTED);
  } else if (status.IsInvalidArgument()) {
    pb->set_code(AppStatusPB::INVALID_ARGUMENT);
  } else if (status.IsIOError()) {
    pb->set_code(AppStatusPB::IO_ERROR);
  } else if (status.IsAlreadyPresent()) {
    pb->set_code(AppStatusPB::ALREADY_PRESENT);
  } else if (status.IsRuntimeError()) {
    pb->set_code(AppStatusPB::RUNTIME_ERROR);
  } else if (status.IsNetworkError()) {
    pb->set_code(AppStatusPB::NETWORK_ERROR);
  } else if (status.IsIllegalState()) {
    pb->set_code(AppStatusPB::ILLEGAL_STATE);
  } else if (status.IsNotAuthorized()) {
    pb->set_code(AppStatusPB::NOT_AUTHORIZED);
  } else if (status.IsAborted()) {
    pb->set_code(AppStatusPB::ABORTED);
  } else if (status.IsRemoteError()) {
    pb->set_code(AppStatusPB::REMOTE_ERROR);
  } else if (status.IsServiceUnavailable()) {
    pb->set_code(AppStatusPB::SERVICE_UNAVAILABLE);
  } else if (status.IsTimedOut()) {
    pb->set_code(AppStatusPB::TIMED_OUT);
  } else if (status.IsUninitialized()) {
    pb->set_code(AppStatusPB::UNINITIALIZED);
  } else if (status.IsConfigurationError()) {
    pb->set_code(AppStatusPB::CONFIGURATION_ERROR);
  } else if (status.IsIncomplete()) {
    pb->set_code(AppStatusPB::INCOMPLETE);
  } else if (status.IsEndOfFile()) {
    pb->set_code(AppStatusPB::END_OF_FILE);
  } else {
    LOG(WARNING) << "Unknown error code translation from internal error "
                 << status.ToString() << ": sending UNKNOWN_ERROR";
    pb->set_code(AppStatusPB::UNKNOWN_ERROR);
    recognized = false;
  }

  if (recognized) {
    // The receiver rebuilds the rest of the ToString() output from the code,
    // so only the message itself needs to travel.
    pb->set_message(status.message().ToString());
  } else {
    // The code is lost in translation, so keep its textual form in the
    // message.
    pb->set_message(status.CodeAsString() + ": " +
                    status.message().ToString());
  }

  const auto posix_code = status.posix_code();
  if (posix_code != -1) {
    pb->set_posix_code(posix_code);
  }
}
// Rebuilds a Status from its protobuf form.
//
// The posix code is -1 when 'pb' does not carry one. UNKNOWN_ERROR and any
// code this switch does not list are logged and returned as a RuntimeError
// whose second message part is the message from 'pb'.
Status StatusFromPB(const AppStatusPB& pb) {
  const int posix_code = pb.has_posix_code() ? pb.posix_code() : -1;
  const string& msg = pb.message();

  switch (pb.code()) {
    case AppStatusPB::OK:
      return Status::OK();
    case AppStatusPB::NOT_FOUND:
      return Status::NotFound(msg, "", posix_code);
    case AppStatusPB::CORRUPTION:
      return Status::Corruption(msg, "", posix_code);
    case AppStatusPB::NOT_SUPPORTED:
      return Status::NotSupported(msg, "", posix_code);
    case AppStatusPB::INVALID_ARGUMENT:
      return Status::InvalidArgument(msg, "", posix_code);
    case AppStatusPB::IO_ERROR:
      return Status::IOError(msg, "", posix_code);
    case AppStatusPB::ALREADY_PRESENT:
      return Status::AlreadyPresent(msg, "", posix_code);
    case AppStatusPB::RUNTIME_ERROR:
      return Status::RuntimeError(msg, "", posix_code);
    case AppStatusPB::NETWORK_ERROR:
      return Status::NetworkError(msg, "", posix_code);
    case AppStatusPB::ILLEGAL_STATE:
      return Status::IllegalState(msg, "", posix_code);
    case AppStatusPB::NOT_AUTHORIZED:
      return Status::NotAuthorized(msg, "", posix_code);
    case AppStatusPB::ABORTED:
      return Status::Aborted(msg, "", posix_code);
    case AppStatusPB::REMOTE_ERROR:
      return Status::RemoteError(msg, "", posix_code);
    case AppStatusPB::SERVICE_UNAVAILABLE:
      return Status::ServiceUnavailable(msg, "", posix_code);
    case AppStatusPB::TIMED_OUT:
      return Status::TimedOut(msg, "", posix_code);
    case AppStatusPB::UNINITIALIZED:
      return Status::Uninitialized(msg, "", posix_code);
    case AppStatusPB::CONFIGURATION_ERROR:
      return Status::ConfigurationError(msg, "", posix_code);
    case AppStatusPB::INCOMPLETE:
      return Status::Incomplete(msg, "", posix_code);
    case AppStatusPB::END_OF_FILE:
      return Status::EndOfFile(msg, "", posix_code);
    case AppStatusPB::UNKNOWN_ERROR:
    default: {
      LOG(WARNING) << "Unknown error code in status: " << SecureShortDebugString(pb);
      return Status::RuntimeError("(unknown error code)", msg, posix_code);
    }
  }
}
// Copies the host and port of 'host_port' into 'host_port_pb'.
// Always returns OK.
Status HostPortToPB(const HostPort& host_port, HostPortPB* host_port_pb) {
  HostPortPB* const out = host_port_pb;
  out->set_host(host_port.host());
  out->set_port(host_port.port());
  return Status::OK();
}
// Copies the host and port of 'host_port_pb' into 'host_port'.
// Always returns OK.
Status HostPortFromPB(const HostPortPB& host_port_pb, HostPort* host_port) {
  HostPort* const out = host_port;
  out->set_host(host_port_pb.host());
  out->set_port(host_port_pb.port());
  return Status::OK();
}
// Appends one HostPortPB to 'pbs' for every address in 'addrs'.
//
// A wildcard address is advertised under the name produced by GetFQDN();
// any other address uses its own host string. If GetFQDN() fails, its status
// is returned immediately; the entry being filled in has already been
// appended to 'pbs' at that point.
Status AddHostPortPBs(const vector<Sockaddr>& addrs,
                      RepeatedPtrField<HostPortPB>* pbs) {
  for (const auto& sockaddr : addrs) {
    HostPortPB* entry = pbs->Add();
    if (!sockaddr.IsWildcard()) {
      entry->set_host(sockaddr.host());
    } else {
      RETURN_NOT_OK(GetFQDN(entry->mutable_host()));
    }
    entry->set_port(sockaddr.port());
  }
  return Status::OK();
}
// Clears 'pb' and then fills its column list from 'schema', passing 'flags'
// through to SchemaToColumnPBs().
Status SchemaToPB(const Schema& schema, SchemaPB *pb, int flags) {
  pb->Clear();
  auto* column_pbs = pb->mutable_columns();
  return SchemaToColumnPBs(schema, column_pbs, flags);
}
// Rebuilds 'schema' from the column list stored in 'pb'.
Status SchemaFromPB(const SchemaPB& pb, Schema *schema) {
  const auto& column_pbs = pb.columns();
  return ColumnPBsToSchema(column_pbs, schema);
}
void ColumnSchemaToPB(const ColumnSchema& col_schema, ColumnSchemaPB *pb, int flags) {
pb->Clear();
pb->set_name(col_schema.name());
pb->set_is_nullable(col_schema.is_nullable());
DataType type = col_schema.type_info()->type();
pb->set_type(type);
// Only serialize precision and scale for decimal types.
if (type == DataType::DECIMAL32 ||
type == DataType::DECIMAL64 ||
type == DataType::DECIMAL128) {
pb->mutable_type_attributes()->set_precision(col_schema.type_attributes().precision);
pb->mutable_type_attributes()->set_scale(col_schema.type_attributes().scale);
} else if (type == DataType::VARCHAR) {
pb->mutable_type_attributes()->set_length(col_schema.type_attributes().length);
}
if (!(flags & SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES)) {
pb->set_encoding(col_schema.attributes().encoding);
pb->set_compression(col_schema.attributes().compression);
pb->set_cfile_block_size(col_schema.attributes().cfile_block_size);
}
if (col_schema.has_read_default()) {
if (col_schema.type_info()->physical_type() == BINARY) {
const Slice *read_slice = static_cast<const Slice *>(col_schema.read_default_value());
pb->set_read_default_value(read_slice->data(), read_slice->size());
} else {
const void *read_value = col_schema.read_default_value();
pb->set_read_default_value(read_value, col_schema.type_info()->size());
}
}
if (col_schema.has_write_default() && !(flags & SCHEMA_PB_WITHOUT_WRITE_DEFAULT)) {
if (col_schema.type_info()->physical_type() == BINARY) {
const Slice *write_slice = static_cast<const Slice *>(col_schema.write_default_value());
pb->set_write_default_value(write_slice->data(), write_slice->size());
} else {
const void *write_value = col_schema.write_default_value();
pb->set_write_default_value(write_value, col_schema.type_info()->size());
}
}
if (!col_schema.comment().empty() && !(flags & SCHEMA_PB_WITHOUT_COMMENT)) {
pb->set_comment(col_schema.comment());
}
}
// Builds a ColumnSchema from 'pb' and stores it in '*col_schema'.
//
// Returns Corruption if a read or write default for a non-BINARY type holds
// fewer bytes than the type's size. Type and storage attributes absent from
// 'pb' keep the values their default-constructed structs start with.
Status ColumnSchemaFromPB(const ColumnSchemaPB& pb, boost::optional<ColumnSchema>* col_schema) {
  const TypeInfo* typeinfo = GetTypeInfo(pb.type());

  // These slices must stay alive until the ColumnSchema is constructed at
  // the end: for BINARY columns the default pointers refer to them.
  Slice read_default;
  Slice write_default;
  const void* read_default_ptr = nullptr;
  const void* write_default_ptr = nullptr;

  if (pb.has_read_default_value()) {
    read_default = Slice(pb.read_default_value());
    if (typeinfo->physical_type() != BINARY) {
      if (read_default.size() < typeinfo->size()) {
        return Status::Corruption(
            Substitute("Not enough bytes for $0: read default size ($1) less than type size ($2)",
                       typeinfo->name(), read_default.size(), typeinfo->size()));
      }
      read_default_ptr = read_default.data();
    } else {
      read_default_ptr = &read_default;
    }
  }

  if (pb.has_write_default_value()) {
    write_default = Slice(pb.write_default_value());
    if (typeinfo->physical_type() != BINARY) {
      if (write_default.size() < typeinfo->size()) {
        return Status::Corruption(
            Substitute("Not enough bytes for $0: write default size ($1) less than type size ($2)",
                       typeinfo->name(), write_default.size(), typeinfo->size()));
      }
      write_default_ptr = write_default.data();
    } else {
      write_default_ptr = &write_default;
    }
  }

  ColumnTypeAttributes type_attributes;
  if (pb.has_type_attributes()) {
    const ColumnTypeAttributesPB& attrs_pb = pb.type_attributes();
    if (attrs_pb.has_precision()) type_attributes.precision = attrs_pb.precision();
    if (attrs_pb.has_scale()) type_attributes.scale = attrs_pb.scale();
    if (attrs_pb.has_length()) type_attributes.length = attrs_pb.length();
  }

  ColumnStorageAttributes storage;
  if (pb.has_encoding()) storage.encoding = pb.encoding();
  if (pb.has_compression()) storage.compression = pb.compression();
  if (pb.has_cfile_block_size()) storage.cfile_block_size = pb.cfile_block_size();

  // An unset optional protobuf string reads back as the empty string, so
  // pb.comment() can be used without consulting has_comment().
  // https://developers.google.com/protocol-buffers/docs/proto#optional
  *col_schema = ColumnSchema(pb.name(), pb.type(), pb.is_nullable(),
                             read_default_ptr, write_default_ptr,
                             storage, type_attributes, pb.comment());
  return Status::OK();
}
void ColumnSchemaDeltaToPB(const ColumnSchemaDelta& col_delta, ColumnSchemaDeltaPB *pb) {
pb->Clear();
pb->set_name(col_delta.name);
if (col_delta.new_name) {
pb->set_new_name(*col_delta.new_name);
}
if (col_delta.default_value) {
pb->set_default_value(col_delta.default_value->data(),
col_delta.default_value->size());
}
if (col_delta.remove_default) {
pb->set_remove_default(true);
}
if (col_delta.encoding) {
pb->set_encoding(*col_delta.encoding);
}
if (col_delta.compression) {
pb->set_compression(*col_delta.compression);
}
if (col_delta.cfile_block_size) {
pb->set_block_size(*col_delta.cfile_block_size);
}
if (col_delta.new_comment) {
pb->set_new_comment(*col_delta.new_comment);
}
}
// Rebuilds a ColumnSchemaDelta from its protobuf form.
//
// The delta is keyed by pb.name(). Each optional member is populated only
// when the corresponding field is present in 'pb'.
//
// NOTE: the returned delta's 'default_value' Slice refers to the bytes held
// by 'pb' and does not copy them.
ColumnSchemaDelta ColumnSchemaDeltaFromPB(const ColumnSchemaDeltaPB& pb) {
  ColumnSchemaDelta col_delta(pb.name());
  if (pb.has_new_name()) {
    col_delta.new_name = boost::optional<string>(pb.new_name());
  }
  if (pb.has_default_value()) {
    col_delta.default_value = boost::optional<Slice>(Slice(pb.default_value()));
  }
  if (pb.has_remove_default()) {
    // Use the field's value, not just its presence: a sender that sets
    // remove_default=false explicitly must not have the default removed.
    col_delta.remove_default = pb.remove_default();
  }
  if (pb.has_encoding()) {
    col_delta.encoding = boost::optional<EncodingType>(pb.encoding());
  }
  if (pb.has_compression()) {
    col_delta.compression = boost::optional<CompressionType>(pb.compression());
  }
  if (pb.has_block_size()) {
    col_delta.cfile_block_size = boost::optional<int32_t>(pb.block_size());
  }
  if (pb.has_new_comment()) {
    col_delta.new_comment = boost::optional<string>(pb.new_comment());
  }
  return col_delta;
}
// Resets 'schema' from a list of column protobufs.
//
// Key columns must form a prefix of the list: a key column appearing after a
// non-key column yields InvalidArgument. Column ids are collected only from
// the entries that carry one. Any error from ColumnSchemaFromPB() is
// returned as is.
Status ColumnPBsToSchema(const RepeatedPtrField<ColumnSchemaPB>& column_pbs,
                         Schema* schema) {
  vector<ColumnSchema> columns;
  columns.reserve(column_pbs.size());
  vector<ColumnId> column_ids;

  int num_key_columns = 0;
  bool seen_non_key = false;
  for (const auto& col_pb : column_pbs) {
    boost::optional<ColumnSchema> parsed;
    RETURN_NOT_OK(ColumnSchemaFromPB(col_pb, &parsed));
    columns.emplace_back(std::move(*parsed));

    if (!col_pb.is_key()) {
      seen_non_key = true;
    } else if (seen_non_key) {
      return Status::InvalidArgument(
          "Got out-of-order key column", SecureShortDebugString(col_pb));
    } else {
      ++num_key_columns;
    }

    if (col_pb.has_id()) {
      column_ids.emplace_back(col_pb.id());
    }
  }

  DCHECK_LE(num_key_columns, columns.size());
  return schema->Reset(std::move(columns), std::move(column_ids), num_key_columns);
}
// Replaces the contents of 'cols' with one ColumnSchemaPB per column of
// 'schema', in schema order.
//
// The first schema.num_key_columns() entries are marked as key columns.
// Column ids are written only when the schema has them and 'flags' does not
// contain SCHEMA_PB_WITHOUT_IDS. Always returns OK.
Status SchemaToColumnPBs(const Schema& schema,
                         RepeatedPtrField<ColumnSchemaPB>* cols,
                         int flags) {
  cols->Clear();
  const bool emit_ids = schema.has_column_ids() && !(flags & SCHEMA_PB_WITHOUT_IDS);

  int position = 0;
  for (const ColumnSchema& column : schema.columns()) {
    ColumnSchemaPB* out = cols->Add();
    ColumnSchemaToPB(column, out, flags);
    out->set_is_key(position < schema.num_key_columns());
    if (emit_ids) {
      out->set_id(schema.column_id(position));
    }
    ++position;
  }
  return Status::OK();
}
namespace {
// Copies a predicate lower or upper bound from 'bound_src' into 'bound_dst'.
void CopyPredicateBoundToPB(const ColumnSchema& col, const void* bound_src, string* bound_dst) {
const void* src;
size_t size;
if (col.type_info()->physical_type() == BINARY) {
// Copying a string involves an extra level of indirection through its
// owning slice.
const Slice* s = reinterpret_cast<const Slice*>(bound_src);
src = s->data();
size = s->size();
} else {
src = bound_src;
size = col.type_info()->size();
}
bound_dst->assign(reinterpret_cast<const char*>(src), size);
}
// Serializes one bloom filter of a predicate: its hash count, its raw bloom
// bytes and its hash algorithm are copied from 'bf_src' into 'bf_dst'.
void CopyPredicateBloomFilterToPB(const ColumnPredicate::BloomFilterInner& bf_src,
                                  ColumnPredicatePB::BloomFilter* bf_dst) {
  bf_dst->set_nhash(bf_src.nhash());

  const void* bloom_bytes = bf_src.bloom_data().data();
  const size_t bloom_len = bf_src.bloom_data().size();
  bf_dst->mutable_bloom_data()->assign(reinterpret_cast<const char*>(bloom_bytes), bloom_len);

  bf_dst->set_hash_algorithm(bf_src.hash_algorithm());
}
// Turns a protobuf-encoded predicate bound into a pointer usable by a
// ColumnPredicate.
//
// The bytes of 'pb_value' are copied into 'arena'. For BINARY columns
// '*result' is set to an arena-allocated Slice over that copy; for every
// other type the value must be exactly as long as the column type, and
// '*result' points directly at the copy. Returns InvalidArgument when the
// user-specified value has the wrong length.
Status CopyPredicateBoundFromPB(const ColumnSchema& schema,
                                const string& pb_value,
                                Arena* arena,
                                const void** result) {
  // The arena copy is made first, whatever the column type turns out to be.
  const size_t value_size = pb_value.size();
  uint8_t* arena_copy = static_cast<uint8_t*>(arena->AllocateBytes(value_size));
  memcpy(arena_copy, &pb_value[0], value_size);

  if (schema.type_info()->physical_type() != BINARY) {
    // Fixed-width value: it only has to be the right size.
    // TODO: add test case for this invalid request
    const size_t expected_size = schema.type_info()->size();
    if (value_size != expected_size) {
      return Status::InvalidArgument(
          strings::Substitute("Bad predicate on $0. Expected value size $1, got $2",
                              schema.ToString(), expected_size, value_size));
    }
    *result = arena_copy;
    return Status::OK();
  }

  // Variable-length value: hand back a Slice that points at the copy.
  *result = arena->NewObject<Slice>(arena_copy, value_size);
  return Status::OK();
}
// Fills 'dst_src' from the protobuf bloom filter 'bf_src' for use in a
// bloom filter predicate. The bloom bytes are copied into 'arena', and the
// Slice stored in 'dst_src' refers to that copy. Always returns OK.
Status CopyPredicateBloomFilterFromPB(const ColumnPredicatePB::BloomFilter& bf_src,
                                      ColumnPredicate::BloomFilterInner* dst_src,
                                      Arena* arena) {
  const size_t num_bytes = bf_src.bloom_data().size();
  dst_src->set_nhash(bf_src.nhash());

  // Duplicate the protobuf's bytes inside the arena.
  uint8_t* arena_copy = static_cast<uint8_t*>(arena->AllocateBytes(num_bytes));
  memcpy(arena_copy, bf_src.bloom_data().data(), num_bytes);
  dst_src->set_bloom_data(Slice(arena_copy, num_bytes));

  dst_src->set_hash_algorithm(bf_src.hash_algorithm());
  return Status::OK();
}
} // anonymous namespace
void ColumnPredicateToPB(const ColumnPredicate& predicate,
ColumnPredicatePB* pb) {
pb->set_column(predicate.column().name());
switch (predicate.predicate_type()) {
case PredicateType::Equality: {
CopyPredicateBoundToPB(predicate.column(),
predicate.raw_lower(),
pb->mutable_equality()->mutable_value());
return;
};
case PredicateType::Range: {
auto range_pred = pb->mutable_range();
if (predicate.raw_lower() != nullptr) {
CopyPredicateBoundToPB(predicate.column(),
predicate.raw_lower(),
range_pred->mutable_lower());
}
if (predicate.raw_upper() != nullptr) {
CopyPredicateBoundToPB(predicate.column(),
predicate.raw_upper(),
range_pred->mutable_upper());
}
return;
};
case PredicateType::IsNotNull: {
pb->mutable_is_not_null();
return;
};
case PredicateType::IsNull: {
pb->mutable_is_null();
return;
}
case PredicateType::InList: {
auto* values = pb->mutable_in_list()->mutable_values();
for (const void* value : predicate.raw_values()) {
CopyPredicateBoundToPB(predicate.column(), value, values->Add());
}
return;
};
case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf";
case PredicateType::InBloomFilter: {
auto* bloom_filter_pred = pb->mutable_in_bloom_filter();
for (const auto& bf : predicate.bloom_filters()) {
ColumnPredicatePB::BloomFilter* bloom_filter = bloom_filter_pred->add_bloom_filters();
CopyPredicateBloomFilterToPB(bf, bloom_filter);
}
// Form the optional lower and upper bound.
if (predicate.raw_lower() != nullptr) {
CopyPredicateBoundToPB(predicate.column(),
predicate.raw_lower(),
bloom_filter_pred->mutable_lower());
}
if (predicate.raw_upper() != nullptr) {
CopyPredicateBoundToPB(predicate.column(),
predicate.raw_upper(),
bloom_filter_pred->mutable_upper());
}
return;
}
}
LOG(FATAL) << "unknown predicate type";
}
// Parses the protobuf predicate 'pb' against 'schema' and stores the result
// in '*predicate'. Bound, value and bloom filter bytes are copied into
// 'arena'.
//
// Returns InvalidArgument when:
//   - 'pb' names no column, or a column 'schema' does not contain;
//   - a range predicate has neither bound;
//   - an equality predicate has no value;
//   - a bloom filter predicate has no filters, or a filter lacks its hash
//     count, data or hash algorithm, or uses UNKNOWN_HASH;
//   - the predicate case is not one of those handled here.
// Errors from copying a bound or value are returned as is.
Status ColumnPredicateFromPB(const Schema& schema,
                             Arena* arena,
                             const ColumnPredicatePB& pb,
                             boost::optional<ColumnPredicate>* predicate) {
  if (!pb.has_column()) {
    return Status::InvalidArgument("Column predicate must include a column", SecureDebugString(pb));
  }
  const string& column_name = pb.column();
  const int32_t col_idx = schema.find_column(column_name);
  if (col_idx == Schema::kColumnNotFound) {
    return Status::InvalidArgument("unknown column in predicate", SecureDebugString(pb));
  }
  const ColumnSchema& col = schema.column(col_idx);

  switch (pb.predicate_case()) {
    case ColumnPredicatePB::kRange: {
      const auto& range_pb = pb.range();
      const bool has_lower = range_pb.has_lower();
      const bool has_upper = range_pb.has_upper();
      if (!(has_lower || has_upper)) {
        return Status::InvalidArgument("Invalid range predicate on column: no bounds",
                                       col.name());
      }
      const void* lower_bound = nullptr;
      const void* upper_bound = nullptr;
      if (has_lower) {
        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, range_pb.lower(), arena, &lower_bound));
      }
      if (has_upper) {
        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, range_pb.upper(), arena, &upper_bound));
      }
      *predicate = ColumnPredicate::Range(col, lower_bound, upper_bound);
      return Status::OK();
    }
    case ColumnPredicatePB::kEquality: {
      const auto& equality_pb = pb.equality();
      if (!equality_pb.has_value()) {
        return Status::InvalidArgument("Invalid equality predicate on column: no value",
                                       col.name());
      }
      const void* raw_value = nullptr;
      RETURN_NOT_OK(CopyPredicateBoundFromPB(col, equality_pb.value(), arena, &raw_value));
      *predicate = ColumnPredicate::Equality(col, raw_value);
      return Status::OK();
    }
    case ColumnPredicatePB::kInList: {
      vector<const void*> raw_values;
      for (const string& encoded : pb.in_list().values()) {
        const void* raw_value = nullptr;
        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, encoded, arena, &raw_value));
        raw_values.push_back(raw_value);
      }
      *predicate = ColumnPredicate::InList(col, &raw_values);
      return Status::OK();
    }
    case ColumnPredicatePB::kIsNotNull: {
      *predicate = ColumnPredicate::IsNotNull(col);
      return Status::OK();
    }
    case ColumnPredicatePB::kIsNull: {
      *predicate = ColumnPredicate::IsNull(col);
      return Status::OK();
    }
    case ColumnPredicatePB::kInBloomFilter: {
      const auto& bloom_pb = pb.in_bloom_filter();
      vector<ColumnPredicate::BloomFilterInner> filters;
      if (bloom_pb.bloom_filters_size() == 0) {
        return Status::InvalidArgument("Invalid in bloom filter predicate on column: "
                                       "no bloom filter contained", col.name());
      }
      for (const auto& filter_pb : bloom_pb.bloom_filters()) {
        const bool fully_specified = filter_pb.has_nhash()
            && filter_pb.has_bloom_data()
            && filter_pb.has_hash_algorithm()
            && filter_pb.hash_algorithm() != UNKNOWN_HASH;
        if (!fully_specified) {
          return Status::InvalidArgument("Invalid in bloom filter predicate on column: "
                                         "missing bloom filter details", col.name());
        }
        ColumnPredicate::BloomFilterInner filter;
        RETURN_NOT_OK(CopyPredicateBloomFilterFromPB(filter_pb, &filter, arena));
        filters.emplace_back(filter);
      }
      // Extract the optional lower and upper bound.
      const void* lower_bound = nullptr;
      const void* upper_bound = nullptr;
      if (bloom_pb.has_lower()) {
        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, bloom_pb.lower(), arena, &lower_bound));
      }
      if (bloom_pb.has_upper()) {
        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, bloom_pb.upper(), arena, &upper_bound));
      }
      *predicate = ColumnPredicate::InBloomFilter(col, &filters, lower_bound, upper_bound);
      return Status::OK();
    }
    default:
      return Status::InvalidArgument("Unknown predicate type for column", col.name());
  }
}
// String keys of the table extra-config properties recognized in this file.
// ExtraConfigPBFromPBMap() matches incoming map keys against them and
// ExtraConfigPBToPBMap() writes them as map keys.
//
// Key for the 'history_max_age_sec' field of TableExtraConfigPB.
const char kTableHistoryMaxAgeSec[] = "kudu.table.history_max_age_sec";
// Key for the 'maintenance_priority' field of TableExtraConfigPB.
const char kTableMaintenancePriority[] = "kudu.table.maintenance_priority";
// Converts 'pb' into a std::map of property name to string value by way of
// ExtraConfigPBToPBMap(). '*configs' is overwritten only on success.
Status ExtraConfigPBToMap(const TableExtraConfigPB& pb, map<string, string>* configs) {
  Map<string, string> pb_map;
  RETURN_NOT_OK(ExtraConfigPBToPBMap(pb, &pb_map));
  *configs = map<string, string>(pb_map.begin(), pb_map.end());
  return Status::OK();
}
// Parses 'value' as a 32-bit integer into '*result', which must be non-null.
// On failure returns InvalidArgument naming the property 'name' and carrying
// the offending value.
Status ParseInt32Config(const string& name, const string& value, int32_t* result) {
  CHECK(result);
  if (safe_strto32(value, result)) {
    return Status::OK();
  }
  return Status::InvalidArgument(Substitute("unable to parse $0", name), value);
}
// Builds a TableExtraConfigPB from a map of property name to string value.
//
// Unrecognized property names are logged and skipped. A recognized property
// with an empty value is left unset. A recognized property whose value is
// not a valid 32-bit integer fails the whole conversion, and '*pb' is left
// untouched in that case.
Status ExtraConfigPBFromPBMap(const Map<string, string>& configs, TableExtraConfigPB* pb) {
  TableExtraConfigPB parsed;
  for (const auto& entry : configs) {
    const string& key = entry.first;
    const string& raw = entry.second;

    const bool is_history_max_age = (key == kTableHistoryMaxAgeSec);
    const bool is_maintenance_priority =
        !is_history_max_age && (key == kTableMaintenancePriority);
    if (!is_history_max_age && !is_maintenance_priority) {
      LOG(WARNING) << "Unknown extra configuration property: " << key;
      continue;
    }
    if (raw.empty()) {
      continue;
    }

    int32_t number;
    RETURN_NOT_OK(ParseInt32Config(key, raw, &number));
    if (is_history_max_age) {
      parsed.set_history_max_age_sec(number);
    } else {
      parsed.set_maintenance_priority(number);
    }
  }
  *pb = std::move(parsed);
  return Status::OK();
}
// Converts 'pb' into a protobuf map of property name to string value. Only
// the fields that are set in 'pb' produce an entry; '*configs' is replaced
// by the result. Always returns OK.
Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb, Map<string, string>* configs) {
  Map<string, string> out;
  if (pb.has_history_max_age_sec()) {
    const string text = std::to_string(pb.history_max_age_sec());
    out[kTableHistoryMaxAgeSec] = text;
  }
  if (pb.has_maintenance_priority()) {
    const string text = std::to_string(pb.maintenance_priority());
    out[kTableMaintenancePriority] = text;
  }
  *configs = std::move(out);
  return Status::OK();
}
// Rewrites, in place, the BINARY cells of the serialized rows held in
// 'row_data_slice'. On entry the pointer field of each such cell's Slice
// holds an offset into 'indirect_data_slice'; on success it holds a real
// pointer into that buffer.
//
// schema: layout of the rows.
// rowblock_pb: supplies the number of rows.
// indirect_data_slice: buffer the BINARY cells refer to.
// row_data_slice: the row data, modified in place.
// pad_unixtime_micros_to_16_bytes: when true, every UNIXTIME_MICROS column
//   is taken to occupy 8 extra bytes, which shifts the offsets of later
//   columns and of the null bitmap and enlarges the row stride.
//
// Returns Corruption if the row data is not exactly num_rows * row_stride
// bytes long, or if a non-null BINARY cell refers to bytes outside
// 'indirect_data_slice'. In the latter case, cells visited before the bad
// one have already been rewritten.
//
// Because we use a faststring here, ASAN tests become unbearably slow
// with the extra verifications.
ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS
Status RewriteRowBlockPointers(const Schema& schema, const RowwiseRowBlockPB& rowblock_pb,
const Slice& indirect_data_slice, Slice* row_data_slice,
bool pad_unixtime_micros_to_16_bytes) {
// TODO(todd): cheating here so we can rewrite the request as it arrived and
// change any indirect data pointers back to "real" pointers instead of
// on-the-wire pointers. Maybe the RPC layer should give us a non-const
// request? Maybe we should suck it up and copy the data when we mutate?
// First pass over the schema: total padding bytes per row and the number of
// BINARY columns (the only ones that need rewriting).
size_t total_padding = 0;
int num_binary_cols = 0;
for (int i = 0; i < schema.num_columns(); i++) {
// If we're padding UNIXTIME_MICROS for Impala we need to calculate the total padding
// size to adjust the row_stride.
if (pad_unixtime_micros_to_16_bytes &&
schema.column(i).type_info()->type() == UNIXTIME_MICROS) {
total_padding += 8;
}
if (schema.column(i).type_info()->physical_type() == BINARY) {
num_binary_cols++;
}
}
const size_t row_stride = ContiguousRowHelper::row_size(schema) + total_padding;
// We don't need a const-cast because we can just use Slice's lack of
// const-safety.
uint8_t* row_data = row_data_slice->mutable_data();
const uint8_t* indir_data = indirect_data_slice.data();
const size_t expected_data_size = rowblock_pb.num_rows() * row_stride;
// Offset of the null bitmap from the start of a row: the column data
// (schema.byte_size()) plus any padding.
const size_t null_bitmap_offset = schema.byte_size() + total_padding;
// The size check runs even when there is nothing to rewrite.
if (PREDICT_FALSE(row_data_slice->size() != expected_data_size)) {
return Status::Corruption(
Substitute("Row block has $0 bytes of data but expected $1 for $2 rows",
row_data_slice->size(), expected_data_size, rowblock_pb.num_rows()));
}
// Without BINARY columns no cell holds an indirect reference.
if (num_binary_cols == 0) return Status::OK();
// Calculate the offset information for the columns which need rewriting.
// Calculating this up front means we can avoid re-calculating this redundant
// information once per row.
struct ToRewrite {
// Index of the column within 'schema'; also its bit in the null bitmap.
int col_idx;
// Byte offset of the cell within a row, padding included.
int col_offset;
// Whether the null bitmap must be consulted for this column.
bool nullable;
};
FixedArray<ToRewrite> to_rewrite(num_binary_cols);
// Padding contributed by the columns visited so far, the current one
// included.
int padding_so_far = 0;
int j = 0;
for (int i = 0; i < schema.num_columns(); i++) {
const ColumnSchema& col = schema.column(i);
if (pad_unixtime_micros_to_16_bytes &&
col.type_info()->type() == UNIXTIME_MICROS) {
padding_so_far += 8;
}
if (col.type_info()->physical_type() == BINARY) {
int column_offset = schema.column_offset(i) + padding_so_far;
to_rewrite[j++] = { i, column_offset, col.is_nullable() };
}
}
DCHECK_EQ(j, num_binary_cols);
// Iterate through the rows and rewrite columns as necessary.
// NOTE: we do this row-by-row instead of column-by-column because
// the input data is typically much larger than L1 cache, and thus
// doing one pass over the memory is faster.
uint8_t* row_ptr = row_data;
for (int row_idx = 0;
row_idx < rowblock_pb.num_rows();
row_idx++) {
for (const auto& t : to_rewrite) {
uint8_t* cell_ptr = row_ptr + t.col_offset;
if (t.nullable && BitmapTest(row_ptr + null_bitmap_offset, t.col_idx)) {
// No need to rewrite null values.
continue;
}
// The pointer is currently an offset into indir_data. Need to replace it
// with the actual pointer into indir_data
Slice *slice = reinterpret_cast<Slice *>(cell_ptr);
size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
// Ensure the updated pointer is within the bounds of the indirect data.
// offset + size is computed with an overflow check so that a huge offset
// cannot wrap around and pass the bounds test.
bool overflowed = false;
size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
const auto& col = schema.column(t.col_idx);
return Status::Corruption(
Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)",
row_idx, col.ToString(), offset_in_indirect, slice->size()));
}
// Keep the size; swap the offset for a pointer into the indirect data.
*slice = Slice(&indir_data[offset_in_indirect], slice->size());
}
row_ptr += row_stride;
}
return Status::OK();
}
// Rewrites the indirect-data references inside 'rows_data' (see
// RewriteRowBlockPointers()) and then appends one pointer per row to 'rows'.
// Each pointer addresses the start of a row within 'rows_data'; entries
// already in 'rows' are preserved.
//
// Returns whatever error RewriteRowBlockPointers() reports.
Status ExtractRowsFromRowBlockPB(const Schema& schema,
                                 const RowwiseRowBlockPB& rowblock_pb,
                                 const Slice& indirect_data,
                                 Slice* rows_data,
                                 vector<const uint8_t*>* rows) {
  RETURN_NOT_OK(RewriteRowBlockPointers(schema, rowblock_pb, indirect_data, rows_data));

  const int num_rows = rowblock_pb.num_rows();
  if (PREDICT_FALSE(num_rows == 0)) {
    // Early-out here to avoid a UBSAN failure.
    return Status::OK();
  }

  // Doing this resize and array indexing turns out to be noticeably faster
  // than using reserve and push_back.
  const size_t stride = ContiguousRowHelper::row_size(schema);
  const uint8_t* cursor = rows_data->data();
  const int first_new = rows->size();
  rows->resize(rows->size() + num_rows);
  const uint8_t** out = &(*rows)[first_new];
  for (int remaining = num_rows; remaining > 0; --remaining) {
    *out = cursor;
    ++out;
    cursor += stride;
  }
  return Status::OK();
}
// Scans 'entries' for the first server whose role is LEADER and stores its
// first RPC address in '*leader_hostport'.
//
// Entries that carry an error are logged and skipped. Returns:
//   - IllegalState if a non-error entry has no role, or if the leader's
//     registration lists no RPC addresses;
//   - NotFound if no entry is a leader;
//   - otherwise the result of HostPortFromPB().
Status FindLeaderHostPort(const RepeatedPtrField<ServerEntryPB>& entries,
                          HostPort* leader_hostport) {
  for (const ServerEntryPB& entry : entries) {
    if (entry.has_error()) {
      LOG(WARNING) << "Error encountered for server entry " << SecureShortDebugString(entry)
                   << ": " << StatusFromPB(entry.error()).ToString();
      continue;
    }
    if (!entry.has_role()) {
      return Status::IllegalState(
          strings::Substitute("Every server in must have a role, but entry ($0) has no role.",
                              SecureShortDebugString(entry)));
    }
    if (entry.role() == consensus::RaftPeerPB::LEADER) {
      // The entry comes from a remote server: check that there is an address
      // before indexing, since rpc_addresses(0) on an empty list is an
      // out-of-range access.
      if (entry.registration().rpc_addresses_size() == 0) {
        return Status::IllegalState(
            strings::Substitute("Leader entry ($0) has no RPC addresses.",
                                SecureShortDebugString(entry)));
      }
      return HostPortFromPB(entry.registration().rpc_addresses(0), leader_hostport);
    }
  }
  return Status::NotFound("No leader found.");
}
// Appends the serialized bytes of 'row' to 'buf'.
//
// Only declared here; the behavior for each supported row type is supplied
// by the explicit specializations for ConstContiguousRow and RowBlockRow.
template<class RowType>
void AppendRowToString(const RowType& row, string* buf);
// Contiguous rows are already laid out as bytes: append row_size() bytes
// starting at row_data() to 'buf'.
template<>
void AppendRowToString<ConstContiguousRow>(const ConstContiguousRow& row, string* buf) {
  const char* raw_bytes = reinterpret_cast<const char*>(row.row_data());
  buf->append(raw_bytes, row.row_size());
}
// Grows 'buf' by one contiguous row's worth of bytes and copies 'row' into
// the new space through a ContiguousRow view over it.
//
// The copy is made with a null arena. CHECK-fails if CopyRow() reports an
// error.
template<>
void AppendRowToString<RowBlockRow>(const RowBlockRow& row, string* buf) {
  size_t row_size = ContiguousRowHelper::row_size(*row.schema());
  size_t appended_offset = buf->size();
  buf->resize(buf->size() + row_size);
  uint8_t* copied_rowdata = reinterpret_cast<uint8_t*>(&(*buf)[appended_offset]);
  ContiguousRow copied_row(row.schema(), copied_rowdata);
  // A typed null pointer: the cast keeps the argument's type as Arena*.
  CHECK_OK(CopyRow(row, &copied_row, static_cast<Arena*>(nullptr)));
}
// Copy a column worth of data from the given ColumnBlock into the rowwise
// output buffer.
//
// IS_NULLABLE: true if the column is nullable
// IS_VARLEN: true if the column is of variable length
//
// These are template parameters rather than normal function arguments
// so that there are fewer branches inside the loop.
//
// column_block: source cells of the column.
// dst_col_idx: index of the column in the output row; used as the bit index
//   in each output row's null bitmap.
// dst_base: start of the first output row.
// indirect_data: buffer that receives the bytes of variable-length cells;
//   the output cell stores the offset into it in place of a pointer.
// dst_schema: must be non-NULL (DCHECKed); not otherwise used here.
// row_stride: distance in bytes between consecutive output rows.
// schema_byte_size: offset of the null bitmap from the start of an output
//   row.
// column_offset: offset of this column's cell from the start of an output
//   row.
// row_idx_select: indexes of the source rows to copy, in output order.
template<bool IS_NULLABLE, bool IS_VARLEN>
static void CopyColumn(
    const ColumnBlock& column_block, int dst_col_idx, uint8_t* __restrict__ dst_base,
    faststring* indirect_data, const Schema* dst_schema, size_t row_stride,
    size_t schema_byte_size, size_t column_offset,
    const vector<int>& row_idx_select) {
  DCHECK(dst_schema);
  uint8_t* dst = dst_base + column_offset;
  // 'dst' points at this column's cell, so adding this offset lands on the
  // same row's null bitmap (dst_base + schema_byte_size for the first row).
  const size_t offset_to_null_bitmap = schema_byte_size - column_offset;
  const size_t cell_size = column_block.stride();

  for (const int index : row_idx_select) {
    const uint8_t* src = column_block.cell_ptr(index);
    if (IS_NULLABLE && column_block.is_null(index)) {
      // Null cell: only the bitmap bit is written, not the cell bytes.
      BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, true);
    } else if (IS_VARLEN) {
      const Slice* slice = reinterpret_cast<const Slice *>(src);
      // The payload goes to the indirect buffer; the output cell records
      // where it starts there, encoded in the Slice's pointer field.
      size_t offset_in_indirect = indirect_data->size();
      indirect_data->append(reinterpret_cast<const char*>(slice->data()), slice->size());
      Slice* dst_slice = reinterpret_cast<Slice *>(dst);
      *dst_slice = Slice(reinterpret_cast<const uint8_t*>(offset_in_indirect),
                         slice->size());
      if (IS_NULLABLE) {
        BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false);
      }
    } else { // non-string, non-null
      strings::memcpy_inlined(dst, src, cell_size);
      if (IS_NULLABLE) {
        BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false);
      }
    }
    dst += row_stride;
  }
}
// Serializes the selected rows of 'block' in row-wise form.
//
//   block:             source rows; only rows set in its selection vector are
//                      emitted. Must contain at least one row (DCHECKed).
//   rowblock_pb:       its num_rows field is incremented by the number of
//                      rows emitted.
//   projection_schema: columns to emit, looked up by name in the block's
//                      schema. If null, the block's own schema is used.
//   data_buf:          receives one fixed-size record per emitted row,
//                      appended after any existing contents.
//   indirect_data:     receives the bytes of BINARY cells.
//   pad_unixtime_micros_to_16_bytes:
//                      if true, every UNIXTIME_MICROS cell is followed by 8
//                      zero bytes (used for Impala).
//
// Because we use a faststring here, ASAN tests become unbearably slow
// with the extra verifications.
ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS
void SerializeRowBlock(const RowBlock& block,
                       RowwiseRowBlockPB* rowblock_pb,
                       const Schema* projection_schema,
                       faststring* data_buf,
                       faststring* indirect_data,
                       bool pad_unixtime_micros_to_16_bytes) {
  DCHECK_GT(block.nrows(), 0);
  const Schema* tablet_schema = block.schema();
  if (projection_schema == nullptr) {
    projection_schema = tablet_schema;
  }

  // Extra bytes appended to each UNIXTIME_MICROS cell when padding is on.
  const size_t kUnixTimeMicrosPaddingBytes = 8;
  const auto needs_padding = [pad_unixtime_micros_to_16_bytes](const ColumnSchema& col) {
    return pad_unixtime_micros_to_16_bytes &&
        col.type_info()->type() == UNIXTIME_MICROS;
  };

  // Check whether we need to pad or if there are nullable columns, this will dictate whether
  // we need to set memory to zero.
  size_t total_padding = 0;
  bool has_nullable_cols = false;
  for (int i = 0; i < projection_schema->num_columns(); i++) {
    const ColumnSchema& col = projection_schema->column(i);
    if (col.is_nullable()) {
      has_nullable_cols = true;
    }
    // If we're padding UNIXTIME_MICROS for Impala we need to calculate the total padding
    // size to adjust the row_stride.
    if (needs_padding(col)) {
      total_padding += kUnixTimeMicrosPaddingBytes;
    }
  }

  // Grow 'data_buf' by one (possibly padded) record per selected row.
  const size_t old_size = data_buf->size();
  const size_t row_stride = ContiguousRowHelper::row_size(*projection_schema) + total_padding;
  const size_t num_rows = block.selection_vector()->CountSelected();
  const size_t schema_byte_size = projection_schema->byte_size() + total_padding;
  const size_t additional_size = row_stride * num_rows;
  data_buf->resize(old_size + additional_size);
  uint8_t* base = &(*data_buf)[old_size];

  // Zero out the memory if we have nullable columns or if we're padding slots so that we don't leak
  // unrelated data to the client.
  if (total_padding != 0 || has_nullable_cols) {
    memset(base, 0, additional_size);
  }

  vector<int> selected_row_indexes;
  block.selection_vector()->GetSelectedRows(&selected_row_indexes);

  // Padding inserted by the columns already emitted; shifts later columns.
  size_t padding_so_far = 0;
  for (int p_schema_idx = 0; p_schema_idx < projection_schema->num_columns(); p_schema_idx++) {
    const ColumnSchema& col = projection_schema->column(p_schema_idx);
    // Kept signed so the comparison with the -1 sentinel does not rely on
    // unsigned wrap-around.
    const int t_schema_idx = tablet_schema->find_column(col.name());
    DCHECK_NE(t_schema_idx, -1);
    const size_t column_offset =
        projection_schema->column_offset(p_schema_idx) + padding_so_far;
    const ColumnBlock& column_block = block.column_block(t_schema_idx);

    // Generating different functions for each of these cases makes them much less
    // branch-heavy -- we do the branch once outside the loop, and then have a
    // compiled version for each combination below.
    // TODO: Using LLVM to build a specialized CopyColumn on the fly should have
    // even bigger gains, since we could inline the constant cell sizes and column
    // offsets.
    const bool is_nullable = col.is_nullable();
    const bool is_varlen = col.type_info()->physical_type() == BINARY;
    if (is_nullable) {
      if (is_varlen) {
        CopyColumn<true, true>(column_block, p_schema_idx, base, indirect_data,
                               projection_schema, row_stride, schema_byte_size,
                               column_offset, selected_row_indexes);
      } else {
        CopyColumn<true, false>(column_block, p_schema_idx, base, indirect_data,
                                projection_schema, row_stride, schema_byte_size,
                                column_offset, selected_row_indexes);
      }
    } else {
      if (is_varlen) {
        CopyColumn<false, true>(column_block, p_schema_idx, base, indirect_data,
                                projection_schema, row_stride, schema_byte_size,
                                column_offset, selected_row_indexes);
      } else {
        CopyColumn<false, false>(column_block, p_schema_idx, base, indirect_data,
                                 projection_schema, row_stride, schema_byte_size,
                                 column_offset, selected_row_indexes);
      }
    }

    if (needs_padding(col)) {
      padding_so_far += kUnixTimeMicrosPaddingBytes;
    }
  }
  rowblock_pb->set_num_rows(rowblock_pb->num_rows() + num_rows);
}
// Renders the start time recorded in 'reg' as "%Y-%m-%d %H:%M:%S %Z", or
// returns "<unknown>" when the registration carries no start time.
string StartTimeToString(const ServerRegistrationPB& reg) {
  if (!reg.has_start_time()) {
    return "<unknown>";
  }
  // Convert epoch time to localtime.
  const time_t started_at = static_cast<time_t>(reg.start_time());
  string formatted;
  StringAppendStrftime(&formatted, "%Y-%m-%d %H:%M:%S %Z", started_at, true);
  return formatted;
}
} // namespace kudu