/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.io.formats;

import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReusableEdge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;

import java.io.IOException;
import java.util.List;
import java.util.Random;

/**
 * VertexInputFormat for large-scale testing, similar to
 * {@link PseudoRandomVertexInputFormat}, but for unweighted graphs
 * whose vertex ids are integers.
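 *
 * <p>A minimal configuration sketch (assuming the standard
 * {@code GiraphConfiguration} setters; the values below are placeholders):
 * <pre>
 * GiraphConfiguration conf = new GiraphConfiguration();
 * conf.setVertexInputFormatClass(PseudoRandomIntNullVertexInputFormat.class);
 * conf.setInt(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 1000000);
 * conf.setInt(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 10);
 * </pre>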
*/
public class PseudoRandomIntNullVertexInputFormat extends
VertexInputFormat<IntWritable, FloatWritable, NullWritable> {
@Override
public final List<InputSplit> getSplits(final JobContext context,
final int minSplitCountHint) throws IOException, InterruptedException {
return PseudoRandomUtils.getSplits(minSplitCountHint);
}
@Override
public VertexReader<IntWritable, FloatWritable, NullWritable>
createVertexReader(InputSplit split,
TaskAttemptContext context) throws IOException {
return new PseudoRandomVertexReader();
}
/**
* Used by {@link PseudoRandomIntNullVertexInputFormat} to read
* pseudo-randomly generated data.
*/
private static class PseudoRandomVertexReader extends
VertexReader<IntWritable, FloatWritable, NullWritable> {
/** Starting vertex id. */
private int startingVertexId = -1;
/** Vertices read so far. */
private int verticesRead = 0;
/** Total vertices to read (on this split alone). */
private int totalSplitVertices = -1;
/** Edges per vertex. */
private int edgesPerVertex = -1;
    /** Reusable set of destination vertex ids. */
    private final IntSet destVertices = new IntOpenHashSet();
    /** Reusable edge object. */
private ReusableEdge<IntWritable, NullWritable> reusableEdge = null;
/** Helper for generating pseudo-random local edges. */
private PseudoRandomIntNullLocalEdgesHelper localEdgesHelper;
    /** Random number generator. */
private Random rand;
/** Default constructor for reflection. */
public PseudoRandomVertexReader() {
}
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext context) throws IOException {
int aggregateVertices = getConf().getInt(
PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
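      // Split aggregateVertices evenly across splits; the first
      // (aggregateVertices % numSplits) splits each take one extra vertex.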
int extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
if (bspInputSplit.getSplitIndex() < extraVertices) {
++totalSplitVertices;
}
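      // First vertex id of this split: the base share times the split index,
      // plus one for every earlier split that carries an extra vertex.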
startingVertexId = bspInputSplit.getSplitIndex() *
(aggregateVertices / bspInputSplit.getNumSplits()) +
Math.min(bspInputSplit.getSplitIndex(), extraVertices);
edgesPerVertex = getConf().getInt(
PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
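      // Seed the generator with the split index so the same pseudo-random
      // graph is produced deterministically on every run.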
rand = new Random(bspInputSplit.getSplitIndex());
if (getConf().reuseEdgeObjects()) {
reusableEdge = getConf().createReusableEdge();
}
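      // Helper that picks destination ids and, depending on configuration,
      // biases a fraction of edges towards vertices local to this worker.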
localEdgesHelper = new PseudoRandomIntNullLocalEdgesHelper(
aggregateVertices, getConf());
}
@Override
public boolean nextVertex() throws IOException, InterruptedException {
return totalSplitVertices > verticesRead;
}
@Override
public Vertex<IntWritable, FloatWritable, NullWritable, ?>
getCurrentVertex() throws IOException, InterruptedException {
Vertex<IntWritable, FloatWritable, NullWritable, ?> vertex =
getConf().createVertex();
int vertexId = startingVertexId + verticesRead;
OutEdges<IntWritable, NullWritable> edges =
getConf().createOutEdges();
edges.initialize(edgesPerVertex);
destVertices.clear();
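      // Draw edgesPerVertex distinct destinations: duplicates are rejected
      // by retrying until the id is new to the reusable IntSet.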
for (int i = 0; i < edgesPerVertex; ++i) {
int destVertexId;
do {
destVertexId = localEdgesHelper.generateDestVertex(vertexId, rand);
} while (!destVertices.add(destVertexId));
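        // Reuse a single Edge instance when the configuration allows it,
        // otherwise allocate a fresh edge for each neighbor.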
Edge<IntWritable, NullWritable> edge =
(reusableEdge == null) ? getConf().createEdge() : reusableEdge;
edge.getTargetVertexId().set(destVertexId);
edges.add(edge);
}
vertex.initialize(
new IntWritable(vertexId), new FloatWritable(1.0f), edges);
++verticesRead;
return vertex;
}
@Override
public void close() throws IOException {
}
@Override
public float getProgress() throws IOException {
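      // Reported as a percentage (0-100) of this split's vertices.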
return verticesRead * 100.0f / totalSplitVertices;
}
}
}