blob: fd1fdfd3d389618f00a27ce65a2c132171ee6432 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.computer.core.master;
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hugegraph.computer.core.aggregator.Aggregator;
import org.apache.hugegraph.computer.core.aggregator.DefaultAggregator;
import org.apache.hugegraph.computer.core.aggregator.MasterAggrManager;
import org.apache.hugegraph.computer.core.bsp.Bsp4Master;
import org.apache.hugegraph.computer.core.combiner.Combiner;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.Constants;
import org.apache.hugegraph.computer.core.common.ContainerInfo;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.SuperstepStat;
import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.computer.core.graph.value.ValueType;
import org.apache.hugegraph.computer.core.input.MasterInputManager;
import org.apache.hugegraph.computer.core.manager.Managers;
import org.apache.hugegraph.computer.core.network.TransportUtil;
import org.apache.hugegraph.computer.core.output.ComputerOutput;
import org.apache.hugegraph.computer.core.rpc.MasterRpcManager;
import org.apache.hugegraph.computer.core.util.ShutdownHook;
import org.apache.hugegraph.computer.core.worker.WorkerStat;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.TimeUtil;
import org.slf4j.Logger;
/**
* The master service is the job's controller. It controls the superstep
* iteration of the job and assembles the managers used by the master, for
* example the aggregator manager, the input manager and so on.
*/
public class MasterService implements Closeable {

    private static final Logger LOG = Log.logger(MasterService.class);

    private final ComputerContext context;
    private final Managers managers;

    private volatile boolean inited;
    private volatile boolean closed;
    private Config config;
    private volatile Bsp4Master bsp4Master;
    private ContainerInfo masterInfo;
    private List<ContainerInfo> workers;
    private int maxSuperStep;
    private MasterComputation masterComputation;

    private volatile ShutdownHook shutdownHook;
    // The thread that called init(); interrupted by the shutdown hook.
    private volatile Thread serviceThread;

    public MasterService() {
        this.context = ComputerContext.instance();
        this.managers = new Managers();
        this.closed = false;
        this.shutdownHook = new ShutdownHook();
    }

    /**
     * Init the master service and create the managers used by the master.
     * <p>
     * The steps are: register a shutdown hook (which interrupts the calling
     * thread and cleans/closes the BSP client), init the managers and start
     * the rpc server, clean BSP data possibly left by a previous job with
     * the same job id, create and init the master computation, publish the
     * master info through BSP, and finally wait for the workers to finish
     * their initialization.
     *
     * @param config the job configuration
     */
    public void init(Config config) {
        E.checkArgument(!this.inited, "The %s has been initialized", this);
        LOG.info("{} Start to initialize master", this);

        this.serviceThread = Thread.currentThread();
        this.registerShutdownHook();

        this.config = config;
        this.maxSuperStep = this.config.get(ComputerOptions.BSP_MAX_SUPER_STEP);

        InetSocketAddress rpcAddress = this.initManagers();
        this.masterInfo = new ContainerInfo(ContainerInfo.MASTER_ID,
                                            TransportUtil.host(rpcAddress),
                                            rpcAddress.getPort());
        /*
         * Connect to BSP server and clean the old data which may be left by
         * the previous job with the same job id.
         */
        this.bsp4Master = new Bsp4Master(this.config);
        this.bsp4Master.clean();

        this.masterComputation = this.config.createObject(
                                 ComputerOptions.MASTER_COMPUTATION_CLASS);
        this.masterComputation.init(new DefaultMasterContext());
        this.managers.initedAll(config);

        LOG.info("{} register MasterService", this);
        this.bsp4Master.masterInitDone(this.masterInfo);

        this.workers = this.bsp4Master.waitWorkersInitDone();
        LOG.info("{} waited all workers registered, workers count: {}",
                 this, this.workers.size());

        LOG.info("{} MasterService initialized", this);
        this.inited = true;
    }

    /**
     * Interrupt the thread that called {@link #init(Config)}, if any.
     * Used from the shutdown hook, so it is best-effort and never throws.
     */
    private void stopServiceThread() {
        // Read the volatile field once so the null check and use agree
        Thread thread = this.serviceThread;
        if (thread == null) {
            return;
        }
        try {
            thread.interrupt();
        } catch (Throwable ignored) {
            // Best-effort during shutdown (e.g. a SecurityException), a
            // failure here must not prevent the BSP cleanup that follows
        }
    }

