# blob: be273244d1030ce1356b5d212a4cd1c542ac8c92 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from pathlib import Path
import pytest
import yaml
from graphar_pyspark import initialize
from graphar_pyspark.enums import AdjListType, FileType, GarType
from graphar_pyspark.info import (AdjList, EdgeInfo, GraphInfo, Property,
PropertyGroup, VertexInfo)
from pyspark.sql.utils import IllegalArgumentException
# Directory of example graph metadata used by the tests in this module: the
# "testing" folder located in the third ancestor directory of this file
# (file -> parent -> parent -> parent).
GRAPHAR_TESTS_EXAMPLES = Path(__file__).parent.parent.parent.joinpath("testing")
def test_property(spark):
    """Check Property equality, the Scala round trip, and every setter/getter pair.

    Args:
        spark: Spark session fixture passed to ``initialize``.
    """
    initialize(spark)
    property_from_py = Property.from_python("name", GarType.BOOL, False, False)

    # Equality: survives to_scala/from_scala, rejects a non-Property operand,
    # and holds for an independently built instance with the same fields.
    assert property_from_py == Property.from_scala(property_from_py.to_scala())
    assert property_from_py != 0
    assert property_from_py == Property.from_python("name", GarType.BOOL, False, False)

    # Change every field, then read each one back.
    property_from_py.set_name("new_name")
    property_from_py.set_data_type(GarType.INT32)
    property_from_py.set_is_primary(True)
    property_from_py.set_is_nullable(True)

    assert property_from_py.get_name() == "new_name"
    assert property_from_py.get_data_type() == GarType.INT32
    # Identity checks instead of ``== True``: a merely truthy value must not pass.
    assert property_from_py.get_is_primary() is True
    assert property_from_py.get_is_nullable() is True
def test_property_group(spark):
    """Check PropertyGroup equality, the Scala round trip, and its setters/getters.

    Args:
        spark: Spark session fixture passed to ``initialize``.
    """
    initialize(spark)
    p_group_from_py = PropertyGroup.from_python(
        "prefix",
        FileType.CSV,
        [
            Property.from_python("non_primary", GarType.DOUBLE, False, False),
            Property.from_python("primary", GarType.INT64, True, False),
        ],
    )
    assert p_group_from_py == PropertyGroup.from_scala(p_group_from_py.to_scala())
    assert p_group_from_py != 0

    p_group_from_py.set_prefix("new_prefix")
    p_group_from_py.set_file_type(FileType.ORC)
    p_group_from_py.set_properties(
        p_group_from_py.get_properties()
        + [Property("another_one", GarType.LIST, False, False)]
    )

    assert p_group_from_py.get_prefix() == "new_prefix"
    assert p_group_from_py.get_file_type() == FileType.ORC

    # Expected properties are built from fresh instances so the comparison is
    # by value rather than by identity.
    expected_properties = [
        Property.from_python("non_primary", GarType.DOUBLE, False, False),
        Property.from_python("primary", GarType.INT64, True, False),
        Property("another_one", GarType.LIST, False, False),
    ]
    actual_properties = p_group_from_py.get_properties()

    # ``zip`` stops at the shorter input, so without this length check a
    # missing (or extra) property would go unnoticed.
    assert len(actual_properties) == len(expected_properties)
    assert all(
        actual == expected
        for actual, expected in zip(actual_properties, expected_properties)
    )
def test_adj_list(spark):
    """Verify AdjList equality and how its setters affect the reported values.

    Args:
        spark: Spark session fixture passed to ``initialize``.
    """
    initialize(spark)
    adj = AdjList.from_python(True, "dest", "prefix", FileType.PARQUET)

    round_tripped = AdjList.from_scala(adj.to_scala())
    assert adj == round_tripped
    assert adj != 0
    assert adj.get_adj_list_type() == AdjListType.ORDERED_BY_DEST

    # Each step applies one setter and checks the adjacency list type that
    # results from the accumulated (ordered, aligned_by) state.
    transitions = (
        (adj.set_aligned_by, "src", AdjListType.ORDERED_BY_SOURCE),
        (adj.set_ordered, False, AdjListType.UNORDERED_BY_SOURCE),
        (adj.set_aligned_by, "dest", AdjListType.UNORDERED_BY_DEST),
    )
    for apply_setter, value, expected_type in transitions:
        apply_setter(value)
        assert adj.get_adj_list_type() == expected_type

    adj.set_prefix("prefix_new")
    assert adj.get_prefix() == "prefix_new"

    adj.set_file_type(FileType.CSV)
    assert adj.get_file_type() == FileType.CSV
