blob: b21a4e225d3d3f32b1976bbc925869445137b10c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.rexster.io;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_E_ESTIMATE;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.List;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.rexster.utils.RexsterUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
/**
* Abstract class that users should subclass to use their own Rexster based
* edge input format. This class was inspired by the Rexster Input format
* available in Faunus authored by Stephen Mallette.
* @param <I> Vertex id
* @param <E> Edge data
*/
@SuppressWarnings("rawtypes")
public abstract class RexsterEdgeInputFormat<I extends WritableComparable,
  E extends Writable> extends EdgeInputFormat<I, E> {
  /** Class logger. */
  private static final Logger LOG =
      Logger.getLogger(RexsterEdgeInputFormat.class);

  /**
   * Checks that the configuration is usable for reading edges from Rexster:
   * a vertex input format must also be configured and the Rexster hostname
   * must be set.
   *
   * @param conf configuration parameters
   * @throws RuntimeException if no vertex input format is configured or the
   *         hostname option resolves to {@code null}
   */
  public void checkInputSpecs(Configuration conf) {
    GiraphConfiguration gconf = new GiraphConfiguration(conf);
    /* check for Vertex InputFormat since both are required by Rexster */
    if (!gconf.hasVertexInputFormat()) {
      String msg = "Rexster InputFormat usage requires both Edge and Vertex " +
          "InputFormat's.";
      LOG.error(msg);
      throw new RuntimeException(msg);
    }

    String endpoint = GIRAPH_REXSTER_HOSTNAME.get(conf);
    if (endpoint == null) {
      throw new RuntimeException(GIRAPH_REXSTER_HOSTNAME.getKey() +
          " is a mandatory parameter.");
    }
  }

  /**
   * Delegates split computation to {@link RexsterUtils}, passing the
   * configured edge estimate. The {@code minSplitCountHint} is not used.
   */
  @Override
  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
    throws IOException, InterruptedException {
    return RexsterUtils.getSplits(context,
        GIRAPH_REXSTER_E_ESTIMATE.get(getConf()));
  }

  @Override
  public abstract RexsterEdgeReader createEdgeReader(InputSplit split,
      TaskAttemptContext context) throws IOException;

  /**
   * Abstract class to be implemented by the user based on their specific
   * edge input. Subclasses supply {@link #parseEdge(JSONObject)} and
   * {@link #getCurrentSourceId()}; this class handles opening the stream and
   * walking the JSON array of edges.
   */
  protected abstract class RexsterEdgeReader extends EdgeReader<I, E> {
    /** Input stream from the HTTP connection to the REST endpoint */
    private BufferedReader rexsterBufferedStream;
    /**
     * JSON parser/tokenizer object; {@code null} once no more objects are
     * available
     */
    private JSONTokener tokener;
    /** start index of the Rexster paging */
    private long splitStart;
    /** end index of the Rexster paging */
    private long splitEnd;
    /** number of iterated items */
    private long itemsIterated = 0;
    /** current edge obtained from Rexster */
    private Edge<I, E> edge;
    /** first call to the nextEdge function */
    private boolean isFirstEdge;

    /**
     * Opens the Rexster edge stream for the range described by the split and
     * positions the tokener on the JSON envelope returned by
     * {@link RexsterUtils#parseJSONEnvelope}.
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
      RexsterInputSplit rexsterInputSplit = (RexsterInputSplit) inputSplit;
      splitEnd = rexsterInputSplit.getEnd();
      splitStart = rexsterInputSplit.getStart();
      rexsterBufferedStream =
          RexsterUtils.Edge.openInputStream(getConf(), splitStart, splitEnd);
      tokener = RexsterUtils.parseJSONEnvelope(rexsterBufferedStream);
      itemsIterated = 0;
      isFirstEdge = true;
    }

    /**
     * Closes the underlying stream. Safe to call even if
     * {@link #initialize} never managed to open it.
     */
    @Override
    public void close() throws IOException {
      if (rexsterBufferedStream != null) {
        rexsterBufferedStream.close();
      }
    }

    /**
     * Reports progress as the number of edges read so far relative to the
     * configured edge estimate, capped at 1.
     *
     * @return a value in [0, 1]; 0 for an empty split or when the estimate
     *         is not positive
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      final float estimated = GIRAPH_REXSTER_E_ESTIMATE.get(getConf());
      /* a non-positive estimate would otherwise yield NaN or Infinity */
      if (this.splitStart == this.splitEnd || estimated <= 0.0f) {
        return 0.0f;
      }
      /* assuming you got the estimate right this progress should be
         pretty close; */
      return Math.min(1.0f, this.itemsIterated / estimated);
    }

    @Override
    public Edge<I, E> getCurrentEdge()
      throws IOException, InterruptedException {
      return edge;
    }

    /**
     * Advances to the next edge in the JSON array.
     *
     * @return {@code true} if an edge was read and is available through
     *         {@link #getCurrentEdge()}, {@code false} when the array is
     *         exhausted
     * @throws InterruptedException if the JSON stream is malformed
     */
    @Override
    public boolean nextEdge() throws IOException, InterruptedException {
      /* if the tokener was not set (or the array has already been fully
         consumed), no objects are available */
      if (this.tokener == null) {
        return false;
      }

      try {
        if (isFirstEdge) {
          isFirstEdge = false;
          /* peek at the first character to detect an empty array */
          if (this.tokener.nextClean() == RexsterUtils.END_ARRAY) {
            this.tokener = null;
            return false;
          }
          this.tokener.back();
        }

        edge = parseEdge(new JSONObject(this.tokener));
        /* per-edge logging is too verbose for INFO on large inputs */
        if (LOG.isDebugEnabled()) {
          LOG.debug(edge);
        }

        char c = this.tokener.nextClean();
        if (c == RexsterUtils.ARRAY_SEPARATOR) {
          itemsIterated += 1;
          return true;
        } else if (c == RexsterUtils.END_ARRAY) {
          /* the edge just parsed is the last one: hand it to the caller and
             drop the tokener so that the following call reports the end */
          this.tokener = null;
          itemsIterated += 1;
          return true;
        } else {
          String msg = String.format(
              "Expected a '%c' at the end of the array",
              RexsterUtils.END_ARRAY);
          LOG.error(msg);
          throw new InterruptedException(msg);
        }
      } catch (JSONException e) {
        /* keep the exception type callers already handle, but preserve the
           original cause for diagnosis */
        InterruptedException wrapped = new InterruptedException(e.toString());
        wrapped.initCause(e);
        throw wrapped;
      }
    }

    /**
     * Parser for a single edge JSON object
     *
     * @param jsonEdge edge represented as JSON object
     * @return The edge object associated with the JSON object
     * @throws JSONException if the JSON object cannot be interpreted
     */
    protected abstract Edge<I, E> parseEdge(JSONObject jsonEdge)
      throws JSONException;

    @Override
    public abstract I getCurrentSourceId()
      throws IOException, InterruptedException;
  }
}