# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=C0103,C0206,W0612,C0209,R1705,C0200,R1735,W0201
"""
Boost-GNN (BGNN)
References
----------
Paper: https://arxiv.org/abs/2101.08543
Author's code: https://github.com/nd7141/bgnn
DGL code: https://github.com/dmlc/dgl/tree/master/examples/pytorch/bgnn
"""
import itertools
import time
from collections import defaultdict as ddict
import dgl
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from catboost import CatBoostClassifier, CatBoostRegressor, Pool, sum_models
from sklearn import preprocessing
from sklearn.metrics import r2_score
from tqdm import tqdm
from category_encoders import CatBoostEncoder
from dgl.nn.pytorch import (
AGNNConv as AGNNConvDGL,
APPNPConv,
ChebConv as ChebConvDGL,
GATConv as GATConvDGL,
GraphConv,
)
from torch.nn import Dropout, ELU, Linear, ReLU, Sequential
class BGNNPredictor:
"""
Description
-----------
Boost GNN predictor for semi-supervised node classification or regression problems.
Publication: https://arxiv.org/abs/2101.08543
Parameters
----------
gnn_model : nn.Module
DGL implementation of GNN model.
    task : str, optional
        The type of task, either "regression" or "classification".
loss_fn : callable, optional
Function that takes torch tensors, pred and true, and returns a scalar.
trees_per_epoch : int, optional
Number of GBDT trees to build each epoch.
backprop_per_epoch : int, optional
Number of backpropagation steps to make each epoch.
lr : float, optional
Learning rate of gradient descent optimizer.
    append_gbdt_pred : bool, optional
        If True, append GBDT predictions to the original input node features;
        if False, replace the input features with the predictions.
    train_input_features : bool, optional
        If True, optimize the input node features jointly with the GNN weights.
gbdt_depth : int, optional
Depth of each tree in GBDT model.
gbdt_lr : float, optional
Learning rate of GBDT model.
    gbdt_alpha : float, optional
        Weight used to combine the previous and new GBDT trees.
    random_seed : int, optional
        Random seed for the GNN and GBDT models.
    Examples
    --------
    gnn_model = GNNModelDGL(in_dim=10, hidden_dim=16, out_dim=5)
    bgnn = BGNNPredictor(gnn_model)
    metrics = bgnn.fit(graph, X, y, train_mask, val_mask, test_mask,
                       cat_features=cat_features)
"""
def __init__(
self,
gnn_model,
task="regression",
loss_fn=None,
trees_per_epoch=10,
backprop_per_epoch=10,
lr=0.01,
append_gbdt_pred=True,
train_input_features=False,
gbdt_depth=6,
gbdt_lr=0.1,
gbdt_alpha=1,
random_seed=0,
):
self.device = torch.device("cpu")
self.model = gnn_model.to(self.device)
self.task = task
self.loss_fn = loss_fn
self.trees_per_epoch = trees_per_epoch
self.backprop_per_epoch = backprop_per_epoch
self.lr = lr
self.append_gbdt_pred = append_gbdt_pred
self.train_input_features = train_input_features
self.gbdt_depth = gbdt_depth
self.gbdt_lr = gbdt_lr
self.gbdt_alpha = gbdt_alpha
self.random_seed = random_seed
torch.manual_seed(random_seed)
np.random.seed(random_seed)
def init_gbdt_model(self, num_epochs, epoch):
if self.task == "regression":
catboost_model_obj = CatBoostRegressor
catboost_loss_fn = "RMSE"
else:
if epoch == 0: # we predict multiclass probs at first epoch
catboost_model_obj = CatBoostClassifier
catboost_loss_fn = "MultiClass"
else: # we predict the gradients for each class at epochs > 0
catboost_model_obj = CatBoostRegressor
catboost_loss_fn = "MultiRMSE"
return catboost_model_obj(
iterations=num_epochs,
depth=self.gbdt_depth,
learning_rate=self.gbdt_lr,
loss_function=catboost_loss_fn,
random_seed=self.random_seed,
nan_mode="Min",
)
def fit_gbdt(self, pool, trees_per_epoch, epoch):
gbdt_model = self.init_gbdt_model(trees_per_epoch, epoch)
gbdt_model.fit(pool, verbose=False)
return gbdt_model
def append_gbdt_model(self, new_gbdt_model, weights):
if self.gbdt_model is None:
return new_gbdt_model
return sum_models([self.gbdt_model, new_gbdt_model], weights=weights)
def train_gbdt(
self,
gbdt_X_train,
gbdt_y_train,
cat_features,
epoch,
gbdt_trees_per_epoch,
gbdt_alpha,
):
pool = Pool(gbdt_X_train, gbdt_y_train, cat_features=cat_features)
epoch_gbdt_model = self.fit_gbdt(pool, gbdt_trees_per_epoch, epoch)
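        # for classification, the epoch-0 classifier is kept as a separate base
        # model; later regressors fit embedding residuals and are merged into a
        # single ensemble with catboost's sum_models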
if epoch == 0 and self.task == "classification":
self.base_gbdt = epoch_gbdt_model
else:
self.gbdt_model = self.append_gbdt_model(
epoch_gbdt_model, weights=[1, gbdt_alpha]
)
def update_node_features(self, node_features, X, original_X):
# get predictions from gbdt model
if self.task == "regression":
predictions = np.expand_dims(self.gbdt_model.predict(original_X), axis=1)
else:
predictions = self.base_gbdt.predict_proba(original_X)
if self.gbdt_model is not None:
predictions_after_one = self.gbdt_model.predict(original_X)
predictions += predictions_after_one
# update node features with predictions
if self.append_gbdt_pred:
if self.train_input_features:
predictions = np.append(
node_features.detach().cpu().data[:, : -self.out_dim],
predictions,
axis=1,
) # replace old predictions with new predictions
else:
predictions = np.append(
X, predictions, axis=1
) # append original features with new predictions
predictions = torch.from_numpy(predictions).to(self.device)
node_features.data = predictions.float().data
def update_gbdt_targets(self, node_features, node_features_before, train_mask):
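        # the new regression targets for the GBDT are how much the GNN shifted
        # the last out_dim feature columns (the prediction slots) on train nodes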
return (
(node_features - node_features_before)
.detach()
.cpu()
.numpy()[train_mask, -self.out_dim :]
)
def init_node_features(self, X):
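        # when append_gbdt_pred is True, the leading columns are seeded with
        # the original features and the trailing out_dim columns are filled
        # with GBDT predictions at every epoch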
node_features = torch.empty(
X.shape[0], self.in_dim, requires_grad=True, device=self.device
)
if self.append_gbdt_pred:
node_features.data[:, : -self.out_dim] = torch.from_numpy(
X.to_numpy(copy=True)
)
return node_features
def init_optimizer(self, node_features, optimize_node_features, learning_rate):
params = [self.model.parameters()]
if optimize_node_features:
params.append([node_features])
optimizer = torch.optim.Adam(itertools.chain(*params), lr=learning_rate)
return optimizer
def train_model(self, model_in, target_labels, train_mask, optimizer):
y = target_labels[train_mask]
self.model.train()
logits = self.model(*model_in).squeeze()
pred = logits[train_mask]
if self.loss_fn is not None:
loss = self.loss_fn(pred, y)
else:
if self.task == "regression":
loss = torch.sqrt(F.mse_loss(pred, y))
elif self.task == "classification":
loss = F.cross_entropy(pred, y.long())
else:
raise NotImplementedError(
"Unknown task. Supported tasks: classification, regression."
)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return loss
def evaluate_model(self, logits, target_labels, mask):
metrics = {}
y = target_labels[mask]
with torch.no_grad():
pred = logits[mask]
if self.task == "regression":
metrics["loss"] = torch.sqrt(F.mse_loss(pred, y).squeeze() + 1e-8)
metrics["rmsle"] = torch.sqrt(
F.mse_loss(torch.log(pred + 1), torch.log(y + 1)).squeeze() + 1e-8
)
metrics["mae"] = F.l1_loss(pred, y)
metrics["r2"] = torch.Tensor(
[r2_score(y.cpu().numpy(), pred.cpu().numpy())]
)
elif self.task == "classification":
metrics["loss"] = F.cross_entropy(pred, y.long())
metrics["accuracy"] = torch.Tensor(
[(y == pred.max(1)[1]).sum().item() / y.shape[0]]
)
return metrics
def train_and_evaluate(
self,
model_in,
target_labels,
train_mask,
val_mask,
test_mask,
optimizer,
metrics,
gnn_passes_per_epoch,
):
loss = None
for _ in range(gnn_passes_per_epoch):
loss = self.train_model(model_in, target_labels, train_mask, optimizer)
self.model.eval()
logits = self.model(*model_in).squeeze()
train_results = self.evaluate_model(logits, target_labels, train_mask)
val_results = self.evaluate_model(logits, target_labels, val_mask)
test_results = self.evaluate_model(logits, target_labels, test_mask)
for metric_name in train_results:
metrics[metric_name].append(
(
train_results[metric_name].detach().item(),
val_results[metric_name].detach().item(),
test_results[metric_name].detach().item(),
)
)
return loss
def update_early_stopping(
self,
metrics,
epoch,
best_metric,
best_val_epoch,
epochs_since_last_best_metric,
metric_name,
lower_better=False,
):
train_metric, val_metric, test_metric = metrics[metric_name][-1]
if (lower_better and val_metric < best_metric[1]) or (
not lower_better and val_metric > best_metric[1]
):
best_metric = metrics[metric_name][-1]
best_val_epoch = epoch
epochs_since_last_best_metric = 0
else:
epochs_since_last_best_metric += 1
return best_metric, best_val_epoch, epochs_since_last_best_metric
def log_epoch(
self,
pbar,
metrics,
epoch,
loss,
epoch_time,
logging_epochs,
metric_name="loss",
):
train_metric, val_metric, test_metric = metrics[metric_name][-1]
if epoch and epoch % logging_epochs == 0:
pbar.set_description(
f"Epoch {epoch:05d} | Loss {loss:.3f} | \\"
f"Loss {train_metric:.3f}/{val_metric:.3f}/{test_metric:.3f} \\"
f" | Time {epoch_time:.4f}"
)
def fit(
self,
graph,
X,
y,
train_mask,
val_mask,
test_mask,
original_X=None,
cat_features=None,
num_epochs=100,
patience=10,
logging_epochs=1,
metric_name="loss",
):
"""
:param graph : dgl.DGLGraph
Input graph
:param X : pd.DataFrame
Input node features. Each column represents one input feature. Each row is a node.
Values in dataframe are numerical, after preprocessing.
:param y : pd.DataFrame
Input node targets. Each column represents one target. Each row is a node
(order of nodes should be the same as in X).
:param train_mask : list[int]
Node indexes (rows) that belong to train set.
:param val_mask : list[int]
Node indexes (rows) that belong to validation set.
:param test_mask : list[int]
Node indexes (rows) that belong to test set.
:param original_X : pd.DataFrame, optional
Input node features before preprocessing. Each column represents one input feature. Each row is a node.
Values in dataframe can be of any type, including categorical (e.g. string, bool) or
missing values (None). This is useful if you want to preprocess X with GBDT model.
:param cat_features: list[int]
Feature indexes (columns) which are categorical features.
:param num_epochs : int
Number of epochs to run.
:param patience : int
Number of epochs to wait until early stopping.
:param logging_epochs : int
Log every n epoch.
:param metric_name : str
Metric to use for early stopping.
        Note: input features X can be preprocessed beforehand with the
        module-level helpers normalize_features() and replace_na().
:return: metrics evaluated during training
"""
# initialize for early stopping and metrics
if metric_name in ["r2", "accuracy"]:
best_metric = [np.cfloat("-inf")] * 3 # for train/val/test
else:
best_metric = [np.cfloat("inf")] * 3 # for train/val/test
best_val_epoch = 0
epochs_since_last_best_metric = 0
metrics = ddict(list)
if cat_features is None:
cat_features = []
if self.task == "regression":
self.out_dim = y.shape[1]
elif self.task == "classification":
self.out_dim = len(set(y.iloc[test_mask, 0]))
self.in_dim = (
self.out_dim + X.shape[1] if self.append_gbdt_pred else self.out_dim
)
if original_X is None:
original_X = X.copy()
cat_features = []
gbdt_X_train = original_X.iloc[train_mask]
gbdt_y_train = y.iloc[train_mask]
gbdt_alpha = self.gbdt_alpha
self.gbdt_model = None
node_features = self.init_node_features(X)
optimizer = self.init_optimizer(
node_features, optimize_node_features=True, learning_rate=self.lr
)
y = torch.from_numpy(y.to_numpy(copy=True)).float().squeeze().to(self.device)
graph = graph.to(self.device)
pbar = tqdm(range(num_epochs))
for epoch in pbar:
start2epoch = time.time()
# gbdt part
self.train_gbdt(
gbdt_X_train,
gbdt_y_train,
cat_features,
epoch,
self.trees_per_epoch,
gbdt_alpha,
)
self.update_node_features(node_features, X, original_X)
node_features_before = node_features.clone()
model_in = (graph, node_features)
loss = self.train_and_evaluate(
model_in,
y,
train_mask,
val_mask,
test_mask,
optimizer,
metrics,
self.backprop_per_epoch,
)
gbdt_y_train = self.update_gbdt_targets(
node_features, node_features_before, train_mask
)
self.log_epoch(
pbar,
metrics,
epoch,
loss,
time.time() - start2epoch,
logging_epochs,
metric_name=metric_name,
)
# check early stopping
(
best_metric,
best_val_epoch,
epochs_since_last_best_metric,
) = self.update_early_stopping(
metrics,
epoch,
best_metric,
best_val_epoch,
epochs_since_last_best_metric,
metric_name,
lower_better=(metric_name not in ["r2", "accuracy"]),
)
if patience and epochs_since_last_best_metric > patience:
break
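            # if the GNN no longer moves the prediction columns, the GBDT has
            # (near-)zero residual targets to fit, so training has converged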
if np.isclose(gbdt_y_train.sum(), 0.0):
print("Node embeddings do not change anymore. Stopping...")
break
print(
"Best {} at iteration {}: {:.3f}/{:.3f}/{:.3f}".format(
metric_name, best_val_epoch, *best_metric
)
)
return metrics
def predict(self, graph, X, test_mask):
graph = graph.to(self.device)
node_features = torch.empty(X.shape[0], self.in_dim).to(self.device)
self.update_node_features(node_features, X, X)
logits = self.model(graph, node_features).squeeze()
if self.task == "regression":
return logits[test_mask]
else:
return logits[test_mask].max(1)[1]
def plot_interactive(
self,
metrics,
legend,
title,
logx=False,
logy=False,
metric_name="loss",
start_from=0,
):
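        # plotly is an optional dependency, imported lazily since it is only
        # needed for plotting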
import plotly.graph_objects as go
metric_results = metrics[metric_name]
xs = [list(range(len(metric_results)))] * len(metric_results[0])
ys = list(zip(*metric_results))
fig = go.Figure()
for i in range(len(ys)):
fig.add_trace(
go.Scatter(
x=xs[i][start_from:],
y=ys[i][start_from:],
mode="lines+markers",
name=legend[i],
)
)
fig.update_layout(
title=title,
title_x=0.5,
xaxis_title="Epoch",
yaxis_title=metric_name,
font=dict(
size=40,
),
height=600,
)
if logx:
fig.update_layout(xaxis_type="log")
if logy:
fig.update_layout(yaxis_type="log")
fig.show()
class GNNModelDGL(torch.nn.Module):
def __init__(
self,
in_dim,
hidden_dim,
out_dim,
dropout=0.0,
name="gat",
residual=True,
use_mlp=False,
join_with_mlp=False,
):
super(GNNModelDGL, self).__init__()
self.name = name
self.use_mlp = use_mlp
self.join_with_mlp = join_with_mlp
self.normalize_input_columns = True
if name == "gat":
self.l1 = GATConvDGL(
in_dim,
hidden_dim // 8,
8,
feat_drop=dropout,
attn_drop=dropout,
residual=False,
activation=F.elu,
)
self.l2 = GATConvDGL(
hidden_dim,
out_dim,
1,
feat_drop=dropout,
attn_drop=dropout,
residual=residual,
activation=None,
)
elif name == "gcn":
self.l1 = GraphConv(in_dim, hidden_dim, activation=F.elu)
self.l2 = GraphConv(hidden_dim, out_dim, activation=F.elu)
self.drop = Dropout(p=dropout)
elif name == "cheb":
self.l1 = ChebConvDGL(in_dim, hidden_dim, k=3)
self.l2 = ChebConvDGL(hidden_dim, out_dim, k=3)
self.drop = Dropout(p=dropout)
elif name == "agnn":
self.lin1 = Sequential(
Dropout(p=dropout), Linear(in_dim, hidden_dim), ELU()
)
self.l1 = AGNNConvDGL(learn_beta=False)
self.l2 = AGNNConvDGL(learn_beta=True)
self.lin2 = Sequential(
Dropout(p=dropout), Linear(hidden_dim, out_dim), ELU()
)
elif name == "appnp":
self.lin1 = Sequential(
Dropout(p=dropout),
Linear(in_dim, hidden_dim),
ReLU(),
Dropout(p=dropout),
Linear(hidden_dim, out_dim),
)
self.l1 = APPNPConv(k=10, alpha=0.1, edge_drop=0.0)
def forward(self, graph, features):
h = features
logits = None
if self.use_mlp:
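            # NOTE: self.mlp is not defined in __init__; when use_mlp=True, an
            # MLP module must be assigned to self.mlp before calling forward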
if self.join_with_mlp:
h = torch.cat((h, self.mlp(features)), 1)
else:
h = self.mlp(features)
if self.name == "gat":
h = self.l1(graph, h).flatten(1)
logits = self.l2(graph, h).mean(1)
elif self.name in ["appnp"]:
h = self.lin1(h)
logits = self.l1(graph, h)
elif self.name == "agnn":
h = self.lin1(h)
h = self.l1(graph, h)
h = self.l2(graph, h)
logits = self.lin2(h)
elif self.name == "che3b":
lambda_max = dgl.laplacian_lambda_max(graph)
h = self.drop(h)
h = self.l1(graph, h, lambda_max)
logits = self.l2(graph, h, lambda_max)
elif self.name == "gcn":
h = self.drop(h)
h = self.l1(graph, h)
logits = self.l2(graph, h)
return logits
def normalize_features(X, train_mask, val_mask, test_mask):
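    # fit the scaler on training rows only to avoid leaking statistics from
    # the validation and test sets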
min_max_scaler = preprocessing.MinMaxScaler()
A = X.to_numpy(copy=True)
A[train_mask] = min_max_scaler.fit_transform(A[train_mask])
A[val_mask + test_mask] = min_max_scaler.transform(A[val_mask + test_mask])
return pd.DataFrame(A, columns=X.columns).astype(float)
def replace_na(X, train_mask):
if X.isna().any().any():
return X.fillna(X.iloc[train_mask].min() - 1)
return X
def encode_cat_features(X, y, cat_features, train_mask, val_mask, test_mask):
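    # target-encode categorical columns: the encoder is fit on training rows
    # only, then applied to validation and test rows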
enc = CatBoostEncoder()
A = X.to_numpy(copy=True)
b = y.to_numpy(copy=True)
A[np.ix_(train_mask, cat_features)] = enc.fit_transform(
A[np.ix_(train_mask, cat_features)], b[train_mask]
)
A[np.ix_(val_mask + test_mask, cat_features)] = enc.transform(
A[np.ix_(val_mask + test_mask, cat_features)]
)
A = A.astype(float)
return pd.DataFrame(A, columns=X.columns)
def convert_data(g):
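    # unpack a DGLGraph whose ndata holds node features ("feat"), targets
    # ("class"), categorical feature indexes ("cat_features") and boolean
    # train/val/test masks into the inputs that BGNNPredictor.fit expects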
retrieved_tensor = g.ndata["feat"]
retrieved_np = retrieved_tensor.numpy()
retrieved_str = retrieved_np.astype(str)
X = pd.DataFrame(retrieved_str)
retrieved_y_tensor = g.ndata["class"]
retrieved_y_np = retrieved_y_tensor.numpy()
y = pd.DataFrame(retrieved_y_np)
retrieved_cat_features_tensor = g.ndata["cat_features"][0]
cat_features = retrieved_cat_features_tensor.numpy()
train_mask = g.ndata["train_mask"].numpy().tolist()
val_mask = g.ndata["val_mask"].numpy().tolist()
test_mask = g.ndata["test_mask"].numpy().tolist()
    train_mask = [i for i, v in enumerate(train_mask) if v == 1]
    val_mask = [i for i, v in enumerate(val_mask) if v == 1]
    test_mask = [i for i, v in enumerate(test_mask) if v == 1]
return X, y, cat_features, train_mask, val_mask, test_mask
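

if __name__ == "__main__":
    # Minimal end-to-end sketch, not part of the original example: it builds a
    # small synthetic DGLGraph carrying the ndata fields that convert_data()
    # expects ("feat", "class", "cat_features" and the three masks) and trains
    # BGNN on a regression task. All sizes and hyperparameters below are
    # illustrative assumptions, not values from the paper.
    num_nodes = 60
    src = torch.randint(0, num_nodes, (200,))
    dst = torch.randint(0, num_nodes, (200,))
    g = dgl.add_self_loop(dgl.graph((src, dst), num_nodes=num_nodes))
    # four numeric feature columns plus one categorical column (index 4)
    numeric = torch.randn(num_nodes, 4)
    categorical = torch.randint(0, 3, (num_nodes, 1)).float()
    g.ndata["feat"] = torch.cat([numeric, categorical], dim=1)
    g.ndata["class"] = torch.randn(num_nodes, 1)  # regression target
    g.ndata["cat_features"] = torch.full((num_nodes, 1), 4)
    perm = torch.randperm(num_nodes)
    splits = {"train_mask": perm[:40], "val_mask": perm[40:50], "test_mask": perm[50:]}
    for name, idx in splits.items():
        mask = torch.zeros(num_nodes, dtype=torch.bool)
        mask[idx] = True
        g.ndata[name] = mask

    X, y, cat_features, train_mask, val_mask, test_mask = convert_data(g)
    # preprocess features for the GNN: target-encode categorical columns,
    # fill missing values, then min-max normalize
    X_enc = encode_cat_features(X, y, cat_features, train_mask, val_mask, test_mask)
    X_enc = replace_na(X_enc, train_mask)
    X_enc = normalize_features(X_enc, train_mask, val_mask, test_mask)

    # the GNN input width is the feature width plus the GBDT prediction width
    gnn = GNNModelDGL(in_dim=X_enc.shape[1] + y.shape[1], hidden_dim=64, out_dim=y.shape[1])
    bgnn = BGNNPredictor(gnn, task="regression")
    metrics = bgnn.fit(g, X_enc, y, train_mask, val_mask, test_mask, num_epochs=20, patience=5)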