Merge branch 'master' of github.com:pivotalsoftware/quickstep into bloom-optimized-selection
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 94da838..64b4f16 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015-2016 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -36,6 +38,9 @@
 add_library(quickstep_catalog_CatalogRelationSchema
             CatalogRelationSchema.cpp
             CatalogRelationSchema.hpp)
+add_library(quickstep_catalog_CatalogRelationStatistics
+            CatalogRelationStatistics.cpp
+            CatalogRelationStatistics.hpp)
 add_library(quickstep_catalog_CatalogTypedefs ../empty_src.cpp CatalogTypedefs.hpp)
 add_library(quickstep_catalog_IndexScheme IndexScheme.cpp IndexScheme.hpp)
 if(QUICKSTEP_HAVE_LIBNUMA)
@@ -98,6 +103,7 @@
                       glog
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogRelationStatistics
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_Catalog_proto
                       quickstep_catalog_IndexScheme
@@ -111,6 +117,10 @@
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector)
+target_link_libraries(quickstep_catalog_CatalogRelationStatistics
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_Catalog_proto
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_catalog_IndexScheme
                       glog
                       quickstep_catalog_Catalog_proto
@@ -173,6 +183,7 @@
                       quickstep_catalog_CatalogErrors
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogRelationStatistics
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_IndexScheme
                       quickstep_catalog_PartitionScheme
diff --git a/catalog/Catalog.proto b/catalog/Catalog.proto
index 81e28cf..ce4bc2e 100644
--- a/catalog/Catalog.proto
+++ b/catalog/Catalog.proto
@@ -1,7 +1,7 @@
 //   Copyright 2011-2015 Quickstep Technologies LLC.
 //   Copyright 2015-2016 Pivotal Software, Inc.
 //   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
-//    University of Wisconsin—Madison.
+//     University of Wisconsin—Madison.
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -80,6 +80,16 @@
   repeated IndexEntry index_entries = 1;
 }
 
+message CatalogRelationStatistics {  // Carried in the CatalogRelation "statistics" extension.
+  optional fixed64 num_tuples = 1;  // Total tuple count; absent means not set.
+  // One entry per attribute that has a recorded distinct-value count.
+  message NumDistinctValuesEntry {
+    required int32 attr_id = 1;  // Id of the attribute (column).
+    required fixed64 num_distinct_values = 2;  // Distinct values recorded for it.
+  }
+  repeated NumDistinctValuesEntry num_distinct_values_map = 2;
+}
+
 message CatalogRelationSchema {
   required int32 relation_id = 1;
   required string name = 2;
@@ -99,6 +109,7 @@
     optional IndexScheme index_scheme = 18;
     optional PartitionScheme partition_scheme = 19;
     optional NUMAPlacementScheme placement_scheme = 20;
+    optional CatalogRelationStatistics statistics = 21;
   }
 }
 
diff --git a/catalog/CatalogRelation.cpp b/catalog/CatalogRelation.cpp
index 36f82d9..01aebb5 100644
--- a/catalog/CatalogRelation.cpp
+++ b/catalog/CatalogRelation.cpp
@@ -132,6 +132,14 @@
   }
 
   default_layout_.reset(new StorageBlockLayout(*this, proto_default_layout));
+
+  if (proto.HasExtension(serialization::CatalogRelation::statistics)) {
+    statistics_.reset(
+        new CatalogRelationStatistics(
+            proto.GetExtension(serialization::CatalogRelation::statistics)));
+  } else {
+    statistics_.reset(new CatalogRelationStatistics());
+  }
 }
 
 serialization::CatalogRelationSchema CatalogRelation::getProto() const {
@@ -177,6 +185,9 @@
 #endif
   }
 
+  proto.MutableExtension(serialization::CatalogRelation::statistics)
+      ->MergeFrom(statistics_->getProto());
+
   return proto;
 }
 
diff --git a/catalog/CatalogRelation.hpp b/catalog/CatalogRelation.hpp
index 312f3b4..e0d5350 100644
--- a/catalog/CatalogRelation.hpp
+++ b/catalog/CatalogRelation.hpp
@@ -29,6 +29,7 @@
 #include "catalog/Catalog.pb.h"
 #include "catalog/CatalogConfig.h"
 #include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogRelationStatistics.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "catalog/IndexScheme.hpp"
 
@@ -79,7 +80,8 @@
                   const relation_id id = -1,
                   bool temporary = false)
       : CatalogRelationSchema(parent, name, id, temporary),
