| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/descriptors.h" |
| |
| #include <boost/algorithm/string/join.hpp> |
| #include <gutil/strings/substitute.h> |
| #include <ios> |
| #include <sstream> |
| |
| #include <llvm/ExecutionEngine/ExecutionEngine.h> |
| #include <llvm/IR/DataLayout.h> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "common/object-pool.h" |
| #include "common/status.h" |
| #include "exprs/scalar-expr.h" |
| #include "exprs/scalar-expr-evaluator.h" |
| #include "gen-cpp/Descriptors_types.h" |
| #include "gen-cpp/PlanNodes_types.h" |
| #include "rpc/thrift-util.h" |
| #include "runtime/runtime-state.h" |
| |
| #include "common/names.h" |
| |
| using boost::algorithm::join; |
| using namespace strings; |
| |
| // In 'thrift_partition', the location is stored in a compressed format that references |
| // the 'partition_prefixes' of 'thrift_table'. This function decompresses that format into |
| // a string and stores it in 'result'. If 'location' is not set in the THdfsPartition, |
| // 'result' is set to the empty string. |
| static void DecompressLocation(const impala::THdfsTable& thrift_table, |
| const impala::THdfsPartition& thrift_partition, string* result) { |
| if (!thrift_partition.__isset.location) { |
| result->clear(); |
| return; |
| } |
| *result = thrift_partition.location.suffix; |
| if (thrift_partition.location.prefix_index != -1) { |
| // -1 means an uncompressed location |
| DCHECK_GE(thrift_partition.location.prefix_index, 0); |
| DCHECK_LT( |
| thrift_partition.location.prefix_index, thrift_table.partition_prefixes.size()); |
| *result = |
| thrift_table.partition_prefixes[thrift_partition.location.prefix_index] + *result; |
| } |
| } |
| |
| namespace impala { |
| |
| const int SchemaPathConstants::ARRAY_ITEM; |
| const int SchemaPathConstants::ARRAY_POS; |
| const int SchemaPathConstants::MAP_KEY; |
| const int SchemaPathConstants::MAP_VALUE; |
| |
| const int RowDescriptor::INVALID_IDX; |
| |
| const char* TupleDescriptor::LLVM_CLASS_NAME = "class.impala::TupleDescriptor"; |
| const char* NullIndicatorOffset::LLVM_CLASS_NAME = "struct.impala::NullIndicatorOffset"; |
| |
| string NullIndicatorOffset::DebugString() const { |
| stringstream out; |
| out << "(offset=" << byte_offset |
| << " mask=" << hex << static_cast<int>(bit_mask) << dec << ")"; |
| return out.str(); |
| } |
| |
| llvm::Constant* NullIndicatorOffset::ToIR(LlvmCodeGen* codegen) const { |
| llvm::StructType* null_indicator_offset_type = |
| codegen->GetStructType<NullIndicatorOffset>(); |
| // Populate padding at end of struct with zeroes. |
| llvm::ConstantAggregateZero* zeroes = |
| llvm::ConstantAggregateZero::get(null_indicator_offset_type); |
| return llvm::ConstantStruct::get(null_indicator_offset_type, |
| {codegen->GetI32Constant(byte_offset), |
| codegen->GetI8Constant(bit_mask), |
| zeroes->getStructElement(2)}); |
| } |
| |
| ostream& operator<<(ostream& os, const NullIndicatorOffset& null_indicator) { |
| os << null_indicator.DebugString(); |
| return os; |
| } |
| |
| SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc, |
| const TupleDescriptor* parent, const TupleDescriptor* collection_item_descriptor) |
| : id_(tdesc.id), |
| type_(ColumnType::FromThrift(tdesc.slotType)), |
| parent_(parent), |
| collection_item_descriptor_(collection_item_descriptor), |
| col_path_(tdesc.materializedPath), |
| tuple_offset_(tdesc.byteOffset), |
| null_indicator_offset_(tdesc.nullIndicatorByte, tdesc.nullIndicatorBit), |
| slot_idx_(tdesc.slotIdx), |
| slot_size_(type_.GetSlotSize()) { |
| DCHECK_NE(type_.type, TYPE_STRUCT); |
| DCHECK(parent_ != nullptr) << tdesc.parent; |
| if (type_.IsCollectionType()) { |
| DCHECK(tdesc.__isset.itemTupleId); |
| DCHECK(collection_item_descriptor_ != nullptr) << tdesc.itemTupleId; |
| } else { |
| DCHECK(!tdesc.__isset.itemTupleId); |
| DCHECK(collection_item_descriptor == nullptr); |
| } |
| } |
| |
| bool SlotDescriptor::ColPathLessThan(const SlotDescriptor* a, const SlotDescriptor* b) { |
| int common_levels = min(a->col_path().size(), b->col_path().size()); |
| for (int i = 0; i < common_levels; ++i) { |
| if (a->col_path()[i] == b->col_path()[i]) continue; |
| return a->col_path()[i] < b->col_path()[i]; |
| } |
| return a->col_path().size() < b->col_path().size(); |
| } |
| |
| string SlotDescriptor::DebugString() const { |
| stringstream out; |
| out << "Slot(id=" << id_ << " type=" << type_.DebugString() |
| << " col_path=["; |
| if (col_path_.size() > 0) out << col_path_[0]; |
| for (int i = 1; i < col_path_.size(); ++i) { |
| out << ","; |
| out << col_path_[i]; |
| } |
| out << "]"; |
| if (collection_item_descriptor_ != nullptr) { |
| out << " collection_item_tuple_id=" << collection_item_descriptor_->id(); |
| } |
| out << " offset=" << tuple_offset_ << " null=" << null_indicator_offset_.DebugString() |
| << " slot_idx=" << slot_idx_ << " field_idx=" << llvm_field_idx_ |
| << ")"; |
| return out.str(); |
| } |
| |
| bool SlotDescriptor::LayoutEquals(const SlotDescriptor& other_desc) const { |
| if (type() != other_desc.type()) return false; |
| if (is_nullable() != other_desc.is_nullable()) return false; |
| if (slot_size() != other_desc.slot_size()) return false; |
| if (tuple_offset() != other_desc.tuple_offset()) return false; |
| if (!null_indicator_offset().Equals(other_desc.null_indicator_offset())) return false; |
| return true; |
| } |
| |
| ColumnDescriptor::ColumnDescriptor(const TColumnDescriptor& tdesc) |
| : name_(tdesc.name), |
| type_(ColumnType::FromThrift(tdesc.type)) { |
| } |
| |
| string ColumnDescriptor::DebugString() const { |
| return Substitute("$0: $1", name_, type_.DebugString()); |
| } |
| |
| TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc) |
| : name_(tdesc.tableName), |
| database_(tdesc.dbName), |
| id_(tdesc.id), |
| type_(tdesc.tableType), |
| num_clustering_cols_(tdesc.numClusteringCols) { |
| for (int i = 0; i < tdesc.columnDescriptors.size(); ++i) { |
| col_descs_.push_back(ColumnDescriptor(tdesc.columnDescriptors[i])); |
| } |
| } |
| |
| string TableDescriptor::fully_qualified_name() const { |
| return Substitute("$0.$1", database_, name_); |
| } |
| |
| string TableDescriptor::DebugString() const { |
| vector<string> cols; |
| for (const ColumnDescriptor& col_desc: col_descs_) { |
| cols.push_back(col_desc.DebugString()); |
| } |
| stringstream out; |
| out << "#cols=" << num_cols() << " #clustering_cols=" << num_clustering_cols_; |
| out << " cols=["; |
| out << join(cols, ", "); |
| out << "]"; |
| return out.str(); |
| } |
| |
| HdfsPartitionDescriptor::HdfsPartitionDescriptor( |
| const THdfsTable& thrift_table, const THdfsPartition& thrift_partition) |
| : line_delim_(thrift_partition.lineDelim), |
| field_delim_(thrift_partition.fieldDelim), |
| collection_delim_(thrift_partition.collectionDelim), |
| escape_char_(thrift_partition.escapeChar), |
| block_size_(thrift_partition.blockSize), |
| id_(thrift_partition.id), |
| thrift_partition_key_exprs_(thrift_partition.partitionKeyExprs), |
| file_format_(thrift_partition.fileFormat) { |
| DecompressLocation(thrift_table, thrift_partition, &location_); |
| } |
| |
| string HdfsPartitionDescriptor::DebugString() const { |
| stringstream out; |
| out << " file_format=" << file_format_ << "'" |
| << " line_delim='" << line_delim_ << "'" |
| << " field_delim='" << field_delim_ << "'" |
| << " coll_delim='" << collection_delim_ << "'" |
| << " escape_char='" << escape_char_ << "')"; |
| return out.str(); |
| } |
| |
| string DataSourceTableDescriptor::DebugString() const { |
| stringstream out; |
| out << "DataSourceTable(" << TableDescriptor::DebugString() << ")"; |
| return out.str(); |
| } |
| |
| HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool) |
| : TableDescriptor(tdesc), |
| hdfs_base_dir_(tdesc.hdfsTable.hdfsBaseDir), |
| null_partition_key_value_(tdesc.hdfsTable.nullPartitionKeyValue), |
| null_column_value_(tdesc.hdfsTable.nullColumnValue) { |
| for (const auto& entry : tdesc.hdfsTable.partitions) { |
| HdfsPartitionDescriptor* partition = |
| pool->Add(new HdfsPartitionDescriptor(tdesc.hdfsTable, entry.second)); |
| partition_descriptors_[entry.first] = partition; |
| } |
| prototype_partition_descriptor_ = pool->Add(new HdfsPartitionDescriptor( |
| tdesc.hdfsTable, tdesc.hdfsTable.prototype_partition)); |
| avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? tdesc.hdfsTable.avroSchema : ""; |
| } |
| |
| void HdfsTableDescriptor::ReleaseResources() { |
| for (const auto& part_entry: partition_descriptors_) { |
| for (ScalarExprEvaluator* eval : |
| part_entry.second->partition_key_value_evals()) { |
| eval->Close(nullptr); |
| const_cast<ScalarExpr&>(eval->root()).Close(); |
| } |
| } |
| } |
| |
| string HdfsTableDescriptor::DebugString() const { |
| stringstream out; |
| out << "HdfsTable(" << TableDescriptor::DebugString() |
| << " hdfs_base_dir='" << hdfs_base_dir_ << "'"; |
| out << " partitions=["; |
| vector<string> partition_strings; |
| map<int64_t, HdfsPartitionDescriptor*>::const_iterator it; |
| for (it = partition_descriptors_.begin(); it != partition_descriptors_.end(); ++it) { |
| stringstream s; |
| s << " (id: " << it->first << ", partition: " << it->second->DebugString() << ")"; |
| partition_strings.push_back(s.str()); |
| } |
| out << join(partition_strings, ",") << "]"; |
| |
| out << " null_partition_key_value='" << null_partition_key_value_ << "'"; |
| out << " null_column_value='" << null_column_value_ << "'"; |
| return out.str(); |
| } |
| |
| HBaseTableDescriptor::HBaseTableDescriptor(const TTableDescriptor& tdesc) |
| : TableDescriptor(tdesc), |
| table_name_(tdesc.hbaseTable.tableName) { |
| for (int i = 0; i < tdesc.hbaseTable.families.size(); ++i) { |
| bool is_binary_encoded = tdesc.hbaseTable.__isset.binary_encoded && |
| tdesc.hbaseTable.binary_encoded[i]; |
| cols_.push_back(HBaseTableDescriptor::HBaseColumnDescriptor( |
| tdesc.hbaseTable.families[i], tdesc.hbaseTable.qualifiers[i], is_binary_encoded)); |
| } |
| } |
| |
| string HBaseTableDescriptor::DebugString() const { |
| stringstream out; |
| out << "HBaseTable(" << TableDescriptor::DebugString() << " table=" << table_name_; |
| out << " cols=["; |
| for (int i = 0; i < cols_.size(); ++i) { |
| out << (i > 0 ? " " : "") << cols_[i].family << ":" << cols_[i].qualifier << ":" |
| << cols_[i].binary_encoded; |
| } |
| out << "])"; |
| return out.str(); |
| } |
| |
| KuduTableDescriptor::KuduTableDescriptor(const TTableDescriptor& tdesc) |
| : TableDescriptor(tdesc), |
| table_name_(tdesc.kuduTable.table_name), |
| key_columns_(tdesc.kuduTable.key_columns), |
| master_addresses_(tdesc.kuduTable.master_addresses) { |
| } |
| |
| string KuduTableDescriptor::DebugString() const { |
| stringstream out; |
| out << "KuduTable(" << TableDescriptor::DebugString() << " table=" << table_name_; |
| out << " master_addrs=[" << join(master_addresses_, ",") << "]"; |
| out << " key_columns=["; |
| out << join(key_columns_, ":"); |
| out << "])"; |
| return out.str(); |
| } |
| |
| TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc) |
| : id_(tdesc.id), |
| byte_size_(tdesc.byteSize), |
| num_null_bytes_(tdesc.numNullBytes), |
| null_bytes_offset_(tdesc.byteSize - tdesc.numNullBytes), |
| has_varlen_slots_(false), |
| tuple_path_(tdesc.tuplePath) { |
| } |
| |
| void TupleDescriptor::AddSlot(SlotDescriptor* slot) { |
| slots_.push_back(slot); |
| if (slot->type().IsVarLenStringType()) { |
| string_slots_.push_back(slot); |
| has_varlen_slots_ = true; |
| } |
| if (slot->type().IsCollectionType()) { |
| collection_slots_.push_back(slot); |
| has_varlen_slots_ = true; |
| } |
| } |
| |
| bool TupleDescriptor::ContainsStringData() const { |
| if (!string_slots_.empty()) return true; |
| for (int i = 0; i < collection_slots_.size(); ++i) { |
| if (collection_slots_[i]->collection_item_descriptor_->ContainsStringData()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| string TupleDescriptor::DebugString() const { |
| stringstream out; |
| out << "Tuple(id=" << id_ << " size=" << byte_size_; |
| if (table_desc_ != NULL) { |
| //out << " " << table_desc_->DebugString(); |
| } |
| out << " slots=["; |
| for (size_t i = 0; i < slots_.size(); ++i) { |
| if (i > 0) out << ", "; |
| out << slots_[i]->DebugString(); |
| } |
| out << "]"; |
| out << " tuple_path=["; |
| for (size_t i = 0; i < tuple_path_.size(); ++i) { |
| if (i > 0) out << ", "; |
| out << tuple_path_[i]; |
| } |
| out << "]"; |
| out << ")"; |
| return out.str(); |
| } |
| |
| bool TupleDescriptor::LayoutEquals(const TupleDescriptor& other_desc) const { |
| if (byte_size() != other_desc.byte_size()) return false; |
| if (slots().size() != other_desc.slots().size()) return false; |
| |
| vector<SlotDescriptor*> slots = SlotsOrderedByIdx(); |
| vector<SlotDescriptor*> other_slots = other_desc.SlotsOrderedByIdx(); |
| for (int i = 0; i < slots.size(); ++i) { |
| if (!slots[i]->LayoutEquals(*other_slots[i])) return false; |
| } |
| return true; |
| } |
| |
| RowDescriptor::RowDescriptor(const DescriptorTbl& desc_tbl, |
| const vector<TTupleId>& row_tuples, |
| const vector<bool>& nullable_tuples) |
| : tuple_idx_nullable_map_(nullable_tuples) { |
| DCHECK_EQ(nullable_tuples.size(), row_tuples.size()); |
| DCHECK_GT(row_tuples.size(), 0); |
| for (int i = 0; i < row_tuples.size(); ++i) { |
| tuple_desc_map_.push_back(desc_tbl.GetTupleDescriptor(row_tuples[i])); |
| DCHECK(tuple_desc_map_.back() != NULL); |
| } |
| InitTupleIdxMap(); |
| InitHasVarlenSlots(); |
| } |
| |
| RowDescriptor::RowDescriptor(const RowDescriptor& lhs_row_desc, |
| const RowDescriptor& rhs_row_desc) { |
| tuple_desc_map_.insert(tuple_desc_map_.end(), lhs_row_desc.tuple_desc_map_.begin(), |
| lhs_row_desc.tuple_desc_map_.end()); |
| tuple_desc_map_.insert(tuple_desc_map_.end(), rhs_row_desc.tuple_desc_map_.begin(), |
| rhs_row_desc.tuple_desc_map_.end()); |
| tuple_idx_nullable_map_.insert(tuple_idx_nullable_map_.end(), |
| lhs_row_desc.tuple_idx_nullable_map_.begin(), |
| lhs_row_desc.tuple_idx_nullable_map_.end()); |
| tuple_idx_nullable_map_.insert(tuple_idx_nullable_map_.end(), |
| rhs_row_desc.tuple_idx_nullable_map_.begin(), |
| rhs_row_desc.tuple_idx_nullable_map_.end()); |
| InitTupleIdxMap(); |
| InitHasVarlenSlots(); |
| } |
| |
| RowDescriptor::RowDescriptor(TupleDescriptor* tuple_desc, bool is_nullable) |
| : tuple_desc_map_(1, tuple_desc), |
| tuple_idx_nullable_map_(1, is_nullable) { |
| InitTupleIdxMap(); |
| InitHasVarlenSlots(); |
| } |
| |
| void RowDescriptor::InitTupleIdxMap() { |
| // find max id |
| TupleId max_id = 0; |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| max_id = max(tuple_desc_map_[i]->id(), max_id); |
| } |
| |
| tuple_idx_map_.resize(max_id + 1, INVALID_IDX); |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| tuple_idx_map_[tuple_desc_map_[i]->id()] = i; |
| } |
| } |
| |
| void RowDescriptor::InitHasVarlenSlots() { |
| has_varlen_slots_ = false; |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| if (tuple_desc_map_[i]->HasVarlenSlots()) { |
| has_varlen_slots_ = true; |
| break; |
| } |
| } |
| } |
| |
| int RowDescriptor::GetRowSize() const { |
| int size = 0; |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| size += tuple_desc_map_[i]->byte_size(); |
| } |
| return size; |
| } |
| |
| int RowDescriptor::GetTupleIdx(TupleId id) const { |
| DCHECK_LT(id, tuple_idx_map_.size()) << "RowDescriptor: " << DebugString(); |
| return tuple_idx_map_[id]; |
| } |
| |
| bool RowDescriptor::TupleIsNullable(int tuple_idx) const { |
| DCHECK_LT(tuple_idx, tuple_idx_nullable_map_.size()); |
| return tuple_idx_nullable_map_[tuple_idx]; |
| } |
| |
| bool RowDescriptor::IsAnyTupleNullable() const { |
| for (int i = 0; i < tuple_idx_nullable_map_.size(); ++i) { |
| if (tuple_idx_nullable_map_[i]) return true; |
| } |
| return false; |
| } |
| |
| void RowDescriptor::ToThrift(vector<TTupleId>* row_tuple_ids) const { |
| row_tuple_ids->clear(); |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| row_tuple_ids->push_back(tuple_desc_map_[i]->id()); |
| } |
| } |
| |
| bool RowDescriptor::IsPrefixOf(const RowDescriptor& other_desc) const { |
| if (tuple_desc_map_.size() > other_desc.tuple_desc_map_.size()) return false; |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| // pointer comparison okay, descriptors are unique |
| if (tuple_desc_map_[i] != other_desc.tuple_desc_map_[i]) return false; |
| } |
| return true; |
| } |
| |
| bool RowDescriptor::Equals(const RowDescriptor& other_desc) const { |
| if (tuple_desc_map_.size() != other_desc.tuple_desc_map_.size()) return false; |
| return IsPrefixOf(other_desc); |
| } |
| |
| bool RowDescriptor::LayoutIsPrefixOf(const RowDescriptor& other_desc) const { |
| if (tuple_desc_map_.size() > other_desc.tuple_desc_map_.size()) return false; |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| if (!tuple_desc_map_[i]->LayoutEquals(*other_desc.tuple_desc_map_[i])) return false; |
| } |
| return true; |
| } |
| |
| bool RowDescriptor::LayoutEquals(const RowDescriptor& other_desc) const { |
| if (tuple_desc_map_.size() != other_desc.tuple_desc_map_.size()) return false; |
| return LayoutIsPrefixOf(other_desc); |
| } |
| |
| string RowDescriptor::DebugString() const { |
| stringstream ss; |
| for (int i = 0; i < tuple_desc_map_.size(); ++i) { |
| ss << tuple_desc_map_[i]->DebugString() << endl; |
| } |
| return ss.str(); |
| } |
| |
| Status DescriptorTbl::CreatePartKeyExprs( |
| const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) { |
| // Prepare and open partition exprs |
| for (const auto& part_entry : hdfs_tbl.partition_descriptors()) { |
| HdfsPartitionDescriptor* part_desc = part_entry.second; |
| vector<ScalarExpr*> partition_key_value_exprs; |
| RETURN_IF_ERROR(ScalarExpr::Create(part_desc->thrift_partition_key_exprs_, |
| RowDescriptor(), nullptr, pool, &partition_key_value_exprs)); |
| for (const ScalarExpr* partition_expr : partition_key_value_exprs) { |
| DCHECK(partition_expr->IsLiteral()); |
| DCHECK(!partition_expr->HasFnCtx()); |
| DCHECK_EQ(partition_expr->GetNumChildren(), 0); |
| } |
| // TODO: RowDescriptor should arguably be optional in Prepare for known literals. |
| // Partition exprs are not used in the codegen case. Don't codegen them. |
| RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_value_exprs, nullptr, |
| pool, nullptr, nullptr, &part_desc->partition_key_value_evals_)); |
| RETURN_IF_ERROR(ScalarExprEvaluator::Open( |
| part_desc->partition_key_value_evals_, nullptr)); |
| } |
| return Status::OK(); |
| } |
| |
| Status DescriptorTbl::DeserializeThrift(const TDescriptorTableSerialized& serial_tbl, |
| TDescriptorTable* desc_tbl) { |
| uint32_t serial_tbl_len = serial_tbl.thrift_desc_tbl.length(); |
| return DeserializeThriftMsg( |
| reinterpret_cast<const uint8_t*>(serial_tbl.thrift_desc_tbl.data()), |
| &serial_tbl_len, false, desc_tbl); |
| } |
| |
| Status DescriptorTbl::CreateHdfsTblDescriptor( |
| const TDescriptorTableSerialized& serialized_thrift_tbl, |
| TableId tbl_id, ObjectPool* pool, HdfsTableDescriptor** desc) { |
| TDescriptorTable thrift_tbl; |
| RETURN_IF_ERROR(DeserializeThrift(serialized_thrift_tbl, &thrift_tbl)); |
| for (const TTableDescriptor& tdesc: thrift_tbl.tableDescriptors) { |
| if (tdesc.id == tbl_id) { |
| DCHECK(tdesc.__isset.hdfsTable); |
| RETURN_IF_ERROR(CreateTblDescriptorInternal( |
| tdesc, pool, reinterpret_cast<TableDescriptor**>(desc))); |
| return Status::OK(); |
| } |
| } |
| string error = Substitute("table $0 not found in descriptor table", tbl_id); |
| DCHECK(false) << error; |
| return Status(error); |
| } |
| |
| Status DescriptorTbl::CreateTblDescriptorInternal(const TTableDescriptor& tdesc, |
| ObjectPool* pool, TableDescriptor** desc) { |
| *desc = nullptr; |
| switch (tdesc.tableType) { |
| case TTableType::HDFS_TABLE: { |
| HdfsTableDescriptor* hdfs_tbl = pool->Add(new HdfsTableDescriptor(tdesc, pool)); |
| *desc = hdfs_tbl; |
| RETURN_IF_ERROR(CreatePartKeyExprs(*hdfs_tbl, pool)); |
| break; |
| } |
| case TTableType::HBASE_TABLE: |
| *desc = pool->Add(new HBaseTableDescriptor(tdesc)); |
| break; |
| case TTableType::DATA_SOURCE_TABLE: |
| *desc = pool->Add(new DataSourceTableDescriptor(tdesc)); |
| break; |
| case TTableType::KUDU_TABLE: |
| *desc = pool->Add(new KuduTableDescriptor(tdesc)); |
| break; |
| default: |
| DCHECK(false) << "invalid table type: " << tdesc.tableType; |
| } |
| return Status::OK(); |
| } |
| |
| Status DescriptorTbl::Create(ObjectPool* pool, |
| const TDescriptorTableSerialized& serialized_thrift_tbl, DescriptorTbl** tbl) { |
| TDescriptorTable thrift_tbl; |
| RETURN_IF_ERROR(DeserializeThrift(serialized_thrift_tbl, &thrift_tbl)); |
| return CreateInternal(pool, thrift_tbl, tbl); |
| } |
| |
| Status DescriptorTbl::CreateInternal(ObjectPool* pool, const TDescriptorTable& thrift_tbl, |
| DescriptorTbl** tbl) { |
| *tbl = pool->Add(new DescriptorTbl()); |
| // deserialize table descriptors first, they are being referenced by tuple descriptors |
| for (const TTableDescriptor& tdesc: thrift_tbl.tableDescriptors) { |
| TableDescriptor* desc; |
| RETURN_IF_ERROR(CreateTblDescriptorInternal(tdesc, pool, &desc)); |
| (*tbl)->tbl_desc_map_[tdesc.id] = desc; |
| } |
| |
| for (size_t i = 0; i < thrift_tbl.tupleDescriptors.size(); ++i) { |
| const TTupleDescriptor& tdesc = thrift_tbl.tupleDescriptors[i]; |
| TupleDescriptor* desc = pool->Add(new TupleDescriptor(tdesc)); |
| // fix up table pointer |
| if (tdesc.__isset.tableId) { |
| desc->table_desc_ = (*tbl)->GetTableDescriptor(tdesc.tableId); |
| } |
| (*tbl)->tuple_desc_map_[tdesc.id] = desc; |
| } |
| |
| for (size_t i = 0; i < thrift_tbl.slotDescriptors.size(); ++i) { |
| const TSlotDescriptor& tdesc = thrift_tbl.slotDescriptors[i]; |
| // Tuple descriptors are already populated in tbl |
| TupleDescriptor* parent = (*tbl)->GetTupleDescriptor(tdesc.parent); |
| DCHECK(parent != nullptr); |
| TupleDescriptor* collection_item_descriptor = tdesc.__isset.itemTupleId ? |
| (*tbl)->GetTupleDescriptor(tdesc.itemTupleId) : nullptr; |
| SlotDescriptor* slot_d = pool->Add( |
| new SlotDescriptor(tdesc, parent, collection_item_descriptor)); |
| (*tbl)->slot_desc_map_[tdesc.id] = slot_d; |
| parent->AddSlot(slot_d); |
| } |
| return Status::OK(); |
| } |
| |
| void DescriptorTbl::ReleaseResources() { |
| // close partition exprs of hdfs tables |
| for (auto entry: tbl_desc_map_) { |
| if (entry.second->type() != TTableType::HDFS_TABLE) continue; |
| static_cast<HdfsTableDescriptor*>(entry.second)->ReleaseResources(); |
| } |
| } |
| |
| TableDescriptor* DescriptorTbl::GetTableDescriptor(TableId id) const { |
| TableDescriptorMap::const_iterator i = tbl_desc_map_.find(id); |
| return i == tbl_desc_map_.end() ? nullptr : i->second; |
| } |
| |
| TupleDescriptor* DescriptorTbl::GetTupleDescriptor(TupleId id) const { |
| TupleDescriptorMap::const_iterator i = tuple_desc_map_.find(id); |
| return i == tuple_desc_map_.end() ? nullptr : i->second; |
| } |
| |
| SlotDescriptor* DescriptorTbl::GetSlotDescriptor(SlotId id) const { |
| SlotDescriptorMap::const_iterator i = slot_desc_map_.find(id); |
| return i == slot_desc_map_.end() ? nullptr : i->second; |
| } |
| |
| void DescriptorTbl::GetTupleDescs(vector<TupleDescriptor*>* descs) const { |
| descs->clear(); |
| for (TupleDescriptorMap::const_iterator i = tuple_desc_map_.begin(); |
| i != tuple_desc_map_.end(); ++i) { |
| descs->push_back(i->second); |
| } |
| } |
| |
| llvm::Value* SlotDescriptor::CodegenIsNull( |
| LlvmCodeGen* codegen, LlvmBuilder* builder, llvm::Value* tuple) const { |
| return CodegenIsNull(codegen, builder, null_indicator_offset_, tuple); |
| } |
| |
| // Example IR for getting the first null bit: |
| // %0 = bitcast { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple to i8* |
| // %null_byte_ptr = getelementptr i8, i8* %0, i32 0 |
| // %null_byte = load i8, i8* %null_byte_ptr |
| // %null_mask = and i8 %null_byte, 1 |
| // %is_null = icmp ne i8 %null_mask, 0 |
| llvm::Value* SlotDescriptor::CodegenIsNull(LlvmCodeGen* codegen, LlvmBuilder* builder, |
| const NullIndicatorOffset& null_indicator_offset, llvm::Value* tuple) { |
| llvm::Value* null_byte = |
| CodegenGetNullByte(codegen, builder, null_indicator_offset, tuple, nullptr); |
| llvm::Constant* mask = codegen->GetI8Constant(null_indicator_offset.bit_mask); |
| llvm::Value* null_mask = builder->CreateAnd(null_byte, mask, "null_mask"); |
| llvm::Constant* zero = codegen->GetI8Constant(0); |
| return builder->CreateICmpNE(null_mask, zero, "is_null"); |
| } |
| |
| // Example IR for setting the first null bit to a non-constant 'is_null' value: |
| // %14 = bitcast { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple to i8* |
| // %null_byte_ptr3 = getelementptr i8, i8* %14, i32 0 |
| // %null_byte4 = load i8, i8* %null_byte_ptr3 |
| // %null_bit_cleared = and i8 %null_byte4, -2 |
| // %15 = sext i1 %result_is_null to i8 |
| // %null_bit = and i8 %15, 1 |
| // %null_bit_set = or i8 %null_bit_cleared, %null_bit |
| // store i8 %null_bit_set, i8* %null_byte_ptr3 |
| void SlotDescriptor::CodegenSetNullIndicator( |
| LlvmCodeGen* codegen, LlvmBuilder* builder, llvm::Value* tuple, llvm::Value* is_null) |
| const { |
| DCHECK_EQ(is_null->getType(), codegen->bool_type()); |
| llvm::Value* null_byte_ptr; |
| llvm::Value* null_byte = |
| CodegenGetNullByte(codegen, builder, null_indicator_offset_, tuple, &null_byte_ptr); |
| llvm::Constant* mask = codegen->GetI8Constant(null_indicator_offset_.bit_mask); |
| llvm::Constant* not_mask = codegen->GetI8Constant(~null_indicator_offset_.bit_mask); |
| |
| llvm::ConstantInt* constant_is_null = llvm::dyn_cast<llvm::ConstantInt>(is_null); |
| llvm::Value* result = nullptr; |
| if (constant_is_null != nullptr) { |
| if (constant_is_null->isOne()) { |
| result = builder->CreateOr(null_byte, mask, "null_bit_set"); |
| } else { |
| DCHECK(constant_is_null->isZero()); |
| result = builder->CreateAnd(null_byte, not_mask, "null_bit_cleared"); |
| } |
| } else { |
| // Avoid branching by computing the new byte as: |
| // (null_byte & ~mask) | (-null & mask); |
| llvm::Value* byte_with_cleared_bit = |
| builder->CreateAnd(null_byte, not_mask, "null_bit_cleared"); |
| llvm::Value* sign_extended_null = |
| builder->CreateSExt(is_null, codegen->i8_type()); |
| llvm::Value* bit_only = builder->CreateAnd(sign_extended_null, mask, "null_bit"); |
| result = builder->CreateOr(byte_with_cleared_bit, bit_only, "null_bit_set"); |
| } |
| |
| builder->CreateStore(result, null_byte_ptr); |
| } |
| |
| llvm::Value* SlotDescriptor::CodegenGetNullByte( |
| LlvmCodeGen* codegen, LlvmBuilder* builder, |
| const NullIndicatorOffset& null_indicator_offset, llvm::Value* tuple, |
| llvm::Value** null_byte_ptr) { |
| llvm::Constant* byte_offset = |
| codegen->GetI32Constant(null_indicator_offset.byte_offset); |
| llvm::Value* tuple_bytes = builder->CreateBitCast(tuple, codegen->ptr_type()); |
| llvm::Value* byte_ptr = |
| builder->CreateInBoundsGEP(tuple_bytes, byte_offset, "null_byte_ptr"); |
| if (null_byte_ptr != nullptr) *null_byte_ptr = byte_ptr; |
| return builder->CreateLoad(byte_ptr, "null_byte"); |
| } |
| |
| vector<SlotDescriptor*> TupleDescriptor::SlotsOrderedByIdx() const { |
| vector<SlotDescriptor*> sorted_slots(slots().size()); |
| for (SlotDescriptor* slot: slots()) sorted_slots[slot->slot_idx_] = slot; |
| return sorted_slots; |
| } |
| |
| llvm::StructType* TupleDescriptor::GetLlvmStruct(LlvmCodeGen* codegen) const { |
| // Get slots in the order they will appear in LLVM struct. |
| vector<SlotDescriptor*> sorted_slots = SlotsOrderedByIdx(); |
| |
| // Add the slot types to the struct description. |
| vector<llvm::Type*> struct_fields; |
| int curr_struct_offset = 0; |
| for (SlotDescriptor* slot: sorted_slots) { |
| DCHECK_EQ(curr_struct_offset, slot->tuple_offset()); |
| slot->llvm_field_idx_ = struct_fields.size(); |
| struct_fields.push_back(codegen->GetSlotType(slot->type())); |
| curr_struct_offset = slot->tuple_offset() + slot->slot_size(); |
| } |
| // For each null byte, add a byte to the struct |
| for (int i = 0; i < num_null_bytes_; ++i) { |
| struct_fields.push_back(codegen->i8_type()); |
| ++curr_struct_offset; |
| } |
| |
| DCHECK_LE(curr_struct_offset, byte_size_); |
| if (curr_struct_offset < byte_size_) { |
| struct_fields.push_back(llvm::ArrayType::get(codegen->i8_type(), |
| byte_size_ - curr_struct_offset)); |
| } |
| |
| // Construct the struct type. Use the packed layout although not strictly necessary |
| // because the fields are already aligned, so LLVM should not add any padding. The |
| // fields are already aligned because we order the slots by descending size and only |
| // have powers-of-two slot sizes. Note that STRING and TIMESTAMP slots both occupy |
| // 16 bytes although their useful payload is only 12 bytes. |
| llvm::StructType* tuple_struct = llvm::StructType::get(codegen->context(), |
| llvm::ArrayRef<llvm::Type*>(struct_fields), true); |
| const llvm::DataLayout& data_layout = codegen->execution_engine()->getDataLayout(); |
| const llvm::StructLayout* layout = data_layout.getStructLayout(tuple_struct); |
| for (SlotDescriptor* slot: slots()) { |
| // Verify that the byte offset in the llvm struct matches the tuple offset |
| // computed in the FE. |
| DCHECK_EQ(layout->getElementOffset(slot->llvm_field_idx()), slot->tuple_offset()) |
| << id_; |
| } |
| return tuple_struct; |
| } |
| |
| string DescriptorTbl::DebugString() const { |
| stringstream out; |
| out << "tuples:\n"; |
| for (TupleDescriptorMap::const_iterator i = tuple_desc_map_.begin(); |
| i != tuple_desc_map_.end(); ++i) { |
| out << i->second->DebugString() << '\n'; |
| } |
| return out.str(); |
| } |
| |
| } |