blob: 201a473e9888116a9349210dd8ef56b018fcf735 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.worker;
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import com.baidu.hugegraph.computer.core.aggregator.Aggregator;
import com.baidu.hugegraph.computer.core.aggregator.WorkerAggrManager;
import com.baidu.hugegraph.computer.core.bsp.Bsp4Worker;
import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.common.ContainerInfo;
import com.baidu.hugegraph.computer.core.compute.ComputeManager;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.SuperstepStat;
import com.baidu.hugegraph.computer.core.graph.edge.Edge;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.input.WorkerInputManager;
import com.baidu.hugegraph.computer.core.manager.Managers;
import com.baidu.hugegraph.computer.core.network.DataClientManager;
import com.baidu.hugegraph.computer.core.network.DataServerManager;
import com.baidu.hugegraph.computer.core.network.connection.ConnectionManager;
import com.baidu.hugegraph.computer.core.network.connection.TransportConnectionManager;
import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager;
import com.baidu.hugegraph.computer.core.rpc.WorkerRpcManager;
import com.baidu.hugegraph.computer.core.sender.MessageSendManager;
import com.baidu.hugegraph.computer.core.sort.sorting.RecvSortManager;
import com.baidu.hugegraph.computer.core.sort.sorting.SendSortManager;
import com.baidu.hugegraph.computer.core.sort.sorting.SortManager;
import com.baidu.hugegraph.computer.core.store.FileManager;
import com.baidu.hugegraph.computer.core.util.ShutdownHook;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
/**
 * Service run by each worker of a computer job. It creates and initializes
 * the worker-side managers, synchronizes with the master through
 * {@link Bsp4Worker}, runs the input step, the superstep iterations and the
 * output step, and releases resources in {@link #close()} (or, on abnormal
 * exit, from a registered shutdown hook).
 */
public class WorkerService implements Closeable {

    private static final Logger LOG = Log.logger(WorkerService.class);

    private final ComputerContext context;
    private final Managers managers;
    // Worker id -> container info of the workers reported by the master,
    // filled in init()
    private final Map<Integer, ContainerInfo> workers;

    private volatile boolean inited;
    private volatile boolean closed;
    private Config config;
    private Bsp4Worker bsp4Worker;
    private ComputeManager computeManager;
    private ContainerInfo workerInfo;

    // The message combiner; null when no combiner class is configured
    private Combiner<Value> combiner;

    private ContainerInfo masterInfo;

    private volatile ShutdownHook shutdownHook;
    // The thread that called init(), interrupted by the shutdown hook
    private volatile Thread serviceThread;

    public WorkerService() {
        this.context = ComputerContext.instance();
        this.managers = new Managers();
        this.workers = new HashMap<>();
        this.inited = false;
        this.closed = false;
        this.shutdownHook = new ShutdownHook();
    }

    /**
     * Init worker service, create the managers used by worker service.
     * Blocks until the master has finished its own initialization and all
     * workers have registered, then connects the data client to every
     * worker reported by the master.
     *
     * @param config the job configuration
     * @throws IllegalArgumentException if this service was already inited
     */
    public void init(Config config) {
        E.checkArgument(!this.inited, "The %s has been initialized", this);

        this.serviceThread = Thread.currentThread();
        this.registerShutdownHook();

        this.config = config;

        this.workerInfo = new ContainerInfo();
        LOG.info("{} Start to initialize worker", this);

        this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);

        /*
         * Keep the waitMasterInitDone() called before initManagers(),
         * in order to ensure master init() before worker managers init()
         */
        this.masterInfo = this.bsp4Worker.waitMasterInitDone();

        InetSocketAddress address = this.initManagers(this.masterInfo);
        this.workerInfo.updateAddress(address);

        Computation<?> computation = this.config.createObject(
                                     ComputerOptions.WORKER_COMPUTATION_CLASS);
        LOG.info("Loading computation '{}' in category '{}'",
                 computation.name(), computation.category());