-        default_layout_(nullptr) {
+        default_layout_(nullptr),
+        statistics_(new CatalogRelationStatistics()) {
   }
 
   /**
@@ -377,6 +379,24 @@
            * getDefaultStorageBlockLayout().estimateTuplesPerBlock();
   }
 
+  /**
+   * @brief Read-only access to the statistics object owned by this relation.
+   *
+   * @return A const reference to this relation's CatalogRelationStatistics.
+   */
+  const CatalogRelationStatistics& getStatistics() const {
+    return *statistics_.get();
+  }
+
+  /**
+   * @brief Writable access to the statistics object owned by this relation.
+   *
+   * @return A non-owning pointer to this relation's CatalogRelationStatistics.
+   */
+  CatalogRelationStatistics* getStatisticsMutable() {
+    return statistics_.get();
+  }
+
  private:
   // A list of blocks belonged to the relation.
   std::vector<block_id> blocks_;
@@ -397,6 +417,8 @@
   // Mutex for locking the index scheme.
   alignas(kCacheLineBytes) mutable SpinSharedMutex<false> index_scheme_mutex_;
 
+  std::unique_ptr<CatalogRelationStatistics> statistics_;
+
 #ifdef QUICKSTEP_HAVE_LIBNUMA
   // NUMA placement scheme object which has the mapping between the partitions
   // of the relation and the NUMA nodes/sockets. It also maintains a mapping
diff --git a/catalog/CatalogRelationStatistics.cpp b/catalog/CatalogRelationStatistics.cpp
new file mode 100644
index 0000000..2bd92b4
--- /dev/null
+++ b/catalog/CatalogRelationStatistics.cpp
@@ -0,0 +1,49 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "catalog/CatalogRelationStatistics.hpp"
+
+#include "catalog/Catalog.pb.h"
+
+namespace quickstep {
+
+CatalogRelationStatistics::CatalogRelationStatistics(
+    const serialization::CatalogRelationStatistics &proto)
+    // num_tuples is optional in the proto (getProto() omits it when zero), so
+    // fall back to 0 instead of leaving the member uninitialized.
+    : num_tuples_(proto.has_num_tuples() ? proto.num_tuples() : 0) {
+  // Rebuild the per-attribute distinct-value counts.
+  for (int i = 0; i < proto.num_distinct_values_map_size(); ++i) {
+    const auto &entry = proto.num_distinct_values_map(i);
+    num_distinct_values_map_.emplace(entry.attr_id(), entry.num_distinct_values());
+  }
+}
+
+serialization::CatalogRelationStatistics CatalogRelationStatistics::getProto() const {
+  serialization::CatalogRelationStatistics serialized;
+  if (num_tuples_ != 0) {  // Zero means "not set"; leave the field absent.
+    serialized.set_num_tuples(num_tuples_);
+  }
+  for (const auto &attr_and_count : num_distinct_values_map_) {
+    auto *entry = serialized.add_num_distinct_values_map();
+    entry->set_attr_id(attr_and_count.first);
+    entry->set_num_distinct_values(attr_and_count.second);
+  }
+  return serialized;
+}
+
+}  // namespace quickstep
diff --git a/catalog/CatalogRelationStatistics.hpp b/catalog/CatalogRelationStatistics.hpp
new file mode 100644
index 0000000..572d141
--- /dev/null
+++ b/catalog/CatalogRelationStatistics.hpp
@@ -0,0 +1,122 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_CATALOG_CATALOG_RELATION_STATISTICS_HPP_
+#define QUICKSTEP_CATALOG_CATALOG_RELATION_STATISTICS_HPP_
+
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Catalog
+ *  @{
+ */
+
+/**
+ * @brief Statistics of a catalog relation. E.g. total number of tuples,
+ *        number of distinct values for each column.
+ **/
+class CatalogRelationStatistics {
+ public:
+  /**
+   * @brief Constructor. Creates statistics with nothing recorded yet (the
+   *        tuple count starts at 0 and no distinct-value counts are stored).
+   **/
+  CatalogRelationStatistics()
+      : num_tuples_(0) {}
+
+  /**
+   * @brief Rebuild a CatalogRelationStatistics object from its serialized
+   *        Protocol Buffer form.
+   *
+   * @param proto The Protocol Buffer serialization of a
+   *        CatalogRelationStatistics object, as produced by getProto().
+   **/
+  explicit CatalogRelationStatistics(const serialization::CatalogRelationStatistics &proto);
+
+  /**
+   * @brief Serialize this object as a Protocol Buffer.
+   *
+   * @return The Protocol Buffer representation of this
+   *         CatalogRelationStatistics object.
+   **/
+  serialization::CatalogRelationStatistics getProto() const;
+
+  /**
+   * @brief Record the total number of tuples in the relation.
+   *
+   * @param num_tuples The tuple count to store.
+   */
+  void setNumTuples(std::size_t num_tuples) {
+    num_tuples_ = num_tuples;
+  }
+
+  /**
+   * @brief Get the recorded number of tuples.
+   *
+   * @return The number of tuples, or 0 if the statistic is not set.
+   */
+  std::size_t getNumTuples() const {
+    return num_tuples_;
+  }
+
+  /**
+   * @brief Record the number of distinct values of a column (catalog
+   *        attribute), replacing any count stored earlier for that column.
+   *
+   * @param attr_id The id of the column.
+   * @param num_distinct_values The distinct-value count to store.
+   */
+  void setNumDistinctValues(attribute_id attr_id, std::size_t num_distinct_values) {
+    num_distinct_values_map_[attr_id] = num_distinct_values;
+  }
+
+  /**
+   * @brief Get the recorded number of distinct values of a column (catalog
+   *        attribute).
+   *
+   * @param attr_id The id of the column.
+   * @return The number of distinct values recorded for the column, or 0 if
+   *         the statistic is not set for that column.
+   */
+  std::size_t getNumDistinctValues(attribute_id attr_id) const {
+    const auto found = num_distinct_values_map_.find(attr_id);
+    return (found != num_distinct_values_map_.end()) ? found->second : 0;
+  }
+
+ private:
+  // Total number of tuples in the relation (0 when not set).
+  std::size_t num_tuples_;
+
+  // Number of distinct values for each column, keyed by attribute id. Columns
+  // without a recorded statistic have no entry.
+  std::unordered_map<attribute_id, std::size_t> num_distinct_values_map_;
+
+  DISALLOW_COPY_AND_ASSIGN(CatalogRelationStatistics);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CATALOG_CATALOG_RELATION_STATISTICS_HPP_
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index a1989d5..8fee7a4 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -73,10 +75,24 @@
                       quickstep_catalog_CatalogDatabase
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationSchema
-                      quickstep_parser_ParseStatement
+                      quickstep_cli_DropRelation
                       quickstep_cli_PrintToScreen
-                      quickstep_utility_Macros
-                      quickstep_utility_PtrVector                        
+                      quickstep_parser_ParseStatement
+                      quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_Foreman
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_queryoptimizer_QueryPlan
+                      quickstep_queryoptimizer_QueryProcessor
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_TupleStorageSubBlock
+                      quickstep_parser_ParseString
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_utility_PtrVector
                       quickstep_utility_SqlError)
 
 target_link_libraries(quickstep_cli_DefaultsConfigurator
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index f38121f..3cb3f86 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstddef>
+#include <cstdint>
 #include <cstdio>
 #include <memory>
 #include <string>
@@ -28,13 +29,26 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogRelationSchema.hpp"
+#include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
+#include "parser/ParseString.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/Foreman.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/PtrVector.hpp"
-#include "utility/Macros.hpp"
 #include "utility/SqlError.hpp"
 
-#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 using std::fprintf;
@@ -52,15 +66,22 @@
 
 void executeDescribeDatabase(
     const PtrVector<ParseString> *arguments,
-    const CatalogDatabase &catalog_database, FILE *out) {
+    const CatalogDatabase &catalog_database,
+    StorageManager *storage_manager,
+    FILE *out) {
   // Column width initialized to 6 to take into account the header name
   // and the column value table
   int max_column_width = C::kInitMaxColumnWidth;
+  vector<std::size_t> num_tuples;
+  vector<std::size_t> num_blocks;
   const CatalogRelation *relation = nullptr;
   if (arguments->size() == 0) {
     for (const CatalogRelation &rel : catalog_database) {
       max_column_width =
           std::max(static_cast<int>(rel.getName().length()), max_column_width);
+      num_blocks.push_back(rel.size_blocks());
+      num_tuples.push_back(
+          PrintToScreen::GetNumTuplesInRelation(rel, storage_manager));
     }
   } else {
     const ParseString &table_name = arguments->front();
@@ -72,26 +93,51 @@
     }
     max_column_width = std::max(static_cast<int>(relation->getName().length()),
                                     max_column_width);
+    num_blocks.push_back(relation->size_blocks());
+    num_tuples.push_back(PrintToScreen::GetNumTuplesInRelation(
+        *relation,
+        storage_manager));
   }
   // Only if we have relations work on the printing logic.
   if (catalog_database.size() > 0) {
+    const std::size_t max_num_blocks = *std::max_element(num_blocks.begin(), num_blocks.end());
+    const std::size_t max_num_rows = *std::max_element(num_tuples.begin(), num_tuples.end());
+    const int max_num_rows_digits = std::max(PrintToScreen::GetNumberOfDigits(max_num_rows),
+                                    C::kInitMaxColumnWidth);
+    const int max_num_blocks_digits = std::max(PrintToScreen::GetNumberOfDigits(max_num_blocks),
+                                      C::kInitMaxColumnWidth+2);
+
     vector<int> column_widths;
-    column_widths.push_back(max_column_width+1);
-    column_widths.push_back(C::kInitMaxColumnWidth+1);
+    column_widths.push_back(max_column_width +1);
+    column_widths.push_back(C::kInitMaxColumnWidth + 1);
+    column_widths.push_back(max_num_blocks_digits + 1);
+    column_widths.push_back(max_num_rows_digits + 1);
     fputs("       List of relations\n\n", out);
     fprintf(out, "%-*s |", max_column_width+1, " Name");
-    fprintf(out, "%-*s\n", C::kInitMaxColumnWidth, " Type");
+    fprintf(out, "%-*s |", C::kInitMaxColumnWidth, " Type");
+    fprintf(out, "%-*s |", max_num_blocks_digits, " Blocks");
+    fprintf(out, "%-*s\n", max_num_rows_digits, " Rows");
     PrintToScreen::printHBar(column_widths, out);
     //  If there are no argument print the entire list of tables
     //  else print the particular table only.
+    vector<std::size_t>::const_iterator num_tuples_it = num_tuples.begin();
+    vector<std::size_t>::const_iterator num_blocks_it = num_blocks.begin();
     if (arguments->size() == 0) {
       for (const CatalogRelation &rel : catalog_database) {
         fprintf(out, " %-*s |", max_column_width, rel.getName().c_str());
-        fprintf(out, " %-*s\n", C::kInitMaxColumnWidth, "table");
+        fprintf(out, " %-*s |", C::kInitMaxColumnWidth - 1, "table");
+        fprintf(out, " %-*zu |", max_num_blocks_digits - 1, *num_blocks_it);  // %zu: std::size_t
+        fprintf(out, " %-*zu\n", max_num_rows_digits - 1, *num_tuples_it);
+        ++num_tuples_it;
+        ++num_blocks_it;
       }
     } else {
       fprintf(out, " %-*s |", max_column_width, relation->getName().c_str());
-      fprintf(out, " %-*s\n", C::kInitMaxColumnWidth, "table");
+      fprintf(out, " %-*s |", C::kInitMaxColumnWidth - 1, "table");
+      fprintf(out, " %-*zu |", max_num_blocks_digits - 1, *num_blocks_it);  // %zu: std::size_t
+      fprintf(out, " %-*zu\n", max_num_rows_digits - 1, *num_tuples_it);
+      ++num_tuples_it;
+      ++num_blocks_it;
     }
     fputc('\n', out);
   }
@@ -162,22 +208,144 @@
   }
 }
 
+/**
+ * @brief A helper function that executes a SQL query to obtain a scalar
+ *        result. The query's result relation is read and then dropped.
+ * @param query_string The SQL text to parse and run.
+ * @param storage_manager Used to fetch the block of the result relation.
+ * @param query_processor Generates the query handle; its default database
+ *        and storage manager are passed to DropRelation::Drop().
+ * @param parser_wrapper The parser that is fed a copy of query_string.
+ * @param foreman Receives the query plan, then is started and joined.
+ * @return Attribute 0 of the first tuple in the result relation's block.
+ */
+inline TypedValue executeQueryForSingleResult(const std::string &query_string,
+                                               StorageManager *storage_manager,
+                                               QueryProcessor *query_processor,
+                                               SqlParserWrapper *parser_wrapper,
+                                               Foreman *foreman) {
+  parser_wrapper->feedNextBuffer(new std::string(query_string));
+  ParseResult result = parser_wrapper->getNextStatement();
+  DCHECK(result.condition == ParseResult::kSuccess);
+  // Generate the query plan.
+  std::unique_ptr<QueryHandle> query_handle(
+      query_processor->generateQueryHandle(*result.parsed_statement));
+  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+  // Use foreman to execute the query plan.
+  foreman->setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
+  foreman->reconstructQueryContextFromProto(query_handle->getQueryContextProto());
+  foreman->start();
+  foreman->join();
+  // Retrieve the scalar result; the DCHECKs expect one block, one tuple, one attribute.
+  const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+  DCHECK(query_result_relation != nullptr);
+  TypedValue value;
+  {
+    std::vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
+    DCHECK_EQ(1u, blocks.size());
+    BlockReference block = storage_manager->getBlock(blocks[0], *query_result_relation);
+    const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
+    DCHECK_EQ(1, tuple_store.numTuples());
+    DCHECK_EQ(1u, tuple_store.getRelation().size());
+    if (tuple_store.isPacked()) {
+      value = tuple_store.getAttributeValueTyped(0, 0);
+    } else {
+      std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
+      value = tuple_store.getAttributeValueTyped(*existence_map->begin(), 0);
+    }
+    value.ensureNotReference();
+  }
+
+  // Drop the result relation.
+  DropRelation::Drop(*query_result_relation,
+                     query_processor->getDefaultDatabase(),
+                     query_processor->getStorageManager());
+  return value;
+}
+
+/**
+ * @brief Handler for the analyze command: for every relation in the default
+ *        database, runs COUNT(DISTINCT col) for each column and COUNT(*) for
+ *        the relation, and stores the results in the relation's statistics.
+ *
+ * @param query_processor Supplies the default database and storage manager,
+ *        plans the queries, and is asked to save the catalog afterwards.
+ * @param foreman Passed to executeQueryForSingleResult() to run each query.
+ * @param out Stream that receives the progress messages.
+ */
+void executeAnalyze(QueryProcessor *query_processor,
+                    Foreman *foreman,
+                    FILE *out) {
+  StorageManager *storage_manager = query_processor->getStorageManager();
+  const CatalogDatabase &database = *query_processor->getDefaultDatabase();
+
+  // Iterate over a snapshot of the relations rather than the database itself.
+  // NOTE(review): presumably because each query below creates and drops a
+  // temporary result relation -- confirm. std::reference_wrapper is declared
+  // in <functional>, which is not among the includes visible in this patch.
+  std::vector<std::reference_wrapper<const CatalogRelation>> relations(
+      database.begin(), database.end());
+  std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper());
+
+  for (const CatalogRelation &relation : relations) {
+    fprintf(out, "Analyzing %s ... ", relation.getName().c_str());
+    fflush(out);
+
+    CatalogRelation *mutable_relation =
+        query_processor->getDefaultDatabase()->getRelationByIdMutable(relation.getID());
+
+    // One COUNT(DISTINCT ...) query per column of the relation.
+    for (const CatalogAttribute &attribute : relation) {
+      const std::string distinct_query =
+          std::string("SELECT COUNT(DISTINCT ") + attribute.getName() +
+          ") FROM " + relation.getName() + ";";
+      TypedValue distinct_count = executeQueryForSingleResult(
+          distinct_query, storage_manager, query_processor, parser_wrapper.get(), foreman);
+
+      DCHECK(distinct_count.getTypeID() == TypeID::kLong);
+      mutable_relation->getStatisticsMutable()->setNumDistinctValues(
+          attribute.getID(), distinct_count.getLiteral<std::int64_t>());
+    }
+
+    // One COUNT(*) query for the relation's total number of tuples.
+    const std::string count_query =
+        std::string("SELECT COUNT(*) FROM ") + relation.getName() + ";";
+    TypedValue tuple_count = executeQueryForSingleResult(
+        count_query, storage_manager, query_processor, parser_wrapper.get(), foreman);
+
+    DCHECK(tuple_count.getTypeID() == TypeID::kLong);
+    mutable_relation->getStatisticsMutable()->setNumTuples(
+        tuple_count.getLiteral<std::int64_t>());
+
+    fprintf(out, "done\n");
+    fflush(out);
+  }
+  // Mark the catalog as altered and save it.
+  query_processor->markCatalogAltered();
+  query_processor->saveCatalog();
+}
+
 }  // namespace
 
 void executeCommand(const ParseStatement &statement,
                     const CatalogDatabase &catalog_database,
+                    StorageManager *storage_manager,
+                    QueryProcessor *query_processor,
+                    Foreman *foreman,
                     FILE *out) {
   const ParseCommand &command = static_cast<const ParseCommand &>(statement);
   const PtrVector<ParseString> *arguments = command.arguments();
   const std::string &command_str = command.command()->value();
   if (command_str == C::kDescribeDatabaseCommand) {
-    executeDescribeDatabase(arguments, catalog_database, out);
+    executeDescribeDatabase(arguments, catalog_database, storage_manager, out);
   } else if (command_str == C::kDescribeTableCommand) {
     if (arguments->size() == 0) {
-      executeDescribeDatabase(arguments, catalog_database, out);
+      executeDescribeDatabase(arguments, catalog_database, storage_manager, out);
     } else {
       executeDescribeTable(arguments, catalog_database, out);
     }
+  } else if (command_str == C::kAnalyzeCommand) {
+    executeAnalyze(query_processor, foreman, out);
   } else {
     THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
   }
diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp
index 21eee6a..c819981 100644
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@ -21,9 +21,6 @@
 #include <cstdio>
 #include <string>
 
-#include "parser/ParseStatement.hpp"
-#include "utility/Macros.hpp"
-
 using std::fprintf;
 using std::fputc;
 using std::string;
@@ -31,10 +28,13 @@
 namespace quickstep {
 
 class CatalogDatabase;
-class CatalogAttribute;
-class CatalogRelation;
+class Foreman;
+class ParseStatement;
+class QueryProcessor;
+class StorageManager;
 
 namespace cli {
+
 /** \addtogroup CLI
  *  @{
  */
@@ -46,16 +46,23 @@
 
 constexpr char kDescribeDatabaseCommand[] = "\\dt";
 constexpr char kDescribeTableCommand[] = "\\d";
+constexpr char kAnalyzeCommand[] = "\\analyze";
 
 /**
   * @brief Executes the command by calling the command handler.
-  *        
+  *
   * @param statement The parsed statement from the cli.
   * @param catalog_database The catalog information about the current database.
-  * @param out The stream where the output of the command has to be redirected to.      
+  * @param storage_manager The current StorageManager.
+  * @param query_processor The query processor to generate plans for SQL queries.
+  * @param foreman The foreman to execute query plans.
+  * @param out The stream where the output of the command has to be redirected to.
 */
 void executeCommand(const ParseStatement &statement,
                     const CatalogDatabase &catalog_database,
+                    StorageManager *storage_manager,
+                    QueryProcessor *query_processor,
+                    Foreman *foreman,
                     FILE *out);
 
 /** @} */
diff --git a/cli/PrintToScreen.cpp b/cli/PrintToScreen.cpp
index 227ff39..76e90eb 100644
--- a/cli/PrintToScreen.cpp
+++ b/cli/PrintToScreen.cpp
@@ -19,6 +19,7 @@
 
 #include <cstddef>
 #include <cstdio>
+#include <cmath>
 #include <memory>
 #include <vector>
 
@@ -47,6 +48,16 @@
             "If true, print query results to screen normally. If false, skip "
             "printing output (e.g. for benchmarking).");
 
+int PrintToScreen::GetNumberOfDigits(int number) {
+  // Count with integer division rather than log10(): no floating-point
+  // rounding, and no abs(), which is undefined for the most negative int.
+  int num_chars = (number < 0) ? 2 : 1;  // First digit, plus '-' if negative.
+  while ((number /= 10) != 0) {
+    ++num_chars;
+  }
+  return num_chars;
+}
+
 void PrintToScreen::PrintRelation(const CatalogRelation &relation,
                                   StorageManager *storage_manager,
                                   FILE *out) {
diff --git a/cli/PrintToScreen.hpp b/cli/PrintToScreen.hpp
index 0b57b4b..6a29426 100644
--- a/cli/PrintToScreen.hpp
+++ b/cli/PrintToScreen.hpp
@@ -69,6 +69,14 @@
                               StorageManager *storage_manager,
                               FILE *out);
 
+  /**
+   * @brief Return the number of digits in a number (a negative number counts
+   *        one extra for its minus sign).
+   * @param number The input number.
+   * @return The number of digits in the input number.
+   **/
+  static int GetNumberOfDigits(int number);
+
  private:
   // Undefined default constructor. Class is all-static and should not be
   // instantiated.
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 5881b3e..558d6eb 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -365,7 +365,11 @@
           try {
             quickstep::cli::executeCommand(
                 *result.parsed_statement,
-                *(query_processor->getDefaultDatabase()), stdout);
+                *(query_processor->getDefaultDatabase()),
+                query_processor->getStorageManager(),
+                query_processor.get(),
+                &foreman,
+                stdout);
           } catch (const quickstep::SqlError &sql_error) {
             fprintf(stderr, "%s",
                     sql_error.formatMessage(*command_string).c_str());
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index 49930e1..9cd493e 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -87,6 +87,9 @@
           quickstep::cli::executeCommand(
               *result.parsed_statement,
               *(test_database_loader_.catalog_database()),
+              test_database_loader_.storage_manager(),
+              nullptr,
+              nullptr,
               output_stream.file());
         } else  {
           QueryHandle query_handle(optimizer_context.query_id());
diff --git a/cli/tests/command_executor/D.test b/cli/tests/command_executor/D.test
index 1d500df..45f8d8b 100644
--- a/cli/tests/command_executor/D.test
+++ b/cli/tests/command_executor/D.test
@@ -34,9 +34,21 @@
                    col3 DOUBLE,
                    col4 FLOAT,
                    col5 CHAR(5));
-CREATE INDEX foo4_index_1 ON foo4 (col1, col2) USING CSBTREE; 
-CREATE INDEX foo4_index_2 ON foo4 (col3, col4) USING CSBTREE; 
+CREATE INDEX foo4_index_1 ON foo4 (col1, col2) USING CSBTREE;
+CREATE INDEX foo4_index_2 ON foo4 (col3, col4) USING CSBTREE;
 CREATE TABLE averylongtablenamethatseemstoneverend (col1 INT);
+DROP TABLE TEST;
+INSERT INTO averylongtablenamethatseemstoneverend VALUES (1);
+INSERT INTO averylongtablenamethatseemstoneverend VALUES (2);
+INSERT INTO averylongtablenamethatseemstoneverend VALUES (3);
+INSERT INTO foo values(1, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(2, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(3, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(4, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(5, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo2 values(5, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo2 values(5, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo3 values(5, 1, 1.0, 1.0, 'XYZZ');
 --
 ==
 \d foo
@@ -50,7 +62,7 @@
  col4   | Float  
  col5   | Char(5)
 ==
-\d foo2                   
+\d foo2
 --
  Table "foo2"
  Column                         | Type   
@@ -73,7 +85,7 @@
  col5   | Char(5)
  Indexes
   "foo3_index_1" CSB_TREE (col1)
-==                    
+==
 \d foo4
 --
  Table "foo4"
@@ -92,14 +104,13 @@
 --
        List of relations
 
- Name                                  | Type 
-+--------------------------------------+-------+
- Test                                  | table 
- foo                                   | table 
- foo2                                  | table 
- foo3                                  | table 
- foo4                                  | table 
- averylongtablenamethatseemstoneverend | table 
+ Name                                  | Type  | Blocks  | Rows 
++--------------------------------------+-------+---------+-------+
+ foo                                   | table | 1       | 5    
+ foo2                                  | table | 1       | 2    
+ foo3                                  | table | 1       | 1    
+ foo4                                  | table | 0       | 0    
+ averylongtablenamethatseemstoneverend | table | 1       | 3    
 
 ==
 \d invalidtable
diff --git a/cli/tests/command_executor/Dt.test b/cli/tests/command_executor/Dt.test
index 6458e15..1de6360 100644
--- a/cli/tests/command_executor/Dt.test
+++ b/cli/tests/command_executor/Dt.test
@@ -33,21 +33,41 @@
                    col3 DOUBLE,
                    col4 FLOAT,
                    col5 CHAR(5));
+DROP TABLE TEST;
 CREATE TABLE averylongtablenamethatseemstoneverend (col1 INT);
+INSERT INTO averylongtablenamethatseemstoneverend VALUES (1);
+INSERT INTO averylongtablenamethatseemstoneverend VALUES (2);
+INSERT INTO averylongtablenamethatseemstoneverend VALUES (3);
+INSERT INTO foo values(1, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(2, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(3, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(4, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo values(5, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo2 values(5, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo2 values(5, 1, 1.0, 1.0, 'XYZ');
+INSERT INTO foo3 values(5, 1, 1.0, 1.0, 'XYZZ');
 --
 ==
 \dt
 --
        List of relations
 
- Name                                  | Type 
-+--------------------------------------+-------+
- Test                                  | table 
- foo                                   | table 
- foo2                                  | table 
- foo3                                  | table 
- foo4                                  | table 
- averylongtablenamethatseemstoneverend | table 
+ Name                                  | Type  | Blocks  | Rows 
++--------------------------------------+-------+---------+-------+
+ foo                                   | table | 1       | 5    
+ foo2                                  | table | 1       | 2    
+ foo3                                  | table | 1       | 1    
+ foo4                                  | table | 0       | 0    
+ averylongtablenamethatseemstoneverend | table | 1       | 3    
+
+==
+\dt foo
+--
+       List of relations
+
+ Name   | Type  | Blocks  | Rows 
++-------+-------+---------+-------+
+ foo    | table | 1       | 5    
 
 ==
 \dt invalidtable
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 7209cfa..c590b6e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1389,11 +1389,13 @@
 
     // Add distinctify hash table impl type if it is a DISTINCT aggregation.
     if (unnamed_aggregate_expression->is_distinct()) {
-      if (group_by_types.empty()) {
+      const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments();
+      DCHECK_GE(arguments.size(), 1u);
+      if (group_by_types.empty() && arguments.size() == 1) {
         aggr_state_proto->add_distinctify_hash_table_impl_types(
             SimplifyHashTableImplTypeProto(
                 HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
-                {&unnamed_aggregate_expression->getValueType()}));
+                {&arguments[0]->getValueType()}));
       } else {
         aggr_state_proto->add_distinctify_hash_table_impl_types(
             HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type));
diff --git a/query_optimizer/QueryProcessor.hpp b/query_optimizer/QueryProcessor.hpp
index 4514f45..32739dc 100644
--- a/query_optimizer/QueryProcessor.hpp
+++ b/query_optimizer/QueryProcessor.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -159,6 +161,14 @@
   void saveCatalog();
 
   /**
+   * @brief Set \c catalog_altered_ to true to indicate that the catalog
+   *        has been altered.
+   **/
+  void markCatalogAltered() {
+    catalog_altered_ = true;
+  }
+
+  /**
    * @brief Get the default database in the Catalog held by this
    *        QueryProcessor.
    **/
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 104a02d..aa03794 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -59,7 +59,7 @@
 
 namespace {
 
-DEFINE_bool(vector_based_joined_tuple_collector, true,
+DEFINE_bool(vector_based_joined_tuple_collector, false,
             "If true, use simple vector-based joined tuple collector in "
             "hash join, with a final sort pass to group joined tuple pairs "
             "by inner block. If false, use unordered_map based collector that "
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 115248c..26a3e32 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -585,20 +585,22 @@
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
-target_link_libraries(quickstep_storage_FileManagerHdfs
-                      gflags_nothreads-static
-                      quickstep_storage_FileManager
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageErrors
-                      quickstep_utility_Macros
-                      quickstep_utility_StringUtil
-                      ${LIBHDFS3_LIBRARIES})
+  target_link_libraries(quickstep_storage_FileManagerHdfs
+                        glog
+                        gflags_nothreads-static
+                        quickstep_storage_FileManager
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageConstants
+                        quickstep_storage_StorageErrors
+                        quickstep_utility_Macros
+                        quickstep_utility_StringUtil
+                        ${LIBHDFS3_LIBRARIES})
 endif()
 if (QUICKSTEP_HAVE_FILE_MANAGER_POSIX)
   target_link_libraries(quickstep_storage_FileManagerLocal
                         quickstep_storage_FileManagerPosix)
   target_link_libraries(quickstep_storage_FileManagerPosix
+                        glog
                         quickstep_storage_FileManager
                         quickstep_storage_StorageBlockInfo
                         quickstep_storage_StorageConstants
@@ -609,6 +611,7 @@
   target_link_libraries(quickstep_storage_FileManagerLocal
                         quickstep_storage_FileManagerWindows)
   target_link_libraries(quickstep_storage_FileManagerWindows
+                        glog
                         quickstep_storage_FileManager
                         quickstep_storage_StorageBlockInfo
                         quickstep_storage_StorageConstants
diff --git a/storage/FileManager.hpp b/storage/FileManager.hpp
index d56c960..b179071 100644
--- a/storage/FileManager.hpp
+++ b/storage/FileManager.hpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -19,10 +19,7 @@
 #define QUICKSTEP_STORAGE_FILE_MANAGER_HPP_
 
 #include <cstddef>
-#include <cstdint>
-#include <cstdio>
 #include <string>
-#include <utility>
 
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
@@ -49,12 +46,12 @@
    * @param block_domain The domain of a block id.
    **/
   explicit FileManager(const std::string &storage_path)
-      : storage_path_(storage_path) { }
+      : storage_path_(storage_path) {}
 
   /**
    * @brief Virtual destructor.
    **/
-  virtual ~FileManager() { }
+  virtual ~FileManager() {}
 
   /**
    * @brief Get a block or blob's relative filename, which uses storage_path_
diff --git a/storage/FileManagerHdfs.cpp b/storage/FileManagerHdfs.cpp
index 5f9706e..e8f048b 100644
--- a/storage/FileManagerHdfs.cpp
+++ b/storage/FileManagerHdfs.cpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -32,10 +32,10 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageConstants.hpp"
 #include "storage/StorageErrors.hpp"
-#include "utility/Macros.hpp"
 #include "utility/StringUtil.hpp"
 
 #include "gflags/gflags.h"
+#include "glog/logging.h"
 
 using std::size_t;
 using std::sscanf;
@@ -76,20 +76,19 @@
 
 FileManagerHdfs::FileManagerHdfs(const string &storage_path)
     : FileManager(storage_path) {
-  DEBUG_ASSERT(hdfs_namenode_port_dummy);
-  DEBUG_ASSERT(hdfs_num_replications_dummy);
+  DCHECK(hdfs_namenode_port_dummy);
+  DCHECK(hdfs_num_replications_dummy);
 
   struct hdfsBuilder *builder = hdfsNewBuilder();
   hdfsBuilderSetNameNode(builder, FLAGS_hdfs_namenode_host.c_str());
   hdfsBuilderSetNameNodePort(builder, FLAGS_hdfs_namenode_port);
   // hdfsBuilderConnect releases builder.
   hdfs_ = hdfsBuilderConnect(builder);
-  DEBUG_ASSERT(hdfs_ != nullptr);
+  DCHECK(hdfs_ != nullptr);
 }
 
 FileManagerHdfs::~FileManagerHdfs() {
-  int status = hdfsDisconnect(hdfs_);
-  DEBUG_ASSERT(status == 0);
+  CHECK_EQ(0, hdfsDisconnect(hdfs_));
 }
 
 block_id_counter FileManagerHdfs::getMaxUsedBlockCounter(const block_id_domain block_domain) const {
@@ -97,7 +96,7 @@
   hdfsFileInfo *file_infos = hdfsListDirectory(hdfs_, storage_path_.c_str(), &num_files);
   if (file_infos == nullptr) {
     if (errno != ENOENT) {
-      LOG_WARNING("Failed to list file info with error: " << strerror(errno));
+      LOG(ERROR) << "Failed to list file info with error: " << strerror(errno);
     }
     return 0;
   }
@@ -113,9 +112,9 @@
     // NOTE(zuyu): mName looks like
     // "/user/<username>/<storage_path_>/qsblk_<block_domain>_[0-9]*.qsb".
     const char *filename = std::strrchr(file_infos[i].mName, '/');
-    if (filename != nullptr
-        && sscanf(filename, filename_pattern.c_str(), &counter) == 1
-        && counter > counter_max) {
+    if (filename != nullptr &&
+        sscanf(filename, filename_pattern.c_str(), &counter) == 1 &&
+        counter > counter_max) {
       counter_max = counter;
     }
   }
@@ -126,12 +125,12 @@
 }
 
 size_t FileManagerHdfs::numSlots(const block_id block) const {
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   hdfsFileInfo *file_info = hdfsGetPathInfo(hdfs_, filename.c_str());
   if (file_info == nullptr) {
     if (errno != ENOENT) {
-      LOG_WARNING("Failed to get size of file " << filename << " with error: " << strerror(errno));
+      LOG(ERROR) << "Failed to get size of file " << filename << " with error: " << strerror(errno);
     }
     return 0;
   }
@@ -147,12 +146,12 @@
 }
 
 bool FileManagerHdfs::deleteBlockOrBlob(const block_id block) {
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   if ((hdfsDelete(hdfs_, filename.c_str(), 0) == 0) || (errno == ENOENT)) {
     return true;
   } else {
-    LOG_WARNING("Failed to delete file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to delete file " << filename << " with error: " << strerror(errno);
     return false;
   }
 }
@@ -160,10 +159,10 @@
 bool FileManagerHdfs::readBlockOrBlob(const block_id block,
                                       void *buffer,
                                       const size_t length) {
-  DEBUG_ASSERT(buffer);
-  DEBUG_ASSERT(length % kSlotSizeBytes == 0);
+  DCHECK(buffer != nullptr);
+  DCHECK_EQ(0u, length % kSlotSizeBytes);
 
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   hdfsFile file_handle = hdfsOpenFile(hdfs_,
                                       filename.c_str(),
@@ -172,7 +171,7 @@
                                       FLAGS_hdfs_num_replications,
                                       kSlotSizeBytes);
   if (file_handle == nullptr) {
-    LOG_WARNING("Failed to open file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to open file " << filename << " with error: " << strerror(errno);
     return false;
   }
 
@@ -183,17 +182,17 @@
       bytes_total += bytes;
     } else if (bytes == -1) {
       if (errno != EINTR) {
-        LOG_WARNING("Failed to read file " << filename << " with error: " << strerror(errno));
+        LOG(ERROR) << "Failed to read file " << filename << " with error: " << strerror(errno);
         break;
       }
     } else {
-      LOG_WARNING("Failed to read file " << filename << " since EOF was reached unexpectedly");
+      LOG(ERROR) << "Failed to read file " << filename << " since EOF was reached unexpectedly";
       break;
     }
   }
 
   if (hdfsCloseFile(hdfs_, file_handle) != 0) {
-    LOG_WARNING("Failed to close file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to close file " << filename << " with error: " << strerror(errno);
   }
 
   return (bytes_total == length);
@@ -202,10 +201,10 @@
 bool FileManagerHdfs::writeBlockOrBlob(const block_id block,
                                        const void *buffer,
                                        const size_t length) {
-  DEBUG_ASSERT(buffer);
-  DEBUG_ASSERT(length % kSlotSizeBytes == 0);
+  DCHECK(buffer != nullptr);
+  DCHECK_EQ(0u, length % kSlotSizeBytes);
 
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   hdfsFile file_handle = hdfsOpenFile(hdfs_,
                                       filename.c_str(),
@@ -214,7 +213,7 @@
                                       FLAGS_hdfs_num_replications,
                                       kSlotSizeBytes);
   if (file_handle == nullptr) {
-    LOG_WARNING("Failed to open file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to open file " << filename << " with error: " << strerror(errno);
     return false;
   }
 
@@ -224,17 +223,17 @@
     if (bytes > 0) {
       bytes_total += bytes;
     } else if (bytes == -1) {
-      LOG_WARNING("Failed to write file " << filename << " with error: " << strerror(errno));
+      LOG(ERROR) << "Failed to write file " << filename << " with error: " << strerror(errno);
       break;
     }
   }
 
   if (hdfsSync(hdfs_, file_handle) != 0) {
-    LOG_WARNING("Failed to sync file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to sync file " << filename << " with error: " << strerror(errno);
   }
 
   if (hdfsCloseFile(hdfs_, file_handle) != 0) {
-    LOG_WARNING("Failed to close file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to close file " << filename << " with error: " << strerror(errno);
   }
 
   return (bytes_total == length);
diff --git a/storage/FileManagerPosix.cpp b/storage/FileManagerPosix.cpp
index 3bfb69d..0346f0d 100644
--- a/storage/FileManagerPosix.cpp
+++ b/storage/FileManagerPosix.cpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -35,9 +35,10 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageConstants.hpp"
 #include "storage/StorageErrors.hpp"
-#include "utility/Macros.hpp"
 #include "utility/StringUtil.hpp"
 
+#include "glog/logging.h"
+
 using std::size_t;
 using std::sscanf;
 using std::strerror;
@@ -64,9 +65,9 @@
   filename_pattern.append(".qsb");
 
   block_id_counter counter_max = 0, counter;
-  if (glob_result.gl_pathc > 0
-      && sscanf(glob_result.gl_pathv[glob_result.gl_pathc - 1], filename_pattern.c_str(), &counter) == 1
-      && counter > counter_max) {
+  if (glob_result.gl_pathc > 0 &&
+      sscanf(glob_result.gl_pathv[glob_result.gl_pathc - 1], filename_pattern.c_str(), &counter) == 1 &&
+      counter > counter_max) {
     counter_max = counter;
   }
 
@@ -75,12 +76,12 @@
 }
 
 size_t FileManagerPosix::numSlots(const block_id block) const {
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   struct stat file_stat;
   if (stat(filename.c_str(), &file_stat) == -1) {
     if (errno != ENOENT) {
-      LOG_WARNING("Failed to retrieve info about file " << filename << " with error: " << strerror(errno));
+      LOG(ERROR) << "Failed to retrieve info about file " << filename << " with error: " << strerror(errno);
     }
     return 0;
   }
@@ -93,12 +94,12 @@
 }
 
 bool FileManagerPosix::deleteBlockOrBlob(const block_id block) {
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   if ((unlink(filename.c_str()) == 0) || (errno == ENOENT)) {
     return true;
   } else {
-    LOG_WARNING("Failed to delete file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to delete file " << filename << " with error: " << strerror(errno);
     return false;
   }
 }
@@ -106,35 +107,35 @@
 bool FileManagerPosix::readBlockOrBlob(const block_id block,
                                        void *buffer,
                                        const std::size_t length) {
-  DEBUG_ASSERT(buffer);
-  DEBUG_ASSERT(length % kSlotSizeBytes == 0);
+  DCHECK(buffer != nullptr);
+  DCHECK_EQ(0u, length % kSlotSizeBytes);
 
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
-  int fd = open(filename.c_str(), O_RDONLY);
+  const int fd = open(filename.c_str(), O_RDONLY);
   if (fd == -1) {
-    LOG_WARNING("Failed to open file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to open file " << filename << " with error: " << strerror(errno);
     return false;
   }
 
   size_t bytes_total = 0;
   while (bytes_total < length) {
-    ssize_t bytes = read(fd, static_cast<char*>(buffer) + bytes_total, length - bytes_total);
+    const ssize_t bytes = read(fd, static_cast<char*>(buffer) + bytes_total, length - bytes_total);
     if (bytes > 0) {
       bytes_total += bytes;
     } else if (bytes == -1) {
       if (errno != EINTR) {
-        LOG_WARNING("Failed to read file " << filename << " with error: " << strerror(errno));
+        LOG(ERROR) << "Failed to read file " << filename << " with error: " << strerror(errno);
         break;
       }
     } else {
-      LOG_WARNING("Failed to read file " << filename << " since EOF was reached unexpectedly");
+      LOG(ERROR) << "Failed to read file " << filename << " since EOF was reached unexpectedly";
       break;
     }
   }
 
   if (close(fd) != 0) {
-    LOG_WARNING("Failed to close file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to close file " << filename << " with error: " << strerror(errno);
   }
 
   return (bytes_total == length);
@@ -143,34 +144,34 @@
 bool FileManagerPosix::writeBlockOrBlob(const block_id block,
                                         const void *buffer,
                                         const std::size_t length) {
-  DEBUG_ASSERT(buffer);
-  DEBUG_ASSERT(length % kSlotSizeBytes == 0);
+  DCHECK(buffer != nullptr);
+  DCHECK_EQ(0u, length % kSlotSizeBytes);
 
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
-  int fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
+  const int fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
   if (fd == -1) {
-    LOG_WARNING("Failed to open file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to open file " << filename << " with error: " << strerror(errno);
     return false;
   }
 
   size_t bytes_total = 0;
   while (bytes_total < length) {
-    ssize_t bytes = write(fd, static_cast<const char*>(buffer) + bytes_total, length - bytes_total);
+    const ssize_t bytes = write(fd, static_cast<const char*>(buffer) + bytes_total, length - bytes_total);
     if (bytes > 0) {
       bytes_total += bytes;
     } else if (bytes == -1 && errno != EINTR) {
-      LOG_WARNING("Failed to write file " << filename << " with error: " << strerror(errno));
+      LOG(ERROR) << "Failed to write file " << filename << " with error: " << strerror(errno);
       break;
     }
   }
 
   if (fsync(fd) != 0) {
-    LOG_WARNING("Failed to sync file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to sync file " << filename << " with error: " << strerror(errno);
   }
 
   if (close(fd) != 0) {
-    LOG_WARNING("Failed to close file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to close file " << filename << " with error: " << strerror(errno);
   }
 
   return (bytes_total == length);
diff --git a/storage/FileManagerPosix.hpp b/storage/FileManagerPosix.hpp
index d8c1649..b2aea27 100644
--- a/storage/FileManagerPosix.hpp
+++ b/storage/FileManagerPosix.hpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -37,9 +37,9 @@
 class FileManagerPosix : public FileManager {
  public:
   explicit FileManagerPosix(const std::string &storage_path)
-      : FileManager(storage_path) { }
+      : FileManager(storage_path) {}
 
-  ~FileManagerPosix() override { }
+  ~FileManagerPosix() override {}
 
   bool deleteBlockOrBlob(const block_id block) override;
 
diff --git a/storage/FileManagerWindows.cpp b/storage/FileManagerWindows.cpp
index 9e3d4c8..b1763ec 100644
--- a/storage/FileManagerWindows.cpp
+++ b/storage/FileManagerWindows.cpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -34,9 +34,10 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageConstants.hpp"
 #include "storage/StorageErrors.hpp"
-#include "utility/Macros.hpp"
 #include "utility/StringUtil.hpp"
 
+#include "glog/logging.h"
+
 using std::size_t;
 using std::strerror;
 using std::string;
@@ -60,7 +61,7 @@
   if (find_handle == INVALID_HANDLE_VALUE) {
     error_code = GetLastError();
     if (error_code != ERROR_FILE_NOT_FOUND) {
-      LOG_WARNING("Failed to retrieve blockfiles with error_code: " << error_code);
+      LOG(ERROR) << "Failed to retrieve blockfiles with error_code: " << error_code;
     }
     return 0;
   }
@@ -73,32 +74,32 @@
 
   block_id_counter counter_max = 0, counter;
   do {
-    if (sscanf(find_data.cFileName, filename_pattern.c_str(), &counter) == 1
-        && counter > counter_max) {
+    if (sscanf(find_data.cFileName, filename_pattern.c_str(), &counter) == 1 &&
+        counter > counter_max) {
       counter_max = counter;
     }
   } while (FindNextFile(find_handle, &find_data) != 0);
 
   error_code = GetLastError();
   if (error_code != ERROR_NO_MORE_FILES) {
-    LOG_WARNING("Failed to FindNextFile with error_code: " << error_code);
+    LOG(ERROR) << "Failed to FindNextFile with error_code: " << error_code;
   }
 
   if (FindClose(find_handle) == 0) {
-    LOG_WARNING("Failed to close the file with error_code: " << GetLastError());
+    LOG(ERROR) << "Failed to close the file with error_code: " << GetLastError();
   }
 
   return counter_max;
 }
 
 size_t FileManagerWindows::numSlots(const block_id block) const {
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
   WIN32_FILE_ATTRIBUTE_DATA file_stat;
 
   if (!GetFileAttributesEx(filename.c_str(), GetFileExInfoStandard, &file_stat)) {
     DWORD error_code = GetLastError();
     if (error_code != ERROR_FILE_NOT_FOUND) {
-      LOG_WARNING("Failed to retrieve info about file " << filename << " with error_code: " << error_code);
+      LOG(ERROR) << "Failed to retrieve info about file " << filename << " with error_code: " << error_code;
     }
     return 0;
   }
@@ -113,28 +114,28 @@
 }
 
 bool FileManagerWindows::deleteBlockOrBlob(const block_id block) {
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   if ((DeleteFile(filename.c_str()) != 0) || (GetLastError() == ERROR_FILE_NOT_FOUND)) {
     return true;
-  } else {
-    LOG_WARNING("Failed to delete file " << filename << " with error_code: " << GetLastError());
   }
+  LOG(ERROR) << "Failed to delete file " << filename << " with error_code: " << GetLastError();
+  return false;  // Deletion failed for a reason other than a missing file.
 }
 
 bool FileManagerWindows::readBlockOrBlob(const block_id block,
                                          void *buffer,
                                          const size_t length) {
-  DEBUG_ASSERT(buffer);
-  DEBUG_ASSERT(length % kSlotSizeBytes == 0);
+  DCHECK(buffer != nullptr);
+  DCHECK_EQ(0u, length % kSlotSizeBytes);
 
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   FILE *file = fopen(filename.c_str(), "rb");
   if (file == nullptr) {
     // Note: On most, but not all, library implementations, the errno variable
     //       is set to a system-specific error code on failure.
-    LOG_WARNING("Failed to open file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to open file " << filename << " with error: " << strerror(errno);
     return false;
   }
 
@@ -142,16 +143,16 @@
   const bool result_is_ok = (bytes == length);
   if (!result_is_ok) {
     if (std::feof(file)) {
-      LOG_WARNING("Failed to read file " << filename << " since EOF was reached unexpectedly");
+      LOG(ERROR) << "Failed to read file " << filename << " since EOF was reached unexpectedly";
     } else {
-      LOG_WARNING("Failed to read file " << filename << " with error: " << strerror(ferror(file)));
+      LOG(ERROR) << "Failed to read file " << filename << " with error: " << strerror(ferror(file));
       clearerr(file);
     }
   }
 
   if (fclose(file)) {
     // Note: fclose does not set errno on failure.
-    LOG_WARNING("Failed to close file " << filename);
+    LOG(ERROR) << "Failed to close file " << filename;
   }
 
   return result_is_ok;
@@ -160,37 +161,37 @@
 bool FileManagerWindows::writeBlockOrBlob(const block_id block,
                                           const void *buffer,
                                           const size_t length) {
-  DEBUG_ASSERT(buffer);
-  DEBUG_ASSERT(length % kSlotSizeBytes == 0);
+  DCHECK(buffer != nullptr);
+  DCHECK_EQ(0u, length % kSlotSizeBytes);
 
-  string filename(blockFilename(block));
+  const string filename(blockFilename(block));
 
   FILE *file = fopen(filename.c_str(), "wb");
   if (file == nullptr) {
     // Note: On most, but not all, library implementations, the errno variable
     //       is set to a system-specific error code on failure.
-    LOG_WARNING("Failed to open file " << filename << " with error: " << strerror(errno));
+    LOG(ERROR) << "Failed to open file " << filename << " with error: " << strerror(errno);
     return false;
   }
 
   const size_t bytes = std::fwrite(buffer, sizeof(char), length, file);
   const bool result_is_ok = (bytes == length);
   if (!result_is_ok) {
-    LOG_WARNING("Failed to write file " << filename << " with error: " << strerror(ferror(file)));
+    LOG(ERROR) << "Failed to write file " << filename << " with error: " << strerror(ferror(file));
     clearerr(file);
   }
 
   if (fflush(file)) {
-    LOG_WARNING("Failed to flush file " << filename << " with error: " << strerror(ferror(file)));
+    LOG(ERROR) << "Failed to flush file " << filename << " with error: " << strerror(ferror(file));
   }
 
   if (!FlushFileBuffers(reinterpret_cast<HANDLE>(_get_osfhandle(_fileno(file))))) {
-    LOG_WARNING("Failed to re-flush file " << filename << " with error: " << strerror(ferror(file)));
+    LOG(ERROR) << "Failed to re-flush file " << filename << " with error: " << strerror(ferror(file));
   }
 
   if (fclose(file)) {
     // Note: fclose does not set errno on failure.
-    LOG_WARNING("Failed to close file " << filename);
+    LOG(ERROR) << "Failed to close file " << filename;
   }
 
   return result_is_ok;
diff --git a/storage/FileManagerWindows.hpp b/storage/FileManagerWindows.hpp
index 085b418..e7b3aa8 100644
--- a/storage/FileManagerWindows.hpp
+++ b/storage/FileManagerWindows.hpp
@@ -37,9 +37,9 @@
 class FileManagerWindows : public FileManager {
  public:
   explicit FileManagerWindows(const std::string &storage_path)
-      : FileManager(storage_path) { }
+      : FileManager(storage_path) {}
 
-  ~FileManagerWindows() override { }
+  ~FileManagerWindows() override {}
 
   bool deleteBlockOrBlob(const block_id block) override;