// blob: 758620b3bda71d91ab38c5b9ad5d3ca34bac0940 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.scrunch
import scala.collection.JavaConversions
import org.apache.crunch.{PCollection => JCollection, Pair => CPair, _}
import org.apache.crunch.lib.{Aggregate, Cartesian, Sample}
import org.apache.crunch.scrunch.Conversions._
import org.apache.hadoop.mapreduce.TaskInputOutputContext
import org.apache.crunch.types.PType
import org.apache.crunch.fn.IdentityFn
/**
 * Scala-side wrapper around a native Crunch collection.
 *
 * Operations delegate to `native` or to the Crunch library helpers
 * (`Aggregate`, `Cartesian`, `Sample`) and re-wrap the result where needed.
 *
 * @param native the underlying Crunch collection
 * @tparam S the element type
 */
class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]]
  with Incrementable[PCollection[S]] {
  import PCollection._

  /** Shape of a plain per-element function. */
  type FunctionType[T] = S => T

  /** Shape of a per-element function that also receives the task context. */
  type CtxtFunctionType[T] = (S, TIOC) => T

  // The wrap*Fn hooks below adapt Scala functions through the factory methods
  // imported from the companion object.
  protected def wrapFlatMapFn[T](fmt: S => TraversableOnce[T]) = flatMapFn[S, T](fmt)

  protected def wrapMapFn[T](fmt: S => T) = mapFn[S, T](fmt)

  protected def wrapFilterFn(fmt: S => Boolean) = filterFn[S](fmt)

  protected def wrapFlatMapWithCtxtFn[T](fmt: (S, TIOC) => TraversableOnce[T]) = flatMapWithCtxtFn[S, T](fmt)

  protected def wrapMapWithCtxtFn[T](fmt: (S, TIOC) => T) = mapWithCtxtFn[S, T](fmt)

  protected def wrapFilterWithCtxtFn(fmt: (S, TIOC) => Boolean) = filterWithCtxtFn[S](fmt)

  protected def wrapPairFlatMapFn[K, V](fmt: S => TraversableOnce[(K, V)]) = pairFlatMapFn[S, K, V](fmt)

  protected def wrapPairMapFn[K, V](fmt: S => (K, V)) = pairMapFn[S, K, V](fmt)

  /**
   * Runs the elements through an identity function under the name "withPType",
   * using `pt` as the PType of the result.
   */
  def withPType(pt: PType[S]): PCollection[S] = {
    val passThrough: MapFn[S, S] = IdentityFn.getInstance()
    val retyped = native.parallelDo("withPType", passThrough, pt)
    wrap(retyped)
  }

  /** Unions the native collection with the natives of `others` and wraps the result. */
  def union(others: PCollection[S]*) = {
    val otherNatives = others.map(_.native)
    new PCollection[S](native.union(otherNatives: _*))
  }

  /** Applies `agg` through the native collection's `aggregate` and wraps the result. */
  def aggregate(agg: Aggregator[S]) = {
    val aggregated = native.aggregate(agg)
    wrap(aggregated)
  }

  /** Builds a table that pairs each element with the key computed by `f`. */
  def by[K: PTypeH](f: S => K): PTable[K, S] = {
    val keyType = implicitly[PTypeH[K]].get(getTypeFamily())
    val tableType = getTypeFamily().tableOf(keyType, native.getPType())
    parallelDo(mapKeyFn[S, K](f), tableType)
  }

  /** Keys the elements with `f` (see `by`) and groups the resulting table by key. */
  def groupBy[K: PTypeH](f: S => K): PGroupedTable[K, S] = {
    val keyed = by(f)
    keyed.groupByKey
  }

  /** Cartesian product with `other`, with each Crunch pair mapped to a Scala tuple. */
  def cross[S2](other: PCollection[S2]): PCollection[(S, S2)] = {
    val pairs = Cartesian.cross(this.native, other.native)
    val toTuple = (pair: CPair[S, S2]) => (pair.first(), pair.second())
    val toTupleFn = mapFn(toTuple)
    val tupleType = getTypeFamily().tuple2(pType, other.pType)
    pairs.parallelDo(toTupleFn, tupleType)
  }

  /** Calls `setupRun()` and then exposes the materialized native collection as a Scala Iterable. */
  def materialize() = {
    setupRun()
    val javaIterable = native.materialize()
    JavaConversions.iterableAsScalaIterable[S](javaIterable)
  }

  /** Wraps a native collection as a `PCollection[S]`; the element type is cast, not checked. */
  protected def wrap(newNative: JCollection[_]) = {
    val typedNative = newNative.asInstanceOf[JCollection[S]]
    new PCollection[S](typedNative)
  }

  /** Keeps the elements on which `pf` is defined and then maps them with `pf`. */
  def collect[T, To](pf: PartialFunction[S, T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]) = {
    val defined = filter(elem => pf.isDefinedAt(elem))
    defined.map(pf)(pt, b)
  }

  /** Delegates to an `IncrementPCollection` built over this collection. */
  def increment(groupName: String, counterName: String, count: Long) = {
    val incrementer = new IncrementPCollection[S](this)
    incrementer.apply(groupName, counterName, count)
  }

  /** Creates an `IncrementIfPCollection` over this collection with predicate `f`. */
  def incrementIf(f: S => Boolean) = {
    new IncrementIfPCollection[S](this, f)
  }

  /** Counts elements via `Aggregate.count` and unboxes each count to a Scala `Long`. */
  def count() = {
    val boxedCounts = new PTable[S, java.lang.Long](Aggregate.count(native))
    boxedCounts.mapValues(_.longValue())
  }

  /** Calls `setupRun()` and wraps the result of `Aggregate.max` in a `PObject`. */
  def max()(implicit converter: Converter[S, S]) = {
    setupRun()
    val nativeMax = Aggregate.max(native)
    PObject(nativeMax)(converter)
  }

  /** Calls `setupRun()` and wraps the result of `Aggregate.min` in a `PObject`. */
  def min()(implicit converter: Converter[S, S]) = {
    setupRun()
    val nativeMin = Aggregate.min(native)
    PObject(nativeMin)(converter)
  }

  /** Samples the collection via `Sample.sample` with the given acceptance probability. */
  def sample(acceptanceProbability: Double) = {
    val sampled = Sample.sample(native, acceptanceProbability)
    wrap(sampled)
  }

  /** Samples the collection via `Sample.sample` with an explicit seed. */
  def sample(acceptanceProbability: Double, seed: Long) = {
    // Sample.sample takes the seed before the probability.
    val sampled = Sample.sample(native, seed, acceptanceProbability)
    wrap(sampled)
  }

  /** The PType reported by the native collection. */
  def pType() = native.getPType()
}
/**
 * A Crunch `DoFn` that is also a Scala function from `S` to a
 * `TraversableOnce[T]`.
 *
 * Implementors supply `apply`; `process` runs it on each input and emits
 * every element it produces.
 *
 * @tparam S the input element type
 * @tparam T the emitted element type
 */
trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, TraversableOnce[T]] {
  /**
   * Applies this function to `input` and emits each resulting element in
   * iteration order.
   *
   * @param input   the element to process
   * @param emitter the sink that receives every produced element
   */
  // Explicit `: Unit =` replaces procedure syntax, which is deprecated in
  // Scala 2.13 and not accepted by Scala 3.
  override def process(input: S, emitter: Emitter[T]): Unit = {
    apply(input).foreach { produced =>
      emitter.emit(produced)
    }
  }
}
/**
 * A Crunch `MapFn` that is also a Scala `Function1`; `map` forwards to
 * `apply`.
 *
 * @tparam S the input element type
 * @tparam T the output element type
 */
trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
  /** Returns the result of applying this function to `input`. */
  override def map(input: S): T = {
    this.apply(input)
  }
}
/**
 * A Crunch `FilterFn` that is also a Scala predicate; `accept` forwards to
 * `apply`.
 *
 * @tparam T the element type being filtered
 */
trait SFilterFn[T] extends FilterFn[T] with Function1[T, Boolean] {
  /** Returns whatever this predicate returns for `input`. */
  override def accept(input: T): Boolean = {
    this.apply(input)
  }
}
/**
 * A Crunch `DoFn` backed by a Scala function that receives both the input
 * element and the value returned by `getContext`.
 *
 * @param f the function producing zero or more outputs per input
 * @tparam S the input element type
 * @tparam T the emitted element type
 */
class SDoWithCtxtFn[S, T](val f: (S, TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T]) extends DoFn[S, T] {
  /**
   * Calls `f` with `input` and the current context, then emits each
   * resulting element in iteration order.
   *
   * @param input   the element to process
   * @param emitter the sink that receives every produced element
   */
  // Explicit `: Unit =` replaces procedure syntax, which is deprecated in
  // Scala 2.13 and not accepted by Scala 3.
  override def process(input: S, emitter: Emitter[T]): Unit = {
    f(input, getContext).foreach { produced =>
      emitter.emit(produced)
    }
  }
}
/**
 * A Crunch `MapFn` backed by a Scala function that receives both the input
 * element and the value returned by `getContext`.
 *
 * @param f the mapping function
 * @tparam S the input element type
 * @tparam T the output element type
 */
class SMapWithCtxtFn[S, T](val f: (S, TaskInputOutputContext[_, _, _, _]) => T) extends MapFn[S, T] {
  /** Returns the result of `f` for `input` and the current context. */
  override def map(input: S): T = {
    f.apply(input, getContext)
  }
}
/**
 * A Crunch `FilterFn` backed by a Scala predicate that receives both the
 * input element and the value returned by `getContext`.
 *
 * @param f the predicate
 * @tparam T the element type being filtered
 */
