| /** |
| * Copyright 2016, Quickstep Research Group, Computer Sciences Department, |
| * University of Wisconsin—Madison. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| #include "relational_operators/WindowAggregationOperator.hpp" |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "catalog/CatalogAttribute.hpp" |
| #include "catalog/CatalogDatabase.hpp" |
| #include "catalog/CatalogRelation.hpp" |
| #include "expressions/aggregation/AggregateFunction.hpp" |
| #include "expressions/aggregation/AggregationHandle.hpp" |
| #include "expressions/scalar/Scalar.hpp" |
| #include "expressions/scalar/ScalarAttribute.hpp" |
| #include "expressions/scalar/ScalarBinaryExpression.hpp" |
| #include "expressions/scalar/ScalarLiteral.hpp" |
| #include "query_execution/QueryContext.hpp" |
| #include "query_execution/WorkOrdersContainer.hpp" |
| #include "storage/HashTableBase.hpp" |
| #include "storage/InsertDestination.hpp" |
| #include "storage/StorageBlock.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "storage/StorageManager.hpp" |
| #include "storage/TupleIdSequence.hpp" |
| #include "types/TypeFactory.hpp" |
| #include "types/IntType.hpp" |
| #include "types/NumericSuperType.hpp" |
| #include "types/Type.hpp" |
| #include "types/TypedValue.hpp" |
| #include "types/containers/ColumnVector.hpp" |
| #include "types/containers/ColumnVectorsValueAccessor.hpp" |
| #include "types/operations/binary_operations/BinaryOperation.hpp" |
| #include "types/operations/binary_operations/BinaryOperationFactory.hpp" |
| #include "types/operations/binary_operations/BinaryOperationID.hpp" |
| #include "utility/HashPair.hpp" |
| |
| #include "glog/logging.h" |
| |
| namespace quickstep { |
| |
| WindowAggregationOperator::WindowAggregationOperator( |
| const CatalogRelation &input_relation, |
| bool input_relation_is_stored, |
| const std::vector<const AggregateFunction *> &aggregate_functions, |
| std::vector<std::vector<std::unique_ptr<const Scalar>>> |
| &&aggregate_arguments, |
| std::vector<std::unique_ptr<const Scalar>> &&group_by, |
| const CatalogAttribute &window_attribute, |
| const TypedValue &window_duration, |
| std::int32_t age_duration, |
| const QueryContext::insert_destination_id output_dest_id, |
| const relation_id output_rel_id, |
| const serialization::HashTableImplType hash_table_impl_type, |
| StorageManager *storage_manager) |
| : input_relation_(input_relation), |
| input_relation_is_stored_(input_relation_is_stored), |
| input_relation_block_ids_(input_relation.getBlocksSnapshot()), |
| input_write_threshold_(input_relation_block_ids_.size() > 0 |
| ? input_relation_block_ids_.size() - 1 |
| : 0), |
| output_dest_id_(output_dest_id), |
| output_rel_id_(output_rel_id), |
| state_(window_attribute, std::move(group_by), window_duration, age_duration) { |
| DCHECK(!aggregate_functions.empty()); |
| DCHECK_EQ(aggregate_functions.size(), aggregate_arguments.size()); |
| |
| state_.arguments = std::move(aggregate_arguments); |
| |
| // Window attribute scalar expression to find the tumbling point. |
| state_.bucket_key.emplace_back(new ScalarBinaryExpression( |
| BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide), |
| new ScalarAttribute(state_.window_attribute), |
| new ScalarLiteral( |
| state_.window_duration, |
| TypeFactory::GetType(state_.window_duration.getTypeID())))); |
| |
| // Hashtable key types. |
| std::vector<const Type *> key_types; |
| for (const auto &key : state_.bucket_key) { |
| key_types.push_back(&key->getType()); |
| } |
| |
| // Operation to recreate the window attribute from the value in the hash |
| // table. |
| state_.rev_bucket_key.reset( |
| BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kMultiply) |
| .makeUncheckedBinaryOperatorForTypes( |
| *key_types.front(), IntType::InstanceNonNullable())); |
| |
| // Initialize aggregation handles and hash tables. |
| for (std::size_t i = 0; i < aggregate_functions.size(); ++i) { |
| std::vector<const Type *> agg_args; |
| for (const auto &arg : state_.arguments[i]) { |
| agg_args.push_back(&arg->getType()); |
| } |
| HashTableImplType hash_table_type; |
| if(hash_table_impl_type==serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING) |
| { |
| hash_table_type=HashTableImplType::kLinearOpenAddressing; |
| } |
| else if(hash_table_impl_type==serialization::HashTableImplType::SEPARATE_CHAINING) |
| { |
| hash_table_type=HashTableImplType::kSeparateChaining; |
| } |
| else if(hash_table_impl_type==serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING ) |
| { |
| hash_table_type=HashTableImplType::kSimpleScalarSeparateChaining; |
| } |
| DCHECK(aggregate_functions[i]->canApplyToTypes(agg_args)); |
| state_.handles.emplace_back(aggregate_functions[i]->createHandle(agg_args)); |
| state_.hashtables.emplace_back(state_.handles.back()->createGroupByHashTable( |
| hash_table_type, key_types, 100, storage_manager)); |
| } |
| } |
| |
| bool WindowAggregationOperator::getAllWorkOrders( |
| WorkOrdersContainer *container, |
| QueryContext *query_context, |
| StorageManager *storage_manager, |
| const tmb::client_id foreman_client_id, |
| tmb::MessageBus *bus) { |
| if (input_relation_is_stored_) { |
| if (!started_) { |
| for (std::size_t i = 0; i < input_relation_block_ids_.size(); ++i) { |
| container->addNormalWorkOrder( |
| new WindowAggregationWorkOrder(input_relation_block_ids_[i], |
| input_relation_.getID(), |
| output_dest_id_, |
| &state_, |
| storage_manager, |
| query_context |
| ), |
| op_index_); |
| } |
| started_ = true; |
| } |
| return started_; |
| } else { |
| input_relation_block_ids_=input_relation_.getBlocksSnapshot(); |
| while (num_workorders_generated_ < input_relation_block_ids_.size()) { |
| container->addNormalWorkOrder( |
| new WindowAggregationWorkOrder( |
| input_relation_block_ids_[num_workorders_generated_], |
| input_relation_.getID(), |
| output_dest_id_, |
| &state_, |
| storage_manager, |
| query_context |
| ), |
| op_index_); |
| ++num_workorders_generated_; |
| } |
| return done_feeding_input_relation_; |
| } |
| } |
| |
| namespace window_vector{ |
| |
| struct VecTypedValueHash { |
| inline std::size_t operator() (const std::vector<TypedValue> &value) const { |
| std::size_t hash = 0; |
| for (const auto &v : value) { |
| hash = CombineHashes(hash, v.getHash()); |
| } |
| return hash; |
| } |
| }; |
| |
| struct VecTypedValueEqualTo { |
| // NOTE: TypedValues here are always the same type and never null. |
| |
| inline bool operator() (const std::vector<TypedValue> &left, const std::vector<TypedValue> &right) const{ |
| bool equal = true; |
| for (std::size_t i = 0; i < left.size(); ++i) { |
| equal = equal && left[i].fastEqualCheck(right[i]); |
| } |
| return equal; |
| } |
| }; |
| |
| } // namespace |
| |
| void WindowAggregationWorkOrder::execute() { |
| VLOG(3) << "WindowAggregationWorkOrder::execute() called."; |
| DCHECK(query_context != nullptr); |
| //DCHECK(database != nullptr); |
| DCHECK(storage_manager != nullptr); |
| |
| InsertDestination *output_dest = query_context->getInsertDestination(output_dest_id_); |
| DCHECK(output_dest != nullptr); |
| //TODO |
| BlockReference block(storage_manager->getBlock( |
| input_block_id_, output_dest->getRelation())); |
| |
| std::unique_ptr<TupleIdSequence> reuse_matches; |
| std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors; |
| |
| // Aggregate the block. |
| for (std::size_t i = 0; i < state_->handles.size(); i++) { |
| block->aggregateGroupBy(*state_->handles[i], |
| state_->arguments[i], |
| state_->bucket_key, |
| nullptr, |
| state_->hashtables[i].get(), |
| &reuse_matches, |
| &reuse_group_by_vectors); |
| } |
| |
| // Bye-pass writing changes, if not required. |
| if (write_mode_ == kNoBuckets) { |
| return; |
| } |
| |
| // Find changed buckets, and create a set of buckets. |
| DCHECK(reuse_group_by_vectors.back()->isNative()); |
| std::unordered_set<std::vector<TypedValue>, |
| window_vector::VecTypedValueHash, |
| window_vector::VecTypedValueEqualTo> changed_buckets; |
| std::size_t num_tuples = static_cast<const NativeColumnVector *>( |
| reuse_group_by_vectors.back().get()) |
| ->size(); |
| std::size_t num_cols = reuse_group_by_vectors.size(); |
| for (std::size_t row = 0; row < num_tuples; ++row) { |
| std::vector<TypedValue> tuple_key; |
| tuple_key.reserve(num_cols); |
| for (std::size_t col = 0; col < num_cols; ++col) { |
| if (reuse_group_by_vectors[col]->isNative()) { |
| const auto &cv = static_cast<const NativeColumnVector &>( |
| *reuse_group_by_vectors[col]); |
| tuple_key.emplace_back(cv.getTypedValue(row)); |
| } else { |
| const auto &cv = static_cast<const IndirectColumnVector &>( |
| *reuse_group_by_vectors[col]); |
| tuple_key.emplace_back(cv.getTypedValue(row)); |
| } |
| } |
| changed_buckets.emplace(std::move(tuple_key)); |
| } |
| |
| // Construct vector of changed buckets for aggregate values. |
| std::vector<std::vector<TypedValue>> changed_buckets_vec; |
| changed_buckets_vec.reserve(changed_buckets.size()); |
| for (const auto &v : changed_buckets) { |
| changed_buckets_vec.emplace_back(v); |
| } |
| |
| // Construct the aggregate values. |
| std::vector<std::unique_ptr<ColumnVector>> agg_columns; |
| for (std::size_t agg = 0; agg < state_->handles.size(); ++agg) { |
| agg_columns.emplace_back(state_->handles[agg]->finalizeHashTable( |
| *state_->hashtables[agg], &changed_buckets_vec)); |
| } |
| |
| // Construct the window values. |
| std::unique_ptr<ColumnVector> window_column; |
| std::unique_ptr<NativeColumnVector> pre_window_column(new NativeColumnVector( |
| state_->bucket_key.back()->getType(), changed_buckets_vec.size())); |
| for (auto &key : changed_buckets_vec) { |
| pre_window_column->appendTypedValue(std::move(key.back())); |
| } |
| window_column.reset( |
| state_->rev_bucket_key->applyToColumnVectorAndStaticValue( |
| *pre_window_column, state_->window_duration)); |
| |
| // Construct the partition by values. |
| std::vector<std::unique_ptr<ColumnVector>> grp_columns; |
| for (std::size_t col = 0; col < state_->bucket_key.size() - 1; ++col) { |
| if (state_->bucket_key[col]->getType().getTypeID() == TypeID::kVarChar) { |
| std::unique_ptr<IndirectColumnVector> group_by_col(new IndirectColumnVector( |
| state_->bucket_key[col]->getType(), changed_buckets_vec.size())); |
| for (auto &key : changed_buckets_vec) { |
| group_by_col->appendTypedValue(std::move(key[col])); |
| } |
| grp_columns.emplace_back(group_by_col.release()); |
| } else { |
| std::unique_ptr<NativeColumnVector> group_by_col(new NativeColumnVector( |
| state_->bucket_key[col]->getType(), changed_buckets_vec.size())); |
| for (auto &key : changed_buckets_vec) { |
| group_by_col->appendTypedValue(std::move(key[col])); |
| } |
| grp_columns.emplace_back(group_by_col.release()); |
| } |
| } |
| |
| // Write output. |
| ColumnVectorsValueAccessor result; |
| result.addColumn(window_column.release()); |
| for (auto &col : grp_columns) { |
| result.addColumn(col.release()); |
| } |
| for (auto &col : agg_columns) { |
| result.addColumn(col.release()); |
| } |
| output_dest->bulkInsertTuples(&result, true); |
| } |
| |
| } // namespace quickstep |