blob: c2a82d8084870cbb7132ac51a02851fb19210952 [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.pregelix.api.graph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.JobStateUtils;
/**
* User applications should all inherit {@link Vertex}, and implement their own
* *compute* method.
*
* @param <I>
* Vertex identifier type
* @param <V>
* Vertex value type
* @param <E>
* Edge value type
* @param <M>
* Message value type
*/
@SuppressWarnings("rawtypes")
public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
implements Writable {
/** task context, only used in scanners; getContext() falls back to it when no vertex context is set */
public static TaskAttemptContext taskContext;
/** vertex context; supplies the superstep, the vertex count and the task context */
private VertexContext context;
/** Vertex id */
private I vertexId = null;
/** Vertex value */
private V vertexValue = null;
/** List of outgoing edges (each edge holds a destination vertex id and an edge value) */
private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
/** If true, do not do anymore computation on this vertex (package-private). */
boolean halt = false;
/** List of incoming messages from the previous superstep */
private final List<M> msgList = new ArrayList<M>();
/** delegate that performs message sending, vertex insertion/deletion and holds the output writers */
private final VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
/** this vertex is updated or not */
private boolean updated = false;
/** has outgoing messages; set by sendMsg and cleared when vertex data is read */
private boolean hasMessage = false;
/** whether the vertex has spilled to HDFS */
private boolean spilled = false;
/** the spilled path */
private String pathStr;
/** file system handle, created lazily the first time a spilled vertex is read or written */
private FileSystem dfs;
/** whether addVertex was called with a vertex that is not halted */
private boolean createdNewLiveVertex = false;
/** terminate the partition */
private boolean terminatePartition = false;
/**
* Object pools for re-using edge, message and value objects.
* Each used* counter is the index of the next pooled object to hand out;
* reset() rewinds all three counters to zero.
*/
private final List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
private final List<M> msgPool = new ArrayList<M>();
private final List<V> valuePool = new ArrayList<V>();
private int usedEdge = 0;
private int usedMessage = 0;
private int usedValue = 0;
/**
* The key method that users need to implement to process
* incoming messages in each superstep.
* 1. In a superstep, this method can be called multiple times in a continuous manner for a single
* vertex, each call processing one batch of messages. (Note that
* this only happens when the messages for a single vertex
* exceed one frame.)
* 2. In each superstep, before any invocation of this method for a vertex,
* open() is called; after all the invocations of this method for the vertex,
* close() is called.
* 3. In each partition, the vertex Java object is reused
* for all the vertices to be processed in the same partition. (The model
* is the same as the key-value objects in hadoop map tasks.)
*
* @param msgIterator
* an iterator of incoming messages
* @throws Exception
* any failure raised by the user-defined computation
*/
public abstract void compute(Iterator<M> msgIterator) throws Exception;
/**
 * Append an outgoing edge to this vertex, using an {@link Edge} object taken
 * from the internal edge pool, and flag the vertex as updated.
 *
 * @param targetVertexId
 *            id of the destination vertex
 * @param edgeValue
 *            value attached to the edge
 * @return always {@code true}
 */
public final boolean addEdge(I targetVertexId, E edgeValue) {
    final Edge<I, E> pooledEdge = allocateEdge();
    pooledEdge.setDestVertexId(targetVertexId);
    pooledEdge.setEdgeValue(edgeValue);
    this.destEdgeList.add(pooledEdge);
    this.updated = true;
    return true;
}
/**
 * Populate this vertex from the supplied parts. Null arguments are skipped.
 * The existing edge list is always cleared first; the supplied messages are
 * appended to the current message list (which is not cleared here).
 *
 * @param vertexId
 *            the vertex id, or null to keep the current one
 * @param vertexValue
 *            the vertex value, or null to keep the current one
 * @param edges
 *            destination id to edge value mapping, may be null
 * @param messages
 *            initial incoming messages, may be null
 */
public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
    if (vertexId != null) {
        setVertexId(vertexId);
    }
    if (vertexValue != null) {
        setVertexValue(vertexValue);
    }

    destEdgeList.clear();
    final boolean hasEdges = edges != null && !edges.isEmpty();
    if (hasEdges) {
        for (Map.Entry<I, E> idAndValue : edges.entrySet()) {
            Edge<I, E> newEdge = new Edge<I, E>(idAndValue.getKey(), idAndValue.getValue());
            destEdgeList.add(newEdge);
        }
    }

    final boolean hasMessages = messages != null && !messages.isEmpty();
    if (hasMessages) {
        msgList.addAll(messages);
    }
}
/**
 * Reset the reusable vertex object: clears the updated and spilled flags
 * and rewinds the three object-pool cursors to zero.
 */
public void reset() {
    // flags
    this.updated = false;
    this.spilled = false;
    // pool cursors
    this.usedEdge = 0;
    this.usedMessage = 0;
    this.usedValue = 0;
}
/**
 * Store the vertex id on this vertex and pass the same id on to the delegate.
 *
 * @param vertexId
 *            the new vertex id
 */
public final void setVertexId(I vertexId) {
    this.vertexId = vertexId;
    this.delegate.setVertexId(this.vertexId);
}
/**
 * Accessor for the vertex id.
 *
 * @return the id currently held by this vertex (null until one is set or read)
 */
public final I getVertexId() {
    return this.vertexId;
}
/**
 * Accessor for the vertex value.
 *
 * @return the value currently held by this vertex, possibly null
 */
public final V getVertexValue() {
    return this.vertexValue;
}
/**
 * Replace the vertex value and flag the vertex as updated.
 *
 * @param vertexValue
 *            the new vertex value
 */
public final void setVertexValue(V vertexValue) {
    updated = true;
    this.vertexValue = vertexValue;
}
/**
 * Send a message to one vertex through the delegate and remember that this
 * vertex has produced outgoing messages.
 *
 * @param id
 *            the receiver vertex id
 * @param msg
 *            the message; must not be null
 * @throws IllegalArgumentException
 *             if {@code msg} is null
 */
public final void sendMsg(I id, M msg) {
    if (null == msg) {
        throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
    }
    this.delegate.sendMsg(id, msg);
    hasMessage = true;
}
/**
 * Send the same message to the destination of every outgoing edge, one
 * {@link #sendMsg} call per edge.
 *
 * @param msg
 *            the message; must not be null
 * @throws IllegalArgumentException
 *             if {@code msg} is null
 */
public final void sendMsgToAllEdges(M msg) {
    if (null == msg) {
        throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
    }
    final Iterator<Edge<I, E>> edgeIterator = destEdgeList.iterator();
    while (edgeIterator.hasNext()) {
        final Edge<I, E> outgoing = edgeIterator.next();
        sendMsg(outgoing.getDestVertexId(), msg);
    }
}
/**
 * Vote to halt and mark the vertex as updated so that its current state is
 * saved. Equivalent to {@code voteToHalt(true)}.
 */
public final void voteToHalt() {
    voteToHalt(true);
}
/**
 * Vote to halt, with explicit control over the updated flag.
 *
 * @param update
 *            value assigned to the updated flag, i.e. whether or not to save
 *            the vertex value
 */
public final void voteToHalt(boolean update) {
    this.halt = true;
    this.updated = update;
}
/**
 * Clear the halt flag so the vertex is alive again, and mark the vertex as
 * updated so that its current state is saved. Equivalent to
 * {@code activate(true)}.
 */
public final void activate() {
    activate(true);
}
/**
 * Clear the halt flag so the vertex is alive again, with explicit control
 * over the updated flag.
 *
 * @param update
 *            value assigned to the updated flag, i.e. whether or not to save
 *            the vertex value
 */
public final void activate(boolean update) {
    this.halt = false;
    this.updated = update;
}
/**
 * Report the halt flag.
 *
 * @return {@code true} if the vertex is halted, {@code false} otherwise
 */
public final boolean isHalted() {
    return this.halt;
}
/**
 * Deserialize this vertex. The first byte carries two flags: bit 0 is the
 * halt flag and bit 1 is the spilled flag. When the vertex is not spilled,
 * the vertex data follows inline; otherwise a UTF path follows and the
 * vertex data is read from that file on the configured file system.
 *
 * @param in
 *            the input to read from
 * @throws IOException
 *             if reading the input or the spilled file fails
 */
@Override
public final void readFields(DataInput in) throws IOException {
    reset();
    if (vertexId == null) {
        vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
    }
    // read the boolean state
    byte booleanState = in.readByte();
    halt = (booleanState & 1) == 1;
    spilled = (booleanState & 2) == 2;
    if (!spilled) {
        // read the vertex data stored inline
        readVertexData(in);
        return;
    }
    // read file hosted vertex
    if (dfs == null) {
        dfs = FileSystem.get(getContext().getConfiguration());
    }
    pathStr = in.readUTF();
    FSDataInputStream dataInput = dfs.open(new Path(pathStr));
    try {
        readVertexData(dataInput);
    } finally {
        // always release the file handle, even if deserialization fails
        dataInput.close();
    }
}
/**
 * Read the vertex payload in the order written by writeVertexData: the
 * vertex id, a boolean telling whether a value follows, the optional value,
 * the edge count and edges, then the message count and messages. Used both
 * for inline data and for the spilled file opened by readFields.
 * Edge, message and value objects come from the object pools.
 *
 * @param in
 *            the input to read from
 * @throws IOException
 *             if the underlying input fails
 */
private void readVertexData(DataInput in) throws IOException {
    vertexId.readFields(in);
    delegate.setVertexId(vertexId);
    boolean hasVertexValue = in.readBoolean();
    if (hasVertexValue) {
        vertexValue = allocateValue();
        vertexValue.readFields(in);
        delegate.setVertex(this);
    } else {
        // the vertex object is reused: do not keep the previous vertex's value
        vertexValue = null;
    }
    destEdgeList.clear();
    long edgeMapSize = WritableUtils.readVLong(in);
    for (long i = 0; i < edgeMapSize; ++i) {
        Edge<I, E> edge = allocateEdge();
        edge.setConf(getContext().getConfiguration());
        edge.readFields(in);
        // addEdge(Edge) sets the updated flag; it is cleared again below
        addEdge(edge);
    }
    msgList.clear();
    long msgListSize = WritableUtils.readVLong(in);
    for (long i = 0; i < msgListSize; ++i) {
        M msg = allocateMessage();
        msg.readFields(in);
        msgList.add(msg);
    }
    // a freshly read vertex has no pending changes
    updated = false;
    hasMessage = false;
    createdNewLiveVertex = false;
}
/**
 * Serialize this vertex. The first byte carries two flags: bit 0 is the
 * halt flag and bit 1 is the spilled flag. When the vertex is not spilled,
 * the vertex data is written inline; otherwise the vertex data is written
 * to the file at the spilled path (replacing any existing file) and only
 * that path is written to {@code out}.
 *
 * @param out
 *            the output to write to
 * @throws IOException
 *             if writing the output or the spilled file fails
 */
@Override
public void write(DataOutput out) throws IOException {
    // write boolean states
    int haltBit = halt ? 1 : 0;
    int spilledBit = spilled ? 2 : 0;
    byte booleanStates = (byte) (haltBit | spilledBit);
    out.writeByte(booleanStates);
    if (!spilled) {
        // write the vertex data inline
        writeVertexData(out);
        return;
    }
    // write file-hosted vertex
    if (dfs == null) {
        dfs = FileSystem.get(getContext().getConfiguration());
    }
    Path path = new Path(pathStr);
    if (dfs.exists(path)) {
        dfs.delete(path, true);
    }
    FSDataOutputStream dataOutput = dfs.create(path, true);
    try {
        writeVertexData(dataOutput);
        dataOutput.flush();
    } finally {
        // always release the file handle, even if serialization fails
        dataOutput.close();
    }
    // write the path of the spilled file in place of the vertex data
    pathStr = path.toUri().toString();
    out.writeUTF(pathStr);
}
/**
 * Write the vertex payload: the vertex id, a boolean telling whether a value
 * follows, the optional value, the edge count and edges, then the message
 * count and messages.
 *
 * @param out
 *            the output to write to
 * @throws IOException
 *             if the underlying output fails
 */
private void writeVertexData(DataOutput out) throws IOException {
    vertexId.write(out);

    final boolean hasVertexValue = vertexValue != null;
    out.writeBoolean(hasVertexValue);
    if (hasVertexValue) {
        vertexValue.write(out);
    }

    WritableUtils.writeVLong(out, destEdgeList.size());
    final Iterator<Edge<I, E>> edgeIterator = destEdgeList.iterator();
    while (edgeIterator.hasNext()) {
        edgeIterator.next().write(out);
    }

    WritableUtils.writeVLong(out, msgList.size());
    final Iterator<M> msgIterator = msgList.iterator();
    while (msgIterator.hasNext()) {
        msgIterator.next().write(out);
    }
}
/**
 * Accessor for the incoming message list.
 *
 * @return the internal (live, mutable) list of messages
 */
public List<M> getMsgList() {
    return this.msgList;
}
/**
 * Accessor for the outgoing edge list.
 *
 * @return the internal (live, mutable) list of outgoing edges
 */
public List<Edge<I, E>> getEdges() {
    return destEdgeList;
}
/**
 * @return the same text as {@link #getStringRepresetation()}
 */
@Override
public String toString() {
    final String text = getStringRepresetation();
    return text;
}
/**
 * Build a textual form of the vertex: its id, its value and the destination
 * ids of its outgoing edges in sorted edge order.
 * The edges are sorted on a copy so that producing the string (for example
 * through {@link #toString()}) does not reorder the vertex's own edge list.
 *
 * @return the string representation of this vertex
 */
@SuppressWarnings("unchecked")
public String getStringRepresetation() {
    List<Edge<I, E>> sortedEdges = new ArrayList<Edge<I, E>>(destEdgeList);
    Collections.sort(sortedEdges);
    StringBuilder edgeBuffer = new StringBuilder();
    edgeBuffer.append("(");
    for (Edge<I, E> edge : sortedEdges) {
        edgeBuffer.append(edge.getDestVertexId()).append(",");
    }
    edgeBuffer.append(")");
    return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
}
/**
 * Count the outgoing edges.
 *
 * @return the size of the outgoing edge list
 */
public int getNumOutEdges() {
    final int outDegree = this.destEdgeList.size();
    return outDegree;
}
/**
 * Pregelix internal use only. Hands the frame writers to the delegate.
 *
 * @param writers
 *            the output frame writers
 */
public void setOutputWriters(List<IFrameWriter> writers) {
    this.delegate.setOutputWriters(writers);
}
/**
 * Pregelix internal use only. Hands the frame tuple appenders to the delegate.
 *
 * @param appenders
 *            the output frame tuple appenders
 */
public void setOutputAppenders(List<FrameTupleAppender> appenders) {
    this.delegate.setOutputAppenders(appenders);
}
/**
 * Pregelix internal use only. Hands the tuple builders to the delegate.
 *
 * @param tbs
 *            the output tuple builders
 */
public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
    this.delegate.setOutputTupleBuilders(tbs);
}
/**
 * Pregelix internal use only. Forwards the end-of-compute notification to
 * the delegate.
 *
 * @throws IOException
 *             if the delegate fails
 */
