Assemble things together for the DISTINCT aggregation feature.
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 7cb6048..f9d7d98 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1209,6 +1209,9 @@
unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
}
+
+ // Set whether it is a DISTINCT aggregation.
+ aggr_proto->set_is_distinct(unnamed_aggregate_expression->is_distinct());
}
std::vector<const Type*> group_by_types;
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 3191a28..d70ea4c 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -23,16 +23,21 @@
"${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
"${CMAKE_CURRENT_BINARY_DIR}/Delete.test"
"${CMAKE_CURRENT_BINARY_DIR}/Delete/")
+add_test(quickstep_queryoptimizer_tests_executiongenerator_distinct
+ "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/Distinct.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/Distinct/")
add_test(quickstep_queryoptimizer_tests_executiongenerator_drop
"../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
"${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
"${CMAKE_CURRENT_BINARY_DIR}/Drop.test"
"${CMAKE_CURRENT_BINARY_DIR}/Drop/")
- add_test(quickstep_queryoptimizer_tests_executiongenerator_index
- "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
- "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
- "${CMAKE_CURRENT_BINARY_DIR}/Index.test"
- "${CMAKE_CURRENT_BINARY_DIR}/Index/")
+add_test(quickstep_queryoptimizer_tests_executiongenerator_index
+ "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+ "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/Index.test"
+ "${CMAKE_CURRENT_BINARY_DIR}/Index/")
add_test(quickstep_queryoptimizer_tests_executiongenerator_insert
"../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
"${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
@@ -63,6 +68,7 @@
# duration of their test.
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
diff --git a/query_optimizer/tests/execution_generator/Distinct.test b/query_optimizer/tests/execution_generator/Distinct.test
new file mode 100644
index 0000000..feab898
--- /dev/null
+++ b/query_optimizer/tests/execution_generator/Distinct.test
@@ -0,0 +1,70 @@
+# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+# University of Wisconsin—Madison.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CREATE TABLE foo(x INT, y DOUBLE, z INT);
+
+INSERT INTO foo
+SELECT i,
+ (i + 0.5) % 100,
+ i % 3
+FROM generate_series(0, 29999) AS gs(i);
+
+
+SELECT COUNT(*),
+ COUNT(DISTINCT x),
+ COUNT(DISTINCT y),
+ COUNT(DISTINCT z)
+FROM foo;
+--
++--------------------+--------------------+--------------------+--------------------+
+|COUNT(*) |COUNT(DISTINCT x) |COUNT(DISTINCT y) |COUNT(DISTINCT z) |
++--------------------+--------------------+--------------------+--------------------+
+| 30000| 30000| 100| 3|
++--------------------+--------------------+--------------------+--------------------+
+==
+
+SELECT SUM(y),
+ SUM(DISTINCT y),
+ COUNT(DISTINCT y),
+ AVG(DISTINCT y),
+ z
+FROM foo
+GROUP BY z
+ORDER BY z;
+--
++------------------------+------------------------+--------------------+------------------------+-----------+
+|SUM(y) |SUM(DISTINCT y) |COUNT(DISTINCT y) |AVG(DISTINCT y) |z |
++------------------------+------------------------+--------------------+------------------------+-----------+
+| 500000| 5000| 100| 50| 0|
+| 500000| 5000| 100| 50| 1|
+| 500000| 5000| 100| 50| 2|
++------------------------+------------------------+--------------------+------------------------+-----------+
+==
+
+SELECT MAX(x) * SUM(DISTINCT y),
+ COUNT(DISTINCT x % y) + z,
+ z
+FROM foo
+GROUP BY z
+ORDER BY z;
+--
++------------------------+-------------------------+-----------+
+|(MAX(x)*SUM(DISTINCT y))|(COUNT(DISTINCT (x%y))+z)|z |
++------------------------+-------------------------+-----------+
+| 149985000| 196| 0|
+| 149990000| 197| 1|
+| 149995000| 195| 2|
++------------------------+-------------------------+-----------+
+==
diff --git a/query_optimizer/tests/logical_generator/Select.test b/query_optimizer/tests/logical_generator/Select.test
index 2129de1..2709ce3 100644
--- a/query_optimizer/tests/logical_generator/Select.test
+++ b/query_optimizer/tests/logical_generator/Select.test
@@ -596,3 +596,60 @@
+-output_attributes=
+-AttributeReference[id=7,name=,alias=(int_col+2),relation=subquery,
type=Int NULL]
+==
+
+SELECT COUNT(DISTINCT int_col), SUM(float_col)
+FROM test
+GROUP BY long_col
+HAVING AVG(DISTINCT int_col + double_col) > AVG(DISTINCT float_col);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=Aggregate
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-grouping_expressions=
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-aggregate_expressions=
+| | | +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | | | +-AggregateFunction[function=COUNT,is_distinct=true]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| | | | type=Double NULL]
+| | | | +-AggregateFunction[function=SUM]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | | type=Double NULL]
+| | | | +-AggregateFunction[function=AVG,is_distinct=true]
+| | | | +-Add
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | | type=Double NULL]
+| | | +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | | type=Double NULL]
+| | | +-AggregateFunction[function=AVG,is_distinct=true]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-filter_predicate=Greater
+| | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | type=Double NULL]
+| | +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | type=Double NULL]
+| +-project_list=
+| +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+| | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+| | type=Long]
+| +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+| +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+ | type=Long]
+ +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+ type=Double NULL]
diff --git a/query_optimizer/tests/physical_generator/Select.test b/query_optimizer/tests/physical_generator/Select.test
index 69911ec..d653a9b 100644
--- a/query_optimizer/tests/physical_generator/Select.test
+++ b/query_optimizer/tests/physical_generator/Select.test
@@ -1586,3 +1586,108 @@
| +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+-output_attributes=
+-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+==
+
+SELECT COUNT(DISTINCT int_col), SUM(float_col)
+FROM test
+GROUP BY long_col
+HAVING AVG(DISTINCT int_col + double_col) > AVG(DISTINCT float_col);
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=Aggregate
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-grouping_expressions=
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-aggregate_expressions=
+| | | +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | | | +-AggregateFunction[function=COUNT,is_distinct=true]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| | | | type=Double NULL]
+| | | | +-AggregateFunction[function=SUM]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | | type=Double NULL]
+| | | | +-AggregateFunction[function=AVG,is_distinct=true]
+| | | | +-Add
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | | type=Double NULL]
+| | | +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | | type=Double NULL]
+| | | +-AggregateFunction[function=AVG,is_distinct=true]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-filter_predicate=Greater
+| | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | type=Double NULL]
+| | +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | type=Double NULL]
+| +-project_list=
+| +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+| | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+| | type=Long]
+| +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+| +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+ | type=Long]
+ +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+ type=Double NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-aggregate_expressions=
+| | +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | | +-AggregateFunction[function=COUNT,is_distinct=true]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Double NULL]
+| | | +-AggregateFunction[function=SUM]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Double NULL]
+| | | +-AggregateFunction[function=AVG,is_distinct=true]
+| | | +-Add
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | type=Double NULL]
+| | +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,type=Double NULL]
+| | +-AggregateFunction[function=AVG,is_distinct=true]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-filter_predicate=Greater
+| | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | type=Double NULL]
+| | +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | type=Double NULL]
+| +-project_expressions=
+| +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+| | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+| | type=Long]
+| +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+| +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+ | type=Long]
+ +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+ type=Double NULL]
diff --git a/query_optimizer/tests/resolver/Select.test b/query_optimizer/tests/resolver/Select.test
index e7cc386..1290d5e 100644
--- a/query_optimizer/tests/resolver/Select.test
+++ b/query_optimizer/tests/resolver/Select.test
@@ -2270,6 +2270,63 @@
+-AttributeReference[id=1,name=date_digits,relation=,type=Long]
==
+SELECT COUNT(DISTINCT int_col), SUM(float_col)
+FROM test
+GROUP BY long_col
+HAVING AVG(DISTINCT int_col + double_col) > AVG(DISTINCT float_col);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=Aggregate
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-grouping_expressions=
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-aggregate_expressions=
+| | | +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | | | +-AggregateFunction[function=COUNT,is_distinct=true]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| | | | type=Double NULL]
+| | | | +-AggregateFunction[function=SUM]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | | type=Double NULL]
+| | | | +-AggregateFunction[function=AVG,is_distinct=true]
+| | | | +-Add
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | | type=Double NULL]
+| | | +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | | type=Double NULL]
+| | | +-AggregateFunction[function=AVG,is_distinct=true]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-filter_predicate=Greater
+| | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | type=Double NULL]
+| | +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | type=Double NULL]
+| +-project_list=
+| +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+| | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+| | type=Long]
+| +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+| +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+ | type=Long]
+ +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+ type=Double NULL]
+==
+
select interval '4 day' + interval '5 year'
from test
--
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 40f72cc..b362f89 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -212,6 +212,7 @@
// Add an aggregate.
serialization::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+ aggr_proto->set_is_distinct(false);
if (is_expression) {
unique_ptr<ScalarBinaryExpression> exp(
new ScalarBinaryExpression(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd),
@@ -226,6 +227,7 @@
// Add another aggregate.
aggr_proto = aggr_state_proto->add_aggregates();
aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+ aggr_proto->set_is_distinct(false);
if (is_expression) {
unique_ptr<ScalarBinaryExpression> exp(
new ScalarBinaryExpression(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kMultiply),
@@ -299,6 +301,7 @@
// Add an aggregate.
serialization::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+ aggr_proto->set_is_distinct(false);
unique_ptr<ScalarAttribute> attr(new ScalarAttribute(*table_->getAttributeByName(stem + "-0")));
aggr_proto->add_argument()->CopyFrom(attr->getProto());
@@ -306,6 +309,7 @@
// Add another aggregate.
aggr_proto = aggr_state_proto->add_aggregates();
aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+ aggr_proto->set_is_distinct(false);
attr.reset(new ScalarAttribute(*table_->getAttributeByName(stem + "-1")));
aggr_proto->add_argument()->CopyFrom(attr->getProto());
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index a129cc7..a3a669c 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -59,6 +59,7 @@
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction*> &aggregate_functions,
std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+ std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
const std::size_t estimated_num_entries,
@@ -68,6 +69,7 @@
predicate_(predicate),
group_by_list_(std::move(group_by)),
arguments_(std::move(arguments)),
+ is_distinct_(std::move(is_distinct)),
storage_manager_(storage_manager) {
// Sanity checks: each aggregate has a corresponding list of arguments.
DCHECK(aggregate_functions.size() == arguments_.size());
@@ -85,6 +87,7 @@
handles_.emplace_back(new AggregationHandleDistinct());
arguments_.push_back({});
+ is_distinct_.emplace_back(false);
group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable(
hash_table_impl_type,
@@ -97,7 +100,8 @@
= aggregate_functions.begin();
std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it
= arguments_.begin();
- for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it) {
+ std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
+ for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
// Get the Types of this aggregate's arguments so that we can create an
// AggregationHandle.
std::vector<const Type*> argument_types;
@@ -145,6 +149,25 @@
arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes));
#endif
}
+
+ // Initialize the corresponding distinctify hash table if this is a DISTINCT
+ // aggregation.
+ if (*is_distinct_it) {
+ std::vector<const Type*> key_types(group_by_types);
+ key_types.insert(key_types.end(), argument_types.begin(), argument_types.end());
+ // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating
+ // the number of entries in the distinctify hash table. We may estimate
+ // for each distinct aggregation an estimated_num_distinct_keys value during
+ // query optimization, if it is worthwhile.
+ distinctify_hashtables_.emplace_back(
+ handles_.back()->createDistinctifyHashTable(
+ hash_table_impl_type,
+ key_types,
+ estimated_num_entries,
+ storage_manager));
+ } else {
+ distinctify_hashtables_.emplace_back(nullptr);
+ }
}
}
}
@@ -158,6 +181,7 @@
// Rebuild contructor arguments from their representation in 'proto'.
std::vector<const AggregateFunction*> aggregate_functions;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments;
+ std::vector<bool> is_distinct;
for (int agg_idx = 0; agg_idx < proto.aggregates_size(); ++agg_idx) {
const serialization::Aggregate &agg_proto = proto.aggregates(agg_idx);
@@ -171,6 +195,8 @@
agg_proto.argument(argument_idx),
database));
}
+
+ is_distinct.emplace_back(agg_proto.is_distinct());
}
std::vector<std::unique_ptr<const Scalar>> group_by_expressions;
@@ -192,6 +218,7 @@
return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
aggregate_functions,
std::move(arguments),
+ std::move(is_distinct),
std::move(group_by_expressions),
predicate.release(),
proto.estimated_num_entries(),
@@ -256,7 +283,7 @@
}
}
-void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) const {
+void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) {
if (group_by_list_.empty()) {
finalizeSingleState(output_destination);
} else {
@@ -268,8 +295,10 @@
const std::vector<std::unique_ptr<AggregationState>> &local_state) {
DEBUG_ASSERT(local_state.size() == single_states_.size());
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- handles_[agg_idx]->mergeStates(*local_state[agg_idx],
- single_states_[agg_idx].get());
+ if (!is_distinct_[agg_idx]) {
+ handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+ single_states_[agg_idx].get());
+ }
}
}
@@ -293,13 +322,28 @@
local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
}
#endif
- // Call StorageBlock::aggregate() to actually do the aggregation.
- local_state.emplace_back(
- block->aggregate(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- predicate_.get(),
- &reuse_matches));
+ if (is_distinct_[agg_idx]) {
+ // Call StorageBlock::aggregateDistinct() to put the arguments as keys
+ // directly into the (threadsafe) shared global distinctify HashTable
+ // for this aggregate.
+ block->aggregateDistinct(*handles_[agg_idx],
+ arguments_[agg_idx],
+ local_arguments_as_attributes,
+ {}, /* group_by */
+ predicate_.get(),
+ distinctify_hashtables_[agg_idx].get(),
+ &reuse_matches,
+ nullptr /* reuse_group_by_vectors */);
+ local_state.emplace_back(nullptr);
+ } else {
+ // Call StorageBlock::aggregate() to actually do the aggregation.
+ local_state.emplace_back(
+ block->aggregate(*handles_[agg_idx],
+ arguments_[agg_idx],
+ local_arguments_as_attributes,
+ predicate_.get(),
+ &reuse_matches));
+ }
}
// Merge per-block aggregation states back with global state.
@@ -322,24 +366,38 @@
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- //
- // TODO(shoban): Implement optional code path for using local hash table per
- // block, which can be merged with global hash table for all blocks
- // aggregated on.
- block->aggregateGroupBy(*handles_[agg_idx],
- arguments_[agg_idx],
- group_by_list_,
- predicate_.get(),
- group_by_hashtables_[agg_idx].get(),
- &reuse_matches,
- &reuse_group_by_vectors);
+ if (is_distinct_[agg_idx]) {
+ // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
+ // values and the aggregation arguments together as keys directly into the
+ // (threadsafe) shared global distinctify HashTable for this aggregate.
+ block->aggregateDistinct(*handles_[agg_idx],
+ arguments_[agg_idx],
+ nullptr, /* arguments_as_attributes */
+ group_by_list_,
+ predicate_.get(),
+ distinctify_hashtables_[agg_idx].get(),
+ &reuse_matches,
+ &reuse_group_by_vectors);
+ } else {
+ // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+ // directly into the (threadsafe) shared global HashTable for this
+ // aggregate.
+ //
+ // TODO(shoban): Implement optional code path for using local hash table per
+ // block, which can be merged with global hash table for all blocks
+ // aggregated on.
+ block->aggregateGroupBy(*handles_[agg_idx],
+ arguments_[agg_idx],
+ group_by_list_,
+ predicate_.get(),
+ group_by_hashtables_[agg_idx].get(),
+ &reuse_matches,
+ &reuse_group_by_vectors);
+ }
}
}
-void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) const {
+void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {
// Simply build up a Tuple from the finalized values for each aggregate and
// insert it in '*output_destination'.
std::vector<TypedValue> attribute_values;
@@ -347,13 +405,18 @@
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
+ if (is_distinct_[agg_idx]) {
+ single_states_[agg_idx].reset(
+ handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx]));
+ }
+
attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx]));
}
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
-void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) const {
+void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
@@ -363,11 +426,17 @@
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
- ColumnVector* col =
+ if (is_distinct_[agg_idx]) {
+ handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
+ *distinctify_hashtables_[agg_idx],
+ group_by_hashtables_[agg_idx].get());
+ }
+
+ ColumnVector* agg_result_col =
handles_[agg_idx]->finalizeHashTable(*group_by_hashtables_[agg_idx],
&group_by_keys);
- if (col != nullptr) {
- final_values.emplace_back(col);
+ if (agg_result_col != nullptr) {
+ final_values.emplace_back(agg_result_col);
}
}
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 565d1f8..b883ed1 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_
+#include <cstddef>
#include <memory>
#include <vector>
@@ -79,6 +80,8 @@
* @param arguments For each entry in aggregate_functions, a corresponding
* list of argument expressions to that aggregate. This is moved-from,
* with AggregationOperationState taking ownership.
+ * @param is_distinct For each entry in aggregate_functions, whether DISTINCT
+ * should be applied to the entry's arguments.
* @param group_by A list of expressions to compute the GROUP BY values. If
* empty, no grouping is used. This is moved-from, with
* AggregationOperationState taking ownership.
@@ -97,6 +100,7 @@
AggregationOperationState(const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction*> &aggregate_functions,
std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+ std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
const std::size_t estimated_num_entries,
@@ -153,7 +157,7 @@
* @param output_destination An InsertDestination where the finalized output
* tuple(s) from this aggregate are to be written.
**/
- void finalizeAggregate(InsertDestination *output_destination) const;
+ void finalizeAggregate(InsertDestination *output_destination);
private:
// Merge locally (per storage block) aggregated states with global aggregation
@@ -164,8 +168,8 @@
void aggregateBlockSingleState(const block_id input_block);
void aggregateBlockHashTable(const block_id input_block);
- void finalizeSingleState(InsertDestination *output_destination) const;
- void finalizeHashTable(InsertDestination *output_destination) const;
+ void finalizeSingleState(InsertDestination *output_destination);
+ void finalizeHashTable(InsertDestination *output_destination);
// Common state for all aggregates in this operation: the input relation, the
// filter predicate (if any), and the list of GROUP BY expressions (if any).
@@ -178,6 +182,13 @@
std::vector<std::unique_ptr<AggregationHandle>> handles_;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
+ // For each aggregate, whether DISTINCT should be applied to the aggregate's
+ // arguments.
+ std::vector<bool> is_distinct_;
+
+ // Hash table for obtaining distinct (i.e. unique) arguments.
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
+
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all an aggregate's argument expressions are simply attributes in
// 'input_relation_', then this caches the attribute IDs of those arguments.
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index 13836a3..031f782 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -1,5 +1,5 @@
// Copyright 2011-2015 Quickstep Technologies LLC.
-// Copyright 2015 Pivotal Software, Inc.
+// Copyright 2015-2016 Pivotal Software, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
message Aggregate {
required AggregateFunction function = 1;
repeated Scalar argument = 2;
+ required bool is_distinct = 3;
}
message AggregationOperationState {
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 6fd6bb2..e9ddb70 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -1,6 +1,6 @@
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2015-2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -486,6 +486,95 @@
hash_table);
}
+void StorageBlock::aggregateDistinct(
+ const AggregationHandle &handle,
+ const std::vector<std::unique_ptr<const Scalar>> &arguments,
+ const std::vector<attribute_id> *arguments_as_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const Predicate *predicate,
+ AggregationStateHashTableBase *distinctify_hash_table,
+ std::unique_ptr<TupleIdSequence> *reuse_matches,
+ std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
+ DCHECK_GT(arguments.size(), 0u)
+ << "Called aggregateDistinct() with zero argument expressions";
+ DCHECK((group_by.size() == 0 || reuse_group_by_vectors != nullptr));
+
+ std::vector<attribute_id> key_ids;
+
+ // An intermediate ValueAccessor that stores the materialized 'arguments' for
+ // this aggregate, as well as the GROUP BY expression values.
+ ColumnVectorsValueAccessor temp_result;
+ {
+ std::unique_ptr<ValueAccessor> accessor;
+ if (predicate) {
+ if (!*reuse_matches) {
+ // If there is a filter predicate that hasn't already been evaluated,
+ // evaluate it now and save the results for other aggregates on this
+ // same block.
+ reuse_matches->reset(getMatchesForPredicate(predicate));
+ }
+
+ // Create a filtered ValueAccessor that only iterates over predicate
+ // matches.
+ accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+ } else {
+ // Create a ValueAccessor that iterates over all tuples in this block
+ accessor.reset(tuple_store_->createValueAccessor());
+ }
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+ // If all the arguments to this aggregate are plain relation attributes,
+ // aggregate directly on a ValueAccessor from this block to avoid a copy.
+ if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
+ DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
+ << "Mismatch between number of arguments and number of attribute_ids";
+ DCHECK_EQ(group_by.size(), 0u);
+ handle.insertValueAccessorIntoDistinctifyHashTable(
+ accessor.get(), *arguments_as_attributes, distinctify_hash_table);
+ return;
+ }
+#endif
+
+ SubBlocksReference sub_blocks_ref(*tuple_store_,
+ indices_,
+ indices_consistent_);
+ attribute_id attr_id = 0;
+
+ if (!group_by.empty()) {
+ // Put GROUP BY keys into 'temp_result'.
+ if (reuse_group_by_vectors->empty()) {
+ // Compute GROUP BY values from group_by Scalars, and store them in
+ // reuse_group_by_vectors for reuse by other aggregates on this same
+ // block.
+ reuse_group_by_vectors->reserve(group_by.size());
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ reuse_group_by_vectors->emplace_back(
+ group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+ temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+ key_ids.push_back(attr_id++);
+ }
+ } else {
+ // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+ DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+ << "Wrong number of reuse_group_by_vectors";
+ for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+ temp_result.addColumn(reuse_cv.get(), false);
+ key_ids.push_back(attr_id++);
+ }
+ }
+ }
+ // Compute argument vectors and add them to 'temp_result'.
+ for (const std::unique_ptr<const Scalar> &argument : arguments) {
+ temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
+ key_ids.push_back(attr_id++);
+ }
+ }
+
+ handle.insertValueAccessorIntoDistinctifyHashTable(
+ &temp_result, key_ids, distinctify_hash_table);
+}
+
+
// TODO(chasseur): Vectorization for updates.
StorageBlock::UpdateResult StorageBlock::update(
const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments,
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 49536c3..da3bc70 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -1,6 +1,6 @@
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2015-2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -447,6 +447,50 @@
*reuse_group_by_vectors) const;
/**
+ * @brief Inserts the GROUP BY expressions and aggregation arguments together
+ * as keys into the distinctify hash table.
+ *
+ * This is the first step for DISTINCT aggregation. It populates the distinctify
+ * hash table so that arguments are distinctified within each GROUP BY group.
+ * Later, a second-round aggregation on the distinctify hash table will be
+ * performed to actually compute the aggregated result for each GROUP BY group.
+ *
+ * @param handle Aggregation handle to compute aggregates with.
+ * @param arguments The arguments to the aggregation function as Scalars.
+ * @param arguments_as_attributes If non-NULL, indicates a valid attribute_id
+ * for each of the elements in arguments, and is used to elide a copy.
+ * Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
+ * @param group_by The list of GROUP BY attributes/expressions.
+ * @param predicate A predicate for selection. \c nullptr indicates that all
+ * tuples should be aggregated on.
+ * @param distinctify_hash_table Hash table to store the arguments and GROUP
+ * BY expressions together as hash table key and a bool constant \c true
+ * as hash table value. (So the hash table actually serves as a hash set.)
+ * @param reuse_matches This parameter is used to store and reuse a tuple-id
+ *     sequence of matches pre-computed in an earlier invocation of
+ *     aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of
+ *     use. The current invocation of aggregateDistinct() will reuse the
+ *     TupleIdSequence if passed, otherwise it computes a TupleIdSequence
+ *     based on \c predicate and stores it in \c reuse_matches. We use
+ *     std::unique_ptr for ease of use, since the caller will not have to
+ *     selectively free it.
+ * @param reuse_group_by_vectors This parameter is used to store and reuse
+ *     GROUP BY attribute vectors pre-computed in an earlier invocation of
+ *     aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
+ *     for ease of use. The current invocation of aggregateDistinct() will
+ *     reuse the ColumnVectors if non-empty, otherwise it computes
+ *     ColumnVectors based on \c group_by and stores them in
+ *     \c reuse_group_by_vectors.
+ */
+ void aggregateDistinct(const AggregationHandle &handle,
+ const std::vector<std::unique_ptr<const Scalar>> &arguments,
+ const std::vector<attribute_id> *arguments_as_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const Predicate *predicate,
+ AggregationStateHashTableBase *distinctify_hash_table,
+ std::unique_ptr<TupleIdSequence> *reuse_matches,
+ std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+
+ /**
* @brief Perform an UPDATE query over the tuples in this StorageBlock.
* @warning In some edge cases, calling this method may cause IndexSubBlocks
* in this block to become inconsistent (the TupleStorageSubBlock