blob: 4cfb9389c7896ebecd18e9df7e881894dd735d18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.dataquanta.builder
import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
import org.apache.wayang.basic.data.{Tuple2 => WT2}
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableFunction}
import org.apache.wayang.core.optimizer.costs.LoadEstimator
import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.ReflectionUtils
import scala.reflect.ClassTag
/**
 * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.JoinOperator]]s.
 *
 * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
 * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
 * @param keyUdf0          first key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]],
 *                         applied to elements of the first input
 * @param keyUdf1          second key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]],
 *                         applied to elements of the second input
 * @tparam In0 element type of the first input
 * @tparam In1 element type of the second input
 * @tparam Key type of the join key produced by both key extraction UDFs
 */
class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
                                           inputDataQuanta1: DataQuantaBuilder[_, In1],
                                           keyUdf0: SerializableFunction[In0, Key],
                                           keyUdf1: SerializableFunction[In1, Key])
                                          (implicit javaPlanBuilder: JavaPlanBuilder)
  extends BasicDataQuantaBuilder[JoinDataQuantaBuilder[In0, In1, Key], WT2[In0, In1]] {

  /** [[ClassTag]] or surrogate of [[Key]]; inferred from the key extraction UDFs during construction. */
  implicit var keyTag: ClassTag[Key] = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
  private var keyUdf0CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
  private var keyUdf0RamEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
  private var keyUdf1CpuEstimator: LoadEstimator = _

  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
  private var keyUdf1RamEstimator: LoadEstimator = _

  /**
   * Reflectively inspects a key extraction UDF. If its input type can be determined, it is registered as the
   * output type of `inputBuilder`. If its output (key) type can be determined, a [[ClassTag]] for it is returned.
   * Each failed inference is logged as a warning that names the inspected UDF.
   *
   * @param keyUdf       the key extraction UDF to inspect
   * @param inputBuilder the [[DataQuantaBuilder]] whose elements `keyUdf` is applied to
   * @tparam In element type consumed by `keyUdf`
   * @return the [[ClassTag]] of the key type, if it could be inferred
   */
  private def inferTypesFrom[In](keyUdf: SerializableFunction[In, Key],
                                 inputBuilder: DataQuantaBuilder[_, In]): Option[ClassTag[Key]] = {
    val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
    parameters.get("Input") match {
      case cls: Class[In] => inputBuilder.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
      case _ => logger.warn("Could not infer types from {}.", keyUdf)
    }
    parameters.get("Output") match {
      case cls: Class[Key] => Some(ClassTag[Key](cls))
      case _ =>
        logger.warn("Could not infer types from {}.", keyUdf)
        None
    }
  }

  // Try to infer the type classes from the UDFs. Both UDFs are inspected so that both inputs get their types
  // registered. Both UDFs produce the same Key type, so the key ClassTag is taken from the first UDF that yields
  // one. A failed inference on the second UDF must not overwrite a successful one on the first.
  locally {
    val keyTag0 = inferTypesFrom(keyUdf0, inputDataQuanta0)
    val keyTag1 = inferTypesFrom(keyUdf1, inputDataQuanta1)
    this.keyTag = keyTag0
      .orElse(keyTag1)
      .getOrElse(ClassTag[Key](DataSetType.none.getDataUnitType.getTypeClass))
  }

  // Since we are currently not looking at type parameters, we can statically determine the output type.
  locally {
    this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator): JoinDataQuantaBuilder[In0, In1, Key] = {
    this.keyUdf0CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator): JoinDataQuantaBuilder[In0, In1, Key] = {
    this.keyUdf0RamEstimator = udfRamEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
   *
   * @param udfCpuEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator): JoinDataQuantaBuilder[In0, In1, Key] = {
    this.keyUdf1CpuEstimator = udfCpuEstimator
    this
  }

  /**
   * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
   *
   * @param udfRamEstimator the [[LoadEstimator]]
   * @return this instance
   */
  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator): JoinDataQuantaBuilder[In0, In1, Key] = {
    this.keyUdf1RamEstimator = udfRamEstimator
    this
  }

  /**
   * Assemble the joined elements to new elements.
   *
   * @param udf produces a joined element from two joinable elements
   * @return a new [[DataQuantaBuilder]] representing the assembled join product
   */
  def assemble[NewOut](udf: SerializableBiFunction[In0, In1, NewOut]) =
    this.map(new SerializableFunction[WT2[In0, In1], NewOut] {
      override def apply(joinTuple: WT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1)
    })

  /**
   * Builds the join by calling `joinJava` on the first input's [[DataQuanta]]. The second input's [[ClassTag]]
   * and the inferred [[keyTag]] are passed explicitly.
   *
   * @return the [[DataQuanta]] of joined element pairs
   */
  override protected def build: DataQuanta[WT2[In0, In1]] =
    inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
}