/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.io.formats;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

/**
 * This {@link EdgeInputFormat} generates pseudo-random edges on the fly.
 * As with {@link PseudoRandomVertexInputFormat}, the user specifies the
 * number of vertices and the number of edges per vertex.
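 *
 * <p>A minimal configuration sketch (assuming the standard
 * {@code GiraphConfiguration} setters; the option names come from
 * {@link PseudoRandomInputFormatConstants}, the values are placeholders):</p>
 * <pre>
 * GiraphConfiguration conf = new GiraphConfiguration();
 * conf.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class);
 * conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 1000000L);
 * conf.setInt(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 10);
 * </pre>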
 */
public class PseudoRandomEdgeInputFormat
    extends EdgeInputFormat<LongWritable, DoubleWritable> {
  @Override
  public final List<InputSplit> getSplits(final JobContext context,
      final int minSplitCountHint)
      throws IOException, InterruptedException {
    return PseudoRandomUtils.getSplits(minSplitCountHint);
  }

  @Override
  public EdgeReader<LongWritable, DoubleWritable> createEdgeReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new PseudoRandomEdgeReader();
  }

  /**
   * {@link EdgeReader} that generates pseudo-random edges.
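   *
   * <p>Edges are produced one source vertex at a time: for each source the
   * reader emits {@code edgesPerVertex} distinct pseudo-random targets,
   * re-seeding the random generator with the source id so the generated
   * graph does not depend on the number of workers.</p>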
   */
  private static class PseudoRandomEdgeReader
      extends EdgeReader<LongWritable, DoubleWritable> {
    /** Logger. */
    private static final Logger LOG =
        Logger.getLogger(PseudoRandomEdgeReader.class);
    /** Starting vertex id. */
    private long startingVertexId = -1;
    /** Vertices read so far. */
    private long verticesRead = 0;
    /** Total vertices to read (on this split alone). */
    private long totalSplitVertices = -1;
    /** Current vertex id. */
    private LongWritable currentVertexId = new LongWritable(-1);
    /** Edges read for the current vertex. */
    private int currentOutEdgesRead = 0;
    /** Target vertices of edges for the current vertex. */
    private Set<LongWritable> currentVertexDestVertices = Sets.newHashSet();
    /** Random number generator for the current vertex (for consistency
     * across runs on different numbers of workers). */
    private Random random = new Random();
    /** Aggregate vertices (all input splits). */
    private long aggregateVertices = -1;
    /** Edges per vertex. */
    private int edgesPerVertex = -1;
    /** BspInputSplit (used only for index). */
    private BspInputSplit bspInputSplit;
    /** Helper for generating pseudo-random local edges. */
    private PseudoRandomLocalEdgesHelper localEdgesHelper;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
      aggregateVertices = getConf().getLong(
          PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
      if (aggregateVertices <= 0) {
        throw new IllegalArgumentException(
            PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
      }
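      // Partition the [0, aggregateVertices) id range across the splits:
      // every split gets aggregateVertices / numSplits vertices and the
      // first (aggregateVertices % numSplits) splits take one extra, e.g.
      // 10 vertices over 3 splits yields the ranges [0, 4), [4, 7), [7, 10).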
      if (inputSplit instanceof BspInputSplit) {
        bspInputSplit = (BspInputSplit) inputSplit;
        long extraVertices =
            aggregateVertices % bspInputSplit.getNumSplits();
        totalSplitVertices =
            aggregateVertices / bspInputSplit.getNumSplits();
        if (bspInputSplit.getSplitIndex() < extraVertices) {
          ++totalSplitVertices;
        }
        startingVertexId = (bspInputSplit.getSplitIndex() *
            (aggregateVertices / bspInputSplit.getNumSplits())) +
            Math.min(bspInputSplit.getSplitIndex(),
                extraVertices);
      } else {
        throw new IllegalArgumentException(
            "initialize: Got " + inputSplit.getClass() +
            " instead of " + BspInputSplit.class);
      }
      edgesPerVertex = getConf().getInt(
          PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
      if (edgesPerVertex <= 0) {
        throw new IllegalArgumentException(
            PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
      }
      float minLocalEdgesRatio = getConf().getFloat(
          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
          PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
      localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
          minLocalEdgesRatio, getConf());
    }

    @Override
    public boolean nextEdge() throws IOException, InterruptedException {
      // There is another edge as long as some vertex in this split still has
      // unread out-edges.
      return totalSplitVertices > verticesRead + 1 ||
          (totalSplitVertices == verticesRead + 1 &&
              edgesPerVertex > currentOutEdgesRead);
    }

    @Override
    public LongWritable getCurrentSourceId() throws IOException,
        InterruptedException {
      if (currentOutEdgesRead == edgesPerVertex) {
        ++verticesRead;
        currentVertexId = new LongWritable(-1);
      }

      if (currentVertexId.get() == -1) {
        currentVertexId.set(startingVertexId + verticesRead);
        currentOutEdgesRead = 0;
        // Seed on the vertex id so that the edges generated for this vertex
        // stay the same across runs with different numbers of workers, as
        // long as the other input parameters are unchanged.
        random.setSeed(currentVertexId.get());
        currentVertexDestVertices.clear();
      }
      return currentVertexId;
    }

    @Override
    public Edge<LongWritable, DoubleWritable> getCurrentEdge()
        throws IOException, InterruptedException {
      LongWritable destVertexId = new LongWritable();
      // Re-draw until we get a destination not yet used for this source
      // vertex, so that no parallel edges are generated.
      do {
        destVertexId.set(localEdgesHelper.generateDestVertex(
            currentVertexId.get(), random));
      } while (currentVertexDestVertices.contains(destVertexId));
      ++currentOutEdgesRead;
      currentVertexDestVertices.add(destVertexId);
      if (LOG.isTraceEnabled()) {
        LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
            destVertexId + ")");
      }
      return EdgeFactory.create(
          destVertexId,
          new DoubleWritable(random.nextDouble()));
    }

    @Override
    public void close() throws IOException { }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // Progress is reported as a percentage of the edges in this split.
      return (verticesRead * edgesPerVertex + currentOutEdgesRead) *
          100.0f / (totalSplitVertices * edgesPerVertex);
    }
  }
}