// blob: 49a9cc6a345e938ae4cffd6d3d0e026bc4277c54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.basic.operators;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
import org.apache.wayang.core.plan.wayangplan.EstimationContextProperty;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
import org.apache.wayang.core.types.DataSetType;
import java.util.Optional;
/**
 * {@link Operator} for the PageRank algorithm. It takes as input a list of directed edges, whereby each edge
 * is represented as a {@code (source vertex ID, target vertex ID)} tuple. Its output is the page ranks, encoded
 * as {@code (vertex ID, page rank)} tuples.
 */
public class PageRankOperator extends UnaryToUnaryOperator<Tuple2<Long, Long>, Tuple2<Long, Float>> {

    /**
     * Damping factor that is used when none is specified explicitly.
     */
    public static final double DEFAULT_DAMPING_FACTOR = 0.85d;

    /**
     * Graph density that is assumed when none is specified explicitly.
     * NOTE(review): the identifier is misspelled ("DENSITIY") but it is public, so it is kept for compatibility.
     * The constructor arguments are presumably (lower estimate, upper estimate, correctness probability) -
     * confirm against {@link ProbabilisticDoubleInterval}.
     */
    public static final ProbabilisticDoubleInterval DEFAULT_GRAPH_DENSITIY = new ProbabilisticDoubleInterval(.0001d, .5d, .5d);

    /**
     * The number of PageRank iterations to perform. Stored boxed; {@link #getNumIterations()} unboxes it.
     */
    @EstimationContextProperty
    protected final Integer numIterations;

    /**
     * The damping factor, narrowed to {@code float} at construction time.
     */
    protected final float dampingFactor;

    /**
     * The density of the input graph; it drives the output cardinality estimate.
     */
    protected final ProbabilisticDoubleInterval graphDensity;

    /**
     * Creates a new instance that uses {@link #DEFAULT_DAMPING_FACTOR} and {@link #DEFAULT_GRAPH_DENSITIY}.
     *
     * @param numIterations the number of PageRank iterations that this instance should perform
     */
    public PageRankOperator(Integer numIterations) {
        this(numIterations, DEFAULT_DAMPING_FACTOR, DEFAULT_GRAPH_DENSITIY);
    }

    /**
     * Creates a new instance.
     *
     * @param numIterations the number of PageRank iterations that this instance should perform
     * @param dampingFactor the damping factor; it is converted to a {@code float} and must not be {@code null}
     * @param graphDensitiy the density of the input graph, used to estimate the number of vertices
     */
    public PageRankOperator(Integer numIterations, Double dampingFactor, ProbabilisticDoubleInterval graphDensitiy) {
        // NOTE(review): the meaning of the trailing "false" is defined by the superclass constructor - confirm there.
        super(DataSetType.createDefaultUnchecked(Tuple2.class),
                DataSetType.createDefaultUnchecked(Tuple2.class),
                false);
        this.numIterations = numIterations;
        this.dampingFactor = dampingFactor.floatValue();
        this.graphDensity = graphDensitiy;
    }

    /**
     * Copies an instance (exclusive of broadcasts).
     *
     * @param that that should be copied
     */
    public PageRankOperator(PageRankOperator that) {
        super(that);
        this.numIterations = that.getNumIterations();
        this.dampingFactor = that.dampingFactor;
        this.graphDensity = that.graphDensity;
    }

    /**
     * @return the number of PageRank iterations; throws a {@link NullPointerException} if the instance was
     * created with a {@code null} iteration count, because the boxed value is unboxed here
     */
    public int getNumIterations() {
        return numIterations;
    }

    /**
     * @return the damping factor as a {@code float}
     */
    public float getDampingFactor() {
        return dampingFactor;
    }

    /**
     * @return the graph density this instance was configured with
     */
    public ProbabilisticDoubleInterval getGraphDensity() {
        return graphDensity;
    }

    /**
     * Provides the {@link CardinalityEstimator} for the single output of this operator. The estimate is the
     * number of vertices derived from the estimated number of input edges and the configured graph density.
     *
     * @param outputIndex   index of the output to estimate; only {@code 0} is valid
     * @param configuration not used by this implementation
     * @return the estimator for output {@code 0}
     * @throws IllegalArgumentException if {@code outputIndex} is not {@code 0}
     */
    @Override
    public Optional<CardinalityEstimator> createCardinalityEstimator(int outputIndex, Configuration configuration) {
        switch (outputIndex) {
            case 0:
                return Optional.of((optimizationContext, inputEstimates) -> {
                    assert inputEstimates.length == 1;
                    // A denser graph needs fewer vertices for the same number of edges, hence the lower bound
                    // pairs the lower edge estimate with the upper density and vice versa.
                    return new CardinalityEstimate(
                            calculateNumVertices(inputEstimates[0].getLowerEstimate(), PageRankOperator.this.graphDensity.getUpperEstimate()),
                            calculateNumVertices(inputEstimates[0].getUpperEstimate(), PageRankOperator.this.graphDensity.getLowerEstimate()),
                            inputEstimates[0].getCorrectnessProbability() * PageRankOperator.this.graphDensity.getCorrectnessProbability()
                    );
                });
            default:
                throw new IllegalArgumentException(String.format("%s does not have an OutputSlot with index %d.", this, outputIndex));
        }
    }

    /**
     * Calculate the number of vertices in a graph with a given number of edges and density, i.e., solve
     * {@code v * (v - 1) = 2 * numEdges / density} for {@code v} and round to the nearest {@code long}.
     *
     * @param numEdges number of edges in the graph
     * @param density  the graph density
     * @return the number of vertices in the graph, or {@code 0} if {@code density} is {@code 0}
     */
    private static long calculateNumVertices(long numEdges, double density) {
        if (density == 0) {
            return 0L;
        }
        // Multiply in floating point: "2 * numEdges" in long arithmetic wraps around for very large edge
        // estimates, which turned the square root into NaN and the rounded result into 0.
        final double doubledEdgesPerDensity = 2d * numEdges / density;
        return Math.round(0.5d + Math.sqrt(0.25d + doubledEdgesPerDensity));
    }
}