blob: 9a92ba506d55d48f9370e1856dd47aea75f82937 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.apps.crocopr
import org.apache.wayang.apps.util.{ExperimentDescriptor, Parameters, ProfileDBHelper, StdOut}
import org.apache.wayang.api.graph._
import org.apache.wayang.api.{DataQuanta, PlanBuilder}
import org.apache.wayang.apps.util.ProfileDBHelper
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.exception.WayangException
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.plugin.Plugin
import org.apache.wayang.core.util.fs.FileSystems
/**
* Apache Wayang implementation of the cross-community PageRank (CrocoPR).
*/
class CrocoPR(plugins: Plugin*) {

  /**
   * Builds and executes the cross-community PageRank plan over the links of two input files.
   *
   * The links of both files are unioned and de-duplicated, every distinct vertex name is given
   * an ID, the links are rewritten into ID-based edges, the PageRank operator is run on those
   * edges, and finally the ranks are joined back to the vertex names.
   *
   * @param inputUrl1     URL to the first RDF N3 file
   * @param inputUrl2     URL to the second RDF N3 file
   * @param numIterations iteration count handed to the PageRank operator
   * @param experiment    experiment that is attached to the plan builder
   * @param configuration configuration the [[WayangContext]] is created with
   * @return the collected pairs of vertex name and page rank
   */
  def apply(inputUrl1: String, inputUrl2: String, numIterations: Int)
           (implicit experiment: Experiment, configuration: Configuration) = {
    // Set up the context and register every requested plugin with it.
    val context = new WayangContext(configuration)
    for (plugin <- plugins) context.register(plugin)

    implicit val builder: PlanBuilder = new PlanBuilder(context)
      .withExperiment(experiment)
      .withJobName(s"CrocoPR ($inputUrl1, $inputUrl2, $numIterations iterations)")
      .withUdfJarsOf(this.getClass)

    // Parse both inputs and merge them into one duplicate-free link set.
    val firstLinks = readLinks(inputUrl1)
    val secondLinks = readLinks(inputUrl2)
    val mergedLinks = firstLinks
      .union(secondLinks).withName("Union links")
      .distinct.withName("Distinct links")

    // Assign an ID to every distinct vertex name that occurs as source or target of a link.
    val idsByVertex = mergedLinks
      .flatMap(pair => Seq(pair._1, pair._2)).withName("Flatten vertices")
      .distinct.withName("Distinct vertices")
      .zipWithId.withName("Add vertex IDs")

    // An (ID, vertex name) entry as produced by zipWithId.
    type IdEntry = org.apache.wayang.basic.data.Tuple2[Vertex, String]

    // Swap the source name of each link for its ID ...
    val linksWithSourceId = mergedLinks
      .join[IdEntry, String](_._1, idsByVertex, _.field1).withName("Join source vertex IDs")
      .map(joined => (joined.field1.field0, joined.field0._2)).withName("Set source vertex ID")

    // ... and then the target name as well, which yields the ID-based edges.
    val idEdges = linksWithSourceId
      .join[IdEntry, String](_._2, idsByVertex, _.field1).withName("Join target vertex IDs")
      .map(joined => new Edge(joined.field0._1, joined.field1.field0)).withName("Set target vertex ID")

    // Run the PageRank on the ID-based edges.
    val ranksById = idEdges.pageRank(numIterations)

    // Translate the vertex IDs back into names and fetch the result.
    // NOTE(review): the identity map is labelled "Hotfix" in the plan; its purpose is not visible here.
    val namedRanks = ranksById
      .map(identity).withName("Hotfix")
      .join[IdEntry, Long](_.field0, idsByVertex, _.field0).withName("Join page ranks with vertex IDs")
      .map(joined => (joined.field1.field1, joined.field0.field1)).withName("Make page ranks readable")
    namedRanks.collect()
  }

  /**
   * Loads a text file and turns every non-comment line into a (source, target) link.
   *
   * Lines starting with `#` are dropped. Every remaining line must match the DBpedia
   * `wikiPageWikiLink` triple pattern; a line that does not match raises a [[WayangException]]
   * when the plan is executed.
   *
   * @param inputUrl    URL to the file
   * @param planBuilder used to create the Wayang operators
   * @return [[DataQuanta]] representing the parsed links
   */
  def readLinks(inputUrl: String)(implicit planBuilder: PlanBuilder): DataQuanta[(String, String)] = {
    val wikiLinkTriple =
      """<http://dbpedia.org/resource/([^>]+)>\s+<http://dbpedia.org/ontology/wikiPageWikiLink>\s+<http://dbpedia.org/resource/([^>]+)>\s+\.""".r

    val rawLines = planBuilder
      .readTextFile(inputUrl).withName(s"Load $inputUrl")
    val dataLines = rawLines
      .filter(line => !line.startsWith("#")).withName("Filter comments")

    dataLines.map {
      case wikiLinkTriple(source, target) => (source, target)
      case unparsable => throw new WayangException(s"Cannot parse $unparsable.")
    }.withName("Parse lines")
  }
}
/**
* Companion object for [[CrocoPR]].
*/
object CrocoPR extends ExperimentDescriptor {

  override def version = "0.1.0"

  /**
   * Command-line entry point.
   *
   * Expects five arguments: the experiment parameters, a comma-separated plugin list, the two
   * input URLs and the number of iterations. With fewer arguments the usage text is printed to
   * standard error and the JVM exits with status 1.
   *
   * @param args the command-line arguments described above
   */
  def main(args: Array[String]): Unit = {
    // Parse parameters.
    // All five positional arguments are read below, so check for all of them up front.
    if (args.length < 5) {
      // Print instead of sys.error(...): the latter throws, which made the exit call unreachable.
      System.err.println(s"Usage: <main class> ${Parameters.experimentHelp} <plugin>(,<plugin>)* <input URL1> <input URL2> <#iterations>")
      sys.exit(1)
    }
    implicit val configuration = new Configuration
    implicit val experiment = Parameters.createExperiment(args(0), this)
    val plugins = Parameters.loadPlugins(args(1))
    experiment.getSubject.addConfiguration("plugins", args(1))
    val inputUrl1 = args(2)
    experiment.getSubject.addConfiguration("input1", inputUrl1)
    val inputUrl2 = args(3)
    experiment.getSubject.addConfiguration("input2", inputUrl2)
    val numIterations = args(4).toInt
    experiment.getSubject.addConfiguration("iterations", numIterations)

    // Prepare the PageRank.
    val pageRank = new CrocoPR(plugins: _*)

    // Run the PageRank and order the result by descending rank.
    val pageRanks = pageRank(inputUrl1, inputUrl2, numIterations).toSeq.sortBy(-_._2)

    // Store experiment data; input sizes are recorded only when the file system reports them.
    val inputFileSize1 = FileSystems.getFileSize(inputUrl1)
    if (inputFileSize1.isPresent) experiment.getSubject.addConfiguration("inputSize1", inputFileSize1.getAsLong)
    val inputFileSize2 = FileSystems.getFileSize(inputUrl2)
    if (inputFileSize2.isPresent) experiment.getSubject.addConfiguration("inputSize2", inputFileSize2.getAsLong)
    ProfileDBHelper.store(experiment, configuration)

    // Print the result.
    println(s"Calculated ${pageRanks.size} page ranks:")
    StdOut.printLimited(pageRanks, formatter = (pr: (String, java.lang.Float)) => s"${pr._1} has a page rank of ${pr._2}")
  }
}