class SFilterWithCtxtFn[T](val f: (T, TaskInputOutputContext[_, _, _, _]) => Boolean) extends FilterFn[T] {
  /** Returns whatever `f` returns for `input` and the current context. */
  override def accept(input: T): Boolean = {
    f(input, getContext)
  }
}
/**
 * A Crunch `DoFn` emitting Crunch pairs that is also a Scala function from
 * `S` to a `TraversableOnce` of Scala tuples.
 *
 * Implementors supply `apply`; `process` converts each produced tuple into a
 * Crunch pair before emitting it.
 *
 * @tparam S the input element type
 * @tparam K the key type of the emitted pairs
 * @tparam V the value type of the emitted pairs
 */
trait SDoPairFn[S, K, V] extends DoFn[S, CPair[K, V]] with Function1[S, TraversableOnce[(K, V)]] {
  /**
   * Applies this function to `input` and emits one Crunch pair per produced
   * tuple, in iteration order.
   *
   * @param input   the element to process
   * @param emitter the sink that receives every produced pair
   */
  // Explicit `: Unit =` replaces procedure syntax, which is deprecated in
  // Scala 2.13 and not accepted by Scala 3.
  override def process(input: S, emitter: Emitter[CPair[K, V]]): Unit = {
    apply(input).foreach { kv =>
      emitter.emit(CPair.of(kv._1, kv._2))
    }
  }
}
/**
 * A Crunch `MapFn` producing Crunch pairs that is also a Scala function from
 * `S` to a tuple; `map` converts the tuple returned by `apply` into a pair.
 *
 * @tparam S the input element type
 * @tparam K the key type of the produced pair
 * @tparam V the value type of the produced pair
 */
trait SMapPairFn[S, K, V] extends MapFn[S, CPair[K, V]] with Function1[S, (K, V)] {
  /** Applies this function to `input` and repackages the tuple as a Crunch pair. */
  override def map(input: S): CPair[K, V] = {
    val keyValue = apply(input)
    val key = keyValue._1
    val value = keyValue._2
    CPair.of(key, value)
  }
}
/**
 * A Crunch `MapFn` that pairs each input with a key, and is also the Scala
 * function computing that key.
 *
 * @tparam S the input element type, which becomes the pair's value
 * @tparam K the key type
 */
trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
  /** Returns a Crunch pair of the key computed by `apply` and `input` itself. */
  override def map(input: S): CPair[K, S] = {
    val key = apply(input)
    CPair.of(key, input)
  }
}
/**
 * Factory methods that adapt plain Scala functions to the Crunch function
 * wrappers declared in this file.
 */
object PCollection {
  /** Shorthand for a task context with all four type parameters left unspecified. */
  type TIOC = TaskInputOutputContext[_, _, _, _]

  /** Wraps `fn` in an `SDoFn` whose `apply` forwards to `fn`. */
  def flatMapFn[S, T](fn: S => TraversableOnce[T]) =
    new SDoFn[S, T] {
      def apply(input: S) = fn(input)
    }

  /** Wraps `fn` in an `SMapFn` whose `apply` forwards to `fn`. */
  def mapFn[S, T](fn: S => T) =
    new SMapFn[S, T] {
      def apply(input: S) = fn(input)
    }

  /** Wraps `fn` in an `SFilterFn` whose `apply` forwards to `fn`. */
  def filterFn[S](fn: S => Boolean) =
    new SFilterFn[S] {
      def apply(input: S) = fn(input)
    }

  /** Wraps a context-aware flat-map function in an `SDoWithCtxtFn`. */
  def flatMapWithCtxtFn[S, T](fn: (S, TIOC) => TraversableOnce[T]) =
    new SDoWithCtxtFn[S, T](fn)

  /** Wraps a context-aware map function in an `SMapWithCtxtFn`. */
  def mapWithCtxtFn[S, T](fn: (S, TIOC) => T) =
    new SMapWithCtxtFn[S, T](fn)

  /** Wraps a context-aware predicate in an `SFilterWithCtxtFn`. */
  def filterWithCtxtFn[S](fn: (S, TIOC) => Boolean) =
    new SFilterWithCtxtFn[S](fn)

  /** Wraps the key extractor `fn` in an `SMapKeyFn` whose `apply` forwards to `fn`. */
  def mapKeyFn[S, K](fn: S => K) =
    new SMapKeyFn[S, K] {
      def apply(input: S) = fn(input)
    }

  /** Wraps a tuple-producing `fn` in an `SMapPairFn` whose `apply` forwards to `fn`. */
  def pairMapFn[S, K, V](fn: S => (K, V)) =
    new SMapPairFn[S, K, V] {
      def apply(input: S) = fn(input)
    }

  /** Wraps a tuple-sequence-producing `fn` in an `SDoPairFn` whose `apply` forwards to `fn`. */
  def pairFlatMapFn[S, K, V](fn: S => TraversableOnce[(K, V)]) =
    new SDoPairFn[S, K, V] {
      def apply(input: S) = fn(input)
    }
}