Refactored command execution in the distributed version.
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 49b7dc1..63f3259 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -123,7 +123,7 @@
   data_exchanger_.set_storage_manager(storage_manager_.get());
   data_exchanger_.start();
 
-  // Prepare for submitting a query.
+  // Prepare for submitting a query or a command.
   bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
 
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
@@ -131,8 +131,6 @@
 
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
 
-  // Prepare for submitting a command.
-  bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
   bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
 }
 
@@ -166,51 +164,37 @@
         }
 
         if (statement.getStatementType() == ParseStatement::kCommand) {
-          const ParseCommand &command = static_cast<const ParseCommand &>(statement);
-          const std::string &command_str = command.command()->value();
+          const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+          const std::string &command = parse_command.command()->value();
           try {
-            if (command_str == C::kAnalyzeCommand) {
+            if (command == C::kAnalyzeCommand) {
               // TODO(zuyu): support '\analyze'.
-              THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
-            } else if (command_str != C::kDescribeDatabaseCommand &&
-                       command_str != C::kDescribeTableCommand) {
-              THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+              THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
+            } else if (command != C::kDescribeDatabaseCommand &&
+                       command != C::kDescribeTableCommand) {
+              THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
             }
           } catch (const SqlError &error) {
             fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
             reset_parser = true;
             break;
           }
-
-          DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
-                     << "') to Conductor";
-          S::CommandMessage proto;
-          proto.set_command(*command_string);
-
-          const size_t proto_length = proto.ByteSize();
-          char *proto_bytes = static_cast<char*>(malloc(proto_length));
-          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-          TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage);
-          free(proto_bytes);
-
-          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
-        } else {
-          DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
-                     << "') to Conductor";
-          S::SqlQueryMessage proto;
-          proto.set_sql_query(*command_string);
-
-          const size_t proto_length = proto.ByteSize();
-          char *proto_bytes = static_cast<char*>(malloc(proto_length));
-          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-          TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
-          free(proto_bytes);
-
-          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
         }
 
+        DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                   << "') to Conductor";
+        S::SqlQueryMessage proto;
+        proto.set_sql_query(*command_string);
+
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
+        free(proto_bytes);
+
+        QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
+
         start = std::chrono::steady_clock::now();
 
         const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index b877b04..1b8bfb2 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -95,21 +95,18 @@
   bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
 
-  bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
-  bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
-
   bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
 
+
   bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
 
   block_locator_ = make_unique<BlockLocator>(&bus_);
   block_locator_->start();
 
-  foreman_ = make_unique<ForemanDistributed>(*block_locator_,
-                                             std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
-                                             catalog_database_);
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, catalog_database_, query_processor_.get());
   foreman_->start();
 }
 
@@ -132,14 +129,6 @@
             QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
         break;
       }
-      case kCommandMessage: {
-        S::CommandMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-        DLOG(INFO) << "Conductor received the following command: " << proto.command();
-
-        processCommandMessage(sender, new string(move(proto.command())));
-        break;
-      }
       case kSqlQueryMessage: {
         S::SqlQueryMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -161,91 +150,59 @@
   }
 }
 
