// blob: 23d01f4e2c50f4616edcfc5f47131a502f38b897 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.computer.core.bsp;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
/**
 * Shared base class for BSP helpers. It reads the job settings from the
 * {@link Config}, creates the {@link BspClient} and initializes it with a
 * namespace derived from the job namespace and the job id.
 */
public abstract class BspBase {

    private static final Logger LOG = Log.logger(BspBase.class);

    private final Config config;
    private final String jobId;
    private final String jobNamespace;
    private final int workerCount;
    private final long registerTimeout;
    private final long barrierOnMasterTimeout;
    private final long barrierOnWorkersTimeout;
    private final long logInterval;
    private final BspClient bspClient;

    /**
     * Read the BSP related options from the config and open the BSP client.
     *
     * NOTE: the namespace is built through {@link #constructPath}, which
     * subclasses may override, so an override is invoked before the
     * constructor of the subclass has finished.
     *
     * @param config the job configuration to read the options from
     */
    public BspBase(Config config) {
        this.config = config;
        this.jobId = config.get(ComputerOptions.JOB_ID);
        this.jobNamespace = config.get(ComputerOptions.JOB_NAMESPACE);
        this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
        this.registerTimeout =
                config.get(ComputerOptions.BSP_REGISTER_TIMEOUT);
        this.barrierOnWorkersTimeout =
                config.get(ComputerOptions.BSP_WAIT_WORKERS_TIMEOUT);
        this.barrierOnMasterTimeout =
                config.get(ComputerOptions.BSP_WAIT_MASTER_TIMEOUT);
        this.logInterval = config.get(ComputerOptions.BSP_LOG_INTERVAL);
        // Must stay last: init() reads jobId and jobNamespace set above
        this.bspClient = this.init();
    }

    /**
     * Create the BSP client and initialize it, like connect to etcd or
     * ZooKeeper cluster. The namespace passed to the client is
     * "/{jobNamespace}/{jobId}", or "/{jobId}" if the job namespace is empty.
     *
     * @return the initialized client
     */
    private BspClient init() {
        BspClient client = this.createBspClient();

        String namespace;
        if (StringUtils.isEmpty(this.jobNamespace)) {
            namespace = this.constructPath(null, this.jobId);
        } else {
            namespace = this.constructPath(null, this.jobNamespace,
                                           this.jobId);
        }
        client.init(namespace);

        LOG.info("Init {} BSP connection to '{}' for job '{}'",
                 client.type(), client.endpoint(), this.jobId);
        return client;
    }

    /**
     * Close the connection to etcd or ZooKeeper, the opposite of init.
     * No BSP operation can be done after close is called.
     */
    public void close() {
        BspClient client = this.bspClient;
        client.close();
        LOG.info("Closed {} BSP connection '{}' for job '{}'",
                 client.type(), client.endpoint(), this.jobId);
    }

    /**
     * Clean up the BSP data through the BSP client.
     *
     * @throws ComputerException if the client fails to clean up, the
     *                           original exception is kept as the cause
     */
    public void clean() {
        BspClient client = this.bspClient();
        try {
            client.clean();
        } catch (Exception e) {
            throw new ComputerException("Failed to clean up the BSP data: %s",
                                        e, client.endpoint());
        }
        LOG.info("Cleaned up the BSP data: {}", client.endpoint());
    }

    /**
     * Create the BSP client, currently always an {@link EtcdBspClient}.
     */
    private BspClient createBspClient() {
        // TODO: create from factory. the type of bsp can be get from config
        return new EtcdBspClient(this.config);
    }

    /**
     * @return the BSP client created by the constructor
     */
    protected final BspClient bspClient() {
        return this.bspClient;
    }

    /**
     * @return the value of option {@code JOB_WORKERS_COUNT}
     */
    public final int workerCount() {
        return this.workerCount;
    }

    /**
     * @return the value of option {@code BSP_REGISTER_TIMEOUT}
     */
    public final long registerTimeout() {
        return this.registerTimeout;
    }

    /**
     * @return the value of option {@code BSP_WAIT_MASTER_TIMEOUT}
     */
    public final long barrierOnMasterTimeout() {
        return this.barrierOnMasterTimeout;
    }

    /**
     * @return the value of option {@code BSP_WAIT_WORKERS_TIMEOUT}
     */
    public final long barrierOnWorkersTimeout() {
        return this.barrierOnWorkersTimeout;
    }

    /**
     * @return the value of option {@code BSP_LOG_INTERVAL}
     */
    public final long logInterval() {
        return this.logInterval;
    }

    /**
     * This method is used to generate the key saved in etcd. We can add
     * attempt id in the path for further checkpoint based implementation.
     *
     * The result is the event name (omitted if the event is null) followed
     * by each path element prefixed with "/".
     *
     * @param event the event whose name starts the key, may be null
     * @param paths the path elements, none of them may be null
     * @return the assembled key
     */
    protected String constructPath(BspEvent event, Object... paths) {
        StringBuilder key = new StringBuilder();
        if (event != null) {
            key.append(event.name());
        }
        for (Object segment : paths) {
            key.append('/').append(segment.toString());
        }
        return key.toString();
    }
}