blob: 67863dc0f5003c36e13b8580d561e89822622b14 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.partition;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.WorkerGraphPartitioner;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
/** Test {@link org.apache.giraph.partition.SimpleLongRangePartitionerFactory}. */
public class SimpleRangePartitionFactoryTest {
private void testRange(int numWorkers, int numPartitions, int keySpaceSize,
int allowedWorkerDiff, boolean emptyWorkers) {
Configuration conf = new Configuration();
conf.setLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, keySpaceSize);
GiraphConstants.USER_PARTITION_COUNT.set(conf, numPartitions);
SimpleLongRangePartitionerFactory<Writable, Writable> factory =
new SimpleLongRangePartitionerFactory<Writable, Writable>();
factory.setConf(new ImmutableClassesGiraphConfiguration(conf));
ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
for (int i = 0; i < numWorkers; i++) {
WorkerInfo info = new WorkerInfo();
info.setInetSocketAddress(new InetSocketAddress(8080), "127.0.0.1");
info.setTaskId(i);
infos.add(info);
}
Collection<PartitionOwner> owners =
factory.createMasterGraphPartitioner().createInitialPartitionOwners(infos, -1);
int[] tasks = new int[owners.size()];
for (PartitionOwner owner : owners) {
WorkerInfo worker = owner.getWorkerInfo();
assertEquals(0, tasks[owner.getPartitionId()]);
tasks[owner.getPartitionId()] = worker.getTaskId() + 1;
}
checkMapping(tasks, allowedWorkerDiff, emptyWorkers);
WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
factory.createWorkerGraphPartitioner();
workerPartitioner.updatePartitionOwners(null, owners);
LongWritable longWritable = new LongWritable();
int[] partitions = new int[keySpaceSize];
for (int i = 0; i < keySpaceSize; i++) {
longWritable.set(i);
PartitionOwner owner = workerPartitioner.getPartitionOwner(longWritable);
partitions[i] = owner.getPartitionId();
}
checkMapping(partitions, 1, emptyWorkers);
}
private void checkMapping(int[] mapping, int allowedDiff, boolean emptyWorkers) {
int prev = -1;
int max = 0;
int min = Integer.MAX_VALUE;
int cur = 0;
for (int value : mapping) {
if (value != prev) {
if (prev != -1) {
min = Math.min(cur, min);
max = Math.max(cur, max);
assertTrue(prev < value);
if (!emptyWorkers) {
assertEquals(prev + 1, value);
}
}
cur = 1;
} else {
cur++;
}
prev = value;
}
assertTrue(min + allowedDiff >= max);
}
@Test
public void testLongRangePartitionerFactory() {
// perfect distribution
testRange(10, 1000, 100000, 0, false);
testRange(1000, 50000, 100000, 0, false);
// perfect distribution even when max is hit, and max is not divisible by #workers
testRange(8949, (50000 / 8949) * 8949, 100023, 0, false);
testRange(1949, (50000 / 1949) * 1949, 211111, 0, false);
// imperfect distribution - because there are more workers than max partitions.
testRange(194942, 50000, 211111, 1, true);
}
}