blob: 51c413f25009d12a5f84c53aa256cf2fb45de1e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.dataquanta
import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType}
import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
import org.apache.wayang.basic.function.ProjectionDescriptor
import org.apache.wayang.basic.operators._
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
import org.apache.wayang.core.function._
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
import org.apache.wayang.core.plan.wayangplan._
import java.lang
import java.lang.{Iterable => JavaIterable}
import java.util.function.{IntUnaryOperator}
import scala.reflect._
/**
* Represents an intermediate result/data flow edge in a [[WayangPlan]].
*
* @param operator a unary [[Operator]] that produces this instance
* @param ev$1 the data type of the elements in this instance
* @param planBuilder keeps track of the [[WayangPlan]] being build
*/
class DataQuantaDefault[Out: ClassTag]
(override val operator: ElementaryOperator, outputIndex: Int = 0)
(implicit override val planBuilder: PlanBuilder)
extends DataQuanta[Out](operator, outputIndex) {
/**
* Feed this instance into a [[MapOperator]].
*
* @param udf a Java 8 lambda expression as UDF for the [[MapOperator]]
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
* @return a new instance representing the [[MapOperator]]'s output
*/
override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = {
val mapOperator = new MapOperator(new TransformationDescriptor(
udf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
))
this.connectTo(mapOperator, 0)
DataQuantaDefault.wrap[NewOut](mapOperator)
}
/**
* Feed this instance into a [[MapPartitionsOperator]].
*
* @param udf a Java 8 lambda expression as UDF for the [[MapPartitionsOperator]]
* @param selectivity selectivity of the UDF
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
* @return a new instance representing the [[MapOperator]]'s output
*/
override def mapPartitionsJava[NewOut: ClassTag](udf: SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]],
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = {
val mapOperator = new MapPartitionsOperator(
new MapPartitionsDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad)
)
this.connectTo(mapOperator, 0)
DataQuantaDefault.wrap[NewOut](mapOperator)
}
/**
* Feed this instance into a [[MapOperator]] with a [[ProjectionDescriptor]].
*
* @param fieldNames names of the fields to be projected
* @return a new instance representing the [[MapOperator]]'s output
*/
override def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuantaDefault[NewOut] = {
val projectionOperator = new MapOperator(
new ProjectionDescriptor(basicDataUnitType[Out], basicDataUnitType[NewOut], fieldNames: _*)
)
this.connectTo(projectionOperator, 0)
DataQuantaDefault.wrap[NewOut](projectionOperator)
}
/**
* Feed this instance into a [[FilterOperator]].
*
* @param udf UDF for the [[FilterOperator]]
* @param sqlUdf UDF as SQL `WHERE` clause
* @param selectivity selectivity of the UDF
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
* @return a new instance representing the [[FilterOperator]]'s output
*/
override def filterJava(udf: SerializablePredicate[Out],
sqlUdf: String = null,
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuantaDefault[Out] = {
val filterOperator = new FilterOperator(new PredicateDescriptor(
udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
).withSqlImplementation(sqlUdf))
this.connectTo(filterOperator, 0)
DataQuantaDefault.wrap[Out](filterOperator)
}
/**
* Feed this instance into a [[FlatMapOperator]].
*
* @param udf a Java 8 lambda expression as UDF for the [[FlatMapOperator]]
* @param selectivity selectivity of the UDF
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
* @return a new instance representing the [[FlatMapOperator]]'s output
*/
override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]],
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = {
val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
))
this.connectTo(flatMapOperator, 0)
DataQuantaDefault.wrap[NewOut](flatMapOperator)
}
/**
* Feed this instance into a [[SampleOperator]].
*
* @param sampleSizeFunction absolute size of the sample as a function of the current iteration number
* @param datasetSize optional size of the dataset to be sampled
* @param sampleMethod the [[SampleOperator.Methods]] to use for sampling
* @return a new instance representing the [[FlatMapOperator]]'s output
*/
override def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
seed: Option[Long] = None,
sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuantaDefault[Out] = {
if (seed.isEmpty) {
val sampleOperator = new SampleOperator(
sampleSizeFunction,
dataSetType[Out],
sampleMethod
)
sampleOperator.setDatasetSize(datasetSize)
this.connectTo(sampleOperator, 0)
DataQuantaDefault.wrap[Out](sampleOperator)
}
else {
val sampleOperator = new SampleOperator(
sampleSizeFunction,
dataSetType[Out],
sampleMethod,
seed.get
)
sampleOperator.setDatasetSize(datasetSize)
this.connectTo(sampleOperator, 0)
DataQuantaDefault.wrap[Out](sampleOperator)
}
}
/**
* Assigns this instance a key extractor, which enables some key-based operations.
*
* @see KeyedDataQuanta
* @param keyExtractor extracts the key from the [[DataQuantaDefault]]
* @return the [[KeyedDataQuanta]]
*/
//TODO validate this implementation
override def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) : KeyedDataQuanta[Out, Key] = {
new KeyedDataQuanta[Out, Key](this, keyExtractor)
}
/**
* Feed this instance into a [[ReduceByOperator]].
*
* @param keyUdf UDF to extract the grouping key from the data quanta
* @param udf aggregation UDF for the [[ReduceByOperator]]
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
* @return a new instance representing the [[ReduceByOperator]]'s output
*/
override def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
udf: SerializableBinaryOperator[Out],
udfLoad: LoadProfileEstimator = null)
: DataQuantaDefault[Out] = {
val reduceByOperator = new ReduceByOperator(
new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
)
this.connectTo(reduceByOperator, 0)
DataQuantaDefault.wrap[Out](reduceByOperator)
}
/**
* Feed this instance into a [[MaterializedGroupByOperator]].
*
* @param keyUdf UDF to extract the grouping key from the data quanta
* @param keyUdfLoad optional [[LoadProfileEstimator]] for the `keyUdf`
* @return a new instance representing the [[MaterializedGroupByOperator]]'s output
*/
override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
keyUdfLoad: LoadProfileEstimator = null): DataQuantaDefault[java.lang.Iterable[Out]] = {
val groupByOperator = new MaterializedGroupByOperator(
new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
dataSetType[Out],
groupedDataSetType[Out]
)
this.connectTo(groupByOperator, 0)
DataQuantaDefault.wrap[java.lang.Iterable[Out]](groupByOperator)
}
/**
* Feed this instance into a [[GlobalReduceOperator]].
*
* @param udf aggregation UDF for the [[GlobalReduceOperator]]
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
* @return a new instance representing the [[GlobalReduceOperator]]'s output
*/
override def reduceJava(udf: SerializableBinaryOperator[Out],
udfLoad: LoadProfileEstimator = null): DataQuantaDefault[Out] = {
val globalReduceOperator = new GlobalReduceOperator(
new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
)
this.connectTo(globalReduceOperator, 0)
DataQuantaDefault.wrap[Out](globalReduceOperator)
}
/**
* Feeds this and a further instance into a [[JoinOperator]].
*
* @param thisKeyUdf UDF to extract keys from data quanta in this instance
* @param that the other instance
* @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
* @return a new instance representing the [[JoinOperator]]'s output
*/
override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuanta[WayangTuple2[Out, ThatOut]] = {
require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
val joinOperator = new JoinOperator(
new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
)
this.connectTo(joinOperator, 0)
that.connectTo(joinOperator, 1)
DataQuantaDefault.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
}
/**
* Feeds this and a further instance into a [[CoGroupOperator]].
*
* @param thisKeyUdf UDF to extract keys from data quanta in this instance
* @param that the other instance
* @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
* @return a new instance representing the [[CoGroupOperator]]'s output
*/
override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuanta[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = {
require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
val coGroupOperator = new CoGroupOperator(
new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
)
this.connectTo(coGroupOperator, 0)
that.connectTo(coGroupOperator, 1)
DataQuantaDefault.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator)
}
/**
* Feeds this and a further instance into a [[SortOperator]].
*
* @param keyUdf UDF to extract key from data quanta in this instance
* @return a new instance representing the [[SortOperator]]'s output
*/
override def sortJava[Key: ClassTag]
(keyUdf: SerializableFunction[Out, Key])
: DataQuantaDefault[Out] = {
val sortOperator = new SortOperator(new TransformationDescriptor(
keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
this.connectTo(sortOperator, 0)
DataQuantaDefault.wrap[Out](sortOperator)
}
/**
* Broadcasts the data quanta in this instance to a further instance.
*
* @param receiver the instance that receives the broadcast
* @param broadcastName the name with that the broadcast will be registered
*/
private def broadcast(receiver: DataQuantaDefault[_], broadcastName: String) =
receiver.registerBroadcast(this.operator, this.outputIndex, broadcastName)
/**
* Register a further instance as broadcast.
*
* @param sender provides the broadcast data quanta
* @param outputIndex identifies the output index of the sender
* @param broadcastName the name with that the broadcast will be registered
*/
private def registerBroadcast(sender: Operator, outputIndex: Int, broadcastName: String) =
sender.broadcastTo(outputIndex, this.operator, broadcastName)
/**
* Feed this instance into a [[GlobalMaterializedGroupOperator]].
*
* @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output
*/
override def group(): DataQuanta[JavaIterable[Out]] = {
val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
this.connectTo(groupOperator, 0)
DataQuantaDefault.wrap[JavaIterable[Out]](groupOperator)
}
/**
* Feed this instance and a further instance into a [[UnionAllOperator]].
*
* @param that the other instance to union with
* @return a new instance representing the [[UnionAllOperator]]'s output
*/
override def union(that: DataQuanta[Out]): DataQuanta[Out] = {
require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
val unionAllOperator = new UnionAllOperator(dataSetType[Out])
this.connectTo(unionAllOperator, 0)
that.connectTo(unionAllOperator, 1)
DataQuantaDefault.wrap[Out](unionAllOperator)
}
/**
* Feed this instance and a further instance into a [[IntersectOperator]].
*
* @param that the other instance to intersect with
* @return a new instance representing the [[IntersectOperator]]'s output
*/
override def intersect(that: DataQuanta[Out]): DataQuanta[Out] = {
require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
val intersectOperator = new IntersectOperator(dataSetType[Out])
this.connectTo(intersectOperator, 0)
that.connectTo(intersectOperator, 1)
DataQuantaDefault.wrap[Out](intersectOperator)
}
/**
* Feeds this and a further instance into a [[CartesianOperator]].
*
* @param that the other instance
* @return a new instance representing the [[CartesianOperator]]'s output
*/
override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuanta[WayangTuple2[Out, ThatOut]] = {
require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
this.connectTo(cartesianOperator, 0)
that.connectTo(cartesianOperator, 1)
DataQuantaDefault.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
}
/**
* Feeds this instance into a [[ZipWithIdOperator]].
*
* @return a new instance representing the [[ZipWithIdOperator]]'s output
*/
override def zipWithId: DataQuanta[WayangTuple2[lang.Long, Out]] = {
val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
this.connectTo(zipWithIdOperator, 0)
DataQuantaDefault.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
}
/**
* Feeds this instance into a [[DistinctOperator]].
*
* @return a new instance representing the [[DistinctOperator]]'s output
*/
override def distinct: DataQuanta[Out] = {
val distinctOperator = new DistinctOperator(dataSetType[Out])
this.connectTo(distinctOperator, 0)
DataQuantaDefault.wrap[Out](distinctOperator)
}
/**
* Feeds this instance into a [[CountOperator]].
*
* @return a new instance representing the [[CountOperator]]'s output
*/
override def count: DataQuanta[lang.Long] = {
val countOperator = new CountOperator(dataSetType[Out])
this.connectTo(countOperator, 0)
DataQuantaDefault.wrap[lang.Long](countOperator)
}
}
object DataQuantaDefault extends DataQuantaCreator {
def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaDefault[T] = {
new DataQuantaDefault[T](operator, outputIndex)
}
def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaDefault[_] =
new DataQuantaDefault(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder)
}