blob: 266feb7a36b3ced3d0cd29a23d8b8641958e8b0c [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.pregelix.example.maximalclique;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.pregelix.api.graph.Edge;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
import edu.uci.ics.pregelix.example.client.Client;
import edu.uci.ics.pregelix.example.io.VLongWritable;
import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex;
/**
 * The maximal clique example -- finds maximal cliques in an undirected graph.
 * <p>
 * Each resulting clique lists its vertexes in ascending order of vertex id, and a clique is
 * reported at its smallest vertex. The algorithm relies on that ordering to prune the search:
 * <ul>
 * <li>superstep 1: every vertex sorts its edges and sends, to each neighbor with a smaller id,
 * an adjacency list made of its own id plus the destinations of the edges that follow that
 * neighbor in its sorted edge list;</li>
 * <li>superstep 2: every vertex builds the subgraph induced by the senders and runs a bounded
 * depth-first search over it to find its largest cliques;</li>
 * <li>later supersteps: every vertex votes to halt.</li>
 * </ul>
 */
public class MaximalCliqueVertex extends Vertex<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
    /** Adjacency lists of the local subgraph, keyed (and therefore ordered) by vertex id. */
    private Map<VLongWritable, AdjacencyListWritable> map = new TreeMap<VLongWritable, AdjacencyListWritable>();
    /** Vertex ids of the local subgraph in ascending order; a vertex's position is its bit index. */
    private List<VLongWritable> vertexList = new ArrayList<VLongWritable>();
    /** Inverse of {@link #vertexList}: vertex id -> bit index. */
    private Map<VLongWritable, Integer> invertedMap = new TreeMap<VLongWritable, Integer>();
    /**
     * Size (counting this vertex itself) of the largest clique found so far. It is NOT reset per
     * compute call. NOTE(review): presumably the framework reuses vertex objects, in which case
     * this acts as a pruning bound shared by all vertexes computed through the same object —
     * confirm.
     */
    private int largestCliqueSizeSoFar = 0;
    /** Bitmaps (without this vertex's own bit) of the largest cliques found for this vertex. */
    private List<BitSet> currentMaximalCliques = new ArrayList<BitSet>();
    /** Reusable vertex value. */
    private CliquesWritable tmpValue = new CliquesWritable();
    /** Reusable flat list of the vertex ids of all emitted cliques, clique after clique. */
    private List<VLongWritable> cliques = new ArrayList<VLongWritable>();

    /**
     * Update the current maximal cliques from the adjacency lists received in superstep 2 and
     * store them as this vertex's value.
     *
     * @param values
     *            the received adjacency lists
     */
    private void updateCurrentMaximalCliques(Iterator<AdjacencyListWritable> values) {
        map.clear();
        vertexList.clear();
        invertedMap.clear();
        currentMaximalCliques.clear();
        cliques.clear();
        tmpValue.reset();
        // build the initial sub graph: the senders plus this vertex (with an empty adjacency list)
        while (values.hasNext()) {
            AdjacencyListWritable adj = values.next();
            map.put(adj.getSource(), adj);
        }
        VLongWritable srcId = getVertexId();
        map.put(srcId, new AdjacencyListWritable());
        // build the vertex list (vertex id in ascending order) and the inverted list of vertexes
        int i = 0;
        for (VLongWritable v : map.keySet()) {
            vertexList.add(v);
            invertedMap.put(v, i++);
        }
        // clean up adjacency lists --- remove vertexes that are not part of the sub graph
        for (AdjacencyListWritable adj : map.values()) {
            adj.cleanNonMatch(vertexList);
        }
        // get the h-index of the subgraph --- which is the maximum depth to explore
        int[] neighborCounts = new int[map.size()];
        i = 0;
        for (AdjacencyListWritable adj : map.values()) {
            neighborCounts[i++] = adj.numberOfNeighbors();
        }
        Arrays.sort(neighborCounts);
        int h = 0;
        for (i = neighborCounts.length - 1; i >= 0; i--) {
            if (h >= neighborCounts[i]) {
                break;
            }
            h++;
        }
        // A clique inside the subgraph has at most h + 1 members (each member needs clique-size - 1
        // neighbors), so together with this vertex it has at most h + 2. Only skip the search when
        // even that cannot reach the best size so far; equal sizes are still collected.
        if (h + 2 < largestCliqueSizeSoFar) {
            return;
        }
        // start depth-first search; this vertex is never part of the bitmaps during the search,
        // it is implicitly a member of every clique and is added when the cliques are emitted
        int srcIndex = invertedMap.get(srcId);
        BitSet cliqueSoFar = new BitSet(h);
        if (vertexList.size() == 1) {
            // no neighbor sent anything: this vertex alone is the only candidate clique
            updateMaximalClique(cliqueSoFar);
        } else {
            for (int seed = 0; seed < vertexList.size(); seed++) {
                if (seed == srcIndex) {
                    continue;
                }
                cliqueSoFar.set(seed);
                searchClique(h, cliqueSoFar, 1, vertexList.get(seed));
                cliqueSoFar.clear();
            }
        }
        // output local maximal cliques: materialize all of them first, then hand the flat list
        // over once, so no clique is emitted twice
        for (BitSet clique : currentMaximalCliques) {
            clique.set(srcIndex);
            generateClique(clique);
        }
        if (!currentMaximalCliques.isEmpty()) {
            tmpValue.addCliques(cliques);
            // every stored clique has exactly largestCliqueSizeSoFar members (this vertex included)
            tmpValue.setCliqueSize(largestCliqueSizeSoFar);
        }
        // update the vertex state
        setVertexValue(tmpValue);
    }

    /**
     * Append the vertex ids of a clique, in ascending order, to {@link #cliques}.
     *
     * @param clique
     *            the bitmap representation of a clique
     */
    private void generateClique(BitSet clique) {
        for (int j = clique.nextSetBit(0); j >= 0; j = clique.nextSetBit(j + 1)) {
            cliques.add(vertexList.get(j));
        }
    }

    /**
     * Find cliques using a depth-first search, extending the clique only with vertexes of larger
     * index than all current members.
     *
     * @param maxDepth
     *            the maximum search depth (maximum number of bitmap members)
     * @param cliqueSoFar
     *            the clique found so far; restored to its entry state on return
     * @param depthSoFar
     *            the current search depth (number of members of cliqueSoFar)
     * @param currentSource
     *            the vertex most recently added to the clique
     */
    private void searchClique(int maxDepth, BitSet cliqueSoFar, int depthSoFar, VLongWritable currentSource) {
        if (depthSoFar > maxDepth) {
            updateMaximalClique(cliqueSoFar);
            return;
        }
        Iterator<VLongWritable> neighbors = map.get(currentSource).getNeighbors();
        int nextDepth = depthSoFar + 1;
        while (neighbors.hasNext()) {
            VLongWritable neighbor = neighbors.next();
            if (!isTested(neighbor, cliqueSoFar) && isClique(neighbor, cliqueSoFar)) {
                // snapshot the clique; the new bit lies beyond it, so clearing from here restores it
                int cliqueLength = cliqueSoFar.length();
                cliqueSoFar.set(invertedMap.get(neighbor));
                searchClique(maxDepth, cliqueSoFar, nextDepth, neighbor);
                cliqueSoFar.clear(cliqueLength, cliqueSoFar.length());
            }
        }
        updateMaximalClique(cliqueSoFar);
    }

    /**
     * Record the clique if it is at least as large as the largest clique found so far; a strictly
     * larger clique replaces all previously recorded ones.
     *
     * @param cliqueSoFar
     *            the clique so far, in the bitmap representation (without this vertex's own bit)
     */
    private void updateMaximalClique(BitSet cliqueSoFar) {
        // + 1 accounts for this vertex, which is implicitly part of every clique
        int cliqueSize = cliqueSoFar.cardinality() + 1;
        if (cliqueSize > largestCliqueSizeSoFar) {
            currentMaximalCliques.clear();
            largestCliqueSizeSoFar = cliqueSize;
        }
        if (cliqueSize == largestCliqueSizeSoFar) {
            currentMaximalCliques.add((BitSet) cliqueSoFar.clone());
        }
    }

    /**
     * Should we skip the vertex newVertex?
     *
     * @param newVertex
     *            the vertex to be tested
     * @param cliqueSoFar
     *            the current clique, in the bitmap representation
     * @return true if the vertex counts as already tested, i.e. its index is not larger than the
     *         largest index in the clique (cliques are only built in ascending order)
     */
    private boolean isTested(VLongWritable newVertex, BitSet cliqueSoFar) {
        return invertedMap.get(newVertex) < cliqueSoFar.length();
    }

    /**
     * Will adding the newVertex yield a bigger clique?
     *
     * @param newVertex
     *            the new vertex id
     * @param cliqueSoFar
     *            the bitmap representation of the clique
     * @return true if every current member of the clique is a neighbor of newVertex
     */
    private boolean isClique(VLongWritable newVertex, BitSet cliqueSoFar) {
        AdjacencyListWritable adj = map.get(newVertex);
        for (int i = cliqueSoFar.nextSetBit(0); i >= 0; i = cliqueSoFar.nextSetBit(i + 1)) {
            if (!adj.isNeighbor(vertexList.get(i))) {
                return false;
            }
        }
        return true;
    }

    /**
     * For superstep 1, send outgoing messages.
     * For superstep 2, calculate maximal cliques.
     * Otherwise, vote to halt.
     */
    @Override
    public void compute(Iterator<AdjacencyListWritable> msgIterator) {
        if (getSuperstep() == 1) {
            sortEdges();
            sendOutgoingMsgs(getEdges());
        } else if (getSuperstep() == 2) {
            updateCurrentMaximalCliques(msgIterator);
        } else {
            voteToHalt();
        }
    }

    @Override
    public String toString() {
        return getVertexId() + " " + getVertexValue();
    }

    /**
     * Read the global aggregate value of the job described by conf.
     *
     * @throws IllegalStateException
     *             wrapping any IOException raised while reading it
     */
    private static CliquesWritable readMaximalCliqueResult(Configuration conf) {
        try {
            CliquesWritable result = (CliquesWritable) IterationUtils.readGlobalAggregateValue(conf,
                    BspUtils.getJobId(conf));
            return result;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        PregelixJob job = new PregelixJob(MaximalCliqueVertex.class.getSimpleName());
        job.setVertexClass(MaximalCliqueVertex.class);
        job.setGlobalAggregatorClass(MaximalCliqueAggregator.class);
        job.setDynamicVertexValueSize(true);
        job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
        job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
        Client.run(args, job);
        System.out.println("maximal cliques: \n" + readMaximalCliqueResult(job.getConfiguration()));
    }

    /**
     * Send the adjacency lists: for each edge whose destination id is smaller than this vertex's
     * id, send that destination a list whose source is this vertex and whose neighbors are the
     * destinations of all later edges in the (sorted) edge list.
     *
     * @param edges
     *            the outgoing edges
     */
    private void sendOutgoingMsgs(List<Edge<VLongWritable, NullWritable>> edges) {
        long selfId = getVertexId().get();
        for (int i = 0; i < edges.size(); i++) {
            VLongWritable destId = edges.get(i).getDestVertexId();
            if (destId.get() < selfId) {
                // only emit to vertexes whose id is smaller than this vertex's id, to avoid a
                // duplicate removal step: all resulting cliques have vertexes in ascending order
                AdjacencyListWritable msg = new AdjacencyListWritable();
                msg.setSource(getVertexId());
                for (int j = i + 1; j < edges.size(); j++) {
                    msg.addNeighbor(edges.get(j).getDestVertexId());
                }
                sendMsg(destId, msg);
            }
        }
    }

    /**
     * Maximal Clique VertexWriter: writes the vertex id and the string form of its value as a
     * text key/value pair.
     */
    public static class MaximalCliqueVertexWriter extends
            TextVertexWriter<VLongWritable, CliquesWritable, NullWritable> {
        public MaximalCliqueVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
            super(lineRecordWriter);
        }

        @Override
        public void writeVertex(Vertex<VLongWritable, CliquesWritable, NullWritable, ?> vertex) throws IOException,
                InterruptedException {
            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
                    new Text(vertex.getVertexValue().toString()));
        }
    }

    /**
     * Output format for maximal clique, backed by {@link MaximalCliqueVertexWriter}.
     */
    public static class MaximalCliqueVertexOutputFormat extends
            TextVertexOutputFormat<VLongWritable, CliquesWritable, NullWritable> {
        @Override
        public VertexWriter<VLongWritable, CliquesWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
            return new MaximalCliqueVertexWriter(recordWriter);
        }
    }
}