blob: 0254498a254fa85ed2562c5f1753b55c52135a46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.io.gora;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.avro.util.Utf8;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.gora.GoraEdgeOutputFormat;
import org.apache.giraph.io.gora.generated.GEdge;
import org.apache.giraph.io.gora.generated.GEdgeResult;
import org.apache.gora.persistency.Persistent;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* Implementation of a specific writer for a generated data bean.
*/
public class GoraTestEdgeOutputFormat
extends GoraEdgeOutputFormat<LongWritable, DoubleWritable,
FloatWritable> {
/**
* Default constructor
*/
public GoraTestEdgeOutputFormat() {
}
@Override
public GoraEdgeWriter createEdgeWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
return new GoraGEdgeEdgeWriter();
}
/**
* Gora edge writer.
*/
protected class GoraGEdgeEdgeWriter
extends GoraEdgeWriter {
@Override
protected Persistent getGoraEdge(LongWritable srcId,
DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
GEdgeResult tmpGEdge = new GEdgeResult();
Utf8 keyLabel = new Utf8(srcId.toString() + "-" +
edge.getTargetVertexId().toString());
tmpGEdge.setEdgeId(keyLabel);
tmpGEdge.setEdgeWeight(edge.getValue().get());
tmpGEdge.setVertexInId(new Utf8(srcId.toString()));
tmpGEdge.setVertexOutId(new Utf8(edge.getTargetVertexId().toString()));
tmpGEdge.setLabel(keyLabel);
getLogger().debug("GoraObject created: " + tmpGEdge.toString());
return tmpGEdge;
}
@Override
protected Object getGoraKey(LongWritable srcId,
DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
String goraKey = String.valueOf(
edge.getTargetVertexId().get() + edge.getValue().get());
return goraKey;
}
@Override
public void writeEdge(LongWritable srcId, DoubleWritable srcValue,
Edge<LongWritable, FloatWritable> edge)
throws IOException, InterruptedException {
super.writeEdge(srcId, srcValue, edge);
Object goraKey = getGoraKey(srcId, srcValue, edge);
String keyLabel = String.valueOf(srcId) + "-" +
String.valueOf(edge.getTargetVertexId());
float weight = Float.valueOf(srcId.toString()) +
Float.valueOf(edge.getTargetVertexId().toString());
// Asserting
Assert.assertEquals(createEdge(keyLabel, String.valueOf(srcId),
String.valueOf(edge.getTargetVertexId()),keyLabel, weight),
getDataStore().get(goraKey));
}
/**
* Creates an edge using an id and a set of edges.
* @param id Vertex id.
* @param vertexInId Vertex source Id.
* @param vertexOutId Vertex destination Id.
* @param edgeLabel Edge label.
* @param edgeWeight Edge wight.
* @return GEdge created.
*/
private GEdge createEdge(String id, String vertexInId,
String vertexOutId, String edgeLabel, float edgeWeight) {
GEdge newEdge = new GEdge();
newEdge.setEdgeId(new Utf8(id));
newEdge.setVertexInId(new Utf8(vertexInId));
newEdge.setVertexOutId(new Utf8(vertexOutId));
newEdge.setLabel(new Utf8(edgeLabel));
newEdge.setEdgeWeight(edgeWeight);
return newEdge;
}
}
}