// blob: 0861701cf06086ed1d43a791b5e9d74549b282dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "arrow/c/abi.h"
#include "benchmarks/BatchStreamIterator.h"
#include "benchmarks/common/BenchmarkUtils.h"
#include "utils/TestUtils.h"
#include <arrow/adapters/orc/adapter.h>
#include <arrow/io/file.h>
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <arrow/type_fwd.h>
#include <gtest/gtest.h>
#include <filesystem>
#include <iostream>
#include <string>
#include <system_error>
#include <vector>
namespace gluten {
/// Number of Parquet inputs (and therefore ORC outputs) handled by this test.
static constexpr unsigned kFileNum = 2;
struct OrcTestEntry {
std::string orcFilename;
std::shared_ptr<arrow::Schema> writeSchema;
std::shared_ptr<arrow::Schema> readSchema;
size_t writeRowNums = 0;
size_t readRowNums = 0;
void Assert() {
ASSERT_TRUE(writeSchema->Equals(*readSchema));
ASSERT_EQ(writeRowNums, readRowNums);
}
};
/// Shared state for the ORC round-trip test: one OrcTestEntry per ORC file.
/// The single global instance `orcTestData` is filled in by the write and read
/// phases and removes the ORC files when it is destroyed.
struct OrcTestData {
  std::vector<OrcTestEntry> entries;

  /// Creates kFileNum entries and assigns the ORC output file names.
  OrcTestData() {
    // The assignments below index entries 0 and 1 directly.
    static_assert(kFileNum == 2, "OrcTestData names exactly two ORC files");
    entries.resize(kFileNum);
    entries[0].orcFilename = "example_orders.orc";
    entries[1].orcFilename = "example_lineitem.orc";
  }

  /// Best-effort removal of the ORC files named by the entries.
  ~OrcTestData() {
    for (const auto& entry : entries) {
      // Use the non-throwing overload: the throwing one raises
      // std::filesystem::filesystem_error on OS-level errors, and an exception
      // escaping a destructor terminates the process. A file that does not
      // exist is not an error, and any other failure is deliberately ignored.
      std::error_code ec;
      std::filesystem::remove(entry.orcFilename, ec);
    }
  }

