| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <iostream> |
| |
| #include "arrow/api.h" |
| #include "arrow/compute/api.h" |
| #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000 |
| #include "arrow/acero/exec_plan.h" |
| #else |
| #include "arrow/compute/exec/exec_plan.h" |
| #endif |
| #include "arrow/dataset/dataset.h" |
| #include "arrow/dataset/file_base.h" |
| #include "arrow/dataset/file_parquet.h" |
| #include "arrow/dataset/plan.h" |
| #include "arrow/dataset/scanner.h" |
| |
| #include "graphar/graph_info.h" |
| #include "graphar/util/adj_list_type.h" |
| #include "graphar/util/data_type.h" |
| #include "graphar/util/filesystem.h" |
| #include "graphar/util/general_params.h" |
| #include "graphar/util/result.h" |
| #include "graphar/util/status.h" |
| #include "graphar/util/util.h" |
| #include "graphar/writer/arrow_chunk_writer.h" |
| |
| namespace graphar { |
| // common methods |
| |
| #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000 |
| namespace arrow_acero_namespace = arrow::acero; |
| #else |
| namespace arrow_acero_namespace = arrow::compute; |
| #endif |
| |
| #if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000 |
| using AsyncGeneratorType = |
| arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>>; |
| #else |
| using AsyncGeneratorType = |
| arrow::AsyncGenerator<arrow::util::optional<arrow::compute::ExecBatch>>; |
| #endif |
| |
| /** |
| * @brief Execute a compute plan and collect the results as a table. |
| * |
| * @param exec_context The execution context. |
| * @param plan The compute pan to execute. |
| * @param schema The schema the input table. |
| * @param sink_gen The async generator. |
| */ |
| Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable( |
| const arrow::compute::ExecContext& exec_context, |
| std::shared_ptr<arrow_acero_namespace::ExecPlan> plan, |
| std::shared_ptr<arrow::Schema> schema, AsyncGeneratorType sink_gen) { |
| // translate sink_gen (async) to sink_reader (sync) |
| std::shared_ptr<arrow::RecordBatchReader> sink_reader = |
| arrow_acero_namespace::MakeGeneratorReader(schema, std::move(sink_gen), |
| exec_context.memory_pool()); |
| |
| // validate the ExecPlan |
| RETURN_NOT_ARROW_OK(plan->Validate()); |
| // start the ExecPlan |
| #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000 |
| plan->StartProducing(); // arrow 12.0.0 or later return void, not Status |
| #else |
| RETURN_NOT_ARROW_OK(plan->StartProducing()); |
| #endif |
| |
| // collect sink_reader into a Table |
| std::shared_ptr<arrow::Table> response_table; |
| GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( |
| response_table, arrow::Table::FromRecordBatchReader(sink_reader.get())); |
| |
| // stop producing |
| plan->StopProducing(); |
| // plan mark finished |
| RETURN_NOT_ARROW_OK(plan->finished().status()); |
| return response_table; |
| } |
| |
| // implementations for VertexPropertyChunkWriter |
| |
| VertexPropertyWriter::VertexPropertyWriter( |
| const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix, |
| const ValidateLevel& validate_level) |
| : vertex_info_(vertex_info), |
| prefix_(prefix), |
| validate_level_(validate_level) { |
| if (validate_level_ == ValidateLevel::default_validate) { |
| throw std::runtime_error( |
| "default_validate is not allowed to be set as the global validate " |
| "level for VertexPropertyWriter"); |
| } |
| GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); |
| } |
| |
| // Check if the operation of writing vertices number is allowed. |
| Status VertexPropertyWriter::validate(const IdType& count, |
| ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) |
| validate_level = validate_level_; |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) |
| return Status::OK(); |
| // weak & strong validate |
| if (count < 0) { |
| return Status::Invalid("The number of vertices is negative."); |
| } |
| return Status::OK(); |
| } |
| |
| // Check if the operation of copying a file as a chunk is allowed. |
| Status VertexPropertyWriter::validate( |
| const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) |
| validate_level = validate_level_; |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) |
| return Status::OK(); |
| // weak & strong validate |
| if (!vertex_info_->HasPropertyGroup(property_group)) { |
| return Status::KeyError("The property group", " does not exist in ", |
| vertex_info_->GetLabel(), " vertex info."); |
| } |
| if (chunk_index < 0) { |
| return Status::IndexError("Negative chunk index ", chunk_index, "."); |
| } |
| return Status::OK(); |
| } |
| |
| // Check if the operation of writing a table as a chunk is allowed. |
| Status VertexPropertyWriter::validate( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) { |
| validate_level = validate_level_; |
| } |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) { |
| return Status::OK(); |
| } |
| // validate property_group & chunk_index |
| GAR_RETURN_NOT_OK(validate(property_group, chunk_index, validate_level)); |
| // weak validate for the input_table |
| if (input_table->num_rows() > vertex_info_->GetChunkSize()) { |
| return Status::Invalid("The number of rows of input table is ", |
| input_table->num_rows(), |
| " which is larger than the vertex chunk size", |
| vertex_info_->GetChunkSize(), "."); |
| } |
| // strong validate for the input_table |
| if (validate_level == ValidateLevel::strong_validate) { |
| // validate the input table |
| RETURN_NOT_ARROW_OK(input_table->Validate()); |
| // validate the schema |
| auto schema = input_table->schema(); |
| for (auto& property : property_group->GetProperties()) { |
| int indice = schema->GetFieldIndex(property.name); |
| if (indice == -1) { |
| return Status::Invalid("Column named ", property.name, |
| " of property group ", property_group, |
| " does not exist in the input table."); |
| } |
| auto field = schema->field(indice); |
| if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) { |
| return Status::TypeError( |
| "The data type of property: ", property.name, " is ", |
| property.type->ToTypeName(), ", but got ", |
| DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(), |
| "."); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status VertexPropertyWriter::WriteVerticesNum( |
| const IdType& count, ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(count, validate_level)); |
| GAR_ASSIGN_OR_RAISE(auto suffix, vertex_info_->GetVerticesNumFilePath()); |
| std::string path = prefix_ + suffix; |
| return fs_->WriteValueToFile<IdType>(count, path); |
| } |
| |
| Status VertexPropertyWriter::WriteChunk( |
| const std::string& file_name, |
| const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(property_group, chunk_index, validate_level)); |
| GAR_ASSIGN_OR_RAISE(auto suffix, |
| vertex_info_->GetFilePath(property_group, chunk_index)); |
| std::string path = prefix_ + suffix; |
| return fs_->CopyFile(file_name, path); |
| } |
| |
| Status VertexPropertyWriter::WriteChunk( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK( |
| validate(input_table, property_group, chunk_index, validate_level)); |
| auto file_type = property_group->GetFileType(); |
| auto schema = input_table->schema(); |
| int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol); |
| if (indice == -1) { |
| return Status::Invalid("The internal id Column named ", |
| GeneralParams::kVertexIndexCol, |
| " does not exist in the input table."); |
| } |
| |
| std::vector<int> indices({indice}); |
| for (auto& property : property_group->GetProperties()) { |
| int indice = schema->GetFieldIndex(property.name); |
| if (indice == -1) { |
| return Status::Invalid("Column named ", property.name, |
| " of property group ", property_group, |
| " of vertex ", vertex_info_->GetLabel(), |
| " does not exist in the input table."); |
| } |
| indices.push_back(indice); |
| } |
| GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table, |
| input_table->SelectColumns(indices)); |
| GAR_ASSIGN_OR_RAISE(auto suffix, |
| vertex_info_->GetFilePath(property_group, chunk_index)); |
| std::string path = prefix_ + suffix; |
| return fs_->WriteTableToFile(in_table, file_type, path); |
| } |
| |
| Status VertexPropertyWriter::WriteChunk( |
| const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| auto property_groups = vertex_info_->GetPropertyGroups(); |
| for (auto& property_group : property_groups) { |
| GAR_RETURN_NOT_OK( |
| WriteChunk(input_table, property_group, chunk_index, validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Status VertexPropertyWriter::WriteTable( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::shared_ptr<PropertyGroup>& property_group, |
| IdType start_chunk_index, ValidateLevel validate_level) const { |
| auto schema = input_table->schema(); |
| int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol); |
| auto table_with_index = input_table; |
| if (indice == -1) { |
| // add index column |
| GAR_ASSIGN_OR_RAISE(table_with_index, |
| addIndexColumn(input_table, start_chunk_index, |
| vertex_info_->GetChunkSize())); |
| } |
| IdType chunk_size = vertex_info_->GetChunkSize(); |
| int64_t length = table_with_index->num_rows(); |
| IdType chunk_index = start_chunk_index; |
| for (int64_t offset = 0; offset < length; |
| offset += chunk_size, chunk_index++) { |
| auto in_chunk = table_with_index->Slice(offset, chunk_size); |
| GAR_RETURN_NOT_OK( |
| WriteChunk(in_chunk, property_group, chunk_index, validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Status VertexPropertyWriter::WriteTable( |
| const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index, |
| ValidateLevel validate_level) const { |
| auto property_groups = vertex_info_->GetPropertyGroups(); |
| GAR_ASSIGN_OR_RAISE(auto table_with_index, |
| addIndexColumn(input_table, start_chunk_index, |
| vertex_info_->GetChunkSize())); |
| for (auto& property_group : property_groups) { |
| GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group, |
| start_chunk_index, validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make( |
| const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix, |
| const ValidateLevel& validate_level) { |
| return std::make_shared<VertexPropertyWriter>(vertex_info, prefix, |
| validate_level); |
| } |
| |
| Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make( |
| const std::shared_ptr<GraphInfo>& graph_info, const std::string& label, |
| const ValidateLevel& validate_level) { |
| auto vertex_info = graph_info->GetVertexInfo(label); |
| if (!vertex_info) { |
| return Status::KeyError("The vertex ", label, " doesn't exist."); |
| } |
| return Make(vertex_info, graph_info->GetPrefix(), validate_level); |
| } |
| |
| Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::addIndexColumn( |
| const std::shared_ptr<arrow::Table>& table, IdType chunk_index, |
| IdType chunk_size) const { |
| arrow::Int64Builder array_builder; |
| RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size)); |
| int64_t length = table->num_rows(); |
| for (IdType i = 0; i < length; i++) { |
| RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i)); |
| } |
| std::shared_ptr<arrow::Array> array; |
| RETURN_NOT_ARROW_OK(array_builder.Finish(&array)); |
| std::shared_ptr<arrow::ChunkedArray> chunked_array = |
| std::make_shared<arrow::ChunkedArray>(array); |
| GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( |
| auto ret, table->AddColumn(0, |
| arrow::field(GeneralParams::kVertexIndexCol, |
| arrow::int64(), false), |
| chunked_array)); |
| return ret; |
| } |
| |
| // implementations for EdgeChunkWriter |
| |
| EdgeChunkWriter::EdgeChunkWriter(const std::shared_ptr<EdgeInfo>& edge_info, |
| const std::string& prefix, |
| AdjListType adj_list_type, |
| const ValidateLevel& validate_level) |
| : edge_info_(edge_info), |
| adj_list_type_(adj_list_type), |
| validate_level_(validate_level) { |
| if (validate_level_ == ValidateLevel::default_validate) { |
| throw std::runtime_error( |
| "default_validate is not allowed to be set as the global validate " |
| "level for EdgeChunkWriter"); |
| } |
| GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); |
| chunk_size_ = edge_info_->GetChunkSize(); |
| switch (adj_list_type) { |
| case AdjListType::unordered_by_source: |
| vertex_chunk_size_ = edge_info_->GetSrcChunkSize(); |
| break; |
| case AdjListType::ordered_by_source: |
| vertex_chunk_size_ = edge_info_->GetSrcChunkSize(); |
| break; |
| case AdjListType::unordered_by_dest: |
| vertex_chunk_size_ = edge_info_->GetDstChunkSize(); |
| break; |
| case AdjListType::ordered_by_dest: |
| vertex_chunk_size_ = edge_info_->GetDstChunkSize(); |
| break; |
| default: |
| vertex_chunk_size_ = edge_info_->GetSrcChunkSize(); |
| } |
| } |
| // Check if the operation of writing number or copying a file is allowed. |
| Status EdgeChunkWriter::validate(IdType count_or_index1, IdType count_or_index2, |
| ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) |
| validate_level = validate_level_; |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) |
| return Status::OK(); |
| // weak & strong validate for adj list type |
| if (!edge_info_->HasAdjacentListType(adj_list_type_)) { |
| return Status::KeyError( |
| "Adj list type ", AdjListTypeToString(adj_list_type_), |
| " does not exist in the ", edge_info_->GetEdgeLabel(), " edge info."); |
| } |
| // weak & strong validate for count or index |
| if (count_or_index1 < 0 || count_or_index2 < 0) { |
| return Status::IndexError( |
| "The count or index must be non-negative, but got ", count_or_index1, |
| " and ", count_or_index2, "."); |
| } |
| return Status::OK(); |
| } |
| |
| // Check if the operation of copying a file as a property chunk is allowed. |
| Status EdgeChunkWriter::validate( |
| const std::shared_ptr<PropertyGroup>& property_group, |
| IdType vertex_chunk_index, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) |
| validate_level = validate_level_; |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) |
| return Status::OK(); |
| // validate for adj list type & index |
| GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level)); |
| // weak & strong validate for property group |
| if (!edge_info_->HasPropertyGroup(property_group)) { |
| return Status::KeyError("Property group", " does not exist in the ", |
| edge_info_->GetEdgeLabel(), " edge info."); |
| } |
| return Status::OK(); |
| } |
| |
| // Check if the operation of writing a table as an offset chunk is allowed. |
| Status EdgeChunkWriter::validate( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) |
| validate_level = validate_level_; |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) |
| return Status::OK(); |
| // validate for adj list type & index |
| GAR_RETURN_NOT_OK(validate(vertex_chunk_index, 0, validate_level)); |
| // weak validate for the input table |
| if (adj_list_type_ != AdjListType::ordered_by_source && |
| adj_list_type_ != AdjListType::ordered_by_dest) { |
| return Status::Invalid( |
| "The adj list type has to be ordered_by_source or ordered_by_dest, but " |
| "got " + |
| std::string(AdjListTypeToString(adj_list_type_))); |
| } |
| if (adj_list_type_ == AdjListType::ordered_by_source && |
| input_table->num_rows() > edge_info_->GetSrcChunkSize() + 1) { |
| return Status::Invalid( |
| "The number of rows of input offset table is ", input_table->num_rows(), |
| " which is larger than the offset size of source vertex chunk ", |
| edge_info_->GetSrcChunkSize() + 1, "."); |
| } |
| if (adj_list_type_ == AdjListType::ordered_by_dest && |
| input_table->num_rows() > edge_info_->GetDstChunkSize() + 1) { |
| return Status::Invalid( |
| "The number of rows of input offset table is ", input_table->num_rows(), |
| " which is larger than the offset size of destination vertex chunk ", |
| edge_info_->GetSrcChunkSize() + 1, "."); |
| } |
| // strong validate for the input_table |
| if (validate_level == ValidateLevel::strong_validate) { |
| // validate the input table |
| RETURN_NOT_ARROW_OK(input_table->Validate()); |
| // validate the schema |
| auto schema = input_table->schema(); |
| int index = schema->GetFieldIndex(GeneralParams::kOffsetCol); |
| if (index == -1) { |
| return Status::Invalid("The offset column ", GeneralParams::kOffsetCol, |
| " does not exist in the input table"); |
| } |
| auto field = schema->field(index); |
| if (field->type()->id() != arrow::Type::INT64) { |
| return Status::TypeError( |
| "The data type for offset column should be INT64, but got ", |
| field->type()->name()); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Check if the operation of writing a table as an adj list chunk is allowed. |
| Status EdgeChunkWriter::validate( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType chunk_index, ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) |
| validate_level = validate_level_; |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) |
| return Status::OK(); |
| // validate for adj list type & index |
| GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level)); |
| // weak validate for the input table |
| if (input_table->num_rows() > edge_info_->GetChunkSize()) { |
| return Status::Invalid( |
| "The number of rows of input table is ", input_table->num_rows(), |
| " which is larger than the ", edge_info_->GetEdgeLabel(), |
| " edge chunk size ", edge_info_->GetChunkSize(), "."); |
| } |
| // strong validate for the input table |
| if (validate_level == ValidateLevel::strong_validate) { |
| auto schema = input_table->schema(); |
| int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol); |
| if (index == -1) { |
| return Status::Invalid("The source index column ", |
| GeneralParams::kSrcIndexCol, |
| " does not exist in the input table"); |
| } |
| auto field = schema->field(index); |
| if (field->type()->id() != arrow::Type::INT64) { |
| return Status::TypeError( |
| "The data type for source index column should be INT64, but got ", |
| field->type()->name()); |
| } |
| index = schema->GetFieldIndex(GeneralParams::kDstIndexCol); |
| if (index == -1) { |
| return Status::Invalid("The destination index column ", |
| GeneralParams::kDstIndexCol, |
| " does not exist in the input table"); |
| } |
| field = schema->field(index); |
| if (field->type()->id() != arrow::Type::INT64) { |
| return Status::TypeError( |
| "The data type for destination index column should be INT64, but " |
| "got ", |
| field->type()->name()); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Check if the operation of writing a table as a property chunk is allowed. |
| Status EdgeChunkWriter::validate( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::shared_ptr<PropertyGroup>& property_group, |
| IdType vertex_chunk_index, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| // use the writer's validate level |
| if (validate_level == ValidateLevel::default_validate) |
| validate_level = validate_level_; |
| // no validate |
| if (validate_level == ValidateLevel::no_validate) |
| return Status::OK(); |
| // validate for property group, adj list type & index |
| GAR_RETURN_NOT_OK(validate(property_group, vertex_chunk_index, chunk_index, |
| validate_level)); |
| // weak validate for the input table |
| if (input_table->num_rows() > edge_info_->GetChunkSize()) { |
| return Status::Invalid( |
| "The number of rows of input table is ", input_table->num_rows(), |
| " which is larger than the ", edge_info_->GetEdgeLabel(), |
| " edge chunk size ", edge_info_->GetChunkSize(), "."); |
| } |
| // strong validate for the input table |
| if (validate_level == ValidateLevel::strong_validate) { |
| // validate the input table |
| RETURN_NOT_ARROW_OK(input_table->Validate()); |
| // validate the schema |
| auto schema = input_table->schema(); |
| for (auto& property : property_group->GetProperties()) { |
| int indice = schema->GetFieldIndex(property.name); |
| if (indice == -1) { |
| return Status::Invalid("Column named ", property.name, |
| " of property group ", property_group, |
| " does not exist in the input table."); |
| } |
| auto field = schema->field(indice); |
| if (DataType::ArrowDataTypeToDataType(field->type()) != property.type) { |
| return Status::TypeError( |
| "The data type of property: ", property.name, " is ", |
| property.type->ToTypeName(), ", but got ", |
| DataType::ArrowDataTypeToDataType(field->type())->ToTypeName(), |
| "."); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status EdgeChunkWriter::WriteEdgesNum(IdType vertex_chunk_index, |
| const IdType& count, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(vertex_chunk_index, count, validate_level)); |
| GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetEdgesNumFilePath( |
| vertex_chunk_index, adj_list_type_)); |
| std::string path = prefix_ + suffix; |
| return fs_->WriteValueToFile<IdType>(count, path); |
| } |
| |
| Status EdgeChunkWriter::WriteVerticesNum(const IdType& count, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(0, count, validate_level)); |
| GAR_ASSIGN_OR_RAISE(auto suffix, |
| edge_info_->GetVerticesNumFilePath(adj_list_type_)); |
| std::string path = prefix_ + suffix; |
| return fs_->WriteValueToFile<IdType>(count, path); |
| } |
| |
| Status EdgeChunkWriter::WriteOffsetChunk(const std::string& file_name, |
| IdType vertex_chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(vertex_chunk_index, 0, validate_level)); |
| GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetAdjListOffsetFilePath( |
| vertex_chunk_index, adj_list_type_)); |
| std::string path = prefix_ + suffix; |
| return fs_->CopyFile(file_name, path); |
| } |
| |
| Status EdgeChunkWriter::WriteAdjListChunk(const std::string& file_name, |
| IdType vertex_chunk_index, |
| IdType chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(vertex_chunk_index, chunk_index, validate_level)); |
| GAR_ASSIGN_OR_RAISE( |
| auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index, |
| chunk_index, adj_list_type_)); |
| std::string path = prefix_ + suffix; |
| return fs_->CopyFile(file_name, path); |
| } |
| |
| Status EdgeChunkWriter::WritePropertyChunk( |
| const std::string& file_name, |
| const std::shared_ptr<PropertyGroup>& property_group, |
| IdType vertex_chunk_index, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(property_group, vertex_chunk_index, chunk_index, |
| validate_level)); |
| GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetPropertyFilePath( |
| property_group, adj_list_type_, |
| vertex_chunk_index, chunk_index)); |
| std::string path = prefix_ + suffix; |
| return fs_->CopyFile(file_name, path); |
| } |
| |
| Status EdgeChunkWriter::WriteOffsetChunk( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(input_table, vertex_chunk_index, validate_level)); |
| auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); |
| auto schema = input_table->schema(); |
| int index = schema->GetFieldIndex(GeneralParams::kOffsetCol); |
| if (index == -1) { |
| return Status::Invalid("The offset column ", GeneralParams::kOffsetCol, |
| " does not exist in the input table"); |
| } |
| auto in_table = input_table->SelectColumns({index}).ValueOrDie(); |
| GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetAdjListOffsetFilePath( |
| vertex_chunk_index, adj_list_type_)); |
| std::string path = prefix_ + suffix; |
| return fs_->WriteTableToFile(in_table, file_type, path); |
| } |
| |
| Status EdgeChunkWriter::WriteAdjListChunk( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType chunk_index, ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK( |
| validate(input_table, vertex_chunk_index, chunk_index, validate_level)); |
| auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); |
| std::vector<int> indices; |
| indices.clear(); |
| auto schema = input_table->schema(); |
| int index = schema->GetFieldIndex(GeneralParams::kSrcIndexCol); |
| if (index == -1) { |
| return Status::Invalid("The source index column ", |
| GeneralParams::kSrcIndexCol, |
| " does not exist in the input table"); |
| } |
| indices.push_back(index); |
| index = schema->GetFieldIndex(GeneralParams::kDstIndexCol); |
| if (index == -1) { |
| return Status::Invalid("The destination index column ", |
| GeneralParams::kDstIndexCol, |
| " does not exist in the input table"); |
| } |
| indices.push_back(index); |
| auto in_table = input_table->SelectColumns(indices).ValueOrDie(); |
| |
| GAR_ASSIGN_OR_RAISE( |
| auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index, |
| chunk_index, adj_list_type_)); |
| std::string path = prefix_ + suffix; |
| return fs_->WriteTableToFile(in_table, file_type, path); |
| } |
| |
| Status EdgeChunkWriter::WritePropertyChunk( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::shared_ptr<PropertyGroup>& property_group, |
| IdType vertex_chunk_index, IdType chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(validate(input_table, property_group, vertex_chunk_index, |
| chunk_index, validate_level)); |
| auto file_type = property_group->GetFileType(); |
| |
| std::vector<int> indices; |
| indices.clear(); |
| auto schema = input_table->schema(); |
| for (auto& property : property_group->GetProperties()) { |
| int indice = schema->GetFieldIndex(property.name); |
| if (indice == -1) { |
| return Status::Invalid("Column named ", property.name, |
| " of property group ", property_group, " of edge ", |
| edge_info_->GetEdgeLabel(), |
| " does not exist in the input table."); |
| } |
| indices.push_back(indice); |
| } |
| auto in_table = input_table->SelectColumns(indices).ValueOrDie(); |
| GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetPropertyFilePath( |
| property_group, adj_list_type_, |
| vertex_chunk_index, chunk_index)); |
| std::string path = prefix_ + suffix; |
| return fs_->WriteTableToFile(in_table, file_type, path); |
| } |
| |
| Status EdgeChunkWriter::WritePropertyChunk( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType chunk_index, ValidateLevel validate_level) const { |
| const auto& property_groups = edge_info_->GetPropertyGroups(); |
| for (auto& property_group : property_groups) { |
| GAR_RETURN_NOT_OK(WritePropertyChunk(input_table, property_group, |
| vertex_chunk_index, chunk_index, |
| validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Status EdgeChunkWriter::WriteChunk( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType chunk_index, ValidateLevel validate_level) const { |
| GAR_RETURN_NOT_OK(WriteAdjListChunk(input_table, vertex_chunk_index, |
| chunk_index, validate_level)); |
| return WritePropertyChunk(input_table, vertex_chunk_index, chunk_index, |
| validate_level); |
| } |
| |
| Status EdgeChunkWriter::WriteAdjListTable( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType start_chunk_index, ValidateLevel validate_level) const { |
| int64_t length = input_table->num_rows(); |
| IdType chunk_index = start_chunk_index; |
| for (int64_t offset = 0; offset < length; |
| offset += chunk_size_, chunk_index++) { |
| auto in_chunk = input_table->Slice(offset, chunk_size_); |
| GAR_RETURN_NOT_OK(WriteAdjListChunk(in_chunk, vertex_chunk_index, |
| chunk_index, validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Status EdgeChunkWriter::WritePropertyTable( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::shared_ptr<PropertyGroup>& property_group, |
| IdType vertex_chunk_index, IdType start_chunk_index, |
| ValidateLevel validate_level) const { |
| int64_t length = input_table->num_rows(); |
| IdType chunk_index = start_chunk_index; |
| for (int64_t offset = 0; offset < length; |
| offset += chunk_size_, chunk_index++) { |
| auto in_chunk = input_table->Slice(offset, chunk_size_); |
| GAR_RETURN_NOT_OK(WritePropertyChunk(in_chunk, property_group, |
| vertex_chunk_index, chunk_index, |
| validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Status EdgeChunkWriter::WritePropertyTable( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType start_chunk_index, ValidateLevel validate_level) const { |
| int64_t length = input_table->num_rows(); |
| IdType chunk_index = start_chunk_index; |
| for (int64_t offset = 0; offset < length; |
| offset += chunk_size_, chunk_index++) { |
| auto in_chunk = input_table->Slice(offset, chunk_size_); |
| GAR_RETURN_NOT_OK(WritePropertyChunk(in_chunk, vertex_chunk_index, |
| chunk_index, validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Status EdgeChunkWriter::WriteTable( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType start_chunk_index, ValidateLevel validate_level) const { |
| int64_t length = input_table->num_rows(); |
| IdType chunk_index = start_chunk_index; |
| for (int64_t offset = 0; offset < length; |
| offset += chunk_size_, chunk_index++) { |
| auto in_chunk = input_table->Slice(offset, chunk_size_); |
| GAR_RETURN_NOT_OK( |
| WriteChunk(in_chunk, vertex_chunk_index, chunk_index, validate_level)); |
| } |
| return Status::OK(); |
| } |
| |
| Status EdgeChunkWriter::SortAndWriteAdjListTable( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType start_chunk_index, ValidateLevel validate_level) const { |
| GAR_ASSIGN_OR_RAISE( |
| auto response_table, |
| sortTable(input_table, getSortColumnName(adj_list_type_))); |
| if (adj_list_type_ == AdjListType::ordered_by_source || |
| adj_list_type_ == AdjListType::ordered_by_dest) { |
| GAR_ASSIGN_OR_RAISE( |
| auto offset_table, |
| getOffsetTable(response_table, getSortColumnName(adj_list_type_), |
| vertex_chunk_index)); |
| GAR_RETURN_NOT_OK( |
| WriteOffsetChunk(offset_table, vertex_chunk_index, validate_level)); |
| } |
| return WriteAdjListTable(response_table, vertex_chunk_index, |
| start_chunk_index, validate_level); |
| } |
| |
| Status EdgeChunkWriter::SortAndWritePropertyTable( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::shared_ptr<PropertyGroup>& property_group, |
| IdType vertex_chunk_index, IdType start_chunk_index, |
| ValidateLevel validate_level) const { |
| GAR_ASSIGN_OR_RAISE( |
| auto response_table, |
| sortTable(input_table, getSortColumnName(adj_list_type_))); |
| return WritePropertyTable(response_table, property_group, vertex_chunk_index, |
| start_chunk_index, validate_level); |
| } |
| |
| Status EdgeChunkWriter::SortAndWritePropertyTable( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType start_chunk_index, ValidateLevel validate_level) const { |
| GAR_ASSIGN_OR_RAISE( |
| auto response_table, |
| sortTable(input_table, getSortColumnName(adj_list_type_))); |
| return WritePropertyTable(response_table, vertex_chunk_index, |
| start_chunk_index, validate_level); |
| } |
| |
| Status EdgeChunkWriter::SortAndWriteTable( |
| const std::shared_ptr<arrow::Table>& input_table, IdType vertex_chunk_index, |
| IdType start_chunk_index, ValidateLevel validate_level) const { |
| GAR_ASSIGN_OR_RAISE( |
| auto response_table, |
| sortTable(input_table, getSortColumnName(adj_list_type_))); |
| |
| if (adj_list_type_ == AdjListType::ordered_by_source || |
| adj_list_type_ == AdjListType::ordered_by_dest) { |
| GAR_ASSIGN_OR_RAISE( |
| auto offset_table, |
| getOffsetTable(response_table, getSortColumnName(adj_list_type_), |
| vertex_chunk_index)); |
| GAR_RETURN_NOT_OK( |
| WriteOffsetChunk(offset_table, vertex_chunk_index, validate_level)); |
| } |
| |
| return WriteTable(response_table, vertex_chunk_index, start_chunk_index, |
| validate_level); |
| } |
| |
| Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::getOffsetTable( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::string& column_name, IdType vertex_chunk_index) const { |
| std::shared_ptr<arrow::ChunkedArray> column = |
| input_table->GetColumnByName(column_name); |
| int64_t array_index = 0, index = 0; |
| auto ids = |
| std::static_pointer_cast<arrow::Int64Array>(column->chunk(array_index)); |
| |
| arrow::Int64Builder builder; |
| IdType begin_index = vertex_chunk_index * vertex_chunk_size_, |
| end_index = begin_index + vertex_chunk_size_; |
| RETURN_NOT_ARROW_OK(builder.Append(0)); |
| std::vector<std::shared_ptr<arrow::Array>> arrays; |
| std::vector<std::shared_ptr<arrow::Field>> schema_vector; |
| std::string property = GeneralParams::kOffsetCol; |
| schema_vector.push_back( |
| arrow::field(property, DataType::DataTypeToArrowDataType(int64()))); |
| |
| int64_t global_index = 0; |
| for (IdType i = begin_index; i < end_index; i++) { |
| while (true) { |
| if (array_index >= column->num_chunks()) |
| break; |
| if (index >= ids->length()) { |
| array_index++; |
| if (array_index == column->num_chunks()) |
| break; |
| ids = std::static_pointer_cast<arrow::Int64Array>( |
| column->chunk(array_index)); |
| index = 0; |
| } |
| if (ids->IsNull(index) || !ids->IsValid(index)) { |
| index++; |
| global_index++; |
| continue; |
| } |
| int64_t x = ids->Value(index); |
| if (x <= i) { |
| index++; |
| global_index++; |
| } else { |
| break; |
| } |
| } |
| RETURN_NOT_ARROW_OK(builder.Append(global_index)); |
| } |
| |
| GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto array, builder.Finish()); |
| arrays.push_back(array); |
| auto schema = std::make_shared<arrow::Schema>(schema_vector); |
| return arrow::Table::Make(schema, arrays); |
| } |
| |
| Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable( |
| const std::shared_ptr<arrow::Table>& input_table, |
| const std::string& column_name) { |
| auto exec_context = arrow::compute::default_exec_context(); |
| auto plan = arrow_acero_namespace::ExecPlan::Make(exec_context).ValueOrDie(); |
| auto table_source_options = |
| arrow_acero_namespace::TableSourceNodeOptions{input_table}; |
| auto source = arrow_acero_namespace::MakeExecNode("table_source", plan.get(), |
| {}, table_source_options) |
| .ValueOrDie(); |
| AsyncGeneratorType sink_gen; |
| RETURN_NOT_ARROW_OK( |
| arrow_acero_namespace::MakeExecNode( |
| "order_by_sink", plan.get(), {source}, |
| arrow_acero_namespace::OrderBySinkNodeOptions{ |
| arrow::compute::SortOptions{{arrow::compute::SortKey{ |
| column_name, arrow::compute::SortOrder::Ascending}}}, |
| &sink_gen}) |
| .status()); |
| return ExecutePlanAndCollectAsTable(*exec_context, plan, |
| input_table->schema(), sink_gen); |
| } |
| |
| Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make( |
| const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix, |
| AdjListType adj_list_type, const ValidateLevel& validate_level) { |
| if (!edge_info->HasAdjacentListType(adj_list_type)) { |
| return Status::KeyError( |
| "The adjacent list type ", AdjListTypeToString(adj_list_type), |
| " doesn't exist in edge ", edge_info->GetEdgeLabel(), "."); |
| } |
| return std::make_shared<EdgeChunkWriter>(edge_info, prefix, adj_list_type, |
| validate_level); |
| } |
| |
| Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make( |
| const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_label, |
| const std::string& edge_label, const std::string& dst_label, |
| AdjListType adj_list_type, const ValidateLevel& validate_level) { |
| auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label); |
| if (!edge_info) { |
| return Status::KeyError("The edge ", src_label, " ", edge_label, " ", |
| dst_label, " doesn't exist."); |
| } |
| return Make(edge_info, graph_info->GetPrefix(), adj_list_type, |
| validate_level); |
| } |
| |
| std::string EdgeChunkWriter::getSortColumnName(AdjListType adj_list_type) { |
| switch (adj_list_type) { |
| case AdjListType::unordered_by_source: |
| return GeneralParams::kSrcIndexCol; |
| case AdjListType::ordered_by_source: |
| return GeneralParams::kSrcIndexCol; |
| case AdjListType::unordered_by_dest: |
| return GeneralParams::kDstIndexCol; |
| case AdjListType::ordered_by_dest: |
| return GeneralParams::kDstIndexCol; |
| default: |
| return GeneralParams::kSrcIndexCol; |
| } |
| return GeneralParams::kSrcIndexCol; |
| } |
| } // namespace graphar |