| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.giraph.benchmark; |
| |
| import com.google.common.collect.Maps; |
| import org.apache.giraph.bsp.BspInputSplit; |
| import org.apache.giraph.graph.BasicVertex; |
| import org.apache.giraph.graph.BspUtils; |
| import org.apache.giraph.graph.VertexInputFormat; |
| import org.apache.giraph.graph.VertexReader; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.DoubleWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.log4j.Logger; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| |
| /** |
| * This VertexInputFormat is meant for large scale testing. It allows the user |
| * to create an input data source that a variable number of aggregate vertices |
| * and edges per vertex that is repeatable for the exact same parameter |
| * (pseudo-random). |
| */ |
| public class PseudoRandomVertexInputFormat<M extends Writable> extends |
| VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> { |
| /** Set the number of aggregate vertices */ |
| public static final String AGGREGATE_VERTICES = |
| "pseudoRandomVertexReader.aggregateVertices"; |
| /** Set the number of edges per vertex (pseudo-random destination) */ |
| public static final String EDGES_PER_VERTEX = |
| "pseudoRandomVertexReader.edgesPerVertex"; |
| |
| @Override |
| public List<InputSplit> getSplits(JobContext context, int numWorkers) |
| throws IOException, InterruptedException { |
| // This is meaningless, the PseudoRandomVertexReader will generate |
| // all the test data |
| List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); |
| for (int i = 0; i < numWorkers; ++i) { |
| inputSplitList.add(new BspInputSplit(i, numWorkers)); |
| } |
| return inputSplitList; |
| } |
| |
| @Override |
| public VertexReader<LongWritable, DoubleWritable, DoubleWritable, M> |
| createVertexReader(InputSplit split, TaskAttemptContext context) |
| throws IOException { |
| return new PseudoRandomVertexReader<M>(); |
| } |
| |
| /** |
| * Used by {@link PseudoRandomVertexInputFormat} to read |
| * pseudo-randomly generated data |
| */ |
| private static class PseudoRandomVertexReader<M extends Writable> implements |
| VertexReader<LongWritable, DoubleWritable, DoubleWritable, M> { |
| /** Logger */ |
| private static final Logger LOG = |
| Logger.getLogger(PseudoRandomVertexReader.class); |
| /** Starting vertex id */ |
| private long startingVertexId = -1; |
| /** Vertices read so far */ |
| private long verticesRead = 0; |
| /** Total vertices to read (on this split alone) */ |
| private long totalSplitVertices = -1; |
| /** Aggregate vertices (all input splits) */ |
| private long aggregateVertices = -1; |
| /** Edges per vertex */ |
| private long edgesPerVertex = -1; |
| /** BspInputSplit (used only for index) */ |
| private BspInputSplit bspInputSplit; |
| |
| private Configuration configuration; |
| |
| public PseudoRandomVertexReader() { |
| } |
| |
| @Override |
| public void initialize(InputSplit inputSplit, |
| TaskAttemptContext context) throws IOException { |
| configuration = context.getConfiguration(); |
| aggregateVertices = |
| configuration.getLong( |
| PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0); |
| if (aggregateVertices <= 0) { |
| throw new IllegalArgumentException( |
| "initialize: " + |
| PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0"); |
| } |
| if (inputSplit instanceof BspInputSplit) { |
| bspInputSplit = (BspInputSplit) inputSplit; |
| long extraVertices = |
| aggregateVertices % bspInputSplit.getNumSplits(); |
| totalSplitVertices = |
| aggregateVertices / bspInputSplit.getNumSplits(); |
| if (bspInputSplit.getSplitIndex() < extraVertices) { |
| ++totalSplitVertices; |
| } |
| startingVertexId = (bspInputSplit.getSplitIndex() * |
| (aggregateVertices / bspInputSplit.getNumSplits())) + |
| Math.min(bspInputSplit.getSplitIndex(), |
| extraVertices); |
| } else { |
| throw new IllegalArgumentException( |
| "initialize: Got " + inputSplit.getClass() + |
| " instead of " + BspInputSplit.class); |
| } |
| edgesPerVertex = |
| configuration.getLong( |
| PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0); |
| if (edgesPerVertex <= 0) { |
| throw new IllegalArgumentException( |
| "initialize: " + |
| PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0"); |
| } |
| } |
| |
| @Override |
| public boolean nextVertex() throws IOException, InterruptedException { |
| return totalSplitVertices > verticesRead; |
| } |
| |
| @Override |
| public BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> getCurrentVertex() |
| throws IOException, InterruptedException { |
| BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> |
| vertex = BspUtils.createVertex(configuration); |
| long vertexId = startingVertexId + verticesRead; |
| // Seed on the vertex id to keep the vertex data the same when |
| // on different number of workers, but other parameters are the |
| // same. |
| Random rand = new Random(vertexId); |
| DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble()); |
| Map<LongWritable, DoubleWritable> edges = Maps.newHashMap(); |
| for (long i = 0; i < edgesPerVertex; ++i) { |
| LongWritable destVertexId = null; |
| do { |
| destVertexId = |
| new LongWritable(Math.abs(rand.nextLong()) % |
| aggregateVertices); |
| } while (edges.containsKey(destVertexId)); |
| edges.put(destVertexId, new DoubleWritable(rand.nextDouble())); |
| } |
| vertex.initialize( |
| new LongWritable(vertexId), vertexValue, edges, null); |
| ++verticesRead; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("next: Return vertexId=" + |
| vertex.getVertexId().get() + |
| ", vertexValue=" + vertex.getVertexValue() + |
| ", edgeMap=" + vertex.iterator()); |
| } |
| return vertex; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| } |
| |
| @Override |
| public float getProgress() throws IOException { |
| return verticesRead * 100.0f / totalSplitVertices; |
| } |
| } |
| } |