    private void registerShutdownHook() {
        this.shutdownHook.hook(() -> {
            this.stopServiceThread();
            this.cleanAndCloseBsp();
        });
    }

    /**
     * Stop the master service. Close the master computation, wait for the
     * workers to close, close the managers created in {@link #init(Config)},
     * then clean and close the BSP client and remove the shutdown hook.
     * Calling it again after a successful close only logs a message.
     */
    @Override
    public synchronized void close() {
        this.checkInited();
        if (this.closed) {
            LOG.info("{} MasterService had closed before", this);
            return;
        }

        this.masterComputation.close(new DefaultMasterContext());

        this.bsp4Master.waitWorkersCloseDone();

        this.managers.closeAll(this.config);

        this.cleanAndCloseBsp();
        this.shutdownHook.unhook();

        this.closed = true;
        LOG.info("{} MasterService closed", this);
    }

    /**
     * Clean the BSP data of this job and close the BSP client. The client is
     * closed even if cleaning fails, so the connection is never leaked.
     */
    private void cleanAndCloseBsp() {
        // Read the volatile field once; this may run from the shutdown hook
        Bsp4Master bsp = this.bsp4Master;
        if (bsp == null) {
            return;
        }
        try {
            bsp.clean();
        } finally {
            bsp.close();
        }
    }

    /**
     * Execute the graph. First determine which superstep to start from, then
     * execute the superstep iteration. After the superstep iteration, output
     * the result.
     */
    public void execute() {
        StopWatch watcher = new StopWatch();
        this.checkInited();

        LOG.info("{} MasterService execute", this);
        /*
         * Step 1: Determine which superstep to start from, and resume this
         * superstep.
         */
        int superstep = this.superstepToResume();
        LOG.info("{} MasterService resume from superstep: {}",
                 this, superstep);

        /*
         * TODO: Get input splits from HugeGraph if resume from
         * Constants.INPUT_SUPERSTEP.
         */
        this.bsp4Master.masterResumeDone(superstep);

        /*
         * Step 2: Input superstep for loading vertices and edges.
         * This step may be skipped if resume from a superstep other than
         * Constants.INPUT_SUPERSTEP.
         */
        SuperstepStat superstepStat;
        watcher.start();
        if (superstep == Constants.INPUT_SUPERSTEP) {
            superstepStat = this.inputstep();
            superstep++;
        } else {
            // TODO: Get superstepStat from bsp service.
            superstepStat = null;
        }
        watcher.stop();
        LOG.info("{} MasterService input step cost: {}",
                 this, TimeUtil.readableTime(watcher.getTime()));
        // E.checkState formats with %s placeholders (not SLF4J-style {})
        E.checkState(superstep <= this.maxSuperStep,
                     "The superstep %s can't be > maxSuperStep %s",
                     superstep, this.maxSuperStep);
        // Resuming without a stat is not implemented yet, fail clearly
        // instead of with a NullPointerException in the loop below
        E.checkState(superstepStat != null,
                     "Can't resume from superstep %s, the superstep stat " +
                     "is unavailable", superstep);

        watcher.reset();
        watcher.start();
        // Step 3: Iteration computation of all supersteps.
        for (; superstepStat.active(); superstep++) {
            LOG.info("{} MasterService superstep {} started",
                     this, superstep);
            /*
             * Superstep iteration. The steps in each superstep are:
             * 1) Master waits workers superstep prepared.
             * 2) All managers call beforeSuperstep.
             * 3) Master signals the workers that the master prepared
             *    superstep.
             * 4) Master waits the workers do vertex computation.
             * 5) Master signals the workers that all workers have finished
             *    vertex computation.
             * 6) Master waits the workers end the superstep, and gets
             *    superstepStat.
             * 7) Master computes whether to continue the next superstep
             *    iteration.
             * 8) All managers call afterSuperstep.
             * 9) Master signals the workers with superstepStat, and workers
             *    know whether to continue the next superstep iteration.
             */
            this.bsp4Master.waitWorkersStepPrepareDone(superstep);
            this.managers.beforeSuperstep(this.config, superstep);
            this.bsp4Master.masterStepPrepareDone(superstep);

            this.bsp4Master.waitWorkersStepComputeDone(superstep);
            this.bsp4Master.masterStepComputeDone(superstep);
            List<WorkerStat> workerStats =
                             this.bsp4Master.waitWorkersStepDone(superstep);
            superstepStat = SuperstepStat.from(workerStats);
            SuperstepContext stepContext = new SuperstepContext(superstep,
                                                                superstepStat);
            // Call master compute(), note the worker afterSuperstep() is done
            boolean masterContinue = this.masterComputation.compute(
                                     stepContext);
            if (this.finishedIteration(masterContinue, stepContext)) {
                superstepStat.inactivate();
            }
            this.managers.afterSuperstep(this.config, superstep);
            this.bsp4Master.masterStepDone(superstep, superstepStat);

            LOG.info("{} MasterService superstep {} finished",
                     this, superstep);
        }
        watcher.stop();
        LOG.info("{} MasterService compute step cost: {}",
                 this, TimeUtil.readableTime(watcher.getTime()));

        watcher.reset();
        watcher.start();
        // Step 4: Output superstep for outputting results.
        this.outputstep();
        watcher.stop();
        LOG.info("{} MasterService output step cost: {}",
                 this, TimeUtil.readableTime(watcher.getTime()));
    }

