blob: ca4e861f7da64c1675cea8008a14bae8a424f40c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.rexster.io;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_E_TXSIZE;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_VLABEL;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_BACKOFF_DELAY;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_BACKOFF_RETRY;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.rexster.utils.RexsterUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.json.JSONException;
import org.json.JSONObject;
/**
 * Edge output format that writes edges to a Rexster server through its REST
 * batch interface. Edges are grouped into transactions whose maximum size is
 * read from {@code GIRAPH_REXSTER_OUTPUT_E_TXSIZE}; every transaction is sent
 * on a freshly opened {@link HttpURLConnection}. Users may subclass this
 * format (and its {@link RexsterEdgeWriter}) and override
 * {@code RexsterEdgeWriter.getEdge} to customise the JSON sent for each edge.
 *
 * @param <I> vertex id type
 * @param <V> vertex value type
 * @param <E> edge value type
 */
@SuppressWarnings("rawtypes")
public class RexsterEdgeOutputFormat<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends EdgeOutputFormat<I, V, E> {
  /** Class logger. */
  private static final Logger LOG =
      Logger.getLogger(RexsterEdgeOutputFormat.class);

  @Override
  public RexsterEdgeWriter createEdgeWriter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    return new RexsterEdgeWriter();
  }

  /**
   * Verifies that a vertex output format is configured alongside this edge
   * output format and that the Rexster hostname has been provided.
   *
   * @param context job context holding the configuration
   * @throws InterruptedException if either requirement is not satisfied
   */
  @Override
  public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
    GiraphConfiguration gconf =
        new GiraphConfiguration(context.getConfiguration());
    if (!gconf.hasVertexOutputFormat()) {
      String msg = "Rexster OutputFormat usage requires both Edge and " +
          "Vertex OutputFormat's.";
      LOG.error(msg);
      throw new InterruptedException(msg);
    }
    String endpoint = GIRAPH_REXSTER_HOSTNAME.get(gconf);
    if (endpoint == null || endpoint.isEmpty()) {
      String msg = GIRAPH_REXSTER_HOSTNAME.getKey() +
          " is a mandatory parameter.";
      LOG.error(msg);
      throw new InterruptedException(msg);
    }
  }

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    return new NullOutputCommitter();
  }

  /**
   * Output committer for Hadoop that performs no work: the edges are already
   * sent to Rexster by the writer, so there is nothing to commit.
   */
  private static class NullOutputCommitter extends OutputCommitter {
    @Override
    public void abortTask(TaskAttemptContext taskContext) { }

    @Override
    public void cleanupJob(JobContext jobContext) { }

    @Override
    public void commitTask(TaskAttemptContext taskContext) { }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
      return false;
    }

    @Override
    public void setupJob(JobContext jobContext) { }

    @Override
    public void setupTask(TaskAttemptContext taskContext) { }
  }

  /**
   * Edge writer that serialises each edge to JSON and streams it to the
   * Rexster batch endpoint. The request body has the form
   * {@code { "vlabel" : ..., "delay" : ..., "retry" : ..., "tx" : [ ... ] }}.
   * Subclasses can override {@code getEdge} to change the JSON emitted for
   * each edge.
   */
  protected class RexsterEdgeWriter extends EdgeWriter<I, V, E> {
    /** Key of the JSON array that holds the edges of a transaction. */
    private static final String JSON_ARRAY_KEY = "tx";
    /** Connection to the HTTP REST endpoint. */
    private HttpURLConnection rexsterConn;
    /** Output stream of the current connection; null when none is open. */
    private BufferedWriter rexsterBufferedStream;
    /** Whether the next edge is the first element of the current array. */
    private boolean isFirstElement = true;
    /**
     * Maximum number of edges per transaction before a new connection is
     * started; a non-positive value sends all edges in a single transaction.
     */
    private int txsize;
    /** Number of edges written on the current connection. */
    private int txcounter = 0;
    /** Label of the vertex id field. */
    private String vlabel;
    /** Back-off time delay in milliseconds. */
    private int backoffDelay = 0;
    /** Back-off number of attempts. */
    private int backoffRetry = 0;

    @Override
    public void initialize(TaskAttemptContext context) throws IOException,
      InterruptedException {
      txsize = GIRAPH_REXSTER_OUTPUT_E_TXSIZE.get(getConf());
      vlabel = GIRAPH_REXSTER_VLABEL.get(getConf());
      backoffDelay = GIRAPH_REXSTER_BACKOFF_DELAY.get(getConf());
      backoffRetry = GIRAPH_REXSTER_BACKOFF_RETRY.get(getConf());
      startConnection();
    }

    @Override
    public void close(TaskAttemptContext context)
      throws IOException, InterruptedException {
      stopConnection();
    }

    /**
     * Appends one edge to the current transaction, first flushing the
     * transaction and opening a new connection when it is full.
     *
     * @param srcId source vertex ID of the edge
     * @param srcValue source vertex value of the edge
     * @param edge edge to write
     * @throws InterruptedException if the edge cannot be converted to JSON
     *         (the {@link JSONException} is attached as the cause)
     */
    @Override
    public void writeEdge(I srcId, V srcValue, Edge<I, E> edge)
      throws IOException, InterruptedException {
      /* the current transaction is full: send it and start a new one */
      if (txsize > 0 && txcounter >= txsize) {
        stopConnection();
        startConnection();
      }
      JSONObject jsonEdge;
      try {
        jsonEdge = getEdge(srcId, srcValue, edge);
      } catch (JSONException e) {
        InterruptedException ie = new InterruptedException(
            "Error writing the edge: " + e.getMessage());
        /* InterruptedException has no cause constructor, so attach it here */
        ie.initCause(e);
        throw ie;
      }
      /* every element after the first must be comma separated */
      String separator = isFirstElement ? "" : ",";
      isFirstElement = false;
      rexsterBufferedStream.write(separator + jsonEdge);
      txcounter++;
    }

    /**
     * Start a new connection with the Rexster REST endpoint, reset the
     * per-transaction state and write the opening part of the JSON container.
     */
    private void startConnection() throws IOException, InterruptedException {
      txcounter = 0;
      isFirstElement = true;
      rexsterConn = RexsterUtils.Edge.openOutputConnection(getConf());
      rexsterBufferedStream = new BufferedWriter(
          new OutputStreamWriter(rexsterConn.getOutputStream(),
              Charset.forName("UTF-8")));
      /* open the JSON container: an object containing an array of elements.
         vlabel is JSON-quoted so that special characters in the configured
         label cannot produce malformed JSON. */
      rexsterBufferedStream.write("{ ");
      rexsterBufferedStream.write("\"vlabel\" : " +
          JSONObject.quote(vlabel) + ",");
      rexsterBufferedStream.write("\"delay\" : \"" + backoffDelay + "\",");
      rexsterBufferedStream.write("\"retry\" : \"" + backoffRetry + "\",");
      rexsterBufferedStream.write("\"" + JSON_ARRAY_KEY + "\"");
      rexsterBufferedStream.write(" : [ ");
    }

    /**
     * Stop the current connection with the Rexster REST endpoint: close the
     * JSON container, close the stream and check the server response. Does
     * nothing when no stream is open, so it is safe to call after a failed
     * {@code startConnection} or more than once. By default the JDK manages
     * keep-alive, so no explicit disconnect is performed.
     *
     * @throws IOException if writing fails or the response signals an error
     */
    private void stopConnection() throws IOException, InterruptedException {
      if (rexsterBufferedStream == null) {
        return;
      }
      try {
        /* close the JSON container */
        rexsterBufferedStream.write(" ] }");
        rexsterBufferedStream.flush();
      } finally {
        /* always release the stream, even when the final write fails */
        rexsterBufferedStream.close();
        rexsterBufferedStream = null;
      }
      /* check the response and in case of error signal the unsuccessful state
         via exception */
      RexsterUtils.Edge.handleResponse(rexsterConn);
    }

    /**
     * Each edge needs to be transformed into a JSON object to be sent to the
     * batch interface of Rexster. The default implementation emits the
     * string forms of the source id ({@code _outV}), the target id
     * ({@code _inV}) and the edge value ({@code value}).
     *
     * @param srcId source vertex ID of the edge
     * @param srcValue source vertex value of the edge
     * @param edge edge to be transformed in JSON
     * @return JSON representation of the edge
     * @throws JSONException if the JSON object cannot be built
     */
    protected JSONObject getEdge(I srcId, V srcValue, Edge<I, E> edge)
      throws JSONException {
      String outId = srcId.toString();
      String inId = edge.getTargetVertexId().toString();
      String value = edge.getValue().toString();
      JSONObject jsonEdge = new JSONObject();
      jsonEdge.accumulate("_outV", outId);
      jsonEdge.accumulate("_inV", inId);
      jsonEdge.accumulate("value", value);
      return jsonEdge;
    }
  }
}