        // The combiner is optional, createObject() may return null here
        this.combiner = this.config.createObject(
                        ComputerOptions.WORKER_COMBINER_CLASS, false);
        if (this.combiner == null) {
            LOG.info("None combiner is provided for computation '{}'",
                     computation.name());
        } else {
            LOG.info("Combiner '{}' is provided for computation '{}'",
                     this.combiner.name(), computation.name());
        }

        LOG.info("{} register WorkerService", this);
        this.bsp4Worker.workerInitDone();
        List<ContainerInfo> workers = this.bsp4Worker.waitMasterAllInitDone();
        DataClientManager dm = this.managers.get(DataClientManager.NAME);
        for (ContainerInfo worker : workers) {
            this.workers.put(worker.id(), worker);
            dm.connect(worker.id(), worker.hostname(), worker.dataPort());
        }

        this.computeManager = new ComputeManager(this.context, this.managers);

        this.managers.initedAll(this.config);
        LOG.info("{} WorkerService initialized", this);
        this.inited = true;
    }

    /**
     * Register a hook that interrupts the service thread and then cleans
     * and closes the bsp client. The hook is removed again by close().
     */
    private void registerShutdownHook() {
        this.shutdownHook.hook(() -> {
            this.stopServiceThread();
            this.cleanAndCloseBsp();
        });
    }

    /**
     * Stop the worker service. Stop the managers created in
     * {@link #init(Config)}. Calling it again after a successful close only
     * logs a message.
     *
     * @throws IllegalArgumentException if this service was not inited
     */
    @Override
    public void close() {
        this.checkInited();
        if (this.closed) {
            LOG.info("{} WorkerService had closed before", this);
            return;
        }

        this.computeManager.close();
        /*
         * Seems managers.closeAll() would do the following actions:
         * TODO: close the connection to other workers.
         * TODO: stop the connection to the master
         * TODO: stop the data transportation server.
         */
        this.managers.closeAll(this.config);

        this.bsp4Worker.workerCloseDone();
        this.bsp4Worker.close();
        // Closed normally, the shutdown hook must not clean the bsp again
        this.shutdownHook.unhook();
        this.closed = true;
        LOG.info("{} WorkerService closed", this);
    }

    /**
     * Interrupt the thread that called init(), if any. Runs from the
     * shutdown hook, so it never throws: a failure is only logged so that
     * the bsp cleanup that follows still gets a chance to run.
     */
    private void stopServiceThread() {
        if (this.serviceThread == null) {
            return;
        }

        try {
            this.serviceThread.interrupt();
        } catch (Throwable e) {
            // Best effort during shutdown: report it instead of hiding it
            LOG.warn("{} failed to interrupt the service thread", this, e);
        }
    }

    /**
     * Clean and close the bsp client, if it has been created.
     */
    private void cleanAndCloseBsp() {
        if (this.bsp4Worker == null) {
            return;
        }

        this.bsp4Worker.clean();
        this.bsp4Worker.close();
    }

    /**
     * Execute the superstep in worker. It first waits for the master to
     * tell which superstep to start from, and then iterates supersteps
     * until the superstepStat returned by the master is inactive. Finally
     * the output step is executed.
     *
     * @throws IllegalArgumentException if this service was not inited
     * @throws IllegalStateException if no superstep stat is available to
     *         start the iteration from (e.g. resuming from a superstep
     *         other than the input superstep, which is not supported yet)
     */
    public void execute() {
        this.checkInited();

        LOG.info("{} WorkerService execute", this);
        // TODO: determine superstep if fail over is enabled.
        int superstep = this.bsp4Worker.waitMasterResumeDone();
        SuperstepStat superstepStat;
        if (superstep == Constants.INPUT_SUPERSTEP) {
            superstepStat = this.inputstep();
            superstep++;
        } else {
            // TODO: Get superstepStat from bsp service.
            superstepStat = null;
        }
        // Fail fast with a clear message instead of an NPE in the loop below
        E.checkState(superstepStat != null,
                     "The superstep stat of %s is unavailable before " +
                     "superstep %s", this, superstep);

        /*
         * The master determines whether to execute the next superstep. The
         * superstepStat is active while master decides to execute the next
         * superstep.
         */
        while (superstepStat.active()) {
            WorkerContext workerContext = new SuperstepContext(superstep,
                                                               superstepStat);
            LOG.info("Start computation of superstep {}", superstep);
            if (superstep > 0) {
                this.computeManager.takeRecvedMessages();
            }
            /*
             * Call beforeSuperstep() before all workers compute() called.
             *
             * NOTE: keep computeManager.compute() called after
             * managers.beforeSuperstep().
             */
            this.managers.beforeSuperstep(this.config, superstep);

            /*
             * Notify master by each worker, when the master received all
             * workers signal, then notify all workers to do compute().
             */
            this.bsp4Worker.workerStepPrepareDone(superstep);
            this.bsp4Worker.waitMasterStepPrepareDone(superstep);
            WorkerStat workerStat = this.computeManager.compute(workerContext,
                                                                superstep);

            this.bsp4Worker.workerStepComputeDone(superstep);
            this.bsp4Worker.waitMasterStepComputeDone(superstep);

            /*
             * Call afterSuperstep() after all workers compute() is done.
             *
             * NOTE: keep managers.afterSuperstep() called after
             * computeManager.compute(), because managers may rely on
             * computation, like WorkerAggrManager send aggregators to master
             * after called aggregateValue(String name, V value) in computation.
             */
            this.managers.afterSuperstep(this.config, superstep);

            this.bsp4Worker.workerStepDone(superstep, workerStat);
            LOG.info("End computation of superstep {}", superstep);

            superstepStat = this.bsp4Worker.waitMasterStepDone(superstep);
            superstep++;
        }
        this.outputstep();
    }

    @Override
    public String toString() {
        Object id = this.workerInfo == null ?
                    "?" + this.hashCode() : this.workerInfo.id();
        return String.format("[worker %s]", id);
    }

    /**
     * Create, register and init all managers used by the worker.
     *
     * @param masterInfo the master container, used as the rpc remote server
     * @return the address the data server is bound to
     */
    private InetSocketAddress initManagers(ContainerInfo masterInfo) {
        // Create managers
        WorkerRpcManager rpcManager = new WorkerRpcManager();
        this.managers.add(rpcManager);
        /*
         * NOTE: rpcManager.init() is called here and called again by
         * managers.initAll() below; the second call will be ignored.
         */
        WorkerRpcManager.updateRpcRemoteServerConfig(this.config,
                                                     masterInfo.hostname(),
                                                     masterInfo.rpcPort());
        rpcManager.init(this.config);

        WorkerAggrManager aggregatorManager = new WorkerAggrManager(
                                              this.context);
        aggregatorManager.service(rpcManager.aggregateRpcService());
        this.managers.add(aggregatorManager);
        FileManager fileManager = new FileManager();
        this.managers.add(fileManager);

        SortManager recvSortManager = new RecvSortManager(this.context);
        this.managers.add(recvSortManager);

        MessageRecvManager recvManager = new MessageRecvManager(this.context,
                                         fileManager, recvSortManager);
        this.managers.add(recvManager);

        ConnectionManager connManager = new TransportConnectionManager();
        DataServerManager serverManager = new DataServerManager(connManager,
                                                                recvManager);
        this.managers.add(serverManager);

        DataClientManager clientManager = new DataClientManager(connManager,
                                                                this.context);
        this.managers.add(clientManager);

        SortManager sendSortManager = new SendSortManager(this.context);
        this.managers.add(sendSortManager);

        MessageSendManager sendManager = new MessageSendManager(this.context,
                                         sendSortManager,
                                         clientManager.sender());
        this.managers.add(sendManager);

        WorkerInputManager inputManager = new WorkerInputManager(this.context,
                                                                 sendManager);
        inputManager.service(rpcManager.inputSplitService());
        this.managers.add(inputManager);

        // Init all managers
        this.managers.initAll(this.config);

        InetSocketAddress address = serverManager.address();
        LOG.info("{} WorkerService initialized managers with data server " +
                 "address '{}'", this, address);
        return address;
    }

    private void checkInited() {
        E.checkArgument(this.inited, "The %s has not been initialized", this);
    }

    /**
     * Load vertices and edges from HugeGraph. There are two phases in
     * inputstep. First phase is get input splits from master, and read the
     * vertices and edges from input splits. The second phase is after all
     * workers read input splits, the workers merge the vertices and edges to
     * get the stats for each partition.
     *
     * @return the superstep stat the master returns for the input superstep
     */
    private SuperstepStat inputstep() {
        LOG.info("{} WorkerService inputstep started", this);
        WorkerInputManager manager = this.managers.get(WorkerInputManager.NAME);
        manager.loadGraph();

        this.bsp4Worker.workerInputDone();
        this.bsp4Worker.waitMasterInputDone();

        WorkerStat workerStat = this.computeManager.input();

        this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP,
                                       workerStat);
        SuperstepStat superstepStat = this.bsp4Worker.waitMasterStepDone(
                                      Constants.INPUT_SUPERSTEP);
        manager.close(this.config);
        LOG.info("{} WorkerService inputstep finished", this);
        return superstepStat;
    }

    /**
     * Write results back parallel to HugeGraph and signal the master. Be
     * called after all superstep iteration finished. After this, this worker
     * can exit successfully.
     */
    private void outputstep() {
        this.computeManager.output();
        this.bsp4Worker.workerOutputDone();
        LOG.info("{} WorkerService outputstep finished", this);
    }

    /**
     * The {@link WorkerContext} handed to the computation for one superstep.
     * It exposes the config, the aggregators, message sending, the combiner
     * and the superstep stat the master returned after the previous step.
     */
    private class SuperstepContext implements WorkerContext {

        private final int superstep;
        private final SuperstepStat superstepStat;
        private final WorkerAggrManager aggrManager;
        private final MessageSendManager sendManager;

        private SuperstepContext(int superstep, SuperstepStat superstepStat) {
            this.superstep = superstep;
            this.superstepStat = superstepStat;
            this.aggrManager = WorkerService.this.managers.get(
                               WorkerAggrManager.NAME);
            this.sendManager = WorkerService.this.managers.get(
                               MessageSendManager.NAME);
        }

        @Override
        public Config config() {
            return WorkerService.this.config;
        }

        @Override
        public <V extends Value> Aggregator<V> createAggregator(String name) {
            return this.aggrManager.createAggregator(name);
        }

        @Override
        public <V extends Value> void aggregateValue(String name, V value) {
            this.aggrManager.aggregateValue(name, value);
        }

        @Override
        public <V extends Value> V aggregatedValue(String name) {
            return this.aggrManager.aggregatedValue(name);
        }

        @Override
        public void sendMessage(Id target, Value value) {
            this.sendManager.sendMessage(target, value);
        }

        /**
         * Send the same message to the target of every edge of the vertex.
         */
        @Override
        public void sendMessageToAllEdges(Vertex vertex, Value value) {
            for (Edge edge : vertex.edges()) {
                this.sendMessage(edge.targetId(), value);
            }
        }

        @Override
        public long totalVertexCount() {
            return this.superstepStat.vertexCount();
        }

        @Override
        public long totalEdgeCount() {
            return this.superstepStat.edgeCount();
        }

        @Override
        public int superstep() {
            return this.superstep;
        }

        /**
         * Message combiner, or null when no combiner is configured.
         */
        @Override
        @SuppressWarnings("unchecked")
        public <V extends Value> Combiner<V> combiner() {
            // Unchecked: the configured combiner is stored as Combiner<Value>
            return (Combiner<V>) WorkerService.this.combiner;
        }
    }
}