public void finishCompute() throws IOException {
    this.delegate.finishCompute();
}
/**
 * Pregelix internal use only.
 *
 * @return the current value of the updated flag
 */
public boolean hasUpdate() {
    return updated;
}
/**
 * Pregelix internal use only.
 *
 * @return {@code true} if sendMsg has been called since the vertex data was
 *         last read
 */
public boolean hasMessage() {
    final boolean sentAny = this.hasMessage;
    return sentAny;
}
/**
 * Pregelix internal use only.
 *
 * @return {@code true} if addVertex has been called with a non-halted vertex
 *         since the vertex data was last read
 */
public boolean createdNewLiveVertex() {
    final boolean createdLive = this.createdNewLiveVertex;
    return createdLive;
}
/**
 * Mark the vertex as updated and sort the outgoing edge list in place using
 * the natural ordering of the edges.
 */
@SuppressWarnings("unchecked")
public void sortEdges() {
    this.updated = true;
    Collections.sort(this.destEdgeList);
}
/**
 * Mark the vertex as spilled to HDFS and remember the spill path.
 *
 * @param path
 *            the path of the spilled file
 */
public void setSpilled(String path) {
    pathStr = path;
    spilled = true;
}
/**
 * Clear the spilled flag and drop the spill path (set to null).
 */
