blob: b07af17e2eb47c58ebcaf06d326022e2e88b1305 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.dataquanta.builder
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.api.JavaPlanBuilder
import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.{Logging, ReflectionUtils}
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
/**
* Abstract base class for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
* Java API for Wayang that compensates for lacking default and named arguments.
*/
abstract class BasicDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](implicit _javaPlanBuilder: JavaPlanBuilder)
extends Logging with DataQuantaBuilder[This, Out] {
/**
* Lazy-initialized. The [[DataQuanta]] product of this builder.
*/
private var result: DataQuanta[Out] = _
/**
* A name for the [[DataQuanta]] to be built.
*/
private var name: String = _
/**
* An [[Experiment]] for the [[DataQuanta]] to be built.
*/
private var experiment: Experiment = _
/**
* Broadcasts for the [[DataQuanta]] to be built.
*/
private val broadcasts: ListBuffer[(String, DataQuantaBuilder[_, _])] = ListBuffer()
/**
* [[CardinalityEstimator]] for the [[DataQuanta]] to be built.
*/
private var cardinalityEstimator: CardinalityEstimator = _
/**
* Target [[Platform]]s for the [[DataQuanta]] to be built.
*/
private val targetPlatforms: ListBuffer[Platform] = ListBuffer()
/**
* Paths of UDF JAR files for the [[DataQuanta]] to be built.
*/
private val udfJars: ListBuffer[String] = ListBuffer()
/**
* The type of the [[DataQuanta]] to be built.
*/
protected[api] val outputTypeTrap = getOutputTypeTrap
/**
* Retrieve an intialization value for [[outputTypeTrap]].
*
* @return the [[TypeTrap]]
*/
protected def getOutputTypeTrap = new TypeTrap
override protected[api] implicit def javaPlanBuilder = _javaPlanBuilder
override def withName(name: String): This = {
this.name = name
this.asInstanceOf[This]
}
override def withExperiment(experiment: Experiment): This = {
this.experiment = experiment
this.asInstanceOf[This]
}
override def withOutputType(outputType: DataSetType[Out]): This = {
this.outputTypeTrap.dataSetType = outputType
this.asInstanceOf[This]
}
override def withOutputClass(cls: Class[Out]): This = this.withOutputType(DataSetType.createDefault(cls))
override def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This = {
this.broadcasts += Tuple2(broadcastName, sender)
this.asInstanceOf[This]
}
override def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This = {
this.cardinalityEstimator = cardinalityEstimator
this.asInstanceOf[This]
}
override def withTargetPlatform(platform: Platform): This = {
this.targetPlatforms += platform
this.asInstanceOf[This]
}
def withUdfJarOf(cls: Class[_]): This = this.withUdfJar(ReflectionUtils.getDeclaringJar(cls))
override def withUdfJar(path: String): This = {
this.udfJars += path
this.asInstanceOf[This]
}
override protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
override protected[api] def dataQuanta(): DataQuanta[Out] = {
if (this.result == null) {
this.result = this.build
if (this.name != null) this.result.withName(this.name)
if (this.cardinalityEstimator != null) this.result.withCardinalityEstimator(this.cardinalityEstimator)
if (this.experiment != null) this.result.withExperiment(experiment)
this.result.withUdfJars(this.udfJars: _*)
this.result.withTargetPlatforms(this.targetPlatforms: _*)
this.broadcasts.foreach {
case (broadcastName, senderBuilder) => this.result.withBroadcast(senderBuilder.dataQuanta(), broadcastName)
}
}
this.result
}
/**
* Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
*
* @return the created and partially configured [[DataQuanta]]
*/
protected def build: DataQuanta[Out]
}