| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #include <cstddef> |
| #include <cstdio> |
| #include <memory> |
| #include <string> |
| |
| #include "catalog/CatalogAttribute.hpp" |
| #include "catalog/CatalogDatabase.hpp" |
| #include "catalog/CatalogRelation.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "cli/PrintToScreen.hpp" |
| #include "query_execution/QueryContext.hpp" |
| #include "query_execution/QueryContext.pb.h" |
| #include "query_execution/QueryExecutionMessages.pb.h" |
| #include "query_execution/QueryExecutionTypedefs.hpp" |
| #include "query_execution/WorkOrdersContainer.hpp" |
| #include "relational_operators/RelationalOperator.hpp" |
| #include "relational_operators/TextScanOperator.hpp" |
| #include "relational_operators/WorkOrder.hpp" |
| #include "storage/InsertDestination.pb.h" |
| #include "storage/StorageManager.hpp" |
| #include "threading/ThreadIDBasedMap.hpp" |
| #include "types/TypeFactory.hpp" |
| #include "types/TypeID.hpp" |
| #include "utility/MemStream.hpp" |
| |
| #include "gflags/gflags.h" |
| #include "glog/logging.h" |
| #include "gtest/gtest.h" |
| |
| #include "tmb/id_typedefs.h" |
| |
| // Global variables used by test, set up by main(). |
| namespace { |
| const char *input_filename; |
| const char *golden_output_filename; |
| const char *failure_output_filename; |
| } // namespace |
| |
| namespace quickstep { |
| |
| namespace { |
| constexpr std::size_t kQueryId = 0; |
| constexpr int kOpIndex = 0; |
| } // namespace |
| |
| class TextScanOperatorTest : public ::testing::Test { |
| protected: |
| virtual void SetUp() { |
| thread_id_map_ = ClientIDMap::Instance(); |
| |
| bus_.Initialize(); |
| |
| const tmb::client_id worker_thread_client_id = bus_.Connect(); |
| bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage); |
| |
| // Usually the worker thread makes the following call. In this test setup, |
| // we don't have a worker thread hence we have to explicitly make the call. |
| thread_id_map_->addValue(worker_thread_client_id); |
| |
| foreman_client_id_ = bus_.Connect(); |
| bus_.RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage); |
| |
| db_.reset(new CatalogDatabase(nullptr, "database")); |
| |
| // Create table with a variety of attribute types. |
| relation_ = new CatalogRelation(nullptr, "test_relation"); |
| db_->addRelation(relation_); |
| |
| relation_->addAttribute( |
| new CatalogAttribute(relation_, "long_attr", TypeFactory::GetType(kLong, true))); |
| relation_->addAttribute( |
| new CatalogAttribute(relation_, "double_attr", TypeFactory::GetType(kDouble, true))); |
| relation_->addAttribute( |
| new CatalogAttribute(relation_, "char_attr", TypeFactory::GetType(kChar, 20, true))); |
| relation_->addAttribute( |
| new CatalogAttribute(relation_, "datetime_attr", TypeFactory::GetType(kDatetime, true))); |
| relation_->addAttribute( |
| new CatalogAttribute(relation_, "interval_attr", TypeFactory::GetType(kDatetimeInterval, true))); |
| relation_->addAttribute( |
| new CatalogAttribute(relation_, "varchar_attr", TypeFactory::GetType(kVarChar, 20, true))); |
| |
| storage_manager_.reset(new StorageManager("./text_scan_operator_test_data/")); |
| } |
| |
| virtual void TearDown() { |
| thread_id_map_->removeValue(); |
| } |
| |
| void fetchAndExecuteWorkOrders(RelationalOperator *op) { |
| // Treat the single operator as the sole node in a query plan DAG. |
| op->setOperatorIndex(kOpIndex); |
| WorkOrdersContainer container(1, 0); |
| const std::size_t op_index = 0; |
| op->informAllBlockingDependenciesMet(); |
| op->getAllWorkOrders(&container, |
| query_context_.get(), |
| storage_manager_.get(), |
| foreman_client_id_, |
| &bus_); |
| |
| while (container.hasNormalWorkOrder(op_index)) { |
| std::unique_ptr<WorkOrder> work_order(container.getNormalWorkOrder(op_index)); |
| work_order->execute(); |
| processCatalogRelationNewBlockMessages(); |
| } |
| |
| while (container.hasRebuildWorkOrder(op_index)) { |
| std::unique_ptr<WorkOrder> work_order(container.getRebuildWorkOrder(op_index)); |
| work_order->execute(); |
| } |
| } |
| |
| void processCatalogRelationNewBlockMessages() { |
| AnnotatedMessage msg; |
| while (bus_.ReceiveIfAvailable(foreman_client_id_, &msg)) { |
| const TaggedMessage &tagged_message = msg.tagged_message; |
| if (tagged_message.message_type() == kCatalogRelationNewBlockMessage) { |
| serialization::CatalogRelationNewBlockMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| CatalogRelation *relation = db_->getRelationByIdMutable(proto.relation_id()); |
| relation->addBlock(proto.block_id()); |
| } |
| } |
| } |
| |
| static std::string LoadGoldenOutput() { |
| std::string golden_string; |
| |
| FILE *golden_file = std::fopen(golden_output_filename, "r"); |
| CHECK_NOTNULL(golden_file); |
| |
| char read_buffer[1024]; |
| for (;;) { |
| std::size_t bytes_read = std::fread(read_buffer, 1, 1024, golden_file); |
| if (bytes_read == 0) { |
| break; |
| } |
| golden_string.append(read_buffer, bytes_read); |
| } |
| |
| CHECK(std::feof(golden_file)); |
| std::fclose(golden_file); |
| |
| return golden_string; |
| } |
| |
| // This map is needed for InsertDestination and some WorkOrders that send |
| // messages to Foreman directly. To know the reason behind the design of this |
| // map, see the note in InsertDestination.hpp. |
| ClientIDMap *thread_id_map_; |
| |
| MessageBusImpl bus_; |
| tmb::client_id foreman_client_id_; |
| |
| std::unique_ptr<CatalogDatabase> db_; |
| CatalogRelation *relation_; |
| std::unique_ptr<StorageManager> storage_manager_; |
| std::unique_ptr<QueryContext> query_context_; |
| }; |
| |
| TEST_F(TextScanOperatorTest, ScanTest) { |
| std::string golden_string = LoadGoldenOutput(); |
| |
| // Setup the InsertDestination proto in the query context proto. |
| serialization::QueryContext query_context_proto; |
| query_context_proto.set_query_id(0); // dummy query ID. |
| |
| QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size(); |
| serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations(); |
| |
| output_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL); |
| output_destination_proto->set_relation_id(relation_->getID()); |
| output_destination_proto->set_relational_op_index(kOpIndex); |
| |
| std::unique_ptr<TextScanOperator> text_scan_op( |
| new TextScanOperator(kQueryId, |
| input_filename, |
| '\t', |
| true, |
| *relation_, |
| output_destination_index)); |
| |
| // Setup query_context_. |
| query_context_.reset(new QueryContext(query_context_proto, |
| *db_, |
| storage_manager_.get(), |
| foreman_client_id_, |
| &bus_)); |
| |
| fetchAndExecuteWorkOrders(text_scan_op.get()); |
| text_scan_op.reset(nullptr); |
| |
| MemStream print_stream; |
| PrintToScreen::PrintRelation(*relation_, |
| storage_manager_.get(), |
| print_stream.file()); |
| std::string printed(print_stream.str()); |
| |
| EXPECT_EQ(golden_string, printed); |
| if (golden_string != printed) { |
| FILE *failure_output = std::fopen(failure_output_filename, "w"); |
| CHECK_NOTNULL(failure_output); |
| const std::size_t written |
| = std::fwrite(printed.c_str(), 1, printed.length(), failure_output); |
| CHECK_EQ(printed.length(), written); |
| std::fclose(failure_output); |
| } |
| } |
| |
| } // namespace quickstep |
| |
| int main(int argc, char* argv[]) { |
| google::InitGoogleLogging(argv[0]); |
| testing::InitGoogleTest(&argc, argv); |
| |
| if (argc != 4) { |
| std::fprintf(stderr, |
| "USAGE: %s [gtest options] " |
| "INPUT_FILENAME GOLDEN_OUTPUT_FILENAME FAILURE_OUTPUT_FILENAME\n", |
| argv[0]); |
| return 1; |
| } |
| |
| input_filename = argv[1]; |
| golden_output_filename = argv[2]; |
| failure_output_filename = argv[3]; |
| |
| return RUN_ALL_TESTS(); |
| } |