public void setUnSpilled() {
    pathStr = null;
    spilled = false;
}
/**
 * Hand out the next edge object from the edge pool, growing the pool by one
 * new {@link Edge} when every pooled edge is already in use.
 *
 * @return a pooled edge object
 */
private Edge<I, E> allocateEdge() {
    final Edge<I, E> pooled;
    if (usedEdge >= edgePool.size()) {
        // pool exhausted: create one more edge and keep it for later reuse
        pooled = new Edge<I, E>();
        edgePool.add(pooled);
    } else {
        pooled = edgePool.get(usedEdge);
    }
    usedEdge++;
    return pooled;
}
/**
 * Hand out the next message object from the message pool, growing the pool
 * by one message created through BspUtils when every pooled message is
 * already in use.
 *
 * @return a pooled message object
 */
private M allocateMessage() {
    final M pooled;
    if (usedMessage >= msgPool.size()) {
        // pool exhausted: create one more message and keep it for later reuse
        pooled = BspUtils.<M> createMessageValue(getContext().getConfiguration());
        msgPool.add(pooled);
    } else {
        pooled = msgPool.get(usedMessage);
    }
    usedMessage++;
    return pooled;
}
/**
 * Append the given edge object to the outgoing edge list. The edge receives
 * the current job configuration first, and the vertex is flagged as updated.
 *
 * @param edge
 *            the edge to be added
 * @return true if the edge list changed as a result of this call
 */
