blob: 5111a87fb6a184ede166163dd63eaeeadb345594 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.lib;
import com.google.common.collect.Maps;
import net.iharder.Base64;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Map;
/**
* Simple way to represent the structure of the graph with a JSON object.
* The actual vertex ids, values, edges are stored by the
* Writable serialized bytes that are Byte64 encoded.
* Works with {@link JsonBase64VertexOutputFormat}
*
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
public class JsonBase64VertexInputFormat<
I extends WritableComparable, V extends Writable, E extends Writable,
M extends Writable>
extends TextVertexInputFormat<I, V, E, M> implements
JsonBase64VertexFormat {
/**
* Simple reader that supports {@link JsonBase64VertexInputFormat}
*
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
*/
private static class JsonBase64VertexReader<
I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> extends TextVertexReader<I, V, E, M> {
/**
* Only constructor. Requires the LineRecordReader
*
* @param lineRecordReader Line record reader to read from
*/
public JsonBase64VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
super(lineRecordReader);
}
@Override
public boolean nextVertex() throws IOException, InterruptedException {
return getRecordReader().nextKeyValue();
}
@Override
public BasicVertex<I, V, E, M> getCurrentVertex()
throws IOException, InterruptedException {
Configuration conf = getContext().getConfiguration();
BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
Text line = getRecordReader().getCurrentValue();
JSONObject vertexObject;
try {
vertexObject = new JSONObject(line.toString());
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get the vertex", e);
}
DataInput input = null;
byte[] decodedWritable = null;
I vertexId = null;
try {
decodedWritable = Base64.decode(
vertexObject.getString(VERTEX_ID_KEY));
input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
vertexId = BspUtils.<I>createVertexIndex(conf);
vertexId.readFields(input);
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get vertex id", e);
}
V vertexValue = null;
try {
decodedWritable = Base64.decode(
vertexObject.getString(VERTEX_VALUE_KEY));
input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
vertexValue = BspUtils.<V>createVertexValue(conf);
vertexValue.readFields(input);
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get vertex value", e);
}
JSONArray edgeArray = null;
try {
edgeArray = vertexObject.getJSONArray(EDGE_ARRAY_KEY);
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get edge array", e);
}
Map<I, E> edgeMap = Maps.newHashMap();
for (int i = 0; i < edgeArray.length(); ++i) {
try {
decodedWritable =
Base64.decode(edgeArray.getString(i));
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get edge value", e);
}
input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
Edge<I, E> edge = new Edge<I, E>();
edge.setConf(getContext().getConfiguration());
edge.readFields(input);
edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
}
vertex.initialize(vertexId, vertexValue, edgeMap, null);
return vertex;
}
}
@Override
public VertexReader<I, V, E, M> createVertexReader(
InputSplit split,
TaskAttemptContext context) throws IOException {
return new JsonBase64VertexReader<I, V, E, M>(textInputFormat.createRecordReader(split,
context));
}
}