/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.graph.library.linkanalysis;

import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.UnaryResultBase;
import org.apache.flink.graph.library.linkanalysis.Functions.SumScore;
import org.apache.flink.graph.library.linkanalysis.PageRank.Result;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.Iterator;

/**
 * PageRank computes a per-vertex score which is the sum of PageRank scores
 * transmitted over in-edges. Each vertex's score is divided evenly among
 * out-edges. High-scoring vertices are linked to by other high-scoring
 * vertices; this is similar to the 'authority' score in {@link HITS}.
 *
 * <p>See http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf
 *
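 * <p>A typical invocation looks like the following sketch (the key and
 * value types here are illustrative; any {@code Graph} may be used):
 * <pre>{@code
 * Graph<LongValue, NullValue, NullValue> graph = ...;
 *
 * // 0.85 damping factor, at most 20 iterations
 * DataSet<PageRank.Result<LongValue>> ranks =
 *     graph.run(new PageRank<LongValue, NullValue, NullValue>(0.85, 20));
 * }</pre>
 *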
 * @param <K> graph ID type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 */
public class PageRank<K, VV, EV>
extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {

	private static final String VERTEX_COUNT = "vertex count";

	private static final String SUM_OF_SCORES = "sum of scores";

	private static final String CHANGE_IN_SCORES = "change in scores";

	// Required configuration
	private final double dampingFactor;

	private int maxIterations;

	private double convergenceThreshold;

	// Optional configuration
	private boolean includeZeroDegreeVertices = false;

	/**
	 * PageRank with a fixed number of iterations.
	 *
	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
	 * @param iterations fixed number of iterations
	 */
	public PageRank(double dampingFactor, int iterations) {
		this(dampingFactor, iterations, Double.MAX_VALUE);
	}

	/**
	 * PageRank with a convergence threshold. The algorithm terminates when the
	 * change in score over all vertices falls to or below the given threshold value.
	 *
	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
	 * @param convergenceThreshold convergence threshold for sum of scores
	 */
	public PageRank(double dampingFactor, double convergenceThreshold) {
		this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold);
	}

	/**
	 * PageRank with a convergence threshold and a maximum iteration count. The
	 * algorithm terminates after either the given number of iterations or when
	 * the change in score over all vertices falls to or below the given
	 * threshold value.
	 *
	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
	 * @param maxIterations maximum number of iterations
	 * @param convergenceThreshold convergence threshold for sum of scores
	 */
	public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) {
		Preconditions.checkArgument(0 < dampingFactor && dampingFactor < 1,
			"Damping factor must be between zero and one");
		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");

		this.dampingFactor = dampingFactor;
		this.maxIterations = maxIterations;
		this.convergenceThreshold = convergenceThreshold;
	}

	/**
	 * This PageRank implementation properly handles both source and sink
	 * vertices, which have, respectively, only outgoing and only incoming
	 * edges.
	 *
	 * <p>Setting this flag includes "zero-degree" vertices in the PageRank
	 * computation and result. These vertices are handled the same as other
	 * "source" vertices (with a consistent score of
	 * <code>(1 - damping factor) / number of vertices</code>) but only
	 * affect the scores of other vertices indirectly, by claiming this
	 * proportional share of the "random jump" score.
	 *
	 * <p>The cost of including zero-degree vertices is a reduce for uniqueness
	 * on the vertex set followed by an outer join on the vertex degree
	 * DataSet.
	 *
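	 * <p>For example (a sketch; the graph and its types are illustrative):
	 * <pre>{@code
	 * DataSet<PageRank.Result<LongValue>> ranks = graph
	 *     .run(new PageRank<LongValue, NullValue, NullValue>(0.85, 0.0001)
	 *         .setIncludeZeroDegreeVertices(true));
	 * }</pre>
	 *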
	 * @param includeZeroDegreeVertices whether to include zero-degree vertices in the iterative computation
	 * @return this
	 */
	public PageRank<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
		this.includeZeroDegreeVertices = includeZeroDegreeVertices;

		return this;
	}

	@Override
	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
		if (!super.canMergeConfigurationWith(other)) {
			return false;
		}

		PageRank rhs = (PageRank) other;

		return dampingFactor == rhs.dampingFactor &&
			includeZeroDegreeVertices == rhs.includeZeroDegreeVertices;
	}

	@Override
	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
		super.mergeConfiguration(other);

		PageRank rhs = (PageRank) other;

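		// take the larger iteration limit and the smaller convergence threshold
		// so the merged computation runs at least as long as either original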
		maxIterations = Math.max(maxIterations, rhs.maxIterations);
		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
	}

	@Override
	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
			throws Exception {
		// vertex degree
		DataSet<Vertex<K, Degrees>> vertexDegree = input
			.run(new VertexDegrees<K, VV, EV>()
				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
				.setParallelism(parallelism));

		// vertex count
		DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);

		// s, t, d(s)
		DataSet<Edge<K, LongValue>> edgeSourceDegree = input
			.run(new EdgeSourceDegrees<K, VV, EV>()
				.setParallelism(parallelism))
			.map(new ExtractSourceDegree<>())
				.setParallelism(parallelism)
				.name("Extract source degree");

		// vertices with zero in-edges
		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
			.flatMap(new InitializeSourceVertices<>())
				.setParallelism(parallelism)
				.name("Initialize source vertex scores");

		// s, initial pagerank(s)
		DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
			.map(new InitializeVertexScores<>())
				.withBroadcastSet(vertexCount, VERTEX_COUNT)
				.setParallelism(parallelism)
				.name("Initialize scores");

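		// Flink bulk iteration: 'iterative' is the input to each superstep; the
		// closeWith(...) call at the end of this method feeds the adjusted
		// scores back in until maxIterations or the convergence criterion is met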
		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
			.iterate(maxIterations)
			.setParallelism(parallelism);

		// s, projected pagerank(s)
		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative
			.coGroup(edgeSourceDegree)
			.where(0)
			.equalTo(0)
			.with(new SendScore<>())
				.setParallelism(parallelism)
				.name("Send score")
			.groupBy(0)
			.reduce(new SumScore<>())
				.setCombineHint(CombineHint.HASH)
				.setParallelism(parallelism)
				.name("Sum");

		// ignored ID, total pagerank
		DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
			.reduce(new SumVertexScores<>())
			.setParallelism(parallelism)
			.name("Sum");

		// s, adjusted pagerank(s)
		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
			.union(sourceVertices)
				.setParallelism(parallelism)
				.name("Union with source vertices")
			.map(new AdjustScores<>(dampingFactor))
				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
				.withBroadcastSet(vertexCount, VERTEX_COUNT)
				.setParallelism(parallelism)
				.name("Adjust scores");

		DataSet<Tuple2<K, DoubleValue>> passThrough;

		if (convergenceThreshold < Double.MAX_VALUE) {
			passThrough = iterative
				.join(adjustedScores)
				.where(0)
				.equalTo(0)
				.with(new ChangeInScores<>())
					.setParallelism(parallelism)
					.name("Change in scores");

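			// sum each task's accumulated change in scores (see ChangeInScores)
			// and terminate when the total falls to or below the threshold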
			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
		} else {
			passThrough = adjustedScores;
		}

		return iterative
			.closeWith(passThrough)
			.map(new TranslateResult<>())
				.setParallelism(parallelism)
				.name("Map result");
	}

	/**
	 * Remove the unused original edge value and extract the source vertex's
	 * out-degree.
	 *
	 * @param <T> ID type
	 * @param <ET> edge value type
	 */
	@ForwardedFields("0; 1")
	private static class ExtractSourceDegree<T, ET>
	implements MapFunction<Edge<T, Tuple2<ET, Degrees>>, Edge<T, LongValue>> {
		Edge<T, LongValue> output = new Edge<>();

		@Override
		public Edge<T, LongValue> map(Edge<T, Tuple2<ET, Degrees>> edge)
				throws Exception {
			output.f0 = edge.f0;
			output.f1 = edge.f1;
			output.f2 = edge.f2.f1.getOutDegree();
			return output;
		}
	}

	/**
	 * Source vertices have no in-edges and therefore a projected score of 0.0.
	 *
	 * @param <T> ID type
	 */
	@ForwardedFields("0")
	private static class InitializeSourceVertices<T>
	implements FlatMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue(0.0));

		@Override
		public void flatMap(Vertex<T, Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out)
				throws Exception {
			if (vertex.f1.getInDegree().getValue() == 0) {
				output.f0 = vertex.f0;
				out.collect(output);
			}
		}
	}

	/**
	 * PageRank scores sum to 1.0, so each vertex is initialized with the
	 * inverse of the number of vertices.
	 *
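	 * <p>For example, each vertex of a five-vertex graph starts with a
	 * score of 1/5 = 0.2.
	 *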
	 * @param <T> ID type
	 */
	@ForwardedFields("0")
	private static class InitializeVertexScores<T>
	extends RichMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
		private Tuple2<T, DoubleValue> output = new Tuple2<>();

		@Override
		public void open(Configuration parameters)
				throws Exception {
			super.open(parameters);

			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
			Iterator<LongValue> vertexCountIterator = vertexCount.iterator();
			output.f1 = new DoubleValue(vertexCountIterator.hasNext() ? 1.0 / vertexCountIterator.next().getValue() : Double.NaN);
		}

		@Override
		public Tuple2<T, DoubleValue> map(Vertex<T, Degrees> vertex)
				throws Exception {
			output.f0 = vertex.f0;
			return output;
		}
	}

	/**
	 * The PageRank score for each vertex is divided evenly among its
	 * out-edges and projected to the target neighbors.
	 *
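	 * <p>For example, a vertex with score 0.30 and three out-edges projects
	 * a score of 0.10 along each edge.
	 *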
	 * @param <T> ID type
	 */
	@ForwardedFieldsSecond("1->0")
	private static class SendScore<T>
	implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> {
		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue());

		@Override
		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out)
				throws Exception {
			Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator();

			if (edgeIterator.hasNext()) {
				Edge<T, LongValue> edge = edgeIterator.next();

				output.f0 = edge.f1;
				output.f1.setValue(vertex.iterator().next().f1.getValue() / edge.f2.getValue());
				out.collect(output);

				while (edgeIterator.hasNext()) {
					edge = edgeIterator.next();
					output.f0 = edge.f1;
					out.collect(output);
				}
			}
		}
	}

	/**
	 * Sum the PageRank score over all vertices. The vertex ID is unused but
	 * is retained rather than adding another operator to remove it.
	 *
	 * @param <T> ID type
	 */
	@ForwardedFields("0")
	private static class SumVertexScores<T>
	implements ReduceFunction<Tuple2<T, DoubleValue>> {
		@Override
		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
				throws Exception {
			first.f1.setValue(first.f1.getValue() + second.f1.getValue());
			return first;
		}
	}

	/**
	 * In each iteration the per-vertex scores are adjusted with the damping
	 * factor. Each score is multiplied by the damping factor then added to
	 * the per-vertex share of the "random hop" probability, which is one
	 * minus the damping factor.
	 *
	 * <p>This operation also accounts for 'sink' vertices, which have no
	 * out-edges to project score to. The sink scores are computed by taking
	 * one minus the sum of vertex scores, which also absorbs floating-point
	 * precision error. This 'missing' score is evenly distributed across
	 * vertices as with the random hop.
	 *
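	 * <p>Restated as a formula (d = damping factor, n = vertex count, both
	 * as computed below):
	 * <pre>{@code
	 * score'(v) = ((1 - d) + d * sumOfSinks) / n + d * score(v)
	 * }</pre>
	 *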
	 * @param <T> ID type
	 */
	@ForwardedFields("0")
	private static class AdjustScores<T>
	extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
		private double dampingFactor;

		private long vertexCount;

		private double uniformlyDistributedScore;

		public AdjustScores(double dampingFactor) {
			this.dampingFactor = dampingFactor;
		}

		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);

			Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
			Iterator<Tuple2<T, DoubleValue>> sumOfScoresIterator = sumOfScores.iterator();
			// floating point precision error is also included in sumOfSinks
			double sumOfSinks = 1 - (sumOfScoresIterator.hasNext() ? sumOfScoresIterator.next().f1.getValue() : 0);

			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
			Iterator<LongValue> vertexCountIterator = vertexCount.iterator();
			this.vertexCount = vertexCountIterator.hasNext() ? vertexCountIterator.next().getValue() : 0;

			this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
		}

		@Override
		public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception {
			value.f1.setValue(uniformlyDistributedScore + (dampingFactor * value.f1.getValue()));
			return value;
		}
	}

	/**
	 * Computes the sum of the absolute change in vertex PageRank scores
	 * between iterations.
	 *
	 * @param <T> ID type
	 */
	@ForwardedFieldsFirst("0")
	@ForwardedFieldsSecond("*")
	private static class ChangeInScores<T>
	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
		private double changeInScores;

		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);

			changeInScores = 0.0;
		}

		@Override
		public void close()
				throws Exception {
			super.close();

			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
			agg.aggregate(changeInScores);
		}

		@Override
		public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
				throws Exception {
			changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
			return second;
		}
	}

	/**
	 * Monitors the sum of the absolute change in vertex scores. The algorithm
	 * terminates when the change in scores compared against the prior iteration
	 * falls to or below the given convergence threshold.
	 */
	private static class ScoreConvergence
	implements ConvergenceCriterion<DoubleValue> {
		private double convergenceThreshold;

		public ScoreConvergence(double convergenceThreshold) {
			this.convergenceThreshold = convergenceThreshold;
		}

		@Override
		public boolean isConverged(int iteration, DoubleValue value) {
			double val = value.getValue();
			return (val <= convergenceThreshold);
		}
	}

	/**
	 * Map the Tuple result to the return type.
	 *
	 * @param <T> ID type
	 */
	@ForwardedFields("0->vertexId0; 1->pageRankScore")
	private static class TranslateResult<T>
	implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> {
		private Result<T> output = new Result<>();

		@Override
		public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception {
			output.setVertexId0(value.f0);
			output.setPageRankScore(value.f1);
			return output;
		}
	}

	/**
	 * A result for the PageRank algorithm.
	 *
	 * @param <T> ID type
	 */
	public static class Result<T>
	extends UnaryResultBase<T>
	implements PrintableResult {
		private DoubleValue pageRankScore;

		/**
		 * Get the PageRank score.
		 *
		 * @return the PageRank score
		 */
		public DoubleValue getPageRankScore() {
			return pageRankScore;
		}

		/**
		 * Set the PageRank score.
		 *
		 * @param pageRankScore the PageRank score
		 */
		public void setPageRankScore(DoubleValue pageRankScore) {
			this.pageRankScore = pageRankScore;
		}

		@Override
		public String toString() {
			return "(" + getVertexId0()
				+ "," + pageRankScore
				+ ")";
		}

		@Override
		public String toPrintableString() {
			return "Vertex ID: " + getVertexId0()
				+ ", PageRank score: " + pageRankScore;
		}

		// ----------------------------------------------------------------------------------------

		public static final int HASH_SEED = 0x4010af29;

		private transient MurmurHash hasher;

		@Override
		public int hashCode() {
			if (hasher == null) {
				hasher = new MurmurHash(HASH_SEED);
			}

			return hasher.reset()
				.hash(getVertexId0().hashCode())
				.hash(pageRankScore.getValue())
				.hash();
		}
	}
}