public boolean addEdge(Edge<I, E> edge) {
    edge.setConf(getContext().getConfiguration());
    this.updated = true;
    final boolean listChanged = this.destEdgeList.add(edge);
    return listChanged;
}
/**
 * Remove an outgoing edge from the edge list. The vertex is flagged as
 * updated whether or not the edge was present.
 *
 * @param edge
 *            the edge to be removed
 * @return true if the edge was in the edge list of the vertex
 */
public boolean removeEdge(Edge<I, E> edge) {
    this.updated = true;
    final boolean wasPresent = this.destEdgeList.remove(edge);
    return wasPresent;
}
/**
 * Add a new vertex into the graph through the delegate. If the new vertex is
 * not halted, this vertex records that it created a live vertex.
 *
 * @param vertexId
 *            the id of the new vertex
 * @param vertex
 *            the new vertex
 */
public final void addVertex(I vertexId, Vertex vertex) {
    final boolean newVertexIsLive = !vertex.isHalted();
    if (newVertexIsLive) {
        createdNewLiveVertex = true;
    }
    delegate.addVertex(vertexId, vertex);
}
/**
 * Delete the vertex with the given id from the graph through the delegate.
 *
 * @param vertexId
 *            the id of the vertex to delete
 */
public final void deleteVertex(I vertexId) {
    this.delegate.deleteVertex(vertexId);
}
/**
 * Hand out the next vertex value object from the value pool, growing the
 * pool by one value created through BspUtils when every pooled value is
 * already in use.
 *
 * @return a pooled vertex value instance
 */
