Assemble things together for the DISTINCT aggregation feature.
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 7cb6048..f9d7d98 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1209,6 +1209,9 @@
       unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
       aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
     }
+
+    // Set whether it is a DISTINCT aggregation.
+    aggr_proto->set_is_distinct(unnamed_aggregate_expression->is_distinct());
   }
 
   std::vector<const Type*> group_by_types;
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 3191a28..d70ea4c 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -23,16 +23,21 @@
          "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Delete.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Delete/")
+add_test(quickstep_queryoptimizer_tests_executiongenerator_distinct
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Distinct.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Distinct/")
 add_test(quickstep_queryoptimizer_tests_executiongenerator_drop
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Drop.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Drop/")
- add_test(quickstep_queryoptimizer_tests_executiongenerator_index
-          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
-          "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
-          "${CMAKE_CURRENT_BINARY_DIR}/Index.test"
-          "${CMAKE_CURRENT_BINARY_DIR}/Index/")
+add_test(quickstep_queryoptimizer_tests_executiongenerator_index
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Index.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Index/")
 add_test(quickstep_queryoptimizer_tests_executiongenerator_insert
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
@@ -63,6 +68,7 @@
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
diff --git a/query_optimizer/tests/execution_generator/Distinct.test b/query_optimizer/tests/execution_generator/Distinct.test
new file mode 100644
index 0000000..feab898
--- /dev/null
+++ b/query_optimizer/tests/execution_generator/Distinct.test
@@ -0,0 +1,70 @@
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+CREATE TABLE foo(x INT, y DOUBLE, z INT);
+
+INSERT INTO foo
+SELECT i,
+       (i + 0.5) % 100,
+       i % 3
+FROM generate_series(0, 29999) AS gs(i);
+
+
+SELECT COUNT(*),
+       COUNT(DISTINCT x),
+       COUNT(DISTINCT y),
+       COUNT(DISTINCT z)
+FROM foo;
+--
++--------------------+--------------------+--------------------+--------------------+
+|COUNT(*)            |COUNT(DISTINCT x)   |COUNT(DISTINCT y)   |COUNT(DISTINCT z)   |
++--------------------+--------------------+--------------------+--------------------+
+|               30000|               30000|                 100|                   3|
++--------------------+--------------------+--------------------+--------------------+
+==
+
+SELECT SUM(y),
+       SUM(DISTINCT y),
+       COUNT(DISTINCT y),
+       AVG(DISTINCT y),
+       z
+FROM foo
+GROUP BY z
+ORDER BY z;
+--
++------------------------+------------------------+--------------------+------------------------+-----------+
+|SUM(y)                  |SUM(DISTINCT y)         |COUNT(DISTINCT y)   |AVG(DISTINCT y)         |z          |
++------------------------+------------------------+--------------------+------------------------+-----------+
+|                  500000|                    5000|                 100|                      50|          0|
+|                  500000|                    5000|                 100|                      50|          1|
+|                  500000|                    5000|                 100|                      50|          2|
++------------------------+------------------------+--------------------+------------------------+-----------+
+==
+
+SELECT MAX(x) * SUM(DISTINCT y),
+       COUNT(DISTINCT x % y) + z,
+       z
+FROM foo
+GROUP BY z
+ORDER BY z;
+--
++------------------------+-------------------------+-----------+
+|(MAX(x)*SUM(DISTINCT y))|(COUNT(DISTINCT (x%y))+z)|z          |
++------------------------+-------------------------+-----------+
+|               149985000|                      196|          0|
+|               149990000|                      197|          1|
+|               149995000|                      195|          2|
++------------------------+-------------------------+-----------+
+==
diff --git a/query_optimizer/tests/logical_generator/Select.test b/query_optimizer/tests/logical_generator/Select.test
index 2129de1..2709ce3 100644
--- a/query_optimizer/tests/logical_generator/Select.test
+++ b/query_optimizer/tests/logical_generator/Select.test
@@ -596,3 +596,60 @@
 +-output_attributes=
   +-AttributeReference[id=7,name=,alias=(int_col+2),relation=subquery,
     type=Int NULL]
+==
+
+SELECT COUNT(DISTINCT int_col), SUM(float_col)
+FROM test
+GROUP BY long_col
+HAVING AVG(DISTINCT int_col + double_col) > AVG(DISTINCT float_col);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=Aggregate
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-grouping_expressions=
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-aggregate_expressions=
+| | |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | |   | +-AggregateFunction[function=COUNT,is_distinct=true]
+| | |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| | |   | type=Double NULL]
+| | |   | +-AggregateFunction[function=SUM]
+| | |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | |   | type=Double NULL]
+| | |   | +-AggregateFunction[function=AVG,is_distinct=true]
+| | |   |   +-Add
+| | |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   |     +-AttributeReference[id=3,name=double_col,relation=test,
+| | |   |       type=Double NULL]
+| | |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | |     type=Double NULL]
+| | |     +-AggregateFunction[function=AVG,is_distinct=true]
+| | |       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-filter_predicate=Greater
+| |   +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| |   | type=Double NULL]
+| |   +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| |     type=Double NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long]
+|   +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+  | type=Long]
+  +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+    type=Double NULL]
diff --git a/query_optimizer/tests/physical_generator/Select.test b/query_optimizer/tests/physical_generator/Select.test
index 69911ec..d653a9b 100644
--- a/query_optimizer/tests/physical_generator/Select.test
+++ b/query_optimizer/tests/physical_generator/Select.test
@@ -1586,3 +1586,108 @@
 |     +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
 +-output_attributes=
   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+==
