| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "common.h" |
| |
| #include <filesystem> |
| #include <sstream> |
| #include <unordered_map> |
| |
| #include "arrow/api.h" |
| #include "arrow/filesystem/api.h" |
| #include "arrow/ipc/api.h" |
| #include "gtest/gtest.h" |
| |
| static arrow::StringBuilder test_names_builder; |
| static arrow::StringBuilder test_output_builder; |
| static std::string current_recipe; |
| |
| void StartRecipe(const std::string& recipe_name) { |
| if (!current_recipe.empty()) { |
| FAIL() << "Attempt to start a recipe " << recipe_name << " but the recipe " |
| << current_recipe << " has not been marked finished"; |
| } |
| if (recipe_name.empty()) { |
| FAIL() << "Invalid empty recipe name"; |
| } |
| current_recipe = recipe_name; |
| rout = std::stringstream(); |
| } |
| |
| void EndRecipe(const std::string& recipe_name) { |
| if (current_recipe != recipe_name) { |
| FAIL() << "Attempt to end a recipe " << recipe_name |
| << " but the recipe was not in progress"; |
| } |
| std::string recipe_output = rout.str(); |
| |
| ASSERT_OK(test_names_builder.Append(recipe_name)); |
| ASSERT_OK(test_output_builder.Append(recipe_output)); |
| |
| current_recipe = ""; |
| } |
| |
| std::shared_ptr<arrow::Schema> RecipesTableSchema() { |
| return arrow::schema({arrow::field("Recipe Name", arrow::utf8()), |
| arrow::field("Recipe Output", arrow::utf8())}); |
| } |
| |
| std::shared_ptr<arrow::Array> MakeEmptyStringArray() { |
| arrow::StringBuilder builder; |
| return builder.Finish().ValueOrDie(); |
| } |
| |
| arrow::Result<std::shared_ptr<arrow::Table>> MakeEmptyRecipesTable() { |
| std::shared_ptr<arrow::Schema> schema = RecipesTableSchema(); |
| std::shared_ptr<arrow::RecordBatch> batch = arrow::RecordBatch::Make( |
| schema, 0, {MakeEmptyStringArray(), MakeEmptyStringArray()}); |
| return arrow::Table::FromRecordBatches({batch}); |
| } |
| |
| arrow::Result<std::shared_ptr<arrow::Table>> ReadRecipeTable( |
| const std::shared_ptr<arrow::io::RandomAccessFile>& in_file) { |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader, |
| arrow::ipc::RecordBatchStreamReader::Open(in_file)); |
| return reader->ToTable(); |
| } |
| |
| arrow::Result<std::shared_ptr<arrow::Table>> LoadExistingRecipeOutputTable( |
| const std::string& filename) { |
| std::shared_ptr<arrow::fs::FileSystem> fs = |
| std::make_shared<arrow::fs::LocalFileSystem>(); |
| arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> maybe_in = |
| fs->OpenInputFile(filename); |
| if (!maybe_in.ok()) { |
| return MakeEmptyRecipesTable(); |
| } |
| std::shared_ptr<arrow::io::RandomAccessFile> in = *maybe_in; |
| return ReadRecipeTable(in); |
| } |
| |
| arrow::Result<std::shared_ptr<arrow::Table>> CreateRecipeOutputTable() { |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> test_names, |
| test_names_builder.Finish()); |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> test_outputs, |
| test_output_builder.Finish()); |
| std::shared_ptr<arrow::Schema> schema = |
| arrow::schema({arrow::field("Recipe Name", arrow::utf8()), |
| arrow::field("Recipe Output", arrow::utf8())}); |
| std::shared_ptr<arrow::RecordBatch> batch = |
| arrow::RecordBatch::Make(schema, test_names->length(), {test_names, test_outputs}); |
| return arrow::Table::FromRecordBatches({batch}); |
| } |
| |
| void PopulateMap(const arrow::Table& table, |
| std::unordered_map<std::string, std::string>* values) { |
| if (table.num_rows() == 0) { |
| return; |
| } |
| std::shared_ptr<arrow::StringArray> table_names = |
| std::dynamic_pointer_cast<arrow::StringArray>(table.column(0)->chunk(0)); |
| std::shared_ptr<arrow::StringArray> table_outputs = |
| std::dynamic_pointer_cast<arrow::StringArray>(table.column(1)->chunk(0)); |
| for (int64_t i = 0; i < table.num_rows(); i++) { |
| (*values)[table_names->GetString(i)] = table_outputs->GetString(i); |
| } |
| } |
| |
| arrow::Result<std::shared_ptr<arrow::Table>> MergeRecipeTables( |
| const std::shared_ptr<arrow::Table>& old_table, |
| const std::shared_ptr<arrow::Table>& new_table) { |
| std::unordered_map<std::string, std::string> values; |
| PopulateMap(*old_table, &values); |
| PopulateMap(*new_table, &values); |
| arrow::StringBuilder names_builder; |
| arrow::StringBuilder outputs_builder; |
| for (const auto& pair : values) { |
| ARROW_RETURN_NOT_OK(names_builder.Append(pair.first)); |
| ARROW_RETURN_NOT_OK(outputs_builder.Append(pair.second)); |
| } |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> names_arr, names_builder.Finish()); |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> outputs_arr, |
| outputs_builder.Finish()); |
| std::shared_ptr<arrow::Schema> schema = RecipesTableSchema(); |
| std::shared_ptr<arrow::RecordBatch> batch = |
| arrow::RecordBatch::Make(schema, names_arr->length(), {names_arr, outputs_arr}); |
| return arrow::Table::FromRecordBatches({batch}); |
| } |
| |
| bool HasRecipeOutput() { return test_names_builder.length() > 0; } |
| |
| arrow::Status DumpRecipeOutput(const std::string& output_filename) { |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> new_table, |
| CreateRecipeOutputTable()); |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> old_table, |
| LoadExistingRecipeOutputTable(output_filename)); |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> merged_table, |
| MergeRecipeTables(old_table, new_table)); |
| std::shared_ptr<arrow::fs::FileSystem> fs = |
| std::make_shared<arrow::fs::LocalFileSystem>(); |
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::OutputStream> out_stream, |
| fs->OpenOutputStream(output_filename)); |
| ARROW_ASSIGN_OR_RAISE( |
| std::shared_ptr<arrow::ipc::RecordBatchWriter> writer, |
| arrow::ipc::MakeStreamWriter(out_stream.get(), merged_table->schema())); |
| RETURN_NOT_OK(writer->WriteTable(*merged_table)); |
| return writer->Close(); |
| } |
| |
| arrow::Result<std::string> FindTestDataFile(const std::string& test_data_name) { |
| auto path_iter = std::filesystem::current_path(); |
| while (path_iter.has_parent_path()) { |
| auto test_data_dir_path = path_iter / "testdata"; |
| if (std::filesystem::exists(test_data_dir_path)) { |
| return (test_data_dir_path / test_data_name).string(); |
| } |
| path_iter = path_iter.parent_path(); |
| } |
| return arrow::Status::Invalid( |
| "Could not locate testdata directory. Tests must be " |
| "run inside of the cookbook repo"); |
| } |