blob: 90360d6ddd1e880588f6bfef261b62435bc665e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.library.triangles;
import org.apache.giraph.block_app.framework.AbstractBlockFactory;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.IfBlock;
import org.apache.giraph.block_app.framework.block.SequenceBlock;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.library.prepare_graph.PrepareGraphPieces;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.conf.BooleanConfOption;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.LongNullHashSetEdges;
import org.apache.giraph.types.ops.LongTypeOps;
import org.apache.giraph.types.ops.collections.array.WLongArrayList;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.longs.LongListIterator;
/**
 * Count triangles in undirected graph.
 * Assumes that edges in both directions are present, or optionally makes
 * graph symmetric first (see {@link #MAKE_GRAPH_SYMMETRIC}).
 *
 * Vertex ids and vertex values are {@link LongWritable}, edges carry no value
 * ({@link NullWritable}). After the computation each vertex value holds half
 * of the number of neighbor ids, received from its neighbors, that are also
 * its own neighbors - i.e. the number of triangles the vertex is part of,
 * given a symmetric graph.
 */
public class UndirectedTriangleCountingBlockFactory
    extends AbstractBlockFactory<Object> {
  /**
   * Set to true to make graph symmetric before calculation if it's not
   * already
   */
  public static final BooleanConfOption MAKE_GRAPH_SYMMETRIC =
      new BooleanConfOption("giraph.triangleCounting.makeGraphSymmetric",
          false, "Whether it's needed to first make graph symmetric or not");

  /**
   * @param conf Configuration (not consulted)
   * @return {@link LongWritable} as the vertex id type
   */
  @Override
  protected Class<? extends WritableComparable> getVertexIDClass(
      GiraphConfiguration conf) {
    return LongWritable.class;
  }

  /**
   * @param conf Configuration (not consulted)
   * @return {@link LongWritable} as the vertex value type; the counting piece
   *         stores the per-vertex result in it
   */
  @Override
  protected Class<? extends Writable> getVertexValueClass(
      GiraphConfiguration conf) {
    return LongWritable.class;
  }

  /**
   * @param conf Configuration (not consulted)
   * @return {@link NullWritable}, edges are unweighted
   */
  @Override
  protected Class<? extends Writable> getEdgeValueClass(
      GiraphConfiguration conf) {
    return NullWritable.class;
  }

  /**
   * Build the application: an optional step which makes the graph symmetric,
   * guarded by {@link #MAKE_GRAPH_SYMMETRIC}, followed by the counting piece.
   *
   * @param conf Configuration from which the option is read
   * @return Sequence of the two blocks
   */
  @Override
  public Block createBlock(GiraphConfiguration conf) {
    return new SequenceBlock(
        new IfBlock(() -> MAKE_GRAPH_SYMMETRIC.get(conf),
            PrepareGraphPieces.makeSymmetricUnweighted(LongTypeOps.INSTANCE)),
        new UndirectedCountTrianglesPiece());
  }

  /**
   * @param conf Configuration (not consulted)
   * @return Placeholder object; the counting piece in this file ignores its
   *         execution stage
   */
  @Override
  public Object createExecutionStage(GiraphConfiguration conf) {
    return new Object();
  }

  /**
   * Force {@link LongNullHashSetEdges} as the out-edges class, and pick
   * {@link MessageEncodeAndStoreType#POINTER_LIST_PER_VERTEX} for message
   * encoding unless the user has already set that option.
   *
   * @param conf Configuration to modify
   */
  @Override
  protected void additionalInitConfig(GiraphConfiguration conf) {
    conf.setOutEdgesClass(LongNullHashSetEdges.class);
    GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset(
        conf, MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX);
  }

  /**
   * Piece in which all vertices will send to their neighbors the list of
   * their neighbors and vertices will count how many of received ids are
   * their neighbors.
   *
   * In a symmetric graph every triangle a vertex belongs to is seen twice by
   * that vertex (once through each of the two other corners), so the stored
   * value is the number of matches divided by two.
   *
   * TODO could be striped to make memory requirements lower
   */
  public static class UndirectedCountTrianglesPiece extends
      Piece<LongWritable, LongWritable, NullWritable, WLongArrayList, Object> {
    /**
     * Sender which collects target ids of all out-edges of a vertex into one
     * list and sends that list along every out-edge.
     *
     * @param workerApi Api used for sending messages
     * @param executionStage Execution stage (unused)
     * @return Vertex sender
     */
    @Override
    public VertexSender<LongWritable, LongWritable, NullWritable>
    getVertexSender(
        BlockWorkerSendApi<LongWritable, LongWritable, NullWritable,
            WLongArrayList> workerApi,
        Object executionStage) {
      // Single list, cleared and refilled for every vertex
      WLongArrayList reusableMessage = new WLongArrayList();
      return vertex -> {
        reusableMessage.clear();
        for (Edge<LongWritable, NullWritable> edge : vertex.getEdges()) {
          // TODO We could only send edge once
          reusableMessage.add(edge.getTargetVertexId().get());
        }
        workerApi.sendMessageToAllEdges(vertex, reusableMessage);
      };
    }

    /**
     * Receiver which counts, over all received lists, ids which differ from
     * the receiving vertex id and for which the vertex has an edge, and
     * stores half of that count as the vertex value.
     *
     * Fails with {@link IllegalArgumentException} if the count is odd, which
     * happens when the input is not symmetric or when a neighbor has an edge
     * to itself.
     *
     * @param workerApi Api for receiving (unused)
     * @param executionStage Execution stage (unused)
     * @return Vertex receiver
     */
    @Override
    public VertexReceiver<LongWritable, LongWritable, NullWritable,
        WLongArrayList> getVertexReceiver(
        BlockWorkerReceiveApi<LongWritable> workerApi, Object executionStage) {
      // Single writable, reused for every edge lookup
      LongWritable reusableId = new LongWritable();
      return (vertex, messages) -> {
        long numTriangles = 0;
        for (WLongArrayList message : messages) {
          LongListIterator iter = message.iterator();
          while (iter.hasNext()) {
            reusableId.set(iter.nextLong());
            // Skip the vertex itself (it is in every neighbor's list);
            // a non-null edge value means the edge exists
            if (reusableId.get() != vertex.getId().get() &&
                vertex.getEdgeValue(reusableId) != null) {
              numTriangles++;
            }
          }
        }
        // Report which vertex broke the invariant instead of failing with
        // an empty message
        Preconditions.checkArgument(numTriangles % 2 == 0,
            "Vertex %s matched an odd number (%s) of neighbor ids, " +
            "expected a symmetric graph without self-loops",
            vertex.getId(), numTriangles);
        vertex.getValue().set(numTriangles / 2);
      };
    }

    /**
     * @return {@link WLongArrayList}, list of neighbor ids
     */
    @Override
    protected Class<WLongArrayList> getMessageClass() {
      return WLongArrayList.class;
    }

    /**
     * The sender hands the same list to all edges of a vertex, so this piece
     * opts in to the one-message-to-many-ids encoding.
     *
     * @return true
     */
    @Override
    protected boolean allowOneMessageToManyIdsEncoding() {
      return true;
    }
  }
}