blob: a6151e61022eb30193d43981e9c4d8832dbc1326 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api
/**
* TODO: add unitary test to the elements in the file org.apache.wayang.api.PlanBuilder.scala
* labels: unitary-test,todo
*/
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.api.dataquanta.DataQuanta
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{CollectionSource, ObjectFileSource, TableSource, TextFileSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.util.ReflectionUtils
import scala.collection.JavaConversions
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import scala.reflect._
/**
* Utility to build [[WayangPlan]]s.
*/
class PlanBuilder(wayangContext: WayangContext, private var jobName: String = null) {
private[api] val sinks = ListBuffer[Operator]()
private val udfJars = scala.collection.mutable.Set[String]()
private var experiment: Experiment = _
// We need to ensure that this module is shipped to Spark etc. in particular because of the Scala-to-Java function wrappers.
ReflectionUtils.getDeclaringJar(this) match {
case path: String => udfJars += path
case _ =>
}
/**
* Defines user-code JAR files that might be needed to transfer to execution platforms.
*
* @param paths paths to JAR files that should be transferred
* @return this instance
*/
def withUdfJars(paths: String*) = {
this.udfJars ++= paths
this
}
/**
* Defines the [[Experiment]] that should collects metrics of the [[WayangPlan]].
*
* @param experiment the [[Experiment]]
* @return this instance
*/
def withExperiment(experiment: Experiment) = {
this.experiment = experiment
this
}
/**
* Defines the name for the [[WayangPlan]] that is being created.
*
* @param jobName the name
* @return this instance
*/
def withJobName(jobName: String) = {
this.jobName = jobName
this
}
/**
* Defines user-code JAR files that might be needed to transfer to execution platforms.
*
* @param classes whose JAR files should be transferred
* @return this instance
*/
def withUdfJarsOf(classes: Class[_]*) =
withUdfJars(classes.map(ReflectionUtils.getDeclaringJar).filterNot(_ == null): _*)
/**
* Build the [[org.apache.wayang.core.api.Job]] and execute it.
*/
def buildAndExecute(): Unit = {
val plan: WayangPlan = new WayangPlan(this.sinks.toArray: _*)
if (this.experiment == null) this.wayangContext.execute(jobName, plan, this.udfJars.toArray: _*)
else this.wayangContext.execute(jobName, plan, this.experiment, this.udfJars.toArray: _*)
}
/**
* Read a text file and provide it as a dataset of [[String]]s, one per line.
*
* @param url the URL of the text file
* @return [[DataQuanta]] representing the file
*/
def readTextFile(url: String): DataQuanta[String] = load(new TextFileSource(url))
/**
* Read a object's file and provide it as a dataset of [[Object]]s.
*
* @param url the URL of the Object's file
* @return [[DataQuanta]] representing the file
*/
def readObjectFile[T: ClassTag](url: String): DataQuanta[T] = load(new ObjectFileSource(url, dataSetType[T]))
/**
* Reads a database table and provides them as a dataset of [[Record]]s.
*
* @param source from that the [[Record]]s should be read
* @return [[DataQuanta]] of [[Record]]s in the table
*/
def readTable(source: TableSource): DataQuanta[Record] = load(source)
/**
* Loads a [[java.util.Collection]] into Wayang and represents them as [[DataQuanta]].
*
* @param collection to be loaded
* @return [[DataQuanta]] the `collection`
*/
def loadCollection[T: ClassTag](collection: java.util.Collection[T]): DataQuanta[T] =
load(new CollectionSource[T](collection, dataSetType[T]))
/**
* Loads a [[Iterable]] into Wayang and represents it as [[DataQuanta]].
*
* @param iterable to be loaded
* @return [[DataQuanta]] the `iterable`
*/
def loadCollection[T: ClassTag](iterable: Iterable[T]): DataQuanta[T] =
loadCollection(JavaConversions.asJavaCollection(iterable))
/**
* Load [[DataQuanta]] from an arbitrary [[UnarySource]].
*
* @param source that should be loaded from
* @return the [[DataQuanta]]
*/
def load[T: ClassTag](source: UnarySource[T]): DataQuanta[T] = wrap(source)
/**
* Execute a custom [[Operator]].
*
* @param operator that should be executed
* @param inputs the input [[DataQuanta]] of the `operator`, aligned with its [[InputSlot]]s
* @return an [[IndexedSeq]] of the `operator`s output [[DataQuanta]], aligned with its [[OutputSlot]]s
*/
def customOperator(operator: Operator, inputs: DataQuanta[_]*): IndexedSeq[DataQuanta[_]] = {
Validate.isTrue(operator.getNumRegularInputs == inputs.size)
// Set up inputs.
inputs.zipWithIndex.foreach(zipped => zipped._1.connectTo(operator, zipped._2))
// Set up outputs.
for (outputIndex <- 0 until operator.getNumOutputs) yield DataQuanta.create(operator.getOutput(outputIndex))(this)
}
implicit private[api] def wrap[T: ClassTag](operator: ElementaryOperator): DataQuanta[T] =
PlanBuilder.wrap[T](operator)(classTag[T], this)
}
object PlanBuilder {
implicit private[api] def wrap[T: ClassTag](operator: ElementaryOperator)(implicit planBuilder: PlanBuilder): DataQuanta[T] =
api.wrap[T](operator)
}