# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
# pylint: disable=C0302,C0103,W1514,R1735,R1734,C0206
import json
import os
import dgl
import networkx as nx
import numpy as np
import pandas as pd
import scipy.io
import torch
from dgl.data import (
CiteseerGraphDataset,
CoraGraphDataset,
GINDataset,
LegacyTUDataset,
PubmedGraphDataset,
get_download_dir,
)
from dgl.data.utils import _get_dgl_url, download, load_graphs
from ogb.linkproppred import DglLinkPropPredDataset
from pyhugegraph.api.graph import GraphManager
from pyhugegraph.api.schema import SchemaManager
from pyhugegraph.client import PyHugeClient
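
# Vertices and edges are written to HugeGraph in batches of at most this many records.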
MAX_BATCH_NUM = 500
def clear_all_data(
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
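    """Clear all graph data stored in the target HugeGraph graph."""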
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client.graphs().clear_graph_all_data()
def import_graph_from_dgl(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
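    """Import a DGL citation dataset (CORA/CITESEER/PUBMED) into HugeGraph.

    Node features, labels, and train/val/test masks become vertex properties;
    edges carry no properties.
    """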
dataset_name = dataset_name.upper()
if dataset_name == "CORA":
dataset_dgl = CoraGraphDataset(verbose=False)
elif dataset_name == "CITESEER":
dataset_dgl = CiteseerGraphDataset(verbose=False)
elif dataset_name == "PUBMED":
dataset_dgl = PubmedGraphDataset(verbose=False)
else:
raise ValueError("dataset not supported")
graph_dgl = dataset_dgl[0]
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
# create property schema
client_schema.propertyKey("feat").asDouble().valueList().ifNotExist().create()
client_schema.propertyKey("label").asLong().ifNotExist().create()
client_schema.propertyKey("train_mask").asInt().ifNotExist().create()
client_schema.propertyKey("val_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_mask").asInt().ifNotExist().create()
# check props and create vertex label
vertex_label = f"{dataset_name}_vertex"
all_props = ["feat", "label", "train_mask", "val_mask", "test_mask"]
props = [p for p in all_props if p in graph_dgl.ndata]
props_value = {}
for p in props:
props_value[p] = graph_dgl.ndata[p].tolist()
client_schema.vertexLabel(vertex_label).useAutomaticId().properties(*props).ifNotExist().create()
    # add vertices in batches of MAX_BATCH_NUM
idx_to_vertex_id = {}
vdatas = []
vidxs = []
for idx in range(graph_dgl.number_of_nodes()):
# extract props
properties = {
p: int(props_value[p][idx]) if isinstance(props_value[p][idx], bool) else props_value[p][idx] for p in props
}
vdata = [vertex_label, properties]
vdatas.append(vdata)
vidxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
vdatas.clear()
vidxs.clear()
    # add the remaining vertices
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
    # add edges in batches
edge_label = f"{dataset_name}_edge"
client_schema.edgeLabel(edge_label).sourceLabel(vertex_label).targetLabel(vertex_label).ifNotExist().create()
edges_src, edges_dst = graph_dgl.edges()
edatas = []
for src, dst in zip(edges_src.numpy(), edges_dst.numpy(), strict=False):
edata = [edge_label, idx_to_vertex_id[src], idx_to_vertex_id[dst], vertex_label, vertex_label, {}]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def import_graphs_from_dgl(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
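    """Import a graph-classification dataset (TU/GIN) into HugeGraph.

    Each member graph gets one `*_graph_vertex` holding its label; its
    vertices and edges reference it through an indexed `graph_id` property.
    """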
dataset_name = dataset_name.upper()
    # load the DGL built-in dataset
if dataset_name in ["ENZYMES", "DD"]:
dataset_dgl = LegacyTUDataset(name=dataset_name)
elif dataset_name in ["MUTAG", "COLLAB", "NCI1", "PROTEINS", "PTC"]:
dataset_dgl = GINDataset(name=dataset_name, self_loop=True)
else:
raise ValueError("dataset not supported")
# hugegraph client
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
# define vertexLabel/edgeLabel
graph_vertex_label = f"{dataset_name}_graph_vertex"
vertex_label = f"{dataset_name}_vertex"
edge_label = f"{dataset_name}_edge"
# create schema
client_schema.propertyKey("label").asLong().ifNotExist().create()
client_schema.propertyKey("feat").asDouble().valueList().ifNotExist().create()
client_schema.propertyKey("graph_id").asLong().ifNotExist().create()
client_schema.vertexLabel(graph_vertex_label).useAutomaticId().properties("label").ifNotExist().create()
client_schema.vertexLabel(vertex_label).useAutomaticId().properties("feat", "graph_id").ifNotExist().create()
client_schema.edgeLabel(edge_label).sourceLabel(vertex_label).targetLabel(vertex_label).properties(
"graph_id"
).ifNotExist().create()
client_schema.indexLabel("vertex_by_graph_id").onV(vertex_label).by("graph_id").secondary().ifNotExist().create()
client_schema.indexLabel("edge_by_graph_id").onE(edge_label).by("graph_id").secondary().ifNotExist().create()
# import to hugegraph
for graph_dgl, label in dataset_dgl:
graph_vertex = client_graph.addVertex(label=graph_vertex_label, properties={"label": int(label)})
        # pick the node feature tensor ("feat" or "attr")
if "feat" in graph_dgl.ndata:
node_feats = graph_dgl.ndata["feat"]
elif "attr" in graph_dgl.ndata:
node_feats = graph_dgl.ndata["attr"]
else:
raise ValueError("Node feature is empty")
        # add the vertices of this graph in batches
idx_to_vertex_id = {}
vdatas = []
vidxs = []
for idx in range(graph_dgl.number_of_nodes()):
feat = node_feats[idx].tolist()
properties = {"feat": feat, "graph_id": graph_vertex.id}
vdata = [vertex_label, properties]
vdatas.append(vdata)
vidxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
vdatas.clear()
vidxs.clear()
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
        # add the edges of this graph in batches
srcs, dsts = graph_dgl.edges()
edatas = []
for src, dst in zip(srcs.numpy(), dsts.numpy(), strict=False):
edata = [
edge_label,
idx_to_vertex_id[src],
idx_to_vertex_id[dst],
vertex_label,
vertex_label,
{"graph_id": graph_vertex.id},
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def import_hetero_graph_from_dgl(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
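    """Import a heterogeneous DGL graph (ACM) into HugeGraph.

    One vertex label is created per node type and one edge label per edge
    type; available node data (feat/label/masks) become vertex properties.
    """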
dataset_name = dataset_name.upper()
if dataset_name == "ACM":
hetero_graph = load_acm_raw()
else:
raise ValueError("dataset not supported")
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
client_schema.propertyKey("feat").asDouble().valueList().ifNotExist().create()
client_schema.propertyKey("label").asLong().ifNotExist().create()
client_schema.propertyKey("train_mask").asInt().ifNotExist().create()
client_schema.propertyKey("val_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_mask").asInt().ifNotExist().create()
ntype_to_vertex_label = {}
ntype_idx_to_vertex_id = {}
for ntype in hetero_graph.ntypes:
# create vertex schema
vertex_label = f"{dataset_name}_{ntype}_v"
ntype_to_vertex_label[ntype] = vertex_label
all_props = ["feat", "label", "train_mask", "val_mask", "test_mask"]
# check properties
props = [p for p in all_props if p in hetero_graph.nodes[ntype].data]
client_schema.vertexLabel(vertex_label).useAutomaticId().properties(*props).ifNotExist().create()
props_value = {}
for p in props:
props_value[p] = hetero_graph.nodes[ntype].data[p].tolist()
        # add vertices of this ntype in batches
idx_to_vertex_id = {}
vdatas = []
idxs = []
for idx in range(hetero_graph.number_of_nodes(ntype=ntype)):
properties = {
p: int(props_value[p][idx]) if isinstance(props_value[p][idx], bool) else props_value[p][idx]
for p in props
}
vdata = [vertex_label, properties]
vdatas.append(vdata)
idxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, idxs))
vdatas.clear()
idxs.clear()
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, idxs))
ntype_idx_to_vertex_id[ntype] = idx_to_vertex_id
# add edges
edatas = []
for canonical_etype in hetero_graph.canonical_etypes:
# create edge schema
src_type, etype, dst_type = canonical_etype
edge_label = f"{dataset_name}_{etype}_e"
client_schema.edgeLabel(edge_label).sourceLabel(ntype_to_vertex_label[src_type]).targetLabel(
ntype_to_vertex_label[dst_type]
).ifNotExist().create()
        # add edges of this canonical_etype in batches
srcs, dsts = hetero_graph.edges(etype=canonical_etype)
for src, dst in zip(srcs.numpy(), dsts.numpy(), strict=False):
edata = [
edge_label,
ntype_idx_to_vertex_id[src_type][src],
ntype_idx_to_vertex_id[dst_type][dst],
ntype_to_vertex_label[src_type],
ntype_to_vertex_label[dst_type],
{},
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def import_hetero_graph_from_dgl_no_feat(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
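    """Import a heterogeneous DGL graph without node properties (AmazonGATNE)."""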
    # dataset downloaded from:
# https://s3.us-west-2.amazonaws.com/dgl-data/dataset/recsys/GATNE/amazon.zip
dataset_name = dataset_name.upper()
if dataset_name == "AMAZONGATNE":
hetero_graph = load_training_data_gatne()
else:
raise ValueError("dataset not supported")
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
ntype_to_vertex_label = {}
ntype_idx_to_vertex_id = {}
for ntype in hetero_graph.ntypes:
# create vertex schema
vertex_label = f"{dataset_name}_{ntype}_v"
ntype_to_vertex_label[ntype] = vertex_label
client_schema.vertexLabel(vertex_label).useAutomaticId().ifNotExist().create()
        # add vertices of this ntype in batches
idx_to_vertex_id = {}
vdatas = []
idxs = []
for idx in range(hetero_graph.number_of_nodes(ntype=ntype)):
properties = {}
vdata = [vertex_label, properties]
vdatas.append(vdata)
idxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, idxs))
vdatas.clear()
idxs.clear()
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, idxs))
ntype_idx_to_vertex_id[ntype] = idx_to_vertex_id
# add edges
edatas = []
for canonical_etype in hetero_graph.canonical_etypes:
# create edge schema
src_type, etype, dst_type = canonical_etype
edge_label = f"{dataset_name}_{etype}_e"
client_schema.edgeLabel(edge_label).sourceLabel(ntype_to_vertex_label[src_type]).targetLabel(
ntype_to_vertex_label[dst_type]
).ifNotExist().create()
        # add edges of this canonical_etype in batches
srcs, dsts = hetero_graph.edges(etype=canonical_etype)
for src, dst in zip(srcs.numpy(), dsts.numpy(), strict=False):
edata = [
edge_label,
ntype_idx_to_vertex_id[src_type][src],
ntype_idx_to_vertex_id[dst_type][dst],
ntype_to_vertex_label[src_type],
ntype_to_vertex_label[dst_type],
{},
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def import_graph_from_nx(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
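    """Import a NetworkX graph (connected caveman) as bare vertices and edges."""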
dataset_name = dataset_name.upper()
if dataset_name == "CAVEMAN":
dataset = nx.connected_caveman_graph(20, 20)
else:
raise ValueError("dataset not supported")
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
    # no property schema needed; create the vertex label without properties
vertex_label = f"{dataset_name}_vertex"
client_schema.vertexLabel(vertex_label).useAutomaticId().ifNotExist().create()
    # add vertices in batches of MAX_BATCH_NUM
idx_to_vertex_id = {}
vdatas = []
vidxs = []
for idx in dataset.nodes:
vdata = [vertex_label, {}]
vdatas.append(vdata)
vidxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
vdatas.clear()
vidxs.clear()
    # add the remaining vertices
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
    # add edges in batches
edge_label = f"{dataset_name}_edge"
client_schema.edgeLabel(edge_label).sourceLabel(vertex_label).targetLabel(vertex_label).ifNotExist().create()
edatas = []
for edge in dataset.edges:
edata = [
edge_label,
idx_to_vertex_id[edge[0]],
idx_to_vertex_id[edge[1]],
vertex_label,
vertex_label,
{},
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def import_graph_from_dgl_with_edge_feat(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
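    """Import a DGL citation dataset together with synthetic edge features.

    Same as `import_graph_from_dgl`, but every edge additionally stores a
    random 8-dim `edge_feat` vector (citation graphs ship no edge features).
    """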
dataset_name = dataset_name.upper()
if dataset_name == "CORA":
dataset_dgl = CoraGraphDataset(verbose=False)
elif dataset_name == "CITESEER":
dataset_dgl = CiteseerGraphDataset(verbose=False)
elif dataset_name == "PUBMED":
dataset_dgl = PubmedGraphDataset(verbose=False)
else:
raise ValueError("dataset not supported")
graph_dgl = dataset_dgl[0]
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
# create property schema
client_schema.propertyKey("feat").asDouble().valueList().ifNotExist().create() # node features
client_schema.propertyKey("edge_feat").asDouble().valueList().ifNotExist().create()
client_schema.propertyKey("label").asLong().ifNotExist().create()
client_schema.propertyKey("train_mask").asInt().ifNotExist().create()
client_schema.propertyKey("val_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_mask").asInt().ifNotExist().create()
# check props and create vertex label
vertex_label = f"{dataset_name}_edge_feat_vertex"
node_all_props = ["feat", "label", "train_mask", "val_mask", "test_mask"]
node_props = [p for p in node_all_props if p in graph_dgl.ndata]
node_props_value = {}
for p in node_props:
node_props_value[p] = graph_dgl.ndata[p].tolist()
client_schema.vertexLabel(vertex_label).useAutomaticId().properties(*node_props).ifNotExist().create()
    # add vertices in batches of MAX_BATCH_NUM
idx_to_vertex_id = {}
vdatas = []
vidxs = []
for idx in range(graph_dgl.number_of_nodes()):
# extract props
properties = {
p: (
int(node_props_value[p][idx])
if isinstance(node_props_value[p][idx], bool)
else node_props_value[p][idx]
)
for p in node_props
}
vdata = [vertex_label, properties]
vdatas.append(vdata)
vidxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
vdatas.clear()
vidxs.clear()
    # add the remaining vertices
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
    # add edges in batches
edge_label = f"{dataset_name}_edge_feat_edge"
edge_all_props = ["edge_feat"]
client_schema.edgeLabel(edge_label).sourceLabel(vertex_label).targetLabel(vertex_label).properties(
*edge_all_props
).ifNotExist().create()
edges_src, edges_dst = graph_dgl.edges()
edatas = []
for src, dst in zip(edges_src.numpy(), edges_dst.numpy(), strict=False):
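        # citation datasets ship no edge features, so synthesize a random 8-dim vector per edge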
properties = {p: (torch.rand(8).tolist()) for p in edge_all_props}
edata = [
edge_label,
idx_to_vertex_id[src],
idx_to_vertex_id[dst],
vertex_label,
vertex_label,
properties,
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def import_graph_from_ogb(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
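    """Import an OGB link-prediction dataset (ogbl-collab) into HugeGraph.

    Only a prefix of the node set is imported (see `max_nodes` below); the
    train/valid/test split edges are imported afterwards via
    `import_split_edge_from_ogb`.
    """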
if dataset_name == "ogbl-collab":
dataset_dgl = DglLinkPropPredDataset(name=dataset_name)
else:
raise ValueError("dataset not supported")
graph_dgl = dataset_dgl[0]
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
# create property schema
client_schema.propertyKey("feat").asDouble().valueList().ifNotExist().create() # node features
client_schema.propertyKey("year").asDouble().valueList().ifNotExist().create()
client_schema.propertyKey("weight").asDouble().valueList().ifNotExist().create()
# check props and create vertex label
vertex_label = f"{dataset_name}_vertex"
node_all_props = ["feat"]
node_props = [p for p in node_all_props if p in graph_dgl.ndata]
node_props_value = {}
for p in node_props:
node_props_value[p] = graph_dgl.ndata[p].tolist()
client_schema.vertexLabel(vertex_label).useAutomaticId().properties(*node_props).ifNotExist().create()
    # add vertices in batches of MAX_BATCH_NUM
idx_to_vertex_id = {}
vdatas = []
vidxs = []
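    # subsample: only the first max_nodes + 1 nodes (and the edges among them) are imported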
max_nodes = 10000
for idx in range(graph_dgl.number_of_nodes()):
if idx <= max_nodes:
# extract props
properties = {
p: (
int(node_props_value[p][idx])
if isinstance(node_props_value[p][idx], bool)
else node_props_value[p][idx]
)
for p in node_props
}
vdata = [vertex_label, properties]
vdatas.append(vdata)
vidxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
vdatas.clear()
vidxs.clear()
    # add the remaining vertices
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, vidxs))
    # add edges in batches
edge_label = f"{dataset_name}_edge"
edge_all_props = ["year", "weight"]
edge_props_value = {}
for p in edge_all_props:
edge_props_value[p] = graph_dgl.edata[p].tolist()
client_schema.edgeLabel(edge_label).sourceLabel(vertex_label).targetLabel(vertex_label).properties(
*edge_all_props
).ifNotExist().create()
edges_src, edges_dst = graph_dgl.edges()
edatas = []
    for idx, (src, dst) in enumerate(zip(edges_src.numpy(), edges_dst.numpy(), strict=False)):
if src <= max_nodes and dst <= max_nodes:
properties = {
p: (
int(edge_props_value[p][idx])
if isinstance(edge_props_value[p][idx], bool)
else edge_props_value[p][idx]
)
for p in edge_all_props
}
edata = [
edge_label,
idx_to_vertex_id[src],
idx_to_vertex_id[dst],
vertex_label,
vertex_label,
properties,
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
import_split_edge_from_ogb(
dataset_name=dataset_name,
idx_to_vertex_id=idx_to_vertex_id,
max_nodes=max_nodes,
)
def import_split_edge_from_ogb(
dataset_name,
idx_to_vertex_id,
max_nodes: int,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
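    """Store the OGB train/valid/test split edges as a dedicated edge label.

    Each split edge carries integer mask properties marking which split it
    belongs to and, for positive edges, its year and weight.
    """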
if dataset_name == "ogbl-collab":
dataset_dgl = DglLinkPropPredDataset(name=dataset_name)
else:
raise ValueError("dataset not supported")
split_edges = dataset_dgl.get_edge_split()
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
# create property schema
client_schema.propertyKey("train_edge_mask").asInt().ifNotExist().create()
client_schema.propertyKey("train_year_mask").asInt().ifNotExist().create()
client_schema.propertyKey("train_weight_mask").asInt().ifNotExist().create()
client_schema.propertyKey("valid_edge_mask").asInt().ifNotExist().create()
client_schema.propertyKey("valid_weight_mask").asInt().ifNotExist().create()
client_schema.propertyKey("valid_year_mask").asInt().ifNotExist().create()
client_schema.propertyKey("valid_edge_neg_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_edge_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_weight_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_year_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_edge_neg_mask").asInt().ifNotExist().create()
edge_all_props = [
"train_edge_mask",
"train_year_mask",
"train_weight_mask",
"valid_edge_mask",
"valid_weight_mask",
"valid_year_mask",
"valid_edge_neg_mask",
"test_edge_mask",
"test_weight_mask",
"test_year_mask",
"test_edge_neg_mask",
]
edge_props = [
"train_edge_mask",
"valid_edge_mask",
"valid_edge_neg_mask",
"test_edge_mask",
"test_edge_neg_mask",
]
    # add split edges in batches
vertex_label = f"{dataset_name}_vertex"
edge_label = f"{dataset_name}_split_edge"
client_schema.edgeLabel(edge_label).sourceLabel(vertex_label).targetLabel(vertex_label).properties(
*edge_all_props
).ifNotExist().create()
    edges = {
        "train_edge_mask": split_edges["train"]["edge"],
        "train_year_mask": split_edges["train"]["year"],
        "train_weight_mask": split_edges["train"]["weight"],
        "valid_edge_mask": split_edges["valid"]["edge"],
        "valid_weight_mask": split_edges["valid"]["weight"],
        "valid_year_mask": split_edges["valid"]["year"],
        "valid_edge_neg_mask": split_edges["valid"]["edge_neg"],
        "test_edge_mask": split_edges["test"]["edge"],
        "test_weight_mask": split_edges["test"]["weight"],
        "test_year_mask": split_edges["test"]["year"],
        "test_edge_neg_mask": split_edges["test"]["edge_neg"],
    }
init_ogb_split_edge(
"train",
"valid",
"test",
"",
edges,
max_nodes,
edge_props,
vertex_label,
edge_label,
idx_to_vertex_id,
client_graph,
)
init_ogb_split_edge(
"valid",
"train",
"test",
"",
edges,
max_nodes,
edge_props,
vertex_label,
edge_label,
idx_to_vertex_id,
client_graph,
)
init_ogb_split_edge(
"valid",
"train",
"test",
"neg_",
edges,
max_nodes,
edge_props,
vertex_label,
edge_label,
idx_to_vertex_id,
client_graph,
)
init_ogb_split_edge(
"test",
"train",
"valid",
"",
edges,
max_nodes,
edge_props,
vertex_label,
edge_label,
idx_to_vertex_id,
client_graph,
)
init_ogb_split_edge(
"test",
"train",
"valid",
"neg_",
edges,
max_nodes,
edge_props,
vertex_label,
edge_label,
idx_to_vertex_id,
client_graph,
)
def import_hetero_graph_from_dgl_bgnn(
dataset_name,
url: str = "http://127.0.0.1:8080",
graph: str = "hugegraph",
user: str = "",
pwd: str = "",
graphspace: str | None = None,
):
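    """Import the BGNN heterogeneous dataset (AVAZU) into HugeGraph."""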
    # dataset downloaded from: https://www.dropbox.com/s/verx1evkykzli88/datasets.zip
    # extract the zip into this directory
dataset_name = dataset_name.upper()
if dataset_name == "AVAZU":
hetero_graph = read_input()
else:
raise ValueError("dataset not supported")
client: PyHugeClient = PyHugeClient(url=url, graph=graph, user=user, pwd=pwd, graphspace=graphspace)
client_schema: SchemaManager = client.schema()
client_graph: GraphManager = client.graph()
client_schema.propertyKey("feat").asInt().valueList().ifNotExist().create()
client_schema.propertyKey("class").asDouble().valueList().ifNotExist().create()
client_schema.propertyKey("cat_features").asInt().valueList().ifNotExist().create()
client_schema.propertyKey("train_mask").asInt().ifNotExist().create()
client_schema.propertyKey("val_mask").asInt().ifNotExist().create()
client_schema.propertyKey("test_mask").asInt().ifNotExist().create()
ntype_to_vertex_label = {}
ntype_idx_to_vertex_id = {}
for ntype in hetero_graph.ntypes:
# create vertex schema
vertex_label = f"{dataset_name}_{ntype}_v"
ntype_to_vertex_label[ntype] = vertex_label
all_props = [
"feat",
"class",
"cat_features",
"train_mask",
"val_mask",
"test_mask",
]
# check properties
props = [p for p in all_props if p in hetero_graph.nodes[ntype].data]
client_schema.vertexLabel(vertex_label).useAutomaticId().properties(*props).ifNotExist().create()
props_value = {}
for p in props:
props_value[p] = hetero_graph.nodes[ntype].data[p].tolist()
        # add vertices of this ntype in batches
idx_to_vertex_id = {}
vdatas = []
idxs = []
for idx in range(hetero_graph.number_of_nodes(ntype=ntype)):
properties = {
p: (int(props_value[p][idx]) if isinstance(props_value[p][idx], bool) else props_value[p][idx])
for p in props
}
vdata = [vertex_label, properties]
vdatas.append(vdata)
idxs.append(idx)
if len(vdatas) == MAX_BATCH_NUM:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, idxs))
vdatas.clear()
idxs.clear()
if len(vdatas) > 0:
idx_to_vertex_id.update(_add_batch_vertices(client_graph, vdatas, idxs))
ntype_idx_to_vertex_id[ntype] = idx_to_vertex_id
# add edges
edatas = []
for canonical_etype in hetero_graph.canonical_etypes:
# create edge schema
src_type, etype, dst_type = canonical_etype
edge_label = f"{dataset_name}_{etype}_e"
client_schema.edgeLabel(edge_label).sourceLabel(ntype_to_vertex_label[src_type]).targetLabel(
ntype_to_vertex_label[dst_type]
).ifNotExist().create()
        # add edges of this canonical_etype in batches
srcs, dsts = hetero_graph.edges(etype=canonical_etype)
for src, dst in zip(srcs.numpy(), dsts.numpy(), strict=False):
edata = [
edge_label,
ntype_idx_to_vertex_id[src_type][src],
ntype_idx_to_vertex_id[dst_type][dst],
ntype_to_vertex_label[src_type],
ntype_to_vertex_label[dst_type],
{},
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def init_ogb_split_edge(
a,
b,
c,
d,
edges,
max_nodes,
edge_props,
vertex_label,
edge_label,
idx_to_vertex_id,
client_graph,
):
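    """Write the edges of split `a` ("train"/"valid"/"test") with mask properties.

    `b` and `c` name the two remaining splits, whose year/weight masks are set
    to -1; `d` is "" for positive edges or "neg_" for negative sample edges.
    """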
edatas = []
for idx, edge in enumerate(edges[f"{a}_edge_{d}mask"]):
if int(edge[0]) <= max_nodes and int(edge[1]) <= max_nodes:
properties = {q: (int(q == f"{a}_edge_{d}mask")) for q in edge_props}
if d != "neg_":
properties2 = {
f"{a}_year_mask": int(edges[f"{a}_year_mask"][idx]),
f"{a}_weight_mask": int(edges[f"{a}_weight_mask"][idx]),
}
properties3 = {
f"{b}_year_mask": -1,
f"{b}_weight_mask": -1,
f"{c}_year_mask": -1,
f"{c}_weight_mask": -1,
}
properties.update(properties2)
properties.update(properties3)
else:
properties2 = {
f"{a}_year_mask": -1,
f"{a}_weight_mask": -1,
f"{b}_year_mask": -1,
f"{b}_weight_mask": -1,
f"{c}_year_mask": -1,
f"{c}_weight_mask": -1,
}
properties.update(properties2)
edata = [
edge_label,
idx_to_vertex_id[int(edge[0])],
idx_to_vertex_id[int(edge[1])],
vertex_label,
vertex_label,
properties,
]
edatas.append(edata)
if len(edatas) == MAX_BATCH_NUM:
_add_batch_edges(client_graph, edatas)
edatas.clear()
if len(edatas) > 0:
_add_batch_edges(client_graph, edatas)
def _add_batch_vertices(client_graph, vdatas, vidxs):
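    """Insert a batch of vertices and map each local index to its HugeGraph vertex id."""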
vertices = client_graph.addVertices(vdatas)
assert len(vertices) == len(vidxs)
idx_to_vertex_id = {}
for i, idx in enumerate(vidxs):
idx_to_vertex_id[idx] = vertices[i].id
return idx_to_vertex_id
def _add_batch_edges(client_graph, edatas):
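    """Insert a batch of edges."""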
client_graph.addEdges(edatas)
def load_acm_raw():
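    """Build the ACM paper/author/field heterogeneous graph from the raw .mat file."""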
# reference: https://github.com/dmlc/dgl/blob/master/examples/pytorch/han/utils.py
url = "dataset/ACM.mat"
data_path = get_download_dir() + "/ACM.mat"
if not os.path.exists(data_path):
download(_get_dgl_url(url), path=data_path)
data = scipy.io.loadmat(data_path)
p_vs_l = data["PvsL"] # paper-field?
p_vs_a = data["PvsA"] # paper-author
p_vs_t = data["PvsT"] # paper-term, bag of words
p_vs_c = data["PvsC"] # paper-conference, labels come from that
# We assign
# (1) KDD papers as class 0 (data mining),
# (2) SIGMOD and VLDB papers as class 1 (database),
# (3) SIGCOMM and MOBICOMM papers as class 2 (communication)
conf_ids = [0, 1, 9, 10, 13]
label_ids = [0, 1, 2, 2, 1]
p_selected = p_vs_c[:, conf_ids].tocoo().row
p_vs_l = p_vs_l[p_selected]
p_vs_a = p_vs_a[p_selected]
p_vs_t = p_vs_t[p_selected]
p_vs_c = p_vs_c[p_selected]
hgraph = dgl.heterograph(
{
("paper", "pa", "author"): p_vs_a.nonzero(),
("author", "ap", "paper"): p_vs_a.transpose().nonzero(),
("paper", "pf", "field"): p_vs_l.nonzero(),
("field", "fp", "paper"): p_vs_l.transpose().nonzero(),
}
)
features = torch.FloatTensor(p_vs_t.toarray())
pc_p, pc_c = p_vs_c.nonzero()
labels = np.zeros(len(p_selected), dtype=np.int64)
for conf_id, label_id in zip(conf_ids, label_ids, strict=False):
labels[pc_p[pc_c == conf_id]] = label_id
labels = torch.LongTensor(labels)
float_mask = np.zeros(len(pc_p))
for conf_id in conf_ids:
pc_c_mask = pc_c == conf_id
float_mask[pc_c_mask] = np.random.permutation(np.linspace(0, 1, pc_c_mask.sum()))
train_idx = np.where(float_mask <= 0.2)[0]
val_idx = np.where((float_mask > 0.2) & (float_mask <= 0.3))[0]
test_idx = np.where(float_mask > 0.3)[0]
num_nodes = hgraph.num_nodes("paper")
train_mask = _get_mask(num_nodes, train_idx)
val_mask = _get_mask(num_nodes, val_idx)
test_mask = _get_mask(num_nodes, test_idx)
hgraph.nodes["paper"].data["feat"] = features
hgraph.nodes["paper"].data["label"] = labels
hgraph.nodes["paper"].data["train_mask"] = train_mask
hgraph.nodes["paper"].data["val_mask"] = val_mask
hgraph.nodes["paper"].data["test_mask"] = test_mask
return hgraph
def read_input():
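    """Load the BGNN avazu data (features, labels, masks) into a DGL graph."""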
# reference: https://github.com/dmlc/dgl/blob/master/examples/pytorch/bgnn/run.py
    # X, y, cat_features, and masks are attached to the graph as node data
input_folder = "dataset/avazu"
X = pd.read_csv(f"{input_folder}/X.csv")
y = pd.read_csv(f"{input_folder}/y.csv")
categorical_columns = []
if os.path.exists(f"{input_folder}/cat_features.txt"):
with open(f"{input_folder}/cat_features.txt") as f:
for line in f:
if line.strip():
categorical_columns.append(line.strip())
cat_features = None
if categorical_columns:
columns = X.columns
cat_features = np.where(columns.isin(categorical_columns))[0]
for col in list(columns[cat_features]):
X[col] = X[col].astype(str)
gs, _ = load_graphs(f"{input_folder}/graph.dgl")
graph = gs[0]
with open(f"{input_folder}/masks.json") as f:
masks = json.load(f)
# add X
features = [[int(x) for x in row] for row in X.values]
features_tensor = torch.tensor(features, dtype=torch.int32)
graph.ndata["feat"] = features_tensor
# add y
y_tensor = torch.tensor(y.values, dtype=torch.float64)
graph.ndata["class"] = y_tensor
# add masks
for mask_name, node_ids in masks["0"].items():
mask_tensor = torch.zeros(graph.number_of_nodes(), dtype=torch.int32)
mask_tensor[node_ids] = 1
graph.ndata[f"{mask_name}_mask"] = mask_tensor
    # add cat_features (skip when the dataset defines no categorical columns)
    if cat_features is not None:
        cat_features_tensor = torch.tensor(cat_features, dtype=torch.int32)
        graph.ndata["cat_features"] = torch.repeat_interleave(
            cat_features_tensor[None, :], repeats=graph.number_of_nodes(), dim=0
        )
return graph
def load_training_data_gatne():
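    """Build a heterogeneous DGL graph from the GATNE amazon train.txt edge list."""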
# reference: https://github.com/dmlc/dgl/blob/master/examples/pytorch/GATNE-T/src/utils.py
# reference: https://github.com/dmlc/dgl/blob/master/examples/pytorch/GATNE-T/src/main.py
f_name = "dataset/amazon/train.txt"
edge_data_by_type = {}
with open(f_name) as f:
        for line in f:
            # each line is "<edge_type> <src> <dst>"
            edge_type, src, dst = line.strip().split(" ")[:3]
            edge_data_by_type.setdefault(edge_type, []).append((src, dst))
    index2word = []
    for edges in edge_data_by_type.values():
        node1, node2 = zip(*edges, strict=False)
        index2word.extend(node1)
        index2word.extend(node2)
    index2word = list(set(index2word))
    vocab = {word: i for i, word in enumerate(index2word)}
node_type = "_N" # '_N' can be replaced by an arbitrary name
data_dict = {}
num_nodes_dict = {node_type: len(vocab)}
for edge_type in edge_data_by_type:
tmp_data = edge_data_by_type[edge_type]
src = []
dst = []
for edge in tmp_data:
src.extend([vocab[edge[0]], vocab[edge[1]]])
dst.extend([vocab[edge[1]], vocab[edge[0]]])
data_dict[(node_type, edge_type, node_type)] = (src, dst)
graph = dgl.heterograph(data_dict, num_nodes_dict)
return graph
def _get_mask(size, indices):
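    """Return a boolean mask of length `size` with the given indices set to True."""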
mask = torch.zeros(size)
mask[indices] = 1
return mask.bool()
if __name__ == "__main__":
clear_all_data()
import_graph_from_dgl("CORA")
import_graphs_from_dgl("MUTAG")
import_hetero_graph_from_dgl("ACM")
import_graph_from_nx("CAVEMAN")
import_graph_from_dgl_with_edge_feat("CORA")
import_graph_from_ogb("ogbl-collab")
import_hetero_graph_from_dgl_bgnn("AVAZU")
import_hetero_graph_from_dgl_no_feat("amazongatne")