    @Override
    public String toString() {
        // Before init() assigns masterInfo, fall back to the identity hash
        Object id = this.masterInfo == null ?
                    "?" + this.hashCode() : this.masterInfo.id();
        return String.format("[master %s]", id);
    }

    /**
     * Create and init the input, aggregator and rpc managers, register the
     * input-split and aggregator rpc services, and start the rpc server.
     *
     * @return the address the rpc server is bound to
     */
    private InetSocketAddress initManagers() {
        // Create managers
        MasterInputManager inputManager = new MasterInputManager();
        this.managers.add(inputManager);

        MasterAggrManager aggregatorManager = new MasterAggrManager();
        this.managers.add(aggregatorManager);

        MasterRpcManager rpcManager = new MasterRpcManager();
        this.managers.add(rpcManager);

        // Init managers
        this.managers.initAll(this.config);

        // Register rpc service
        rpcManager.registerInputSplitService(inputManager.handler());
        rpcManager.registerAggregatorService(aggregatorManager.handler());

        // Start rpc server
        InetSocketAddress address = rpcManager.start();
        LOG.info("{} MasterService started rpc server: {}", this, address);
        return address;
    }

    private void checkInited() {
        E.checkArgument(this.inited, "The %s has not been initialized", this);
    }

    private int superstepToResume() {
        /*
         * TODO: determine which superstep to start from if failover is
         * enabled.
         */
        return Constants.INPUT_SUPERSTEP;
    }

    /**
     * The superstep iteration stops if one of the following conditions is
     * met:
     * 1): maxSuperStep supersteps have been run.
     * 2): The master-computation returns false to stop the iteration.
     * 3): All vertices are inactive and no message was sent in a superstep.
     *
     * @param masterContinue whether the master-computation decided to
     *                       continue the iteration
     * @param context        the context of the superstep just finished
     * @return true if the superstep iteration should finish.
     */
    private boolean finishedIteration(boolean masterContinue,
                                      MasterComputationContext context) {
        if (!masterContinue) {
            return true;
        }
        // Supersteps are counted from 0, so the last one is maxSuperStep - 1
        if (context.superstep() >= this.maxSuperStep - 1) {
            return true;
        }
        long activeVertexCount = context.totalVertexCount() -
                                 context.finishedVertexCount();
        return context.messageCount() == 0L && activeVertexCount == 0L;
    }

    /**
     * Coordinate with workers to load vertices and edges from HugeGraph.
     * There are two phases in the input step. In the first phase the workers
     * get input splits from the master and read vertices and edges from
     * them. In the second phase, after all workers have read their input
     * splits, the workers merge the vertices and edges to get the stats for
     * each partition.
     *
     * @return the stat aggregated from all workers for the input superstep
     */
    private SuperstepStat inputstep() {
        LOG.info("{} MasterService inputstep started", this);
        this.bsp4Master.waitWorkersInputDone();
        this.bsp4Master.masterInputDone();
        List<WorkerStat> workerStats = this.bsp4Master.waitWorkersStepDone(
                                       Constants.INPUT_SUPERSTEP);
        SuperstepStat superstepStat = SuperstepStat.from(workerStats);
        this.bsp4Master.masterStepDone(Constants.INPUT_SUPERSTEP,
                                       superstepStat);
        LOG.info("{} MasterService inputstep finished with superstat {}",
                 this, superstepStat);
        return superstepStat;
    }

