| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.api |
| |
| |
| import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction} |
| import java.util.{Collection => JavaCollection} |
| import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator} |
| import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap} |
| import org.apache.wayang.basic.data.{Record, Tuple2 => RT2} |
| import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator} |
| import org.apache.wayang.commons.util.profiledb.model.Experiment |
| import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializablePredicate} |
| import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator |
| import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator} |
| import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, UnarySource, WayangPlan} |
| import org.apache.wayang.core.platform.Platform |
| import org.apache.wayang.core.types.DataSetType |
| import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple} |
| |
| import scala.collection.mutable.ListBuffer |
| import scala.reflect.ClassTag |
| |
/**
 * Trait/interface for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
 * Java API for Wayang that compensates for lacking default and named arguments.
 *
 * @tparam This the builder type returned by the fluent `with...` methods
 * @tparam Out  the type of the data quanta in the [[DataQuanta]] to be built
 */
trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging {

  /**
   * The type of the [[DataQuanta]] to be built.
   */
  protected[api] def outputTypeTrap: TypeTrap

  /**
   * Provide a [[JavaPlanBuilder]] to which this instance is associated.
   */
  protected[api] implicit def javaPlanBuilder: JavaPlanBuilder

  /**
   * Set a name for the [[DataQuanta]] and its associated [[org.apache.wayang.core.plan.wayangplan.Operator]]s.
   *
   * @param name the name
   * @return this instance
   */
  def withName(name: String): This

  /**
   * Set an [[Experiment]] for the currently built [[org.apache.wayang.core.api.Job]].
   *
   * @param experiment the [[Experiment]]
   * @return this instance
   */
  def withExperiment(experiment: Experiment): This

  /**
   * Explicitly set an output [[DataSetType]] for the currently built [[DataQuanta]]. Note that it is not
   * always necessary to set it and that it can be inferred in some situations.
   *
   * @param outputType the output [[DataSetType]]
   * @return this instance
   */
  def withOutputType(outputType: DataSetType[Out]): This

  /**
   * Explicitly set an output [[Class]] for the currently built [[DataQuanta]]. Note that it is not
   * always necessary to set it and that it can be inferred in some situations.
   *
   * @param cls the output [[Class]]
   * @return this instance
   */
  def withOutputClass(cls: Class[Out]): This

  /**
   * Register a broadcast with the [[DataQuanta]] to be built.
   *
   * @param sender        a [[DataQuantaBuilder]] constructing the broadcasted [[DataQuanta]]
   * @param broadcastName the name of the broadcast
   * @return this instance
   */
  def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This

  /**
   * Set a [[CardinalityEstimator]] for the currently built [[DataQuanta]].
   *
   * @param cardinalityEstimator the [[CardinalityEstimator]]
   * @return this instance
   */
  def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This

  /**
   * Add a target [[Platform]] on which the currently built [[DataQuanta]] should be calculated. Can be invoked
   * multiple times to set multiple possible target [[Platform]]s or not at all to impose no restrictions.
   *
   * @param platform the target [[Platform]]
   * @return this instance
   */
  def withTargetPlatform(platform: Platform): This

  /**
   * Register the JAR file containing the given [[Class]] with the currently built [[org.apache.wayang.core.api.Job]].
   *
   * @param cls the [[Class]]
   * @return this instance
   */
  def withUdfJarOf(cls: Class[_]): This

  /**
   * Register a JAR file with the currently built [[org.apache.wayang.core.api.Job]].
   *
   * @param path the path of the JAR file
   * @return this instance
   */
  def withUdfJar(path: String): This

  /**
   * Provide a [[ClassTag]] for the constructed [[DataQuanta]], derived from the type class of
   * [[outputTypeTrap]].
   *
   * @return the [[ClassTag]]
   */
  protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)

  /**
   * Feed the built [[DataQuanta]] into a [[MapOperator]].
   *
   * @param udf the UDF for the [[MapOperator]]
   * @return a [[MapDataQuantaBuilder]]
   */
  def map[NewOut](udf: SerializableFunction[Out, NewOut]) = new MapDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[MapOperator]] with a [[org.apache.wayang.basic.function.ProjectionDescriptor]].
   *
   * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
   * @return a [[ProjectionDataQuantaBuilder]]
   */
  def project[NewOut](fieldNames: Array[String]) = new ProjectionDataQuantaBuilder(this, fieldNames)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FilterOperator]].
   *
   * @param udf filter UDF
   * @return a [[FilterDataQuantaBuilder]]
   */
  def filter(udf: SerializablePredicate[Out]) = new FilterDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FlatMapOperator]].
   *
   * @param udf the UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
   * @return a [[FlatMapDataQuantaBuilder]]
   */
  def flatMap[NewOut](udf: SerializableFunction[Out, java.lang.Iterable[NewOut]]) = new FlatMapDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MapPartitionsOperator]].
   *
   * @param udf the UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
   * @return a [[MapPartitionsDataQuantaBuilder]]
   */
  def mapPartitions[NewOut](udf: SerializableFunction[java.lang.Iterable[Out], java.lang.Iterable[NewOut]]) =
    new MapPartitionsDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
   *
   * @param sampleSize the absolute size of the sample
   * @return a [[SampleDataQuantaBuilder]]
   */
  def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
    // The sample size is constant, i.e., independent of the operand handed in.
    override def applyAsInt(operand: Int): Int = sampleSize
  })

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
   *
   * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
   * @return a [[SampleDataQuantaBuilder]]
   */
  def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)

  /**
   * Annotates a key to this instance.
   *
   * @param keyExtractor extracts the key from the data quanta
   * @return a [[KeyedDataQuantaBuilder]]
   */
  def keyBy[Key](keyExtractor: SerializableFunction[Out, Key]) = new KeyedDataQuantaBuilder[Out, Key](this, keyExtractor)

  /**
   * Feed the built [[DataQuanta]] into a [[GlobalReduceOperator]].
   *
   * @param udf the UDF for the [[GlobalReduceOperator]]
   * @return a [[GlobalReduceDataQuantaBuilder]]
   */
  def reduce(udf: SerializableBinaryOperator[Out]) = new GlobalReduceDataQuantaBuilder(this, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ReduceByOperator]].
   *
   * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
   * @param udf    the UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
   * @return a [[ReduceByDataQuantaBuilder]]
   */
  def reduceByKey[Key](keyUdf: SerializableFunction[Out, Key], udf: SerializableBinaryOperator[Out]) =
    new ReduceByDataQuantaBuilder(this, keyUdf, udf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]].
   *
   * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
   * @return a [[GroupByDataQuantaBuilder]]
   */
  def groupByKey[Key](keyUdf: SerializableFunction[Out, Key]) =
    new GroupByDataQuantaBuilder(this, keyUdf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]].
   *
   * @return a [[GlobalGroupDataQuantaBuilder]]
   */
  def group() = new GlobalGroupDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.UnionAllOperator]].
   *
   * @param that the other [[DataQuantaBuilder]] to union with
   * @return a [[UnionDataQuantaBuilder]]
   */
  def union(that: DataQuantaBuilder[_, Out]) = new UnionDataQuantaBuilder(this, that)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.IntersectOperator]].
   *
   * @param that the other [[DataQuantaBuilder]] to intersect with
   * @return an [[IntersectDataQuantaBuilder]]
   */
  def intersect(that: DataQuantaBuilder[_, Out]) = new IntersectDataQuantaBuilder(this, that)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.JoinOperator]].
   *
   * @param thisKeyUdf the key extraction UDF for this instance
   * @param that       the other [[DataQuantaBuilder]] to join with
   * @param thatKeyUdf the key extraction UDF for `that` instance
   * @return a [[JoinDataQuantaBuilder]]
   */
  def join[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
                         that: DataQuantaBuilder[_, ThatOut],
                         thatKeyUdf: SerializableFunction[ThatOut, Key]) =
    new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.CoGroupOperator]].
   *
   * @param thisKeyUdf the key extraction UDF for this instance
   * @param that       the other [[DataQuantaBuilder]] to co-group with
   * @param thatKeyUdf the key extraction UDF for `that` instance
   * @return a [[CoGroupDataQuantaBuilder]]
   */
  def coGroup[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
                            that: DataQuantaBuilder[_, ThatOut],
                            thatKeyUdf: SerializableFunction[ThatOut, Key]) =
    new CoGroupDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SortOperator]].
   *
   * @param keyUdf the key extraction UDF for this instance
   * @return a [[SortDataQuantaBuilder]]
   */
  def sort[Key](keyUdf: SerializableFunction[Out, Key]) =
    new SortDataQuantaBuilder(this, keyUdf)

  /**
   * Feed the built [[DataQuanta]] of this and the given instance into a
   * [[org.apache.wayang.basic.operators.CartesianOperator]].
   *
   * @param that the other [[DataQuantaBuilder]] to build the cartesian product with
   * @return a [[CartesianDataQuantaBuilder]]
   */
  def cartesian[ThatOut](that: DataQuantaBuilder[_, ThatOut]) = new CartesianDataQuantaBuilder(this, that)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
   *
   * @return a [[ZipWithIdDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.ZipWithIdOperator]]'s output
   */
  def zipWithId = new ZipWithIdDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DistinctOperator]].
   *
   * @return a [[DistinctDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.DistinctOperator]]'s output
   */
  def distinct = new DistinctDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.CountOperator]].
   *
   * @return a [[CountDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.CountOperator]]'s output
   */
  def count = new CountDataQuantaBuilder(this)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DoWhileOperator]].
   *
   * @param conditionUdf condition UDF that is evaluated on a [[JavaCollection]] of `Conv` values
   * @param bodyBuilder  maps a builder of the loop input to a [[WayangTuple]] of builders: one of `Out` data quanta
   *                     and one of `Conv` data quanta
   * @return a [[DoWhileDataQuantaBuilder]]
   */
  def doWhile[Conv](conditionUdf: SerializablePredicate[JavaCollection[Conv]],
                    bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], WayangTuple[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Conv]]]) =
    new DoWhileDataQuantaBuilder(this, conditionUdf, bodyBuilder)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.RepeatOperator]].
   *
   * @param numRepetitions the number of repetitions
   * @param bodyBuilder    maps a builder of the loop input to a builder of the loop output
   * @return a [[RepeatDataQuantaBuilder]]
   */
  def repeat(numRepetitions: Int, bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Out]]) =
    new RepeatDataQuantaBuilder(this, numRepetitions, bodyBuilder)

  /**
   * Feed the built [[DataQuanta]] into a custom [[Operator]] with a single [[org.apache.wayang.core.plan.wayangplan.InputSlot]]
   * and a single [[OutputSlot]]. Both conditions are checked via assertions.
   *
   * @param operator the custom [[Operator]]
   * @tparam T the type of the output [[DataQuanta]]
   * @return a [[CustomOperatorDataQuantaBuilder]]
   */
  def customOperator[T](operator: Operator) = {
    assert(operator.getNumInputs == 1, "customOperator(...) only allows for operators with a single input.")
    assert(operator.getNumOutputs == 1, "customOperator(...) only allows for operators with a single output.")
    new CustomOperatorDataQuantaBuilder[T](operator, 0, new DataQuantaBuilderCache, this)
  }

  /**
   * Feed the built [[DataQuanta]] into a [[LocalCallbackSink]] that collects all data quanta locally. This triggers
   * execution of the constructed [[WayangPlan]].
   *
   * @return the collected data quanta
   */
  def collect(): JavaCollection[Out] = {
    // NOTE(review): JavaConversions is deprecated and gone in Scala 2.13. Once the return type of
    // DataQuanta.collect() is confirmed, prefer an explicit JavaConverters `.asJavaCollection` here.
    import scala.collection.JavaConversions._
    this.dataQuanta().collect()
  }

  /**
   * Feed the built [[DataQuanta]] into a [[Consumer]] that runs locally. This triggers
   * execution of the constructed [[WayangPlan]].
   *
   * @param f the [[Consumer]] to be applied to the data quanta
   */
  def forEach(f: Consumer[Out]): Unit = this.dataQuanta().foreachJava(f)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
   * execution of the constructed [[WayangPlan]]. Delegates to the four-argument overload with a `null`
   * [[LoadProfileEstimator]].
   *
   * @param url          the URL of the file to be written
   * @param formatterUdf UDF that turns each data quantum into a [[String]]
   * @param jobName      name for the job
   */
  def writeTextFile(url: String, formatterUdf: SerializableFunction[Out, String], jobName: String): Unit =
    this.writeTextFile(url, formatterUdf, jobName, null)

  /**
   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
   * execution of the constructed [[WayangPlan]].
   *
   * @param url                     the URL of the file to be written
   * @param formatterUdf            UDF that turns each data quantum into a [[String]]
   * @param jobName                 name for the job; set on the associated [[JavaPlanBuilder]]
   * @param udfLoadProfileEstimator [[LoadProfileEstimator]] for the `formatterUdf`
   */
  def writeTextFile(url: String,
                    formatterUdf: SerializableFunction[Out, String],
                    jobName: String,
                    udfLoadProfileEstimator: LoadProfileEstimator): Unit = {
    this.javaPlanBuilder.withJobName(jobName)
    this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
  }

  /**
   * Enriches the set of operations to [[Record]]-based ones. This instance must deal with data quanta of
   * type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this
   * operation is applicable.
   *
   * @return this instance if it already is a [[RecordDataQuantaBuilder]], otherwise a decorator around it
   */
  def asRecords[T <: RecordDataQuantaBuilder[T]]: RecordDataQuantaBuilder[T] = {
    this match {
      case records: RecordDataQuantaBuilder[_] => records.asInstanceOf[RecordDataQuantaBuilder[T]]
      case _ => new RecordDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Record]])
    }
  }

  /**
   * Enriches the set of operations to [[Edge]]-based ones. This instance must deal with data quanta of
   * type [[Edge]], though. Because of Java's type erasure, we need to leave it up to you whether this
   * operation is applicable.
   *
   * @return this instance if it already is an [[EdgeDataQuantaBuilder]], otherwise a decorator around it
   */
  def asEdges[T <: EdgeDataQuantaBuilder[T]]: EdgeDataQuantaBuilder[T] = {
    this match {
      // Only an actual EdgeDataQuantaBuilder may be handed out as-is; everything else gets decorated.
      case edges: EdgeDataQuantaBuilder[_] => edges.asInstanceOf[EdgeDataQuantaBuilder[T]]
      case _ => new EdgeDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Edge]])
    }
  }

  /**
   * Get or create the [[DataQuanta]] built by this instance.
   *
   * @return the [[DataQuanta]]
   */
  protected[api] def dataQuanta(): DataQuanta[Out]

}
| |
/**
 * Abstract base class for builders of [[DataQuanta]]. It stores the settings made via the `with...` methods
 * and applies them when the [[DataQuanta]] is first requested via [[dataQuanta()]]. Subclasses only provide
 * [[build]].
 */