def test_vertex_info(spark):
    """Exercise VertexInfo built from Python and loaded from example YAML files.

    Covers property and property-group lookups (including the error raised for
    an unknown property), YAML dumping, the setter/getter pairs, primary-key
    queries, and path helpers on the ``person`` and ``team`` example vertices.

    Args:
        spark: Spark session fixture passed to ``initialize``.
    """
    initialize(spark)
    props_list_1 = [
        Property.from_python("non_primary", GarType.DOUBLE, False, False),
        Property.from_python("primary", GarType.INT64, True, False),
    ]
    props_list_2 = [
        Property.from_python("non_primary", GarType.DOUBLE, False, False),
        Property.from_python("primary", GarType.INT64, True, False),
        Property("another_one", GarType.LIST, False, False),
    ]
    vertex_info_from_py = VertexInfo.from_python(
        "label",
        100,
        "prefix",
        [
            PropertyGroup.from_python("prefix1", FileType.PARQUET, props_list_1),
            PropertyGroup.from_python("prefix2", FileType.ORC, props_list_2),
        ],
        "1",
    )

    assert vertex_info_from_py.is_nullable_key("non_primary") is False
    assert vertex_info_from_py.contain_property_group(
        PropertyGroup.from_python("prefix1", FileType.PARQUET, props_list_1)
    )
    assert (
        vertex_info_from_py.contain_property_group(
            PropertyGroup.from_python("prefix333", FileType.PARQUET, props_list_1)
        )
        is False
    )
    assert vertex_info_from_py.contain_property("primary")
    assert vertex_info_from_py.contain_property("non_primary")
    assert vertex_info_from_py.contain_property("non_existen_one") is False

    yaml_string = vertex_info_from_py.dump()
    restored_py_obj = yaml.safe_load(yaml_string)
    assert restored_py_obj["type"] == "label"
    assert restored_py_obj["prefix"] == "prefix"

    # Test setters
    vertex_info_from_py.set_type("new_label")
    assert vertex_info_from_py.get_type() == "new_label"
    vertex_info_from_py.set_chunk_size(101)
    assert vertex_info_from_py.get_chunk_size() == 101
    vertex_info_from_py.set_prefix("new_prefix")
    assert vertex_info_from_py.get_prefix() == "new_prefix"
    vertex_info_from_py.set_version("2")
    assert vertex_info_from_py.get_version() == "2"
    vertex_info_from_py.set_property_groups(
        [
            PropertyGroup.from_python("prefix1", FileType.PARQUET, props_list_1),
            PropertyGroup.from_python("prefix2", FileType.ORC, props_list_2),
            PropertyGroup.from_python(
                "prefix3", FileType.CSV, props_list_1 + props_list_2
            ),
        ],
    )
    assert len(vertex_info_from_py.get_property_groups()) == 3

    # Get property group
    assert vertex_info_from_py.get_property_group("primary") is not None
    assert vertex_info_from_py.get_property_group("non_primary") is not None
    assert vertex_info_from_py.get_property_group("another_one") is not None
    # The message is checked through ``match``: a statement placed after the
    # raising call inside the ``with`` block never runs, and the object bound
    # by ``as`` is an ExceptionInfo, which never equals a plain string.
    with pytest.raises(IllegalArgumentException, match="not found"):
        vertex_info_from_py.get_property_group("non-exsiten-one")

    assert vertex_info_from_py.get_property_type("primary") == GarType.INT64
    assert vertex_info_from_py.get_property_type("non_primary") == GarType.DOUBLE
    assert vertex_info_from_py.get_property_type("another_one") == GarType.LIST
    with pytest.raises(IllegalArgumentException, match="not found"):
        vertex_info_from_py.get_property_type("non-existen-one")

    # Load from disk
    person_info = VertexInfo.load_vertex_info(
        str(
            GRAPHAR_TESTS_EXAMPLES.joinpath("transformer")
            .joinpath("person.vertex.yml")
            .absolute()
        )
    )
    assert person_info.get_type() == "person"
    assert person_info.get_chunk_size() == 50
    assert person_info.get_prefix() == "vertex/person/"
    assert person_info.get_version() == "gar/v1"
    assert len(person_info.get_property_groups()) == 2
    assert person_info.get_property_type("id") == GarType.INT64
    assert person_info.get_property_type("firstName") == GarType.STRING

    # Primary keys logic
    assert person_info.get_primary_key() == "id"
    assert person_info.is_primary_key("id")
    assert person_info.is_primary_key("firstName") is False

    # Other
    assert (
        person_info.get_vertices_num_file_path()
        == person_info.get_prefix() + "vertex_count"
    )
    assert person_info.is_validated()
    assert (
        VertexInfo.from_scala(person_info.to_scala()).get_prefix()
        == person_info.get_prefix()
    )

    nebula_vi = VertexInfo.load_vertex_info(
        str(
            GRAPHAR_TESTS_EXAMPLES.joinpath("nebula")
            .joinpath("team.vertex.yml")
            .absolute()
        )
    )
    # The chunk path reported for the "name" property group must point at a
    # file that exists inside the nebula example directory.
    assert (
        GRAPHAR_TESTS_EXAMPLES.joinpath("nebula")
        .joinpath(nebula_vi.get_file_path(nebula_vi.get_property_group("name"), 0))
        .exists()
    )
    assert len(nebula_vi.get_path_prefix(nebula_vi.get_property_group("name"))) > 0
