// blob: bee22f576ff47312ff82af93583bb95d85cb10af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.rexster.io;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_V_ESTIMATE;
import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.List;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.rexster.utils.RexsterUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
/**
 * Abstract class that users should subclass to use their own Rexster based
 * vertex input format. This class was inspired by the Rexster Input format
 * available in Faunus authored by Stephen Mallette.
 *
 * @param <I> Vertex id type (first type argument of the produced vertices)
 * @param <V> Vertex value type (second type argument of the produced vertices)
 * @param <E> Edge value type (third type argument of the produced vertices)
 */
@SuppressWarnings("rawtypes")
public abstract class RexsterVertexInputFormat<I extends WritableComparable,
  V extends Writable, E extends Writable>
  extends VertexInputFormat<I, V, E> {
  /** Class logger. */
  private static final Logger LOG =
      Logger.getLogger(RexsterVertexInputFormat.class);

  /**
   * Check that the job configuration is usable with this input format: an
   * edge input format must be configured as well, and the Rexster hostname
   * option must resolve to a non-null value.
   *
   * @param conf configuration parameters
   * @throws RuntimeException if no edge input format is configured or the
   *         Rexster hostname is missing
   */
  public void checkInputSpecs(Configuration conf) {
    GiraphConfiguration gconf = new GiraphConfiguration(conf);
    if (!gconf.hasEdgeInputFormat()) {
      String err = "Rexster Input I/O requires " +
          "both Vertex- and EdgeInputFormat.";
      LOG.error(err);
      throw new RuntimeException(err);
    }

    String endpoint = GIRAPH_REXSTER_HOSTNAME.get(conf);
    if (endpoint == null) {
      throw new RuntimeException(GIRAPH_REXSTER_HOSTNAME.getKey() +
          " is a mandatory parameter.");
    }
  }

  /**
   * Create a vertex reader for a given split. Guaranteed to have been
   * configured with setConf() prior to use. The framework will also call
   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   *
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   */
  public abstract RexsterVertexReader createVertexReader(InputSplit split,
      TaskAttemptContext context) throws IOException;

  /**
   * Compute the input splits by delegating to
   * {@link RexsterUtils#getSplits} with the configured vertex estimate.
   * The {@code minSplitCountHint} argument is not used by this
   * implementation.
   *
   * @param context the job context
   * @param minSplitCountHint minimum number of splits requested (ignored)
   * @return the list of splits to be read
   * @throws IOException
   * @throws InterruptedException
   */
  @Override
  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
    throws IOException, InterruptedException {
    return RexsterUtils.getSplits(context,
        GIRAPH_REXSTER_V_ESTIMATE.get(getConf()));
  }

  /**
   * Abstract class to be implemented by the user based on their specific
   * vertex input. Easiest to ignore the key value separator and only use
   * key instead.
   */
  protected abstract class RexsterVertexReader extends VertexReader<I, V, E> {
    /** Input stream from the HTTP connection to the REST endpoint */
    private BufferedReader rexsterBufferedStream;
    /** JSON parser/tokenizer object */
    private JSONTokener tokener;
    /** start index of the Rexster paging */
    private long splitStart;
    /** end index of the Rexster paging */
    private long splitEnd;
    /** number of vertices returned so far by this reader */
    private long itemsIterated = 0;
    /** current vertex */
    private Vertex<I, V, E> vertex;
    /** first call to the nextVertex function */
    private boolean isFirstVertex;
    /** set once the closing bracket of the JSON array has been consumed */
    private boolean endOfArray;

    /**
     * Open the stream for the given split and position the tokenizer via
     * {@link RexsterUtils#parseJSONEnvelope}. All iteration state is reset
     * so that the reader starts from the first element of the split.
     *
     * @param inputSplit the split to read; must be a RexsterInputSplit
     * @param context the information about the task
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
      RexsterInputSplit rexsterInputSplit = (RexsterInputSplit) inputSplit;
      splitEnd = rexsterInputSplit.getEnd();
      splitStart = rexsterInputSplit.getStart();
      rexsterBufferedStream = RexsterUtils.Vertex.openInputStream(getConf(),
          splitStart, splitEnd);
      tokener = RexsterUtils.parseJSONEnvelope(rexsterBufferedStream);
      isFirstVertex = true;
      endOfArray = false;
      itemsIterated = 0;
    }

    /**
     * Parse the next vertex of the JSON array, if any.
     *
     * Every element of the array is reported, including the last one: when
     * the element just parsed is followed by the end-of-array token, the
     * vertex is still returned and only the following call reports
     * exhaustion.
     *
     * @return true if a vertex was parsed and is available through
     *         {@link #getCurrentVertex()}, false if the input is exhausted
     * @throws IOException
     * @throws InterruptedException if the JSON input is malformed
     */
    @Override
    public boolean nextVertex() throws IOException, InterruptedException {
      /* if the tokener was not set, no object is in fact available; once
         the end of the array was seen nothing more must be parsed */
      if (this.tokener == null || endOfArray) {
        return false;
      }

      try {
        char c;
        if (isFirstVertex) {
          isFirstVertex = false;
          /* peek at the first token to detect an empty array */
          c = this.tokener.nextClean();
          if (c == RexsterUtils.END_ARRAY) {
            endOfArray = true;
            return false;
          }
          tokener.back();
        }

        JSONObject obj = new JSONObject(this.tokener);
        this.vertex = parseVertex(obj);

        /* consume the token following the element: either a separator
           (more elements follow) or the end of the array */
        c = this.tokener.nextClean();
        if (c == RexsterUtils.END_ARRAY) {
          /* the vertex just parsed is the last one; it is still returned */
          endOfArray = true;
        } else if (c != RexsterUtils.ARRAY_SEPARATOR) {
          String err = String.format("Expected a '%c' at the end of the array",
              RexsterUtils.END_ARRAY);
          LOG.error(err);
          throw new InterruptedException(err);
        }

        itemsIterated += 1;
        return true;
      } catch (JSONException e) {
        /* keep the exception type callers expect, but preserve the cause */
        InterruptedException interrupted =
            new InterruptedException(e.toString());
        interrupted.initCause(e);
        throw interrupted;
      }
    }

    /**
     * Close the underlying stream, if one was opened.
     *
     * @throws IOException
     */
    @Override
    public void close() throws IOException {
      if (this.rexsterBufferedStream != null) {
        this.rexsterBufferedStream.close();
      }
    }

    /**
     * Report the progress as the number of vertices returned so far divided
     * by the configured vertex estimate, capped at 1.
     *
     * @return a value between 0 and 1; 0 when the split is empty or the
     *         estimate is not positive
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      float vestimated = GIRAPH_REXSTER_V_ESTIMATE.get(getConf());
      /* a non-positive estimate would yield NaN or Infinity below */
      if (this.splitStart == this.splitEnd || vestimated <= 0.0f) {
        return 0.0f;
      }
      // assuming you got the estimate right this progress should be
      // pretty close;
      return Math.min(1.0f, this.itemsIterated / vestimated);
    }

    /**
     * @return the vertex parsed by the last successful call to
     *         {@link #nextVertex()}
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Vertex<I, V, E> getCurrentVertex()
      throws IOException, InterruptedException {
      return this.vertex;
    }

    /**
     * Parser for a single vertex JSON object
     *
     * @param jsonVertex vertex represented as JSON object
     * @return The vertex object represented by the JSON object
     * @throws JSONException if the JSON object cannot be converted
     */
    protected abstract Vertex<I, V, E> parseVertex(JSONObject jsonVertex)
      throws JSONException;
  }
}