blob: 96efc3e3e6a2f244f317b424527802306dfd0bfd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.graph;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
/**
* Basic interface for writing a BSP application for computation.
*
* @param <I> vertex id
* @param <V> vertex data
* @param <E> edge data
* @param <M> message data
*/
@SuppressWarnings("rawtypes")
public abstract class BasicVertex<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements AggregatorUsage, Iterable<I>, Writable, Configurable {
/** Global graph state **/
private GraphState<I,V,E,M> graphState;
/** Configuration */
private Configuration conf;
/** If true, do not do anymore computation on this vertex. */
boolean halt = false;
/**
* This method must be called after instantiation of a vertex with BspUtils
* unless deserialization from readFields() is called.
*
* @param vertexId Will be the vertex id
* @param vertexValue Will be the vertex value
* @param edges A map of destination edge ids to edge values (can be null)
* @param messages Initial messages for this vertex (can be null)
*/
public abstract void initialize(
I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages);
/**
* Must be defined by user to do computation on a single Vertex.
*
* @param msgIterator Iterator to the messages that were sent to this
* vertex in the previous superstep
* @throws IOException
*/
public abstract void compute(Iterator<M> msgIterator) throws IOException;
/**
* Retrieves the current superstep.
*
* @return Current superstep
*/
public long getSuperstep() {
return getGraphState().getSuperstep();
}
/**
* Get the vertex id
*/
public abstract I getVertexId();
/**
* Get the vertex value (data stored with vertex)
*
* @return Vertex value
*/
public abstract V getVertexValue();
/**
* Set the vertex data (immediately visible in the computation)
*
* @param vertexValue Vertex data to be set
*/
public abstract void setVertexValue(V vertexValue);
/**
* Get the total (all workers) number of vertices that
* existed in the previous superstep.
*
* @return Total number of vertices (-1 if first superstep)
*/
public long getNumVertices() {
return getGraphState().getNumVertices();
}
/**
* Get the total (all workers) number of edges that
* existed in the previous superstep.
*
* @return Total number of edges (-1 if first superstep)
*/
public long getNumEdges() {
return getGraphState().getNumEdges();
}
/**
* Get a read-only view of the out-edges of this vertex.
*
* @return the out edges (sort order determined by subclass implementation).
*/
@Override
public abstract Iterator<I> iterator();
/**
* Get the edge value associated with a target vertex id.
*
* @param targetVertexId Target vertex id to check
*
* @return the value of the edge to targetVertexId (or null if there
* is no edge to it)
*/
public abstract E getEdgeValue(I targetVertexId);
/**
* Does an edge with the target vertex id exist?
*
* @param targetVertexId Target vertex id to check
* @return true if there is an edge to the target
*/
public abstract boolean hasEdge(I targetVertexId);
/**
* Get the number of outgoing edges on this vertex.
*
* @return the total number of outbound edges from this vertex
*/
public abstract int getNumOutEdges();
/**
* Send a message to a vertex id. The message should not be mutated after
* this method returns or else undefined results could occur.
*
* @param id Vertex id to send the message to
* @param msg Message data to send. Note that after the message is sent,
* the user should not modify the object.
*/
public void sendMsg(I id, M msg) {
if (msg == null) {
throw new IllegalArgumentException(
"sendMsg: Cannot send null message to " + id);
}
getGraphState().getWorkerCommunications().
sendMessageReq(id, msg);
}
/**
* Send a message to all edges.
*/
public abstract void sendMsgToAllEdges(M msg);
/**
* After this is called, the compute() code will no longer be called for
* this vertex unless a message is sent to it. Then the compute() code
* will be called once again until this function is called. The
* application finishes only when all vertices vote to halt.
*/
public void voteToHalt() {
halt = true;
}
/**
* Is this vertex done?
*/
public boolean isHalted() {
return halt;
}
/**
* Get the list of incoming messages from the previous superstep. Same as
* the message iterator passed to compute().
*/
public abstract Iterable<M> getMessages();
/**
* Copy the messages this vertex should process in the current superstep
*
* @param messages the messages sent to this vertex in the previous superstep
*/
abstract void putMessages(Iterable<M> messages);
/**
* Release unnecessary resources (will be called after vertex returns from
* {@link #compute()})
*/
abstract void releaseResources();
/**
* Get the graph state for all workers.
*
* @return Graph state for all workers
*/
GraphState<I, V, E, M> getGraphState() {
return graphState;
}
/**
* Set the graph state for all workers
*
* @param graphState Graph state for all workers
*/
void setGraphState(GraphState<I, V, E, M> graphState) {
this.graphState = graphState;
}
/**
* Get the mapper context
*
* @return Mapper context
*/
public Mapper.Context getContext() {
return getGraphState().getContext();
}
/**
* Get the worker context
*
* @return WorkerContext context
*/
public WorkerContext getWorkerContext() {
return getGraphState().getGraphMapper().getWorkerContext();
}
@Override
public final <A extends Writable> Aggregator<A> registerAggregator(
String name,
Class<? extends Aggregator<A>> aggregatorClass)
throws InstantiationException, IllegalAccessException {
return getGraphState().getGraphMapper().getAggregatorUsage().
registerAggregator(name, aggregatorClass);
}
@Override
public final Aggregator<? extends Writable> getAggregator(String name) {
return getGraphState().getGraphMapper().getAggregatorUsage().
getAggregator(name);
}
@Override
public final boolean useAggregator(String name) {
return getGraphState().getGraphMapper().getAggregatorUsage().
useAggregator(name);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
}