  /// Runs OrcTestEntry::Assert() on every entry. A fatal failure inside one
  /// entry's Assert() only returns from that call, so the remaining entries
  /// are still checked.
  void check() {
    for (auto& entry : entries) {
      entry.Assert();
    }
  }
} orcTestData;
/// Converts one Parquet file to an ORC file and records what was written.
///
/// Reads `parquetFile` batch by batch through ParquetBatchStreamIterator and
/// writes each batch to `orcFile` with Arrow's ORC writer. The source schema
/// and the running row count are stored in `orcTestData.entries[index]`.
///
/// @param index        Position of the entry in `orcTestData.entries` to update.
/// @param parquetFile  Path of the Parquet file to read.
/// @param orcFile      Path of the ORC file to create.
/// @return OK on success; otherwise a non-OK status describing the failing
///         step (bad index, open failure, unexpected batch type, write or
///         close failure).
arrow::Status parquet2Orc(unsigned index, const std::string& parquetFile, const std::string& orcFile) {
  if (index >= orcTestData.entries.size()) {
    return arrow::Status::Invalid("parquet2Orc: entry index out of range: ", index);
  }
  auto& entry = orcTestData.entries[index];

  ParquetBatchStreamIterator parquetIterator(parquetFile);
  entry.writeSchema = parquetIterator.getSchema();

  std::shared_ptr<arrow::io::FileOutputStream> outputStream;
  ARROW_ASSIGN_OR_RAISE(outputStream, arrow::io::FileOutputStream::Open(orcFile));

  const auto writerOptions = arrow::adapters::orc::WriteOptions();
  // Propagate an open failure to the caller instead of dereferencing a failed
  // Result.
  ARROW_ASSIGN_OR_RAISE(auto writer, arrow::adapters::orc::ORCFileWriter::Open(outputStream.get(), writerOptions));

  while (true) {
    // 1. read from Parquet; a null batch marks the end of the stream
    auto cb = parquetIterator.next();
    if (cb == nullptr) {
      break;
    }
    auto arrowColumnarBatch = std::dynamic_pointer_cast<gluten::ArrowColumnarBatch>(cb);
    if (arrowColumnarBatch == nullptr) {
      // The downcast yields null when the batch is not an ArrowColumnarBatch.
      return arrow::Status::Invalid("parquet2Orc: batch read from ", parquetFile, " is not an ArrowColumnarBatch");
    }
    auto recordBatch = arrowColumnarBatch->getRecordBatch();

    // 2. write to Orc, keeping the underlying error text in the returned status
    const auto writeStatus = writer->Write(*recordBatch);
    if (!writeStatus.ok()) {
      return arrow::Status::IOError("Write failed: ", writeStatus.ToString());
    }
    entry.writeRowNums += recordBatch->num_rows();
  }

  const auto closeStatus = writer->Close();
  if (!closeStatus.ok()) {
    return arrow::Status::IOError("Close failed: ", closeStatus.ToString());
  }
  return arrow::Status::OK();
}
/// Write phase of the round trip: converts each generated Parquet input into
/// the ORC file named by the matching entry of `orcTestData`.
void testWriteOrc() {
  std::vector<std::string> inputFiles(kFileNum);
  inputFiles[0] = getGeneratedFilePath("example_orders");
  inputFiles[1] = getGeneratedFilePath("example_lineitem");
  ASSERT_EQ(inputFiles.size(), orcTestData.entries.size());
  // Unsigned index: matches the `unsigned index` parameter of parquet2Orc and
  // avoids comparing a signed int against size().
  for (unsigned i = 0; i != inputFiles.size(); ++i) {
    // NOTE(review): ASSERT_NOT_OK comes from the project's test utilities;
    // presumably it fails when the status is not OK - confirm.
    ASSERT_NOT_OK(parquet2Orc(i, inputFiles[i], orcTestData.entries[i].orcFilename));
  }
}
void testReadOrc() {
for (auto i = 0; i != orcTestData.entries.size(); ++i) {
// Open File
auto input = arrow::io::ReadableFile::Open(orcTestData.entries[i].orcFilename);
EXPECT_TRUE(input.ok());
// Open ORC File Reader
auto maybeReader = arrow::adapters::orc::ORCFileReader::Open(*input, arrow::default_memory_pool());
EXPECT_TRUE(maybeReader.ok());
auto& reader = *maybeReader;
// read schema
auto schema = reader->ReadSchema();
EXPECT_TRUE(schema.ok());
orcTestData.entries[i].readSchema = *schema;
// std::cout << "schema:\n" << (*schema)->ToString() << std::endl;
// read record batch
auto recordBatchReader = reader->GetRecordBatchReader(4096, std::vector<std::string>());
EXPECT_TRUE(recordBatchReader.ok());
while (true) {
auto batch = (*recordBatchReader)->Next();
EXPECT_TRUE(batch.ok());
if (!(*batch)) {
break;
}
orcTestData.entries[i].readRowNums += (*batch)->num_rows();
// std::cout << (*batch)->ToString() << std::endl;
}
}
}
// Empty fixture; it only provides the test-suite type used by TEST_F below.
class OrcTest : public ::testing::Test {};
// Round trip: convert the generated Parquet inputs to ORC, read the ORC files
// back, then compare the recorded schemas and row counts.
TEST_F(OrcTest, testOrc) {
// The test is currently disabled: GTEST_SKIP() marks it as skipped and returns
// from the test body, so none of the calls below are executed. "Issue 2862" is
// the reason string reported with the skip (presumably a tracker number).
GTEST_SKIP() << "Issue 2862";
testWriteOrc();
testReadOrc();
orcTestData.check();
}
} // namespace gluten