+
+SELECT COUNT(DISTINCT int_col), SUM(float_col)
+FROM test
+GROUP BY long_col
+HAVING AVG(DISTINCT int_col + double_col) > AVG(DISTINCT float_col);
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=Aggregate
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-grouping_expressions=
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-aggregate_expressions=
+| | |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | |   | +-AggregateFunction[function=COUNT,is_distinct=true]
+| | |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| | |   | type=Double NULL]
+| | |   | +-AggregateFunction[function=SUM]
+| | |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | |   | type=Double NULL]
+| | |   | +-AggregateFunction[function=AVG,is_distinct=true]
+| | |   |   +-Add
+| | |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   |     +-AttributeReference[id=3,name=double_col,relation=test,
+| | |   |       type=Double NULL]
+| | |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | |     type=Double NULL]
+| | |     +-AggregateFunction[function=AVG,is_distinct=true]
+| | |       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-filter_predicate=Greater
+| |   +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| |   | type=Double NULL]
+| |   +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| |     type=Double NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long]
+|   +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+  | type=Long]
+  +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+    type=Double NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| |   | +-AggregateFunction[function=COUNT,is_distinct=true]
+| |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=AVG,is_distinct=true]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   |     +-AttributeReference[id=3,name=double_col,relation=test,
+| |   |       type=Double NULL]
+| |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,type=Double NULL]
+| |     +-AggregateFunction[function=AVG,is_distinct=true]
+| |       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-filter_predicate=Greater
+| | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | | type=Double NULL]
+| | +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| |   type=Double NULL]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long]
+|   +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+  | type=Long]
+  +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+    type=Double NULL]
diff --git a/query_optimizer/tests/resolver/Select.test b/query_optimizer/tests/resolver/Select.test
index e7cc386..1290d5e 100644
--- a/query_optimizer/tests/resolver/Select.test
+++ b/query_optimizer/tests/resolver/Select.test
@@ -2270,6 +2270,63 @@
   +-AttributeReference[id=1,name=date_digits,relation=,type=Long]
 ==
 