private V allocateValue() {
    final V pooled;
    if (usedValue >= valuePool.size()) {
        // pool exhausted: create one more value and keep it for later reuse
        pooled = BspUtils.<V> createVertexValue(getContext().getConfiguration());
        valuePool.add(pooled);
    } else {
        pooled = valuePool.get(usedValue);
    }
    usedValue++;
    return pooled;
}
/**
 * Look up the current global superstep number in the vertex context.
 *
 * @return the current superstep number
 */
public final long getSuperstep() {
    final long currentSuperstep = this.context.getSuperstep();
    return currentSuperstep;
}
/**
 * Look up the number of vertices of the graph in the vertex context.
 *
 * @return the number of vertices in the graph
 */
public final long getNumVertices() {
    final long vertexCount = this.context.getNumVertices();
    return vertexCount;
}
/**
 * Get the number of edges in the graph, as reported by the vertex context.
 * (Previously this returned the vertex count by mistake.)
 *
 * @return the number of edges in the graph
 */
public final long getNumEdges() {
    return context.getNumEdges();
}
/**
 * Pregelix internal use only.
 *
 * @return the task context held by the vertex context when one is set,
 *         otherwise the static {@code taskContext}
 */
public final TaskAttemptContext getContext() {
    if (context == null) {
        return taskContext;
    }
    return context.getContext();
}
/**
 * Hash a vertex by its id. A vertex whose id has not been set yet hashes to
 * zero instead of throwing a NullPointerException.
 *
 * @return the hash code of the vertex id, or 0 when the id is null
 */
