blob: d628f4727c1605511edce60efe3c68430bf603a2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fcntl.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include "arrow/io/file.h"
#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
#include "parquet/file_reader.h"
#include "parquet/printer.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
using std::string;
namespace parquet {
using ReadableFile = ::arrow::io::ReadableFile;
// Returns the path of the "alltypes_plain.parquet" sample file inside the
// directory reported by test::get_data_dir().
std::string alltypes_plain() {
  std::string path(test::get_data_dir());
  path += "/";
  path += "alltypes_plain.parquet";
  return path;
}
// Returns the path of the "nation.dict-malformed.parquet" sample file inside
// the directory reported by test::get_data_dir().
std::string nation_dict_truncated_data_page() {
  std::string path(test::get_data_dir());
  path += "/";
  path += "nation.dict-malformed.parquet";
  return path;
}
class TestAllTypesPlain : public ::testing::Test {
public:
void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); }
void TearDown() {}
protected:
std::unique_ptr<ParquetFileReader> reader_;
};
// Empty body: only runs the fixture's SetUp()/TearDown(), i.e. opens the
// sample file and destroys the reader again.
TEST_F(TestAllTypesPlain, NoopConstructDestruct) {}
// Checks the file and row-group metadata of the sample file, then reads
// column 0 ("id") in two batches and verifies that the second batch stops at
// the end of the data.
TEST_F(TestAllTypesPlain, TestBatchRead) {
  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);

  // column 0, id
  std::shared_ptr<Int32Reader> col =
      std::dynamic_pointer_cast<Int32Reader>(group->Column(0));
  // dynamic_pointer_cast returns an empty pointer on a type mismatch; fail the
  // test cleanly instead of dereferencing null below.
  ASSERT_TRUE(col != nullptr);

  // Sized for the largest batch requested below (5), so the buffers are valid
  // for every ReadBatch call regardless of how many values come back.
  int16_t def_levels[5];
  int16_t rep_levels[5];
  int32_t values[5];

  // This file only has 8 rows
  ASSERT_EQ(8, reader_->metadata()->num_rows());
  // This file only has 1 row group
  ASSERT_EQ(1, reader_->metadata()->num_row_groups());
  // Size of the metadata is 730 bytes
  ASSERT_EQ(730, reader_->metadata()->size());
  // This row group must have 8 rows
  ASSERT_EQ(8, group->metadata()->num_rows());

  // First batch: 4 of the 8 values.
  ASSERT_TRUE(col->HasNext());
  int64_t values_read = 0;
  auto levels_read = col->ReadBatch(4, def_levels, rep_levels, values, &values_read);
  ASSERT_EQ(4, levels_read);
  ASSERT_EQ(4, values_read);

  // Now read past the end of the file: 5 are requested, 4 are expected back.
  ASSERT_TRUE(col->HasNext());
  levels_read = col->ReadBatch(5, def_levels, rep_levels, values, &values_read);
  ASSERT_EQ(4, levels_read);
  ASSERT_EQ(4, values_read);

  ASSERT_FALSE(col->HasNext());
}
// Scans column 0 ("id") one value at a time: eight values are expected, none
// of them null, after which the scanner must report that it is exhausted.
TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
  std::shared_ptr<RowGroupReader> row_group = reader_->RowGroup(0);

  // column 0, id
  auto id_scanner = std::make_shared<Int32Scanner>(row_group->Column(0));

  int32_t value;
  bool value_is_null;
  for (int row = 0; row != 8; ++row) {
    ASSERT_TRUE(id_scanner->HasNext());
    ASSERT_TRUE(id_scanner->NextValue(&value, &value_is_null));
    ASSERT_FALSE(value_is_null);
  }

  // Nothing left: both the query and the read itself must say so.
  ASSERT_FALSE(id_scanner->HasNext());
  ASSERT_FALSE(id_scanner->NextValue(&value, &value_is_null));
}
// A freshly constructed scanner reports a batch size of 128; SetBatchSize()
// must be reflected by batch_size().
TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
  std::shared_ptr<RowGroupReader> row_group = reader_->RowGroup(0);

  // column 0, id
  auto id_scanner = std::make_shared<Int32Scanner>(row_group->Column(0));

  ASSERT_EQ(128, id_scanner->batch_size());
  id_scanner->SetBatchSize(1024);
  ASSERT_EQ(1024, id_scanner->batch_size());
}
// DebugPrint with an empty column selection must write something to the
// stream.
TEST_F(TestAllTypesPlain, DebugPrintWorks) {
  std::stringstream ss;
  std::list<int> columns;  // left empty

  ParquetFilePrinter printer(reader_.get());
  printer.DebugPrint(ss, columns);

  // empty() rather than ASSERT_GT(size(), 0): the latter compares an unsigned
  // size_t with a signed int literal and triggers sign-compare warnings.
  const std::string result = ss.str();
  ASSERT_FALSE(result.empty());
}
// DebugPrint with an explicit, unordered column selection (5, 0, 10) must
// write something to the stream.
TEST_F(TestAllTypesPlain, ColumnSelection) {
  std::stringstream ss;
  std::list<int> columns;
  columns.push_back(5);
  columns.push_back(0);
  columns.push_back(10);

  ParquetFilePrinter printer(reader_.get());
  printer.DebugPrint(ss, columns);

  // empty() rather than ASSERT_GT(size(), 0): the latter compares an unsigned
  // size_t with a signed int literal and triggers sign-compare warnings.
  const std::string result = ss.str();
  ASSERT_FALSE(result.empty());
}
// DebugPrint must throw ParquetException when the selection holds a column
// index of 100 or of -1.
TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
  std::stringstream out;
  std::list<int> selection;

  // Index 100
  selection.push_back(100);
  ParquetFilePrinter printer_large_index(reader_.get());
  ASSERT_THROW(printer_large_index.DebugPrint(out, selection), ParquetException);

  // Negative index
  selection.clear();
  selection.push_back(-1);
  ParquetFilePrinter printer_negative_index(reader_.get());
  ASSERT_THROW(printer_negative_index.DebugPrint(out, selection), ParquetException);
}
class TestLocalFile : public ::testing::Test {
public:
void SetUp() {
std::string dir_string(test::get_data_dir());
std::stringstream ss;
ss << dir_string << "/"
<< "alltypes_plain.parquet";
PARQUET_THROW_NOT_OK(ReadableFile::Open(ss.str(), &handle));
fileno = handle->file_descriptor();
}
void TearDown() {}
protected:
int fileno;
std::shared_ptr<::arrow::io::ReadableFile> handle;
};
// ArrowInputFile whose Close() only records that it was called: it sets the
// caller-supplied flag to true and does nothing else.
class HelperFileClosed : public ArrowInputFile {
 public:
  // `close_called` is stored as-is and written to from Close().
  explicit HelperFileClosed(
      const std::shared_ptr<::arrow::io::ReadableFileInterface>& file, bool* close_called)
      : ArrowInputFile(file), close_flag_(close_called) {}