+SELECT COUNT(DISTINCT int_col), SUM(float_col)
+FROM test
+GROUP BY long_col
+HAVING AVG(DISTINCT int_col + double_col) > AVG(DISTINCT float_col);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=Aggregate
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-grouping_expressions=
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-aggregate_expressions=
+| | |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | |   | +-AggregateFunction[function=COUNT,is_distinct=true]
+| | |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| | |   | type=Double NULL]
+| | |   | +-AggregateFunction[function=SUM]
+| | |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| | |   | type=Double NULL]
+| | |   | +-AggregateFunction[function=AVG,is_distinct=true]
+| | |   |   +-Add
+| | |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   |     +-AttributeReference[id=3,name=double_col,relation=test,
+| | |   |       type=Double NULL]
+| | |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| | |     type=Double NULL]
+| | |     +-AggregateFunction[function=AVG,is_distinct=true]
+| | |       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-filter_predicate=Greater
+| |   +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| |   | type=Double NULL]
+| |   +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+| |     type=Double NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,type=Long]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long]
+|   +-Alias[id=7,name=,alias=SUM(float_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=COUNT(DISTINCT int_col),relation=,
+  | type=Long]
+  +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
+    type=Double NULL]
+==
+
 select interval '4 day' + interval '5 year'
 from test
 --
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 40f72cc..b362f89 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -212,6 +212,7 @@
     // Add an aggregate.
     serialization::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
     aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+    aggr_proto->set_is_distinct(false);
     if (is_expression) {
       unique_ptr<ScalarBinaryExpression> exp(
           new ScalarBinaryExpression(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd),
@@ -226,6 +227,7 @@
     // Add another aggregate.
     aggr_proto = aggr_state_proto->add_aggregates();
     aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+    aggr_proto->set_is_distinct(false);
     if (is_expression) {
       unique_ptr<ScalarBinaryExpression> exp(
           new ScalarBinaryExpression(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kMultiply),
@@ -299,6 +301,7 @@
     // Add an aggregate.
     serialization::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
     aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+    aggr_proto->set_is_distinct(false);
 
     unique_ptr<ScalarAttribute> attr(new ScalarAttribute(*table_->getAttributeByName(stem + "-0")));
     aggr_proto->add_argument()->CopyFrom(attr->getProto());
@@ -306,6 +309,7 @@
     // Add another aggregate.
     aggr_proto = aggr_state_proto->add_aggregates();
     aggr_proto->mutable_function()->CopyFrom(AggregateFunctionFactory::Get(agg_type).getProto());
+    aggr_proto->set_is_distinct(false);
     attr.reset(new ScalarAttribute(*table_->getAttributeByName(stem + "-1")));
     aggr_proto->add_argument()->CopyFrom(attr->getProto());
 
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index a129cc7..a3a669c 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -59,6 +59,7 @@
     const CatalogRelationSchema &input_relation,
     const std::vector<const AggregateFunction*> &aggregate_functions,
     std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+    std::vector<bool> &&is_distinct,
     std::vector<std::unique_ptr<const Scalar>> &&group_by,
     const Predicate *predicate,
     const std::size_t estimated_num_entries,
@@ -68,6 +69,7 @@
       predicate_(predicate),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
+      is_distinct_(std::move(is_distinct)),
       storage_manager_(storage_manager) {
   // Sanity checks: each aggregate has a corresponding list of arguments.
   DCHECK(aggregate_functions.size() == arguments_.size());
@@ -85,6 +87,7 @@
 
     handles_.emplace_back(new AggregationHandleDistinct());
     arguments_.push_back({});
+    is_distinct_.emplace_back(false);
 
     group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable(
         hash_table_impl_type,
@@ -97,7 +100,8 @@
         = aggregate_functions.begin();
     std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it
         = arguments_.begin();
-    for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it) {
+    std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
+    for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
       // Get the Types of this aggregate's arguments so that we can create an
       // AggregationHandle.
       std::vector<const Type*> argument_types;
@@ -145,6 +149,25 @@
         arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes));
 #endif
       }
+
+      // Initialize the corresponding distinctify hash table if this is a DISTINCT
+      // aggregation.
+      if (*is_distinct_it) {
+        std::vector<const Type*> key_types(group_by_types);
+        key_types.insert(key_types.end(), argument_types.begin(), argument_types.end());
+        // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating
+        // the number of entries in the distinctify hash table. We may estimate
+        // for each distinct aggregation an estimated_num_distinct_keys value during
+        // query optimization, if it worths.
+        distinctify_hashtables_.emplace_back(
+            handles_.back()->createDistinctifyHashTable(
+                hash_table_impl_type,
+                key_types,
+                estimated_num_entries,
+                storage_manager));
+      } else {
+        distinctify_hashtables_.emplace_back(nullptr);
+      }
     }
   }
 }
@@ -158,6 +181,7 @@
   // Rebuild contructor arguments from their representation in 'proto'.
   std::vector<const AggregateFunction*> aggregate_functions;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments;
+  std::vector<bool> is_distinct;
   for (int agg_idx = 0; agg_idx < proto.aggregates_size(); ++agg_idx) {
     const serialization::Aggregate &agg_proto = proto.aggregates(agg_idx);
 
@@ -171,6 +195,8 @@
           agg_proto.argument(argument_idx),
           database));
     }
+
+    is_distinct.emplace_back(agg_proto.is_distinct());
   }
 
   std::vector<std::unique_ptr<const Scalar>> group_by_expressions;
@@ -192,6 +218,7 @@
   return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
                                        aggregate_functions,
                                        std::move(arguments),
+                                       std::move(is_distinct),
                                        std::move(group_by_expressions),
                                        predicate.release(),
                                        proto.estimated_num_entries(),
@@ -256,7 +283,7 @@
   }
 }
 
