# blob: 0ab7249a8218578d8e0a57368ed47d8bab67a1f1 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This is an example implementation of PageRank. For more conventional use,
please refer to the PageRank implementation provided by GraphX.
Example Usage:
bin/spark-submit examples/src/main/python/pagerank.py data/mllib/pagerank_data.txt 10
"""
import re
import sys
from operator import add
from pyspark.sql import SparkSession
def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs.

    Splits ``rank`` evenly among ``urls`` and lazily yields one
    ``(url, share)`` pair per entry, in iteration order.

    :param urls: sized iterable (must support ``len``) of neighbor URLs.
    :param rank: current rank of the page that links to ``urls``.
    :return: generator of ``(url, rank / len(urls))`` tuples; it yields
        nothing when ``urls`` is empty.
    """
    num_urls = len(urls)
    if not num_urls:
        # Nothing to distribute; leaving early also keeps the division below safe.
        return
    # Every neighbor receives the same share, so compute it once.
    share = rank / num_urls
    for url in urls:
        yield (url, share)
def parseNeighbors(urls):
    """Parses a urls pair string into urls pair.

    The line is split on runs of whitespace; leading and trailing whitespace
    is ignored, and any tokens after the second one are discarded.

    :param urls: one input line of the form ``"URL neighbor_URL"``.
    :return: tuple ``(url, neighbor_url)`` made of the first two tokens.
    :raises IndexError: if the line contains fewer than two tokens.
    """
    # str.split() with no separator skips leading whitespace, so an indented
    # line no longer produces an empty string as the source URL.
    parts = urls.split(None, 2)
    return parts[0], parts[1]
if __name__ == "__main__":
    # Expect exactly two arguments: the input file and the number of iterations.
    if len(sys.argv) != 3:
        print("Usage: pagerank <file> <iterations>", file=sys.stderr)
        sys.exit(-1)

    print("WARN: This is a naive implementation of PageRank and is given as an example!\n" +
          "Please refer to PageRank implementation provided by graphx",
          file=sys.stderr)

    # Parse the iteration count before starting Spark so that a malformed
    # argument fails fast instead of after a session has been created.
    num_iterations = int(sys.argv[2])

    # Initialize the spark context.
    spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()

    try:
        # Loads in input file. It should be in format of:
        #     URL         neighbor URL
        #     URL         neighbor URL
        #     URL         neighbor URL
        #     ...
        # Each row's first field is taken as the raw text of one line.
        lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

        # Builds (URL, neighbors) pairs: duplicate edges are dropped and the
        # neighbors are grouped per source URL. The RDD is cached because it
        # is joined against the ranks in every iteration below.
        links = lines.map(parseNeighbors).distinct().groupByKey().cache()

        # Gives every URL that has at least one outgoing link an initial rank of 1.0.
        ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

        # Calculates and updates URL ranks continuously using PageRank algorithm.
        for _ in range(num_iterations):
            # The join yields (url, (neighbors, rank)); each URL's rank is
            # then split among its neighbors.
            contribs = links.join(ranks).flatMap(
                lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))

            # Re-calculates URL ranks based on neighbor contributions.
            # NOTE: ranks is rebuilt from contribs only, so a URL that receives
            # no contribution is absent from the new ranks.
            ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

        # Collects all URL ranks and dump them to console.
        for (link, rank) in ranks.collect():
            print("%s has rank: %s." % (link, rank))
    finally:
        # Always release the session, even when the job above fails.
        spark.stop()