-void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) {
-  parser_wrapper_.feedNextBuffer(command_string);
-  ParseResult parse_result = parser_wrapper_.getNextStatement();
-
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
+  SqlParserWrapper parser_wrapper;
+  parser_wrapper.feedNextBuffer(command_string);
+  ParseResult parse_result = parser_wrapper.getNextStatement();
   CHECK(parse_result.condition == ParseResult::kSuccess)
       << "Any syntax error should be addressed in the DistributedCli.";
 
   const ParseStatement &statement = *parse_result.parsed_statement;
-  DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
-
-  const ParseCommand &command = static_cast<const ParseCommand &>(statement);
-  const PtrVector<ParseString> &arguments = *(command.arguments());
-  const string &command_str = command.command()->value();
-
-  string command_response;
 
   try {
-    if (command_str == C::kDescribeDatabaseCommand) {
-      command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
-    } else if (command_str == C::kDescribeTableCommand) {
-      if (arguments.empty()) {
+    if (statement.getStatementType() == ParseStatement::kCommand) {
+      const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+      const PtrVector<ParseString> &arguments = *(parse_command.arguments());
+      const string &command = parse_command.command()->value();
+
+      string command_response;
+      if (command == C::kDescribeDatabaseCommand) {
         command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
-      } else {
-        command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+      } else if (command == C::kDescribeTableCommand) {
+        if (arguments.empty()) {
+          command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+        } else {
+          command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+        }
       }
+
+      S::CommandResponseMessage proto;
+      proto.set_command_response(command_response);
+
+      const size_t proto_length = proto.ByteSize();
+      char *proto_bytes = static_cast<char*>(malloc(proto_length));
+      CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+      TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+      free(proto_bytes);
+
+      DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+                 << "') to Distributed CLI " << sender;
+      CHECK(MessageBus::SendStatus::kOK ==
+          QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+    } else {
+      auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+                                                   sender,
+                                                   statement.getPriority());
+      query_processor_->generateQueryHandle(statement, query_handle.get());
+      DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+          conductor_client_id_,
+          foreman_->getBusClientID(),
+          query_handle.release(),
+          &bus_);
     }
-  } catch (const SqlError &command_error) {
-    // Set the query execution status along with the error message.
-    S::QueryExecutionErrorMessage proto;
-    proto.set_error_message(command_error.formatMessage(*command_string));
-
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kQueryExecutionErrorMessage);
-    free(proto_bytes);
-
-    DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
-               << kQueryExecutionErrorMessage
-               << "') to Distributed CLI " << sender;
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
-  }
-
-  S::CommandResponseMessage proto;
-  proto.set_command_response(command_response);
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
-             << "') to Distributed CLI " << sender;
-  CHECK(MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
-}
-
-void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
-  parser_wrapper_.feedNextBuffer(command_string);
-  ParseResult parse_result = parser_wrapper_.getNextStatement();
-
-  CHECK(parse_result.condition == ParseResult::kSuccess)
-      << "Any SQL syntax error should be addressed in the DistributedCli.";
-
-  const ParseStatement &statement = *parse_result.parsed_statement;
-  DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
-
-  try {
-    auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
-                                                 sender,
-                                                 statement.getPriority());
-    query_processor_->generateQueryHandle(statement, query_handle.get());
-    DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-
-    QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-        conductor_client_id_,
-        foreman_->getBusClientID(),
-        query_handle.release(),
-        &bus_);
   } catch (const SqlError &sql_error) {
     // Set the query execution status along with the error message.
     S::QueryExecutionErrorMessage proto;
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index e7e003f..0c0f7e5 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -24,7 +24,6 @@
 #include <string>
 
 #include "cli/distributed/Role.hpp"
-#include "parser/SqlParserWrapper.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
@@ -61,12 +60,8 @@
   void run() override;
 
  private:
-  void processCommandMessage(const tmb::client_id sender, std::string *command_string);
-
   void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
 
-  SqlParserWrapper parser_wrapper_;
-
   std::unique_ptr<QueryProcessor> query_processor_;
   // Not owned.
   CatalogDatabase *catalog_database_;
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 47246d8..a45e8df 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,10 +81,6 @@
   required uint64 shiftboss_index = 1;
 }
 
-message CommandMessage {
-  required string command = 1;
-}
-
 message SqlQueryMessage {
   required string sql_query = 1;
 }
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 0fd0bdf..a49de5e 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,9 +90,7 @@
   kDistributedCliRegistrationMessage,  // From CLI to Conductor.
   kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
 
-  // From CLI to Conductor.
-  kCommandMessage,
-  kSqlQueryMessage,
+  kSqlQueryMessage, // From CLI to Conductor.
 
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.