blob: f189f7038cffc5790c350cbe81ac206dc71ce2e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.apps.sindy
import java.util
import org.apache.wayang.api._
import org.apache.wayang.apps.sindy.Sindy.{CellCreator, CellMerger, IndCandidateGenerator, IndCandidateMerger}
import org.apache.wayang.apps.util.{ExperimentDescriptor, Parameters, ProfileDBHelper, StdOut}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction}
import org.apache.wayang.core.plugin.Plugin
import org.apache.wayang.core.util.fs.FileSystems
/**
 * This is an Apache Wayang-based implementation of the SINDY algorithm
 * for the discovery of unary inclusion dependencies (INDs).
 */
class Sindy(plugins: Plugin*) {

  /**
   * Execute the SINDY algorithm.
   *
   * @param paths         input file or directory URLs
   * @param seperator     CSV separator in the files (parameter name kept misspelled for
   *                      source compatibility with existing named-argument callers)
   * @param configuration Apache Wayang configuration
   * @param experiment    the experiment to log measurements to
   * @return the INDs as pairs of (dependent column label, referenced column labels)
   */
  def apply(paths: Seq[String], seperator: Char = ';')
           (implicit configuration: Configuration, experiment: Experiment) = {
    // Set up a Wayang context with all requested processing platform plugins.
    val wayangContext = new WayangContext(configuration)
    plugins.foreach(wayangContext.register)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"Sindy ($paths)")
      .withExperiment(experiment)
      .withUdfJarsOf(classOf[Sindy])

    // Assign each resolved input file a disjoint range of 1000 column IDs.
    val fileColumnIdOffsets = paths.flatMap(resolveDirs).zipWithIndex.map { case (url, index) => (url, index * 1000) }

    // Parse every file into (value, columnId) cells and union all cell streams.
    val allCells = fileColumnIdOffsets
      .map { case (path, offset) =>
        planBuilder
          .readTextFile(path).withName(s"Load $path")
          .flatMapJava(new CellCreator(offset, seperator)).withName(s"Create cells for $path")
      }
      .reduce(_ union _)

    // SINDY core: group cells by value, generate IND candidates per value group,
    // and intersect the candidate sets per dependent column.
    val rawInds = allCells
      .map(cell => (cell._1, Array(cell._2))).withName("Prepare cell merging")
      .reduceByKeyJava(toSerializableFunction(_._1), new CellMerger).withName("Merge cells")
      .flatMapJava(new IndCandidateGenerator).withName("Generate IND candidate sets")
      .reduceByKeyJava(toSerializableFunction(_._1), new IndCandidateMerger).withName("Merge IND candidate sets")
      .filter(_._2.length > 0).withName("Filter empty candidate sets")
      .collect()

    // Translate a numeric column ID back into a "<file>[<column index>]" label.
    def resolveColumnId(id: Int) = fileColumnIdOffsets
      .find { case (_, offset) => offset <= id && offset + 1000 > id } match {
      case Some((file, offset)) => s"$file[${id - offset}]"
      case _ => s"???[$id]"
    }

    rawInds.map {
      case (dep, refs) => (resolveColumnId(dep), refs.map(resolveColumnId).toSeq)
    }
  }

  /**
   * If the given URL is a directory, list all its files recursively.
   *
   * @param url the URL
   * @return the file URLs
   */
  def resolveDirs(url: String): scala.Iterable[String] = {
    // Use the explicit JavaConverters instead of the deprecated implicit
    // JavaConversions (removed in Scala 2.13).
    import scala.collection.JavaConverters._
    val fs = FileSystems.requireFileSystem(url)
    if (fs.isDirectory(url)) fs.listChildren(url).asScala.flatMap(resolveDirs) else Seq(url)
  }
}
/**
* Companion object for [[Sindy]].
*/
object Sindy extends ExperimentDescriptor {

  def version = "0.1.0"