-void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) const {
+void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) {
   if (group_by_list_.empty()) {
     finalizeSingleState(output_destination);
   } else {
@@ -268,8 +295,10 @@
     const std::vector<std::unique_ptr<AggregationState>> &local_state) {
   DEBUG_ASSERT(local_state.size() == single_states_.size());
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    handles_[agg_idx]->mergeStates(*local_state[agg_idx],
-                                   single_states_[agg_idx].get());
+    if (!is_distinct_[agg_idx]) {
+      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+                                     single_states_[agg_idx].get());
+    }
   }
 }
 
@@ -293,13 +322,28 @@
       local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
     }
 #endif
-    // Call StorageBlock::aggregate() to actually do the aggregation.
-    local_state.emplace_back(
-        block->aggregate(*handles_[agg_idx],
-                         arguments_[agg_idx],
-                         local_arguments_as_attributes,
-                         predicate_.get(),
-                         &reuse_matches));
+    if (is_distinct_[agg_idx]) {
+      // Call StorageBlock::aggregateDistinct() to put the arguments as keys
+      // directly into the (threadsafe) shared global distinctify HashTable
+      // for this aggregate.
+      block->aggregateDistinct(*handles_[agg_idx],
+                               arguments_[agg_idx],
+                               local_arguments_as_attributes,
+                               {}, /* group_by */
+                               predicate_.get(),
+                               distinctify_hashtables_[agg_idx].get(),
+                               &reuse_matches,
+                               nullptr /* reuse_group_by_vectors */);
+      local_state.emplace_back(nullptr);
+    } else {
+      // Call StorageBlock::aggregate() to actually do the aggregation.
+      local_state.emplace_back(
+          block->aggregate(*handles_[agg_idx],
+                           arguments_[agg_idx],
+                           local_arguments_as_attributes,
+                           predicate_.get(),
+                           &reuse_matches));
+    }
   }
 
   // Merge per-block aggregation states back with global state.
@@ -322,24 +366,38 @@
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
-    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-    // directly into the (threadsafe) shared global HashTable for this
-    // aggregate.
-    //
-    // TODO(shoban): Implement optional code path for using local hash table per
-    // block, which can be merged with global hash table for all blocks
-    // aggregated on.
-    block->aggregateGroupBy(*handles_[agg_idx],
-                            arguments_[agg_idx],
-                            group_by_list_,
-                            predicate_.get(),
-                            group_by_hashtables_[agg_idx].get(),
-                            &reuse_matches,
-                            &reuse_group_by_vectors);
+    if (is_distinct_[agg_idx]) {
+      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
+      // values and the aggregation arguments together as keys directly into the
+      // (threadsafe) shared global distinctify HashTable for this aggregate.
+      block->aggregateDistinct(*handles_[agg_idx],
+                               arguments_[agg_idx],
+                               nullptr, /* arguments_as_attributes */
+                               group_by_list_,
+                               predicate_.get(),
+                               distinctify_hashtables_[agg_idx].get(),
+                               &reuse_matches,
+                               &reuse_group_by_vectors);
+    } else {
+      // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+      // directly into the (threadsafe) shared global HashTable for this
+      // aggregate.
+      //
+      // TODO(shoban): Implement optional code path for using local hash table per
+      // block, which can be merged with global hash table for all blocks
+      // aggregated on.
+      block->aggregateGroupBy(*handles_[agg_idx],
+                              arguments_[agg_idx],
+                              group_by_list_,
+                              predicate_.get(),
+                              group_by_hashtables_[agg_idx].get(),
+                              &reuse_matches,
+                              &reuse_group_by_vectors);
+    }
   }
 }
 
-void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) const {
+void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {
   // Simply build up a Tuple from the finalized values for each aggregate and
   // insert it in '*output_destination'.
   std::vector<TypedValue> attribute_values;
@@ -347,13 +405,18 @@
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
+    if (is_distinct_[agg_idx]) {
+      single_states_[agg_idx].reset(
+          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx]));
+    }
+
     attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx]));
   }
 
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
-void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) const {
+void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
@@ -363,11 +426,17 @@
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
-    ColumnVector* col =
+    if (is_distinct_[agg_idx]) {
+      handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
+          *distinctify_hashtables_[agg_idx],
+          group_by_hashtables_[agg_idx].get());
+    }
+
+    ColumnVector* agg_result_col =
         handles_[agg_idx]->finalizeHashTable(*group_by_hashtables_[agg_idx],
                                              &group_by_keys);