abstract class BasicDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](implicit _javaPlanBuilder: JavaPlanBuilder)
  extends Logging with DataQuantaBuilder[This, Out] {

  /** The [[DataQuanta]] product of this builder; `null` until [[dataQuanta()]] is first invoked. */
  private var builtDataQuanta: DataQuanta[Out] = _

  /** Name to be given to the [[DataQuanta]]; `null` if none was requested. */
  private var requestedName: String = _

  /** [[Experiment]] to be attached to the [[DataQuanta]]; `null` if none was requested. */
  private var requestedExperiment: Experiment = _

  /** Broadcasts to be registered, as pairs of broadcast name and sending builder. */
  private val registeredBroadcasts: ListBuffer[(String, DataQuantaBuilder[_, _])] = ListBuffer()

  /** [[CardinalityEstimator]] to be set on the [[DataQuanta]]; `null` if none was requested. */
  private var requestedCardinalityEstimator: CardinalityEstimator = _

  /** Target [[Platform]]s to be set on the [[DataQuanta]]. */
  private val requestedPlatforms: ListBuffer[Platform] = ListBuffer()

  /** Paths of UDF JAR files to be registered with the [[DataQuanta]]. */
  private val udfJarPaths: ListBuffer[String] = ListBuffer()

  /**
   * The type of the [[DataQuanta]] to be built. Initialized from [[getOutputTypeTrap]].
   */
  protected[api] val outputTypeTrap: TypeTrap = getOutputTypeTrap

  /**
   * Retrieve an initialization value for [[outputTypeTrap]]. This default creates a fresh [[TypeTrap]].
   *
   * @return the [[TypeTrap]]
   */
  protected def getOutputTypeTrap = new TypeTrap

  /** This instance viewed as the fluent return type `This`. */
  private def thisBuilder: This = this.asInstanceOf[This]

  override protected[api] implicit def javaPlanBuilder: JavaPlanBuilder = _javaPlanBuilder

  override def withName(name: String): This = {
    this.requestedName = name
    thisBuilder
  }

  override def withExperiment(experiment: Experiment): This = {
    this.requestedExperiment = experiment
    thisBuilder
  }

  override def withOutputType(outputType: DataSetType[Out]): This = {
    this.outputTypeTrap.dataSetType = outputType
    thisBuilder
  }

  override def withOutputClass(cls: Class[Out]): This = {
    val defaultType = DataSetType.createDefault(cls)
    this.withOutputType(defaultType)
  }

  override def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This = {
    this.registeredBroadcasts += Tuple2(broadcastName, sender)
    thisBuilder
  }

  override def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This = {
    this.requestedCardinalityEstimator = cardinalityEstimator
    thisBuilder
  }

  override def withTargetPlatform(platform: Platform): This = {
    this.requestedPlatforms += platform
    thisBuilder
  }

  def withUdfJarOf(cls: Class[_]): This = {
    val declaringJar = ReflectionUtils.getDeclaringJar(cls)
    this.withUdfJar(declaringJar)
  }

  override def withUdfJar(path: String): This = {
    this.udfJarPaths += path
    thisBuilder
  }

  override protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)

  /**
   * Get the [[DataQuanta]] of this builder. On the first invocation, it is created via [[build]] and then
   * configured with the name, [[CardinalityEstimator]], [[Experiment]], UDF JARs, target [[Platform]]s, and
   * broadcasts recorded so far. Later invocations return the same instance.
   *
   * @return the [[DataQuanta]]
   */
  override protected[api] def dataQuanta(): DataQuanta[Out] = {
    if (this.builtDataQuanta == null) {
      // Store the fresh instance before configuring it.
      val fresh = this.build
      this.builtDataQuanta = fresh

      // Optional settings are applied only if they were requested.
      Option(this.requestedName).foreach(n => fresh.withName(n))
      Option(this.requestedCardinalityEstimator).foreach(estimator => fresh.withCardinalityEstimator(estimator))
      Option(this.requestedExperiment).foreach(exp => fresh.withExperiment(exp))

      fresh.withUdfJars(this.udfJarPaths: _*)
      fresh.withTargetPlatforms(this.requestedPlatforms: _*)
      this.registeredBroadcasts.foreach {
        case (name, sendingBuilder) => fresh.withBroadcast(sendingBuilder.dataQuanta(), name)
      }
    }
    this.builtDataQuanta
  }

  /**
   * Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
   *
   * @return the created and partially configured [[DataQuanta]]
   */
  protected def build: DataQuanta[Out]

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.core.plan.wayangplan.UnarySource]]s.
 *
 * @param source          the [[UnarySource]] to be loaded
 * @param javaPlanBuilder the [[JavaPlanBuilder]] whose plan builder loads the `source`
 */
class UnarySourceDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](source: UnarySource[Out])
                                                                           (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[This, Out] {

  /** Loads the `source` through the underlying plan builder. */
  override protected def build: DataQuanta[Out] = {
    val planBuilder = javaPlanBuilder.planBuilder
    planBuilder.load(source)(this.classTag)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CollectionSource]]s.
 *
 * @param collection      the [[JavaCollection]] to be loaded
 * @param javaPlanBuilder the [[JavaPlanBuilder]] whose plan builder loads the `collection`
 */
class LoadCollectionDataQuantaBuilder[Out](collection: JavaCollection[Out])(implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[LoadCollectionDataQuantaBuilder[Out], Out] {

  // Try to infer the type class from the collection: take any element of a non-empty collection and,
  // unless that element is null, use its class for the output type.
  locally {
    val sample = if (collection.isEmpty) None else Option(WayangCollections.getAny(collection))
    sample.foreach { element =>
      this.outputTypeTrap.dataSetType = DataSetType.createDefault(element.getClass)
    }
  }

  /** Loads the `collection` through the underlying plan builder. */
  override protected def build: DataQuanta[Out] = {
    val planBuilder = javaPlanBuilder.planBuilder
    planBuilder.loadCollection(collection)(this.classTag)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param udf             UDF for the [[MapOperator]]
 */
class MapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
                                    udf: SerializableFunction[In, Out])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[MapDataQuantaBuilder[In, Out], Out] {

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` unless set. */
  private var udfLoad: LoadProfileEstimator = _

  // Try to infer the input and output type classes from the udf; warn for each one that cannot be inferred.
  locally {
    def warnUninferred(): Unit = logger.warn("Could not infer types from {}.", udf)

    val udfTypeParams = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])

    udfTypeParams.get("Input") match {
      case inputClass: Class[In] =>
        inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(inputClass)
      case _ => warnUninferred()
    }

    udfTypeParams.get("Output") match {
      case outputClass: Class[Out] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(outputClass)
      case _ => warnUninferred()
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
    this.udfLoad = udfLoadProfileEstimator
    this
  }

  /** Applies the [[udf]] to the input [[DataQuanta]]. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.mapJava(udf, this.udfLoad)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s with
 * [[org.apache.wayang.basic.function.ProjectionDescriptor]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param fieldNames      field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
 */
class ProjectionDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In], fieldNames: Array[String])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[ProjectionDataQuantaBuilder[In, Out], Out] {

  /** Projects the input [[DataQuanta]] onto the `fieldNames`. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.project(fieldNames.toSeq)
  }

}
| |
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FilterOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param udf             filter predicate
 */
class FilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], udf: SerializablePredicate[T])
                                (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[FilterDataQuantaBuilder[T], T] {

  // Reuse the input TypeTrap to enforce type equality between input and output.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` unless set. */
  private var udfLoad: LoadProfileEstimator = _

  /** Selectivity of the filter predicate; `null` unless set. */
  private var predicateSelectivity: ProbabilisticDoubleInterval = _

  /** SQL UDF implementing the filter predicate; `null` unless set. */
  private var sqlPredicate: String = _

  // Try to infer the type class from the udf.
  // NOTE(review): the udf is a SerializablePredicate, yet the type parameters are looked up against
  // SerializableFunction under the name "Input" -- confirm that ReflectionUtils.getTypeParameters
  // can resolve this for predicates; otherwise this always ends up in the warning branch.
  locally {
    val udfTypeParams = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
    udfTypeParams.get("Input") match {
      case inputClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(inputClass)
      case _ =>
        logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
    this.udfLoad = udfLoadProfileEstimator
    this
  }

  /**
   * Add a SQL implementation of the UDF.
   *
   * @param sqlUdf a SQL condition that can be plugged into a `WHERE` clause
   * @return this instance
   */
  def withSqlUdf(sqlUdf: String) = {
    this.sqlPredicate = sqlUdf
    this
  }

  /**
   * Specify the selectivity of the UDF.
   *
   * @param lowerEstimate the lower bound of the expected selectivity
   * @param upperEstimate the upper bound of the expected selectivity
   * @param confidence    the probability of the actual selectivity being within these bounds
   * @return this instance
   */
  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
    val interval = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
    this.predicateSelectivity = interval
    this
  }

  /** Filters the input [[DataQuanta]] with the [[udf]] and the optional settings of this builder. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.filterJava(udf, this.sqlPredicate, this.predicateSelectivity, this.udfLoad)
  }

}
| |
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SortOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.SortOperator]]
 */
class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T],
                                    keyUdf: SerializableFunction[T, Key])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[SortDataQuantaBuilder[T, Key], T] {

  // Reuse the input TypeTrap to enforce type equality between input and output.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** [[ClassTag]] or surrogate of [[Key]] */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. Only stored, not read in this class. */
  private var keyUdfCpuLoad: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. Only stored, not read in this class. */
  private var keyUdfRamLoad: LoadEstimator = _

  // Try to infer the input type class and the key ClassTag from the key UDF.
  locally {
    def warnUninferred(): Unit = logger.warn("Could not infer types from {}.", keyUdf)

    val udfTypeParams = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])

    udfTypeParams.get("Input") match {
      case inputClass: Class[T] =>
        inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(inputClass)
      case _ => warnUninferred()
    }

    this.keyTag = udfTypeParams.get("Output") match {
      case keyClass: Class[Key] => ClassTag(keyClass)
      case _ =>
        // Fall back to the type class of the "none" DataSetType as a surrogate.
        warnUninferred()
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdfCpuLoad = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdfRamLoad = udfRamEstimator
    this
  }

  /** Sorts the input [[DataQuanta]] by the keys extracted with the [[keyUdf]]. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.sortJava(keyUdf)(this.keyTag)
  }

}
| |
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FlatMapOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param udf             UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
 */
class FlatMapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
                                        udf: SerializableFunction[In, java.lang.Iterable[Out]])
                                       (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[FlatMapDataQuantaBuilder[In, Out], Out] {

  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` unless set. */
  private var udfLoad: LoadProfileEstimator = _

  /** Selectivity of the [[udf]]; `null` unless set. */
  private var udfSelectivity: ProbabilisticDoubleInterval = _

  // Try to infer the type classes from the udf. The output class is taken from the first type argument
  // (index 0) of the udf's "Output" type parameter.
  locally {
    def warnUninferred(): Unit = logger.warn("Could not infer types from {}.", udf)

    val udfTypeParams = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])

    udfTypeParams.get("Input") match {
      case inputClass: Class[In] =>
        inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(inputClass)
      case _ => warnUninferred()
    }

    ReflectionUtils.getWrapperClass(udfTypeParams.get("Output"), 0) match {
      case elementClass: Class[Out] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
      case _ => warnUninferred()
    }
  }

  /**
   * Set a [[LoadProfileEstimator]] for the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
    this.udfLoad = udfLoadProfileEstimator
    this
  }

  /**
   * Specify the selectivity of the UDF.
   *
   * @param lowerEstimate the lower bound of the expected selectivity
   * @param upperEstimate the upper bound of the expected selectivity
   * @param confidence    the probability of the actual selectivity being within these bounds
   * @return this instance
   */
  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
    val interval = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
    this.udfSelectivity = interval
    this
  }

  /** Applies the [[udf]] to the input [[DataQuanta]] with the optional settings of this builder. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.flatMapJava(udf, this.udfSelectivity, this.udfLoad)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapPartitionsOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param udf             UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
 */
class MapPartitionsDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
                                              udf: SerializableFunction[java.lang.Iterable[In], java.lang.Iterable[Out]])
                                             (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[MapPartitionsDataQuantaBuilder[In, Out], Out] {

  /** Optional [[LoadProfileEstimator]] for the [[udf]]; remains `null` until [[withUdfLoad]] is called. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  /** Optional selectivity estimate of the [[udf]]; remains `null` until [[withSelectivity]] is called. */
  private var selectivity: ProbabilisticDoubleInterval = _

  // Reflectively derive the input and output type classes from the UDF's type parameters.
  locally {
    val typeParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])

    // NOTE(review): the UDF's declared input is Iterable[In], yet the "Input" parameter is used
    // without unwrapping (unlike "Output" below) -- confirm this is intended.
    typeParameters.get("Input") match {
      case inputClass: Class[In] =>
        inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(inputClass)
      case _ =>
        logger.warn("Could not infer types from {}.", udf)
    }

    ReflectionUtils.getWrapperClass(typeParameters.get("Output"), 0) match {
      case outputClass: Class[Out] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(outputClass)
      case _ =>
        logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Provide a [[LoadProfileEstimator]] describing the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): MapPartitionsDataQuantaBuilder[In, Out] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /**
   * Provide a selectivity estimate for the UDF.
   *
   * @param lowerEstimate the lower bound of the expected selectivity
   * @param upperEstimate the upper bound of the expected selectivity
   * @param confidence    the probability of the actual selectivity being within these bounds
   * @return this instance
   */
  def withSelectivity(lowerEstimate: Double,
                      upperEstimate: Double,
                      confidence: Double): MapPartitionsDataQuantaBuilder[In, Out] = {
    val estimate = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
    this.selectivity = estimate
    this
  }

  /** Applies the partition-wise UDF to the built input, forwarding the optional selectivity and load estimates. */
  override protected def build =
    inputDataQuanta.dataQuanta().mapPartitionsJava(udf, this.selectivity, this.udfLoadProfileEstimator)

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SampleOperator]]s.
 *
 * @param inputDataQuanta    [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
 */
class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
                                (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {

  /** Size of the dataset to sample from; starts out as [[SampleOperator.UNKNOWN_DATASET_SIZE]]. */
  private var datasetSize = SampleOperator.UNKNOWN_DATASET_SIZE

  /** Sampling method to use; starts out as [[SampleOperator.Methods.ANY]]. */
  private var sampleMethod = SampleOperator.Methods.ANY

  /** Seed for the sampling; `None` until [[withSeed]] is called. */
  private var seed: Option[Long] = None

  // Sampling does not change the element type, so input and output share one TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /**
   * Specify the size of the dataset that is sampled from.
   *
   * @param datasetSize the size of the dataset
   * @return this instance
   */
  def withDatasetSize(datasetSize: Long): SampleDataQuantaBuilder[T] = {
    this.datasetSize = datasetSize
    this
  }

  /**
   * Specify the sampling method.
   *
   * @param sampleMethod the sample method
   * @return this instance
   */
  def withSampleMethod(sampleMethod: SampleOperator.Methods): SampleDataQuantaBuilder[T] = {
    this.sampleMethod = sampleMethod
    this
  }

  /**
   * Specify the seed that is handed to the sampling.
   *
   * @param seed the seed value
   * @return this instance
   */
  def withSeed(seed: Long): SampleDataQuantaBuilder[T] = {
    this.seed = Some(seed)
    this
  }

  /** Samples the built input using the configured size function, dataset size, seed and method. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.sampleDynamicJava(sampleSizeFunction, this.datasetSize, this.seed, this.sampleMethod)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ReduceByOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
 * @param udf             UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
 */
class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T],
                                        keyUdf: SerializableFunction[T, Key],
                                        udf: SerializableBinaryOperator[T])
                                       (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[ReduceByDataQuantaBuilder[Key, T], T] {

  // Reducing keeps the element type, so input and output share one TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** [[ClassTag]] (or a surrogate) of the [[Key]] type; assigned by the type inference below. */
  implicit var keyTag: ClassTag[Key] = _

  /** Optional [[LoadProfileEstimator]] for the [[udf]]; remains `null` until [[withUdfLoad]] is called. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  // TODO: Add these estimators.
  //  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
  //  private var keyUdfCpuEstimator: LoadEstimator = _
  //
  //  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
  //  private var keyUdfRamEstimator: LoadEstimator = _

  // Reflectively derive the element type and the key type from the two UDFs.
  locally {
    // First source for the element type: the reduce UDF.
    val reduceParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
    reduceParameters.get("Type") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", udf)
    }

    // Second source for the element type, and the only source for the key type: the key UDF.
    val keyParameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
    keyParameters.get("Input") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
    }
    this.keyTag = keyParameters.get("Output") match {
      case keyClass: Class[Key] =>
        ClassTag(keyClass)
      case _ =>
        // Fall back to the tag of the "none" data set type when the key class is unknown.
        logger.warn("Could not infer types from {}.", keyUdf)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }

  /**
   * Provide a [[LoadProfileEstimator]] describing the load of the reduce UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): ReduceByDataQuantaBuilder[Key, T] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /** Reduces the built input by key, forwarding the optional load estimate of the reduce UDF. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
 */
class GroupByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T], keyUdf: SerializableFunction[T, Key])
                                      (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[GroupByDataQuantaBuilder[Key, T], java.lang.Iterable[T]] {

  /** [[ClassTag]] (or a surrogate) of the [[Key]] type; assigned by the type inference below. */
  implicit var keyTag: ClassTag[Key] = _

  /** Optional [[LoadProfileEstimator]] for the [[keyUdf]]; remains `null` until [[withKeyUdfLoad]] is called. */
  private var keyUdfLoadProfileEstimator: LoadProfileEstimator = _

  // Reflectively derive the grouped output type and the key type from the key UDF.
  locally {
    val typeParameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])

    typeParameters.get("Input") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createGrouped(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
    }

    this.keyTag = typeParameters.get("Output") match {
      case keyClass: Class[Key] =>
        ClassTag(keyClass)
      case _ =>
        // Fall back to the tag of the "none" data set type when the key class is unknown.
        logger.warn("Could not infer types from {}.", keyUdf)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }

  /**
   * Provide a [[LoadProfileEstimator]] describing the load of the key extraction UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withKeyUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): GroupByDataQuantaBuilder[Key, T] = {
    keyUdfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /** Groups the built input by key, forwarding the optional load estimate of the key UDF. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.groupByKeyJava(keyUdf, this.keyUdfLoadProfileEstimator)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class GlobalGroupDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                                     (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[GlobalGroupDataQuantaBuilder[T], java.lang.Iterable[T]] {

  /** Builds the input and calls `group()` on it. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.group()
  }

}
| |
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalReduceOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param udf             UDF for the [[org.apache.wayang.basic.operators.GlobalReduceOperator]]
 */
class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
                                       udf: SerializableBinaryOperator[T])
                                      (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[GlobalReduceDataQuantaBuilder[T], T] {

  // Reducing keeps the element type, so input and output share one TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** Optional [[LoadProfileEstimator]] for the [[udf]]; remains `null` until [[withUdfLoad]] is called. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  // Reflectively derive the element type from the reduce UDF's type parameter.
  locally {
    val typeParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
    typeParameters.get("Type") match {
      case elementClass: Class[T] =>
        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
      case _ =>
        logger.warn("Could not infer types from {}.", udf)
    }
  }

  /**
   * Provide a [[LoadProfileEstimator]] describing the load of the reduce UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): GlobalReduceDataQuantaBuilder[T] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /** Reduces the built input with the UDF, forwarding the optional load estimate. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.reduceJava(udf, this.udfLoadProfileEstimator)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 */
class UnionDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
                                inputDataQuanta1: DataQuantaBuilder[_, T])
                               (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[UnionDataQuantaBuilder[T], T] {

  // The output shares the TypeTrap of the first input.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta0.outputTypeTrap

  /** Builds both inputs (first, then second) and unions them. */
  override protected def build = {
    val first = inputDataQuanta0.dataQuanta()
    first.union(inputDataQuanta1.dataQuanta())
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 */
class IntersectDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
                                    inputDataQuanta1: DataQuantaBuilder[_, T])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[IntersectDataQuantaBuilder[T], T] {

  // The output shares the TypeTrap of the first input.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta0.outputTypeTrap

  /** Builds both inputs (first, then second) and intersects them. */
  override protected def build = {
    val first = inputDataQuanta0.dataQuanta()
    first.intersect(inputDataQuanta1.dataQuanta())
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.JoinOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 * @param keyUdf0          key extraction UDF for the first input of the [[org.apache.wayang.basic.operators.JoinOperator]]
 * @param keyUdf1          key extraction UDF for the second input of the [[org.apache.wayang.basic.operators.JoinOperator]]
 */
class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
                                           inputDataQuanta1: DataQuantaBuilder[_, In1],
                                           keyUdf0: SerializableFunction[In0, Key],
                                           keyUdf1: SerializableFunction[In1, Key])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[JoinDataQuantaBuilder[In0, In1, Key], RT2[In0, In1]] {

  /** [[ClassTag]] or surrogate of [[Key]] */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
  private var keyUdf0CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
  private var keyUdf0RamEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
  private var keyUdf1CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
  private var keyUdf1RamEstimator: LoadEstimator = _

  // Try to infer the first input type and the key type from the first key UDF.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf0)
    }

    this.keyTag = parameters.get("Output") match {
      case cls: Class[Key] => ClassTag(cls)
      case _ =>
        // Surrogate tag; may still be replaced by the inference on the second key UDF below.
        logger.warn("Could not infer types from {}.", keyUdf0)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }
  // Try to infer the second input type and the key type from the second key UDF.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
    }

    parameters.get("Output") match {
      case cls: Class[Key] => this.keyTag = ClassTag(cls)
      // Keep whatever was derived from keyUdf0 instead of discarding it in favor of the surrogate.
      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
    }
  }
  // Since we are currently not looking at type parameters, we can statically determine the output type.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf0CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf0RamEstimator = udfRamEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf1CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf1RamEstimator = udfRamEstimator
    this
  }

  /**
   * Assemble the joined elements to new elements.
   *
   * @param udf produces a joined element from two joinable elements
   * @return a new [[DataQuantaBuilder]] representing the assembled join product
   */
  def assemble[NewOut](udf: SerializableBiFunction[In0, In1, NewOut]) =
    this.map(new SerializableFunction[RT2[In0, In1], NewOut] {
      // Unpack the join tuple and hand both sides to the user-provided function.
      override def apply(joinTuple: RT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1)
    })

  /** Joins both built inputs on their key UDFs, passing the second input's class tag and the key tag explicitly. */
  override protected def build =
    inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CoGroupOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 * @param keyUdf0          key extraction UDF for the first input of the [[org.apache.wayang.basic.operators.CoGroupOperator]]
 * @param keyUdf1          key extraction UDF for the second input of the [[org.apache.wayang.basic.operators.CoGroupOperator]]
 */
class CoGroupDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
                                              inputDataQuanta1: DataQuantaBuilder[_, In1],
                                              keyUdf0: SerializableFunction[In0, Key],
                                              keyUdf1: SerializableFunction[In1, Key])
                                             (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[CoGroupDataQuantaBuilder[In0, In1, Key], RT2[java.lang.Iterable[In0], java.lang.Iterable[In1]]] {

  /** [[ClassTag]] or surrogate of [[Key]] */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
  private var keyUdf0CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
  private var keyUdf0RamEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
  private var keyUdf1CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
  private var keyUdf1RamEstimator: LoadEstimator = _

  // Try to infer the first input type and the key type from the first key UDF.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf0)
    }

    this.keyTag = parameters.get("Output") match {
      case cls: Class[Key] => ClassTag(cls)
      case _ =>
        // Surrogate tag; may still be replaced by the inference on the second key UDF below.
        logger.warn("Could not infer types from {}.", keyUdf0)
        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
    }
  }
  // Try to infer the second input type and the key type from the second key UDF.
  locally {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
    }

    parameters.get("Output") match {
      case cls: Class[Key] => this.keyTag = ClassTag(cls)
      // Keep whatever was derived from keyUdf0 instead of discarding it in favor of the surrogate.
      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
    }
  }
  // Since we are currently not looking at type parameters, we can statically determine the output type.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf0CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf0RamEstimator = udfRamEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
    this.keyUdf1CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
    this.keyUdf1RamEstimator = udfRamEstimator
    this
  }

  /** Co-groups both built inputs on their key UDFs, passing the second input's class tag and the key tag explicitly. */
  override protected def build =
    inputDataQuanta0.dataQuanta().coGroupJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)

}
| |
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 */
class CartesianDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
                                           inputDataQuanta1: DataQuantaBuilder[_, In1])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[CartesianDataQuantaBuilder[In0, In1], RT2[In0, In1]] {

  // The type parameters of the tuple are not inspected, so the output type is known statically.
  this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]

  /** Builds both inputs (first, then second) and forms their Cartesian product. */
  override protected def build = {
    val first = inputDataQuanta0.dataQuanta()
    first.cartesian(inputDataQuanta1.dataQuanta())(inputDataQuanta1.classTag)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ZipWithIdOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class ZipWithIdDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                                   (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[ZipWithIdDataQuantaBuilder[T], RT2[java.lang.Long, T]] {

  // The type parameters of the tuple are not inspected, so the output type is known statically.
  this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]

  /** Builds the input and calls `zipWithId` on it. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.zipWithId
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DistinctOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class DistinctDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                                  (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[DistinctDataQuantaBuilder[T], T] {

  // The element type is unchanged, so input and output share one TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  /** Builds the input and calls `distinct` on it. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.distinct
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CountOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 */
class CountDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
                               (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[CountDataQuantaBuilder[T], java.lang.Long] {

  // The output type is fixed to java.lang.Long, so it is set statically.
  this.outputTypeTrap.dataSetType = dataSetType[java.lang.Long]

  /** Builds the input and calls `count` on it. */
  override protected def build = {
    val input = inputDataQuanta.dataQuanta()
    input.count
  }

}
| |
| |
/**
 * [[DataQuantaBuilder]] implementation for arbitrary [[org.apache.wayang.core.plan.wayangplan.Operator]]s.
 * It offers no operator-specific convenience methods.
 *
 * @param operator        the custom [[org.apache.wayang.core.plan.wayangplan.Operator]]
 * @param outputIndex     index of the [[OutputSlot]] addressed by the new instance
 * @param buildCache      a [[DataQuantaBuilderCache]] that must be shared across instances addressing the same [[Operator]]
 * @param inputDataQuanta [[DataQuantaBuilder]]s for the input [[DataQuanta]]
 * @param javaPlanBuilder the [[JavaPlanBuilder]] used to construct the current [[WayangPlan]]
 */
class CustomOperatorDataQuantaBuilder[T](operator: Operator,
                                         outputIndex: Int,
                                         buildCache: DataQuantaBuilderCache,
                                         inputDataQuanta: DataQuantaBuilder[_, _]*)
                                        (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[DataQuantaBuilder[_, T], T] {

  /**
   * Instantiates the custom operator at most once per shared cache and returns the
   * [[DataQuanta]] cached for this instance's output index.
   */
  override protected def build = {
    // Several builders may address different [[OutputSlot]]s of the same [[operator]];
    // only the first one to be built populates the shared cache.
    val alreadyBuilt = buildCache.hasCached
    if (!alreadyBuilt) {
      val dataQuanta = javaPlanBuilder.planBuilder.customOperator(operator, inputDataQuanta.map(_.dataQuanta()): _*)
      buildCache.cache(dataQuanta)
    }
    buildCache(outputIndex)
  }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DoWhileOperator]]s.
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param conditionUdf    UDF for the looping condition
 * @param bodyBuilder     builds the loop body
 */
class DoWhileDataQuantaBuilder[T, ConvOut](inputDataQuanta: DataQuantaBuilder[_, T],
                                           conditionUdf: SerializablePredicate[JavaCollection[ConvOut]],
                                           bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], WayangTuple[DataQuantaBuilder[_, T], DataQuantaBuilder[_, ConvOut]]])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[DoWhileDataQuantaBuilder[T, ConvOut], T] {

  // TODO: Get the ClassTag right.
  /** [[ClassTag]] of the condition type; defaults to the tag of the "none" data set type. */
  implicit private var convOutClassTag: ClassTag[ConvOut] = ClassTag(DataSetType.none.getDataUnitType.getTypeClass)

  // The loop yields the same element type it consumes, so input and output share one TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  // TODO: We could improve by combining the TypeTraps in the body loop.

  /** Optional [[LoadProfileEstimator]] for the condition UDF; remains `null` until [[withUdfLoad]] is called. */
  private var udfLoadProfileEstimator: LoadProfileEstimator = _

  /** Expected number of iterations; 20 unless overridden via [[withExpectedNumberOfIterations]]. */
  private var numExpectedIterations = 20

  /**
   * Provide a [[LoadProfileEstimator]] describing the load of the UDF.
   *
   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
   * @return this instance
   */
  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): DoWhileDataQuantaBuilder[T, ConvOut] = {
    this.udfLoadProfileEstimator = udfLoadProfileEstimator
    this
  }

  /**
   * Explicitly specify the [[DataSetType]] of the condition [[DataQuanta]]. Doing so is not
   * always necessary, as the type can be inferred in some situations.
   *
   * @param outputType the output [[DataSetType]]
   * @return this instance
   */
  def withConditionType(outputType: DataSetType[ConvOut]): DoWhileDataQuantaBuilder[T, ConvOut] = {
    val conditionClass = outputType.getDataUnitType.getTypeClass
    this.convOutClassTag = ClassTag(conditionClass)
    this
  }

  /**
   * Explicitly specify the [[Class]] of the condition [[DataQuanta]]. Doing so is not
   * always necessary, as the type can be inferred in some situations.
   *
   * @param cls the output [[Class]]
   * @return this instance
   */
  def withConditionClass(cls: Class[ConvOut]): DoWhileDataQuantaBuilder[T, ConvOut] = {
    this.convOutClassTag = ClassTag(cls)
    this
  }

  /**
   * Specify how many iterations the built [[org.apache.wayang.basic.operators.DoWhileOperator]]
   * is expected to perform.
   *
   * @param numExpectedIterations the expected number of iterations
   * @return this instance
   */
  def withExpectedNumberOfIterations(numExpectedIterations: Int): DoWhileDataQuantaBuilder[T, ConvOut] = {
    this.numExpectedIterations = numExpectedIterations
    this
  }

  /** Builds the do-while loop from the condition UDF, the adapted body builder and the configured estimates. */
  override protected def build =
    inputDataQuanta.dataQuanta().doWhileJava[ConvOut](
      conditionUdf,
      dataQuantaBodyBuilder,
      this.numExpectedIterations,
      this.udfLoadProfileEstimator
    )(this.convOutClassTag)

  /**
   * Adapts the builder-based [[bodyBuilder]] to a loop body function that operates on [[DataQuanta]].
   *
   * @return the loop body builder
   */
  private def dataQuantaBodyBuilder =
    new JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] {
      override def apply(loopStart: DataQuanta[T]) = {
        // Wrap the loop head so that the user's builder-based body can consume it.
        val loopHead = new FakeDataQuantaBuilder(loopStart)
        val loopTails = bodyBuilder(loopHead)
        // field0 carries the next iteration's data, field1 the condition input.
        new WayangTuple(loopTails.field0.dataQuanta(), loopTails.field1.dataQuanta())
      }
    }

}
| |
/**
 * [[DataQuantaBuilder]] implementation for loops with a fixed number of repetitions, built via
 * the `repeat` method of [[DataQuanta]].
 *
 * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
 * @param numRepetitions  number of repetitions of the loop
 * @param bodyBuilder     builds the loop body
 */
class RepeatDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
                                 numRepetitions: Int,
                                 bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], DataQuantaBuilder[_, T]])
                                (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[RepeatDataQuantaBuilder[T], T] {

  // The loop yields the same element type it consumes, so input and output share one TypeTrap.
  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap

  // TODO: We could improve by combining the TypeTraps in the body loop.

  /** Builds the loop; the loop head is wrapped in a builder so that [[bodyBuilder]] can consume it. */
  override protected def build =
    inputDataQuanta.dataQuanta().repeat(numRepetitions, startDataQuanta => {
      val loopHead = new FakeDataQuantaBuilder(startDataQuanta)
      val loopTail = bodyBuilder(loopHead)
      loopTail.dataQuanta()
    })

}
| |
| /** |
| * Adapter that exposes already existing [[DataQuanta]] through the [[DataQuantaBuilder]] interface. |
| * |
| * @param _dataQuanta the wrapped [[DataQuanta]] |
| */ |
| class FakeDataQuantaBuilder[T](_dataQuanta: DataQuanta[T])(implicit javaPlanBuilder: JavaPlanBuilder) |
| extends BasicDataQuantaBuilder[FakeDataQuantaBuilder[T], T] { |
| |
| /** The [[ClassTag]] is derived from the data unit type of the wrapped [[DataQuanta]]'s output. */ |
| override implicit def classTag = { |
| val wrappedTypeClass = _dataQuanta.output.getType.getDataUnitType.getTypeClass |
| ClassTag(wrappedTypeClass) |
| } |
| |
| /** Hands out the wrapped [[DataQuanta]] as they are. */ |
| override def dataQuanta() = { |
| _dataQuanta |
| } |
| |
| /** |
| * Nothing is created here: the wrapped [[DataQuanta]] are returned unchanged. |
| * |
| * @return the wrapped [[DataQuanta]] |
| */ |
| override protected def build: DataQuanta[T] = { |
| _dataQuanta |
| } |
| } |
| |
| /** |
| * Decorates a [[DataQuantaBuilder]] with a key extractor. This class is not a [[DataQuantaBuilder]] itself. |
| */ |
| class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuantaBuilder[_, Out], |
| private val keyExtractor: SerializableFunction[Out, Key]) |
| (implicit javaPlanBuilder: JavaPlanBuilder) { |
| |
| /** |
| * Joins this instance with the given one, using the key extractors of both instances. |
| * |
| * @param that the instance to join with |
| * @return a [[DataQuantaBuilder]] representing the join product |
| */ |
| def join[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) = { |
| dataQuantaBuilder.join(keyExtractor, that.dataQuantaBuilder, that.keyExtractor) |
| } |
| |
| /** |
| * Co-groups this instance with the given one, using the key extractors of both instances. |
| * |
| * @param that the instance to co-group with |
| * @return a [[DataQuantaBuilder]] representing the co-group product |
| */ |
| def coGroup[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) = { |
| dataQuantaBuilder.coGroup(keyExtractor, that.dataQuantaBuilder, that.keyExtractor) |
| } |
| |
| } |