    /**
     * Wait for the workers to write the result back, then merge the output
     * of multiple partitions. After this, the job is finished successfully.
     */
    private void outputstep() {
        LOG.info("{} MasterService outputstep started", this);
        this.bsp4Master.waitWorkersOutputDone();
        // Merge output files of multiple partitions
        ComputerOutput output = this.config.createObject(
                                ComputerOptions.OUTPUT_CLASS);
        output.mergePartitions(this.config);
        LOG.info("{} MasterService outputstep finished", this);
    }

    /**
     * Master context backed by the master aggregator manager; it registers
     * aggregators and reads/sets aggregated values on the master side.
     */
    private class DefaultMasterContext implements MasterContext {

        private final MasterAggrManager aggrManager;

        public DefaultMasterContext() {
            this.aggrManager = MasterService.this.managers.get(
                               MasterAggrManager.NAME);
        }

        @Override
        public <V extends Value, C extends Aggregator<V>>
        void registerAggregator(String name, Class<C> aggregatorClass) {
            E.checkArgument(aggregatorClass != null,
                            "The aggregator class can't be null");
            Aggregator<V> aggr;
            try {
                // Class.newInstance() is deprecated since Java 9
                aggr = aggregatorClass.getDeclaredConstructor().newInstance();
            } catch (Exception e) {
                throw new ComputerException("Can't new instance from class: %s",
                                            e, aggregatorClass.getName());
            }
            this.aggrManager.registerAggregator(name, aggr);
        }

        @Override
        public <V extends Value, C extends Combiner<V>>
        void registerAggregator(String name, ValueType type,
                                Class<C> combinerClass) {
            this.registerAggregator(name, type, combinerClass, null);
        }

        @Override
        public <V extends Value, C extends Combiner<V>>
        void registerAggregator(String name, V defaultValue,
                                Class<C> combinerClass) {
            E.checkArgument(defaultValue != null,
                            "The aggregator default value can't be null: %s," +
                            " or call another register method if necessary: " +
                            "registerAggregator(String name,ValueType type," +
                            "Class<C> combiner)", name);
            this.registerAggregator(name, defaultValue.valueType(),
                                    combinerClass, defaultValue);
        }

        private <V extends Value, C extends Combiner<V>>
        void registerAggregator(String name, ValueType type,
                                Class<C> combinerClass, V defaultValue) {
            Aggregator<V> aggr = new DefaultAggregator<>(
                                 MasterService.this.context,
                                 type, combinerClass, defaultValue);
            this.aggrManager.registerAggregator(name, aggr);
        }

        @Override
        public <V extends Value> void aggregatedValue(String name, V value) {
            this.aggrManager.aggregatedAggregator(name, value);
        }

        @Override
        public <V extends Value> V aggregatedValue(String name) {
            return this.aggrManager.aggregatedValue(name);
        }

        @Override
        public Config config() {
            return MasterService.this.config;
        }
    }

    /**
     * Context passed to the master computation after each superstep; it
     * exposes the superstep number and the stat aggregated from all workers.
     */
    private class SuperstepContext extends DefaultMasterContext
                                   implements MasterComputationContext {

        private final int superstep;
        private final SuperstepStat superstepStat;

        public SuperstepContext(int superstep, SuperstepStat superstepStat) {
            this.superstep = superstep;
            this.superstepStat = superstepStat;
        }

        @Override
        public long totalVertexCount() {
            return this.superstepStat.vertexCount();
        }

        @Override
        public long totalEdgeCount() {
            return this.superstepStat.edgeCount();
        }

        @Override
        public long finishedVertexCount() {
            return this.superstepStat.finishedVertexCount();
        }

        @Override
        public long messageCount() {
            return this.superstepStat.messageSendCount();
        }

        @Override
        public long messageBytes() {
            return this.superstepStat.messageSendBytes();
        }

        @Override
        public int superstep() {
            return this.superstep;
        }
    }
}