/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.graph;
import net.iharder.Base64;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.RPCCommunications;
import org.apache.giraph.comm.ServerInterface;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionExchange;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
/**
* ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
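*
* A worker's lifecycle is roughly: setup() loads vertices from the
* reserved InputSplits, each superstep is bracketed by startSuperstep()
* and finishSuperstep(), storeCheckpoint()/loadCheckpoint() provide
* fault tolerance, and cleanup() tears the service down.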
*/
@SuppressWarnings("rawtypes")
public class BspServiceWorker<
I extends WritableComparable,
V extends Writable,
E extends Writable,
M extends Writable>
extends BspService<I, V, E, M>
implements CentralizedServiceWorker<I, V, E, M> {
/** Number of input splits */
private int inputSplitCount = -1;
/** My process health znode */
private String myHealthZnode;
/** Set of aggregators currently in use */
private Set<String> aggregatorInUse = new TreeSet<String>();
/** Worker info */
private final WorkerInfo workerInfo;
/** Worker graph partitioner */
private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
/** Input split vertex cache (only used when loading from input split) */
private final Map<PartitionOwner, Partition<I, V, E, M>>
inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
/** Communication service */
private final ServerInterface<I, V, E, M> commService;
/** Structure to store the partitions on this worker */
private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
new HashMap<Integer, Partition<I, V, E, M>>();
/** Have the partition exchange children (workers) changed? */
private final BspEvent partitionExchangeChildrenChanged =
new PredicateLock();
/** Max vertices per partition before sending */
private final int maxVerticesPerPartition;
/** Worker Context */
private final WorkerContext workerContext;
/** Total vertices loaded */
private long totalVerticesLoaded = 0;
/** Total edges loaded */
private long totalEdgesLoaded = 0;
/** Input split max vertices (-1 denotes all) */
private final long inputSplitMaxVertices;
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
public BspServiceWorker(
String serverPortList,
int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphMapper<I, V, E, M> graphMapper,
GraphState<I, V, E, M> graphState)
throws UnknownHostException, IOException, InterruptedException {
super(serverPortList, sessionMsecTimeout, context, graphMapper);
registerBspEvent(partitionExchangeChildrenChanged);
int finalRpcPort =
getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
getTaskPartition();
maxVerticesPerPartition =
getConfiguration().getInt(
GiraphJob.MAX_VERTICES_PER_PARTITION,
GiraphJob.MAX_VERTICES_PER_PARTITION_DEFAULT);
inputSplitMaxVertices =
getConfiguration().getLong(
GiraphJob.INPUT_SPLIT_MAX_VERTICES,
GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
workerInfo =
new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort);
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
commService = new RPCCommunications<I, V, E, M>(
context, this, graphState);
graphState.setWorkerCommunications(commService);
this.workerContext =
BspUtils.createWorkerContext(getConfiguration(),
graphMapper.getGraphState());
}
public WorkerContext getWorkerContext() {
return workerContext;
}
/**
* Intended to check the health of the node. For instance, can it ssh,
* run dmesg, etc.? For now, always reports healthy.
*
* @return true until a real health check is implemented
*/
public boolean isHealthy() {
return true;
}
/**
* Mark an aggregator as in use for this superstep.
*
* @param name Name of the aggregator to use
* @return false iff the aggregator has not been registered
*/
public boolean useAggregator(String name) {
if (getAggregatorMap().get(name) == null) {
LOG.error("userAggregator: Aggregator=" + name + " not registered");
return false;
}
aggregatorInUse.add(name);
return true;
}
/**
* Try to reserve an InputSplit for loading. While unfinished InputSplits
* exist, keep waiting and retrying until one can be reserved or all of
* them are finished.
*
* @return Path of the reserved InputSplit znode, or null if no
* unfinished InputSplits exist
*/
private String reserveInputSplit() {
List<String> inputSplitPathList = null;
try {
inputSplitPathList =
getZkExt().getChildrenExt(INPUT_SPLIT_PATH, false, false, true);
if (inputSplitCount == -1) {
inputSplitCount = inputSplitPathList.size();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
String reservedInputSplitPath = null;
Stat reservedStat = null;
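// Repeatedly scan all InputSplit znodes: count the finished ones and
// try to claim the first unreserved split by creating an ephemeral
// "reserved" child znode. If every split is finished, return null.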
while (true) {
int finishedInputSplits = 0;
for (int i = 0; i < inputSplitPathList.size(); ++i) {
String tmpInputSplitFinishedPath =
inputSplitPathList.get(i) + INPUT_SPLIT_FINISHED_NODE;
try {
reservedStat =
getZkExt().exists(tmpInputSplitFinishedPath, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (reservedStat != null) {
++finishedInputSplits;
continue;
}
String tmpInputSplitReservedPath =
inputSplitPathList.get(i) + INPUT_SPLIT_RESERVED_NODE;
try {
reservedStat =
getZkExt().exists(tmpInputSplitReservedPath, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (reservedStat == null) {
try {
// Attempt to reserve this InputSplit
getZkExt().createExt(tmpInputSplitReservedPath,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
false);
reservedInputSplitPath = inputSplitPathList.get(i);
if (LOG.isInfoEnabled()) {
float percentFinished =
finishedInputSplits * 100.0f /
inputSplitPathList.size();
LOG.info("reserveInputSplit: Reserved input " +
"split path " + reservedInputSplitPath +
", overall roughly " +
percentFinished +
"% input splits finished");
}
return reservedInputSplitPath;
} catch (KeeperException.NodeExistsException e) {
LOG.info("reserveInputSplit: Couldn't reserve " +
"(already reserved) inputSplit" +
" at " + tmpInputSplitReservedPath);
} catch (KeeperException e) {
throw new IllegalStateException(
"reserveInputSplit: KeeperException on reserve", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"reserveInputSplit: InterruptedException " +
"on reserve", e);
}
}
}
if (LOG.isInfoEnabled()) {
LOG.info("reserveInputSplit: reservedPath = " +
reservedInputSplitPath + ", " + finishedInputSplits +
" of " + inputSplitPathList.size() +
" InputSplits are finished.");
}
if (finishedInputSplits == inputSplitPathList.size()) {
return null;
}
// Wait for either a reservation to go away or a notification that
// an InputSplit has finished.
getInputSplitsStateChangedEvent().waitMsecs(60 * 1000);
getInputSplitsStateChangedEvent().reset();
}
}
/**
* Load the vertices from the user-defined VertexReader into our
* partitions. Do this until all the InputSplits have been processed.
* All workers will try to do as many InputSplits as they can. The master
* will monitor progress and stop this once all the InputSplits have been
* loaded and check-pointed. Any vertices remaining in the input split
* cache are flushed to their owners after the last InputSplit is done.
*
* @return Count of vertices and edges loaded by this worker
* @throws IOException
* @throws IllegalAccessException
* @throws InstantiationException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
private VertexEdgeCount loadVertices() throws IOException,
ClassNotFoundException,
InterruptedException, InstantiationException,
IllegalAccessException {
String inputSplitPath = null;
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
while ((inputSplitPath = reserveInputSplit()) != null) {
vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
loadVerticesFromInputSplit(inputSplitPath));
}
// Flush the remaining cached vertices
for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
inputSplitCache.entrySet()) {
if (!entry.getValue().getVertices().isEmpty()) {
commService.sendPartitionReq(entry.getKey().getWorkerInfo(),
entry.getValue());
entry.getValue().getVertices().clear();
}
}
inputSplitCache.clear();
return vertexEdgeCount;
}
/**
* Mark an input split path as completed by this worker. This notifies
* the master and the other workers that this input split has not only
* been reserved, but also fully processed.
*
* @param inputSplitPath Path to the input split.
*/
private void markInputSplitPathFinished(String inputSplitPath) {
String inputSplitFinishedPath =
inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
try {
getZkExt().createExt(inputSplitFinishedPath,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
} catch (KeeperException.NodeExistsException e) {
LOG.warn("loadVertices: " + inputSplitFinishedPath +
" already exists!");
} catch (KeeperException e) {
throw new IllegalStateException(
"loadVertices: KeeperException on " +
inputSplitFinishedPath, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"loadVertices: InterruptedException on " +
inputSplitFinishedPath, e);
}
}
/**
* Extract vertices from an input split, saving them into a mini cache of
* partitions. The cache is flushed whenever a partition reaches the
* vertex limit checked in readVerticesFromInputSplit.
* Mark the input split finished when done.
*
* @param inputSplitPath ZK location of input split
* @return Count of vertices and edges read from this input split
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws InstantiationException
* @throws IllegalAccessException
*/
private VertexEdgeCount loadVerticesFromInputSplit(String inputSplitPath)
throws IOException, ClassNotFoundException, InterruptedException,
InstantiationException, IllegalAccessException {
InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
VertexEdgeCount vertexEdgeCount =
readVerticesFromInputSplit(inputSplit);
if (LOG.isInfoEnabled()) {
LOG.info("loadVerticesFromInputSplit: Finished loading " +
inputSplitPath + " " + vertexEdgeCount);
}
markInputSplitPathFinished(inputSplitPath);
return vertexEdgeCount;
}
/**
* Talk to ZooKeeper to convert the input split path to the actual
* InputSplit containing the vertices to read.
*
* @param inputSplitPath Location in ZK of input split
* @return instance of InputSplit containing vertices to read
* @throws IOException
* @throws ClassNotFoundException
*/
private InputSplit getInputSplitForVertices(String inputSplitPath)
throws IOException, ClassNotFoundException {
byte[] splitList;
try {
splitList = getZkExt().getData(inputSplitPath, false, null);
} catch (KeeperException e) {
throw new IllegalStateException(
"loadVertices: KeeperException on " + inputSplitPath, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"loadVertices: IllegalStateException on " + inputSplitPath, e);
}
getContext().progress();
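// The znode data holds the InputSplit's class name (read with
// Text.readString) followed by the split's own Writable fields.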
DataInputStream inputStream =
new DataInputStream(new ByteArrayInputStream(splitList));
String inputSplitClass = Text.readString(inputStream);
InputSplit inputSplit = (InputSplit)
ReflectionUtils.newInstance(
getConfiguration().getClassByName(inputSplitClass),
getConfiguration());
((Writable) inputSplit).readFields(inputStream);
if (LOG.isInfoEnabled()) {
LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
" from ZooKeeper and got input split '" +
inputSplit.toString() + "'");
}
return inputSplit;
}
/**
* Read vertices from input split. If testing, the user may request a
* maximum number of vertices to be read from an input split.
*
* @param inputSplit Input split to process with vertex reader
* @return Count of vertices and edges read from the input split
* @throws IOException
* @throws InterruptedException
*/
private VertexEdgeCount readVerticesFromInputSplit(
InputSplit inputSplit) throws IOException, InterruptedException {
VertexInputFormat<I, V, E, M> vertexInputFormat =
BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
VertexReader<I, V, E, M> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, getContext());
vertexReader.initialize(inputSplit, getContext());
long vertexCount = 0;
long edgeCount = 0;
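// Read every vertex from the split, map it to its partition owner, and
// buffer it in the per-owner input split cache. Full partitions are
// flushed to their owners as they fill up (see the size check below).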
while (vertexReader.nextVertex()) {
BasicVertex<I, V, E, M> readerVertex =
vertexReader.getCurrentVertex();
if (readerVertex.getVertexId() == null) {
throw new IllegalArgumentException(
"loadVertices: Vertex reader returned a vertex " +
"without an id! - " + readerVertex);
}
if (readerVertex.getVertexValue() == null) {
readerVertex.setVertexValue(
BspUtils.<V>createVertexValue(getConfiguration()));
}
PartitionOwner partitionOwner =
workerGraphPartitioner.getPartitionOwner(
readerVertex.getVertexId());
Partition<I, V, E, M> partition =
inputSplitCache.get(partitionOwner);
if (partition == null) {
partition = new Partition<I, V, E, M>(
getConfiguration(),
partitionOwner.getPartitionId());
inputSplitCache.put(partitionOwner, partition);
}
BasicVertex<I, V, E, M> oldVertex =
partition.putVertex(readerVertex);
if (oldVertex != null) {
LOG.warn("readVertices: Replacing vertex " + oldVertex +
" with " + readerVertex);
}
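// Flush this cached partition to its owning worker once it reaches
// the configured maximum size, then reuse it for subsequent vertices.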
if (partition.getVertices().size() >= maxVerticesPerPartition) {
commService.sendPartitionReq(partitionOwner.getWorkerInfo(),
partition);
partition.getVertices().clear();
}
++vertexCount;
edgeCount += readerVertex.getNumOutEdges();
getContext().progress();
++totalVerticesLoaded;
totalEdgesLoaded += readerVertex.getNumOutEdges();
// Update status every half a million vertices
if ((totalVerticesLoaded % 500000) == 0) {
String status = "readVerticesFromInputSplit: Loaded " +
totalVerticesLoaded + " vertices and " +
totalEdgesLoaded + " edges " +
MemoryUtils.getRuntimeMemoryStats() + " " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep();
if (LOG.isInfoEnabled()) {
LOG.info(status);
}
getContext().setStatus(status);
}
// For sampling, or to limit outlier input splits, the number of
// vertices read per input split can be capped
if ((inputSplitMaxVertices > 0) &&
(vertexCount >= inputSplitMaxVertices)) {
if (LOG.isInfoEnabled()) {
LOG.info("readVerticesFromInputSplit: Leaving the input " +
"split early, reached maximum vertices " +
vertexCount);
}
break;
}
}
vertexReader.close();
return new VertexEdgeCount(vertexCount, edgeCount);
}
@Override
public void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
Iterable<M> messageIterator) {
vertex.putMessages(messageIterator);
}
@Override
public void setup() {
// Unless doing a restart, prepare for computation:
// 1. Start superstep INPUT_SUPERSTEP (no computation)
// 2. Wait until the INPUT_SPLITS_ALL_READY_PATH node has been created
// 3. Process input splits until there are no more.
// 4. Wait until the INPUT_SPLITS_ALL_DONE_PATH node has been created
// 5. Wait for superstep INPUT_SUPERSTEP to complete.
if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
setCachedSuperstep(getRestartedSuperstep());
return;
}
JSONObject jobState = getJobState();
if (jobState != null) {
try {
if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
ApplicationState.START_SUPERSTEP) &&
jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
getSuperstep()) {
if (LOG.isInfoEnabled()) {
LOG.info("setup: Restarting from an automated " +
"checkpointed superstep " +
getSuperstep() + ", attempt " +
getApplicationAttempt());
}
setRestartedSuperstep(getSuperstep());
return;
}
} catch (JSONException e) {
throw new RuntimeException(
"setup: Failed to get key-values from " +
jobState.toString(), e);
}
}
// Add the partitions that this worker owns
Collection<? extends PartitionOwner> masterSetPartitionOwners =
startSuperstep();
workerGraphPartitioner.updatePartitionOwners(
getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
commService.setup();
// Ensure the InputSplits are ready before processing them
while (true) {
Stat inputSplitsReadyStat;
try {
inputSplitsReadyStat =
getZkExt().exists(INPUT_SPLITS_ALL_READY_PATH, true);
} catch (KeeperException e) {
throw new IllegalStateException(
"setup: KeeperException waiting on input splits", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"setup: InterruptedException waiting on input splits", e);
}
if (inputSplitsReadyStat != null) {
break;
}
getInputSplitsAllReadyEvent().waitForever();
getInputSplitsAllReadyEvent().reset();
}
getContext().progress();
try {
VertexEdgeCount vertexEdgeCount = loadVertices();
if (LOG.isInfoEnabled()) {
LOG.info("setup: Finally loaded a total of " +
vertexEdgeCount);
}
} catch (Exception e) {
LOG.error("setup: loadVertices failed - ", e);
throw new IllegalStateException("setup: loadVertices failed", e);
}
getContext().progress();
// Workers wait for each other to finish, coordinated by master
String workerDonePath =
INPUT_SPLIT_DONE_PATH + "/" + getWorkerInfo().getHostnameId();
try {
getZkExt().createExt(workerDonePath,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
} catch (KeeperException e) {
throw new IllegalStateException(
"setup: KeeperException creating worker done path", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"setup: InterruptedException creating worker done path", e);
}
while (true) {
Stat inputSplitsDoneStat;
try {
inputSplitsDoneStat =
getZkExt().exists(INPUT_SPLITS_ALL_DONE_PATH, true);
} catch (KeeperException e) {
throw new IllegalStateException(
"setup: KeeperException waiting on all input splits done", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"setup: InterruptedException waiting on all input " +
"splits done", e);
}
if (inputSplitsDoneStat != null) {
break;
}
getInputSplitsAllDoneEvent().waitForever();
getInputSplitsAllDoneEvent().reset();
}
// At this point all vertices have been sent to their destinations.
// Move them to the worker, then create empty partitions for any owned
// partition ids that received no vertices.
movePartitionsToWorker(commService);
for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
!getPartitionMap().containsKey(
partitionOwner.getPartitionId())) {
Partition<I, V, E, M> partition =
new Partition<I, V, E, M>(getConfiguration(),
partitionOwner.getPartitionId());
getPartitionMap().put(partitionOwner.getPartitionId(),
partition);
}
}
// Generate the partition stats for the input superstep and process
// if necessary
List<PartitionStats> partitionStatsList =
new ArrayList<PartitionStats>();
for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
PartitionStats partitionStats =
new PartitionStats(partition.getPartitionId(),
partition.getVertices().size(),
0,
partition.getEdgeCount());
partitionStatsList.add(partitionStats);
}
workerGraphPartitioner.finalizePartitionStats(
partitionStatsList, workerPartitionMap);
finishSuperstep(partitionStatsList);
}
/**
* Marshal the in-use aggregator values into a JSONArray that will later
* be aggregated by the master. Also resets the set of in-use aggregators
* for the next superstep.
*
* @param superstep Superstep to marshal aggregator values for
* @return JSONArray of aggregator values (empty on the input superstep
* or when no aggregators are in use)
*/
private JSONArray marshalAggregatorValues(long superstep) {
JSONArray aggregatorArray = new JSONArray();
if ((superstep == INPUT_SUPERSTEP) || (aggregatorInUse.size() == 0)) {
return aggregatorArray;
}
for (String name : aggregatorInUse) {
try {
Aggregator<Writable> aggregator = getAggregatorMap().get(name);
ByteArrayOutputStream outputStream =
new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
aggregator.getAggregatedValue().write(output);
JSONObject aggregatorObj = new JSONObject();
aggregatorObj.put(AGGREGATOR_NAME_KEY, name);
aggregatorObj.put(AGGREGATOR_CLASS_NAME_KEY,
aggregator.getClass().getName());
aggregatorObj.put(
AGGREGATOR_VALUE_KEY,
Base64.encodeBytes(outputStream.toByteArray()));
aggregatorArray.put(aggregatorObj);
LOG.info("marshalAggregatorValues: " +
"Found aggregatorObj " +
aggregatorObj + ", value (" +
aggregator.getAggregatedValue() + ")");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (LOG.isInfoEnabled()) {
LOG.info("marshalAggregatorValues: Finished assembling " +
"aggregator values in JSONArray - " + aggregatorArray);
}
aggregatorInUse.clear();
return aggregatorArray;
}
/**
* Get values of aggregators aggregated by master in previous superstep.
*
* @param superstep Superstep to get the aggregated values from
*/
private void getAggregatorValues(long superstep) {
if (superstep <= (INPUT_SUPERSTEP + 1)) {
return;
}
String mergedAggregatorPath =
getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
JSONArray aggregatorArray = null;
try {
byte[] zkData =
getZkExt().getData(mergedAggregatorPath, false, null);
aggregatorArray = new JSONArray(new String(zkData));
} catch (KeeperException.NoNodeException e) {
LOG.info("getAggregatorValues: no aggregators in " +
mergedAggregatorPath + " on superstep " + superstep);
return;
} catch (Exception e) {
throw new RuntimeException(e);
}
for (int i = 0; i < aggregatorArray.length(); ++i) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("getAggregatorValues: " +
"Getting aggregators from " +
aggregatorArray.getJSONObject(i));
}
String aggregatorName = aggregatorArray.getJSONObject(i).
getString(AGGREGATOR_NAME_KEY);
Aggregator<Writable> aggregator =
getAggregatorMap().get(aggregatorName);
if (aggregator == null) {
continue;
}
Writable aggregatorValue = aggregator.getAggregatedValue();
InputStream input =
new ByteArrayInputStream(
Base64.decode(aggregatorArray.getJSONObject(i).
getString(AGGREGATOR_VALUE_KEY)));
aggregatorValue.readFields(
new DataInputStream(input));
aggregator.setAggregatedValue(aggregatorValue);
if (LOG.isDebugEnabled()) {
LOG.debug("getAggregatorValues: " +
"Got aggregator=" + aggregatorName + " value=" +
aggregatorValue);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (LOG.isInfoEnabled()) {
LOG.info("getAggregatorValues: Finished loading " +
mergedAggregatorPath + " with aggregator values " +
aggregatorArray);
}
}
/**
* Register the health of this worker for a given superstep
*
* @param superstep Superstep to register health on
*/
private void registerHealth(long superstep) {
JSONArray hostnamePort = new JSONArray();
hostnamePort.put(getHostname());
hostnamePort.put(workerInfo.getPort());
String myHealthPath = null;
if (isHealthy()) {
myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
getSuperstep());
} else {
myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
getSuperstep());
}
myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
try {
myHealthZnode = getZkExt().createExt(
myHealthPath,
WritableUtils.writeToByteArray(workerInfo),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
true);
} catch (KeeperException.NodeExistsException e) {
LOG.warn("registerHealth: myHealthPath already exists (likely " +
"from previous failure): " + myHealthPath +
". Waiting for change in attempts " +
"to re-join the application");
getApplicationAttemptChangedEvent().waitForever();
if (LOG.isInfoEnabled()) {
LOG.info("registerHealth: Got application " +
"attempt changed event, killing self");
}
throw new RuntimeException(
"registerHealth: Trying " +
"to get the new application attempt by killing self", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (LOG.isInfoEnabled()) {
LOG.info("registerHealth: Created my health node for attempt=" +
getApplicationAttempt() + ", superstep=" +
getSuperstep() + " with " + myHealthZnode +
" and workerInfo= " + workerInfo);
}
}
/**
* Delete the health znode to help notify the master more quickly that
* this worker has failed.
*/
private void unregisterHealth() {
LOG.error("unregisterHealth: Got failure, unregistering health on " +
myHealthZnode + " on superstep " + getSuperstep());
try {
getZkExt().delete(myHealthZnode, -1);
} catch (InterruptedException e) {
throw new IllegalStateException(
"unregisterHealth: InterruptedException - Couldn't delete " +
myHealthZnode, e);
} catch (KeeperException e) {
throw new IllegalStateException(
"unregisterHealth: KeeperException - Couldn't delete " +
myHealthZnode, e);
}
}
@Override
public void failureCleanup() {
unregisterHealth();
}
@Override
public Collection<? extends PartitionOwner> startSuperstep() {
// Algorithm:
// 1. Communication service will combine messages from the previous
// superstep
// 2. Register my health for the next superstep.
// 3. Wait until the partition assignment is complete and get it
// 4. Get the aggregator values from the previous superstep
if (getSuperstep() != INPUT_SUPERSTEP) {
commService.prepareSuperstep();
}
registerHealth(getSuperstep());
String partitionAssignmentsNode =
getPartitionAssignmentsPath(getApplicationAttempt(),
getSuperstep());
Collection<? extends PartitionOwner> masterSetPartitionOwners;
try {
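// Block until the master publishes the partition assignments znode for
// this superstep, then deserialize the list of PartitionOwners from it.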
while (getZkExt().exists(partitionAssignmentsNode, true) ==
null) {
getPartitionAssignmentsReadyChangedEvent().waitForever();
getPartitionAssignmentsReadyChangedEvent().reset();
}
List<? extends Writable> writableList =
WritableUtils.readListFieldsFromZnode(
getZkExt(),
partitionAssignmentsNode,
false,
null,
workerGraphPartitioner.createPartitionOwner().getClass(),
getConfiguration());
@SuppressWarnings("unchecked")
Collection<? extends PartitionOwner> castedWritableList =
(Collection<? extends PartitionOwner>) writableList;
masterSetPartitionOwners = castedWritableList;
} catch (KeeperException e) {
throw new IllegalStateException(
"startSuperstep: KeeperException getting assignments", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"startSuperstep: InterruptedException getting assignments", e);
}
if (LOG.isInfoEnabled()) {
LOG.info("startSuperstep: Ready for computation on superstep " +
getSuperstep() + " since worker " +
"selection and vertex range assignments are done in " +
partitionAssignmentsNode);
}
if (getSuperstep() != INPUT_SUPERSTEP) {
getAggregatorValues(getSuperstep());
}
getContext().setStatus("startSuperstep: " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
return masterSetPartitionOwners;
}
@Override
public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
// This barrier blocks until success (or the master signals it to
// restart).
//
// Master will coordinate the barriers and aggregate "doneness" of all
// the vertices. Each worker will:
// 1. Flush the unsent messages
// 2. Execute user postSuperstep() if necessary.
// 3. Save aggregator values that are in use.
// 4. Report the statistics (vertices, edges, messages, etc.)
// of this worker
// 5. Let the master know it is finished.
// 6. Wait for the master's global stats, and check if done
long workerSentMessages = 0;
try {
workerSentMessages = commService.flush(getContext());
} catch (IOException e) {
throw new IllegalStateException(
"finishSuperstep: flush failed", e);
}
if (getSuperstep() != INPUT_SUPERSTEP) {
getWorkerContext().postSuperstep();
getContext().progress();
}
if (LOG.isInfoEnabled()) {
LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " +
MemoryUtils.getRuntimeMemoryStats());
}
JSONArray aggregatorValueArray =
marshalAggregatorValues(getSuperstep());
Collection<PartitionStats> finalizedPartitionStats =
workerGraphPartitioner.finalizePartitionStats(
partitionStatsList, workerPartitionMap);
List<PartitionStats> finalizedPartitionStatsList =
new ArrayList<PartitionStats>(finalizedPartitionStats);
byte [] partitionStatsBytes =
WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
JSONObject workerFinishedInfoObj = new JSONObject();
try {
workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
aggregatorValueArray);
workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
Base64.encodeBytes(partitionStatsBytes));
workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
workerSentMessages);
} catch (JSONException e) {
throw new RuntimeException(e);
}
String finishedWorkerPath =
getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
"/" + getHostnamePartitionId();
try {
getZkExt().createExt(finishedWorkerPath,
workerFinishedInfoObj.toString().getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
} catch (KeeperException.NodeExistsException e) {
LOG.warn("finishSuperstep: finished worker path " +
finishedWorkerPath + " already exists!");
} catch (Exception e) {
throw new RuntimeException(e);
}
getContext().setStatus("finishSuperstep: (waiting for rest " +
"of workers) " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
String superstepFinishedNode =
getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
try {
while (getZkExt().exists(superstepFinishedNode, true) == null) {
getSuperstepFinishedEvent().waitForever();
getSuperstepFinishedEvent().reset();
}
} catch (KeeperException e) {
throw new IllegalStateException(
"finishSuperstep: Failed while waiting for master to " +
"signal completion of superstep " + getSuperstep(), e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"finishSuperstep: Failed while waiting for master to " +
"signal completion of superstep " + getSuperstep(), e);
}
GlobalStats globalStats = new GlobalStats();
WritableUtils.readFieldsFromZnode(
getZkExt(), superstepFinishedNode, false, null, globalStats);
if (LOG.isInfoEnabled()) {
LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
" with global stats " + globalStats);
}
incrCachedSuperstep();
getContext().setStatus("finishSuperstep: (all workers done) " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
getGraphMapper().getGraphState().
setNumEdges(globalStats.getEdgeCount()).
setNumVertices(globalStats.getVertexCount());
return ((globalStats.getFinishedVertexCount() ==
globalStats.getVertexCount()) &&
(globalStats.getMessageCount() == 0));
}
/**
* Save the vertices in this worker's partitions using the user-defined
* VertexOutputFormat.
*
* @throws IOException
* @throws InterruptedException
*/
private void saveVertices() throws IOException, InterruptedException {
if (getConfiguration().get(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS)
== null) {
LOG.warn("saveVertices: " + GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS +
" not specified -- there will be no saved output");
return;
}
VertexOutputFormat<I, V, E> vertexOutputFormat =
BspUtils.<I, V, E>createVertexOutputFormat(getConfiguration());
VertexWriter<I, V, E> vertexWriter =
vertexOutputFormat.createVertexWriter(getContext());
vertexWriter.initialize(getContext());
for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
vertexWriter.writeVertex(vertex);
}
}
vertexWriter.close(getContext());
}
@Override
public void cleanup() throws IOException, InterruptedException {
commService.closeConnections();
setCachedSuperstep(getSuperstep() - 1);
saveVertices();
// All worker processes should denote they are done by adding a special
// znode. Once the number of znodes equals the number of worker and
// master tasks, the master will clean up the ZooKeeper znodes
// associated with this job.
String cleanedUpPath = CLEANED_UP_PATH + "/" +
getTaskPartition() + WORKER_SUFFIX;
try {
String finalFinishedPath =
getZkExt().createExt(cleanedUpPath,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
if (LOG.isInfoEnabled()) {
LOG.info("cleanup: Notifying master its okay to cleanup with " +
finalFinishedPath);
}
} catch (KeeperException.NodeExistsException e) {
if (LOG.isInfoEnabled()) {
LOG.info("cleanup: Couldn't create finished node '" +
cleanedUpPath);
}
} catch (KeeperException e) {
// Cleaning up, it's okay to fail after cleanup is successful
LOG.error("cleanup: Got KeeperException on notifcation " +
"to master about cleanup", e);
} catch (InterruptedException e) {
// Cleaning up, it's okay to fail after cleanup is successful
LOG.error("cleanup: Got InterruptedException on notifcation " +
"to master about cleanup", e);
}
try {
getZkExt().close();
} catch (InterruptedException e) {
// cleanup phase -- just log the error
LOG.error("cleanup: Zookeeper failed to close with " + e);
}
// Preferably the service would be shut down only after all clients
// have disconnected (or the exceptions on the client side have been
// ignored).
commService.close();
}
@Override
public void storeCheckpoint() throws IOException {
getContext().setStatus("storeCheckpoint: Starting checkpoint " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
// Algorithm:
// For each partition, dump vertices and messages
Path metadataFilePath =
new Path(getCheckpointBasePath(getSuperstep()) + "." +
getHostnamePartitionId() +
CHECKPOINT_METADATA_POSTFIX);
Path verticesFilePath =
new Path(getCheckpointBasePath(getSuperstep()) + "." +
getHostnamePartitionId() +
CHECKPOINT_VERTICES_POSTFIX);
Path validFilePath =
new Path(getCheckpointBasePath(getSuperstep()) + "." +
getHostnamePartitionId() +
CHECKPOINT_VALID_POSTFIX);
// Remove these files if they already exist (they shouldn't, unless
// this worker previously failed)
if (getFs().delete(validFilePath, false)) {
LOG.warn("storeCheckpoint: Removed valid file " +
validFilePath);
}
if (getFs().delete(metadataFilePath, false)) {
LOG.warn("storeCheckpoint: Removed metadata file " +
metadataFilePath);
}
if (getFs().delete(verticesFilePath, false)) {
LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
}
FSDataOutputStream verticesOutputStream =
getFs().create(verticesFilePath);
ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
long startPos = verticesOutputStream.getPos();
partition.write(verticesOutputStream);
// Write the metadata for this partition
// Format:
// <index count>
// <index 0 start pos><partition id>
// <index 1 start pos><partition id>
metadataOutput.writeLong(startPos);
metadataOutput.writeInt(partition.getPartitionId());
if (LOG.isDebugEnabled()) {
LOG.debug("storeCheckpoint: Vertex file starting " +
"offset = " + startPos + ", length = " +
(verticesOutputStream.getPos() - startPos) +
", partition = " + partition.toString());
}
}
// Metadata is buffered and written at the end since it's small and
// needs to know how many partitions this worker owns
FSDataOutputStream metadataOutputStream =
getFs().create(metadataFilePath);
metadataOutputStream.writeInt(workerPartitionMap.size());
metadataOutputStream.write(metadataByteStream.toByteArray());
metadataOutputStream.close();
verticesOutputStream.close();
if (LOG.isInfoEnabled()) {
LOG.info("storeCheckpoint: Finished metadata (" +
metadataFilePath + ") and vertices (" + verticesFilePath
+ ").");
}
getFs().createNewFile(validFilePath);
}
@Override
public void loadCheckpoint(long superstep) {
// Algorithm:
// Examine all the partition owners and load the ones
// that match my hostname and id from the master designated checkpoint
// prefixes.
long startPos = 0;
int loadedPartitions = 0;
for (PartitionOwner partitionOwner :
workerGraphPartitioner.getPartitionOwners()) {
if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
String metadataFile =
partitionOwner.getCheckpointFilesPrefix() +
CHECKPOINT_METADATA_POSTFIX;
String partitionsFile =
partitionOwner.getCheckpointFilesPrefix() +
CHECKPOINT_VERTICES_POSTFIX;
try {
int partitionId = -1;
DataInputStream metadataStream =
getFs().open(new Path(metadataFile));
int partitions = metadataStream.readInt();
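// Scan the metadata entries (start offset, partition id) until the
// entry for this partition is found; startPos then gives the offset
// of the partition's data in the vertices checkpoint file.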
for (int i = 0; i < partitions; ++i) {
startPos = metadataStream.readLong();
partitionId = metadataStream.readInt();
if (partitionId == partitionOwner.getPartitionId()) {
break;
}
}
if (partitionId != partitionOwner.getPartitionId()) {
throw new IllegalStateException(
"loadCheckpoint: " + partitionOwner +
" not found!");
}
metadataStream.close();
Partition<I, V, E, M> partition =
new Partition<I, V, E, M>(
getConfiguration(),
partitionId);
DataInputStream partitionsStream =
getFs().open(new Path(partitionsFile));
if (partitionsStream.skip(startPos) != startPos) {
throw new IllegalStateException(
"loadCheckpoint: Failed to skip " + startPos +
" on " + partitionsFile);
}
partition.readFields(partitionsStream);
partitionsStream.close();
if (LOG.isInfoEnabled()) {
LOG.info("loadCheckpoint: Loaded partition " +
partition);
}
if (getPartitionMap().put(partitionId, partition) != null) {
throw new IllegalStateException(
"loadCheckpoint: Already has partition owner " +
partitionOwner);
}
++loadedPartitions;
} catch (IOException e) {
throw new RuntimeException(
"loadCheckpoing: Failed to get partition owner " +
partitionOwner, e);
}
}
}
if (LOG.isInfoEnabled()) {
LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
" partitions of out " +
workerGraphPartitioner.getPartitionOwners().size() +
" total.");
}
// Communication service needs to set up the connections prior to
// processing vertices
commService.setup();
}
/**
* Send the worker partitions to their destination workers
*
* @param workerPartitionMap Map of worker info to the partitions stored
* on this worker to be sent
*/
private void sendWorkerPartitions(
Map<WorkerInfo, List<Integer>> workerPartitionMap) {
List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
new ArrayList<Entry<WorkerInfo, List<Integer>>>(
workerPartitionMap.entrySet());
Collections.shuffle(randomEntryList);
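// Partitions are sent to their new owners in random worker order
// (step 2 of exchangeVertexPartitions) and removed from this worker's
// partition map once sent.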
for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
randomEntryList) {
for (Integer partitionId : workerPartitionList.getValue()) {
Partition<I, V, E, M> partition =
getPartitionMap().get(partitionId);
if (partition == null) {
throw new IllegalStateException(
"sendWorkerPartitions: Couldn't find partition " +
partitionId + " to send to " +
workerPartitionList.getKey());
}
if (LOG.isInfoEnabled()) {
LOG.info("sendWorkerPartitions: Sending worker " +
workerPartitionList.getKey() + " partition " +
partitionId);
}
getGraphMapper().getGraphState().getWorkerCommunications().
sendPartitionReq(workerPartitionList.getKey(),
partition);
getPartitionMap().remove(partitionId);
}
}
String myPartitionExchangeDonePath =
getPartitionExchangeWorkerPath(
getApplicationAttempt(), getSuperstep(), getWorkerInfo());
try {
getZkExt().createExt(myPartitionExchangeDonePath,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
} catch (KeeperException e) {
throw new IllegalStateException(
"sendWorkerPartitions: KeeperException to create " +
myPartitionExchangeDonePath, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"sendWorkerPartitions: InterruptedException to create " +
myPartitionExchangeDonePath, e);
}
if (LOG.isInfoEnabled()) {
LOG.info("sendWorkerPartitions: Done sending all my partitions.");
}
}
@Override
public final void exchangeVertexPartitions(
Collection<? extends PartitionOwner> masterSetPartitionOwners) {
// 1. Fix the addresses of the partition ids if they have changed.
// 2. Send all the partitions to their destination workers in a random
// fashion.
// 3. Notify completion with a ZooKeeper stamp
// 4. Wait for all my dependencies to be done (if any)
// 5. Add the partitions to myself.
PartitionExchange partitionExchange =
workerGraphPartitioner.updatePartitionOwners(
getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
commService.fixPartitionIdToSocketAddrMap();
Map<WorkerInfo, List<Integer>> workerPartitionMap =
partitionExchange.getSendWorkerPartitionMap();
if (!workerPartitionMap.isEmpty()) {
sendWorkerPartitions(workerPartitionMap);
}
Set<WorkerInfo> myDependencyWorkerSet =
partitionExchange.getMyDependencyWorkerSet();
Set<String> workerIdSet = new HashSet<String>();
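// Track the hostname ids of the workers this worker expects partitions
// from; entries are removed below as their "done" znodes appear.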
for (WorkerInfo workerInfo : myDependencyWorkerSet) {
if (!workerIdSet.add(workerInfo.getHostnameId())) {
throw new IllegalStateException(
"exchangeVertexPartitions: Duplicate entry " + workerInfo);
}
}
if (myDependencyWorkerSet.isEmpty() && workerPartitionMap.isEmpty()) {
if (LOG.isInfoEnabled()) {
LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
"exiting early");
}
return;
}
String vertexExchangePath =
getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
List<String> workerDoneList;
try {
while (true) {
workerDoneList = getZkExt().getChildrenExt(
vertexExchangePath, true, false, false);
workerIdSet.removeAll(workerDoneList);
if (workerIdSet.isEmpty()) {
break;
}
if (LOG.isInfoEnabled()) {
LOG.info("exchangeVertexPartitions: Waiting for workers " +
workerIdSet);
}
getPartitionExchangeChildrenChangedEvent().waitForever();
getPartitionExchangeChildrenChangedEvent().reset();
}
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (LOG.isInfoEnabled()) {
LOG.info("exchangeVertexPartitions: Done with exchange.");
}
// Add the partitions received from other workers
movePartitionsToWorker(commService);
}
/**
* Partitions that are exchanged need to be moved from the communication
* service to the worker.
*
* @param commService Communication service where the partitions are
* temporarily stored.
*/
private void movePartitionsToWorker(
ServerInterface<I, V, E, M> commService) {
Map<Integer, List<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
commService.getInPartitionVertexMap();
synchronized (inPartitionVertexMap) {
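// Convert each incoming vertex list into a Partition and install it in
// this worker's partition map; a pre-existing partition id is an error.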
for (Entry<Integer, List<BasicVertex<I, V, E, M>>> entry :
inPartitionVertexMap.entrySet()) {
if (getPartitionMap().containsKey(entry.getKey())) {
throw new IllegalStateException(
"moveVerticesToWorker: Already has partition " +
getPartitionMap().get(entry.getKey()) +
", cannot receive vertex list of size " +
entry.getValue().size());
}
Partition<I, V, E, M> tmpPartition =
new Partition<I, V, E, M>(getConfiguration(),
entry.getKey());
for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
if (tmpPartition.putVertex(vertex) != null) {
throw new IllegalStateException(
"moveVerticesToWorker: Vertex " + vertex +
" already exists!");
}
}
if (LOG.isInfoEnabled()) {
LOG.info("moveVerticesToWorker: Adding " +
entry.getValue().size() +
" vertices for partition id " + entry.getKey());
}
getPartitionMap().put(tmpPartition.getPartitionId(),
tmpPartition);
entry.getValue().clear();
}
inPartitionVertexMap.clear();
}
}
public final BspEvent getPartitionExchangeChildrenChangedEvent() {
return partitionExchangeChildrenChanged;
}
@Override
protected boolean processEvent(WatchedEvent event) {
boolean foundEvent = false;
if (event.getPath().startsWith(MASTER_JOB_STATE_PATH) &&
(event.getType() == EventType.NodeChildrenChanged)) {
if (LOG.isInfoEnabled()) {
LOG.info("processEvent: Job state changed, checking " +
"to see if it needs to restart");
}
JSONObject jsonObj = getJobState();
try {
if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
ApplicationState.START_SUPERSTEP) &&
jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
getApplicationAttempt()) {
LOG.fatal("processEvent: Worker will restart " +
"from command - " + jsonObj.toString());
System.exit(-1);
}
} catch (JSONException e) {
throw new RuntimeException(
"processEvent: Couldn't properly get job state from " +
jsonObj.toString());
}
foundEvent = true;
} else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
event.getType() == EventType.NodeChildrenChanged) {
if (LOG.isInfoEnabled()) {
LOG.info("processEvent : partitionExchangeChildrenChanged " +
"(at least one worker is done sending partitions)");
}
partitionExchangeChildrenChanged.signal();
foundEvent = true;
}
return foundEvent;
}
@Override
public WorkerInfo getWorkerInfo() {
return workerInfo;
}
@Override
public Map<Integer, Partition<I, V, E, M>> getPartitionMap() {
return workerPartitionMap;
}
@Override
public Collection<? extends PartitionOwner> getPartitionOwners() {
return workerGraphPartitioner.getPartitionOwners();
}
@Override
public PartitionOwner getVertexPartitionOwner(I vertexIndex) {
return workerGraphPartitioner.getPartitionOwner(vertexIndex);
}
public Partition<I, V, E, M> getPartition(I vertexIndex) {
PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
return workerPartitionMap.get(partitionOwner.getPartitionId());
}
@Override
public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
return workerPartitionMap.get(
partitionOwner.getPartitionId()).getVertex(vertexIndex);
} else {
return null;
}
}
}