| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/common/wire_protocol.h" |
| |
| #include <string> |
| #include <vector> |
| |
| #include "kudu/common/column_predicate.h" |
| #include "kudu/common/row.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/fastmem.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/memory/arena.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/safe_math.h" |
| #include "kudu/util/slice.h" |
| |
| using google::protobuf::RepeatedPtrField; |
| using std::vector; |
| |
| namespace kudu { |
| |
| void StatusToPB(const Status& status, AppStatusPB* pb) { |
| pb->Clear(); |
| bool is_unknown = false; |
| if (status.ok()) { |
| pb->set_code(AppStatusPB::OK); |
| // OK statuses don't have any message or posix code. |
| return; |
| } else if (status.IsNotFound()) { |
| pb->set_code(AppStatusPB::NOT_FOUND); |
| } else if (status.IsCorruption()) { |
| pb->set_code(AppStatusPB::CORRUPTION); |
| } else if (status.IsNotSupported()) { |
| pb->set_code(AppStatusPB::NOT_SUPPORTED); |
| } else if (status.IsInvalidArgument()) { |
| pb->set_code(AppStatusPB::INVALID_ARGUMENT); |
| } else if (status.IsIOError()) { |
| pb->set_code(AppStatusPB::IO_ERROR); |
| } else if (status.IsAlreadyPresent()) { |
| pb->set_code(AppStatusPB::ALREADY_PRESENT); |
| } else if (status.IsRuntimeError()) { |
| pb->set_code(AppStatusPB::RUNTIME_ERROR); |
| } else if (status.IsNetworkError()) { |
| pb->set_code(AppStatusPB::NETWORK_ERROR); |
| } else if (status.IsIllegalState()) { |
| pb->set_code(AppStatusPB::ILLEGAL_STATE); |
| } else if (status.IsNotAuthorized()) { |
| pb->set_code(AppStatusPB::NOT_AUTHORIZED); |
| } else if (status.IsAborted()) { |
| pb->set_code(AppStatusPB::ABORTED); |
| } else if (status.IsRemoteError()) { |
| pb->set_code(AppStatusPB::REMOTE_ERROR); |
| } else if (status.IsServiceUnavailable()) { |
| pb->set_code(AppStatusPB::SERVICE_UNAVAILABLE); |
| } else if (status.IsTimedOut()) { |
| pb->set_code(AppStatusPB::TIMED_OUT); |
| } else if (status.IsUninitialized()) { |
| pb->set_code(AppStatusPB::UNINITIALIZED); |
| } else if (status.IsConfigurationError()) { |
| pb->set_code(AppStatusPB::CONFIGURATION_ERROR); |
| } else if (status.IsIncomplete()) { |
| pb->set_code(AppStatusPB::INCOMPLETE); |
| } else if (status.IsEndOfFile()) { |
| pb->set_code(AppStatusPB::END_OF_FILE); |
| } else { |
| LOG(WARNING) << "Unknown error code translation from internal error " |
| << status.ToString() << ": sending UNKNOWN_ERROR"; |
| pb->set_code(AppStatusPB::UNKNOWN_ERROR); |
| is_unknown = true; |
| } |
| if (is_unknown) { |
| // For unknown status codes, include the original stringified error |
| // code. |
| pb->set_message(status.CodeAsString() + ": " + |
| status.message().ToString()); |
| } else { |
| // Otherwise, just encode the message itself, since the other end |
| // will reconstruct the other parts of the ToString() response. |
| pb->set_message(status.message().ToString()); |
| } |
| if (status.posix_code() != -1) { |
| pb->set_posix_code(status.posix_code()); |
| } |
| } |
| |
| Status StatusFromPB(const AppStatusPB& pb) { |
| int posix_code = pb.has_posix_code() ? pb.posix_code() : -1; |
| |
| switch (pb.code()) { |
| case AppStatusPB::OK: |
| return Status::OK(); |
| case AppStatusPB::NOT_FOUND: |
| return Status::NotFound(pb.message(), "", posix_code); |
| case AppStatusPB::CORRUPTION: |
| return Status::Corruption(pb.message(), "", posix_code); |
| case AppStatusPB::NOT_SUPPORTED: |
| return Status::NotSupported(pb.message(), "", posix_code); |
| case AppStatusPB::INVALID_ARGUMENT: |
| return Status::InvalidArgument(pb.message(), "", posix_code); |
| case AppStatusPB::IO_ERROR: |
| return Status::IOError(pb.message(), "", posix_code); |
| case AppStatusPB::ALREADY_PRESENT: |
| return Status::AlreadyPresent(pb.message(), "", posix_code); |
| case AppStatusPB::RUNTIME_ERROR: |
| return Status::RuntimeError(pb.message(), "", posix_code); |
| case AppStatusPB::NETWORK_ERROR: |
| return Status::NetworkError(pb.message(), "", posix_code); |
| case AppStatusPB::ILLEGAL_STATE: |
| return Status::IllegalState(pb.message(), "", posix_code); |
| case AppStatusPB::NOT_AUTHORIZED: |
| return Status::NotAuthorized(pb.message(), "", posix_code); |
| case AppStatusPB::ABORTED: |
| return Status::Aborted(pb.message(), "", posix_code); |
| case AppStatusPB::REMOTE_ERROR: |
| return Status::RemoteError(pb.message(), "", posix_code); |
| case AppStatusPB::SERVICE_UNAVAILABLE: |
| return Status::ServiceUnavailable(pb.message(), "", posix_code); |
| case AppStatusPB::TIMED_OUT: |
| return Status::TimedOut(pb.message(), "", posix_code); |
| case AppStatusPB::UNINITIALIZED: |
| return Status::Uninitialized(pb.message(), "", posix_code); |
| case AppStatusPB::CONFIGURATION_ERROR: |
| return Status::ConfigurationError(pb.message(), "", posix_code); |
| case AppStatusPB::INCOMPLETE: |
| return Status::Incomplete(pb.message(), "", posix_code); |
| case AppStatusPB::END_OF_FILE: |
| return Status::EndOfFile(pb.message(), "", posix_code); |
| case AppStatusPB::UNKNOWN_ERROR: |
| default: |
| LOG(WARNING) << "Unknown error code in status: " << pb.ShortDebugString(); |
| return Status::RuntimeError("(unknown error code)", pb.message(), posix_code); |
| } |
| } |
| |
| Status HostPortToPB(const HostPort& host_port, HostPortPB* host_port_pb) { |
| host_port_pb->set_host(host_port.host()); |
| host_port_pb->set_port(host_port.port()); |
| return Status::OK(); |
| } |
| |
| Status HostPortFromPB(const HostPortPB& host_port_pb, HostPort* host_port) { |
| host_port->set_host(host_port_pb.host()); |
| host_port->set_port(host_port_pb.port()); |
| return Status::OK(); |
| } |
| |
| Status AddHostPortPBs(const vector<Sockaddr>& addrs, |
| RepeatedPtrField<HostPortPB>* pbs) { |
| for (const Sockaddr& addr : addrs) { |
| HostPortPB* pb = pbs->Add(); |
| if (addr.IsWildcard()) { |
| RETURN_NOT_OK(GetFQDN(pb->mutable_host())); |
| } else { |
| pb->set_host(addr.host()); |
| } |
| pb->set_port(addr.port()); |
| } |
| return Status::OK(); |
| } |
| |
| Status SchemaToPB(const Schema& schema, SchemaPB *pb, int flags) { |
| pb->Clear(); |
| return SchemaToColumnPBs(schema, pb->mutable_columns(), flags); |
| } |
| |
| Status SchemaToPBWithoutIds(const Schema& schema, SchemaPB *pb) { |
| pb->Clear(); |
| return SchemaToColumnPBs(schema, pb->mutable_columns(), SCHEMA_PB_WITHOUT_IDS); |
| } |
| |
| Status SchemaFromPB(const SchemaPB& pb, Schema *schema) { |
| return ColumnPBsToSchema(pb.columns(), schema); |
| } |
| |
| void ColumnSchemaToPB(const ColumnSchema& col_schema, ColumnSchemaPB *pb, int flags) { |
| pb->Clear(); |
| pb->set_name(col_schema.name()); |
| pb->set_type(col_schema.type_info()->type()); |
| pb->set_is_nullable(col_schema.is_nullable()); |
| if (!(flags & SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES)) { |
| pb->set_encoding(col_schema.attributes().encoding); |
| pb->set_compression(col_schema.attributes().compression); |
| pb->set_cfile_block_size(col_schema.attributes().cfile_block_size); |
| } |
| if (col_schema.has_read_default()) { |
| if (col_schema.type_info()->physical_type() == BINARY) { |
| const Slice *read_slice = static_cast<const Slice *>(col_schema.read_default_value()); |
| pb->set_read_default_value(read_slice->data(), read_slice->size()); |
| } else { |
| const void *read_value = col_schema.read_default_value(); |
| pb->set_read_default_value(read_value, col_schema.type_info()->size()); |
| } |
| } |
| if (col_schema.has_write_default()) { |
| if (col_schema.type_info()->physical_type() == BINARY) { |
| const Slice *write_slice = static_cast<const Slice *>(col_schema.write_default_value()); |
| pb->set_write_default_value(write_slice->data(), write_slice->size()); |
| } else { |
| const void *write_value = col_schema.write_default_value(); |
| pb->set_write_default_value(write_value, col_schema.type_info()->size()); |
| } |
| } |
| } |
| |
| ColumnSchema ColumnSchemaFromPB(const ColumnSchemaPB& pb) { |
| const void *write_default_ptr = nullptr; |
| const void *read_default_ptr = nullptr; |
| Slice write_default; |
| Slice read_default; |
| const TypeInfo* typeinfo = GetTypeInfo(pb.type()); |
| if (pb.has_read_default_value()) { |
| read_default = Slice(pb.read_default_value()); |
| if (typeinfo->physical_type() == BINARY) { |
| read_default_ptr = &read_default; |
| } else { |
| read_default_ptr = read_default.data(); |
| } |
| } |
| if (pb.has_write_default_value()) { |
| write_default = Slice(pb.write_default_value()); |
| if (typeinfo->physical_type() == BINARY) { |
| write_default_ptr = &write_default; |
| } else { |
| write_default_ptr = write_default.data(); |
| } |
| } |
| |
| ColumnStorageAttributes attributes; |
| if (pb.has_encoding()) { |
| attributes.encoding = pb.encoding(); |
| } |
| if (pb.has_compression()) { |
| attributes.compression = pb.compression(); |
| } |
| if (pb.has_cfile_block_size()) { |
| attributes.cfile_block_size = pb.cfile_block_size(); |
| } |
| return ColumnSchema(pb.name(), pb.type(), pb.is_nullable(), |
| read_default_ptr, write_default_ptr, |
| attributes); |
| } |
| |
| Status ColumnPBsToSchema(const RepeatedPtrField<ColumnSchemaPB>& column_pbs, |
| Schema* schema) { |
| |
| vector<ColumnSchema> columns; |
| vector<ColumnId> column_ids; |
| columns.reserve(column_pbs.size()); |
| int num_key_columns = 0; |
| bool is_handling_key = true; |
| for (const ColumnSchemaPB& pb : column_pbs) { |
| columns.push_back(ColumnSchemaFromPB(pb)); |
| if (pb.is_key()) { |
| if (!is_handling_key) { |
| return Status::InvalidArgument( |
| "Got out-of-order key column", pb.ShortDebugString()); |
| } |
| num_key_columns++; |
| } else { |
| is_handling_key = false; |
| } |
| if (pb.has_id()) { |
| column_ids.push_back(ColumnId(pb.id())); |
| } |
| } |
| |
| DCHECK_LE(num_key_columns, columns.size()); |
| |
| // TODO(perf): could make the following faster by adding a |
| // Reset() variant which actually takes ownership of the column |
| // vector. |
| return schema->Reset(columns, column_ids, num_key_columns); |
| } |
| |
| Status SchemaToColumnPBs(const Schema& schema, |
| RepeatedPtrField<ColumnSchemaPB>* cols, |
| int flags) { |
| cols->Clear(); |
| int idx = 0; |
| for (const ColumnSchema& col : schema.columns()) { |
| ColumnSchemaPB* col_pb = cols->Add(); |
| ColumnSchemaToPB(col, col_pb); |
| col_pb->set_is_key(idx < schema.num_key_columns()); |
| |
| if (schema.has_column_ids() && !(flags & SCHEMA_PB_WITHOUT_IDS)) { |
| col_pb->set_id(schema.column_id(idx)); |
| } |
| |
| idx++; |
| } |
| return Status::OK(); |
| } |
| |
| namespace { |
| // Copies a predicate lower or upper bound from 'bound_src' into 'bound_dst'. |
| void CopyPredicateBoundToPB(const ColumnSchema& col, const void* bound_src, string* bound_dst) { |
| const void* src; |
| size_t size; |
| if (col.type_info()->physical_type() == BINARY) { |
| // Copying a string involves an extra level of indirection through its |
| // owning slice. |
| const Slice* s = reinterpret_cast<const Slice*>(bound_src); |
| src = s->data(); |
| size = s->size(); |
| } else { |
| src = bound_src; |
| size = col.type_info()->size(); |
| } |
| bound_dst->assign(reinterpret_cast<const char*>(src), size); |
| } |
| |
| // Extract a void* pointer suitable for use in a ColumnRangePredicate from the |
| // string protobuf bound. This validates that the pb_value has the correct |
| // length, copies the data into 'arena', and sets *result to point to it. |
| // Returns bad status if the user-specified value is the wrong length. |
| Status CopyPredicateBoundFromPB(const ColumnSchema& schema, |
| const string& pb_value, |
| Arena* arena, |
| const void** result) { |
| // Copy the data from the protobuf into the Arena. |
| uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(pb_value.size())); |
| memcpy(data_copy, &pb_value[0], pb_value.size()); |
| |
| // If the type is of variable length, then we need to return a pointer to a Slice |
| // element pointing to the string. Otherwise, just verify that the provided |
| // value was the right size. |
| if (schema.type_info()->physical_type() == BINARY) { |
| *result = arena->NewObject<Slice>(data_copy, pb_value.size()); |
| } else { |
| // TODO: add test case for this invalid request |
| size_t expected_size = schema.type_info()->size(); |
| if (pb_value.size() != expected_size) { |
| return Status::InvalidArgument( |
| strings::Substitute("Bad predicate on $0. Expected value size $1, got $2", |
| schema.ToString(), expected_size, pb_value.size())); |
| } |
| *result = data_copy; |
| } |
| |
| return Status::OK(); |
| } |
| } // anonymous namespace |
| |
| void ColumnPredicateToPB(const ColumnPredicate& predicate, |
| ColumnPredicatePB* pb) { |
| pb->set_column(predicate.column().name()); |
| switch (predicate.predicate_type()) { |
| case PredicateType::Equality: { |
| CopyPredicateBoundToPB(predicate.column(), |
| predicate.raw_lower(), |
| pb->mutable_equality()->mutable_value()); |
| return; |
| }; |
| case PredicateType::Range: { |
| auto range_pred = pb->mutable_range(); |
| if (predicate.raw_lower() != nullptr) { |
| CopyPredicateBoundToPB(predicate.column(), |
| predicate.raw_lower(), |
| range_pred->mutable_lower()); |
| } |
| if (predicate.raw_upper() != nullptr) { |
| CopyPredicateBoundToPB(predicate.column(), |
| predicate.raw_upper(), |
| range_pred->mutable_upper()); |
| } |
| return; |
| }; |
| case PredicateType::IsNotNull: { |
| pb->mutable_is_not_null(); |
| return; |
| }; |
| case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf"; |
| } |
| LOG(FATAL) << "unknown predicate type"; |
| } |
| |
| Status ColumnPredicateFromPB(const Schema& schema, |
| Arena* arena, |
| const ColumnPredicatePB& pb, |
| optional<ColumnPredicate>* predicate) { |
| if (!pb.has_column()) { |
| return Status::InvalidArgument("Column predicate must include a column", pb.DebugString()); |
| } |
| const string& column = pb.column(); |
| int32_t idx = schema.find_column(column); |
| if (idx == Schema::kColumnNotFound) { |
| return Status::InvalidArgument("unknown column in predicate", pb.DebugString()); |
| } |
| const ColumnSchema& col = schema.column(idx); |
| |
| switch (pb.predicate_case()) { |
| case ColumnPredicatePB::kRange: { |
| const auto& range = pb.range(); |
| if (!range.has_lower() && !range.has_upper()) { |
| return Status::InvalidArgument("Invalid range predicate on column: no bounds", |
| col.name()); |
| } |
| |
| const void* lower = nullptr; |
| const void* upper = nullptr; |
| if (range.has_lower()) { |
| RETURN_NOT_OK(CopyPredicateBoundFromPB(col, range.lower(), arena, &lower)); |
| } |
| if (range.has_upper()) { |
| RETURN_NOT_OK(CopyPredicateBoundFromPB(col, range.upper(), arena, &upper)); |
| } |
| |
| *predicate = ColumnPredicate::Range(col, lower, upper); |
| break; |
| }; |
| case ColumnPredicatePB::kEquality: { |
| const auto& equality = pb.equality(); |
| if (!equality.has_value()) { |
| return Status::InvalidArgument("Invalid equality predicate on column: no value", |
| col.name()); |
| } |
| const void* value = nullptr; |
| RETURN_NOT_OK(CopyPredicateBoundFromPB(col, equality.value(), arena, &value)); |
| *predicate = ColumnPredicate::Equality(col, value); |
| break; |
| }; |
| case ColumnPredicatePB::kIsNotNull: { |
| ColumnPredicate p = ColumnPredicate::IsNotNull(col); |
| *predicate = ColumnPredicate::IsNotNull(col); |
| break; |
| }; |
| default: return Status::InvalidArgument("Unknown predicate type for column", col.name()); |
| } |
| return Status::OK(); |
| } |
| |
| // Because we use a faststring here, ASAN tests become unbearably slow |
| // with the extra verifications. |
| ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS |
| Status RewriteRowBlockPointers(const Schema& schema, const RowwiseRowBlockPB& rowblock_pb, |
| const Slice& indirect_data_slice, Slice* row_data_slice) { |
| // TODO: cheating here so we can rewrite the request as it arrived and |
| // change any indirect data pointers back to "real" pointers instead of |
| // on-the-wire pointers. Maybe the RPC layer should give us a non-const |
| // request? Maybe we should suck it up and copy the data when we mutate? |
| |
| // We don't need a const-cast because we can just use Slice's lack of |
| // const-safety. |
| uint8_t* row_data = row_data_slice->mutable_data(); |
| const uint8_t* indir_data = indirect_data_slice.data(); |
| size_t row_size = ContiguousRowHelper::row_size(schema); |
| size_t expected_data_size = rowblock_pb.num_rows() * row_size; |
| |
| if (PREDICT_FALSE(row_data_slice->size() != expected_data_size)) { |
| return Status::Corruption( |
| StringPrintf("Row block has %zd bytes of data but expected %zd for %" PRIu32 " rows", |
| row_data_slice->size(), expected_data_size, rowblock_pb.num_rows())); |
| } |
| |
| for (int i = 0; i < schema.num_columns(); i++) { |
| const ColumnSchema& col = schema.column(i); |
| if (col.type_info()->physical_type() != BINARY) { |
| continue; |
| } |
| |
| int row_idx = 0; |
| size_t offset = 0; |
| while (offset < row_data_slice->size()) { |
| ContiguousRow row(&schema, &row_data[offset]); |
| uint8_t* dst_cell = row.mutable_cell_ptr(i); |
| |
| if (!col.is_nullable() || !row.is_null(i)) { |
| // The pointer is currently an offset into indir_data. Need to replace it |
| // with the actual pointer into indir_data |
| Slice *slice = reinterpret_cast<Slice *>(dst_cell); |
| size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data()); |
| |
| // Ensure the updated pointer is within the bounds of the indirect data. |
| bool overflowed = false; |
| size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed); |
| if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) { |
| return Status::Corruption( |
| StringPrintf("Row #%d contained bad indirect slice for column %s: (%zd, %zd)", |
| row_idx, col.ToString().c_str(), |
| reinterpret_cast<uintptr_t>(slice->data()), |
| slice->size())); |
| } |
| *slice = Slice(&indir_data[offset_in_indirect], slice->size()); |
| } |
| |
| // Advance to next row |
| offset += row_size; |
| row_idx++; |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status ExtractRowsFromRowBlockPB(const Schema& schema, |
| const RowwiseRowBlockPB& rowblock_pb, |
| const Slice& indirect_data, |
| Slice* rows_data, |
| vector<const uint8_t*>* rows) { |
| RETURN_NOT_OK(RewriteRowBlockPointers(schema, rowblock_pb, indirect_data, rows_data)); |
| |
| int n_rows = rowblock_pb.num_rows(); |
| if (PREDICT_FALSE(n_rows == 0)) { |
| // Early-out here to avoid a UBSAN failure. |
| return Status::OK(); |
| } |
| |
| // Doing this resize and array indexing turns out to be noticeably faster |
| // than using reserve and push_back. |
| size_t row_size = ContiguousRowHelper::row_size(schema); |
| const uint8_t* src = rows_data->data(); |
| int dst_index = rows->size(); |
| rows->resize(rows->size() + n_rows); |
| const uint8_t** dst = &(*rows)[dst_index]; |
| while (n_rows > 0) { |
| *dst++ = src; |
| src += row_size; |
| n_rows--; |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status FindLeaderHostPort(const RepeatedPtrField<ServerEntryPB>& entries, |
| HostPort* leader_hostport) { |
| for (const ServerEntryPB& entry : entries) { |
| if (entry.has_error()) { |
| LOG(WARNING) << "Error encountered for server entry " << entry.ShortDebugString() |
| << ": " << StatusFromPB(entry.error()).ToString(); |
| continue; |
| } |
| if (!entry.has_role()) { |
| return Status::IllegalState( |
| strings::Substitute("Every server in must have a role, but entry ($0) has no role.", |
| entry.ShortDebugString())); |
| } |
| if (entry.role() == consensus::RaftPeerPB::LEADER) { |
| return HostPortFromPB(entry.registration().rpc_addresses(0), leader_hostport); |
| } |
| } |
| return Status::NotFound("No leader found."); |
| } |
| |
| template<class RowType> |
| void AppendRowToString(const RowType& row, string* buf); |
| |
| template<> |
| void AppendRowToString<ConstContiguousRow>(const ConstContiguousRow& row, string* buf) { |
| buf->append(reinterpret_cast<const char*>(row.row_data()), row.row_size()); |
| } |
| |
| template<> |
| void AppendRowToString<RowBlockRow>(const RowBlockRow& row, string* buf) { |
| size_t row_size = ContiguousRowHelper::row_size(*row.schema()); |
| size_t appended_offset = buf->size(); |
| buf->resize(buf->size() + row_size); |
| uint8_t* copied_rowdata = reinterpret_cast<uint8_t*>(&(*buf)[appended_offset]); |
| ContiguousRow copied_row(row.schema(), copied_rowdata); |
| CHECK_OK(CopyRow(row, &copied_row, reinterpret_cast<Arena*>(NULL))); |
| } |
| |
| // Copy a column worth of data from the given RowBlock into the output |
| // protobuf. |
| // |
| // IS_NULLABLE: true if the column is nullable |
| // IS_VARLEN: true if the column is of variable length |
| // |
| // These are template parameters rather than normal function arguments |
| // so that there are fewer branches inside the loop. |
| // |
| // NOTE: 'dst_schema' must either be NULL or a subset of the specified's |
| // RowBlock's schema. If not NULL, then column at 'col_idx' in 'block' will |
| // be copied to column 'dst_col_idx' in the output protobuf; otherwise, |
| // dst_col_idx must be equal to col_idx. |
| template<bool IS_NULLABLE, bool IS_VARLEN> |
| static void CopyColumn(const RowBlock& block, int col_idx, |
| int dst_col_idx, uint8_t* dst_base, |
| faststring* indirect_data, const Schema* dst_schema) { |
| DCHECK_NOTNULL(dst_schema); |
| ColumnBlock cblock = block.column_block(col_idx); |
| size_t row_stride = ContiguousRowHelper::row_size(*dst_schema); |
| uint8_t* dst = dst_base + dst_schema->column_offset(dst_col_idx); |
| size_t offset_to_null_bitmap = dst_schema->byte_size() - dst_schema->column_offset(dst_col_idx); |
| |
| size_t cell_size = cblock.stride(); |
| const uint8_t* src = cblock.cell_ptr(0); |
| |
| BitmapIterator selected_row_iter(block.selection_vector()->bitmap(), |
| block.nrows()); |
| int run_size; |
| bool selected; |
| int row_idx = 0; |
| while ((run_size = selected_row_iter.Next(&selected))) { |
| if (!selected) { |
| src += run_size * cell_size; |
| row_idx += run_size; |
| continue; |
| } |
| for (int i = 0; i < run_size; i++) { |
| if (IS_NULLABLE && cblock.is_null(row_idx)) { |
| memset(dst, 0, cell_size); |
| BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, true); |
| } else if (IS_VARLEN) { |
| const Slice *slice = reinterpret_cast<const Slice *>(src); |
| size_t offset_in_indirect = indirect_data->size(); |
| indirect_data->append(reinterpret_cast<const char*>(slice->data()), |
| slice->size()); |
| |
| Slice *dst_slice = reinterpret_cast<Slice *>(dst); |
| *dst_slice = Slice(reinterpret_cast<const uint8_t*>(offset_in_indirect), |
| slice->size()); |
| if (IS_NULLABLE) { |
| BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false); |
| } |
| } else { // non-string, non-null |
| strings::memcpy_inlined(dst, src, cell_size); |
| if (IS_NULLABLE) { |
| BitmapChange(dst + offset_to_null_bitmap, dst_col_idx, false); |
| } |
| } |
| dst += row_stride; |
| src += cell_size; |
| row_idx++; |
| } |
| } |
| } |
| |
| // Because we use a faststring here, ASAN tests become unbearably slow |
| // with the extra verifications. |
| ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS |
| void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB* rowblock_pb, |
| const Schema* projection_schema, |
| faststring* data_buf, faststring* indirect_data) { |
| DCHECK_GT(block.nrows(), 0); |
| const Schema& tablet_schema = block.schema(); |
| |
| if (projection_schema == nullptr) { |
| projection_schema = &tablet_schema; |
| } |
| |
| size_t old_size = data_buf->size(); |
| size_t row_stride = ContiguousRowHelper::row_size(*projection_schema); |
| int num_rows = block.selection_vector()->CountSelected(); |
| data_buf->resize(old_size + row_stride * num_rows); |
| uint8_t* base = reinterpret_cast<uint8_t*>(&(*data_buf)[old_size]); |
| |
| size_t proj_schema_idx = 0; |
| for (int t_schema_idx = 0; t_schema_idx < tablet_schema.num_columns(); t_schema_idx++) { |
| const ColumnSchema& col = tablet_schema.column(t_schema_idx); |
| proj_schema_idx = projection_schema->find_column(col.name()); |
| if (proj_schema_idx == -1) { |
| continue; |
| } |
| |
| // Generating different functions for each of these cases makes them much less |
| // branch-heavy -- we do the branch once outside the loop, and then have a |
| // compiled version for each combination below. |
| // TODO: Using LLVM to build a specialized CopyColumn on the fly should have |
| // even bigger gains, since we could inline the constant cell sizes and column |
| // offsets. |
| if (col.is_nullable() && col.type_info()->physical_type() == BINARY) { |
| CopyColumn<true, true>(block, t_schema_idx, proj_schema_idx, base, indirect_data, |
| projection_schema); |
| } else if (col.is_nullable() && col.type_info()->physical_type() != BINARY) { |
| CopyColumn<true, false>(block, t_schema_idx, proj_schema_idx, base, indirect_data, |
| projection_schema); |
| } else if (!col.is_nullable() && col.type_info()->physical_type() == BINARY) { |
| CopyColumn<false, true>(block, t_schema_idx, proj_schema_idx, base, indirect_data, |
| projection_schema); |
| } else if (!col.is_nullable() && col.type_info()->physical_type() != BINARY) { |
| CopyColumn<false, false>(block, t_schema_idx, proj_schema_idx, base, indirect_data, |
| projection_schema); |
| } else { |
| LOG(FATAL) << "cannot reach here"; |
| } |
| } |
| rowblock_pb->set_num_rows(rowblock_pb->num_rows() + num_rows); |
| } |
| |
| } // namespace kudu |