|  | /** | 
|  | *   Copyright 2015-2016 Pivotal Software, Inc. | 
|  | * | 
|  | *   Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | *   you may not use this file except in compliance with the License. | 
|  | *   You may obtain a copy of the License at | 
|  | * | 
|  | *       http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | *   Unless required by applicable law or agreed to in writing, software | 
|  | *   distributed under the License is distributed on an "AS IS" BASIS, | 
|  | *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | *   See the License for the specific language governing permissions and | 
|  | *   limitations under the License. | 
|  | **/ | 
|  |  | 
|  | #include <cstddef> | 
|  | #include <cstdio> | 
|  | #include <memory> | 
|  | #include <string> | 
|  |  | 
|  | #include "catalog/CatalogAttribute.hpp" | 
|  | #include "catalog/CatalogDatabase.hpp" | 
|  | #include "catalog/CatalogRelation.hpp" | 
|  | #include "catalog/CatalogTypedefs.hpp" | 
|  | #include "cli/PrintToScreen.hpp" | 
|  | #include "query_execution/QueryContext.hpp" | 
|  | #include "query_execution/QueryContext.pb.h" | 
|  | #include "query_execution/QueryExecutionMessages.pb.h" | 
|  | #include "query_execution/QueryExecutionTypedefs.hpp" | 
|  | #include "query_execution/WorkOrdersContainer.hpp" | 
|  | #include "relational_operators/RelationalOperator.hpp" | 
|  | #include "relational_operators/TextScanOperator.hpp" | 
|  | #include "relational_operators/WorkOrder.hpp" | 
|  | #include "storage/InsertDestination.pb.h" | 
|  | #include "storage/StorageManager.hpp" | 
|  | #include "threading/ThreadIDBasedMap.hpp" | 
|  | #include "types/TypeFactory.hpp" | 
|  | #include "types/TypeID.hpp" | 
|  | #include "utility/MemStream.hpp" | 
|  |  | 
|  | #include "glog/logging.h" | 
|  |  | 
|  | #include "gtest/gtest.h" | 
|  |  | 
|  | #include "tmb/id_typedefs.h" | 
|  |  | 
|  | // Global variables used by test, set up by main(). | 
|  | namespace { | 
|  | const char *input_filename; | 
|  | const char *golden_output_filename; | 
|  | const char *failure_output_filename; | 
|  | }  // namespace | 
|  |  | 
|  | namespace quickstep { | 
|  |  | 
|  | namespace { | 
|  | constexpr int kOpIndex = 0; | 
|  | }  // namespace | 
|  |  | 
|  | class TextScanOperatorTest : public ::testing::Test { | 
|  | protected: | 
|  | virtual void SetUp() { | 
|  | thread_id_map_ = ClientIDMap::Instance(); | 
|  |  | 
|  | bus_.Initialize(); | 
|  |  | 
|  | const tmb::client_id worker_thread_client_id = bus_.Connect(); | 
|  | bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage); | 
|  |  | 
|  | // Usually the worker thread makes the following call. In this test setup, | 
|  | // we don't have a worker thread hence we have to explicitly make the call. | 
|  | thread_id_map_->addValue(worker_thread_client_id); | 
|  |  | 
|  | foreman_client_id_ = bus_.Connect(); | 
|  | bus_.RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage); | 
|  |  | 
|  | db_.reset(new CatalogDatabase(nullptr, "database")); | 
|  |  | 
|  | // Create table with a variety of attribute types. | 
|  | relation_ = new CatalogRelation(nullptr, "test_relation"); | 
|  | db_->addRelation(relation_); | 
|  |  | 
|  | relation_->addAttribute( | 
|  | new CatalogAttribute(relation_, "long_attr", TypeFactory::GetType(kLong, true))); | 
|  | relation_->addAttribute( | 
|  | new CatalogAttribute(relation_, "double_attr", TypeFactory::GetType(kDouble, true))); | 
|  | relation_->addAttribute( | 
|  | new CatalogAttribute(relation_, "char_attr", TypeFactory::GetType(kChar, 20, true))); | 
|  | relation_->addAttribute( | 
|  | new CatalogAttribute(relation_, "datetime_attr", TypeFactory::GetType(kDatetime, true))); | 
|  | relation_->addAttribute( | 
|  | new CatalogAttribute(relation_, "interval_attr", TypeFactory::GetType(kDatetimeInterval, true))); | 
|  | relation_->addAttribute( | 
|  | new CatalogAttribute(relation_, "varchar_attr", TypeFactory::GetType(kVarChar, 20, true))); | 
|  |  | 
|  | storage_manager_.reset(new StorageManager("./test_data/")); | 
|  | } | 
|  |  | 
|  | virtual void TearDown() { | 
|  | thread_id_map_->removeValue(); | 
|  | } | 
|  |  | 
|  | void fetchAndExecuteWorkOrders(RelationalOperator *op) { | 
|  | // Treat the single operator as the sole node in a query plan DAG. | 
|  | op->setOperatorIndex(kOpIndex); | 
|  | WorkOrdersContainer container(1, 0); | 
|  | const std::size_t op_index = 0; | 
|  | op->informAllBlockingDependenciesMet(); | 
|  | op->getAllWorkOrders(&container, | 
|  | query_context_.get(), | 
|  | storage_manager_.get(), | 
|  | foreman_client_id_, | 
|  | &bus_); | 
|  |  | 
|  | while (container.hasNormalWorkOrder(op_index)) { | 
|  | std::unique_ptr<WorkOrder> work_order(container.getNormalWorkOrder(op_index)); | 
|  | work_order->execute(); | 
|  | processCatalogRelationNewBlockMessages(); | 
|  | } | 
|  |  | 
|  | while (container.hasRebuildWorkOrder(op_index)) { | 
|  | std::unique_ptr<WorkOrder> work_order(container.getRebuildWorkOrder(op_index)); | 
|  | work_order->execute(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void processCatalogRelationNewBlockMessages() { | 
|  | AnnotatedMessage msg; | 
|  | while (bus_.ReceiveIfAvailable(foreman_client_id_, &msg)) { | 
|  | const TaggedMessage &tagged_message = msg.tagged_message; | 
|  | if (tagged_message.message_type() == kCatalogRelationNewBlockMessage) { | 
|  | serialization::CatalogRelationNewBlockMessage proto; | 
|  | CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); | 
|  |  | 
|  | CatalogRelation *relation = db_->getRelationByIdMutable(proto.relation_id()); | 
|  | relation->addBlock(proto.block_id()); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | static std::string LoadGoldenOutput() { | 
|  | std::string golden_string; | 
|  |  | 
|  | FILE *golden_file = std::fopen(golden_output_filename, "r"); | 
|  | CHECK_NOTNULL(golden_file); | 
|  |  | 
|  | char read_buffer[1024]; | 
|  | for (;;) { | 
|  | std::size_t bytes_read = std::fread(read_buffer, 1, 1024, golden_file); | 
|  | if (bytes_read == 0) { | 
|  | break; | 
|  | } | 
|  | golden_string.append(read_buffer, bytes_read); | 
|  | } | 
|  |  | 
|  | CHECK(std::feof(golden_file)); | 
|  | std::fclose(golden_file); | 
|  |  | 
|  | return golden_string; | 
|  | } | 
|  |  | 
|  | // This map is needed for InsertDestination and some WorkOrders that send | 
|  | // messages to Foreman directly. To know the reason behind the design of this | 
|  | // map, see the note in InsertDestination.hpp. | 
|  | ClientIDMap *thread_id_map_; | 
|  |  | 
|  | MessageBusImpl bus_; | 
|  | tmb::client_id foreman_client_id_; | 
|  |  | 
|  | std::unique_ptr<CatalogDatabase> db_; | 
|  | CatalogRelation *relation_; | 
|  | std::unique_ptr<StorageManager> storage_manager_; | 
|  | std::unique_ptr<QueryContext> query_context_; | 
|  | }; | 
|  |  | 
|  | TEST_F(TextScanOperatorTest, ScanTest) { | 
|  | std::string golden_string = LoadGoldenOutput(); | 
|  |  | 
|  | // Setup the InsertDestination proto in the query context proto. | 
|  | serialization::QueryContext query_context_proto; | 
|  |  | 
|  | QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size(); | 
|  | serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations(); | 
|  |  | 
|  | output_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL); | 
|  | output_destination_proto->set_relation_id(relation_->getID()); | 
|  | output_destination_proto->set_relational_op_index(kOpIndex); | 
|  |  | 
|  | std::unique_ptr<TextScanOperator> text_scan_op( | 
|  | new TextScanOperator(input_filename, | 
|  | '\t', | 
|  | true, | 
|  | false, | 
|  | *relation_, | 
|  | output_destination_index)); | 
|  |  | 
|  | // Setup query_context_. | 
|  | query_context_.reset(new QueryContext(query_context_proto, | 
|  | *db_, | 
|  | storage_manager_.get(), | 
|  | foreman_client_id_, | 
|  | &bus_)); | 
|  |  | 
|  | fetchAndExecuteWorkOrders(text_scan_op.get()); | 
|  | text_scan_op.reset(nullptr); | 
|  |  | 
|  | MemStream print_stream; | 
|  | PrintToScreen::PrintRelation(*relation_, | 
|  | storage_manager_.get(), | 
|  | print_stream.file()); | 
|  | std::string printed(print_stream.str()); | 
|  |  | 
|  | EXPECT_EQ(golden_string, printed); | 
|  | if (golden_string != printed) { | 
|  | FILE *failure_output = std::fopen(failure_output_filename, "w"); | 
|  | CHECK_NOTNULL(failure_output); | 
|  | const std::size_t written | 
|  | = std::fwrite(printed.c_str(), 1, printed.length(), failure_output); | 
|  | CHECK_EQ(printed.length(), written); | 
|  | std::fclose(failure_output); | 
|  | } | 
|  | } | 
|  |  | 
|  | }  // namespace quickstep | 
|  |  | 
|  | int main(int argc, char* argv[]) { | 
|  | google::InitGoogleLogging(argv[0]); | 
|  | testing::InitGoogleTest(&argc, argv); | 
|  |  | 
|  | if (argc != 4) { | 
|  | std::fprintf(stderr, | 
|  | "USAGE: %s [gtest options] " | 
|  | "INPUT_FILENAME GOLDEN_OUTPUT_FILENAME FAILURE_OUTPUT_FILENAME\n", | 
|  | argv[0]); | 
|  | return 1; | 
|  | } | 
|  |  | 
|  | input_filename = argv[1]; | 
|  | golden_output_filename = argv[2]; | 
|  | failure_output_filename = argv[3]; | 
|  |  | 
|  | return RUN_ALL_TESTS(); | 
|  | } |