blob: 44c77755e16e6f84e869b635acbb4053fa6fbcc4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework;
import java.util.HashSet;
import java.util.List;
import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.SequenceBlock;
import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
import org.apache.giraph.block_app.test_setup.NumericTestGraph;
import org.apache.giraph.block_app.test_setup.TestGraphModifier;
import org.apache.giraph.block_app.test_setup.TestGraphUtils;
import org.apache.giraph.conf.BulkConfigurator;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.types.NoMessage;
import org.apache.giraph.utils.TestGraph;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.junit.Assert;
import org.junit.Test;
/**
* Test sending worker to worker messages
*/
public class TestWorkerMessages {
@Test
public void testWorkerMessages() throws Exception {
GiraphConfiguration conf = new GiraphConfiguration();
BlockUtils.setAndInitBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
TestGraph testGraph = new TestGraph(conf);
testGraph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
LocalBlockRunner.runApp(testGraph);
}
@Test
public void testWithTestSetup() throws Exception {
TestGraphUtils.runTest(
new TestGraphModifier<WritableComparable, Writable, Writable>() {
@Override
public void modifyGraph(NumericTestGraph<WritableComparable, Writable, Writable> graph) {
graph.addEdge(1, 2);
}
},
null,
new BulkConfigurator() {
@Override
public void configure(GiraphConfiguration conf) {
BlockUtils.setBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
}
});
}
public static class TestWorkerMessagesBlockFactory extends TestLongNullNullBlockFactory {
@Override
public Block createBlock(GiraphConfiguration conf) {
return new SequenceBlock(
new TestWorkerMessagesPiece(2, 4, 11),
new TestWorkerMessagesPiece(3, 5, 2, 100));
}
}
public static class TestWorkerMessagesPiece extends PieceWithWorkerContext<LongWritable,
Writable, Writable, NoMessage, Object, LongWritable, Object> {
private final HashSet<Long> values;
public TestWorkerMessagesPiece(long... values) {
this.values = new HashSet<>();
for (long value : values) {
this.values.add(value);
}
}
@Override
public void workerContextSend(
BlockWorkerContextSendApi<LongWritable, LongWritable> workerContextApi,
Object executionStage, Object workerValue) {
for (long value : values) {
workerContextApi.sendMessageToWorker(new LongWritable(value),
workerContextApi.getMyWorkerIndex());
}
}
@Override
public void workerContextReceive(BlockWorkerContextReceiveApi workerContextApi,
Object executionStage, Object workerValue, List<LongWritable> workerMessages) {
Assert.assertEquals(values.size(), workerMessages.size());
for (LongWritable workerMessage : workerMessages) {
Assert.assertTrue(values.remove(workerMessage.get()));
}
}
}
}