// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <arrow/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>
#include <gtest/gtest.h>
#include <parquet/arrow/reader.h>
#include <filesystem>
#include <memory>
#include "common.h"
// Fixture that prepares a Hive-partitioned copy of the airquality sample
// dataset in a temporary directory for the recipes below to read.
class DatasetReadingTest : public ::testing::Test {
public:
void SetUp() override {
    airquality_partitioned_dir_ =
        (std::filesystem::temp_directory_path() / "cookbook_cpp_airquality")
            .string();
std::shared_ptr<arrow::fs::FileSystem> fs =
std::make_shared<arrow::fs::LocalFileSystem>();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Table> airquality,
ReadInAirQuality(fs.get()));
WritePartitionedAirQuality(airquality, std::move(fs));
}
const std::string& airquality_basedir() { return airquality_partitioned_dir_; }
private:
  // Writes `airquality` to disk as a Parquet dataset partitioned by the
  // Month and Day columns using Hive-style paths.
  void WritePartitionedAirQuality(const std::shared_ptr<arrow::Table>& airquality,
std::shared_ptr<arrow::fs::FileSystem> fs) {
std::shared_ptr<arrow::RecordBatchReader> table_reader =
std::make_shared<arrow::TableBatchReader>(*airquality);
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
arrow::dataset::ScannerBuilder::FromRecordBatchReader(std::move(table_reader));
ASSERT_OK(scanner_builder->UseThreads(true));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Scanner> scanner,
scanner_builder->Finish());
std::shared_ptr<arrow::Schema> partitioning_schema = arrow::schema(
{arrow::field("Month", arrow::int32()), arrow::field("Day", arrow::int32())});
std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory =
arrow::dataset::HivePartitioning::MakeFactory();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Partitioning> partitioning,
partitioning_factory->Finish(partitioning_schema));
std::shared_ptr<arrow::dataset::ParquetFileFormat> parquet_format =
std::make_shared<arrow::dataset::ParquetFileFormat>();
arrow::dataset::FileSystemDatasetWriteOptions write_options;
write_options.existing_data_behavior =
arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions;
write_options.filesystem = std::move(fs);
write_options.partitioning = std::move(partitioning);
write_options.base_dir = airquality_partitioned_dir_;
write_options.basename_template = "chunk-{i}.parquet";
write_options.file_write_options = parquet_format->DefaultWriteOptions();
ASSERT_OK(
arrow::dataset::FileSystemDataset::Write(write_options, std::move(scanner)));
}
  // Reads the airquality.parquet test file into an arrow::Table.
  static arrow::Result<std::shared_ptr<arrow::Table>> ReadInAirQuality(
arrow::fs::FileSystem* fs) {
ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
FindTestDataFile("airquality.parquet"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> file,
fs->OpenInputFile(airquality_path));
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::Open(file);
std::unique_ptr<parquet::arrow::FileReader> reader;
ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
arrow::default_memory_pool(), std::move(parquet_reader), &reader));
std::shared_ptr<arrow::Table> table;
ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
return table;
}
std::string airquality_partitioned_dir_;
};
arrow::Status DatasetRead(const std::string& airquality_basedir) {
StartRecipe("ListPartitionedDataset");
const std::string& directory_base = airquality_basedir;
// Create a filesystem
std::shared_ptr<arrow::fs::LocalFileSystem> fs =
std::make_shared<arrow::fs::LocalFileSystem>();
  // Create a file selector which describes which files are part of
  // the dataset. This selector performs a recursive search of a base
  // directory, which is typical with partitioned datasets. You can also
  // create a dataset from a list of one or more paths, as sketched below.
arrow::fs::FileSelector selector;
selector.base_dir = directory_base;
selector.recursive = true;
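  // A minimal sketch of the list-of-paths alternative (the path shown is
  // hypothetical, and `format`/`options` are created further down):
  //
  //   ARROW_ASSIGN_OR_RAISE(
  //       auto factory,
  //       arrow::dataset::FileSystemDatasetFactory::Make(
  //           fs, {directory_base + "/Month=5/Day=1/chunk-0.parquet"}, format,
  //           options));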
  // List out the files so we can see how our data is partitioned.
  // This step is not necessary for reading a dataset.
ARROW_ASSIGN_OR_RAISE(std::vector<arrow::fs::FileInfo> file_infos,
fs->GetFileInfo(selector));
int num_printed = 0;
for (const auto& path : file_infos) {
if (path.IsFile()) {
rout << path.path().substr(directory_base.size()) << std::endl;
if (++num_printed == 10) {
rout << "..." << std::endl;
break;
}
}
}
EndRecipe("ListPartitionedDataset");
StartRecipe("CreatingADataset");
  // Create a file format which describes the format of the files.
  // Here we specify we are reading Parquet files. We could pick a different
  // format, such as Arrow IPC or CSV files, or we could customize the Parquet
  // format with additional reading & parsing options.
std::shared_ptr<arrow::dataset::ParquetFileFormat> format =
std::make_shared<arrow::dataset::ParquetFileFormat>();
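  // For instance, Arrow IPC files could be read by swapping in
  // std::make_shared<arrow::dataset::IpcFileFormat>() here, or CSV files with
  // std::make_shared<arrow::dataset::CsvFileFormat>().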
// Create a partitioning factory. A partitioning factory will be used by a dataset
// factory to infer the partitioning schema from the filenames. All we need to
// specify is the flavor of partitioning which, in our case, is "hive".
//
  // Alternatively, we could manually create a partitioning scheme from a
  // schema, as sketched below. This is typically not necessary for hive
  // partitioning as inference works well.
std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory =
arrow::dataset::HivePartitioning::MakeFactory();
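  // A minimal sketch of the manual alternative, using the same schema this
  // dataset was partitioned with:
  //
  //   auto manual_partitioning =
  //       std::make_shared<arrow::dataset::HivePartitioning>(
  //           arrow::schema({arrow::field("Month", arrow::int32()),
  //                          arrow::field("Day", arrow::int32())}));
  //   options.partitioning = manual_partitioning;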
arrow::dataset::FileSystemFactoryOptions options;
options.partitioning = partitioning_factory;
// Create a dataset factory
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<arrow::dataset::DatasetFactory> dataset_factory,
arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options));
  // Create the dataset. This will scan the dataset directory to find all the
  // files and may read some file metadata in order to determine the dataset
  // schema.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset,
dataset_factory->Finish());
rout << "We discovered the following schema for the dataset:" << std::endl
<< std::endl
<< dataset->schema()->ToString() << std::endl;
EndRecipe("CreatingADataset");
StartRecipe("ScanningADataset");
// Create a scanner
arrow::dataset::ScannerBuilder scanner_builder(dataset);
ARROW_RETURN_NOT_OK(scanner_builder.UseThreads(true));
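  // The builder can also prune the scan before it runs; a sketch (the column
  // names come from the airquality schema):
  //
  //   ARROW_RETURN_NOT_OK(scanner_builder.Project({"Ozone", "Month", "Day"}));
  //   ARROW_RETURN_NOT_OK(scanner_builder.Filter(
  //       arrow::compute::equal(arrow::compute::field_ref("Month"),
  //                             arrow::compute::literal(5))));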
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Scanner> scanner,
scanner_builder.Finish());
  // Scan the dataset into a single table. There are a variety of other
  // methods available on the scanner as well; a couple are sketched below.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, scanner->ToTable());
rout << "Read in a table with " << table->num_rows() << " rows and "
<< table->num_columns() << " columns";
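  // Other consumption methods, as a sketch: counting rows without
  // materializing them, or streaming batches instead of building a table.
  //
  //   ARROW_ASSIGN_OR_RAISE(int64_t row_count, scanner->CountRows());
  //   ARROW_ASSIGN_OR_RAISE(auto batch_reader, scanner->ToRecordBatchReader());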
EndRecipe("ScanningADataset");
return arrow::Status::OK();
}
TEST_F(DatasetReadingTest, TestDatasetRead) {
ASSERT_OK(DatasetRead(airquality_basedir()));
}