blob: c095d9052238d20df3f67c8d477dab794cfe547d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.jython;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.GraphType;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Iterator;
/**
* Base class for writing computations in Jython.
*
* Note that this class DOES NOT implement
* {@link org.apache.giraph.graph.Computation}.
* This is because we want to support passing in pure Jython types,
* and implementing the {@link org.apache.giraph.graph.Computation}
* requires passing in {@link Writable}s.
* Calling such methods from Jython would throw errors. So, instead,
* we have recreated the methods with the same name here. In each method
* we check if the type is a pure Jython value, and if so wrap it in
* the necessary
* {@link org.apache.giraph.jython.wrappers.JythonWritableWrapper}.
*
* This class works together with {@link JythonGiraphComputation} which takes
* care of the {@link org.apache.giraph.graph.Computation}
* Giraph infrastructure side of things.
*/
public abstract class JythonComputation extends
DefaultImmutableClassesGiraphConfigurable {
/** The computation to callback to */
private JythonGiraphComputation giraphCompute;
public void setGiraphCompute(JythonGiraphComputation giraphCompute) {
this.giraphCompute = giraphCompute;
}
/**
* User's computation function
*
* @param vertex the Vertex to compute on
* @param messages iterable of messages
*/
public abstract void compute(Object vertex, Iterable messages);
/**
* Prepare for computation. This method is executed exactly once prior to
* {@link #compute(Object, Iterable)} being called for any of the vertices
* in the partition.
*/
public void preSuperstep() { }
/**
* Finish computation. This method is executed exactly once after computation
* for all vertices in the partition is complete.
*/
public void postSuperstep() { }
/**
* Retrieves the current superstep.
*
* @return Current superstep
*/
public long getSuperstep() {
return giraphCompute.getSuperstep();
}
/**
* Get the total (all workers) number of vertices that
* existed in the previous superstep.
*
* @return Total number of vertices (-1 if first superstep)
*/
public long getTotalNumVertices() {
return giraphCompute.getTotalNumVertices();
}
/**
* Get the total (all workers) number of edges that
* existed in the previous superstep.
*
* @return Total number of edges (-1 if first superstep)
*/
public long getTotalNumEdges() {
return giraphCompute.getTotalNumEdges();
}
/**
* Send a message to a vertex id.
*
* @param id Vertex id to send the message to
* @param message Message data to send
*/
public void sendMessage(Object id, Object message) {
WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
GraphType.OUTGOING_MESSAGE_VALUE);
giraphCompute.sendMessage(wrappedId, wrappedMessage);
}
/**
* Send a message to all edges.
*
* @param vertex Vertex whose edges to send the message to.
* @param message Message sent to all edges.
*/
public void sendMessageToAllEdges(Vertex vertex, Object message) {
Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
GraphType.OUTGOING_MESSAGE_VALUE);
giraphCompute.sendMessageToAllEdges(vertex, wrappedMessage);
}
/**
* Send a message to multiple target vertex ids in the iterator.
*
* @param vertexIdIterator An iterator to multiple target vertex ids.
* @param message Message sent to all targets in the iterator.
*/
public void sendMessageToMultipleEdges(Iterator vertexIdIterator,
Object message) {
Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
GraphType.OUTGOING_MESSAGE_VALUE);
giraphCompute.sendMessageToMultipleEdges(vertexIdIterator, wrappedMessage);
}
/**
* Sends a request to create a vertex that will be available during the
* next superstep.
*
* @param id Vertex id
* @param vertexValue Vertex value
* @param edges Initial edges
*/
public void addVertexRequest(Object id, Object vertexValue,
OutEdges edges) throws IOException {
WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
Writable wrappedValue = giraphCompute.wrapIfNecessary(vertexValue,
GraphType.VERTEX_VALUE);
giraphCompute.addVertexRequest(wrappedId, wrappedValue, edges);
}
/**
* Sends a request to create a vertex that will be available during the
* next superstep.
*
* @param id Vertex id
* @param vertexValue Vertex value
*/
public void addVertexRequest(Object id, Object vertexValue)
throws IOException {
WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
Writable wrappedVertexValue = giraphCompute.wrapIfNecessary(vertexValue,
GraphType.VERTEX_VALUE);
giraphCompute.addVertexRequest(wrappedId, wrappedVertexValue);
}
/**
* Request to remove a vertex from the graph
* (applied just prior to the next superstep).
*
* @param id Id of the vertex to be removed.
*/
public void removeVertexRequest(Object id) throws IOException {
WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
giraphCompute.removeVertexRequest(wrappedId);
}
/**
* Request to add an edge of a vertex in the graph
* (processed just prior to the next superstep)
*
* @param sourceVertexId Source vertex id of edge
* @param edge Edge to add
*/
public void addEdgeRequest(Object sourceVertexId, Edge edge)
throws IOException {
WritableComparable wrappedSourceId =
giraphCompute.wrapIdIfNecessary(sourceVertexId);
giraphCompute.addEdgeRequest(wrappedSourceId, edge);
}
/**
* Request to remove all edges from a given source vertex to a given target
* vertex (processed just prior to the next superstep).
*
* @param sourceVertexId Source vertex id
* @param targetVertexId Target vertex id
*/
public void removeEdgesRequest(Object sourceVertexId, Object targetVertexId)
throws IOException {
WritableComparable wrappedSourceVertexId =
giraphCompute.wrapIdIfNecessary(sourceVertexId);
WritableComparable wrappedTargetVertexId =
giraphCompute.wrapIdIfNecessary(targetVertexId);
giraphCompute.removeEdgesRequest(wrappedSourceVertexId,
wrappedTargetVertexId);
}
/**
* Get the mapper context
*
* @return Mapper context
*/
public Mapper.Context getContext() {
return giraphCompute.getContext();
}
/**
* Get the worker context
*
* @param <W> WorkerContext class
* @return WorkerContext context
*/
@SuppressWarnings("unchecked")
public <W extends WorkerContext> W getWorkerContext() {
return (W) giraphCompute.getWorkerContext();
}
}