blob: 57aa2ea36a30b4f1956364aa57878c4a49e005b0 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include "arrow/adapters/orc/adapter.h"
#include "arrow/api.h"
#include "arrow/csv/api.h"
#include "arrow/filesystem/api.h"
#include "arrow/io/api.h"
#include "arrow/stl.h"
#include "arrow/util/uri.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "./util.h"
#include "graphar/graph_info.h"
#include "graphar/util/adj_list_type.h"
#include "graphar/util/general_params.h"
#include "graphar/util/yaml.h"
#include "graphar/writer/arrow_chunk_writer.h"
#include <catch2/catch_test_macros.hpp>
namespace graphar {
TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
std::string path = test_data_dir + "/ldbc_sample/person_0_0.csv";
arrow::io::IOContext io_context = arrow::io::default_io_context();
auto fs = arrow::fs::FileSystemFromUriOrPath(path).ValueOrDie();
std::shared_ptr<arrow::io::InputStream> input =
fs->OpenInputStream(path).ValueOrDie();
auto read_options = arrow::csv::ReadOptions::Defaults();
auto parse_options = arrow::csv::ParseOptions::Defaults();
parse_options.delimiter = '|';
auto convert_options = arrow::csv::ConvertOptions::Defaults();
// Instantiate TableReader from input stream and options
auto maybe_reader = arrow::csv::TableReader::Make(
io_context, input, read_options, parse_options, convert_options);
REQUIRE(maybe_reader.ok());
std::shared_ptr<arrow::csv::TableReader> reader = *maybe_reader;
// Read table from CSV file
auto maybe_table = reader->Read();
REQUIRE(maybe_table.ok());
std::shared_ptr<arrow::Table> table = *maybe_table;
std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
// Construct the writer
std::string vertex_meta_file =
test_data_dir + "/ldbc_sample/parquet/" + "person.vertex.yml";
auto vertex_meta = Yaml::LoadFile(vertex_meta_file).value();
auto vertex_info = VertexInfo::Load(vertex_meta).value();
auto maybe_writer = VertexPropertyWriter::Make(vertex_info, "/tmp/");
REQUIRE(!maybe_writer.has_error());
auto writer = maybe_writer.value();
// Get & set validate level
REQUIRE(writer->GetValidateLevel() == ValidateLevel::no_validate);
writer->SetValidateLevel(ValidateLevel::strong_validate);
REQUIRE(writer->GetValidateLevel() == ValidateLevel::strong_validate);
// Valid cases
// Write the table
REQUIRE(writer->WriteTable(table, 0).ok());
// Write the number of vertices
REQUIRE(writer->WriteVerticesNum(table->num_rows()).ok());
// Check vertex count
input = fs->OpenInputStream("/tmp/vertex/person/vertex_count").ValueOrDie();
auto num = input->Read(sizeof(IdType)).ValueOrDie();
const IdType* ptr = reinterpret_cast<const IdType*>(num->data());
REQUIRE((*ptr) == table->num_rows());
// Invalid cases
// Invalid vertices number
REQUIRE(writer->WriteVerticesNum(-1).IsInvalid());
// Out of range
REQUIRE(writer->WriteChunk(table, 0).IsInvalid());
// Invalid chunk id
auto chunk = table->Slice(0, vertex_info->GetChunkSize());
REQUIRE(writer->WriteChunk(chunk, -1).IsIndexError());
// Invalid property group
Property p1("invalid_property", int32(), false);
auto pg1 = CreatePropertyGroup({p1}, FileType::CSV);
REQUIRE(writer->WriteTable(table, pg1, 0).IsKeyError());
// Property not found in table
std::shared_ptr<arrow::Table> tmp_table =
table->RenameColumns({"original_id", "firstName", "lastName", "id"})
.ValueOrDie();
auto pg2 = vertex_info->GetPropertyGroup("firstName");
REQUIRE(writer->WriteTable(tmp_table, pg2, 0).IsInvalid());
// Invalid data type
auto pg3 = vertex_info->GetPropertyGroup("id");
REQUIRE(writer->WriteTable(tmp_table, pg3, 0).IsTypeError());
SECTION("TestOrcParquetReader") {
arrow::Status st;
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::string path1 = test_data_dir + "/ldbc_sample/orc" +
"/vertex/person/firstName_lastName_gender/chunk1";
std::string path2 = test_data_dir + "/ldbc_sample/parquet" +
"/vertex/person/firstName_lastName_gender/chunk1";
arrow::io::IOContext io_context = arrow::io::default_io_context();
// Open ORC file reader
auto fs1 = arrow::fs::FileSystemFromUriOrPath(path1).ValueOrDie();
std::shared_ptr<arrow::io::RandomAccessFile> input1 =
fs1->OpenInputFile(path1).ValueOrDie();
std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader =
arrow::adapters::orc::ORCFileReader::Open(input1, pool).ValueOrDie();
// Read entire file as a single Arrow table
auto maybe_table = reader->Read();
std::shared_ptr<arrow::Table> table1 = maybe_table.ValueOrDie();
// Open Parquet file reader
auto fs2 = arrow::fs::FileSystemFromUriOrPath(path2).ValueOrDie();
std::shared_ptr<arrow::io::RandomAccessFile> input2 =
fs2->OpenInputFile(path2).ValueOrDie();
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
st = parquet::arrow::OpenFile(input2, pool, &arrow_reader);
// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table2;
st = arrow_reader->ReadTable(&table2);
REQUIRE(table1->GetColumnByName("firstName")->ToString() ==
table2->GetColumnByName("firstName")->ToString());
REQUIRE(table1->GetColumnByName("lastName")->ToString() ==
table2->GetColumnByName("lastName")->ToString());
REQUIRE(table1->GetColumnByName("gender")->ToString() ==
table2->GetColumnByName("gender")->ToString());
}
SECTION("TestEdgeChunkWriter") {
arrow::Status st;
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::string path = test_data_dir +
"/ldbc_sample/parquet/edge/person_knows_person/"
"unordered_by_source/adj_list/part0/chunk0";
auto fs = arrow::fs::FileSystemFromUriOrPath(path).ValueOrDie();
std::shared_ptr<arrow::io::RandomAccessFile> input =
fs->OpenInputFile(path).ValueOrDie();
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
st = parquet::arrow::OpenFile(input, pool, &arrow_reader);
// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> maybe_table;
st = arrow_reader->ReadTable(&maybe_table);
REQUIRE(st.ok());
std::shared_ptr<arrow::Table> table =
maybe_table
->RenameColumns(
{GeneralParams::kSrcIndexCol, GeneralParams::kDstIndexCol})
.ValueOrDie();
std::cout << table->schema()->ToString() << std::endl;
std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
// Construct the writer
std::string edge_meta_file =
test_data_dir + "/ldbc_sample/csv/" + "person_knows_person.edge.yml";
auto edge_meta = Yaml::LoadFile(edge_meta_file).value();
auto edge_info = EdgeInfo::Load(edge_meta).value();
auto adj_list_type = AdjListType::ordered_by_source;
auto maybe_writer =
EdgeChunkWriter::Make(edge_info, "/tmp/", adj_list_type);
REQUIRE(!maybe_writer.has_error());
auto writer = maybe_writer.value();
// Get & set validate level
REQUIRE(writer->GetValidateLevel() == ValidateLevel::no_validate);
writer->SetValidateLevel(ValidateLevel::strong_validate);
REQUIRE(writer->GetValidateLevel() == ValidateLevel::strong_validate);
// Valid cases
// Write adj list of vertex chunk 0 to files
REQUIRE(writer->SortAndWriteAdjListTable(table, 0, 0).ok());
// Write number of edges for vertex chunk 0
REQUIRE(writer->WriteEdgesNum(0, table->num_rows()).ok());
// Write number of vertices
REQUIRE(writer->WriteVerticesNum(903).ok());
// Check the number of edges
std::shared_ptr<arrow::io::InputStream> input2 =
fs->OpenInputStream(
"/tmp/edge/person_knows_person/ordered_by_source/edge_count0")
.ValueOrDie();
auto edge_num = input2->Read(sizeof(IdType)).ValueOrDie();
const IdType* edge_num_ptr =
reinterpret_cast<const IdType*>(edge_num->data());
REQUIRE((*edge_num_ptr) == table->num_rows());
// Check the number of vertices
std::shared_ptr<arrow::io::InputStream> input3 =
fs->OpenInputStream(
"/tmp/edge/person_knows_person/ordered_by_source/vertex_count")
.ValueOrDie();
auto vertex_num = input3->Read(sizeof(IdType)).ValueOrDie();
const IdType* vertex_num_ptr =
reinterpret_cast<const IdType*>(vertex_num->data());
REQUIRE((*vertex_num_ptr) == 903);
// Invalid cases
// Invalid count or index
REQUIRE(writer->WriteEdgesNum(-1, 0).IsIndexError());
REQUIRE(writer->WriteEdgesNum(0, -1).IsIndexError());
REQUIRE(writer->WriteVerticesNum(-1).IsIndexError());
// Out of range
REQUIRE(writer->WriteOffsetChunk(table, 0).IsInvalid());
// Invalid chunk id
REQUIRE(writer->WriteAdjListChunk(table, -1, 0).IsIndexError());
REQUIRE(writer->WriteAdjListChunk(table, 0, -1).IsIndexError());
// Invalid adj list type
auto invalid_adj_list_type = AdjListType::unordered_by_dest;
auto maybe_writer2 =
EdgeChunkWriter::Make(edge_info, "/tmp/", invalid_adj_list_type);
REQUIRE(maybe_writer2.has_error());
// Invalid property group
Property p1("invalid_property", int32(), false);
auto pg1 = CreatePropertyGroup({p1}, FileType::CSV);
REQUIRE(writer->WritePropertyChunk(table, pg1, 0, 0).IsKeyError());
// Property not found in table
auto pg2 = edge_info->GetPropertyGroup("creationDate");
REQUIRE(writer->WritePropertyChunk(table, pg2, 0, 0).IsInvalid());
// Required columns not found
std::shared_ptr<arrow::Table> tmp_table =
table->RenameColumns({"creationDate", "tmp_property"}).ValueOrDie();
REQUIRE(writer->WriteAdjListChunk(tmp_table, 0, 0).IsInvalid());
// Invalid data type
REQUIRE(writer->WritePropertyChunk(tmp_table, pg2, 0, 0).IsTypeError());
}
}
} // namespace graphar