blob: 0048c2bcc22935d866cbcdaf650c79c1ef6ede7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifdef ARROW_ORC
#include "arrow/adapters/orc/adapter.h"
#endif
#include "arrow/api.h"
#include "arrow/csv/api.h"
#include "arrow/filesystem/api.h"
#include "arrow/io/api.h"
#include "parquet/arrow/reader.h"
#include "graphar/expression.h"
#include "graphar/filesystem.h"
#include "graphar/graph_info.h"
#include "graphar/reader_util.h"
#include "graphar/types.h"
namespace graphar::util {
/**
* @brief Checks whether the property names in the FilterOptions match the
* properties in the property group
*
* @param filter_options filter options
* @param property_group property group
* @return Status error if the property names in the FilterOptions do not match
*/
Status CheckFilterOptions(
const FilterOptions& filter_options,
const std::shared_ptr<PropertyGroup>& property_group) noexcept {
if (filter_options.filter) {
GAR_ASSIGN_OR_RAISE(auto filter, filter_options.filter->Evaluate());
for (const auto& field : arrow::compute::FieldsInExpression(filter)) {
auto property_name = *field.name();
if (!property_group->HasProperty(property_name)) {
return Status::Invalid(
property_name, " in the filter does not match the property group: ",
property_group);
}
}
}
if (filter_options.columns.has_value()) {
for (const auto& col : filter_options.columns.value().get()) {
if (!property_group->HasProperty(col)) {
return Status::Invalid(
col, " in the columns does not match the property group: ",
property_group);
}
}
}
return Status::OK();
}
/**
* @brief parse the vertex id to related adj list offset
*
* @param edge_info edge info
* @param vertex_chunk_size vertex chunk size
* @param prefix prefix of the payload files
* @param adj_list_type adj list type to find the offset
* @param vid vertex id
*
* @return tuple of <begin offset, end offset>
*/
Result<std::pair<IdType, IdType>> GetAdjListOffsetOfVertex(
const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
AdjListType adj_list_type, IdType vid) noexcept {
// get the adj list offset of id
IdType vertex_chunk_size;
if (adj_list_type == AdjListType::ordered_by_source) {
vertex_chunk_size = edge_info->GetSrcChunkSize();
} else if (adj_list_type == AdjListType::ordered_by_dest) {
vertex_chunk_size = edge_info->GetDstChunkSize();
} else {
return Status::Invalid(
"The adj list type has to be ordered_by_source or ordered_by_dest, but "
"got ",
std::string(AdjListTypeToString(adj_list_type)));
}
IdType offset_chunk_index = vid / vertex_chunk_size;
IdType offset_in_file = vid % vertex_chunk_size;
GAR_ASSIGN_OR_RAISE(
auto offset_file_path,
edge_info->GetAdjListOffsetFilePath(offset_chunk_index, adj_list_type));
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
auto adjacent_list = edge_info->GetAdjacentList(adj_list_type);
if (adjacent_list == nullptr) {
return Status::Invalid("The adjacent list is not set for adj list type ",
std::string(AdjListTypeToString(adj_list_type)));
}
auto file_type = adjacent_list->GetFileType();
std::string path = out_prefix + offset_file_path;
GAR_ASSIGN_OR_RAISE(auto table, fs->ReadFileToTable(path, file_type));
auto array = std::static_pointer_cast<arrow::Int64Array>(
table->column(0)->Slice(offset_in_file, 2)->chunk(0));
return std::make_pair(static_cast<IdType>(array->Value(0)),
static_cast<IdType>(array->Value(1)));
}
Result<IdType> GetVertexChunkNum(
const std::string& prefix,
const std::shared_ptr<VertexInfo>& vertex_info) noexcept {
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
GAR_ASSIGN_OR_RAISE(auto vertex_num_file_suffix,
vertex_info->GetVerticesNumFilePath());
std::string vertex_num_file_path = out_prefix + vertex_num_file_suffix;
GAR_ASSIGN_OR_RAISE(auto vertex_num,
fs->ReadFileToValue<IdType>(vertex_num_file_path));
return (vertex_num + vertex_info->GetChunkSize() - 1) /
vertex_info->GetChunkSize();
}
Result<IdType> GetVertexNum(
const std::string& prefix,
const std::shared_ptr<VertexInfo>& vertex_info) noexcept {
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
GAR_ASSIGN_OR_RAISE(auto vertex_num_file_suffix,
vertex_info->GetVerticesNumFilePath());
std::string vertex_num_file_path = out_prefix + vertex_num_file_suffix;
GAR_ASSIGN_OR_RAISE(auto vertex_num,
fs->ReadFileToValue<IdType>(vertex_num_file_path));
return vertex_num;
}
Result<IdType> GetVertexChunkNum(const std::string& prefix,
const std::shared_ptr<EdgeInfo>& edge_info,
AdjListType adj_list_type) noexcept {
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
GAR_ASSIGN_OR_RAISE(auto vertex_num_file_suffix,
edge_info->GetVerticesNumFilePath(adj_list_type));
std::string vertex_num_file_path = out_prefix + vertex_num_file_suffix;
GAR_ASSIGN_OR_RAISE(auto vertex_num,
fs->ReadFileToValue<IdType>(vertex_num_file_path));
IdType chunk_size;
if (adj_list_type == AdjListType::ordered_by_source ||
adj_list_type == AdjListType::unordered_by_source) {
chunk_size = edge_info->GetSrcChunkSize();
} else {
chunk_size = edge_info->GetDstChunkSize();
}
return (vertex_num + chunk_size - 1) / chunk_size;
}
Result<IdType> GetVertexNum(const std::string& prefix,
const std::shared_ptr<EdgeInfo>& edge_info,
AdjListType adj_list_type) noexcept {
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
GAR_ASSIGN_OR_RAISE(auto vertex_num_file_suffix,
edge_info->GetVerticesNumFilePath(adj_list_type));
std::string vertex_num_file_path = out_prefix + vertex_num_file_suffix;
GAR_ASSIGN_OR_RAISE(auto vertex_num,
fs->ReadFileToValue<IdType>(vertex_num_file_path));
return vertex_num;
}
Result<IdType> GetEdgeChunkNum(const std::string& prefix,
const std::shared_ptr<EdgeInfo>& edge_info,
AdjListType adj_list_type,
IdType vertex_chunk_index) noexcept {
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
GAR_ASSIGN_OR_RAISE(
auto edge_num_file_suffix,
edge_info->GetEdgesNumFilePath(vertex_chunk_index, adj_list_type));
std::string edge_num_file_path = out_prefix + edge_num_file_suffix;
GAR_ASSIGN_OR_RAISE(auto edge_num,
fs->ReadFileToValue<IdType>(edge_num_file_path));
return (edge_num + edge_info->GetChunkSize() - 1) / edge_info->GetChunkSize();
}
Result<IdType> GetEdgeNum(const std::string& prefix,
const std::shared_ptr<EdgeInfo>& edge_info,
AdjListType adj_list_type,
IdType vertex_chunk_index) noexcept {
std::string out_prefix;
GAR_ASSIGN_OR_RAISE(auto fs, FileSystemFromUriOrPath(prefix, &out_prefix));
GAR_ASSIGN_OR_RAISE(
auto edge_num_file_suffix,
edge_info->GetEdgesNumFilePath(vertex_chunk_index, adj_list_type));
std::string edge_num_file_path = out_prefix + edge_num_file_suffix;
GAR_ASSIGN_OR_RAISE(auto edge_num,
fs->ReadFileToValue<IdType>(edge_num_file_path));
return edge_num;
}
} // namespace graphar::util