-    if (col != nullptr) {
-      final_values.emplace_back(col);
+    if (agg_result_col != nullptr) {
+      final_values.emplace_back(agg_result_col);
     }
   }
 
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 565d1f8..b883ed1 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -18,6 +18,7 @@
 #ifndef QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_
 #define QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <vector>
 
@@ -79,6 +80,8 @@
    * @param arguments For each entry in aggregate_functions, a corresponding
    *        list of argument expressions to that aggregate. This is moved-from,
    *        with AggregationOperationState taking ownership.
+   * @param is_distinct For each entry in aggregate_functions, whether DISTINCT
+   *        should be applied to the entry's arguments.
    * @param group_by A list of expressions to compute the GROUP BY values. If
    *        empty, no grouping is used. This is moved-from, with
    *        AggregationOperationState taking ownership.
@@ -97,6 +100,7 @@
   AggregationOperationState(const CatalogRelationSchema &input_relation,
                             const std::vector<const AggregateFunction*> &aggregate_functions,
                             std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+                            std::vector<bool> &&is_distinct,
                             std::vector<std::unique_ptr<const Scalar>> &&group_by,
                             const Predicate *predicate,
                             const std::size_t estimated_num_entries,
@@ -153,7 +157,7 @@
    * @param output_destination An InsertDestination where the finalized output
    *        tuple(s) from this aggregate are to be written.
    **/
-  void finalizeAggregate(InsertDestination *output_destination) const;
+  void finalizeAggregate(InsertDestination *output_destination);
 
  private:
   // Merge locally (per storage block) aggregated states with global aggregation
@@ -164,8 +168,8 @@
   void aggregateBlockSingleState(const block_id input_block);
   void aggregateBlockHashTable(const block_id input_block);
 
-  void finalizeSingleState(InsertDestination *output_destination) const;
-  void finalizeHashTable(InsertDestination *output_destination) const;
+  void finalizeSingleState(InsertDestination *output_destination);
+  void finalizeHashTable(InsertDestination *output_destination);
 
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
@@ -178,6 +182,13 @@
   std::vector<std::unique_ptr<AggregationHandle>> handles_;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
 
+  // For each aggregate, whether DISTINCT should be applied to the aggregate's
+  // arguments.
+  std::vector<bool> is_distinct_;
+
+  // Hash table for obtaining distinct (i.e. unique) arguments.
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
+
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all an aggregate's argument expressions are simply attributes in
   // 'input_relation_', then this caches the attribute IDs of those arguments.
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index 13836a3..031f782 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -1,5 +1,5 @@
 //   Copyright 2011-2015 Quickstep Technologies LLC.
-//   Copyright 2015 Pivotal Software, Inc.
+//   Copyright 2015-2016 Pivotal Software, Inc.
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
 message Aggregate {
   required AggregateFunction function = 1;
   repeated Scalar argument = 2;
+  required bool is_distinct = 3;
 }
 
 message AggregationOperationState {
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 6fd6bb2..e9ddb70 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -486,6 +486,95 @@
                                              hash_table);
 }
 
