Added the initial distributed prototype.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 882d1da..2b76e06 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -708,3 +708,88 @@
 
 # Link against other required system and third-party libraries.
 target_link_libraries(quickstep_cli_shell ${LIBS})
+
+if (ENABLE_NATIVENET)
+  include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
+
+  # Build the quickstep_distributed_cli_shell executable.
+  add_executable (quickstep_distributed_cli_shell cli/QuickstepCliDistributed.cpp)
+  # Link against direct deps (will transitively pull in everything needed).
+  target_link_libraries(quickstep_distributed_cli_shell
+                        gflags_nothreads-static
+                        glog
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_Catalog_proto
+                        quickstep_cli_LineReader
+                        quickstep_cli_PrintToScreen
+                        quickstep_parser_ParseStatement
+                        quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
+                        quickstep_utility_Macros
+                        tmb
+                        ${GRPCPLUSPLUS_LIBRARIES})
+  if (ENABLE_HDFS)
+    target_link_libraries(quickstep_distributed_cli_shell
+                          quickstep_storage_FileManagerHdfs)
+  endif()
+
+  # Link against other required system and third-party libraries.
+  target_link_libraries(quickstep_distributed_cli_shell
+                        ${LIBS})
+
+  # Build the quickstep_distributed_conductor executable.
+  add_executable (quickstep_distributed_conductor cli/ConductorDistributed.cpp)
+  # Link against direct deps (will transitively pull in everything needed).
+  target_link_libraries(quickstep_distributed_conductor
+                        gflags_nothreads-static
+                        glog
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_catalog_Catalog_proto
+                        quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_QueryProcessor
+                        quickstep_utility_SqlError
+                        tmb
+                        ${GRPCPLUSPLUS_LIBRARIES})
+  if (ENABLE_HDFS)
+    target_link_libraries(quickstep_distributed_conductor
+                          quickstep_storage_FileManagerHdfs)
+  endif()
+
+  # Link against other required system and third-party libraries.
+  target_link_libraries(quickstep_distributed_conductor ${LIBS})
+
+  # Build the quickstep_distributed_worker executable.
+  add_executable (quickstep_distributed_worker cli/WorkerDistributed.cpp)
+  # Link against direct deps (will transitively pull in everything needed).
+  target_link_libraries(quickstep_distributed_worker
+                        gflags_nothreads-static
+                        glog
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_cli_InputParserUtil
+                        quickstep_queryexecution_Shiftboss
+                        quickstep_queryexecution_Worker
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_storage_PreloaderThread
+                        quickstep_storage_StorageManager
+                        quickstep_utility_PtrVector
+                        tmb
+                        ${GRPCPLUSPLUS_LIBRARIES})
+  if (ENABLE_HDFS)
+    target_link_libraries(quickstep_distributed_worker
+                          quickstep_storage_FileManagerHdfs)
+  endif()
+
+  # Link against other required system and third-party libraries.
+  target_link_libraries(quickstep_distributed_worker ${LIBS})
+endif()
diff --git a/cli/ConductorDistributed.cpp b/cli/ConductorDistributed.cpp
new file mode 100644
index 0000000..111a9a0
--- /dev/null
+++ b/cli/ConductorDistributed.cpp
@@ -0,0 +1,282 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep Conductor */
+
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <exception>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "utility/SqlError.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+
+using quickstep::QueryExecutionUtil;
+using quickstep::QueryProcessor;
+using quickstep::kDistributedCliRegistrationResponseMessage;
+using quickstep::kDistributedCliRegistrationMessage;
+using quickstep::kQueryExecutionErrorMessage;
+using quickstep::kQueryExecutionSuccessMessage;
+using quickstep::kSqlQueryMessage;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+#ifdef QUICKSTEP_OS_WINDOWS
+static constexpr char kPathSeparator = '\\';
+static constexpr char kDefaultStoragePath[] = "qsstor\\";
+#else
+static constexpr char kPathSeparator = '/';
+static constexpr char kDefaultStoragePath[] = "qsstor/";
+#endif
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+static bool ValidateTmbServerPort(const char *flagname,
+                                 std::int32_t value) {
+  if (value > 0 && value < 65536) {
+    return true;
+  } else {
+    std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+    return false;
+  }
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+DEFINE_string(storage_path, kDefaultStoragePath,
+              "Filesystem path for quickstep database storage.");
+
+}  // namespace quickstep
+
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  grpc_init();
+
+  quickstep::SqlParserWrapper parser_wrapper;
+
+  // Setup the paths used by StorageManager.
+  string fixed_storage_path(quickstep::FLAGS_storage_path);
+  if (!fixed_storage_path.empty()
+      && (fixed_storage_path.back() != quickstep::kPathSeparator)) {
+    fixed_storage_path.push_back(quickstep::kPathSeparator);
+  }
+
+  string catalog_path(fixed_storage_path);
+  catalog_path.append("catalog.pb.bin");
+
+  // Setup QueryProcessor, including CatalogDatabase and StorageManager.
+  std::unique_ptr<QueryProcessor> query_processor;
+  try {
+    query_processor.reset(new QueryProcessor(catalog_path, fixed_storage_path));
+  } catch (const std::exception &e) {
+    LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what();
+  } catch (...) {
+    LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
+  }
+
+  tmb::NativeNetClientMessageBus bus;
+  bus.AddServer(quickstep::FLAGS_tmb_server_ip.c_str(), quickstep::FLAGS_tmb_server_port);
+  bus.Initialize();
+  bus.ResetBus();
+
+  const client_id conductor_id = bus.Connect();
+  LOG(INFO) << "Conductor TMB Client ID: " << conductor_id;
+
+  bus.RegisterClientAsReceiver(conductor_id, kDistributedCliRegistrationMessage);
+  bus.RegisterClientAsSender(conductor_id, kDistributedCliRegistrationResponseMessage);
+
+  bus.RegisterClientAsReceiver(conductor_id, kSqlQueryMessage);
+  bus.RegisterClientAsSender(conductor_id, kQueryExecutionSuccessMessage);
+  bus.RegisterClientAsSender(conductor_id, kQueryExecutionErrorMessage);
+
+  quickstep::ForemanDistributed foreman(&bus, query_processor->getDefaultDatabase());
+
+  for (;;) {
+    AnnotatedMessage annotated_message(bus.Receive(conductor_id, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+
+    LOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
+              << "' message from client " << sender;
+    switch (tagged_message.message_type()) {
+      case kDistributedCliRegistrationMessage: {
+        quickstep::serialization::EmptyMessage proto;
+
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+        TaggedMessage message(static_cast<const void*>(proto_bytes),
+                              proto_length,
+                              kDistributedCliRegistrationResponseMessage);
+        free(proto_bytes);
+
+        LOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
+                  << kDistributedCliRegistrationResponseMessage
+                  << "') to Distributed CLI " << sender;
+        QueryExecutionUtil::SendTMBMessage(&bus,
+                                           conductor_id,
+                                           sender,
+                                           move(message));
+        break;
+      }
+      case kSqlQueryMessage: {
+        quickstep::serialization::SqlQueryMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        LOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
+
+        string *command_string = new string(move(proto.sql_query()));
+
+        parser_wrapper.feedNextBuffer(command_string);
+        quickstep::ParseResult parse_result = parser_wrapper.getNextStatement();
+
+        unique_ptr<quickstep::QueryHandle> query_handle;
+        try {
+          query_handle.reset(query_processor->generateQueryHandle(*parse_result.parsed_statement));
+        } catch (const quickstep::SqlError &sql_error) {
+          // Set the query execution status along with the error message.
+          quickstep::serialization::QueryExecutionErrorMessage proto;
+          proto.set_error_message(sql_error.formatMessage(*command_string));
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+          TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                proto_length,
+                                kQueryExecutionErrorMessage);
+          free(proto_bytes);
+
+          LOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
+                    << kQueryExecutionErrorMessage
+                    << "') to Distributed CLI " << sender;
+          QueryExecutionUtil::SendTMBMessage(&bus,
+                                             conductor_id,
+                                             sender,
+                                             move(message));
+          break;
+        }
+
+        try {
+          foreman.setQueryHandle(query_handle.get());
+          foreman.start();
+          foreman.join();
+
+          quickstep::serialization::QueryExecutionSuccessMessage proto;
+
+          const quickstep::CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+          if (query_result_relation) {
+            proto.mutable_query_result_relation()->MergeFrom(query_result_relation->getProto());
+          }
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+          TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                proto_length,
+                                kQueryExecutionSuccessMessage);
+          free(proto_bytes);
+
+          LOG(INFO) << "Conductor sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage
+                    << "') to Distributed CLI " << sender;
+          QueryExecutionUtil::SendTMBMessage(&bus,
+                                             conductor_id,
+                                             sender,
+                                             move(message));
+
+          if (query_result_relation) {
+            const quickstep::relation_id query_result_relation_id = query_result_relation->getID();
+            DCHECK(query_processor->getDefaultDatabase()->hasRelationWithId(query_result_relation_id));
+            query_processor->getDefaultDatabase()->dropRelationById(query_result_relation_id);
+          }
+
+          query_processor->saveCatalog();
+        } catch (const std::exception &e) {
+          string error_message("QUERY EXECUTION ERROR: ");
+          error_message.append(e.what());
+          error_message.push_back('\n');
+
+          quickstep::serialization::QueryExecutionErrorMessage proto;
+          proto.set_error_message(error_message);
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+          TaggedMessage message(static_cast<const void*>(proto_bytes),
+                                proto_length,
+                                kQueryExecutionErrorMessage);
+          free(proto_bytes);
+
+          LOG(INFO) << "Conductor (on behalf of Executor) sent QueryExecutionErrorMessage (typed '"
+                    << kQueryExecutionErrorMessage
+                    << "') to Distributed CLI " << sender;
+          QueryExecutionUtil::SendTMBMessage(&bus,
+                                             conductor_id,
+                                             sender,
+                                             move(message));
+        }
+        break;
+      }
+      default: {
+        LOG(FATAL) << "Unknown TMB message type";
+      }
+    }
+  }
+
+  return 0;
+}
diff --git a/cli/QuickstepCliDistributed.cpp b/cli/QuickstepCliDistributed.cpp
new file mode 100644
index 0000000..5bc6ccf
--- /dev/null
+++ b/cli/QuickstepCliDistributed.cpp
@@ -0,0 +1,293 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+/* A standalone command-line interface to distributed QuickStep */
+
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogRelation.hpp"
+#include "cli/CliConfig.h"  // For QUICKSTEP_USE_LINENOISE.
+#include "cli/LineReader.hpp"
+
+#ifdef QUICKSTEP_USE_LINENOISE
+#include "cli/LineReaderLineNoise.hpp"
+typedef quickstep::LineReaderLineNoise LineReaderImpl;
+#else
+#include "cli/LineReaderDumb.hpp"
+typedef quickstep::LineReaderDumb LineReaderImpl;
+#endif
+
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include "storage/FileManagerHdfs.hpp"
+#endif
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::fprintf;
+using std::free;
+using std::malloc;
+using std::move;
+using std::printf;
+using std::size_t;
+using std::string;
+
+using quickstep::ParseResult;
+using quickstep::QueryExecutionUtil;
+using quickstep::block_id;
+using quickstep::kDistributedCliRegistrationResponseMessage;
+using quickstep::kDistributedCliRegistrationMessage;
+using quickstep::kQueryExecutionErrorMessage;
+using quickstep::kQueryExecutionSuccessMessage;
+using quickstep::kSqlQueryMessage;
+
+using tmb::Address;
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+#ifdef QUICKSTEP_OS_WINDOWS
+static constexpr char kPathSeparator = '\\';
+static constexpr char kDefaultStoragePath[] = "qsstor\\";
+#else
+static constexpr char kPathSeparator = '/';
+static constexpr char kDefaultStoragePath[] = "qsstor/";
+#endif
+
+DEFINE_string(storage_path, kDefaultStoragePath,
+              "Filesystem path for quickstep database storage.");
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+static bool ValidateTmbServerPort(const char *flagname,
+                                  std::int32_t value) {
+  if (value > 0 && value < 65536) {
+    return true;
+  } else {
+    std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+    return false;
+  }
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+}  // namespace quickstep
+
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  grpc_init();
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (quickstep::FLAGS_use_hdfs) {
+    LOG(INFO) << "Using HDFS as the default persistent storage, with namenode at "
+              << quickstep::FLAGS_hdfs_namenode_host << ":"
+              << quickstep::FLAGS_hdfs_namenode_port << " and block replication factor "
+              << quickstep::FLAGS_hdfs_num_replications << "\n";
+  }
+#endif
+
+  // Setup the paths used by StorageManager.
+  string fixed_storage_path(quickstep::FLAGS_storage_path);
+  if (!fixed_storage_path.empty()
+      && (fixed_storage_path.back() != quickstep::kPathSeparator)) {
+    fixed_storage_path.push_back(quickstep::kPathSeparator);
+  }
+
+  quickstep::StorageManager storage_manager(fixed_storage_path);
+
+  tmb::NativeNetClientMessageBus bus;
+  bus.AddServer(quickstep::FLAGS_tmb_server_ip.c_str(), quickstep::FLAGS_tmb_server_port);
+  bus.Initialize();
+
+  const client_id cli_id = bus.Connect();
+  LOG(INFO) << "CLI TMB Client ID: " << cli_id;
+
+  bus.RegisterClientAsSender(cli_id, kDistributedCliRegistrationMessage);
+  bus.RegisterClientAsReceiver(cli_id, kDistributedCliRegistrationResponseMessage);
+
+  LOG(INFO) << "Distributed CLI sent DistributedCliRegistrationMessage (typed '"
+            << kDistributedCliRegistrationMessage
+            << "') to all";
+
+  Address all_addresses;
+  all_addresses.All(true);
+
+  // NOTE(zuyu): The singleton Conductor would need one copy of the message.
+  tmb::MessageStyle style;
+
+  quickstep::serialization::EmptyMessage proto;
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage cli_reg_message(static_cast<const void*>(proto_bytes),
+                                proto_length,
+                                kDistributedCliRegistrationMessage);
+  free(proto_bytes);
+
+  MessageBus::SendStatus send_status =
+      bus.Send(cli_id, all_addresses, style, move(cli_reg_message));
+  DCHECK(send_status == MessageBus::SendStatus::kOK);
+
+  // Wait for Conductor to response.
+  AnnotatedMessage cli_reg_response_message(bus.Receive(cli_id, 0, true));
+  DCHECK_EQ(kDistributedCliRegistrationResponseMessage,
+            cli_reg_response_message.tagged_message.message_type());
+  const client_id conductor_id = cli_reg_response_message.sender;
+
+  LOG(INFO) << "Distributed CLI received typed '" << kDistributedCliRegistrationResponseMessage
+            << "' message from client " << conductor_id;
+
+  // Prepare for executing a query.
+  bus.RegisterClientAsSender(cli_id, kSqlQueryMessage);
+  bus.RegisterClientAsReceiver(cli_id, kQueryExecutionSuccessMessage);
+  bus.RegisterClientAsReceiver(cli_id, kQueryExecutionErrorMessage);
+
+  LineReaderImpl line_reader("distributed_quickstep> ",
+                             "                  ...> ");
+  quickstep::SqlParserWrapper parser_wrapper;
+  std::chrono::time_point<std::chrono::steady_clock> start, end;
+
+  for (;;) {
+    string *command_string = new string();
+    *command_string = line_reader.getNextCommand();
+    if (command_string->size() == 0) {
+      delete command_string;
+      break;
+    }
+
+    parser_wrapper.feedNextBuffer(command_string);
+
+    bool quitting = false;
+    for (;;) {
+      ParseResult result = parser_wrapper.getNextStatement();
+      if (result.condition == ParseResult::kSuccess) {
+        if (result.parsed_statement->getStatementType() == quickstep::ParseStatement::kQuit) {
+          quitting = true;
+          break;
+        }
+
+        LOG(INFO) << "Distributed CLI sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                  << "') to Foreman";
+        quickstep::serialization::SqlQueryMessage proto;
+        proto.set_sql_query(*command_string);
+
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
+                                        proto_length,
+                                        kSqlQueryMessage);
+        free(proto_bytes);
+
+        QueryExecutionUtil::SendTMBMessage(&bus,
+                                           cli_id,
+                                           conductor_id,
+                                           move(sql_query_message));
+
+        start = std::chrono::steady_clock::now();
+
+        AnnotatedMessage annotated_message(bus.Receive(cli_id, 0, true));
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+        LOG(INFO) << "Distributed CLI received typed '" << tagged_message.message_type()
+                  << "' message from client " << annotated_message.sender;
+        switch (tagged_message.message_type()) {
+          case kQueryExecutionSuccessMessage: {
+            end = std::chrono::steady_clock::now();
+
+            quickstep::serialization::QueryExecutionSuccessMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            if (proto.has_query_result_relation()) {
+              quickstep::CatalogRelation query_result_relation(proto.query_result_relation());
+
+              quickstep::PrintToScreen::PrintRelation(query_result_relation,
+                                                      &storage_manager,
+                                                      stdout);
+
+              const std::vector<block_id> blocks(query_result_relation.getBlocksSnapshot());
+              for (const block_id block : blocks) {
+                storage_manager.deleteBlockOrBlobFile(block);
+              }
+            }
+
+            printf("Execution time: %g seconds\nQuery Complete\n", std::chrono::duration<double>(end - start).count());
+            break;
+          }
+          case kQueryExecutionErrorMessage: {
+            quickstep::serialization::QueryExecutionErrorMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            fprintf(stderr, "%s", proto.error_message().c_str());
+            break;
+          }
+          default: {
+            LOG(ERROR) << "Unknown TMB message type";
+          }
+        }
+      } else {
+        if (result.condition == ParseResult::kError) {
+          fprintf(stderr, "%s", result.error_message.c_str());
+        }
+        break;
+      }
+    }
+
+    if (quitting) {
+      break;
+    }
+  }
+
+  bus.Disconnect(cli_id);
+
+  return 0;
+}
diff --git a/cli/WorkerDistributed.cpp b/cli/WorkerDistributed.cpp
new file mode 100644
index 0000000..ff49aec
--- /dev/null
+++ b/cli/WorkerDistributed.cpp
@@ -0,0 +1,210 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep Worker */
+
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "cli/InputParserUtil.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include "storage/FileManagerHdfs.hpp"
+#endif
+
+#include "storage/PreloaderThread.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/PtrVector.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/native_net_client_message_bus.h"
+
+using std::fflush;
+using std::fprintf;
+using std::printf;
+using std::size_t;
+using std::string;
+using std::vector;
+
+using quickstep::InputParserUtil;
+using quickstep::Worker;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+#ifdef QUICKSTEP_OS_WINDOWS
+static constexpr char kPathSeparator = '\\';
+static constexpr char kDefaultStoragePath[] = "qsstor\\";
+#else
+static constexpr char kPathSeparator = '/';
+static constexpr char kDefaultStoragePath[] = "qsstor/";
+#endif
+
+DEFINE_int32(num_workers, 1, "Number of worker threads");
+
+DEFINE_bool(preload_buffer_pool, false,
+            "If true, pre-load all known blocks into buffer pool before "
+            "accepting queries (should also set --buffer_pool_slots to be "
+            "large enough to accomodate the entire database).");
+
+DEFINE_string(storage_path, kDefaultStoragePath,
+              "Filesystem path for quickstep database storage.");
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+static bool ValidateTmbServerPort(const char *flagname,
+                                  std::int32_t value) {
+  if (value > 0 && value < 65536) {
+    return true;
+  } else {
+    fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+    return false;
+  }
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+DEFINE_string(worker_affinities, "",
+              "A comma-separated list of CPU IDs to pin worker threads to "
+              "(leaving this empty will cause all worker threads to inherit "
+              "the affinity mask of the Quickstep process, which typically "
+              "means that they will all be runable on any CPU according to "
+              "the kernel's own scheduling policy).");
+
+}  // namespace quickstep
+
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  grpc_init();
+
+  if (quickstep::FLAGS_num_workers > 0) {
+    printf("Starting Quickstep with %d worker thread(s)\n",
+           quickstep::FLAGS_num_workers);
+  } else {
+    LOG(FATAL) << "Quickstep needs at least one worker thread";
+  }
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (quickstep::FLAGS_use_hdfs) {
+    LOG(INFO) << "Using HDFS as the default persistent storage, with namenode at "
+              << quickstep::FLAGS_hdfs_namenode_host << ":"
+              << quickstep::FLAGS_hdfs_namenode_port << " and block replication factor "
+              << quickstep::FLAGS_hdfs_num_replications << "\n";
+  }
+#endif
+
+  // Setup the paths used by StorageManager.
+  string fixed_storage_path(quickstep::FLAGS_storage_path);
+  if (!fixed_storage_path.empty()
+      && (fixed_storage_path.back() != quickstep::kPathSeparator)) {
+    fixed_storage_path.push_back(quickstep::kPathSeparator);
+  }
+
+  quickstep::StorageManager storage_manager(fixed_storage_path);
+
+  tmb::NativeNetClientMessageBus bus;
+  bus.AddServer(quickstep::FLAGS_tmb_server_ip.c_str(), quickstep::FLAGS_tmb_server_port);
+  bus.Initialize();
+
+  // Parse the CPU affinities for workers and the preloader thread, if enabled
+  // to warm up the buffer pool.
+  const vector<int> worker_cpu_affinities =
+      InputParserUtil::ParseWorkerAffinities(quickstep::FLAGS_num_workers,
+                                             quickstep::FLAGS_worker_affinities);
+
+  if (quickstep::FLAGS_preload_buffer_pool) {
+    // TODO(quickstep-team): Enable Preloading by reconstructing CatalogDatabase
+    // from Catalog in Conductor.
+    quickstep::CatalogDatabase database(nullptr, "default");
+    quickstep::PreloaderThread preloader(database,
+                                         &storage_manager,
+                                         worker_cpu_affinities.front());
+    printf("Preloading buffer pool... ");
+    fflush(stdout);
+    preloader.start();
+    preloader.join();
+    printf("DONE\n");
+  }
+
+  // Get the NUMA affinities for workers.
+  vector<int> cpu_numa_nodes = InputParserUtil::GetNUMANodesForCPUs();
+  if (cpu_numa_nodes.empty()) {
+    // libnuma is not present. Assign -1 as the NUMA node for every worker.
+    cpu_numa_nodes.assign(worker_cpu_affinities.size(), -1);
+  }
+
+  vector<int> worker_numa_nodes;
+  quickstep::PtrVector<Worker> workers;
+  vector<client_id> worker_client_ids;
+
+  // Initialize the worker threads.
+  DCHECK_EQ(static_cast<size_t>(quickstep::FLAGS_num_workers),
+            worker_cpu_affinities.size());
+  for (size_t worker_idx = 0;
+       worker_idx < worker_cpu_affinities.size();
+       ++worker_idx) {
+    int numa_node_id = -1;
+    if (worker_cpu_affinities[worker_idx] >= 0) {
+      // This worker can be NUMA affinitized.
+      numa_node_id = cpu_numa_nodes[worker_cpu_affinities[worker_idx]];
+    }
+    worker_numa_nodes.push_back(numa_node_id);
+
+    workers.push_back(new Worker(worker_idx,
+                                 &bus,
+                                 worker_cpu_affinities[worker_idx]));
+    worker_client_ids.push_back(workers.back().getBusClientID());
+  }
+
+  // TODO(zuyu): Move WorkerDirectory within Shiftboss once the latter is added.
+  quickstep::WorkerDirectory worker_directory(worker_cpu_affinities.size(),
+                                              worker_client_ids,
+                                              worker_numa_nodes);
+
+  quickstep::Shiftboss shiftboss(&bus,
+                                 &storage_manager,
+                                 &worker_directory);
+  shiftboss.start();
+
+  // Start the worker threads.
+  for (Worker &worker : workers) {
+    worker.start();
+  }
+
+  for (Worker &worker : workers) {
+    worker.join();
+  }
+  shiftboss.join();
+
+  return 0;
+}
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 95807d4..b9a964c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,6 +20,10 @@
 import "query_execution/QueryContext.proto";
 import "relational_operators/WorkOrder.proto";
 
+// Used for any messages that do not carry payloads.
+message EmptyMessage {
+}
+
 message ShiftbossRegistrationMessage {
   // The total Work Order processing capacity in Shiftboss, which equals to the
   // sum of the capacity of each worker managed by Shiftboss.
@@ -87,7 +91,7 @@
 }
 
 message QueryExecutionSuccessMessage {
-  optional CatalogRelation query_result_relation = 1;
+  optional CatalogRelationSchema query_result_relation = 1;
 }
 
 message QueryExecutionErrorMessage {