@Override
public int hashCode() {
    return vertexId == null ? 0 : vertexId.hashCode();
}
/**
 * Two vertices are equal when their vertex ids are equal. Null and objects
 * that are not vertices compare unequal instead of raising an exception, as
 * required by the {@link Object#equals(Object)} contract.
 *
 * @param object
 *            the object to compare with
 * @return true if {@code object} is a vertex with an equal id
 */
@Override
public boolean equals(Object object) {
    if (this == object) {
        return true;
    }
    if (!(object instanceof Vertex)) {
        return false;
    }
    Vertex other = (Vertex) object;
    Object otherId = other.getVertexId();
    if (vertexId == null) {
        return otherId == null;
    }
    return vertexId.equals(otherId);
}
/**
* Hook called *once* per partition at the start of each iteration,
* before calls to open() or compute().
* Users can override this method to configure the pregelix job
* and vertex state. The default implementation does nothing.
*
* @param conf
* the job configuration
*/
public void configure(Configuration conf) {
}
/**
* Hook called *once* per partition at the end of each iteration.
* Users can override this method to configure the pregelix job
* and vertex state. The default implementation does nothing.
* NOTE(review): the earlier comment said this runs "before calls to compute() or close()",
* which looks copy-pasted from configure(); presumably it runs after them -- confirm against the caller.
*
* @param conf
* the job configuration
*/
public void endSuperstep(Configuration conf) {
}
/**
* Hook called immediately before the invocations of compute() on a vertex.
* Users can override this method to initialize the state for a vertex
* before the compute() invocations. The default implementation does nothing.
*/
public void open() {
}
/**
* Hook called immediately after all the invocations of compute() on a vertex.
* The default implementation does nothing.
* NOTE(review): the earlier comment described this as initiating state "before the compute()
* invocations", which looks copy-pasted from open(); presumably overrides finalize or
* release per-vertex state here -- confirm.
*/
public void close() {
}
/**
 * Terminate the partition in which the current vertex lives: votes to halt
 * and raises the partition-terminated flag reported by
 * {@link #isPartitionTerminated()}.
 * According to the original documentation this takes effect immediately and
 * the remaining vertices of the same partition are not processed.
 */
protected final void terminatePartition() {
    this.voteToHalt();
    this.terminatePartition = true;
}
/**
 * Request termination of the Pregelix job by writing the force-termination
 * state for the current job id through JobStateUtils.
 * According to the original documentation this takes effect only once the
 * current iteration has completed.
 *
 * @throws Exception
 *             if the termination state cannot be written
 */
protected void terminateJob() throws Exception {
    final Configuration jobConf = getContext().getConfiguration();
    JobStateUtils.writeForceTerminationState(jobConf, BspUtils.getJobId(jobConf));
}
/**
 * Report the partition-terminated flag.
 *
 * @return true if the partition is terminated; false otherwise
 */
public boolean isPartitionTerminated() {
    final boolean terminated = this.terminatePartition;
    return terminated;
}
/**
 * Install the vertex context used by this vertex.
 *
 * @param ctx
 *            the vertex context
 */
public void setVertexContext(VertexContext ctx) {
    context = ctx;
}
/**
 * Accessor for the vertex context.
 *
 * @return the vertex context, or null if none has been set
 */
public VertexContext getVertexContext() {
    return context;
}
}