  /**
   * Command-line entry point.
   *
   * Expected arguments: experiment descriptor, comma-separated plugin list,
   * CSV separator (literal character or symbolic name), semicolon-separated input URLs.
   */
  def main(args: Array[String]): Unit = {
    // Parse parameters.
    if (args.isEmpty) {
      // Print the usage and terminate. (Previously `sys.error` was used, which throws
      // and made the subsequent `sys.exit(1)` unreachable dead code.)
      Console.err.println(s"Usage: <main class> ${Parameters.experimentHelp} <plugin>(,<plugin>)* <CSV separator> <input URL>(;<input URL>)*")
      sys.exit(1)
    }
    implicit val configuration = new Configuration
    implicit val experiment = Parameters.createExperiment(args(0), this)
    val plugins = Parameters.loadPlugins(args(1))
    experiment.getSubject.addConfiguration("plugins", args(1))

    // Accept either a literal single-character separator or a symbolic name for
    // characters that are awkward to pass on a command line.
    val separator = if (args(2).length == 1) args(2).charAt(0)
    else args(2) match {
      case "tab" => '\t'
      case "\\t" => '\t'
      case "comma" => ','
      case "semicolon" => ';'
      case "\\|" => '|'
      case "pipe" => '|'
      case other => throw new IllegalArgumentException(s"Unknown separator: $other")
    }
    val inputUrls = args(3).split(";")
    experiment.getSubject.addConfiguration("inputs", inputUrls)

    // Set up the SINDY job.
    val sindy = new Sindy(plugins: _*)

    // Run SINDY.
    val inds = sindy(inputUrls.toSeq, separator).toSeq

    // Store experiment data (total input size only if every file size is known).
    val inputFileSizes = inputUrls.map(url => FileSystems.getFileSize(url))
    if (inputFileSizes.forall(_.isPresent))
      experiment.getSubject.addConfiguration("inputSize", inputFileSizes.map(_.getAsLong).sum)
    ProfileDBHelper.store(experiment, configuration)

    // Print the result.
    StdOut.printLimited(inds)
  }

  /**
   * UDF to parse a CSV row and create one (value, columnId) cell per field.
   *
   * @param offset    the column ID offset for the input CSV rows
   * @param separator the CSV field separator
   */
  class CellCreator(val offset: Int, val separator: Char) extends SerializableFunction[String, java.lang.Iterable[(String, Int)]] {

    override def apply(row: String): java.lang.Iterable[(String, Int)] = {
      val fields = row.split(separator)
      val cells = new util.ArrayList[(String, Int)](fields.length)
      // Column IDs are assigned left to right, starting at this file's offset.
      var columnId = offset
      for (field <- fields) {
        cells.add((field, columnId))
        columnId += 1
      }
      cells
    }
  }

  /**
   * UDF to merge the column IDs of two cells with the same value into one
   * duplicate-free array.
   */
  class CellMerger extends SerializableBinaryOperator[(String, Array[Int])] {

    import scala.collection.mutable

    // Reused scratch set to avoid per-call allocation; assumes the operator
    // instance is invoked single-threadedly (one instance per task).
    lazy val merger = mutable.Set[Int]()

    override def apply(cell1: (String, Array[Int]), cell2: (String, Array[Int])): (String, Array[Int]) = {
      merger.clear()
      for (columnId <- cell1._2) merger += columnId
      for (columnId <- cell2._2) merger += columnId
      (cell1._1, merger.toArray)
    }
  }

  /**
   * UDF to create IND candidates from a cell group: for each column in the group,
   * every other column of the group is a candidate referenced column.
   */
  class IndCandidateGenerator extends SerializableFunction[(String, Array[Int]), java.lang.Iterable[(Int, Array[Int])]] {

    override def apply(cellGroup: (String, Array[Int])): java.lang.Iterable[(Int, Array[Int])] = {
      val columnIds = cellGroup._2
      val result = new util.ArrayList[(Int, Array[Int])](columnIds.length)
      for (i <- columnIds.indices) {
        // Copy all column IDs except the one at position i.
        val refColumnIds = new Array[Int](columnIds.length - 1)
        java.lang.System.arraycopy(columnIds, 0, refColumnIds, 0, i)
        java.lang.System.arraycopy(columnIds, i + 1, refColumnIds, i, refColumnIds.length - i)
        result.add((columnIds(i), refColumnIds))
      }
      result
    }
  }

  /**
   * UDF to merge two IND candidates for the same dependent column by
   * intersecting their referenced-column sets.
   */
  class IndCandidateMerger extends SerializableBinaryOperator[(Int, Array[Int])] {

    import scala.collection.mutable

    // Reused scratch set to avoid per-call allocation; assumes the operator
    // instance is invoked single-threadedly (one instance per task).
    lazy val merger = mutable.Set[Int]()

    override def apply(indc1: (Int, Array[Int]), indc2: (Int, Array[Int])): (Int, Array[Int]) = {
      merger.clear()
      for (columnId <- indc1._2) merger += columnId
      // Keep only referenced columns present in both candidate sets.
      (indc1._1, indc2._2.filter(merger.contains))
    }
  }
}