| /** |
| * Copyright 2011-2015 Quickstep Technologies LLC. |
| * Copyright 2015-2016 Pivotal Software, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| #include "storage/StorageBlock.hpp" |
| |
#include <climits>
#include <cstddef>
#include <memory>
#include <random>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
| |
| #include "catalog/CatalogRelationSchema.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "expressions/aggregation/AggregationHandle.hpp" |
| #include "expressions/predicate/Predicate.hpp" |
| #include "expressions/scalar/Scalar.hpp" |
| #include "storage/BasicColumnStoreTupleStorageSubBlock.hpp" |
| #include "storage/CSBTreeIndexSubBlock.hpp" |
| #include "storage/CompressedColumnStoreTupleStorageSubBlock.hpp" |
| #include "storage/CompressedPackedRowStoreTupleStorageSubBlock.hpp" |
| #include "storage/CountedReference.hpp" |
| #include "storage/HashTableBase.hpp" |
| #include "storage/IndexSubBlock.hpp" |
| #include "storage/InsertDestinationInterface.hpp" |
| #include "storage/PackedRowStoreTupleStorageSubBlock.hpp" |
| #include "storage/SMAIndexSubBlock.hpp" |
| #include "storage/SplitRowStoreTupleStorageSubBlock.hpp" |
| #include "storage/StorageBlockBase.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "storage/StorageBlockLayout.hpp" |
| #include "storage/StorageBlockLayout.pb.h" |
| #include "storage/StorageConfig.h" |
| #include "storage/StorageErrors.hpp" |
| #include "storage/SubBlocksReference.hpp" |
| #include "storage/TupleIdSequence.hpp" |
| #include "storage/TupleStorageSubBlock.hpp" |
| #include "storage/ValueAccessor.hpp" |
| #include "storage/ValueAccessorUtil.hpp" |
| #include "types/TypedValue.hpp" |
| #include "types/containers/ColumnVector.hpp" |
| #include "types/containers/ColumnVectorsValueAccessor.hpp" |
| #include "types/containers/Tuple.hpp" |
| #include "types/operations/comparisons/ComparisonUtil.hpp" |
| #include "utility/Macros.hpp" |
| |
| #include "glog/logging.h" |
| |
| using std::pair; |
| using std::size_t; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::vector; |
| |
| namespace quickstep { |
| |
| class Type; |
| |
| StorageBlock::StorageBlock(const CatalogRelationSchema &relation, |
| const block_id id, |
| const StorageBlockLayout &layout, |
| const bool new_block, |
| void *block_memory, |
| const std::size_t block_memory_size) |
| : StorageBlockBase(id, block_memory, block_memory_size), |
| all_indices_consistent_(true), |
| all_indices_inconsistent_(false), |
| relation_(relation) { |
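  // A block's memory begins with an int giving the byte length of the
  // serialized StorageBlockHeader, followed by the serialized header itself,
  // followed by the sub-blocks (the tuple store first, then any indices) in
  // the order they are declared in the header.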
| if (new_block) { |
| if (block_memory_size_ < layout.getBlockHeaderSize()) { |
| throw BlockMemoryTooSmall("StorageBlock", block_memory_size_); |
| } |
| |
| layout.copyHeaderTo(block_memory_); |
| DEBUG_ASSERT(*static_cast<const int*>(block_memory_) > 0); |
| |
| if (!block_header_.ParseFromArray(static_cast<char*>(block_memory_) + sizeof(int), |
| *static_cast<const int*>(block_memory_))) { |
| FATAL_ERROR("A StorageBlockLayout created a malformed StorageBlockHeader."); |
| } |
| |
| // We mark a newly-created block as dirty, so that in the rare case that a |
| // block is evicted before anything is inserted into it, we still write it |
| // (and the header plus any sub-block specific fixed data structures) back |
| // to disk. |
| dirty_ = true; |
| |
| DEBUG_ASSERT(block_header_.IsInitialized()); |
| DEBUG_ASSERT(StorageBlockLayout::DescriptionIsValid(relation_, block_header_.layout())); |
| DEBUG_ASSERT(block_header_.index_size_size() == block_header_.layout().index_description_size()); |
| DEBUG_ASSERT(block_header_.index_size_size() == block_header_.index_consistent_size()); |
| } else { |
| if (block_memory_size < sizeof(int)) { |
| throw MalformedBlock(); |
| } |
| if (*static_cast<const int*>(block_memory_) <= 0) { |
| throw MalformedBlock(); |
| } |
| if (*static_cast<const int*>(block_memory_) + sizeof(int) > block_memory_size_) { |
| throw MalformedBlock(); |
| } |
| |
| if (!block_header_.ParseFromArray(static_cast<char*>(block_memory_) + sizeof(int), |
| *static_cast<const int*>(block_memory_))) { |
| throw MalformedBlock(); |
| } |
| if (!block_header_.IsInitialized()) { |
| throw MalformedBlock(); |
| } |
| if (!StorageBlockLayout::DescriptionIsValid(relation_, block_header_.layout())) { |
| throw MalformedBlock(); |
| } |
| if (block_header_.index_size_size() != block_header_.layout().index_description_size()) { |
| throw MalformedBlock(); |
| } |
| if (block_header_.index_size_size() != block_header_.index_consistent_size()) { |
| throw MalformedBlock(); |
| } |
| } |
| |
| size_t block_size_from_metadata = *static_cast<const int*>(block_memory_) + sizeof(int); |
| block_size_from_metadata += block_header_.tuple_store_size(); |
| for (int index_num = 0; |
| index_num < block_header_.index_size_size(); |
| ++index_num) { |
| block_size_from_metadata += block_header_.index_size(index_num); |
| } |
| |
| if (block_size_from_metadata > block_memory_size_) { |
| throw MalformedBlock(); |
| } else if (block_size_from_metadata < block_memory_size_) { |
| // WARNING: this isn't strictly an error, but it does indicate that there |
| // is unallocated space in the block. |
| } |
| |
| char *sub_block_address = static_cast<char*>(block_memory_) |
| + sizeof(int) |
| + *static_cast<const int*>(block_memory_); |
| tuple_store_.reset(CreateTupleStorageSubBlock( |
| relation_, |
| block_header_.layout().tuple_store_description(), |
| new_block, |
| sub_block_address, |
| block_header_.tuple_store_size())); |
| sub_block_address += block_header_.tuple_store_size(); |
| ad_hoc_insert_supported_ = tuple_store_->supportsAdHocInsert(); |
| ad_hoc_insert_efficient_ = tuple_store_->adHocInsertIsEfficient(); |
| |
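  // 'ad_hoc_insert_efficient_' starts from the tuple store's capability, and
  // is demoted below if any index does not support efficient ad-hoc adds.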
| if (block_header_.index_size_size() > 0) { |
| all_indices_inconsistent_ = true; |
| } |
| for (int index_num = 0; |
| index_num < block_header_.index_size_size(); |
| ++index_num) { |
| indices_.push_back(CreateIndexSubBlock(*tuple_store_, |
| block_header_.layout().index_description(index_num), |
| new_block, |
| sub_block_address, |
| block_header_.index_size(index_num))); |
| sub_block_address += block_header_.index_size(index_num); |
| if (!indices_.back().supportsAdHocAdd()) { |
| ad_hoc_insert_efficient_ = false; |
| } |
| |
| if (block_header_.index_consistent(index_num)) { |
| indices_consistent_.push_back(true); |
| all_indices_inconsistent_ = false; |
| } else { |
| indices_consistent_.push_back(false); |
| all_indices_consistent_ = false; |
| } |
| } |
| } |
| |
| bool StorageBlock::insertTuple(const Tuple &tuple) { |
| if (!ad_hoc_insert_supported_) { |
| return false; |
| } |
| |
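  // Remember whether the block was empty before this insert: a failed insert
  // into an empty block means the tuple can never fit in a block of this
  // layout, which is reported by throwing TupleTooLargeForBlock rather than
  // returning false.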
| const bool empty_before = tuple_store_->isEmpty(); |
| |
| TupleStorageSubBlock::InsertResult tuple_store_insert_result = tuple_store_->insertTuple(tuple); |
| if (tuple_store_insert_result.inserted_id < 0) { |
| DEBUG_ASSERT(tuple_store_insert_result.ids_mutated == false); |
| if (empty_before) { |
| throw TupleTooLargeForBlock(tuple.getByteSize()); |
| } else { |
| return false; |
| } |
| } |
| |
| bool update_succeeded = true; |
| |
| if (tuple_store_insert_result.ids_mutated) { |
| update_succeeded = rebuildIndexes(true); |
| if (!update_succeeded) { |
| tuple_store_->deleteTuple(tuple_store_insert_result.inserted_id); |
| if (!rebuildIndexes(true)) { |
| // It should always be possible to rebuild an index with the tuples |
| // which it originally contained. |
| FATAL_ERROR("Rebuilding an IndexSubBlock failed after removing tuples."); |
| } |
| } |
| } else { |
| update_succeeded = insertEntryInIndexes(tuple_store_insert_result.inserted_id); |
| } |
| |
| if (update_succeeded) { |
| dirty_ = true; |
| return true; |
| } else { |
| if (empty_before) { |
| throw TupleTooLargeForBlock(tuple.getByteSize()); |
| } else { |
| return false; |
| } |
| } |
| } |
| |
| bool StorageBlock::insertTupleInBatch(const Tuple &tuple) { |
| if (tuple_store_->insertTupleInBatch(tuple)) { |
| invalidateAllIndexes(); |
| dirty_ = true; |
| return true; |
| } else { |
| if (tuple_store_->isEmpty()) { |
| throw TupleTooLargeForBlock(tuple.getByteSize()); |
| } else { |
| return false; |
| } |
| } |
| } |
| |
| tuple_id StorageBlock::bulkInsertTuples(ValueAccessor *accessor) { |
| const tuple_id num_inserted = tuple_store_->bulkInsertTuples(accessor); |
| if (num_inserted != 0) { |
| invalidateAllIndexes(); |
| dirty_ = true; |
| } else if (tuple_store_->isEmpty()) { |
| if (!accessor->iterationFinishedVirtual()) { |
| throw TupleTooLargeForBlock(0); |
| } |
| } |
| return num_inserted; |
| } |
| |
| tuple_id StorageBlock::bulkInsertTuplesWithRemappedAttributes( |
| const std::vector<attribute_id> &attribute_map, |
| ValueAccessor *accessor) { |
| const tuple_id num_inserted |
| = tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, |
| accessor); |
| if (num_inserted != 0) { |
| invalidateAllIndexes(); |
| dirty_ = true; |
| } else if (tuple_store_->isEmpty()) { |
| if (!accessor->iterationFinishedVirtual()) { |
| throw TupleTooLargeForBlock(0); |
| } |
| } |
| return num_inserted; |
| } |
| |
| void StorageBlock::sample(const bool is_block_sample, |
| const int percentage, |
| InsertDestinationInterface *destination) const { |
| std::unique_ptr<ValueAccessor> accessor; |
  // Bulk insert the entire block if this is a block sample or if the sample
  // percentage is 100.
| if (is_block_sample || percentage == 100) { |
| accessor.reset(tuple_store_->createValueAccessor()); |
| destination->bulkInsertTuples(accessor.get()); |
| } else { |
| int actual_percentage = percentage; |
| std::random_device random_device; |
| std::mt19937 generator(random_device()); |
    std::unique_ptr<TupleIdSequence> sequence;
    // Get the existence map of tuple ids in this block.
    sequence.reset(tuple_store_->getExistenceMap());
    // Initialize a new TupleIdSequence of the same length as the existing
    // sequence, with its bit vector defaulted to all zeros.
    std::unique_ptr<TupleIdSequence> sequence_mask(
        new TupleIdSequence(sequence->length()));
| std::unordered_map<std::size_t, std::size_t> tuple_index_mapping; |
| bool invert_bits = false; |
| |
    // If more than half of the tuples must be picked, it is cheaper to do the
    // opposite: set a 1 for each of the rows that will NOT be picked, then
    // invert the bitvector afterwards.
| if (percentage > 50) { |
| actual_percentage = 100 - percentage; |
| invert_bits = true; |
| } |
| const std::size_t num_tuples = sequence->length() * actual_percentage / 100; |
| |
    // The total tuple-id space [0, N) is given by the length of the
    // TupleIdSequence; each iteration draws uniformly from the ids that
    // remain unchosen.
| for (std::size_t n = 0; n < num_tuples; n++) { |
      // Draw uniformly over the not-yet-chosen range [0, length - n - 1].
      std::uniform_int_distribution<std::size_t> distribution(
          0, sequence->length() - (n + 1));
      const std::size_t random_number = distribution(generator);
      const auto remap_it = tuple_index_mapping.find(random_number);
      sequence_mask->set(remap_it == tuple_index_mapping.end()
                             ? random_number
                             : remap_it->second,
                         true);
| tuple_index_mapping[random_number] = sequence->length() - (n + 1); |
| } |
| |
    // The bits set so far correspond to the rows NOT picked, so invert the
    // bitvector.
| if (invert_bits) { |
| sequence_mask->invert(); |
| } |
| accessor.reset(tuple_store_->createValueAccessor(sequence_mask.get())); |
| destination->bulkInsertTuples(accessor.get()); |
| } |
| } |
| |
| void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection, |
| const Predicate *predicate, |
| InsertDestinationInterface *destination) const { |
| ColumnVectorsValueAccessor temp_result; |
| { |
| SubBlocksReference sub_blocks_ref(*tuple_store_, |
| indices_, |
| indices_consistent_); |
| |
| std::unique_ptr<TupleIdSequence> matches; |
| std::unique_ptr<ValueAccessor> accessor; |
| if (predicate == nullptr) { |
| accessor.reset(tuple_store_->createValueAccessor()); |
| } else { |
| matches.reset(getMatchesForPredicate(predicate)); |
| accessor.reset(tuple_store_->createValueAccessor(matches.get())); |
| } |
| |
| for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin(); |
| selection_cit != selection.end(); |
| ++selection_cit) { |
| // TODO(chasseur): Can probably elide some copies for parts of the |
| // selection that are ScalarAttribute or ScalarLiteral. |
| temp_result.addColumn((*selection_cit)->getAllValues(accessor.get(), &sub_blocks_ref)); |
| } |
| } |
| |
| destination->bulkInsertTuples(&temp_result); |
| } |
| |
| void StorageBlock::selectSimple(const std::vector<attribute_id> &selection, |
| const Predicate *predicate, |
| InsertDestinationInterface *destination) const { |
| std::unique_ptr<ValueAccessor> accessor; |
| std::unique_ptr<TupleIdSequence> matches; |
| if (predicate == nullptr) { |
| accessor.reset(tuple_store_->createValueAccessor()); |
| } else { |
| matches.reset(getMatchesForPredicate(predicate)); |
| accessor.reset(tuple_store_->createValueAccessor(matches.get())); |
| } |
| |
| destination->bulkInsertTuplesWithRemappedAttributes(selection, |
| accessor.get()); |
| } |
| |
| AggregationState* StorageBlock::aggregate( |
| const AggregationHandle &handle, |
| const std::vector<std::unique_ptr<const Scalar>> &arguments, |
| const std::vector<attribute_id> *arguments_as_attributes, |
| const Predicate *predicate, |
| std::unique_ptr<TupleIdSequence> *reuse_matches) const { |
| // If there is a filter predicate that hasn't already been evaluated, |
| // evaluate it now and save the results for other aggregates on this same |
| // block. |
| if (predicate && !*reuse_matches) { |
| reuse_matches->reset(getMatchesForPredicate(predicate)); |
| } |
| |
| #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION |
| // If all the arguments to this aggregate are plain relation attributes, |
| // aggregate directly on a ValueAccessor from this block to avoid a copy. |
| if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) { |
| DCHECK_EQ(arguments.size(), arguments_as_attributes->size()) |
| << "Mismatch between number of arguments and number of attribute_ids"; |
| return aggregateHelperValueAccessor(handle, *arguments_as_attributes, reuse_matches->get()); |
| } |
| // TODO(shoban): We may want to optimize for ScalarLiteral here. |
| #endif |
| |
| // Call aggregateHelperColumnVector() to materialize each argument as a |
| // ColumnVector, then aggregate over those. |
| return aggregateHelperColumnVector(handle, arguments, reuse_matches->get()); |
| } |
| |
| void StorageBlock::aggregateGroupBy( |
| const AggregationHandle &handle, |
| const std::vector<std::unique_ptr<const Scalar>> &arguments, |
| const std::vector<std::unique_ptr<const Scalar>> &group_by, |
| const Predicate *predicate, |
| AggregationStateHashTableBase *hash_table, |
| std::unique_ptr<TupleIdSequence> *reuse_matches, |
| std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { |
| DCHECK_GT(group_by.size(), 0u) |
| << "Called aggregateGroupBy() with zero GROUP BY expressions"; |
| |
| SubBlocksReference sub_blocks_ref(*tuple_store_, |
| indices_, |
| indices_consistent_); |
| |
| // IDs of 'arguments' as attributes in the ValueAccessor we create below. |
| std::vector<attribute_id> argument_ids; |
| |
| // IDs of GROUP BY key element(s) in the ValueAccessor we create below. |
| std::vector<attribute_id> key_ids; |
| |
| // An intermediate ValueAccessor that stores the materialized 'arguments' for |
| // this aggregate, as well as the GROUP BY expression values. |
| ColumnVectorsValueAccessor temp_result; |
| { |
| std::unique_ptr<ValueAccessor> accessor; |
| if (predicate) { |
| if (!*reuse_matches) { |
| // If there is a filter predicate that hasn't already been evaluated, |
| // evaluate it now and save the results for other aggregates on this |
| // same block. |
| reuse_matches->reset(getMatchesForPredicate(predicate)); |
| } |
| |
| // Create a filtered ValueAccessor that only iterates over predicate |
| // matches. |
| accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); |
| } else { |
      // Create a ValueAccessor that iterates over all tuples in this block.
| accessor.reset(tuple_store_->createValueAccessor()); |
| } |
| |
| attribute_id attr_id = 0; |
| |
| // First, put GROUP BY keys into 'temp_result'. |
| if (reuse_group_by_vectors->empty()) { |
| // Compute GROUP BY values from group_by Scalars, and store them in |
| // reuse_group_by_vectors for reuse by other aggregates on this same |
| // block. |
| reuse_group_by_vectors->reserve(group_by.size()); |
| for (const std::unique_ptr<const Scalar> &group_by_element : group_by) { |
| reuse_group_by_vectors->emplace_back( |
| group_by_element->getAllValues(accessor.get(), &sub_blocks_ref)); |
| temp_result.addColumn(reuse_group_by_vectors->back().get(), false); |
| key_ids.push_back(attr_id++); |
| } |
| } else { |
| // Reuse precomputed GROUP BY values from reuse_group_by_vectors. |
| DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size()) |
| << "Wrong number of reuse_group_by_vectors"; |
| for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) { |
| temp_result.addColumn(reuse_cv.get(), false); |
| key_ids.push_back(attr_id++); |
| } |
| } |
| |
| // Compute argument vectors and add them to 'temp_result'. |
| for (const std::unique_ptr<const Scalar> &argument : arguments) { |
| temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref)); |
| argument_ids.push_back(attr_id++); |
| } |
| } |
| |
| // Actually do aggregation into '*hash_table'. |
| handle.aggregateValueAccessorIntoHashTable(&temp_result, |
| argument_ids, |
| key_ids, |
| hash_table); |
| } |
| |
| void StorageBlock::aggregateDistinct( |
| const AggregationHandle &handle, |
| const std::vector<std::unique_ptr<const Scalar>> &arguments, |
| const std::vector<attribute_id> *arguments_as_attributes, |
| const std::vector<std::unique_ptr<const Scalar>> &group_by, |
| const Predicate *predicate, |
| AggregationStateHashTableBase *distinctify_hash_table, |
| std::unique_ptr<TupleIdSequence> *reuse_matches, |
| std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { |
| DCHECK_GT(arguments.size(), 0u) |
| << "Called aggregateDistinct() with zero argument expressions"; |
| DCHECK((group_by.size() == 0 || reuse_group_by_vectors != nullptr)); |
| |
| std::vector<attribute_id> key_ids; |
| |
| // An intermediate ValueAccessor that stores the materialized 'arguments' for |
| // this aggregate, as well as the GROUP BY expression values. |
| ColumnVectorsValueAccessor temp_result; |
| { |
| std::unique_ptr<ValueAccessor> accessor; |
| if (predicate) { |
| if (!*reuse_matches) { |
| // If there is a filter predicate that hasn't already been evaluated, |
| // evaluate it now and save the results for other aggregates on this |
| // same block. |
| reuse_matches->reset(getMatchesForPredicate(predicate)); |
| } |
| |
| // Create a filtered ValueAccessor that only iterates over predicate |
| // matches. |
| accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); |
| } else { |
      // Create a ValueAccessor that iterates over all tuples in this block.
| accessor.reset(tuple_store_->createValueAccessor()); |
| } |
| |
| #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION |
| // If all the arguments to this aggregate are plain relation attributes, |
| // aggregate directly on a ValueAccessor from this block to avoid a copy. |
| if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) { |
| DCHECK_EQ(arguments.size(), arguments_as_attributes->size()) |
| << "Mismatch between number of arguments and number of attribute_ids"; |
| DCHECK_EQ(group_by.size(), 0u); |
| handle.insertValueAccessorIntoDistinctifyHashTable( |
| accessor.get(), *arguments_as_attributes, distinctify_hash_table); |
| return; |
| } |
| #endif |
| |
| SubBlocksReference sub_blocks_ref(*tuple_store_, |
| indices_, |
| indices_consistent_); |
| attribute_id attr_id = 0; |
| |
| if (!group_by.empty()) { |
| // Put GROUP BY keys into 'temp_result'. |
| if (reuse_group_by_vectors->empty()) { |
| // Compute GROUP BY values from group_by Scalars, and store them in |
| // reuse_group_by_vectors for reuse by other aggregates on this same |
| // block. |
| reuse_group_by_vectors->reserve(group_by.size()); |
| for (const std::unique_ptr<const Scalar> &group_by_element : group_by) { |
| reuse_group_by_vectors->emplace_back( |
| group_by_element->getAllValues(accessor.get(), &sub_blocks_ref)); |
| temp_result.addColumn(reuse_group_by_vectors->back().get(), false); |
| key_ids.push_back(attr_id++); |
| } |
| } else { |
| // Reuse precomputed GROUP BY values from reuse_group_by_vectors. |
| DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size()) |
| << "Wrong number of reuse_group_by_vectors"; |
| for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) { |
| temp_result.addColumn(reuse_cv.get(), false); |
| key_ids.push_back(attr_id++); |
| } |
| } |
| } |
| // Compute argument vectors and add them to 'temp_result'. |
| for (const std::unique_ptr<const Scalar> &argument : arguments) { |
| temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref)); |
| key_ids.push_back(attr_id++); |
| } |
| } |
| |
| handle.insertValueAccessorIntoDistinctifyHashTable( |
| &temp_result, key_ids, distinctify_hash_table); |
| } |
| |
| |
| // TODO(chasseur): Vectorization for updates. |
| StorageBlock::UpdateResult StorageBlock::update( |
| const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments, |
| const Predicate *predicate, |
| InsertDestinationInterface *relocation_destination) { |
| if (relation_.getID() != relocation_destination->getRelation().getID()) { |
| FATAL_ERROR("StorageBlock::update() called with a relocation_destination " |
| "that does not belong to the same relation."); |
| } |
| |
| UpdateResult retval; |
| // TODO(chasseur): Be smarter and only update indexes that need to be updated. |
| std::unique_ptr<TupleIdSequence> matches(getMatchesForPredicate(predicate)); |
| |
| // If nothing matches the predicate, return immediately. |
| if (matches->empty()) { |
| retval.indices_consistent = all_indices_consistent_; |
| retval.relocation_destination_used = false; |
| return retval; |
| } |
| |
| // Remove index entries for tuples which will be updated. |
| bool rebuild_some_indices = false; |
| for (PtrVector<IndexSubBlock>::iterator it = indices_.begin(); |
| it != indices_.end(); |
| ++it) { |
| if (it->supportsAdHocRemove()) { |
| it->bulkRemoveEntries(*matches); |
| } else { |
| rebuild_some_indices = true; |
| } |
| } |
| |
| // To be safe, relocate ALL tuples if the relation is partitioned and we are |
| // updating the partitioning attribute. |
| const bool relocate_all = |
| assignments.find(relocation_destination->getPartitioningAttribute()) != assignments.end(); |
| |
| // IDs of tuples which should be re-added to indices. |
| TupleIdSequence in_place_ids(tuple_store_->getMaxTupleID() + 1); |
| // IDs of tuples which must be reinserted. |
| TupleIdSequence relocate_ids(tuple_store_->getMaxTupleID() + 1); |
| std::vector<Tuple> relocation_buffer; |
| |
| std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor()); |
| for (TupleIdSequence::const_iterator match_it = matches->begin(); |
| match_it != matches->end(); |
| ++match_it) { |
| // Generate a map of updated values according to 'assignments'. |
| std::unique_ptr<std::unordered_map<attribute_id, TypedValue>> |
| updated_values(generateUpdatedValues(*accessor, *match_it, assignments)); |
| |
| // Determine if the tuple can be modified in-place without deleting and |
| // reinserting. |
| if (!relocate_all |
| && tuple_store_->canSetAttributeValuesInPlaceTyped(*match_it, *updated_values)) { |
| // Update attribute values in place. |
| for (std::unordered_map<attribute_id, TypedValue>::const_iterator update_it |
| = updated_values->begin(); |
| update_it != updated_values->end(); |
| ++update_it) { |
| tuple_store_->setAttributeValueInPlaceTyped(*match_it, update_it->first, update_it->second); |
| } |
| |
| in_place_ids.set(*match_it, true); |
| } else { |
| // Make a copy of the tuple with the updated values. |
| std::vector<TypedValue> updated_tuple_values; |
| for (CatalogRelationSchema::const_iterator attr_it = relation_.begin(); |
| attr_it != relation_.end(); |
| ++attr_it) { |
| std::unordered_map<attribute_id, TypedValue>::iterator update_it |
| = updated_values->find(attr_it->getID()); |
| if (update_it == updated_values->end()) { |
| updated_tuple_values.emplace_back(tuple_store_->getAttributeValueTyped(*match_it, attr_it->getID())); |
| updated_tuple_values.back().ensureNotReference(); |
| } else { |
| updated_tuple_values.emplace_back(std::move(update_it->second)); |
| } |
| } |
| |
| relocation_buffer.emplace_back(std::move(updated_tuple_values)); |
| relocate_ids.set(*match_it, true); |
| } |
| } |
| |
| bool rebuild_all = false; |
| retval.indices_consistent = all_indices_consistent_; |
| retval.relocation_destination_used = false; |
| if (!relocate_ids.empty()) { |
| // Delete relocated tuples. |
| if (tuple_store_->bulkDeleteTuples(&relocate_ids)) { |
| rebuild_all = true; |
| } |
| |
| if (relocate_all) { |
| // Immediately bulk-insert into InsertDestination if required by |
| // partitioning. |
| retval.relocation_destination_used = true; |
| relocation_destination->insertTuplesFromVector(relocation_buffer.begin(), |
| relocation_buffer.end()); |
| } else { |
| // Reinsert into this block until space is exhausted. |
| for (std::vector<Tuple>::const_iterator copy_it = relocation_buffer.begin(); |
| copy_it != relocation_buffer.end(); |
| ++copy_it) { |
| if (rebuild_all || (!tuple_store_->adHocInsertIsEfficient())) { |
| // If we must rebuild anyway, we might as well use the fast insert path. |
| if (tuple_store_->insertTupleInBatch(*copy_it)) { |
| rebuild_all = true; |
| } else { |
| retval.relocation_destination_used = true; |
| relocation_destination->insertTuplesFromVector(copy_it, relocation_buffer.end()); |
| break; |
| } |
| } else { |
| const TupleStorageSubBlock::InsertResult reinsert_result |
| = tuple_store_->insertTuple(*copy_it); |
| if (reinsert_result.inserted_id < 0) { |
| DCHECK(!reinsert_result.ids_mutated); |
| retval.relocation_destination_used = true; |
| relocation_destination->insertTuplesFromVector(copy_it, relocation_buffer.end()); |
| break; |
| } |
| |
| if (reinsert_result.ids_mutated) { |
| rebuild_all = true; |
| } else { |
| // Only bother adding 'reinsert_id' to 'in_place_ids' if not rebuilding. |
| in_place_ids.set(reinsert_result.inserted_id, true); |
| } |
| } |
| } |
| } |
| } |
| |
| // TODO(chasseur): Consider doing a partial rollback when an index rebuild |
| // fails. |
| if (rebuild_all) { |
| tuple_store_->rebuild(); |
| |
| // Rebuild indexes. |
| retval.indices_consistent = rebuildIndexes(false); |
| } else if (!indices_.empty()) { |
| all_indices_inconsistent_ = true; |
| int index_num = 0; |
| for (PtrVector<IndexSubBlock>::iterator it = indices_.begin(); |
| it != indices_.end(); |
| ++it, ++index_num) { |
| bool add_successful; |
| if (it->supportsAdHocAdd() && ((!rebuild_some_indices) || it->supportsAdHocRemove())) { |
| add_successful = it->bulkAddEntries(in_place_ids); |
| #ifdef QUICKSTEP_REBUILD_INDEX_ON_UPDATE_OVERFLOW |
| if (!add_successful) { |
| add_successful = it->rebuild(); |
| } |
| #endif |
| } else { |
| add_successful = it->rebuild(); |
| } |
| if (add_successful) { |
| block_header_.set_index_consistent(index_num, true); |
| indices_consistent_[index_num] = true; |
| all_indices_inconsistent_ = false; |
| } else { |
| block_header_.set_index_consistent(index_num, false); |
| indices_consistent_[index_num] = false; |
| all_indices_consistent_ = false; |
| } |
| } |
| |
| updateHeader(); |
| retval.indices_consistent = all_indices_consistent_; |
| } |
| |
| dirty_ = true; |
| return retval; |
| } |
| |
| namespace { |
| |
// Helper class used to sort tuples.
| // |
| // TODO(shoban): Refine this to use values directly instead of pointing to |
| // memory in storage block for better cache locality. |
| class SortReference { |
| public: |
| SortReference(const void *value, const tuple_id tuple) : tuple_(tuple), value_(value) {} |
| |
| inline const void* getDataPtr() const { |
| return value_; |
| } |
| |
| inline tuple_id getTupleID() const { |
| return tuple_; |
| } |
| |
| private: |
| tuple_id tuple_; |
| const void *value_; |
| }; |
| |
| } // namespace |
| |
| void StorageBlock::sortColumn(bool use_input_sequence, |
| const Scalar &sort_attribute, |
| bool sort_is_ascending, |
| bool null_first, |
| OrderedTupleIdSequence *sorted_sequence) const { |
| const attribute_id sort_attr_id = sort_attribute.getAttributeIdForValueAccessor(); |
| DCHECK_NE(sort_attr_id, -1) |
| << "Attempted to sort a StorageBlock on a sort attribute that is not " |
| << "actually a ScalarAttribute."; |
| |
| const Type &sort_attr_type = sort_attribute.getType(); |
| |
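  // Tuples whose sort value is NULL have no data pointer to compare, so they
  // are collected separately in 'nulls' and spliced in before or after the
  // sorted non-NULL tuples according to 'null_first'.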
| std::unique_ptr<ValueAccessor> accessor; |
| std::vector<SortReference> refs; |
| OrderedTupleIdSequence nulls; |
| |
| refs.reserve(use_input_sequence ? sorted_sequence->size() : getTupleStorageSubBlock().numTuples()); |
| |
  // TODO(shoban): Refer to the TODO in sortColumnHelperValueAccessor() about
  // optimizing the NULL checking.
| ValueAccessor *all_accessor = tuple_store_->createValueAccessor(nullptr); |
| InvokeOnValueAccessorNotAdapter( |
| all_accessor, |
| [&](auto *all_accessor) -> void { // NOLINT(build/c++11) |
| if (use_input_sequence) { |
| auto *seq_value_accessor = new OrderedTupleIdSequenceAdapterValueAccessor< |
| typename std::remove_reference<decltype(*all_accessor)>::type>( |
| all_accessor, *sorted_sequence); |
| accessor.reset(seq_value_accessor); |
| seq_value_accessor->beginIteration(); |
| while (seq_value_accessor->next()) { |
| const void *value = seq_value_accessor->getUntypedValue(sort_attr_id); |
| if (value) { |
| refs.emplace_back(value, seq_value_accessor->getCurrentPosition()); |
| } else { |
| nulls.emplace_back(seq_value_accessor->getCurrentPosition()); |
| } |
| } |
| } else { |
| accessor.reset(all_accessor); |
| while (all_accessor->next()) { |
| const void *value = all_accessor->getUntypedValue(sort_attr_id); |
| if (value) { |
| refs.emplace_back(value, all_accessor->getCurrentPosition()); |
| } else { |
| nulls.emplace_back(all_accessor->getCurrentPosition()); |
| } |
| } |
| } |
| }); |
| |
| if (use_input_sequence) { |
| if (sort_is_ascending) { |
| StableSortWrappedValues<SortReference, vector<SortReference>::iterator>( |
| sort_attr_type, refs.begin(), refs.end()); |
| } else { |
| // Use reverse iterators to sort in descending order. |
| StableSortWrappedValues<SortReference, vector<SortReference>::reverse_iterator>( |
| sort_attr_type, refs.rbegin(), refs.rend()); |
| } |
| } else { |
| if (sort_is_ascending) { |
| SortWrappedValues<SortReference, vector<SortReference>::iterator>( |
| sort_attr_type, refs.begin(), refs.end()); |
| } else { |
| // Use reverse iterators to sort in descending order. |
| SortWrappedValues<SortReference, vector<SortReference>::reverse_iterator>( |
| sort_attr_type, refs.rbegin(), refs.rend()); |
| } |
| } |
| |
| sorted_sequence->clear(); |
| if (null_first) { |
| sorted_sequence->insert(sorted_sequence->end(), nulls.begin(), nulls.end()); |
| } |
| for (const SortReference &ref : refs) { |
| sorted_sequence->emplace_back(ref.getTupleID()); |
| } |
| if (!null_first) { |
| sorted_sequence->insert(sorted_sequence->end(), nulls.begin(), nulls.end()); |
| } |
| } |
| |
| void StorageBlock::sort(const PtrVector<Scalar> &order_by, // NOLINT(build/include_what_you_use) |
| const std::vector<bool> &sort_is_ascending, |
| const std::vector<bool> &null_first, |
| OrderedTupleIdSequence *sorted_sequence, |
| InsertDestinationInterface *output_destination) const { |
  // TODO(shoban): We currently stable-sort the tuple-id sequence on the
  // ORDER BY columns from last to first to produce the final sorted sequence.
  // We could instead sort from first to last, where each subsequent column's
  // sort decomposes into sorts of the smaller tuple-id ranges over which the
  // previous columns have equal values. Each method has pros and cons; the
  // average-case asymptotics of the latter are definitely better. An
  // analysis of the two methods is needed.
| |
| DEBUG_ASSERT(order_by.size() == sort_is_ascending.size()); |
| DEBUG_ASSERT(order_by.size() == null_first.size()); |
| DEBUG_ASSERT(order_by.size() > 0); |
| |
| // TODO(shoban): We should use reverse_iterator in conjunction with rbegin() |
| // and rend() for better readability, if PtrVector supports it. |
| PtrVector<Scalar>::const_iterator order_it = order_by.end(); |
| std::vector<bool>::const_iterator sort_is_ascending_it = sort_is_ascending.end(); |
| std::vector<bool>::const_iterator null_first_it = null_first.end(); |
| --order_it; |
| --sort_is_ascending_it; |
| --null_first_it; |
| |
| // Reserve capacity once. std::vector::clear() does not change capacity. |
| sorted_sequence->reserve(getTupleStorageSubBlock().numTuples()); |
| |
  // Sort the last ORDER BY column based on the TupleIdSequence.
  // A regular sort suffices for the last column, since there is no inherent
  // order prior to this. std::sort() is an in-place implementation, whereas
  // std::stable_sort() uses extra memory, so std::sort() is slightly better
  // than std::stable_sort() here.
| sortColumn(false, *order_it, *sort_is_ascending_it, *null_first_it, sorted_sequence); |
| |
| while (order_it != order_by.begin()) { |
| --order_it; |
| --sort_is_ascending_it; |
| --null_first_it; |
| |
    // Stable-sort on the remaining columns, from last to first, over the
    // OrderedTupleIdSequence so that the order established by previously
    // sorted columns is preserved.
| sortColumn(true, *order_it, *sort_is_ascending_it, *null_first_it, sorted_sequence); |
| } |
| |
| // Write to output destination. |
| if (output_destination) { |
| std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(nullptr)); |
| std::unique_ptr<ValueAccessor> ordered_accessor; |
| InvokeOnValueAccessorNotAdapter( |
| accessor.get(), |
| [&](auto *accessor) -> void { // NOLINT(build/c++11) |
| ordered_accessor.reset( |
| accessor->createSharedOrderedTupleIdSequenceAdapter(*sorted_sequence)); |
| }); |
| |
    // TODO(quickstep-team): Depending on the block layouts, this may write
    // one or more sorted blocks for each input block. It would be useful to
    // remember the list of sorted blocks as an initial run for the merge
    // phase of sorting.
| output_destination->bulkInsertTuples(ordered_accessor.get(), true); |
| } |
| } |
| |
| void StorageBlock::deleteTuples(const Predicate *predicate) { |
| std::unique_ptr<TupleIdSequence> matches(getMatchesForPredicate(predicate)); |
| |
| if (!matches->empty()) { |
| // First, remove entries from indices that support ad-hoc removal. |
| bool rebuild_some_indices = false; |
| for (PtrVector<IndexSubBlock>::iterator it = indices_.begin(); it != indices_.end(); ++it) { |
| if (it->supportsAdHocRemove()) { |
| it->bulkRemoveEntries(*matches); |
| } else { |
| rebuild_some_indices = true; |
| } |
| } |
| |
| // Delete tuples from the TupleStorageSubBlock. |
| if (tuple_store_->bulkDeleteTuples(matches.get())) { |
| // If the tuple-ID sequence was mutated, rebuild all indices. |
| if (!rebuildIndexes(true)) { |
| FATAL_ERROR("Rebuilding an IndexSubBlock failed after removing tuples."); |
| } |
| } else if (rebuild_some_indices) { |
| // Rebuild any remaining indices that don't support ad-hoc removal. |
| for (PtrVector<IndexSubBlock>::iterator it = indices_.begin(); it != indices_.end(); ++it) { |
| if (!it->supportsAdHocRemove()) { |
| if (!it->rebuild()) { |
| FATAL_ERROR("Rebuilding an IndexSubBlock failed after removing tuples."); |
| } |
| } |
| } |
| } |
| |
| dirty_ = true; |
| } |
| } |
| |
| TupleStorageSubBlock* StorageBlock::CreateTupleStorageSubBlock( |
| const CatalogRelationSchema &relation, |
| const TupleStorageSubBlockDescription &description, |
| const bool new_block, |
| void *sub_block_memory, |
| const std::size_t sub_block_memory_size) { |
| DEBUG_ASSERT(description.IsInitialized()); |
| switch (description.sub_block_type()) { |
| case TupleStorageSubBlockDescription::PACKED_ROW_STORE: |
| return new PackedRowStoreTupleStorageSubBlock(relation, |
| description, |
| new_block, |
| sub_block_memory, |
| sub_block_memory_size); |
| case TupleStorageSubBlockDescription::BASIC_COLUMN_STORE: |
| return new BasicColumnStoreTupleStorageSubBlock(relation, |
| description, |
| new_block, |
| sub_block_memory, |
| sub_block_memory_size); |
| case TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE: |
| return new CompressedPackedRowStoreTupleStorageSubBlock(relation, |
| description, |
| new_block, |
| sub_block_memory, |
| sub_block_memory_size); |
| case TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE: |
| return new CompressedColumnStoreTupleStorageSubBlock(relation, |
| description, |
| new_block, |
| sub_block_memory, |
| sub_block_memory_size); |
| case TupleStorageSubBlockDescription::SPLIT_ROW_STORE: |
| return new SplitRowStoreTupleStorageSubBlock(relation, |
| description, |
| new_block, |
| sub_block_memory, |
| sub_block_memory_size); |
| default: |
| if (new_block) { |
| FATAL_ERROR("A StorageBlockLayout provided an unknown TupleStorageSubBlockType."); |
| } else { |
| throw MalformedBlock(); |
| } |
| } |
| } |
| |
| IndexSubBlock* StorageBlock::CreateIndexSubBlock( |
| const TupleStorageSubBlock &tuple_store, |
| const IndexSubBlockDescription &description, |
| const bool new_block, |
| void *sub_block_memory, |
| const std::size_t sub_block_memory_size) { |
| DEBUG_ASSERT(description.IsInitialized()); |
| switch (description.sub_block_type()) { |
| case IndexSubBlockDescription::CSB_TREE: |
| return new CSBTreeIndexSubBlock(tuple_store, |
| description, |
| new_block, |
| sub_block_memory, |
| sub_block_memory_size); |
| case IndexSubBlockDescription::SMA: |
| return new SMAIndexSubBlock(tuple_store, |
| description, |
| new_block, |
| sub_block_memory, |
| sub_block_memory_size); |
| default: |
| if (new_block) { |
| FATAL_ERROR("A StorageBlockLayout provided an unknown IndexBlockType."); |
| } else { |
| throw MalformedBlock(); |
| } |
| } |
| } |
| |
| bool StorageBlock::insertEntryInIndexes(const tuple_id new_tuple) { |
| DEBUG_ASSERT(ad_hoc_insert_supported_); |
| DEBUG_ASSERT(new_tuple >= 0); |
| DEBUG_ASSERT(all_indices_consistent_); |
| |
| for (PtrVector<IndexSubBlock>::iterator it = indices_.begin(); |
| it != indices_.end(); |
| ++it) { |
| bool entry_added; |
| if (it->supportsAdHocAdd()) { |
| entry_added = it->addEntry(new_tuple); |
| } else { |
| entry_added = it->rebuild(); |
| } |
| if (!entry_added) { |
| // Roll back if index is full. |
| // |
| // TODO(chasseur): What about fragmented indexes, where rebuilding could allow success? |
| bool rebuild_some_indices = false; |
| for (PtrVector<IndexSubBlock>::iterator fixer_it = indices_.begin(); |
| fixer_it != it; |
| ++fixer_it) { |
| // Do ad-hoc removal for those indices which support it. Those that |
| // don't are rebuilt after the entry is removed from the tuple_store_ |
| // below. |
| if (fixer_it->supportsAdHocRemove()) { |
| fixer_it->removeEntry(new_tuple); |
| } else { |
| rebuild_some_indices = true; |
| } |
| } |
| |
| if (tuple_store_->deleteTuple(new_tuple)) { |
| // The tuple-ID sequence was mutated, so rebuild all indices. |
| if (!rebuildIndexes(true)) { |
| FATAL_ERROR("Rebuilding an IndexSubBlock failed after removing tuples."); |
| } |
| } else if (rebuild_some_indices) { |
| // Rebuild those indices that were modified that don't support ad-hoc |
| // removal. |
| for (PtrVector<IndexSubBlock>::iterator fixer_it = indices_.begin(); |
| fixer_it != it; |
| ++fixer_it) { |
| if (!fixer_it->supportsAdHocRemove()) { |
| if (!fixer_it->rebuild()) { |
| // It should always be possible to rebuild an index with the |
| // tuples which it originally contained. |
| FATAL_ERROR("Rebuilding an IndexSubBlock failed after removing tuples."); |
| } |
| } |
| } |
| } |
| |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| bool StorageBlock::bulkInsertEntriesInIndexes(TupleIdSequence *new_tuples, |
| const bool roll_back_on_failure) { |
| DEBUG_ASSERT(ad_hoc_insert_supported_); |
| DEBUG_ASSERT(all_indices_consistent_); |
| |
| // If 'roll_back_on_failure' is false, we will allow some indices to become |
| // inconsistent. |
| if ((!indices_.empty()) && (!roll_back_on_failure)) { |
| all_indices_inconsistent_ = true; |
| } |
| |
| int index_num = 0; |
| for (PtrVector<IndexSubBlock>::iterator it = indices_.begin(); |
| it != indices_.end(); |
| ++it, ++index_num) { |
| bool entries_added; |
| if (it->supportsAdHocAdd()) { |
| entries_added = it->bulkAddEntries(*new_tuples); |
| } else { |
| entries_added = it->rebuild(); |
| } |
| if (!entries_added) { |
| if (roll_back_on_failure) { |
        // Roll back if the index is full.
| bool rebuild_some_indices = false; |
| for (PtrVector<IndexSubBlock>::iterator fixer_it = indices_.begin(); |
| fixer_it != it; |
| ++fixer_it) { |
| // Do ad-hoc removal for those indices which support it. Those that |
| // don't are rebuilt after the entries are removed from the |
| // tuple_store_ below. |
| if (fixer_it->supportsAdHocRemove()) { |
| fixer_it->bulkRemoveEntries(*new_tuples); |
| } else { |
| rebuild_some_indices = true; |
| } |
| } |
| |
| if (tuple_store_->bulkDeleteTuples(new_tuples)) { |
| // The tuple-ID sequence was mutated, so rebuild all indices. |
| if (!rebuildIndexes(true)) { |
| FATAL_ERROR("Rebuilding an IndexSubBlock failed after removing tuples."); |
| } |
| } else if (rebuild_some_indices) { |
| // Rebuild those indices that were modified that don't support ad-hoc |
| // removal. |
| for (PtrVector<IndexSubBlock>::iterator fixer_it = indices_.begin(); |
| fixer_it != it; |
| ++fixer_it) { |
| if (!fixer_it->supportsAdHocRemove()) { |
| if (!fixer_it->rebuild()) { |
| // It should always be possible to rebuild an index with the |
| // tuples which it originally contained. |
| FATAL_ERROR("Rebuilding an IndexSubBlock failed after removing tuples."); |
| } |
| } |
| } |
| } |
| |
| return false; |
| } else { |
| block_header_.set_index_consistent(index_num, false); |
| indices_consistent_[index_num] = false; |
| all_indices_consistent_ = false; |
| } |
| } else if (!roll_back_on_failure) { |
| all_indices_inconsistent_ = false; |
| } |
| } |
| |
| if (!all_indices_consistent_) { |
| updateHeader(); |
| } |
| return all_indices_consistent_; |
| } |
| |
| bool StorageBlock::rebuildIndexes(bool short_circuit) { |
| if (indices_.empty()) { |
| return true; |
| } |
| |
| all_indices_consistent_ = true; |
| all_indices_inconsistent_ = true; |
| |
| int index_num = 0; |
| for (PtrVector<IndexSubBlock>::iterator it = indices_.begin(); |
| it != indices_.end(); |
| ++it, ++index_num) { |
| if (it->rebuild()) { |
| all_indices_inconsistent_ = false; |
| indices_consistent_[index_num] = true; |
| block_header_.set_index_consistent(index_num, true); |
| } else { |
| all_indices_consistent_ = false; |
| indices_consistent_[index_num] = false; |
| block_header_.set_index_consistent(index_num, false); |
| if (short_circuit) { |
| return false; |
| } |
| } |
| } |
| updateHeader(); |
| |
| return all_indices_consistent_; |
| } |
| |
| TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const { |
| if (predicate == nullptr) { |
| return tuple_store_->getExistenceMap(); |
| } |
| |
| std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor()); |
| std::unique_ptr<TupleIdSequence> existence_map; |
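  // If the tuple store is not packed, tuple ids may contain gaps; pass the
  // existence map so that predicate evaluation only considers tuples that
  // actually exist.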
| if (!tuple_store_->isPacked()) { |
| existence_map.reset(tuple_store_->getExistenceMap()); |
| } |
| SubBlocksReference sub_blocks_ref(*tuple_store_, |
| indices_, |
| indices_consistent_); |
| return predicate->getAllMatches(value_accessor.get(), |
| &sub_blocks_ref, |
| nullptr, |
| existence_map.get()); |
| } |
| |
| std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValues( |
| const ValueAccessor &accessor, |
| const tuple_id tuple, |
| const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments) const { |
| std::unordered_map<attribute_id, TypedValue> *update_map |
| = new std::unordered_map<attribute_id, TypedValue>(); |
| |
| for (unordered_map<attribute_id, unique_ptr<const Scalar>>::const_iterator it = assignments.begin(); |
| it != assignments.end(); |
| ++it) { |
| pair<std::unordered_map<attribute_id, TypedValue>::iterator, bool> insert_result |
| = update_map->emplace(it->first, |
| it->second->getValueForSingleTuple(accessor, tuple)); |
| DCHECK(insert_result.second); |
| insert_result.first->second.ensureNotReference(); |
| } |
| |
| return update_map; |
| } |
| |
| AggregationState* StorageBlock::aggregateHelperColumnVector( |
| const AggregationHandle &handle, |
| const std::vector<std::unique_ptr<const Scalar>> &arguments, |
| const TupleIdSequence *matches) const { |
| if (arguments.empty()) { |
| // Special case. This is a nullary aggregate (i.e. COUNT(*)). |
| return handle.accumulateNullary(matches == nullptr ? tuple_store_->numTuples() |
| : matches->size()); |
| } else { |
    // Set up a ValueAccessor that will be used when materializing argument
    // values below (possibly filtered according to '*matches' from a filter
    // predicate).
| std::unique_ptr<ValueAccessor> accessor; |
| if (matches == nullptr) { |
| accessor.reset(tuple_store_->createValueAccessor()); |
| } else { |
| accessor.reset(tuple_store_->createValueAccessor(matches)); |
| } |
| |
| SubBlocksReference sub_blocks_ref(*tuple_store_, |
| indices_, |
| indices_consistent_); |
| |
| // Materialize each argument's values for this block as a ColumnVector. |
| std::vector<std::unique_ptr<ColumnVector>> column_vectors; |
| for (const std::unique_ptr<const Scalar> &argument : arguments) { |
| column_vectors.emplace_back(argument->getAllValues(accessor.get(), &sub_blocks_ref)); |
| } |
| |
| // Have the AggregationHandle actually do the aggregation. |
| return handle.accumulateColumnVectors(column_vectors); |
| } |
| } |
| |
| #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION |
| AggregationState* StorageBlock::aggregateHelperValueAccessor( |
| const AggregationHandle &handle, |
| const std::vector<attribute_id> &argument_ids, |
| const TupleIdSequence *matches) const { |
  // Set up a ValueAccessor to aggregate over (possibly filtered according to
  // '*matches' from a filter predicate).
| std::unique_ptr<ValueAccessor> accessor; |
| if (matches == nullptr) { |
| accessor.reset(tuple_store_->createValueAccessor()); |
| } else { |
| accessor.reset(tuple_store_->createValueAccessor(matches)); |
| } |
| |
| // Have the AggregationHandle actually do the aggregation. |
| return handle.accumulateValueAccessor( |
| accessor.get(), |
| argument_ids); |
| } |
| #endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION |
| |
| void StorageBlock::updateHeader() { |
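  // The header is re-serialized in place over its previous serialization, so
  // its byte size must not change: the sub-blocks are packed immediately
  // after it in block memory.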
| DEBUG_ASSERT(*static_cast<const int*>(block_memory_) == block_header_.ByteSize()); |
| |
| if (!block_header_.SerializeToArray(static_cast<char*>(block_memory_) + sizeof(int), |
| block_header_.ByteSize())) { |
| FATAL_ERROR("Failed to do binary serialization of StorageBlockHeader in StorageBlock::updateHeader()"); |
| } |
| } |
| |
| void StorageBlock::invalidateAllIndexes() { |
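  // Batch and bulk inserts bypass index maintenance, so every index must be
  // marked inconsistent (to be rebuilt later via rebuildIndexes()) before its
  // contents can be trusted again.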
| if ((!indices_.empty()) && (!all_indices_inconsistent_)) { |
| for (unsigned int index_num = 0; |
| index_num < indices_.size(); |
| ++index_num) { |
| indices_consistent_[index_num] = false; |
| block_header_.set_index_consistent(index_num, false); |
| } |
| all_indices_consistent_ = false; |
| all_indices_inconsistent_ = true; |
| |
| updateHeader(); |
| } |
| } |
| |
| } // namespace quickstep |