def test_edge_info(spark):
    """Exercise EdgeInfo built from Python and loaded from an example YAML file.

    Covers the setter/getter pairs, adjacency-list and property-group queries
    (including the error raised for a missing adjacency list type), the path
    helpers of the ``person_knows_person`` example edge, and YAML dumping.

    Args:
        spark: Spark session fixture passed to ``initialize``.
    """
    initialize(spark)
    py_edge_info = EdgeInfo.from_python(
        src_type="src_label",
        edge_type="edge_label",
        dst_type="dst_label",
        chunk_size=100,
        src_chunk_size=101,
        dst_chunk_size=102,
        directed=True,
        prefix="prefix",
        adj_lists=[],
        property_groups=[],
        version="v1",
    )

    # getters/setters
    py_edge_info.set_src_type("new_src_label")
    assert py_edge_info.get_src_type() == "new_src_label"
    py_edge_info.set_dst_type("new_dst_label")
    assert py_edge_info.get_dst_type() == "new_dst_label"
    py_edge_info.set_edge_type("new_edge_label")
    assert py_edge_info.get_edge_type() == "new_edge_label"
    py_edge_info.set_chunk_size(101)
    assert py_edge_info.get_chunk_size() == 101
    py_edge_info.set_src_chunk_size(102)
    assert py_edge_info.get_src_chunk_size() == 102
    py_edge_info.set_dst_chunk_size(103)
    assert py_edge_info.get_dst_chunk_size() == 103
    py_edge_info.set_directed(False)
    assert py_edge_info.get_directed() is False
    py_edge_info.set_prefix("new_prefix")
    assert py_edge_info.get_prefix() == "new_prefix"
    py_edge_info.set_version("v2")
    assert py_edge_info.get_version() == "v2"

    props_list_1 = [
        Property.from_python("non_primary", GarType.DOUBLE, False, False),
        Property.from_python("primary", GarType.INT64, True, False),
    ]
    py_edge_info.set_adj_lists(
        [
            AdjList.from_python(
                True,
                "dest",
                "prefix",
                FileType.PARQUET,
            )
        ]
    )
    py_edge_info.set_property_groups(
        [
            PropertyGroup.from_python("prefix1", FileType.PARQUET, props_list_1),
        ],
    )
    assert py_edge_info.is_nullable_key("non_primary") is False
    assert len(py_edge_info.get_adj_lists()) == 1
    assert len(py_edge_info.get_property_groups()) == 1

    # Load from YAML
    person_knows_person_info = EdgeInfo.load_edge_info(
        str(
            GRAPHAR_TESTS_EXAMPLES.joinpath("transformer")
            .joinpath("person_knows_person.edge.yml")
            .absolute()
        )
    )
    assert person_knows_person_info.get_directed() is False
    assert person_knows_person_info.contain_property("creationDate")
    assert (
        person_knows_person_info.get_adj_list_prefix(AdjListType.UNORDERED_BY_DEST)
        is not None
    )
    assert (
        person_knows_person_info.get_adj_list_prefix(AdjListType.ORDERED_BY_SOURCE)
        is not None
    )
    # The message is checked through ``match``: a statement placed after the
    # raising call inside the ``with`` block never runs, and the object bound
    # by ``as`` is an ExceptionInfo, which never equals a plain string.
    with pytest.raises(IllegalArgumentException, match="not found"):
        person_knows_person_info.get_adj_list_prefix(AdjListType.ORDERED_BY_DEST)

    assert person_knows_person_info.contain_adj_list(AdjListType.UNORDERED_BY_DEST)
    assert (
        person_knows_person_info.contain_adj_list(AdjListType.UNORDERED_BY_SOURCE)
        is False
    )
    assert person_knows_person_info.get_chunk_size() == 500
    assert (
        person_knows_person_info.get_offset_path_prefix(AdjListType.ORDERED_BY_SOURCE)
        is not None
    )
    assert (
        person_knows_person_info.get_adj_list_file_type(AdjListType.UNORDERED_BY_DEST)
        == FileType.CSV
    )
    assert (
        person_knows_person_info.get_adj_list_file_type(AdjListType.ORDERED_BY_SOURCE)
        != 0
    )
    assert len(person_knows_person_info.get_property_groups()) == 1
    assert person_knows_person_info.contain_property_group(
        person_knows_person_info.get_property_groups()[0],
    )
    assert person_knows_person_info.get_property_type("creationDate") == GarType.STRING
    assert person_knows_person_info.is_primary_key("creationDate") is False
    assert person_knows_person_info.get_primary_key() == ""
    assert person_knows_person_info.is_validated()

    # Path helpers for the ordered-by-source adjacency list.
    assert (
        person_knows_person_info.get_vertices_num_file_path(
            AdjListType.ORDERED_BY_SOURCE
        )
        == "edge/person_knows_person/ordered_by_source/vertex_count"
    )
    assert (
        person_knows_person_info.get_edges_num_path_prefix(
            AdjListType.ORDERED_BY_SOURCE
        )
        == "edge/person_knows_person/ordered_by_source/edge_count"
    )
    assert (
        person_knows_person_info.get_edges_num_file_path(
            0, AdjListType.ORDERED_BY_SOURCE
        )
        == "edge/person_knows_person/ordered_by_source/edge_count0"
    )
    assert (
        person_knows_person_info.get_adj_list_offset_file_path(
            0, AdjListType.ORDERED_BY_SOURCE
        )
        == "edge/person_knows_person/ordered_by_source/offset/chunk0"
    )
    assert (
        person_knows_person_info.get_adj_list_file_path(
            0, 0, AdjListType.ORDERED_BY_SOURCE
        )
        == "edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0"
    )
    # The prefix helpers are called both with ``None`` and with an index as
    # their chunk argument.
    assert (
        person_knows_person_info.get_adj_list_path_prefix(
            None, AdjListType.ORDERED_BY_SOURCE
        )
        is not None
    )
    assert (
        person_knows_person_info.get_adj_list_path_prefix(
            0, AdjListType.ORDERED_BY_SOURCE
        )
        is not None
    )
    assert (
        person_knows_person_info.get_property_file_path(
            person_knows_person_info.get_property_group(
                "creationDate",
            ),
            AdjListType.ORDERED_BY_SOURCE,
            0,
            0,
        )
        is not None
    )
    assert (
        person_knows_person_info.get_property_group_path_prefix(
            person_knows_person_info.get_property_group(
                "creationDate",
            ),
            AdjListType.ORDERED_BY_SOURCE,
            0,
        )
        is not None
    )
    assert (
        person_knows_person_info.get_property_group_path_prefix(
            person_knows_person_info.get_property_group(
                "creationDate",
            ),
            AdjListType.ORDERED_BY_SOURCE,
            None,
        )
        is not None
    )
    assert person_knows_person_info.get_concat_key() == "person_knows_person"

    yaml_string = person_knows_person_info.dump()
    parsed_dict = yaml.safe_load(yaml_string)
    assert "prefix" in parsed_dict
