// blob: 61624e5846f382dc046ae7acad00637e4ef47278 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.graph;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MultiRandomAccessOutEdges;
import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.edge.MutableEdgesIterable;
import org.apache.giraph.edge.MutableEdgesWrapper;
import org.apache.giraph.edge.MutableOutEdges;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.StrictRandomAccessOutEdges;
import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Basic abstract class for writing a BSP application for computation.
* Giraph will store Vertex value and edges, hence all user data should
* be stored as part of the vertex value.
*
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
* @param <M> Message data
*/
public abstract class Vertex<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable>
    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
    implements WorkerAggregatorUsage {
  /** Vertex id. */
  private I id;
  /** Vertex value. */
  private V value;
  /** Outgoing edges. */
  private OutEdges<I, E> edges;
  /** If true, do not do anymore computation on this vertex. */
  private boolean halt;
  /** Global graph state. */
  private GraphState<I, V, E, M> graphState;

  /**
   * Initialize id, value, and edges.
   * This method (or the alternative form initialize(id, value)) must be called
   * after instantiation, unless readFields() is called.
   *
   * @param id Vertex id
   * @param value Vertex value
   * @param edges Iterable of edges
   */
  public void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
    this.id = id;
    this.value = value;
    setEdges(edges);
  }

  /**
   * Initialize id and value. Vertex edges will be empty.
   * This method (or the alternative form initialize(id, value, edges))
   * must be called after instantiation, unless readFields() is called.
   *
   * @param id Vertex id
   * @param value Vertex value
   */
  public void initialize(I id, V value) {
    this.id = id;
    this.value = value;
    this.edges = getConf().createAndInitializeOutEdges(0);
  }

  /**
   * Set the outgoing edges for this vertex.
   *
   * @param edges Iterable of edges
   */
  public void setEdges(Iterable<Edge<I, E>> edges) {
    // If the iterable is actually an instance of OutEdges,
    // we simply take the reference.
    // Otherwise, we initialize a new OutEdges.
    if (edges instanceof OutEdges) {
      this.edges = (OutEdges<I, E>) edges;
    } else {
      this.edges = getConf().createAndInitializeOutEdges(edges);
    }
  }

  /**
   * Must be defined by user to do computation on a single Vertex.
   *
   * @param messages Messages that were sent to this vertex in the previous
   *                 superstep.  Each message is only guaranteed to have
   *                 a life expectancy as long as next() is not called.
   * @throws IOException If the user computation fails with an I/O error
   */
  public abstract void compute(Iterable<M> messages) throws IOException;

  /**
   * Retrieves the current superstep.
   *
   * @return Current superstep
   */
  public long getSuperstep() {
    return graphState.getSuperstep();
  }

  /**
   * Get the vertex id.
   *
   * @return My vertex id.
   */
  public I getId() {
    return id;
  }

  /**
   * Get the vertex value (data stored with vertex)
   *
   * @return Vertex value
   */
  public V getValue() {
    return value;
  }

  /**
   * Set the vertex data (immediately visible in the computation)
   *
   * @param value Vertex data to be set
   */
  public void setValue(V value) {
    this.value = value;
  }

  /**
   * Get the total (all workers) number of vertices that
   * existed in the previous superstep.
   *
   * @return Total number of vertices (-1 if first superstep)
   */
  public long getTotalNumVertices() {
    return graphState.getTotalNumVertices();
  }

  /**
   * Get the total (all workers) number of edges that
   * existed in the previous superstep.
   *
   * @return Total number of edges (-1 if first superstep)
   */
  public long getTotalNumEdges() {
    return graphState.getTotalNumEdges();
  }

  /**
   * Get a read-only view of the out-edges of this vertex.
   * Note: edge objects returned by this iterable may be invalidated as soon
   * as the next element is requested. Thus, keeping a reference to an edge
   * almost always leads to undesired behavior.
   *
   * @return the out edges (sort order determined by subclass implementation).
   */
  public Iterable<Edge<I, E>> getEdges() {
    return edges;
  }

  /**
   * Get an iterable of out-edges that can be modified in-place.
   * This can mean changing the current edge value or removing the current edge
   * (by using the iterator version).
   * Note: if the current {@link OutEdges} implementation is not a
   * {@link MutableOutEdges}, the returned iterable is a
   * {@link MutableEdgesIterable} created around this vertex instead of a
   * direct view of the edges; see {@link #unwrapMutableEdges()}.
   *
   * @return An iterable of mutable out-edges
   */
  public Iterable<MutableEdge<I, E>> getMutableEdges() {
    // If the OutEdges implementation has a specialized mutable iterator,
    // we use that; otherwise, we build a new data structure as we iterate
    // over the current edges.
    if (edges instanceof MutableOutEdges) {
      return new Iterable<MutableEdge<I, E>>() {
        @Override
        public Iterator<MutableEdge<I, E>> iterator() {
          return ((MutableOutEdges<I, E>) edges).mutableIterator();
        }
      };
    } else {
      return new MutableEdgesIterable<I, E>(this);
    }
  }

  /**
   * If a {@link MutableEdgesWrapper} was used to provide a mutable iterator,
   * copy any remaining edges to the new {@link org.apache.giraph.edge.OutEdges}
   * data structure and keep a direct reference to it (thus discarding the
   * wrapper).
   * Called by the Giraph infrastructure after computation.
   */
  public void unwrapMutableEdges() {
    if (edges instanceof MutableEdgesWrapper) {
      edges = ((MutableEdgesWrapper<I, E>) edges).unwrap();
    }
  }

  /**
   * Get the number of outgoing edges on this vertex.
   *
   * @return the total number of outbound edges from this vertex
   */
  public int getNumEdges() {
    return edges.size();
  }

  /**
   * Return the value of the first edge with the given target vertex id,
   * or null if there is no such edge.
   * Note: edge value objects returned by this method may be invalidated by
   * the next call. Thus, keeping a reference to an edge value almost always
   * leads to undesired behavior.
   *
   * @param targetVertexId Target vertex id
   * @return Edge value (or null if missing)
   */
  public E getEdgeValue(I targetVertexId) {
    // If the OutEdges implementation has a specialized random-access
    // method, we use that; otherwise, we scan the edges.
    if (edges instanceof StrictRandomAccessOutEdges) {
      return ((StrictRandomAccessOutEdges<I, E>) edges)
          .getEdgeValue(targetVertexId);
    } else {
      for (Edge<I, E> edge : edges) {
        if (edge.getTargetVertexId().equals(targetVertexId)) {
          return edge.getValue();
        }
      }
      return null;
    }
  }

  /**
   * If an edge to the target vertex exists, set it to the given edge value.
   * This only makes sense with strict graphs.
   * When the edges do not offer strict random access, every edge whose
   * target id equals the given id is updated while scanning.
   *
   * @param targetVertexId Target vertex id
   * @param edgeValue Edge value
   */
  public void setEdgeValue(I targetVertexId, E edgeValue) {
    // If the OutEdges implementation has a specialized random-access
    // method, we use that; otherwise, we scan the edges.
    if (edges instanceof StrictRandomAccessOutEdges) {
      ((StrictRandomAccessOutEdges<I, E>) edges).setEdgeValue(
          targetVertexId, edgeValue);
    } else {
      for (MutableEdge<I, E> edge : getMutableEdges()) {
        if (edge.getTargetVertexId().equals(targetVertexId)) {
          edge.setValue(edgeValue);
        }
      }
    }
  }

  /**
   * Get an iterable over the values of all edges with the given target
   * vertex id. This only makes sense for multigraphs (i.e. graphs with
   * parallel edges).
   * Note: edge value objects returned by this method may be invalidated as
   * soon as the next element is requested. Thus, keeping a reference to an
   * edge value almost always leads to undesired behavior.
   *
   * @param targetVertexId Target vertex id
   * @return Iterable of edge values
   */
  public Iterable<E> getAllEdgeValues(final I targetVertexId) {
    // If the OutEdges implementation has a specialized random-access
    // method, we use that; otherwise, we scan the edges.
    if (edges instanceof MultiRandomAccessOutEdges) {
      return ((MultiRandomAccessOutEdges<I, E>) edges)
          .getAllEdgeValues(targetVertexId);
    } else {
      return new Iterable<E>() {
        @Override
        public Iterator<E> iterator() {
          return new UnmodifiableIterator<E>() {
            /** Iterator over all edges. */
            private final Iterator<Edge<I, E>> edgeIterator =
                edges.iterator();
            /** Last matching edge found. */
            private Edge<I, E> currentEdge;
            /** True while currentEdge has not been handed out by next(). */
            private boolean matchPending;

            @Override
            public boolean hasNext() {
              // A match that was found but not yet consumed must not be
              // skipped by a repeated hasNext() call.
              if (matchPending) {
                return true;
              }
              // The underlying iterator is only advanced once the previous
              // match was consumed, so a reused edge object is never
              // overwritten before next() returns its value.
              while (edgeIterator.hasNext()) {
                currentEdge = edgeIterator.next();
                if (currentEdge.getTargetVertexId().equals(targetVertexId)) {
                  matchPending = true;
                  return true;
                }
              }
              return false;
            }

            @Override
            public E next() {
              // Locate the next match ourselves so that next() works
              // without a preceding hasNext() and fails cleanly at the end.
              if (!hasNext()) {
                throw new NoSuchElementException();
              }
              matchPending = false;
              return currentEdge.getValue();
            }
          };
        }
      };
    }
  }

  /**
   * Send a message to a vertex id.  The message should not be mutated after
   * this method returns or else undefined results could occur.
   *
   * @param id Vertex id to send the message to
   * @param message Message data to send.  Note that after the message is sent,
   *        the user should not modify the object.
   */
  public void sendMessage(I id, M message) {
    // Notify the task manager only when the request processor reports true.
    if (graphState.getWorkerClientRequestProcessor().
        sendMessageRequest(id, message)) {
      graphState.getGraphTaskManager().notifySentMessages();
    }
  }

  /**
   * Send a message to all edges.
   *
   * @param message Message sent to all edges.
   */
  public void sendMessageToAllEdges(M message) {
    for (Edge<I, E> edge : getEdges()) {
      sendMessage(edge.getTargetVertexId(), message);
    }
  }

  /**
   * After this is called, the compute() code will no longer be called for
   * this vertex unless a message is sent to it.  Then the compute() code
   * will be called once again until this function is called.  The
   * application finishes only when all vertices vote to halt.
   */
  public void voteToHalt() {
    halt = true;
  }

  /**
   * Re-activate vertex if halted.
   */
  public void wakeUp() {
    halt = false;
  }

  /**
   * Is this vertex done?
   *
   * @return True if halted, false otherwise.
   */
  public boolean isHalted() {
    return halt;
  }

  /**
   * Add an edge for this vertex (happens immediately)
   *
   * @param edge Edge to add
   */
  public void addEdge(Edge<I, E> edge) {
    edges.add(edge);
  }

  /**
   * Removes all edges pointing to the given vertex id.
   *
   * @param targetVertexId the target vertex id
   */
  public void removeEdges(I targetVertexId) {
    edges.remove(targetVertexId);
  }

  /**
   * Sends a request to create a vertex that will be available during the
   * next superstep.
   *
   * @param id Vertex id
   * @param value Vertex value
   * @param edges Initial edges
   * @throws IOException Propagated from the request processor, if it
   *                     throws one
   */
  public void addVertexRequest(I id, V value, OutEdges<I, E> edges)
    throws IOException {
    Vertex<I, V, E, M> vertex = getConf().createVertex();
    vertex.initialize(id, value, edges);
    graphState.getWorkerClientRequestProcessor().addVertexRequest(vertex);
  }

  /**
   * Sends a request to create a vertex that will be available during the
   * next superstep.
   *
   * @param id Vertex id
   * @param value Vertex value
   * @throws IOException Propagated from the three-argument form
   */
  public void addVertexRequest(I id, V value) throws IOException {
    addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
  }

  /**
   * Request to remove a vertex from the graph
   * (applied just prior to the next superstep).
   *
   * @param vertexId Id of the vertex to be removed.
   * @throws IOException Propagated from the request processor, if it
   *                     throws one
   */
  public void removeVertexRequest(I vertexId) throws IOException {
    graphState.getWorkerClientRequestProcessor().
        removeVertexRequest(vertexId);
  }

  /**
   * Request to add an edge of a vertex in the graph
   * (processed just prior to the next superstep)
   *
   * @param sourceVertexId Source vertex id of edge
   * @param edge Edge to add
   * @throws IOException Propagated from the request processor, if it
   *                     throws one
   */
  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
    throws IOException {
    graphState.getWorkerClientRequestProcessor().
        addEdgeRequest(sourceVertexId, edge);
  }

  /**
   * Request to remove all edges from a given source vertex to a given target
   * vertex (processed just prior to the next superstep).
   *
   * @param sourceVertexId Source vertex id
   * @param targetVertexId Target vertex id
   * @throws IOException Propagated from the request processor, if it
   *                     throws one
   */
  public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
    throws IOException {
    graphState.getWorkerClientRequestProcessor().
        removeEdgesRequest(sourceVertexId, targetVertexId);
  }

  /**
   * Set the graph state for all workers
   *
   * @param graphState Graph state for all workers
   */
  public void setGraphState(GraphState<I, V, E, M> graphState) {
    this.graphState = graphState;
  }

  /**
   * Get the mapper context
   *
   * @return Mapper context
   */
  public Mapper.Context getContext() {
    return graphState.getContext();
  }

  /**
   * Get the partition context
   *
   * @return Partition context
   */
  public PartitionContext getPartitionContext() {
    return graphState.getPartitionContext();
  }

  /**
   * Get the worker context
   *
   * @return WorkerContext context
   */
  public WorkerContext getWorkerContext() {
    return graphState.getGraphTaskManager().getWorkerContext();
  }

  @Override
  public <A extends Writable> void aggregate(String name, A value) {
    graphState.getWorkerAggregatorUsage().aggregate(name, value);
  }

  @Override
  public <A extends Writable> A getAggregatedValue(String name) {
    return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
  }

  @Override
  public String toString() {
    return "Vertex(id=" + getId() + ",value=" + getValue() +
        ",#edges=" + getNumEdges() + ")";
  }
}