| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.flink.operators; |
| |
| import org.apache.flink.api.common.functions.FilterFunction; |
| import org.apache.flink.api.common.functions.FlatMapFunction; |
| import org.apache.flink.api.common.functions.GroupReduceFunction; |
| import org.apache.flink.api.common.functions.MapFunction; |
| import org.apache.flink.api.java.DataSet; |
| import org.apache.flink.api.java.operators.IterativeDataSet; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.util.Collector; |
| import org.apache.wayang.basic.operators.PageRankOperator; |
| import org.apache.wayang.core.optimizer.OptimizationContext; |
| import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; |
| import org.apache.wayang.core.platform.ChannelDescriptor; |
| import org.apache.wayang.core.platform.ChannelInstance; |
| import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; |
| import org.apache.wayang.core.util.Tuple; |
| import org.apache.wayang.flink.channels.DataSetChannel; |
| import org.apache.wayang.flink.execution.FlinkExecutor; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import static org.apache.flink.api.java.aggregation.Aggregations.SUM; |
| |
| /** |
| * Flink implementation of the {@link PageRankOperator}. |
| */ |
| public class FlinkPageRankOperator extends PageRankOperator implements FlinkExecutionOperator { |
| private static final float DAMPENING_FACTOR = 0.85f; |
| private static final float EPSILON = 0.001f; |
| |
/**
 * Creates an operator that is configured with the given number of iterations.
 *
 * @param numIterations the iteration count, passed on unchanged to the {@link PageRankOperator} constructor
 */
public FlinkPageRankOperator(Integer numIterations) {
    super(numIterations);
}
| |
/**
 * Copy-style constructor: creates a Flink operator from an existing {@link PageRankOperator}.
 *
 * @param pageRankOperator the operator to derive this instance from; handed to the
 *                         {@link PageRankOperator} constructor that accepts another operator
 */
public FlinkPageRankOperator(PageRankOperator pageRankOperator) {
    super(pageRankOperator);
}
| |
| |
| @Override |
| public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> |
| evaluate(ChannelInstance[] inputs, |
| ChannelInstance[] outputs, |
| FlinkExecutor flinkExecutor, |
| OptimizationContext.OperatorContext operatorContext |
| ) { |
| assert inputs.length == this.getNumInputs(); |
| assert outputs.length == this.getNumOutputs(); |
| |
| final DataSetChannel.Instance input = (DataSetChannel.Instance) inputs[0]; |
| final DataSetChannel.Instance output = (DataSetChannel.Instance) outputs[0]; |
| |
| MapFunction<org.apache.wayang.basic.data.Tuple2<Long, Long>, Tuple2<Long,Long>> mapFunction = new MapFunction<org.apache.wayang.basic.data.Tuple2<Long, Long>, Tuple2<Long, Long>>() { |
| @Override |
| public Tuple2<Long, Long> map(org.apache.wayang.basic.data.Tuple2<Long, Long> longLongTuple2) throws Exception { |
| return new Tuple2<>(longLongTuple2.field0, longLongTuple2.field1); |
| } |
| }; |
| |
| final DataSet<org.apache.wayang.basic.data.Tuple2<Long, Long>> dataSetInput = input.provideDataSet(); |
| |
| final DataSet<Tuple2<Long, Long>> dataSetInputReal = dataSetInput.map(mapFunction); |
| |
| |
| FlatMapFunction<Tuple2<Long, Long>, Long> flatMapFunction = new FlatMapFunction<Tuple2<Long, Long>, Long>() { |
| @Override |
| public void flatMap(Tuple2<Long, Long> longLongTuple2, Collector<Long> collector) throws Exception { |
| collector.collect(longLongTuple2.f0); |
| collector.collect(longLongTuple2.f1); |
| } |
| }; |
| |
| final DataSet<Long> pages = dataSetInputReal.flatMap(flatMapFunction).distinct(); |
| |
| int numPages = 0; |
| try { |
| numPages = (int) pages.count(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| // get input data |
| DataSet<Long> pagesInput = pages; |
| DataSet<Tuple2<Long, Long>> linksInput = dataSetInputReal; |
| |
| // assign initial rank to pages |
| DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput. |
| map(new RankAssigner((1.0d / numPages))); |
| |
| // build adjacency list from link input |
| DataSet<Tuple2<Long, Long[]>> adjacencyListInput = |
| linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList()); |
| |
| // set iterative data set |
| IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(this.numIterations); |
| |
| DataSet<Tuple2<Long, Double>> newRanks = iteration |
| // join pages with outgoing edges and distribute rank |
| .join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch()) |
| // collect and sum ranks |
| .groupBy(0).aggregate(SUM, 1) |
| // apply dampening factor |
| .map(new Dampener(DAMPENING_FACTOR, numPages)); |
| |
| DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith( |
| newRanks, |
| newRanks.join(iteration).where(0).equalTo(0) |
| // termination condition |
| .filter(new EpsilonFilter())); |
| |
| |
| final DataSet<org.apache.wayang.basic.data.Tuple2<Long, Float>> dataSetOutput = finalPageRanks.map( |
| new MapFunction<Tuple2<Long, Double>, org.apache.wayang.basic.data.Tuple2<Long, Float>>() { |
| @Override |
| public org.apache.wayang.basic.data.Tuple2<Long, Float> map(Tuple2<Long, Double> longDoubleTuple2) throws Exception { |
| return new org.apache.wayang.basic.data.Tuple2<Long, Float>(longDoubleTuple2.f0, longDoubleTuple2.f1.floatValue()); |
| } |
| } |
| ); |
| |
| output.accept(dataSetOutput, flinkExecutor); |
| |
| return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); |
| } |
| |
| @Override |
| public boolean containsAction() { |
| return false; |
| } |
| |
| @Override |
| public Collection<String> getLoadProfileEstimatorConfigurationKeys() { |
| return Arrays.asList("wayang.flink.pagerank.load.main", "wayang.flink.pagerank.load.output"); |
| } |
| |
| @Override |
| public List<ChannelDescriptor> getSupportedInputChannels(int index) { |
| return Collections.singletonList(DataSetChannel.DESCRIPTOR); |
| } |
| |
| @Override |
| public List<ChannelDescriptor> getSupportedOutputChannels(int index) { |
| return Collections.singletonList(DataSetChannel.DESCRIPTOR); |
| } |
| |
| @Override |
| protected ExecutionOperator createCopy() { |
| return new FlinkPageRankOperator(this.numIterations); |
| } |
| // ************************************************************************* |
| // USER FUNCTIONS |
| // ************************************************************************* |
| |
| /** |
| * A map function that assigns an initial rank to all pages. |
| */ |
| public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> { |
| Tuple2<Long, Double> outPageWithRank; |
| |
| public RankAssigner(double rank) { |
| this.outPageWithRank = new Tuple2<Long, Double>(-1L, rank); |
| } |
| |
| @Override |
| public Tuple2<Long, Double> map(Long page) { |
| outPageWithRank.f0 = page; |
| return outPageWithRank; |
| } |
| } |
| |
| /** |
| * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges |
| * originate. Run as a pre-processing step. |
| */ |
| public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> { |
| |
| private final ArrayList<Long> neighbors = new ArrayList<Long>(); |
| |
| @Override |
| public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) { |
| neighbors.clear(); |
| Long id = 0L; |
| |
| for (Tuple2<Long, Long> n : values) { |
| id = n.f0; |
| neighbors.add(n.f1); |
| } |
| out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()]))); |
| } |
| } |
| |
| /** |
| * Join function that distributes a fraction of a vertex's rank to all neighbors. |
| */ |
| public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> { |
| |
| @Override |
| public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){ |
| Long[] neighbors = value.f1.f1; |
| double rank = value.f0.f1; |
| double rankToDistribute = rank / ((double) neighbors.length); |
| |
| for (Long neighbor: neighbors) { |
| out.collect(new Tuple2<Long, Double>(neighbor, rankToDistribute)); |
| } |
| } |
| } |
| |
| /** |
| * The function that applies the page rank dampening formula. |
| */ |
| public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> { |
| |
| private final double dampening; |
| private final double randomJump; |
| |
| public Dampener(double dampening, double numVertices) { |
| this.dampening = dampening; |
| this.randomJump = (1 - dampening) / numVertices; |
| } |
| |
| @Override |
| public Tuple2<Long, Double> map(Tuple2<Long, Double> value) { |
| value.f1 = (value.f1 * dampening) + randomJump; |
| return value; |
| } |
| } |
| |
| /** |
| * Filter that filters vertices where the rank difference is below a threshold. |
| */ |
| public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> { |
| |
| @Override |
| public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) { |
| return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON; |
| } |
| } |
| } |