def test_graph_info(spark):
    """Exercise GraphInfo loaded from example YAML and built from Python.

    Checks the vertex/edge collections of the ``modern_graph`` example, YAML
    dumping, the setter/getter pairs, and adding edge and vertex infos.

    Args:
        spark: Spark session fixture passed to ``initialize``.
    """
    initialize(spark)
    modern_graph_person = GraphInfo.load_graph_info(
        str(
            GRAPHAR_TESTS_EXAMPLES.joinpath("modern_graph")
            .joinpath("modern_graph.graph.yml")
            .absolute()
        )
    )
    assert len(modern_graph_person.get_edges()) == 2
    assert modern_graph_person.get_name() == "modern_graph"
    assert len(modern_graph_person.get_vertex_infos().keys()) == 2
    assert "person" in modern_graph_person.get_vertex_infos().keys()
    assert "software" in modern_graph_person.get_vertex_infos().keys()
    assert len(modern_graph_person.get_edge_infos()) == 2
    assert "person_knows_person" in modern_graph_person.get_edge_infos().keys()
    assert "person_created_software" in modern_graph_person.get_edge_infos().keys()
    assert modern_graph_person.get_edge_info("person", "knows", "person") is not None
    assert modern_graph_person.get_vertex_info("person") is not None

    # YAML
    yaml_dict = yaml.safe_load(modern_graph_person.dump())
    assert "name" in yaml_dict
    assert yaml_dict["version"] == "gar/v1"

    # Python constructor and setters
    py_graph_info = GraphInfo.from_python(
        "name", "prefix", ["person", "software"], ["person_knnows_person"], "v1"
    )
    py_graph_info.set_name("new_name")
    assert py_graph_info.get_name() == "new_name"
    py_graph_info.set_prefix("new_prefix")
    assert py_graph_info.get_prefix() == "new_prefix"

    # Exactly one entry is appended, so the size must grow by exactly one;
    # a plain "greater than" check would also accept duplicated entries.
    init_vertices_size = len(py_graph_info.get_vertices())
    new_vertices = py_graph_info.get_vertices()
    new_vertices.append("new_one")
    py_graph_info.set_vertices(new_vertices)
    assert len(py_graph_info.get_vertices()) == init_vertices_size + 1

    init_edges_size = len(py_graph_info.get_edges())
    new_edges = py_graph_info.get_edges()
    new_edges.append("new_one")
    py_graph_info.set_edges(new_edges)
    assert len(py_graph_info.get_edges()) == init_edges_size + 1

    py_graph_info.set_version("v2")
    assert py_graph_info.get_version() == "v2"

    py_graph_info.add_edge_info(
        EdgeInfo.from_python(
            "src_label100",
            "edge_label100",
            "dst_label100",
            10,
            100,
            100,
            True,
            "prefix",
            [],
            [],
            "v1",
        )
    )
    assert len(py_graph_info.get_edge_infos()) == 1

    py_graph_info.add_vertex_info(
        VertexInfo.from_python("some", 100, "prefix", [], "v1")
    )
    assert len(py_graph_info.get_vertex_infos()) == 1