blob: 2d0dcb762c205f653ec9691142b6e775845eef94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "arrow/api.h"
#include "gar/util/convert_to_arrow_type.h"
#include "gar/util/general_params.h"
#include "gar/util/result.h"
#include "gar/writer/edges_builder.h"
namespace graphar {
namespace builder {
Status EdgesBuilder::Dump() {
// construct the writer
EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, validate_level_);
// construct empty edge collections for vertex chunks without edges
IdType num_vertex_chunks =
(num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
for (IdType i = 0; i < num_vertex_chunks; i++)
if (edges_.find(i) == edges_.end()) {
std::vector<Edge> empty_chunk_edges;
edges_[i] = empty_chunk_edges;
}
// dump the offsets
if (adj_list_type_ == AdjListType::ordered_by_source ||
adj_list_type_ == AdjListType::ordered_by_dest) {
for (auto& chunk_edges : edges_) {
IdType vertex_chunk_index = chunk_edges.first;
// sort the edges
if (adj_list_type_ == AdjListType::ordered_by_source)
sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_src);
if (adj_list_type_ == AdjListType::ordered_by_dest)
sort(chunk_edges.second.begin(), chunk_edges.second.end(), cmp_dst);
// construct and write offset chunk
GAR_ASSIGN_OR_RAISE(
auto offset_table,
getOffsetTable(vertex_chunk_index, chunk_edges.second));
GAR_RETURN_NOT_OK(
writer.WriteOffsetChunk(offset_table, vertex_chunk_index));
}
}
// dump the vertex num
GAR_RETURN_NOT_OK(writer.WriteVerticesNum(num_vertices_));
// dump the edge nums
IdType vertex_chunk_num =
(num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
for (IdType vertex_chunk_index = 0; vertex_chunk_index < vertex_chunk_num;
vertex_chunk_index++) {
if (edges_.find(vertex_chunk_index) == edges_.end()) {
GAR_RETURN_NOT_OK(writer.WriteEdgesNum(vertex_chunk_index, 0));
} else {
GAR_RETURN_NOT_OK(writer.WriteEdgesNum(
vertex_chunk_index, edges_[vertex_chunk_index].size()));
}
}
// dump the edges
for (auto& chunk_edges : edges_) {
IdType vertex_chunk_index = chunk_edges.first;
// convert to table
GAR_ASSIGN_OR_RAISE(auto input_table, convertToTable(chunk_edges.second));
// write table
GAR_RETURN_NOT_OK(writer.WriteTable(input_table, vertex_chunk_index, 0));
chunk_edges.second.clear();
}
is_saved_ = true;
return Status::OK();
}
Status EdgesBuilder::validate(const Edge& e,
ValidateLevel validate_level) const {
// use the builder's validate level
if (validate_level == ValidateLevel::default_validate)
validate_level = validate_level_;
// no validate
if (validate_level == ValidateLevel::no_validate)
return Status::OK();
// weak validate
// can not add new edges after dumping
if (is_saved_) {
return Status::Invalid(
"The edge builder has been saved, can not add "
"new edges any more");
}
// adj list type not exits in edge info
if (!edge_info_->HasAdjacentListType(adj_list_type_)) {
return Status::KeyError(
"Adj list type ", AdjListTypeToString(adj_list_type_),
" does not exist in the ", edge_info_->GetEdgeLabel(), " edge info.");
}
// strong validate
if (validate_level == ValidateLevel::strong_validate) {
for (auto& property : e.GetProperties()) {
// check if the property is contained
if (!edge_info_->HasProperty(property.first)) {
return Status::KeyError("Property with name ", property.first,
" is not contained in the ",
edge_info_->GetEdgeLabel(), " edge info.");
}
// check if the property type is correct
auto type = edge_info_->GetPropertyType(property.first).value();
bool invalid_type = false;
switch (type->id()) {
case Type::BOOL:
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::BOOL>::CType)) {
invalid_type = true;
}
break;
case Type::INT32:
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::INT32>::CType)) {
invalid_type = true;
}
break;
case Type::INT64:
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::INT64>::CType)) {
invalid_type = true;
}
break;
case Type::FLOAT:
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::FLOAT>::CType)) {
invalid_type = true;
}
break;
case Type::DOUBLE:
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::DOUBLE>::CType)) {
invalid_type = true;
}
break;
case Type::STRING:
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::STRING>::CType)) {
invalid_type = true;
}
break;
case Type::DATE:
// date is stored as int32_t
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::DATE>::CType::c_type)) {
invalid_type = true;
}
break;
case Type::TIMESTAMP:
// timestamp is stored as int64_t
if (property.second.type() !=
typeid(typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type)) {
invalid_type = true;
}
break;
default:
return Status::TypeError("Unsupported property type.");
}
if (invalid_type) {
return Status::TypeError(
"Invalid data type for property ", property.first + ", defined as ",
type->ToTypeName(), ", but got ", property.second.type().name());
}
}
}
return Status::OK();
}
template <Type type>
Status EdgesBuilder::tryToAppend(
const std::string& property_name,
std::shared_ptr<arrow::Array>& array, // NOLINT
const std::vector<Edge>& edges) {
using CType = typename TypeToArrowType<type>::CType;
arrow::MemoryPool* pool = arrow::default_memory_pool();
typename TypeToArrowType<type>::BuilderType builder(pool);
for (const auto& e : edges) {
if (e.Empty() || (!e.ContainProperty(property_name))) {
RETURN_NOT_ARROW_OK(builder.AppendNull());
} else {
RETURN_NOT_ARROW_OK(
builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
}
}
array = builder.Finish().ValueOrDie();
return Status::OK();
}
template <>
Status EdgesBuilder::tryToAppend<Type::TIMESTAMP>(
const std::string& property_name,
std::shared_ptr<arrow::Array>& array, // NOLINT
const std::vector<Edge>& edges) {
using CType = typename TypeToArrowType<Type::TIMESTAMP>::CType::c_type;
arrow::MemoryPool* pool = arrow::default_memory_pool();
typename TypeToArrowType<Type::TIMESTAMP>::BuilderType builder(
arrow::timestamp(arrow::TimeUnit::MILLI), pool);
for (const auto& e : edges) {
if (e.Empty() || (!e.ContainProperty(property_name))) {
RETURN_NOT_ARROW_OK(builder.AppendNull());
} else {
RETURN_NOT_ARROW_OK(
builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
}
}
array = builder.Finish().ValueOrDie();
return Status::OK();
}
template <>
Status EdgesBuilder::tryToAppend<Type::DATE>(
const std::string& property_name,
std::shared_ptr<arrow::Array>& array, // NOLINT
const std::vector<Edge>& edges) {
using CType = typename TypeToArrowType<Type::DATE>::CType::c_type;
arrow::MemoryPool* pool = arrow::default_memory_pool();
typename TypeToArrowType<Type::DATE>::BuilderType builder(pool);
for (const auto& e : edges) {
if (e.Empty() || (!e.ContainProperty(property_name))) {
RETURN_NOT_ARROW_OK(builder.AppendNull());
} else {
RETURN_NOT_ARROW_OK(
builder.Append(std::any_cast<CType>(e.GetProperty(property_name))));
}
}
array = builder.Finish().ValueOrDie();
return Status::OK();
}
Status EdgesBuilder::appendToArray(
const std::shared_ptr<DataType>& type, const std::string& property_name,
std::shared_ptr<arrow::Array>& array, // NOLINT
const std::vector<Edge>& edges) {
switch (type->id()) {
case Type::BOOL:
return tryToAppend<Type::BOOL>(property_name, array, edges);
case Type::INT32:
return tryToAppend<Type::INT32>(property_name, array, edges);
case Type::INT64:
return tryToAppend<Type::INT64>(property_name, array, edges);
case Type::FLOAT:
return tryToAppend<Type::FLOAT>(property_name, array, edges);
case Type::DOUBLE:
return tryToAppend<Type::DOUBLE>(property_name, array, edges);
case Type::STRING:
return tryToAppend<Type::STRING>(property_name, array, edges);
case Type::DATE:
return tryToAppend<Type::DATE>(property_name, array, edges);
case Type::TIMESTAMP:
return tryToAppend<Type::TIMESTAMP>(property_name, array, edges);
default:
return Status::TypeError("Unsupported property type.");
}
return Status::OK();
}
Status EdgesBuilder::tryToAppend(
int src_or_dest,
std::shared_ptr<arrow::Array>& array, // NOLINT
const std::vector<Edge>& edges) {
arrow::MemoryPool* pool = arrow::default_memory_pool();
typename arrow::TypeTraits<arrow::Int64Type>::BuilderType builder(pool);
for (const auto& e : edges) {
RETURN_NOT_ARROW_OK(builder.Append(std::any_cast<int64_t>(
src_or_dest == 1 ? e.GetSource() : e.GetDestination())));
}
array = builder.Finish().ValueOrDie();
return Status::OK();
}
Result<std::shared_ptr<arrow::Table>> EdgesBuilder::convertToTable(
const std::vector<Edge>& edges) {
const auto& property_groups = edge_info_->GetPropertyGroups();
std::vector<std::shared_ptr<arrow::Array>> arrays;
std::vector<std::shared_ptr<arrow::Field>> schema_vector;
// add src
std::shared_ptr<arrow::Array> array;
schema_vector.push_back(arrow::field(
GeneralParams::kSrcIndexCol, DataType::DataTypeToArrowDataType(int64())));
GAR_RETURN_NOT_OK(tryToAppend(1, array, edges));
arrays.push_back(array);
// add dst
schema_vector.push_back(arrow::field(
GeneralParams::kDstIndexCol, DataType::DataTypeToArrowDataType(int64())));
GAR_RETURN_NOT_OK(tryToAppend(0, array, edges));
arrays.push_back(array);
// add properties
for (auto& property_group : property_groups) {
for (auto& property : property_group->GetProperties()) {
// add a column to schema
schema_vector.push_back(arrow::field(
property.name, DataType::DataTypeToArrowDataType(property.type)));
// add a column to data
std::shared_ptr<arrow::Array> array;
GAR_RETURN_NOT_OK(
appendToArray(property.type, property.name, array, edges));
arrays.push_back(array);
}
}
auto schema = std::make_shared<arrow::Schema>(schema_vector);
return arrow::Table::Make(schema, arrays);
}
Result<std::shared_ptr<arrow::Table>> EdgesBuilder::getOffsetTable(
IdType vertex_chunk_index, const std::vector<Edge>& edges) {
arrow::Int64Builder builder;
IdType begin_index = vertex_chunk_index * vertex_chunk_size_,
end_index = begin_index + vertex_chunk_size_;
RETURN_NOT_ARROW_OK(builder.Append(0));
std::vector<std::shared_ptr<arrow::Array>> arrays;
std::vector<std::shared_ptr<arrow::Field>> schema_vector;
schema_vector.push_back(arrow::field(
GeneralParams::kOffsetCol, DataType::DataTypeToArrowDataType(int64())));
size_t index = 0;
for (IdType i = begin_index; i < end_index; i++) {
while (index < edges.size()) {
int64_t x = (adj_list_type_ == AdjListType::ordered_by_source
? edges[index].GetSource()
: edges[index].GetDestination());
if (x <= i) {
index++;
} else {
break;
}
}
RETURN_NOT_ARROW_OK(builder.Append(index));
}
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto array, builder.Finish());
arrays.push_back(array);
auto schema = std::make_shared<arrow::Schema>(schema_vector);
return arrow::Table::Make(schema, arrays);
}
} // namespace builder
} // namespace graphar