blob: a49a9082159fc61c2b99a096c18e9d677c547a43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api
import org.apache.wayang.core.util.ReflectionUtils
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
class MultiContextPlanBuilder(private[api] val multiContexts: List[MultiContext]) {
private[api] var udfJars = scala.collection.mutable.Set[String]()
private val multiContextMap: Map[Long, MultiContext] = multiContexts.map(context => context.id -> context).toMap
private var dataQuantaMap: Map[Long, DataQuanta[_]] = Map()
private var planBuilderMap: Map[Long, PlanBuilder] = multiContexts.map(context => context.id -> new PlanBuilder(context)).toMap
def this(multiContexts: java.util.List[MultiContext]) =
this(multiContexts.asScala.toList)
/**
* Defines user-code JAR files that might be needed to transfer to execution platforms.
*
* @param paths paths to JAR files that should be transferred
* @return this instance
*/
def withUdfJars(paths: String*): MultiContextPlanBuilder = {
// For each planBuilder in planBuilderMap,
// call its `withUdfJars` method with the value of `this.udfJars` and update map with the new PlanBuilder
planBuilderMap = planBuilderMap.mapValues(_.withUdfJars(paths.toSeq: _*))
this
}
/**
* Defines user-code JAR files that might be needed to transfer to execution platforms.
*
* @param classes whose JAR files should be transferred
* @return this instance
*/
def withUdfJarsOf(classes: Class[_]*): MultiContextPlanBuilder = {
withUdfJars(classes.map(ReflectionUtils.getDeclaringJar).filterNot(_ == null): _*)
this
}
/**
* Applies a function `f` to each [[PlanBuilder]] in the contexts.
* Returns a [[MultiContextDataQuanta]] containing the [[DataQuanta]].
*
* @param f the function to apply to each [[PlanBuilder]]
* @tparam Out the type parameter for the output of the `f` function
* @return a [[MultiContextDataQuanta]] containing the results of applying `f` to each [[PlanBuilder]]
*/
def forEach[Out: ClassTag](f: PlanBuilder => DataQuanta[Out]): MultiContextDataQuanta[Out] = {
val dataQuantaMap = multiContexts.map(context => context.id -> f(planBuilderMap(context.id))).toMap
new MultiContextDataQuanta[Out](dataQuantaMap)(this)
}
/**
* Same as [[PlanBuilder.readTextFile]], but for specified `multiContext`
*
* @param multiContext The multi context.
* @param url The URL of the text file to be read.
* @return The ReadTextFileMultiContextPlanBuilder with the added data quanta.
*/
def readTextFile(multiContext: MultiContext, url: String): ReadTextFileMultiContextPlanBuilder = {
dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).readTextFile(url))
new ReadTextFileMultiContextPlanBuilder(this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[String]]])
}
/**
* Same as [[PlanBuilder.readObjectFile()]], but for specified `multiContext`
*
* @param multiContext The multi context.
* @param url The URL of the object file to be read.
* @return The ReadObjectFileMultiContextPlanBuilder with the added data quanta.
*/
def readObjectFile[T: ClassTag](multiContext: MultiContext, url: String): ReadObjectFileMultiContextPlanBuilder[T] = {
dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).readObjectFile(url))
new ReadObjectFileMultiContextPlanBuilder[T](this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[T]]])
}
/**
* Same as [[PlanBuilder.loadCollection]], but for specified `multiContext`
*
* @param multiContext The multi context.
* @param iterable The collection to be loaded.
* @return The LoadCollectionMultiContextPlanBuilder with the added data quanta.
*/
def loadCollection[T: ClassTag](multiContext: MultiContext, iterable: Iterable[T]): LoadCollectionMultiContextPlanBuilder[T] = {
dataQuantaMap += (multiContext.id -> planBuilderMap(multiContext.id).loadCollection(iterable))
new LoadCollectionMultiContextPlanBuilder[T](this, multiContextMap, dataQuantaMap.asInstanceOf[Map[Long, DataQuanta[T]]])
}
}
class ReadTextFileMultiContextPlanBuilder(private val multiContextPlanBuilder: MultiContextPlanBuilder,
private val multiContextMap: Map[Long, MultiContext],
private var dataQuantaMap: Map[Long, DataQuanta[String]] = Map()) {
/**
* Same as [[PlanBuilder.readTextFile]], but for specified `multiContext`
*
* @param multiContext The multi context.
* @param url The URL of the text file to be read.
* @return The ReadTextFileMultiContextPlanBuilder with the added data quanta.
*/
def readTextFile(multiContext: MultiContext, url: String): ReadTextFileMultiContextPlanBuilder = {
dataQuantaMap += (multiContext.id -> multiContextMap(multiContext.id).readTextFile(url))
this
}
}
object ReadTextFileMultiContextPlanBuilder {
implicit def toMultiContextDataQuanta(builder: ReadTextFileMultiContextPlanBuilder): MultiContextDataQuanta[String] = {
new MultiContextDataQuanta[String](builder.dataQuantaMap)(builder.multiContextPlanBuilder)
}
}
class ReadObjectFileMultiContextPlanBuilder[T: ClassTag](private val multiContextPlanBuilder: MultiContextPlanBuilder,
private val multiContextMap: Map[Long, MultiContext],
private var dataQuantaMap: Map[Long, DataQuanta[T]] = Map()) {
/**
* Same as [[PlanBuilder.readObjectFile()]], but for specified `multiContext`
*
* @param multiContext The multi context.
* @param url The URL of the object file to be read.
* @return The ReadObjectFileMultiContextPlanBuilder with the added data quanta.
*/
def readObjectFile(multiContext: MultiContext, url: String): ReadObjectFileMultiContextPlanBuilder[T] = {
dataQuantaMap += (multiContext.id -> multiContextMap(multiContext.id).readObjectFile(url))
this
}
}
object ReadObjectFileMultiContextPlanBuilder {
implicit def toMultiContextDataQuanta[T: ClassTag](builder: ReadObjectFileMultiContextPlanBuilder[T]): MultiContextDataQuanta[T] = {
new MultiContextDataQuanta[T](builder.dataQuantaMap)(builder.multiContextPlanBuilder)
}
}
class LoadCollectionMultiContextPlanBuilder[T: ClassTag](private val multiContextPlanBuilder: MultiContextPlanBuilder,
private val multiContextMap: Map[Long, MultiContext],
private var dataQuantaMap: Map[Long, DataQuanta[T]] = Map()) {
/**
* Same as [[PlanBuilder.loadCollection]], but for specified `multiContext`
*
* @param multiContext The multi context.
* @param iterable The collection to be loaded.
* @return The LoadCollectionMultiContextPlanBuilder with the added data quanta.
*/
def loadCollection(multiContext: MultiContext, iterable: Iterable[T]): LoadCollectionMultiContextPlanBuilder[T] = {
dataQuantaMap += (multiContext.id -> multiContextMap(multiContext.id).loadCollection(iterable))
this
}
}
object LoadCollectionMultiContextPlanBuilder {
implicit def toMultiContextDataQuanta[T: ClassTag](builder: LoadCollectionMultiContextPlanBuilder[T]): MultiContextDataQuanta[T] = {
new MultiContextDataQuanta[T](builder.dataQuantaMap)(builder.multiContextPlanBuilder)
}
}