blob: 11e4a6bd03be9e2b824f8078f524261696ed4fd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <filesystem>
#include "arrow/api.h"
#include "graphar/api/arrow_writer.h"
#include "graphar/api/high_level_writer.h"
#include "graphar/convert_to_arrow_type.h"
#include "graphar/graph_info.h"
#include "graphar/high-level/graph_reader.h"
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
#include "util.h"
namespace py = pybind11;
namespace fs = std::filesystem;
struct GraphArConfig {
std::string path;
std::string name;
std::string version;
};
struct Property {
std::string name;
std::string data_type;
bool is_primary;
bool nullable;
};
struct PropertyGroup {
std::string file_type;
std::vector<Property> properties;
};
struct Source {
std::string file_type;
std::string path;
char delimiter;
std::unordered_map<std::string, std::string> columns;
};
struct Vertex {
std::string type;
std::vector<std::string> labels;
int chunk_size;
std::string validate_level;
std::string prefix;
std::vector<PropertyGroup> property_groups;
std::vector<Source> sources;
};
struct AdjList {
bool ordered;
std::string aligned_by;
std::string file_type;
};
struct Edge {
std::string edge_type;
std::string src_type;
std::string src_prop;
std::string dst_type;
std::string dst_prop;
int chunk_size;
std::string validate_level;
std::string prefix;
std::vector<AdjList> adj_lists;
std::vector<PropertyGroup> property_groups;
std::vector<Source> sources;
};
struct ImportSchema {
std::vector<Vertex> vertices;
std::vector<Edge> edges;
};
struct ImportConfig {
GraphArConfig graphar_config;
ImportSchema import_schema;
};
ImportConfig ConvertPyDictToConfig(const py::dict& config_dict) {
ImportConfig import_config;
auto graphar_dict = config_dict["graphar"].cast<py::dict>();
import_config.graphar_config.path = graphar_dict["path"].cast<std::string>();
import_config.graphar_config.name = graphar_dict["name"].cast<std::string>();
import_config.graphar_config.version =
graphar_dict["version"].cast<std::string>();
auto schema_dict = config_dict["import_schema"].cast<py::dict>();
auto vertices_list = schema_dict["vertices"].cast<std::vector<py::dict>>();
for (const auto& vertex_dict : vertices_list) {
Vertex vertex;
vertex.type = vertex_dict["type"].cast<std::string>();
vertex.chunk_size = vertex_dict["chunk_size"].cast<int>();
vertex.prefix = vertex_dict["prefix"].cast<std::string>();
vertex.validate_level = vertex_dict["validate_level"].cast<std::string>();
vertex.labels = vertex_dict["labels"].cast<std::vector<std::string>>();
auto pg_list = vertex_dict["property_groups"].cast<std::vector<py::dict>>();
for (const auto& pg_dict : pg_list) {
PropertyGroup pg;
pg.file_type = pg_dict["file_type"].cast<std::string>();
auto prop_list = pg_dict["properties"].cast<std::vector<py::dict>>();
for (const auto& prop_dict : prop_list) {
Property prop;
prop.name = prop_dict["name"].cast<std::string>();
prop.data_type = prop_dict["data_type"].cast<std::string>();
prop.is_primary = prop_dict["is_primary"].cast<bool>();
prop.nullable = prop_dict["nullable"].cast<bool>();
pg.properties.emplace_back(prop);
}
vertex.property_groups.emplace_back(pg);
}
auto source_list = vertex_dict["sources"].cast<std::vector<py::dict>>();
for (const auto& source_dict : source_list) {
Source src;
src.file_type = source_dict["file_type"].cast<std::string>();
src.path = source_dict["path"].cast<std::string>();
src.delimiter = source_dict["delimiter"].cast<char>();
src.columns = source_dict["columns"]
.cast<std::unordered_map<std::string, std::string>>();
vertex.sources.emplace_back(src);
}
import_config.import_schema.vertices.emplace_back(vertex);
}
auto edges_list = schema_dict["edges"].cast<std::vector<py::dict>>();
for (const auto& edge_dict : edges_list) {
Edge edge;
edge.edge_type = edge_dict["edge_type"].cast<std::string>();
edge.src_type = edge_dict["src_type"].cast<std::string>();
edge.src_prop = edge_dict["src_prop"].cast<std::string>();
edge.dst_type = edge_dict["dst_type"].cast<std::string>();
edge.dst_prop = edge_dict["dst_prop"].cast<std::string>();
edge.chunk_size = edge_dict["chunk_size"].cast<int>();
edge.validate_level = edge_dict["validate_level"].cast<std::string>();
edge.prefix = edge_dict["prefix"].cast<std::string>();
auto adj_list_dicts = edge_dict["adj_lists"].cast<std::vector<py::dict>>();
for (const auto& adj_list_dict : adj_list_dicts) {
AdjList adj_list;
adj_list.ordered = adj_list_dict["ordered"].cast<bool>();
adj_list.aligned_by = adj_list_dict["aligned_by"].cast<std::string>();
adj_list.file_type = adj_list_dict["file_type"].cast<std::string>();
edge.adj_lists.emplace_back(adj_list);
}
auto edge_pg_list =
edge_dict["property_groups"].cast<std::vector<py::dict>>();
for (const auto& edge_pg_dict : edge_pg_list) {
PropertyGroup edge_pg;
edge_pg.file_type = edge_pg_dict["file_type"].cast<std::string>();
auto edge_prop_list =
edge_pg_dict["properties"].cast<std::vector<py::dict>>();
for (const auto& prop_dict : edge_prop_list) {
Property edge_prop;
edge_prop.name = prop_dict["name"].cast<std::string>();
edge_prop.data_type = prop_dict["data_type"].cast<std::string>();
edge_prop.is_primary = prop_dict["is_primary"].cast<bool>();
edge_prop.nullable = prop_dict["nullable"].cast<bool>();
edge_pg.properties.emplace_back(edge_prop);
}
edge.property_groups.emplace_back(edge_pg);
}
auto edge_source_list = edge_dict["sources"].cast<std::vector<py::dict>>();
for (const auto& edge_source_dict : edge_source_list) {
Source edge_src;
edge_src.file_type = edge_source_dict["file_type"].cast<std::string>();
edge_src.path = edge_source_dict["path"].cast<std::string>();
edge_src.delimiter = edge_source_dict["delimiter"].cast<char>();
edge_src.columns =
edge_source_dict["columns"]
.cast<std::unordered_map<std::string, std::string>>();
edge.sources.emplace_back(edge_src);
}
import_config.import_schema.edges.emplace_back(edge);
}
return import_config;
}
std::string DoImport(const py::dict& config_dict) {
auto import_config = ConvertPyDictToConfig(config_dict);
auto version =
graphar::InfoVersion::Parse(import_config.graphar_config.version).value();
fs::path save_path = import_config.graphar_config.path;
std::unordered_map<std::string, graphar::IdType> vertex_chunk_sizes;
std::unordered_map<std::string, int64_t> vertex_counts;
std::map<std::pair<std::string, std::string>,
std::unordered_map<std::shared_ptr<arrow::Scalar>, graphar::IdType,
arrow::Scalar::Hash, arrow::Scalar::PtrsEqual>>
vertex_prop_index_map;
std::unordered_map<std::string, std::vector<std::string>>
vertex_props_in_edges;
std::map<std::pair<std::string, std::string>, graphar::Property>
vertex_prop_property_map;
for (const auto& edge : import_config.import_schema.edges) {
vertex_props_in_edges[edge.src_type].emplace_back(edge.src_prop);
vertex_props_in_edges[edge.dst_type].emplace_back(edge.dst_prop);
}
for (const auto& vertex : import_config.import_schema.vertices) {
vertex_chunk_sizes[vertex.type] = vertex.chunk_size;
auto pgs = std::vector<std::shared_ptr<graphar::PropertyGroup>>();
std::string primary_key;
for (const auto& pg : vertex.property_groups) {
std::vector<graphar::Property> props;
for (const auto& prop : pg.properties) {
if (prop.is_primary) {
if (!primary_key.empty()) {
throw std::runtime_error("Multiple primary keys found in vertex " +
vertex.type);
}
primary_key = prop.name;
}
graphar::Property property(
prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
prop.is_primary, prop.nullable);
props.emplace_back(property);
vertex_prop_property_map[std::make_pair(vertex.type, prop.name)] =
property;
}
// TODO: add prefix parameter in config
auto property_group = graphar::CreatePropertyGroup(
props, graphar::StringToFileType(pg.file_type));
pgs.emplace_back(property_group);
}
auto vertex_info =
graphar::CreateVertexInfo(vertex.type, vertex.chunk_size, pgs,
vertex.labels, vertex.prefix, version);
auto file_name = vertex.type + ".vertex.yml";
vertex_info->Save(save_path / file_name);
auto save_path_str = save_path.string();
save_path_str += "/";
auto vertex_prop_writer = graphar::VertexPropertyWriter::Make(
vertex_info, save_path_str,
StringToValidateLevel(vertex.validate_level))
.value();
std::vector<std::shared_ptr<arrow::Table>> vertex_tables;
for (const auto& source : vertex.sources) {
std::vector<std::string> column_names;
for (const auto& [key, value] : source.columns) {
column_names.emplace_back(key);
}
auto table = GetDataFromFile(source.path, column_names, source.delimiter,
source.file_type);
std::unordered_map<std::string, Property> column_prop_map;
std::unordered_map<std::string, std::string> reversed_columns_config;
for (const auto& [key, value] : source.columns) {
reversed_columns_config[value] = key;
}
for (const auto& pg : vertex.property_groups) {
for (const auto& prop : pg.properties) {
column_prop_map[reversed_columns_config[prop.name]] = prop;
}
}
std::unordered_map<
std::string, std::pair<std::string, std::shared_ptr<arrow::DataType>>>
columns_to_change;
for (const auto& [column, prop] : column_prop_map) {
auto arrow_data_type = graphar::DataType::DataTypeToArrowDataType(
graphar::DataType::TypeNameToDataType(prop.data_type));
auto arrow_column = table->GetColumnByName(column);
// TODO: whether need to check duplicate values for primary key?
if (!prop.nullable) {
for (const auto& chunk : arrow_column->chunks()) {
if (chunk->null_count() > 0) {
throw std::runtime_error("Non-nullable column '" + column +
"' has null values");
}
}
}
// TODO: check this
if (column != prop.name ||
arrow_column->type()->id() != arrow_data_type->id()) {
columns_to_change[column] =
std::make_pair(prop.name, arrow_data_type);
}
}
table = ChangeNameAndDataType(table, columns_to_change);
vertex_tables.emplace_back(table);
}
std::shared_ptr<arrow::Table> merged_vertex_table =
MergeTables(vertex_tables);
// TODO: check all fields in props
// TODO: add start_index in config
graphar::IdType start_chunk_index = 0;
auto vertex_table_with_index =
vertex_prop_writer
->AddIndexColumn(merged_vertex_table, start_chunk_index,
vertex_info->GetChunkSize())
.value();
for (const auto& property_group : pgs) {
vertex_prop_writer->WriteTable(vertex_table_with_index, property_group,
start_chunk_index);
}
if (vertex_props_in_edges.find(vertex.type) !=
vertex_props_in_edges.end()) {
for (const auto& vertex_prop : vertex_props_in_edges[vertex.type]) {
vertex_prop_index_map[std::make_pair(vertex.type, vertex_prop)] =
TableToUnorderedMap(vertex_table_with_index, vertex_prop,
graphar::GeneralParams::kVertexIndexCol);
}
}
auto vertex_count = merged_vertex_table->num_rows();
vertex_counts[vertex.type] = vertex_count;
vertex_prop_writer->WriteVerticesNum(vertex_count);
}
for (const auto& edge : import_config.import_schema.edges) {
auto pgs = std::vector<std::shared_ptr<graphar::PropertyGroup>>();
for (const auto& pg : edge.property_groups) {
std::vector<graphar::Property> props;
for (const auto& prop : pg.properties) {
props.emplace_back(graphar::Property(
prop.name, graphar::DataType::TypeNameToDataType(prop.data_type),
prop.is_primary, prop.nullable));
}
// TODO: add prefix parameter in config
auto property_group = graphar::CreatePropertyGroup(
props, graphar::StringToFileType(pg.file_type));
pgs.emplace_back(property_group);
}
graphar::AdjacentListVector adj_lists;
for (const auto& adj_list : edge.adj_lists) {
// TODO: add prefix parameter in config
adj_lists.emplace_back(graphar::CreateAdjacentList(
graphar::OrderedAlignedToAdjListType(adj_list.ordered,
adj_list.aligned_by),
graphar::StringToFileType(adj_list.file_type)));
}
// TODO: add directed parameter in config
bool directed = true;
// TODO: whether prefix has default value?
auto edge_info = graphar::CreateEdgeInfo(
edge.src_type, edge.edge_type, edge.dst_type, edge.chunk_size,
vertex_chunk_sizes[edge.src_type], vertex_chunk_sizes[edge.dst_type],
directed, adj_lists, pgs, edge.prefix, version);
auto file_name =
ConcatEdgeTriple(edge.src_type, edge.edge_type, edge.dst_type) +
".edge.yml";
edge_info->Save(save_path / file_name);
auto save_path_str = save_path.string();
save_path_str += "/";
for (const auto& adj_list : adj_lists) {
int64_t vertex_count;
if (adj_list->GetType() == graphar::AdjListType::ordered_by_source ||
adj_list->GetType() == graphar::AdjListType::unordered_by_source) {
vertex_count = vertex_counts[edge.src_type];
} else {
vertex_count = vertex_counts[edge.dst_type];
}
std::vector<std::shared_ptr<arrow::Table>> edge_tables;
for (const auto& source : edge.sources) {
std::vector<std::string> column_names;
for (const auto& [key, value] : source.columns) {
column_names.emplace_back(key);
}
auto table = GetDataFromFile(source.path, column_names,
source.delimiter, source.file_type);
std::unordered_map<std::string, graphar::Property> column_prop_map;
std::unordered_map<std::string, std::string> reversed_columns;
for (const auto& [key, value] : source.columns) {
reversed_columns[value] = key;
}
for (const auto& pg : edge.property_groups) {
for (const auto& prop : pg.properties) {
column_prop_map[reversed_columns[prop.name]] = graphar::Property(
prop.name,
graphar::DataType::TypeNameToDataType(prop.data_type),
prop.is_primary, prop.nullable);
}
}
column_prop_map[reversed_columns.at(edge.src_prop)] =
vertex_prop_property_map.at(
std::make_pair(edge.src_type, edge.src_prop));
column_prop_map[reversed_columns.at(edge.dst_prop)] =
vertex_prop_property_map.at(
std::make_pair(edge.dst_type, edge.dst_prop));
std::unordered_map<
std::string,
std::pair<std::string, std::shared_ptr<arrow::DataType>>>
columns_to_change;
for (const auto& [column, prop] : column_prop_map) {
auto arrow_data_type =
graphar::DataType::DataTypeToArrowDataType(prop.type);
auto arrow_column = table->GetColumnByName(column);
// TODO: is needed?
if (!prop.is_nullable) {
for (const auto& chunk : arrow_column->chunks()) {
if (chunk->null_count() > 0) {
throw std::runtime_error("Non-nullable column '" + column +
"' has null values");
}
}
}
if (column != prop.name ||
arrow_column->type()->id() != arrow_data_type->id()) {
columns_to_change[column] =
std::make_pair(prop.name, arrow_data_type);
}
}
table = ChangeNameAndDataType(table, columns_to_change);
edge_tables.emplace_back(table);
}
std::unordered_map<
std::string, std::pair<std::string, std::shared_ptr<arrow::DataType>>>
vertex_columns_to_change;
std::shared_ptr<arrow::Table> merged_edge_table =
MergeTables(edge_tables);
// TODO: check all fields in props
auto combined_edge_table =
merged_edge_table->CombineChunks().ValueOrDie();
auto edge_builder =
graphar::builder::EdgesBuilder::Make(
edge_info, save_path_str, adj_list->GetType(), vertex_count,
StringToValidateLevel(edge.validate_level))
.value();
std::vector<std::string> edge_column_names;
for (const auto& field : combined_edge_table->schema()->fields()) {
edge_column_names.push_back(field->name());
}
const int64_t num_rows = combined_edge_table->num_rows();
for (int64_t i = 0; i < num_rows; ++i) {
auto edge_src_column =
combined_edge_table->GetColumnByName(edge.src_prop);
auto edge_dst_column =
combined_edge_table->GetColumnByName(edge.dst_prop);
graphar::builder::Edge e(
vertex_prop_index_map
.at(std::make_pair(edge.src_type, edge.src_prop))
.at(edge_src_column->GetScalar(i).ValueOrDie()),
vertex_prop_index_map
.at(std::make_pair(edge.dst_type, edge.dst_prop))
.at(edge_dst_column->GetScalar(i).ValueOrDie()));
for (const auto& column_name : edge_column_names) {
if (column_name != edge.src_prop && column_name != edge.dst_prop) {
auto column = combined_edge_table->GetColumnByName(column_name);
auto column_type = column->type();
std::any value;
TryToCastToAny(
graphar::DataType::ArrowDataTypeToDataType(column_type),
column->chunk(0), value);
e.AddProperty(column_name, value);
}
}
edge_builder->AddEdge(e);
}
edge_builder->Dump();
}
}
return "Imported successfully!";
}