+void StorageBlock::aggregateDistinct(
+    const AggregationHandle &handle,
+    const std::vector<std::unique_ptr<const Scalar>> &arguments,
+    const std::vector<attribute_id> *arguments_as_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const Predicate *predicate,
+    AggregationStateHashTableBase *distinctify_hash_table,
+    std::unique_ptr<TupleIdSequence> *reuse_matches,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
+  DCHECK_GT(arguments.size(), 0u)
+      << "Called aggregateDistinct() with zero argument expressions";
+  DCHECK((group_by.size() == 0 || reuse_group_by_vectors != nullptr));
+
+  std::vector<attribute_id> key_ids;
+
+  // An intermediate ValueAccessor that stores the materialized 'arguments' for
+  // this aggregate, as well as the GROUP BY expression values.
+  ColumnVectorsValueAccessor temp_result;
+  {
+    std::unique_ptr<ValueAccessor> accessor;
+    if (predicate) {
+      if (!*reuse_matches) {
+        // If there is a filter predicate that hasn't already been evaluated,
+        // evaluate it now and save the results for other aggregates on this
+        // same block.
+        reuse_matches->reset(getMatchesForPredicate(predicate));
+      }
+
+      // Create a filtered ValueAccessor that only iterates over predicate
+      // matches.
+      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+    } else {
+      // Create a ValueAccessor that iterates over all tuples in this block
+      accessor.reset(tuple_store_->createValueAccessor());
+    }
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+    // If all the arguments to this aggregate are plain relation attributes,
+    // aggregate directly on a ValueAccessor from this block to avoid a copy.
+    if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
+      DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
+          << "Mismatch between number of arguments and number of attribute_ids";
+      DCHECK_EQ(group_by.size(), 0u);
+      handle.insertValueAccessorIntoDistinctifyHashTable(
+          accessor.get(), *arguments_as_attributes, distinctify_hash_table);
+      return;
+    }
+#endif
+
+    SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                      indices_,
+                                      indices_consistent_);
+    attribute_id attr_id = 0;
+
+    if (!group_by.empty()) {
+      // Put GROUP BY keys into 'temp_result'.
+      if (reuse_group_by_vectors->empty()) {
+        // Compute GROUP BY values from group_by Scalars, and store them in
+        // reuse_group_by_vectors for reuse by other aggregates on this same
+        // block.
+        reuse_group_by_vectors->reserve(group_by.size());
+        for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+          reuse_group_by_vectors->emplace_back(
+              group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+          temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+          key_ids.push_back(attr_id++);
+        }
+      } else {
+        // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+        DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+            << "Wrong number of reuse_group_by_vectors";
+        for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+          temp_result.addColumn(reuse_cv.get(), false);
+          key_ids.push_back(attr_id++);
+        }
+      }
+    }
+    // Compute argument vectors and add them to 'temp_result'.
+    for (const std::unique_ptr<const Scalar> &argument : arguments) {
+      temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
+      key_ids.push_back(attr_id++);
+    }
+  }
+
+  handle.insertValueAccessorIntoDistinctifyHashTable(
+      &temp_result, key_ids, distinctify_hash_table);
+}
+
+
 // TODO(chasseur): Vectorization for updates.
 StorageBlock::UpdateResult StorageBlock::update(
     const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments,
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 49536c3..da3bc70 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -447,6 +447,50 @@
                             *reuse_group_by_vectors) const;
 
   /**
+   * @brief Inserts the GROUP BY expressions and aggregation arguments together
+   *        as keys into the distinctify hash table.
+   *
+   * This is the first step for DISTINCT aggregation. It populates the distinctify
+   * hash table so that arguments are distinctified within each GROUP BY group.
+   * Later, a second-round aggregation on the distinctify hash table will be
+   * performed to actually compute the aggregated result for each GROUP BY group.
+   *
+   * @param handle Aggregation handle to compute aggregates with.
+   * @param arguments The arguments to the aggregation function as Scalars.
+   * @param arguments_as_attributes If non-NULL, indicates a valid attribute_id
+   *        for each of the elements in arguments, and is used to elide a copy.
+   *        Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
+   * @param group_by The list of GROUP BY attributes/expressions.
+   * @param predicate A predicate for selection. \c nullptr indicates that all
+   *        tuples should be aggregated on.
+   * @param distinctify_hash_table Hash table to store the arguments and GROUP
+   *        BY expressions together as hash table key and a bool constant \c true
+   *        as hash table value. (So the hash table actually serves as a hash set.)
+   * @param reuse_matches This parameter is used to store and reuse tuple-id
+   *        sequence of matches pre-computed in an earlier invocations of
+   *        aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of
+   *        use.  Current invocation of aggregateGroupBy() will reuse
+   *        TupleIdSequence if passed, otherwise computes a TupleIdSequence based
+   *        on \c predicate and stores in \c reuse_matches. We use
+   *        std::unique_ptr for each of use, since the caller will not have to
+   *        selective free.
+   * @param reuse_group_by_vectors This parameter is used to store and reuse
+   *        GROUP BY attribute vectors pre-computed in an earlier invocation of
+   *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
+   *        for ease of use. Current invocation of aggregateGroupBy() will reuse
+   *        ColumnVectors if non-empty, otherwise computes ColumnVectors based
+   *        on \c group_by and stores them in \c reuse_group_by_vectors.
+   */
+  void aggregateDistinct(const AggregationHandle &handle,
+                         const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                         const std::vector<attribute_id> *arguments_as_attributes,
+                         const std::vector<std::unique_ptr<const Scalar>> &group_by,
+                         const Predicate *predicate,
+                         AggregationStateHashTableBase *distinctify_hash_table,
+                         std::unique_ptr<TupleIdSequence> *reuse_matches,
+                         std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+
+  /**
    * @brief Perform an UPDATE query over the tuples in this StorageBlock.
    * @warning In some edge cases, calling this method may cause IndexSubBlocks
    *          in this block to become inconsistent (the TupleStorageSubBlock