blob: dcdb002d26825018245832effb385b477d670288 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework.no_vtx;
import java.util.Iterator;
import org.apache.giraph.block_app.framework.AbstractBlockFactory;
import org.apache.giraph.block_app.framework.BlockUtils;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.SequenceBlock;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.test_setup.TestGraphUtils;
import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit;
import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.function.ObjectTransfer;
import org.apache.giraph.function.primitive.Int2IntFunction;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.types.ops.collections.array.WIntArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
public class MessagesWithoutVerticesTest {
@Test
public void testWithLocalBlockRunner() throws Exception {
testSumOverSameGroup(3, false);
}
@Test
public void testWithGiraphEnv() throws Exception {
testSumOverSameGroup(3, true);
}
private void testSumOverSameGroup(int max, boolean fullGiraphEnv) throws Exception {
// uses messages to [-1, max - 1)
Int2IntFunction f = (id) -> id % max - 1;
TestGraphUtils.runTest(
TestGraphUtils.chainModifiers(
new Small1GraphInit<IntWritable, IntWritable, NullWritable>(),
new EachVertexInit<>((vertex) -> vertex.getValue().set(f.apply(vertex.getId().get())))),
(graph) -> {
Int2IntOpenHashMap sums = new Int2IntOpenHashMap();
for (int i = 0; i < max; i ++) {
sums.addTo(f.apply(i), f.apply(i) + 1);
}
for (Vertex<IntWritable, IntWritable, NullWritable> vtx : graph.getTestGraph()) {
sums.addTo(f.apply(vtx.getId().get()), vtx.getId().get() + 1);
}
for (Vertex<IntWritable, IntWritable, NullWritable> vtx : graph.getTestGraph()) {
Assert.assertEquals(sums.get(f.apply(vtx.getId().get())), vtx.getValue().get());
}
Assert.assertNull(graph.getVertex(-1));
},
(GiraphConfiguration conf) -> {
BlockUtils.setBlockFactoryClass(conf, MessagesWithoutVerticesBlockFactory.class);
TestGraphUtils.USE_FULL_GIRAPH_ENV_IN_TESTS.set(conf, fullGiraphEnv);
});
}
public static class MessagesWithoutVerticesBlockFactory extends AbstractBlockFactory<Object> {
@Override
public Block createBlock(GiraphConfiguration conf) {
ObjectTransfer<Iterable<IntWritable>> msgsTransfer = new ObjectTransfer<>();
return new SequenceBlock(
new Piece<IntWritable, IntWritable, NullWritable, IntWritable, Object>() {
@Override
public VertexSender<IntWritable, IntWritable, NullWritable> getVertexSender(
BlockWorkerSendApi<IntWritable, IntWritable, NullWritable, IntWritable> workerApi,
Object executionStage) {
return (vtx) -> {
workerApi.sendMessage(vtx.getValue(), vtx.getId());
};
}
@Override
public VertexReceiver<IntWritable,IntWritable,NullWritable,IntWritable> getVertexReceiver(
BlockWorkerReceiveApi<IntWritable> workerApi, Object executionStage) {
return (vtx, msgs) -> {
Assert.assertFalse("" + vtx.getId(), Iterables.isEmpty(msgs));
msgsTransfer.apply(msgs);
};
}
@Override
protected Class<IntWritable> getMessageClass() {
return IntWritable.class;
}
@Override
protected boolean receiveIgnoreExistingVertices() {
return true;
}
},
new Piece<IntWritable, IntWritable, NullWritable, IntWritable, Object>() {
@Override
public VertexSender<IntWritable, IntWritable, NullWritable> getVertexSender(
BlockWorkerSendApi<IntWritable, IntWritable, NullWritable, IntWritable> workerApi,
Object executionStage) {
return (vtx) -> {
WIntArrayList received = new WIntArrayList();
int sum = vtx.getId().get() + 1;
for (IntWritable msg : msgsTransfer.get()) {
received.add(msg.get());
sum += msg.get() + 1;
}
workerApi.sendMessageToMultipleEdges(received.fastIteratorW(), new IntWritable(sum));
};
}
@Override
public VertexReceiver<IntWritable,IntWritable,NullWritable,IntWritable> getVertexReceiver(
BlockWorkerReceiveApi<IntWritable> workerApi, Object executionStage) {
return (vtx, msgs) -> {
Iterator<IntWritable> iter = msgs.iterator();
vtx.getValue().set(iter.next().get());
Assert.assertFalse(iter.hasNext());
};
}
@Override
protected Class<IntWritable> getMessageClass() {
return IntWritable.class;
}
}
);
}
@Override
public Object createExecutionStage(GiraphConfiguration conf) {
return new Object();
}
@Override
protected Class<IntWritable> getVertexIDClass(GiraphConfiguration conf) {
return IntWritable.class;
}
@Override
protected Class<IntWritable> getVertexValueClass(GiraphConfiguration conf) {
return IntWritable.class;
}
@Override
protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) {
return NullWritable.class;
}
}
}