  void Close() override { *close_flag_ = true; }

 private:
  // Flag owned by the caller; dereferenced in Close().
  bool* close_flag_;
};
// Once the reader opened on a HelperFileClosed source has gone out of scope,
// the source's Close() must have been called.
TEST_F(TestLocalFile, FileClosedOnDestruction) {
  bool close_called = false;
  {
    std::unique_ptr<RandomAccessSource> source(
        new HelperFileClosed(handle, &close_called));
    auto contents = ParquetFileReader::Contents::Open(std::move(source));

    std::unique_ptr<ParquetFileReader> file_reader(new ParquetFileReader());
    file_reader->Open(std::move(contents));
  }  // file_reader is destroyed here
  ASSERT_TRUE(close_called);
}
// PARQUET-808: a reader given externally read metadata must expose that very
// FileMetaData object, whether it is opened from a file handle or from a path.
TEST_F(TestLocalFile, OpenWithMetadata) {
  std::shared_ptr<FileMetaData> external_metadata = ReadMetaData(handle);

  // Open from the already-open handle
  auto handle_reader =
      ParquetFileReader::Open(handle, default_reader_properties(), external_metadata);
  // Compare pointers
  ASSERT_EQ(external_metadata.get(), handle_reader->metadata().get());

  // Printing through the reader must work as well
  std::stringstream sink;
  std::list<int> selection;
  ParquetFilePrinter printer(handle_reader.get());
  printer.DebugPrint(sink, selection, true);

  // Make sure OpenFile passes on the external metadata, too
  auto path_reader = ParquetFileReader::OpenFile(
      alltypes_plain(), false, default_reader_properties(), external_metadata);
  // Compare pointers
  ASSERT_EQ(external_metadata.get(), path_reader->metadata().get());
}
// PARQUET-816. Files written by some older Parquet implementations can carry
// malformed data page metadata. They still decode correctly if we proceed
// optimistically even though the stream seems to hold too little data.
// Earlier, stream reads were checked quite aggressively, which other
// implementations (e.g. Impala's) do not do.
TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) {
  // empty list means print all
  std::list<int> selection;

  // First pass: OpenFile with its second argument set to false
  auto reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), false);
  std::stringstream first_dump;
  ParquetFilePrinter first_printer(reader.get());
  first_printer.DebugPrint(first_dump, selection, true);

  // Second pass: same file, second argument set to true
  reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), true);
  std::stringstream second_dump;
  ParquetFilePrinter second_printer(reader.get());
  second_printer.DebugPrint(second_dump, selection, true);

  // The memory-mapped read runs over the end of the column chunk and succeeds
  // by accident
  ASSERT_EQ(second_dump.str(), first_dump.str());
}
// Prints alltypes_plain.parquet with ParquetFilePrinter::JSONPrint and
// compares the output against the expected document below.
TEST(TestJSONWithLocalFile, JSONOutput) {
// The raw string is compared with ASSERT_EQ at the end of the test, so every
// character in it, whitespace and line breaks included, is significant.
std::string jsonOutput = R"###({
"FileName": "alltypes_plain.parquet",
"Version": "0",
"CreatedBy": "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)",
"TotalRows": "8",
"NumberOfRowGroups": "1",
"NumberOfRealColumns": "11",
"NumberOfColumns": "11",
"Columns": [
{ "Id": "0", "Name": "id", "PhysicalType": "INT32", "LogicalType": "NONE" },
{ "Id": "1", "Name": "bool_col", "PhysicalType": "BOOLEAN", "LogicalType": "NONE" },
{ "Id": "2", "Name": "tinyint_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
{ "Id": "3", "Name": "smallint_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
{ "Id": "4", "Name": "int_col", "PhysicalType": "INT32", "LogicalType": "NONE" },
{ "Id": "5", "Name": "bigint_col", "PhysicalType": "INT64", "LogicalType": "NONE" },
{ "Id": "6", "Name": "float_col", "PhysicalType": "FLOAT", "LogicalType": "NONE" },
{ "Id": "7", "Name": "double_col", "PhysicalType": "DOUBLE", "LogicalType": "NONE" },
{ "Id": "8", "Name": "date_string_col", "PhysicalType": "BYTE_ARRAY", "LogicalType": "NONE" },
{ "Id": "9", "Name": "string_col", "PhysicalType": "BYTE_ARRAY", "LogicalType": "NONE" },
{ "Id": "10", "Name": "timestamp_col", "PhysicalType": "INT96", "LogicalType": "NONE" }
],
"RowGroups": [
{
"Id": "0", "TotalBytes": "671", "Rows": "8",
"ColumnChunks": [
{"Id": "0", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "73", "CompressedSize": "73" },
{"Id": "1", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "24", "CompressedSize": "24" },
{"Id": "2", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
{"Id": "3", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
{"Id": "4", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
{"Id": "5", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" },
{"Id": "6", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "47", "CompressedSize": "47" },
{"Id": "7", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "55", "CompressedSize": "55" },
{"Id": "8", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "88", "CompressedSize": "88" },
{"Id": "9", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "49", "CompressedSize": "49" },
{"Id": "10", "Values": "8", "StatsSet": "False",
"Compression": "UNCOMPRESSED", "Encodings": "RLE PLAIN_DICTIONARY PLAIN ", "UncompressedSize": "139", "CompressedSize": "139" }
]
}
]
}
)###";
std::stringstream ss;
// empty list means print all
std::list<int> columns;
auto reader =
ParquetFileReader::OpenFile(alltypes_plain(), false, default_reader_properties());
ParquetFilePrinter printer(reader.get());
// The third argument is the same file name that the expected "FileName"
// field above contains.
printer.JSONPrint(ss, columns, "alltypes_plain.parquet");
// Exact string comparison of expected and actual output.
ASSERT_EQ(jsonOutput, ss.str());
}
} // namespace parquet