blob: 29b4965582fa4349523526048aec3d36c39112d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api
import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
import java.util.{Collection => JavaCollection}
import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}
import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap}
import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator}
import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, UnarySource, WayangPlan}
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
/**
* Trait/interface for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
* Java API for Wayang that compensates for lacking default and named arguments.
*/
trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging {

  /**
   * The type of the [[DataQuanta]] to be built.
   */
  protected[api] def outputTypeTrap: TypeTrap

  /**
   * Provide a [[JavaPlanBuilder]] to which this instance is associated.
   */
  protected[api] implicit def javaPlanBuilder: JavaPlanBuilder

  /**
   * Set a name for the [[DataQuanta]] and its associated [[org.apache.wayang.core.plan.wayangplan.Operator]]s.
   *
   * @param name the name
   * @return this instance
   */
  def withName(name: String): This

  /**
   * Set an [[Experiment]] for the currently built [[org.apache.wayang.core.api.Job]].
   *
   * @param experiment the [[Experiment]]
   * @return this instance
   */
  def withExperiment(experiment: Experiment): This

  /**
   * Explicitly set an output [[DataSetType]] for the currently built [[DataQuanta]]. Note that it is not
   * always necessary to set it and that it can be inferred in some situations.
   *
   * @param outputType the output [[DataSetType]]
   * @return this instance
   */
  def withOutputType(outputType: DataSetType[Out]): This

  /**
   * Explicitly set an output [[Class]] for the currently built [[DataQuanta]]. Note that it is not
   * always necessary to set it and that it can be inferred in some situations.
   *
   * @param cls the output [[Class]]
   * @return this instance
   */
  def withOutputClass(cls: Class[Out]): This

  /**
   * Register a broadcast with the [[DataQuanta]] to be built.
   *
   * @param sender        a [[DataQuantaBuilder]] constructing the broadcasted [[DataQuanta]]
   * @param broadcastName the name of the broadcast
   * @return this instance
   */
  def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This

  /**
   * Set a [[CardinalityEstimator]] for the currently built [[DataQuanta]].
   *
   * @param cardinalityEstimator the [[CardinalityEstimator]]
   * @return this instance
   */
  def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This

  /**
   * Add a target [[Platform]] on which the currently built [[DataQuanta]] should be calculated. Can be invoked
   * multiple times to set multiple possible target [[Platform]]s or not at all to impose no restrictions.
   *
   * @param platform the target [[Platform]]
   * @return this instance
   */
  def withTargetPlatform(platform: Platform): This

  /**
   * Register the JAR file containing the given [[Class]] with the currently built [[org.apache.wayang.core.api.Job]].
   *
   * @param cls the [[Class]]
   * @return this instance
   */
  def withUdfJarOf(cls: Class[_]): This

  /**
   * Register a JAR file with the currently built [[org.apache.wayang.core.api.Job]].
   *
   * @param path the path of the JAR file
   * @return this instance
   */
  def withUdfJar(path: String): This

  /**
   * Provide a [[ClassTag]] for the constructed [[DataQuanta]]. It is derived from the type class currently
   * held by [[outputTypeTrap]].
   *
   * @return the [[ClassTag]]
   */
  protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)

  /**
   * Feed the built [[DataQuanta]] into a [[MapOperator]].
   *
   * @param udf the UDF for the [[MapOperator]]
   * @return a [[MapDataQuantaBuilder]]
   */
  def map[NewOut](udf: SerializableFunction[Out, NewOut]) = new MapDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[MapOperator]] with a [[org.apache.wayang.basic.function.ProjectionDescriptor]].
   *
   * NOTE(review): `NewOut` is not passed on to the [[ProjectionDataQuantaBuilder]], so it presumably does not
   * determine the output type of the returned builder -- confirm.
   *
   * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
   * @return a [[ProjectionDataQuantaBuilder]]
   */
  def project[NewOut](fieldNames: Array[String]) = new ProjectionDataQuantaBuilder(this, fieldNames)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FilterOperator]].
   *
   * @param udf filter UDF
   * @return a [[FilterDataQuantaBuilder]]
   */
  def filter(udf: SerializablePredicate[Out]) = new FilterDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FlatMapOperator]].
   *
   * @param udf the UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
   * @return a [[FlatMapDataQuantaBuilder]]
   */
  def flatMap[NewOut](udf: SerializableFunction[Out, java.lang.Iterable[NewOut]]) = new FlatMapDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MapPartitionsOperator]].
   *
   * @param udf the UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
   * @return a [[MapPartitionsDataQuantaBuilder]]
   */
  def mapPartitions[NewOut](udf: SerializableFunction[java.lang.Iterable[Out], java.lang.Iterable[NewOut]]) =
    new MapPartitionsDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
   *
   * @param sampleSize the absolute size of the sample
   * @return a [[SampleDataQuantaBuilder]]
   */
  def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
    // The sample size is constant, i.e., independent of the operand passed in.
    override def applyAsInt(operand: Int): Int = sampleSize
  })

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
   *
   * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
   * @return a [[SampleDataQuantaBuilder]]
   */
  def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)

  /**
   * Annotates a key to this instance.
   *
   * @param keyExtractor extracts the key from the data quanta
   * @return a [[KeyedDataQuantaBuilder]]
   */
  def keyBy[Key](keyExtractor: SerializableFunction[Out, Key]) = new KeyedDataQuantaBuilder[Out, Key](this, keyExtractor)

  /**
   * Feed the built [[DataQuanta]] into a [[GlobalReduceOperator]].
   *
   * @param udf the UDF for the [[GlobalReduceOperator]]
   * @return a [[GlobalReduceDataQuantaBuilder]]
   */
  def reduce(udf: SerializableBinaryOperator[Out]) = new GlobalReduceDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ReduceByOperator]].
   *
   * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
   * @param udf    the UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
   * @return a [[ReduceByDataQuantaBuilder]]
   */
  def reduceByKey[Key](keyUdf: SerializableFunction[Out, Key], udf: SerializableBinaryOperator[Out]) =
    new ReduceByDataQuantaBuilder(this, keyUdf, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]].
   *
   * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
   * @return a [[GroupByDataQuantaBuilder]]
   */
  def groupByKey[Key](keyUdf: SerializableFunction[Out, Key]) =
    new GroupByDataQuantaBuilder(this, keyUdf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]].
   *
   * @return a [[GlobalGroupDataQuantaBuilder]]
   */
  def group() = new GlobalGroupDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.UnionAllOperator]].
   *
   * @param that the other [[DataQuantaBuilder]] to union with
   * @return a [[UnionDataQuantaBuilder]]
   */
  def union(that: DataQuantaBuilder[_, Out]) = new UnionDataQuantaBuilder(this, that)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.IntersectOperator]].
   *
   * @param that the other [[DataQuantaBuilder]] to intersect with
   * @return an [[IntersectDataQuantaBuilder]]
   */
  def intersect(that: DataQuantaBuilder[_, Out]) = new IntersectDataQuantaBuilder(this, that)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.JoinOperator]].
   *
   * @param thisKeyUdf the key extraction UDF for this instance
   * @param that       the other [[DataQuantaBuilder]] to join with
   * @param thatKeyUdf the key extraction UDF for `that` instance
   * @return a [[JoinDataQuantaBuilder]]
   */
  def join[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
                         that: DataQuantaBuilder[_, ThatOut],
                         thatKeyUdf: SerializableFunction[ThatOut, Key]) =
    new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.CoGroupOperator]].
   *
   * @param thisKeyUdf the key extraction UDF for this instance
   * @param that       the other [[DataQuantaBuilder]] to co-group with
   * @param thatKeyUdf the key extraction UDF for `that` instance
   * @return a [[CoGroupDataQuantaBuilder]]
   */
  def coGroup[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
                            that: DataQuantaBuilder[_, ThatOut],
                            thatKeyUdf: SerializableFunction[ThatOut, Key]) =
    new CoGroupDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SortOperator]].
   *
   * @param keyUdf the key extraction UDF for this instance
   * @return a [[SortDataQuantaBuilder]]
   */
  def sort[Key](keyUdf: SerializableFunction[Out, Key]) =
    new SortDataQuantaBuilder(this, keyUdf)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.CartesianOperator]].
   *
   * @param that the other [[DataQuantaBuilder]] to combine with
   * @return a [[CartesianDataQuantaBuilder]]
   */
  def cartesian[ThatOut](that: DataQuantaBuilder[_, ThatOut]) = new CartesianDataQuantaBuilder(this, that)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
   *
   * @return a [[ZipWithIdDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.ZipWithIdOperator]]'s output
   */
  def zipWithId = new ZipWithIdDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DistinctOperator]].
   *
   * @return a [[DistinctDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.DistinctOperator]]'s output
   */
  def distinct = new DistinctDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.CountOperator]].
   *
   * @return a [[CountDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.CountOperator]]'s output
   */
  def count = new CountDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DoWhileOperator]].
   *
   * @param conditionUdf predicate over the collected convergence data quanta
   * @param bodyBuilder  creates the loop body; given a builder for the loop input, it returns a tuple of
   *                     builders whose second component yields the data quanta fed to `conditionUdf`
   * @return a [[DoWhileDataQuantaBuilder]]
   */
  def doWhile[Conv](conditionUdf: SerializablePredicate[JavaCollection[Conv]],
                    bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], WayangTuple[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Conv]]]) =
    // `conditionUdf` already has the required static type, so no cast is needed.
    new DoWhileDataQuantaBuilder(this, conditionUdf, bodyBuilder)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.RepeatOperator]].
   *
   * @param numRepetitions the number of repetitions
   * @param bodyBuilder    creates the loop body from a builder for the loop input
   * @return a [[RepeatDataQuantaBuilder]]
   */
  def repeat(numRepetitions: Int, bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Out]]) =
    new RepeatDataQuantaBuilder(this, numRepetitions, bodyBuilder)

  /**
   * Feed the built [[DataQuanta]] into a custom [[Operator]] with a single [[org.apache.wayang.core.plan.wayangplan.InputSlot]]
   * and a single [[OutputSlot]]. Both conditions are checked via `assert`.
   *
   * @param operator the custom [[Operator]]
   * @tparam T the type of the output [[DataQuanta]]
   * @return a [[CustomOperatorDataQuantaBuilder]]
   */
  def customOperator[T](operator: Operator) = {
    assert(operator.getNumInputs == 1, "customOperator(...) only allows for operators with a single input.")
    assert(operator.getNumOutputs == 1, "customOperator(...) only allows for operators with a single output.")
    new CustomOperatorDataQuantaBuilder[T](operator, 0, new DataQuantaBuilderCache, this)
  }

  /**
   * Feed the built [[DataQuanta]] into a [[LocalCallbackSink]] that collects all data quanta locally. This triggers
   * execution of the constructed [[WayangPlan]].
   *
   * @return the collected data quanta
   */
  def collect(): JavaCollection[Out] = {
    // Convert explicitly rather than via the deprecated implicit conversions of `JavaConversions`.
    import scala.collection.JavaConverters._
    this.dataQuanta().collect().asJavaCollection
  }

  /**
   * Feed the built [[DataQuanta]] into a [[Consumer]] that runs locally. This triggers
   * execution of the constructed [[WayangPlan]].
   *
   * @param f the [[Consumer]] to be applied to the data quanta
   */
  def forEach(f: Consumer[Out]): Unit = this.dataQuanta().foreachJava(f)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
   * execution of the constructed [[WayangPlan]]. Delegates to the four-argument variant with a `null`
   * [[LoadProfileEstimator]].
   *
   * @param url          the URL of the file to be written
   * @param formatterUdf UDF turning a data quantum into a [[String]]
   * @param jobName      name for the job, set on the [[JavaPlanBuilder]]
   */
  def writeTextFile(url: String, formatterUdf: SerializableFunction[Out, String], jobName: String): Unit =
    this.writeTextFile(url, formatterUdf, jobName, null)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
   * execution of the constructed [[WayangPlan]].
   *
   * @param url                     the URL of the file to be written
   * @param formatterUdf            UDF turning a data quantum into a [[String]]
   * @param jobName                 name for the job, set on the [[JavaPlanBuilder]]
   * @param udfLoadProfileEstimator [[LoadProfileEstimator]] passed along with `formatterUdf`; may be `null`
   */
  def writeTextFile(url: String,
                    formatterUdf: SerializableFunction[Out, String],
                    jobName: String,
                    udfLoadProfileEstimator: LoadProfileEstimator): Unit = {
    this.javaPlanBuilder.withJobName(jobName)
    this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
  }

  /**
   * Enriches the set of operations to [[Record]]-based ones. This instance must deal with data quanta of
   * type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this
   * operation is applicable.
   *
   * @return this instance if it already is a [[RecordDataQuantaBuilder]], otherwise a
   *         [[RecordDataQuantaBuilderDecorator]] wrapping it
   */
  def asRecords[T <: RecordDataQuantaBuilder[T]]: RecordDataQuantaBuilder[T] = {
    this match {
      case records: RecordDataQuantaBuilder[_] => records.asInstanceOf[RecordDataQuantaBuilder[T]]
      case _ => new RecordDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Record]])
    }
  }

  /**
   * Enriches the set of operations to [[Edge]]-based ones. This instance must deal with data quanta of
   * type [[Edge]], though. Because of Java's type erasure, we need to leave it up to you whether this
   * operation is applicable.
   *
   * @return this instance if it already is an [[EdgeDataQuantaBuilder]], otherwise an
   *         [[EdgeDataQuantaBuilderDecorator]] wrapping it
   */
  def asEdges[T <: EdgeDataQuantaBuilder[T]]: EdgeDataQuantaBuilder[T] = {
    this match {
      // Match on the edge builder type itself: matching on RecordDataQuantaBuilder here would send record
      // builders into a cast to EdgeDataQuantaBuilder and would never recognise actual edge builders.
      case edges: EdgeDataQuantaBuilder[_] => edges.asInstanceOf[EdgeDataQuantaBuilder[T]]
      case _ => new EdgeDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Edge]])
    }
  }

  /**
   * Get or create the [[DataQuanta]] built by this instance.
   *
   * @return the [[DataQuanta]]
   */
  protected[api] def dataQuanta(): DataQuanta[Out]
}
/**
* Abstract base class for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
* Java API for Wayang that compensates for lacking default and named arguments.
*/
abstract class BasicDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](implicit _javaPlanBuilder: JavaPlanBuilder)
  extends Logging with DataQuantaBuilder[This, Out] {

  /** The [[DataQuanta]] product of this builder; stays `null` until [[dataQuanta()]] creates it. */
  private var result: DataQuanta[Out] = _

  /** A name for the [[DataQuanta]] to be built; `null` unless set via [[withName]]. */
  private var name: String = _

  /** An [[Experiment]] for the [[DataQuanta]] to be built; `null` unless set via [[withExperiment]]. */
  private var experiment: Experiment = _

  /** Broadcasts for the [[DataQuanta]] to be built, as pairs of broadcast name and sending builder. */
  private val broadcasts = ListBuffer.empty[(String, DataQuantaBuilder[_, _])]

  /** [[CardinalityEstimator]] for the [[DataQuanta]] to be built; `null` unless explicitly set. */
  private var cardinalityEstimator: CardinalityEstimator = _

  /** Target [[Platform]]s for the [[DataQuanta]] to be built. */
  private val targetPlatforms = ListBuffer.empty[Platform]

  /** Paths of UDF JAR files for the [[DataQuanta]] to be built. */
  private val udfJars = ListBuffer.empty[String]

  /**
   * The type of the [[DataQuanta]] to be built. Initialized from [[getOutputTypeTrap]] during construction.
   */
  protected[api] val outputTypeTrap: TypeTrap = getOutputTypeTrap

  /**
   * Retrieve an initialization value for [[outputTypeTrap]]. This default creates a fresh [[TypeTrap]].
   *
   * @return the [[TypeTrap]]
   */
  protected def getOutputTypeTrap: TypeTrap = new TypeTrap

  override protected[api] implicit def javaPlanBuilder: JavaPlanBuilder = _javaPlanBuilder

  override def withName(name: String): This = {
    this.name = name
    this.asInstanceOf[This]
  }

  override def withExperiment(experiment: Experiment): This = {
    this.experiment = experiment
    this.asInstanceOf[This]
  }

  override def withOutputType(outputType: DataSetType[Out]): This = {
    this.outputTypeTrap.dataSetType = outputType
    this.asInstanceOf[This]
  }

  override def withOutputClass(cls: Class[Out]): This = {
    val defaultType = DataSetType.createDefault(cls)
    this.withOutputType(defaultType)
  }

  override def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This = {
    this.broadcasts += (broadcastName -> sender)
    this.asInstanceOf[This]
  }

  override def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This = {
    this.cardinalityEstimator = cardinalityEstimator
    this.asInstanceOf[This]
  }

  override def withTargetPlatform(platform: Platform): This = {
    this.targetPlatforms += platform
    this.asInstanceOf[This]
  }

  override def withUdfJarOf(cls: Class[_]): This = {
    val jarPath = ReflectionUtils.getDeclaringJar(cls)
    this.withUdfJar(jarPath)
  }

  override def withUdfJar(path: String): This = {
    this.udfJars += path
    this.asInstanceOf[This]
  }

  override protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)

  /**
   * Get the [[DataQuanta]] built by this instance. On the first invocation, the product is created via
   * [[build]], stored, and then configured with the settings collected by this builder; later invocations
   * return the stored product.
   *
   * @return the [[DataQuanta]]
   */
  override protected[api] def dataQuanta(): DataQuanta[Out] = {
    if (this.result == null) {
      // Store the product first, then configure it.
      this.result = this.build
      this.configure(this.result)
    }
    this.result
  }

  /** Applies the name, estimator, experiment, JARs, platforms, and broadcasts of this builder to `product`. */
  private def configure(product: DataQuanta[Out]): Unit = {
    if (this.name != null) product.withName(this.name)
    if (this.cardinalityEstimator != null) product.withCardinalityEstimator(this.cardinalityEstimator)
    if (this.experiment != null) product.withExperiment(experiment)
    product.withUdfJars(this.udfJars: _*)
    product.withTargetPlatforms(this.targetPlatforms: _*)
    this.broadcasts.foreach {
      case (broadcastName, senderBuilder) => product.withBroadcast(senderBuilder.dataQuanta(), broadcastName)
    }
  }

  /**
   * Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
   *
   * @return the created and partially configured [[DataQuanta]]
   */
  protected def build: DataQuanta[Out]
}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.core.plan.wayangplan.UnarySource]]s.
*
* @param source the [[UnarySource]]
* @param javaPlanBuilder the [[JavaPlanBuilder]]
*/
class UnarySourceDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](source: UnarySource[Out])
                                                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[This, Out] {

  /** Loads `source` via the plan builder of the [[JavaPlanBuilder]], handing over this builder's [[ClassTag]]. */
  override protected def build: DataQuanta[Out] = {
    val planBuilder = javaPlanBuilder.planBuilder
    planBuilder.load(source)(this.classTag)
  }
}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CollectionSource]]s.
*
* @param collection the [[JavaCollection]] to be loaded
* @param javaPlanBuilder the [[JavaPlanBuilder]]
*/
class LoadCollectionDataQuantaBuilder[Out](collection: JavaCollection[Out])(implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[LoadCollectionDataQuantaBuilder[Out], Out] {

  // Try to infer the type class from the collection: take any element of a non-empty collection and,
  // unless that element is null, use its runtime class as output type.
  locally {
    if (!collection.isEmpty) {
      Option(WayangCollections.getAny(collection)).foreach { element =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(element.getClass)
      }
    }
  }

  /** Loads `collection` via the plan builder of the [[JavaPlanBuilder]], handing over this builder's [[ClassTag]]. */
  override protected def build: DataQuanta[Out] = {
    val planBuilder = javaPlanBuilder.planBuilder
    planBuilder.loadCollection(collection)(this.classTag)
  }
}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s.
*
* @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
* @param udf UDF for the [[MapOperator]]
*/
class MapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
                                    udf: SerializableFunction[In, Out])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[MapDataQuantaBuilder[In, Out], Out] {

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` unless set. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  // Try to infer the type classes from the udf: its "Input" type argument goes to the input builder's
  // type trap, its "Output" type argument to the type trap of this builder.
  locally {
    val typeArguments = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
    val inputType = typeArguments.get("Input")
    val outputType = typeArguments.get("Output")

    inputType match {
      case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", udf)
    }
    outputType match {
      case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): MapDataQuantaBuilder[In, Out] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /** Applies the [[udf]] to the input [[DataQuanta]], passing along the UDF load estimator (possibly `null`). */
  override protected def build: DataQuanta[Out] =
    inputDataQuanta.dataQuanta().mapJava(udf, this.udfLoadProfileEstimator)
}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s with
* [[org.apache.wayang.basic.function.ProjectionDescriptor]]s.
*
* @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
* @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
*/
class ProjectionDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In], fieldNames: Array[String])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[ProjectionDataQuantaBuilder[In, Out], Out] {

  /** Projects the input [[DataQuanta]] onto the configured field names. */
  override protected def build: DataQuanta[Out] =
    inputDataQuanta.dataQuanta().project(fieldNames.toSeq)
}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FilterOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param udf filter predicate for the [[org.apache.wayang.basic.operators.FilterOperator]]
 */
class FilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], udf: SerializablePredicate[T])
                                (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[FilterDataQuantaBuilder[T], T] {

  // Reuse the input TypeTrap to enforce type equality between input and output.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` unless set. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  /** Selectivity of the filter predicate; `null` unless set via [[withSelectivity]]. */
  private var selectivity: ProbabilisticDoubleInterval = _

  /** SQL UDF implementing the filter predicate; `null` unless set via [[withSqlUdf]]. */
  private var sqlUdf: String = _

  // Try to infer the type classes from the udf.
  // NOTE(review): `udf` is a SerializablePredicate, yet the type arguments are resolved relative to
  // SerializableFunction and looked up as "Input". Unless the UDF class also implements
  // SerializableFunction, this lookup presumably finds nothing and only the warning is logged --
  // confirm against ReflectionUtils.getTypeParameters and the declaration of SerializablePredicate.
  locally {
    val typeArguments = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
    val inputType = typeArguments.get("Input")
    inputType match {
      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): FilterDataQuantaBuilder[T] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /**
   * Add a SQL implementation of the UDF.
   *
   * @param sqlUdf a SQL condition that can be plugged into a `WHERE` clause
   * @return this instance
   */
  def withSqlUdf(sqlUdf: String): FilterDataQuantaBuilder[T] = {
    this.sqlUdf = sqlUdf
    this
  }

  /**
   * Specify the selectivity of the UDF.
   *
   * @param lowerEstimate the lower bound of the expected selectivity
   * @param upperEstimate the upper bound of the expected selectivity
   * @param confidence    the probability of the actual selectivity being within these bounds
   * @return this instance
   */
  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double): FilterDataQuantaBuilder[T] = {
    val interval = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
    this.selectivity = interval
    this
  }

  /** Filters the input [[DataQuanta]] with the [[udf]], passing along all optional settings (possibly `null`). */
  override protected def build: DataQuanta[T] = {
    val input = inputDataQuanta.dataQuanta()
    input.filterJava(udf, this.sqlUdf, this.selectivity, this.udfLoadProfileEstimator)
  }
}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SortOperator]]s.
*
* @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
* @param keyUdf UDF for the [[org.apache.wayang.basic.operators.SortOperator]]
*/
class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T],
                                    keyUdf: SerializableFunction[T, Key])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[SortDataQuantaBuilder[T, Key], T] {

  // Reuse the input TypeTrap to enforce type equality between input and output.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** [[ClassTag]] or surrogate of [[Key]]; assigned by the type inference below. */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. Stored only; not used by [[build]]. */
  private var keyUdfCpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. Stored only; not used by [[build]]. */
  private var keyUdfRamEstimator: LoadEstimator = _

  // Try to infer the type classes from the UDF: its "Input" type argument goes to the input builder's
  // type trap; its "Output" type argument determines the key ClassTag.
  locally {
    val typeArguments = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])

    typeArguments.get("Input") match {
      case cls: Class[T] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf)
    }

    val inferredKeyTag: ClassTag[Key] = typeArguments.get("Output") match {
      case cls: Class[Key] => ClassTag(cls)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
        // Fall back to the type class of the "none" data set type.
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
    this.keyTag = inferredKeyTag
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator): SortDataQuantaBuilder[T, Key] = {
    this.keyUdfCpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator): SortDataQuantaBuilder[T, Key] = {
    this.keyUdfRamEstimator = udfRamEstimator
    this
  }

  /** Sorts the input [[DataQuanta]] by the key extracted via [[keyUdf]], handing over [[keyTag]] explicitly. */
  override protected def build: DataQuanta[T] = {
    val input = inputDataQuanta.dataQuanta()
    input.sortJava(keyUdf)(this.keyTag)
  }
}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FlatMapOperator]]s.
*
* @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
* @param udf UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
*/
class FlatMapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
                                        udf: SerializableFunction[In, java.lang.Iterable[Out]])
                                       (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[FlatMapDataQuantaBuilder[In, Out], Out] {

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` unless set. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  /** Selectivity of the [[udf]]; `null` unless set via [[withSelectivity]]. */
  private var selectivity: ProbabilisticDoubleInterval = _

  // Try to infer the type classes from the udf. The "Output" type argument is an Iterable, so it is
  // unwrapped via ReflectionUtils.getWrapperClass before being used as output type.
  locally {
    val typeArguments = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])

    typeArguments.get("Input") match {
      case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", udf)
    }

    ReflectionUtils.getWrapperClass(typeArguments.get("Output"), 0) match {
      case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): FlatMapDataQuantaBuilder[In, Out] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /**
   * Specify the selectivity of the UDF.
   *
   * @param lowerEstimate the lower bound of the expected selectivity
   * @param upperEstimate the upper bound of the expected selectivity
   * @param confidence    the probability of the actual selectivity being within these bounds
   * @return this instance
   */
  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double): FlatMapDataQuantaBuilder[In, Out] = {
    val interval = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
    this.selectivity = interval
    this
  }

  /** Applies the [[udf]] to the input [[DataQuanta]], passing along the optional settings (possibly `null`). */
  override protected def build: DataQuanta[Out] =
    inputDataQuanta.dataQuanta().flatMapJava(udf, this.selectivity, this.udfLoadProfileEstimator)
}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapPartitionsOperator]]s.
*
* @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
* @param udf UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
*/
class MapPartitionsDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
                                              udf: SerializableFunction[java.lang.Iterable[In], java.lang.Iterable[Out]])
                                             (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[MapPartitionsDataQuantaBuilder[In, Out], Out] {

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` unless set. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  /** Selectivity of the [[udf]]; `null` unless set via [[withSelectivity]]. */
  private var selectivity: ProbabilisticDoubleInterval = _

  // Try to infer the type classes from the udf. Both type arguments of the UDF are Iterables over the
  // data quanta, so both are unwrapped via ReflectionUtils.getWrapperClass. Matching the "Input" type
  // argument directly against Class would either miss (parameterized Iterable) or register Iterable
  // itself as the input type (raw Iterable).
  locally {
    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])

    val inputElementClass = ReflectionUtils.getWrapperClass(parameters.get("Input"), 0)
    inputElementClass match {
      case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", udf)
    }

    val outputElementClass = ReflectionUtils.getWrapperClass(parameters.get("Output"), 0)
    outputElementClass match {
      case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /**
   * Specify the selectivity of the UDF.
   *
   * @param lowerEstimate the lower bound of the expected selectivity
   * @param upperEstimate the upper bound of the expected selectivity
   * @param confidence    the probability of the actual selectivity being within these bounds
   * @return this instance
   */
  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
    this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
    this
  }

  /** Applies the [[udf]] to the input [[DataQuanta]], passing along the optional settings (possibly `null`). */
  override protected def build = inputDataQuanta.dataQuanta().mapPartitionsJava(
    udf, this.selectivity, this.udfLoadProfileEstimator
  )
}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SampleOperator]]s.
 *
 * @param inputDataQuanta    [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
 */
class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
                                (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {

  /** Size of the dataset to be sampled; starts out as [[SampleOperator.UNKNOWN_DATASET_SIZE]]. */
  private var datasetSize = SampleOperator.UNKNOWN_DATASET_SIZE

  /** Sampling method to use; starts out as [[SampleOperator.Methods.ANY]]. */
  private var sampleMethod = SampleOperator.Methods.ANY

  /** Seed to use; `None` unless one was provided via [[withSeed]]. */
  private var seed: Option[Long] = None

  // Sampling does not change the element type, so the input TypeTrap doubles as the output TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /**
   * Set the size of the dataset that should be sampled.
   *
   * @param datasetSize the size of the dataset
   * @return this instance
   */
  def withDatasetSize(datasetSize: Long): SampleDataQuantaBuilder[T] = {
    this.datasetSize = datasetSize
    this
  }

  /**
   * Set the sample method to be used.
   *
   * @param sampleMethod the sample method
   * @return this instance
   */
  def withSampleMethod(sampleMethod: SampleOperator.Methods): SampleDataQuantaBuilder[T] = {
    this.sampleMethod = sampleMethod
    this
  }

  /**
   * Set the seed to be used for sampling.
   *
   * @param seed the seed
   * @return this instance
   */
  def withSeed(seed: Long): SampleDataQuantaBuilder[T] = {
    this.seed = Some(seed)
    this
  }

  /** Builds the [[DataQuanta]] by applying `sampleDynamicJava` to the input with the configured settings. */
  override protected def build =
    inputDataQuanta.dataQuanta().sampleDynamicJava(sampleSizeFunction, this.datasetSize, this.seed, this.sampleMethod)

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ReduceByOperator]]s.
 *
 * On construction, the element type and the key type are inferred reflectively from the UDF classes
 * where possible; a warning is logged for every inference that fails.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
 * @param udf             UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
 */
class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T],
                                        keyUdf: SerializableFunction[T, Key],
                                        udf: SerializableBinaryOperator[T])
                                       (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[ReduceByDataQuantaBuilder[Key, T], T] {

  // Reducing does not change the element type, so the input TypeTrap doubles as the output TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** [[ClassTag]] of the key type, or a surrogate if the key type could not be inferred. */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` until set via [[withUdfLoad]]. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  // TODO: Add these estimators.
  //  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
  //  private var keyUdfCpuEstimator: LoadEstimator = _
  //
  //  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
  //  private var keyUdfRamEstimator: LoadEstimator = _

  // Best-effort type inference from the generic signatures of both UDFs.
  locally {
    // Element type as declared by the reduce UDF.
    val udfTypeParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
    udfTypeParameters.get("Type") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", udf)
    }

    // Element type as declared by the key UDF's input.
    val keyUdfTypeParameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
    keyUdfTypeParameters.get("Input") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
    }

    // Key type as declared by the key UDF's output; falls back to the class behind DataSetType.none.
    this.keyTag = keyUdfTypeParameters.get("Output") match {
      case keyClass: Class[Key] => ClassTag(keyClass)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): ReduceByDataQuantaBuilder[Key, T] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /** Builds the [[DataQuanta]] by applying `reduceByKeyJava` to the input. */
  override protected def build =
    inputDataQuanta.dataQuanta().reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator)

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]s.
 *
 * On construction, the grouped output type and the key type are inferred reflectively from the key UDF
 * class where possible; a warning is logged for every inference that fails.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
 */
class GroupByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T], keyUdf: SerializableFunction[T, Key])
                                      (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[GroupByDataQuantaBuilder[Key, T], java.lang.Iterable[T]] {

  /** [[ClassTag]] of the key type, or a surrogate if the key type could not be inferred. */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[keyUdf]]; `null` until set via [[withKeyUdfLoad]]. */
  private var keyUdfLoadProfileEstimator: LoadProfileEstimator = _

  // Best-effort type inference from the key UDF's generic signature.
  locally {
    val typeParameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])

    // The UDF's input class determines the (grouped) output type of this builder.
    typeParameters.get("Input") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createGrouped(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
    }

    // The UDF's output class is the key type; falls back to the class behind DataSetType.none.
    this.keyTag = typeParameters.get("Output") match {
      case keyClass: Class[Key] => ClassTag(keyClass)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the key extraction UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withKeyUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): GroupByDataQuantaBuilder[Key, T] = {
    this.keyUdfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /** Builds the [[DataQuanta]] by applying `groupByKeyJava` to the input. */
  override protected def build =
    inputDataQuanta.dataQuanta().groupByKeyJava(keyUdf, this.keyUdfLoadProfileEstimator)

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class GlobalGroupDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                                     (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[GlobalGroupDataQuantaBuilder[T], java.lang.Iterable[T]] {

  /** Builds the [[DataQuanta]] by calling `group()` on the input. */
  override protected def build =
    inputDataQuanta.dataQuanta().group()

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalReduceOperator]]s.
 *
 * On construction, the element type is inferred reflectively from the UDF class where possible;
 * a warning is logged if that fails.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param udf             UDF for the [[org.apache.wayang.basic.operators.GlobalReduceOperator]]
 */
class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
                                       udf: SerializableBinaryOperator[T])
                                      (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[GlobalReduceDataQuantaBuilder[T], T] {

  // Reducing does not change the element type, so the input TypeTrap doubles as the output TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` until set via [[withUdfLoad]]. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  // Best-effort type inference from the UDF's generic signature.
  locally {
    val typeParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
    typeParameters.get("Type") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): GlobalReduceDataQuantaBuilder[T] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /** Builds the [[DataQuanta]] by applying `reduceJava` to the input. */
  override protected def build =
    inputDataQuanta.dataQuanta().reduceJava(udf, this.udfLoadProfileEstimator)

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 */
class UnionDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
                                inputDataQuanta1: DataQuantaBuilder[_, T])
                               (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[UnionDataQuantaBuilder[T], T] {

  // The output shares the TypeTrap of the first input.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta0.outputTypeTrap

  /** Builds the [[DataQuanta]] as the union of the first input with the second input. */
  override protected def build =
    inputDataQuanta0.dataQuanta().union(inputDataQuanta1.dataQuanta())

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 */
class IntersectDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
                                    inputDataQuanta1: DataQuantaBuilder[_, T])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[IntersectDataQuantaBuilder[T], T] {

  // The output shares the TypeTrap of the first input.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta0.outputTypeTrap

  /** Builds the [[DataQuanta]] as the intersection of the first input with the second input. */
  override protected def build =
    inputDataQuanta0.dataQuanta().intersect(inputDataQuanta1.dataQuanta())

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.JoinOperator]]s.
 *
 * On construction, the input types and the key type are inferred reflectively from the key UDF
 * classes where possible; a warning naming the offending UDF is logged for every inference that fails.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 * @param keyUdf0          first key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
 * @param keyUdf1          second key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
 */
class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
                                           inputDataQuanta1: DataQuantaBuilder[_, In1],
                                           keyUdf0: SerializableFunction[In0, Key],
                                           keyUdf1: SerializableFunction[In1, Key])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[JoinDataQuantaBuilder[In0, In1, Key], RT2[In0, In1]] {

  /** [[ClassTag]] or surrogate of [[Key]] */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
  private var keyUdf0CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
  private var keyUdf0RamEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
  private var keyUdf1CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
  private var keyUdf1RamEstimator: LoadEstimator = _

  // Try to infer the type classes from the first key UDF. This always leaves keyTag initialized,
  // either with the inferred key class or with the surrogate class behind DataSetType.none.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf0)
    }
    this.keyTag = parameters.get("Output") match {
      case cls: Class[Key] => ClassTag(cls)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf0)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }

  // Try to infer the type classes from the second key UDF.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
    }
    parameters.get("Output") match {
      case cls: Class[Key] => this.keyTag = ClassTag(cls)
      case _ =>
        // Deliberately keep the keyTag established from keyUdf0: overwriting it with the surrogate
        // would throw away a key class that was already inferred successfully.
        logger.warn("Could not infer types from {}.", keyUdf1)
    }
  }

  // Since we are currently not looking at type parameters, we can statically determine the output type.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless:
   * the value is stored but not used when building the join.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf0CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless:
   * the value is stored but not used when building the join.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf0RamEstimator = udfRamEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless:
   * the value is stored but not used when building the join.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf1CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless:
   * the value is stored but not used when building the join.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf1RamEstimator = udfRamEstimator
    this
  }

  /**
   * Assemble the joined elements to new elements.
   *
   * @param udf produces a joined element from two joinable elements
   * @return a new [[DataQuantaBuilder]] representing the assembled join product
   */
  def assemble[NewOut](udf: SerializableBiFunction[In0, In1, NewOut]) =
    this.map(new SerializableFunction[RT2[In0, In1], NewOut] {
      override def apply(joinTuple: RT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1)
    })

  /** Builds the [[DataQuanta]] by applying `joinJava`, passing the second input's class tag and the key tag explicitly. */
  override protected def build =
    inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CoGroupOperator]]s.
 *
 * On construction, the input types and the key type are inferred reflectively from the key UDF
 * classes where possible; a warning naming the offending UDF is logged for every inference that fails.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 * @param keyUdf0          first key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
 * @param keyUdf1          second key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
 */
class CoGroupDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
                                              inputDataQuanta1: DataQuantaBuilder[_, In1],
                                              keyUdf0: SerializableFunction[In0, Key],
                                              keyUdf1: SerializableFunction[In1, Key])
                                             (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[CoGroupDataQuantaBuilder[In0, In1, Key], RT2[java.lang.Iterable[In0], java.lang.Iterable[In1]]] {

  /** [[ClassTag]] or surrogate of [[Key]] */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
  private var keyUdf0CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
  private var keyUdf0RamEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
  private var keyUdf1CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
  private var keyUdf1RamEstimator: LoadEstimator = _

  // Try to infer the type classes from the first key UDF. This always leaves keyTag initialized,
  // either with the inferred key class or with the surrogate class behind DataSetType.none.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf0)
    }
    this.keyTag = parameters.get("Output") match {
      case cls: Class[Key] => ClassTag(cls)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf0)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }

  // Try to infer the type classes from the second key UDF.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
    }
    parameters.get("Output") match {
      case cls: Class[Key] => this.keyTag = ClassTag(cls)
      case _ =>
        // Deliberately keep the keyTag established from keyUdf0: overwriting it with the surrogate
        // would throw away a key class that was already inferred successfully.
        logger.warn("Could not infer types from {}.", keyUdf1)
    }
  }

  // Since we are currently not looking at type parameters, we can statically determine the output type.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless:
   * the value is stored but not used when building the co-group.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf0CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless:
   * the value is stored but not used when building the co-group.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf0RamEstimator = udfRamEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless:
   * the value is stored but not used when building the co-group.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf1CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless:
   * the value is stored but not used when building the co-group.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf1RamEstimator = udfRamEstimator
    this
  }

  /** Builds the [[DataQuanta]] by applying `coGroupJava`, passing the second input's class tag and the key tag explicitly. */
  override protected def build =
    inputDataQuanta0.dataQuanta().coGroupJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 */
class CartesianDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
                                           inputDataQuanta1: DataQuantaBuilder[_, In1])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[CartesianDataQuantaBuilder[In0, In1], RT2[In0, In1]] {

  // Type parameters of the tuple are not inspected, so the output type is known statically.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
  }

  /** Builds the [[DataQuanta]] as the cartesian product of both inputs, passing the second input's class tag explicitly. */
  override protected def build =
    inputDataQuanta0.dataQuanta().cartesian(inputDataQuanta1.dataQuanta())(inputDataQuanta1.classTag)

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ZipWithIdOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class ZipWithIdDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[ZipWithIdDataQuantaBuilder[T], RT2[java.lang.Long, T]] {

  // Type parameters of the tuple are not inspected, so the output type is known statically.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
  }

  /** Builds the [[DataQuanta]] by calling `zipWithId` on the input. */
  override protected def build =
    inputDataQuanta.dataQuanta().zipWithId

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DistinctOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class DistinctDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                                  (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[DistinctDataQuantaBuilder[T], T] {

  // The element type is unchanged, so the input TypeTrap doubles as the output TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** Builds the [[DataQuanta]] by calling `distinct` on the input. */
  override protected def build =
    inputDataQuanta.dataQuanta().distinct

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CountOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class CountDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                               (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[CountDataQuantaBuilder[T], java.lang.Long] {

  // The output type is always java.lang.Long, so it can be fixed without any inference.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[java.lang.Long]
  }

  /** Builds the [[DataQuanta]] by calling `count` on the input. */
  override protected def build =
    inputDataQuanta.dataQuanta().count

}
/**
 * [[DataQuantaBuilder]] implementation for any [[org.apache.wayang.core.plan.wayangplan.Operator]]s. Does not offer
 * any convenience methods, though.
 *
 * @param operator        the custom [[org.apache.wayang.core.plan.wayangplan.Operator]]
 * @param outputIndex     index of the [[OutputSlot]] addressed by the new instance
 * @param buildCache      a [[DataQuantaBuilderCache]] that must be shared across instances addressing the same [[Operator]]
 * @param inputDataQuanta [[DataQuantaBuilder]]s for the input [[DataQuanta]]
 * @param javaPlanBuilder the [[JavaPlanBuilder]] used to construct the current [[WayangPlan]]
 */
class CustomOperatorDataQuantaBuilder[T](operator: Operator,
                                         outputIndex: Int,
                                         buildCache: DataQuantaBuilderCache,
                                         inputDataQuanta: DataQuantaBuilder[_, _]*)
                                        (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[DataQuantaBuilder[_, T], T] {

  /**
   * Builds the [[DataQuanta]] for the addressed output. The operator itself is wired up only if the
   * shared cache is still empty; afterwards the result for `outputIndex` is read from that cache.
   */
  override protected def build = {
    // The [[operator]] may have multiple [[OutputSlot]]s, each with its own builder instance:
    // whichever instance builds first fills the cache, all others just read from it.
    if (!buildCache.hasCached) {
      val operatorOutputs = javaPlanBuilder.planBuilder.customOperator(operator, inputDataQuanta.map(_.dataQuanta()): _*)
      buildCache.cache(operatorOutputs)
    }
    buildCache(outputIndex)
  }

}
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DoWhileOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param conditionUdf    UDF for the looping condition
 * @param bodyBuilder     builds the loop body
 */
class DoWhileDataQuantaBuilder[T, ConvOut](inputDataQuanta: DataQuantaBuilder[_, T],
                                           conditionUdf: SerializablePredicate[JavaCollection[ConvOut]],
                                           bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], WayangTuple[DataQuantaBuilder[_, T], DataQuantaBuilder[_, ConvOut]]])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[DoWhileDataQuantaBuilder[T, ConvOut], T] {

  // TODO: Get the ClassTag right.
  /** [[ClassTag]] of the condition data; starts out as a surrogate until set explicitly. */
  implicit private var convOutClassTag: ClassTag[ConvOut] = ClassTag(DataSetType.none.getDataUnitType.getTypeClass)

  // The loop yields the same element type it consumes, so the input TypeTrap doubles as the output TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  // TODO: We could improve by combining the TypeTraps in the body loop.

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the UDF; `null` until set via [[withUdfLoad]]. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  /** Number of expected iterations. */
  private var numExpectedIterations: Int = 20

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): DoWhileDataQuantaBuilder[T, ConvOut] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /**
   * Explicitly set the [[DataSetType]] for the condition [[DataQuanta]]. Note that it is not
   * always necessary to set it and that it can be inferred in some situations.
   *
   * @param outputType the output [[DataSetType]]
   * @return this instance
   */
  def withConditionType(outputType: DataSetType[ConvOut]): DoWhileDataQuantaBuilder[T, ConvOut] = {
    this.convOutClassTag = ClassTag(outputType.getDataUnitType.getTypeClass)
    this
  }

  /**
   * Explicitly set the [[Class]] for the condition [[DataQuanta]]. Note that it is not
   * always necessary to set it and that it can be inferred in some situations.
   *
   * @param cls the output [[Class]]
   * @return this instance
   */
  def withConditionClass(cls: Class[ConvOut]): DoWhileDataQuantaBuilder[T, ConvOut] = {
    this.convOutClassTag = ClassTag(cls)
    this
  }

  /**
   * Set the number of expected iterations for the built [[org.apache.wayang.basic.operators.DoWhileOperator]].
   *
   * @param numExpectedIterations the expected number of iterations
   * @return this instance
   */
  def withExpectedNumberOfIterations(numExpectedIterations: Int): DoWhileDataQuantaBuilder[T, ConvOut] = {
    this.numExpectedIterations = numExpectedIterations
    this
  }

  /** Builds the [[DataQuanta]] by applying `doWhileJava` to the input, passing the condition class tag explicitly. */
  override protected def build =
    inputDataQuanta.dataQuanta().doWhileJava[ConvOut](
      conditionUdf, dataQuantaBodyBuilder, this.numExpectedIterations, this.udfLoadProfileEstimator
    )(this.convOutClassTag)

  /**
   * Create a loop body builder that is based on [[DataQuanta]]: it wraps the loop start into a
   * [[FakeDataQuantaBuilder]], runs the builder-based [[bodyBuilder]] on it and materializes both results.
   *
   * @return the loop body builder
   */
  private def dataQuantaBodyBuilder: JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] =
    new JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] {
      override def apply(loopStart: DataQuanta[T]) = {
        val loopEntry = new FakeDataQuantaBuilder(loopStart)
        val loopExits = bodyBuilder.apply(loopEntry)
        new WayangTuple(loopExits.field0.dataQuanta(), loopExits.field1.dataQuanta())
      }
    }

}
/**
 * [[DataQuantaBuilder]] implementation for loops with a fixed number of repetitions, built via
 * the `repeat` method of [[DataQuanta]].
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param numRepetitions  number of repetitions of the loop
 * @param bodyBuilder     builds the loop body
 */
class RepeatDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
                                 numRepetitions: Int,
                                 bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], DataQuantaBuilder[_, T]])
                                (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[RepeatDataQuantaBuilder[T], T] {

  // The loop yields the same element type it consumes, so the input TypeTrap doubles as the output TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  // TODO: We could improve by combining the TypeTraps in the body loop.

  /**
   * Builds the [[DataQuanta]] by applying `repeat` to the input. The loop start is wrapped into a
   * [[FakeDataQuantaBuilder]] so that the builder-based [[bodyBuilder]] can be run on it.
   */
  override protected def build =
    inputDataQuanta.dataQuanta().repeat(numRepetitions, loopStart => {
      val loopEntry = new FakeDataQuantaBuilder(loopStart)
      bodyBuilder.apply(loopEntry).dataQuanta()
    })

}
/**
 * Wraps [[DataQuanta]] and exposes them as [[DataQuantaBuilder]], i.e., this is an adapter.
 *
 * @param _dataQuanta the wrapped [[DataQuanta]]
 */
class FakeDataQuantaBuilder[T](_dataQuanta: DataQuanta[T])(implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[FakeDataQuantaBuilder[T], T] {

  /** The [[ClassTag]] is derived from the type class of the wrapped [[DataQuanta]]'s output. */
  override implicit def classTag = ClassTag(_dataQuanta.output.getType.getDataUnitType.getTypeClass)

  /** Hands out the wrapped [[DataQuanta]] directly. */
  override def dataQuanta() = _dataQuanta

  /**
   * Nothing needs to be built here: the wrapped [[DataQuanta]] already exist and are returned as they are.
   *
   * @return the wrapped [[DataQuanta]]
   */
  override protected def build: DataQuanta[T] = _dataQuanta

}
/**
 * This is not an actual [[DataQuantaBuilder]] but rather decorates such a [[DataQuantaBuilder]] with a key.
 *
 * @param dataQuantaBuilder the decorated [[DataQuantaBuilder]]
 * @param keyExtractor      extracts the key from the elements of the decorated [[DataQuantaBuilder]]
 */
class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuantaBuilder[_, Out],
                                       private val keyExtractor: SerializableFunction[Out, Key])
                                      (implicit javaPlanBuilder: JavaPlanBuilder) {

  /**
   * Joins this instance with the given one via their keys.
   *
   * @param that the instance to join with
   * @return a [[DataQuantaBuilder]] representing the join product
   */
  def join[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
    this.dataQuantaBuilder.join(keyExtractor, that.dataQuantaBuilder, that.keyExtractor)

  /**
   * Co-groups this instance with the given one via their keys.
   *
   * @param that the instance to co-group with
   * @return a [[DataQuantaBuilder]] representing the co-group product
   */
  def coGroup[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
    this.dataQuantaBuilder.coGroup(keyExtractor, that.dataQuantaBuilder, that.keyExtractor)

}