blob: 6f4a19fced28ae01282efa9a6010a41203ed9089 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.scrunch
import org.apache.crunch.{PCollection => JCollection, Pair => JPair, _}
import org.apache.crunch.types.{PType, PTableType}
import org.apache.crunch.types.writable.WritableTypeFamily
import org.apache.crunch.lib.Shard
/**
* Base trait for PCollection-like entities in Scrunch, including PTables and PGroupedTables.
*
* @tparam S the data type of the underlying object contained in this instance
* @tparam FullType the Scrunch PCollection type of this object
* @tparam NativeType the corresponding Crunch PCollection type of this object
*/
trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
type FunctionType[T]
type CtxtFunctionType[T]
protected def wrapFlatMapFn[T](fmt: FunctionType[TraversableOnce[T]]): DoFn[S, T]
protected def wrapMapFn[T](fmt: FunctionType[T]): MapFn[S, T]
protected def wrapFilterFn(fmt: FunctionType[Boolean]): FilterFn[S]
protected def wrapFlatMapWithCtxtFn[T](fmt: CtxtFunctionType[TraversableOnce[T]]): DoFn[S, T]
protected def wrapMapWithCtxtFn[T](fmt: CtxtFunctionType[T]): MapFn[S, T]
protected def wrapFilterWithCtxtFn(fmt: CtxtFunctionType[Boolean]): FilterFn[S]
protected def wrapPairFlatMapFn[K, V](fmt: FunctionType[TraversableOnce[(K, V)]]): DoFn[S, JPair[K, V]]
protected def wrapPairMapFn[K, V](fmt: FunctionType[(K, V)]): MapFn[S, JPair[K, V]]
protected def setupRun() {
PipelineLike.setupConf(native.getPipeline().getConfiguration())
}
/**
* Returns the underlying PCollection wrapped by this instance.
*/
val native: NativeType
protected def wrap(newNative: JCollection[_]): FullType
/**
* Write the data in this instance to the given target.
*/
def write(target: Target): FullType = wrap(native.write(target))
/**
* Write the data in this instance to the given target with the given {@link Target.WriteMode}.
*/
def write(target: Target, writeMode: Target.WriteMode): FullType = {
wrap(native.write(target, writeMode))
}
/**
* Cache the data in this instance using the default caching mechanism for the
* underlying Pipeline type.
*
* @return this instance
*/
def cache() = wrap(native.cache())
/**
* Cache the data in this instance using the given caching options, if they are
* applicable for the underlying Pipeline type.
*
* @return this instance
*/
def cache(opts: CachingOptions) = wrap(native.cache(opts))
/**
* Apply a flatMap operation to this instance, returning a {@code PTable} if the return
* type of the function is a {@code TraversableOnce[Tuple2]} and a {@code PCollection} otherwise.
*/
def flatMap[T, To](f: FunctionType[TraversableOnce[T]])
(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
b(this, wrapFlatMapFn(f), pt.get(getTypeFamily()))
}
/**
* Apply a map operation to this instance, returning a {@code PTable} if the return
* type of the function is a {@code Tuple2} and a {@code PCollection} otherwise.
*/
def map[T, To](f: FunctionType[T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
b(this, wrapMapFn(f), pt.get(getTypeFamily()))
}
/**
* Apply the given filter function to the elements of this instance and return an new
* instance that contains the items that pass the filter.
*/
def filter(f: FunctionType[Boolean]): FullType = {
wrap(native.filter(wrapFilterFn(f)))
}
def filter(name: String, f: FunctionType[Boolean]): FullType = {
wrap(native.filter(name, wrapFilterFn(f)))
}
def flatMapWithContext[T, To](f: CtxtFunctionType[TraversableOnce[T]])
(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
b(this, wrapFlatMapWithCtxtFn(f), pt.get(getTypeFamily()))
}
def mapWithContext[T, To](f: CtxtFunctionType[T])
(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
b(this, wrapMapWithCtxtFn(f), pt.get(getTypeFamily()))
}
def filterWithContext(f: CtxtFunctionType[Boolean]): FullType = {
wrap(native.filter(wrapFilterWithCtxtFn(f)))
}
/**
* Applies the given doFn to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
new PCollection[T](native.parallelDo(fn, ptype))
}
/**
* Applies the given doFn to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T]) = {
new PCollection[T](native.parallelDo(name, fn, ptype))
}
/**
* Applies the given doFn to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T], opts: ParallelDoOptions) = {
new PCollection[T](native.parallelDo(name, fn, ptype, opts))
}
/**
* Applies the given doFn to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def parallelDo[K, V](fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
new PTable[K, V](native.parallelDo(fn, ptype))
}
/**
* Applies the given doFn to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
new PTable[K, V](native.parallelDo(name, fn, ptype))
}
/**
* Applies the given doFn to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V], opts: ParallelDoOptions) = {
new PTable[K, V](native.parallelDo(name, fn, ptype, opts))
}
/**
* Applies the given flatMap function to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def flatMap[T](f: FunctionType[TraversableOnce[T]], ptype: PType[T]) = {
parallelDo(wrapFlatMapFn(f), ptype)
}
/**
* Applies the given flatMap function to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def flatMap[T](name: String, f: FunctionType[TraversableOnce[T]], ptype: PType[T]) = {
parallelDo(name, wrapFlatMapFn(f), ptype)
}
/**
* Applies the given flatMap function to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def flatMap[T](name: String, f: FunctionType[TraversableOnce[T]], ptype: PType[T], opts: ParallelDoOptions) = {
parallelDo(name, wrapFlatMapFn(f), ptype, opts)
}
/**
* Applies the given flatMap function to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def flatMap[K, V](f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V]) = {
parallelDo(wrapPairFlatMapFn(f), ptype)
}
/**
* Applies the given flatMap function to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def flatMap[K, V](name: String, f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V]) = {
parallelDo(name, wrapPairFlatMapFn(f), ptype)
}
/**
* Applies the given flatMap function to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def flatMap[K, V](name: String, f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V],
opts: ParallelDoOptions) = {
parallelDo(name, wrapPairFlatMapFn(f), ptype, opts)
}
/**
* Applies the given map function to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def map[T](f: FunctionType[T], ptype: PType[T]) = {
parallelDo(wrapMapFn(f), ptype)
}
/**
* Applies the given map function to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def map[T](name: String, f: FunctionType[T], ptype: PType[T]) = {
parallelDo(name, wrapMapFn(f), ptype)
}
/**
* Applies the given map function to the elements of this instance and
* returns a new {@code PCollection} that is the output of this processing.
*/
def map[T](name: String, f: FunctionType[T], ptype: PType[T], opts: ParallelDoOptions) = {
parallelDo(name, wrapMapFn(f), ptype, opts)
}
/**
* Applies the given map function to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def map[K, V](f: FunctionType[(K, V)], ptype: PTableType[K, V]) = {
parallelDo(wrapPairMapFn(f), ptype)
}
/**
* Applies the given map function to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def map[K, V](name: String, f: FunctionType[(K, V)], ptype: PTableType[K, V]) = {
parallelDo(name, wrapPairMapFn(f), ptype)
}
/**
* Applies the given map function to the elements of this instance and
* returns a new {@code PTable} that is the output of this processing.
*/
def map[K, V](name: String, f: FunctionType[(K, V)], ptype: PTableType[K, V],
opts: ParallelDoOptions) = {
parallelDo(name, wrapPairMapFn(f), ptype, opts)
}
/**
* Re-partitions this instance into the given number of partitions.
*
* @param numPartitions the number of partitions to use
* @return a re-partitioned version of the data in this instance
*/
def shard(numPartitions: Int) = wrap(Shard.shard(native, numPartitions))
/**
* Adds this PCollection as a dependency for the given PipelineCallable
* and then registers it to the Pipeline associated with this instance.
*/
def sequentialDo[Output](label: String, fn: PipelineCallable[Output]) = {
native.sequentialDo(label, fn)
}
/**
* Gets the number of elements represented by this PCollection.
*
* @return The number of elements in this PCollection.
*/
def length(): PObject[Long] = {
setupRun()
PObject(native.length())
}
/**
* Returns a {@code PObject} containing the elements of this instance as a {@code Seq}.
* @return
*/
def asSeq(): PObject[Seq[S]] = {
setupRun()
PObject(native.asCollection())
}
/**
* Returns the {@code PTypeFamily} of this instance.
*/
def getTypeFamily() = {
if (native.getTypeFamily == WritableTypeFamily.getInstance()) {
Writables
} else {
Avros
}
}
}