blob: a7451bc04daa39f94c495d5b198f7f7806d93142 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.partition;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.giraph.utils.NoOpComputation;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test case for partition stores.
*/
@SuppressWarnings("unchecked")
public class TestPartitionStores {
private ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
NullWritable> conf;
private Mapper<?, ?, ?, ?>.Context context;
/* these static variables are used for the multithreaded tests */
private static final int NUM_OF_VERTEXES_PER_PARTITION = 20;
private static final int NUM_OF_EDGES_PER_VERTEX = 5;
private static final int NUM_OF_THREADS = 8;
private static final int NUM_OF_PARTITIONS = 30;
private static final int NUM_PARTITIONS_IN_MEMORY = 12;
public static class MyComputation extends NoOpComputation<IntWritable,
IntWritable, NullWritable, IntWritable> { }
private Partition<IntWritable, IntWritable, NullWritable> createPartition(
ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
NullWritable> conf,
Integer id,
Vertex<IntWritable, IntWritable, NullWritable>... vertices) {
Partition<IntWritable, IntWritable, NullWritable> partition =
conf.createPartition(id, context);
for (Vertex<IntWritable, IntWritable, NullWritable> v : vertices) {
partition.putVertex(v);
}
return partition;
}
@Before
public void setUp() {
GiraphConfiguration configuration = new GiraphConfiguration();
configuration.setComputationClass(MyComputation.class);
conf = new ImmutableClassesGiraphConfiguration<>(configuration);
context = Mockito.mock(Mapper.Context.class);
}
@Test
public void testSimplePartitionStore() {
PartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore = new SimplePartitionStore<>(conf, context);
testReadWrite(partitionStore, conf);
partitionStore.shutdown();
}
@Test
public void testUnsafePartitionSerializationClass() throws IOException {
conf.setPartitionClass(ByteArrayPartition.class);
Vertex<IntWritable, IntWritable, NullWritable> v1 =
conf.createVertex();
v1.initialize(new IntWritable(1), new IntWritable(1));
Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
v2.initialize(new IntWritable(2), new IntWritable(2));
Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
v3.initialize(new IntWritable(3), new IntWritable(3));
Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
v4.initialize(new IntWritable(4), new IntWritable(4));
Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
v5.initialize(new IntWritable(5), new IntWritable(5));
Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
v6.initialize(new IntWritable(6), new IntWritable(6));
Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
v7.initialize(new IntWritable(7), new IntWritable(7));
Partition<IntWritable, IntWritable, NullWritable> partition =
createPartition(conf, 3, v1, v2, v3, v4, v5, v6, v7);
assertEquals(3, partition.getId());
assertEquals(0, partition.getEdgeCount());
assertEquals(7, partition.getVertexCount());
UnsafeByteArrayOutputStream outputStream = new
UnsafeByteArrayOutputStream();
partition.write(outputStream);
UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
outputStream.getByteArray(), 0, outputStream.getPos());
Partition<IntWritable, IntWritable, NullWritable> deserializatedPartition =
conf.createPartition(-1, context);
deserializatedPartition.readFields(inputStream);
assertEquals(3, deserializatedPartition.getId());
assertEquals(0, deserializatedPartition.getEdgeCount());
assertEquals(7, deserializatedPartition.getVertexCount());
}
@Test
public void testDiskBackedPartitionStoreWithByteArrayPartition()
throws IOException {
File directory = Files.createTempDir();
GiraphConstants.PARTITIONS_DIRECTORY.set(
conf, new File(directory, "giraph_partitions").toString());
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
conf.setPartitionClass(ByteArrayPartition.class);
CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
GraphTaskManager<IntWritable, IntWritable, NullWritable>
graphTaskManager = new GraphTaskManager<>(context);
Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore =
(DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
serverData.getPartitionStore();
partitionStore.initialize();
testReadWrite(partitionStore, conf);
partitionStore.shutdown();
FileUtils.deleteDirectory(directory);
}
@Test
public void testDiskBackedPartitionStore() throws IOException {
File directory = Files.createTempDir();
GiraphConstants.PARTITIONS_DIRECTORY.set(
conf, new File(directory, "giraph_partitions").toString());
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
GraphTaskManager<IntWritable, IntWritable, NullWritable>
graphTaskManager = new GraphTaskManager<>(context);
Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore =
(DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
serverData.getPartitionStore();
partitionStore.initialize();
testReadWrite(partitionStore, conf);
partitionStore.shutdown();
FileUtils.deleteDirectory(directory);
}
@Test
public void testDiskBackedPartitionStoreComputation() throws Exception {
Iterable<String> results;
String[] graph =
{
"[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
"[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
};
String[] expected =
{
"1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
"6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
};
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
File directory = Files.createTempDir();
GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
new File(directory, "giraph_partitions").toString());
conf.setComputationClass(EmptyComputation.class);
conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
results = InternalVertexRunner.run(conf, graph);
checkResults(results, expected);
FileUtils.deleteDirectory(directory);
}
@Test
public void testDiskBackedPartitionStoreWithByteArrayComputation()
throws Exception {
Iterable<String> results;
String[] graph =
{
"[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
"[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
};
String[] expected =
{
"1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
"6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
};
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
File directory = Files.createTempDir();
GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
new File(directory, "giraph_partitions").toString());
conf.setPartitionClass(ByteArrayPartition.class);
conf.setComputationClass(EmptyComputation.class);
conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
results = InternalVertexRunner.run(conf, graph);
checkResults(results, expected);
FileUtils.deleteDirectory(directory);
}
@Test
public void testDiskBackedPartitionStoreMT() throws Exception {
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
GiraphConstants.STATIC_GRAPH.set(conf, false);
testMultiThreaded();
}
@Test
public void testDiskBackedPartitionStoreMTStatic() throws Exception {
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
GiraphConstants.STATIC_GRAPH.set(conf, true);
testMultiThreaded();
}
@Test
public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
GiraphConstants.STATIC_GRAPH.set(conf, true);
NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
testMultiThreaded();
}
private void testMultiThreaded() throws Exception {
final AtomicInteger vertexCounter = new AtomicInteger(0);
ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);
ExecutorCompletionService<Boolean> executor =
new ExecutorCompletionService<>(pool);
File directory = Files.createTempDir();
GiraphConstants.PARTITIONS_DIRECTORY.set(
conf, new File(directory, "giraph_partitions").toString());
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
GraphTaskManager<IntWritable, IntWritable, NullWritable>
graphTaskManager = new GraphTaskManager<>(context);
Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
store =
(DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
serverData.getPartitionStore();
store.initialize();
// Create a new Graph in memory using multiple threads
for (int i = 0; i < NUM_OF_THREADS; ++i) {
List<Integer> partitionIds = new ArrayList<>();
for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) {
partitionIds.add(id);
}
Worker worker =
new Worker(vertexCounter, store, partitionIds, conf);
executor.submit(worker, true);
}
for (int i = 0; i < NUM_OF_THREADS; ++i)
executor.take();
pool.shutdownNow();
// Check the number of vertices
int totalVertexes = 0;
int totalEdges = 0;
Partition<IntWritable, IntWritable, NullWritable> partition;
for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
totalVertexes += store.getPartitionVertexCount(i);
totalEdges += store.getPartitionEdgeCount(i);
}
assert vertexCounter.get() == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
assert totalVertexes == NUM_OF_PARTITIONS * NUM_OF_VERTEXES_PER_PARTITION;
assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX;
// Check the content of the vertices
int expected = 0;
for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION * NUM_OF_PARTITIONS; ++i) {
expected += i;
}
int totalValues = 0;
store.startIteration();
for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
partition = store.getNextPartition();
assert partition != null;
for (Vertex<IntWritable, IntWritable, NullWritable> v : partition) {
totalValues += v.getId().get();
}
store.putPartition(partition);
}
assert totalValues == expected;
store.shutdown();
}
private Partition<IntWritable, IntWritable, NullWritable>
getPartition(PartitionStore<IntWritable, IntWritable,
NullWritable> partitionStore, int partitionId) {
Partition p;
Partition result = null;
while ((p = partitionStore.getNextPartition()) != null) {
if (p.getId() == partitionId) {
result = p;
}
partitionStore.putPartition(p);
}
return result;
}
/**
* Test reading/writing to/from a partition store
*
* @param partitionStore Partition store to test
* @param conf Configuration to use
*/
public void testReadWrite(
PartitionStore<IntWritable, IntWritable,
NullWritable> partitionStore,
ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
NullWritable> conf) {
Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
v1.initialize(new IntWritable(1), new IntWritable(1));
Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
v2.initialize(new IntWritable(2), new IntWritable(2));
Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
v3.initialize(new IntWritable(3), new IntWritable(3));
Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
v4.initialize(new IntWritable(4), new IntWritable(4));
Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
v5.initialize(new IntWritable(5), new IntWritable(5));
Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
v6.initialize(new IntWritable(7), new IntWritable(7));
Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
v7.initialize(new IntWritable(7), new IntWritable(7));
v7.addEdge(EdgeFactory.create(new IntWritable(1)));
v7.addEdge(EdgeFactory.create(new IntWritable(2)));
partitionStore.addPartition(createPartition(conf, 1, v1, v2, v6));
partitionStore.addPartition(createPartition(conf, 2, v3, v4));
partitionStore.addPartition(createPartition(conf, 3, v5));
partitionStore.addPartition(createPartition(conf, 4, v7));
partitionStore.startIteration();
getPartition(partitionStore, 1);
partitionStore.startIteration();
getPartition(partitionStore, 2);
partitionStore.startIteration();
partitionStore.removePartition(3);
getPartition(partitionStore, 4);
assertEquals(3, partitionStore.getNumPartitions());
assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
int partitionsNumber = 0;
partitionStore.startIteration();
Partition<IntWritable, IntWritable, NullWritable> p;
while ((p = partitionStore.getNextPartition()) != null) {
partitionStore.putPartition(p);
partitionsNumber++;
}
assertEquals(3, partitionsNumber);
assertTrue(partitionStore.hasPartition(1));
assertTrue(partitionStore.hasPartition(2));
assertFalse(partitionStore.hasPartition(3));
assertTrue(partitionStore.hasPartition(4));
assertEquals(3, partitionStore.getPartitionVertexCount(1));
assertEquals(2, partitionStore.getPartitionVertexCount(2));
assertEquals(1, partitionStore.getPartitionVertexCount(4));
assertEquals(2, partitionStore.getPartitionEdgeCount(4));
}
/**
* Internal checker to verify the correctness of the tests.
* @param results the actual results obtained
* @param expected expected results
*/
private void checkResults(Iterable<String> results, String[] expected) {
for (String str : results) {
boolean found = false;
for (String expectedStr : expected) {
if (expectedStr.equals(str)) {
found = true;
}
}
assert found;
}
}
/**
* Test compute method that sends each edge a notification of its parents.
* The test set only has a 1-1 parent-to-child ratio for this unit test.
*/
public static class EmptyComputation
extends BasicComputation<LongWritable, DoubleWritable, FloatWritable,
LongWritable> {
@Override
public void compute(
Vertex<LongWritable, DoubleWritable,FloatWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
vertex.voteToHalt();
}
}
@Test
public void testEdgeCombineWithSimplePartition() throws IOException {
testEdgeCombine(SimplePartition.class);
}
@Test
public void testEdgeCombineWithByteArrayPartition() throws IOException {
testEdgeCombine(ByteArrayPartition.class);
}
private void testEdgeCombine(Class<? extends Partition> partitionClass)
throws IOException {
Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
v1.initialize(new IntWritable(1), new IntWritable(1));
Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
v2.initialize(new IntWritable(2), new IntWritable(2));
Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
v3.initialize(new IntWritable(3), new IntWritable(3));
Vertex<IntWritable, IntWritable, NullWritable> v1e2 = conf.createVertex();
v1e2.initialize(new IntWritable(1), new IntWritable(1));
v1e2.addEdge(EdgeFactory.create(new IntWritable(2)));
Vertex<IntWritable, IntWritable, NullWritable> v1e3 = conf.createVertex();
v1e3.initialize(new IntWritable(1), new IntWritable(1));
v1e3.addEdge(EdgeFactory.create(new IntWritable(3)));
GiraphConfiguration newconf = new GiraphConfiguration(conf);
newconf.setPartitionClass(partitionClass);
Partition<IntWritable, IntWritable, NullWritable> partition =
(new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
NullWritable>(newconf)).createPartition(1, context);
assertEquals(partitionClass, partition.getClass());
partition.putVertex(v1);
partition.putVertex(v2);
partition.putVertex(v3);
assertEquals(3, partition.getVertexCount());
assertEquals(0, partition.getEdgeCount());
partition.putOrCombine(v1e2);
assertEquals(3, partition.getVertexCount());
assertEquals(1, partition.getEdgeCount());
partition.putOrCombine(v1e3);
assertEquals(3, partition.getVertexCount());
assertEquals(2, partition.getEdgeCount());
v1 = partition.getVertex(new IntWritable(1));
assertEquals(new IntWritable(1), v1.getId());
assertEquals(new IntWritable(1), v1.getValue());
assertEquals(2, v1.getNumEdges());
}
private class Worker implements Runnable {
private final AtomicInteger vertexCounter;
private final PartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore;
private final List<Integer> partitionIds;
private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
NullWritable> conf;
public Worker(AtomicInteger vertexCounter,
PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore,
List<Integer> partitionIds,
ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
NullWritable> conf) {
this.vertexCounter = vertexCounter;
this.partitionStore = partitionStore;
this.partitionIds = partitionIds;
this.conf = conf;
}
public void run() {
for (int partitionId : partitionIds) {
Partition<IntWritable, IntWritable, NullWritable> partition =
conf.createPartition(partitionId, context);
for (int i = 0; i < NUM_OF_VERTEXES_PER_PARTITION; ++i) {
int id = vertexCounter.getAndIncrement();
Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex();
v.initialize(new IntWritable(id), new IntWritable(id));
Random rand = new Random(id);
for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) {
int dest = rand.nextInt(id + 1);
v.addEdge(EdgeFactory.create(new IntWritable(dest)));
}
partition.putVertex(v);
}
partitionStore.addPartition(partition);
}
}
}
@Test
public void testOutOfCoreMessages() throws Exception {
Iterable<String> results;
String[] graph =
{ "1 0 2 3", "2 0 3 5", "3 0 1 2 4", "4 0 3", "5 0 6 7 1 2",
"6 0 10 8 7", "7 0 1 3", "8 0 1 10 9 4 6", "9 0 8 1 5 7",
"10 0 9" };
String[] expected =
{
"1\t32", "2\t9", "3\t14", "4\t11", "5\t11",
"6\t13", "7\t20", "8\t15", "9\t18", "10\t14"
};
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
GiraphConstants.USER_PARTITION_COUNT.set(conf, 10);
File directory = Files.createTempDir();
GiraphConstants.PARTITIONS_DIRECTORY.set(conf,
new File(directory, "giraph_partitions").toString());
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
conf.setComputationClass(TestOutOfCoreMessagesComputation.class);
conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
results = InternalVertexRunner.run(conf, graph);
checkResults(results, expected);
FileUtils.deleteDirectory(directory);
}
public static class TestOutOfCoreMessagesComputation extends
BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {
@Override
public void compute(
Vertex<IntWritable, IntWritable, NullWritable> vertex,
Iterable<IntWritable> messages) throws IOException {
if (getSuperstep() == 0) {
// Send id to all neighbors
sendMessageToAllEdges(vertex, vertex.getId());
} else {
// Add received messages and halt
int sum = 0;
for (IntWritable message : messages) {
sum += message.get();
}
vertex.setValue(new IntWritable(sum));
vertex.voteToHalt();
}
}
}
}