// blob: bd482029f649ebd28eb440930579e8e5d28b052f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.scrunch
import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, Pair => CPair, _}
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions._
import org.apache.hadoop.mapreduce.TaskInputOutputContext
/**
 * Scala-side wrapper around a native Crunch grouped table (`JGroupedTable`).
 *
 * User functions see each group as a key of type `K` together with a
 * `TraversableOnce[V]` of its values. The `wrap*` hooks delegate to the
 * factory functions such as `flatMapFn` and `mapFn` to adapt those functions.
 *
 * @param native the underlying Crunch grouped table
 * @tparam K key type of the grouped table
 * @tparam V value type of the grouped table
 */
class PGroupedTable[K, V](val native: JGroupedTable[K, V])
  extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] {
  import PGroupedTable._

  /** Shape of a user function over one group: (key, values) => T. */
  type FunctionType[T] = (K, TraversableOnce[V]) => T

  /** Shape of a user function that additionally receives the task context. */
  type CtxtFunctionType[T] = (K, TraversableOnce[V], TIOC) => T

  protected def wrapFlatMapFn[T](fmt: FunctionType[TraversableOnce[T]]) = flatMapFn(fmt)

  protected def wrapMapFn[T](fmt: FunctionType[T]) = mapFn(fmt)

  protected def wrapFilterFn(fmt: FunctionType[Boolean]) = filterFn(fmt)

  protected def wrapFlatMapWithCtxtFn[T](fmt: CtxtFunctionType[TraversableOnce[T]]) = {
    flatMapWithCtxtFn(fmt)
  }

  protected def wrapMapWithCtxtFn[T](fmt: CtxtFunctionType[T]) = mapWithCtxtFn(fmt)

  protected def wrapFilterWithCtxtFn(fmt: CtxtFunctionType[Boolean]) = filterWithCtxtFn(fmt)

  protected def wrapPairFlatMapFn[S, T](fmt: FunctionType[TraversableOnce[(S, T)]]) = {
    pairFlatMapFn(fmt)
  }

  protected def wrapPairMapFn[S, T](fmt: FunctionType[(S, T)]) = pairMapFn(fmt)

  /**
   * Combines the values of every group with `f`, by wrapping `f` in a
   * [[TraversableOnceCombineFn]] and handing it to `combineValues`.
   */
  def combine(f: TraversableOnce[V] => V): PTable[K, V] = {
    combineValues(new TraversableOnceCombineFn[K, V](f))
  }

  /** Passes `agg` to the native `combineValues` and wraps the result in a `PTable`. */
  def combineValues(agg: Aggregator[V]): PTable[K, V] = {
    new PTable[K, V](native.combineValues(agg))
  }

  /** Passes both aggregators to the native `combineValues` and wraps the result in a `PTable`. */
  def combineValues(combineAgg: Aggregator[V], reduceAgg: Aggregator[V]): PTable[K, V] = {
    new PTable[K, V](native.combineValues(combineAgg, reduceAgg))
  }

  /** Passes `fn` to the native `combineValues` and wraps the result in a `PTable`. */
  def combineValues(fn: CombineFn[K, V]): PTable[K, V] = {
    new PTable[K, V](native.combineValues(fn))
  }

  /** Wraps the result of the native `ungroup()` in a `PTable`. */
  def ungroup(): PTable[K, V] = new PTable[K, V](native.ungroup())

  /** Re-wraps a native collection, casting it (unchecked) to a grouped table of `K` and `V`. */
  protected def wrap(newNative: JCollection[_]): PGroupedTable[K, V] = {
    val grouped = newNative.asInstanceOf[JGroupedTable[K, V]]
    new PGroupedTable[K, V](grouped)
  }
}
/**
 * `CombineFn` that reduces the values of a group with a Scala function.
 *
 * @param f function applied to an iterator over the group's values; its
 *          result is emitted paired with the group's key
 */
class TraversableOnceCombineFn[K, V](f: TraversableOnce[V] => V) extends CombineFn[K, V] {
  override def process(input: CPair[K, JIterable[V]], emitfn: Emitter[CPair[K, V]]): Unit = {
    val key = input.first()
    // The values are exposed as an iterator over the Java iterable.
    val values = iterableAsScalaIterable[V](input.second()).iterator
    emitfn.emit(CPair.of(key, f(values)))
  }
}
/**
 * Adapts a Scala function over (key, grouped values) to a Crunch `DoFn`.
 *
 * `process` applies this function to the pair's first element and an iterator
 * over its second element, then emits every element of the result.
 */
trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T]
  with Function2[K, TraversableOnce[V], TraversableOnce[T]] {
  // Explicit `: Unit =` replaces the deprecated procedure syntax.
  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]): Unit = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    apply(key, values).foreach(out => emitter.emit(out))
  }
}
/**
 * Adapts a Scala function over (key, grouped values) to a Crunch `MapFn`.
 *
 * `map` returns the result of applying this function to the pair's first
 * element and an iterator over its second element.
 */
trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T]
  with Function2[K, TraversableOnce[V], T] {
  override def map(input: CPair[K, JIterable[V]]): T = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    apply(key, values)
  }
}
/**
 * Adapts a Scala predicate over (key, grouped values) to a Crunch `FilterFn`.
 *
 * `accept` returns the predicate's result for the pair's first element and an
 * iterator over its second element.
 */
trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]]
  with Function2[K, TraversableOnce[V], Boolean] {
  override def accept(input: CPair[K, JIterable[V]]): Boolean = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    apply(key, values)
  }
}
/**
 * `DoFn` over grouped pairs whose user function also receives the task context.
 *
 * @param f function of (key, iterator over the group's values, context
 *          obtained from `getContext`); every element it returns is emitted
 */
class SDoGroupedWithCtxtFn[K, V, T](
    val f: (K, TraversableOnce[V], TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T])
  extends DoFn[CPair[K, JIterable[V]], T] {
  // Explicit `: Unit =` replaces the deprecated procedure syntax.
  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]): Unit = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    f(key, values, getContext).foreach(out => emitter.emit(out))
  }
}
/**
 * `MapFn` over grouped pairs whose user function also receives the task context.
 *
 * @param f function of (key, iterator over the group's values, context
 *          obtained from `getContext`); its result is the mapped value
 */
class SMapGroupedWithCtxtFn[K, V, T](
    val f: (K, TraversableOnce[V], TaskInputOutputContext[_, _, _, _]) => T)
  extends MapFn[CPair[K, JIterable[V]], T] {
  override def map(input: CPair[K, JIterable[V]]): T = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    f(key, values, getContext)
  }
}
/**
 * `FilterFn` over grouped pairs whose predicate also receives the task context.
 *
 * @param f predicate of (key, iterator over the group's values, context
 *          obtained from `getContext`); its result is returned by `accept`
 */
class SFilterGroupedWithCtxtFn[K, V](
    val f: (K, TraversableOnce[V], TaskInputOutputContext[_, _, _, _]) => Boolean)
  extends FilterFn[CPair[K, JIterable[V]]] {
  override def accept(input: CPair[K, JIterable[V]]): Boolean = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    f(key, values, getContext)
  }
}
/**
 * Adapts a Scala function from (key, grouped values) to a sequence of Scala
 * tuples into a Crunch `DoFn` that emits Crunch pairs.
 *
 * `process` applies this function to the pair's first element and an iterator
 * over its second element, and emits each resulting tuple as `CPair.of(_1, _2)`.
 */
trait SDoPairGroupedFn[K, V, S, T] extends DoFn[CPair[K, JIterable[V]], CPair[S, T]]
  with Function2[K, TraversableOnce[V], TraversableOnce[(S, T)]] {
  // Explicit `: Unit =` replaces the deprecated procedure syntax.
  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[CPair[S, T]]): Unit = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    apply(key, values).foreach { tuple =>
      emitter.emit(CPair.of(tuple._1, tuple._2))
    }
  }
}
/**
 * Adapts a Scala function from (key, grouped values) to a Scala tuple into a
 * Crunch `MapFn` that returns a Crunch pair.
 *
 * `map` applies this function to the pair's first element and an iterator over
 * its second element, and converts the resulting tuple with `CPair.of(_1, _2)`.
 */
trait SMapPairGroupedFn[K, V, S, T] extends MapFn[CPair[K, JIterable[V]], CPair[S, T]]
  with Function2[K, TraversableOnce[V], (S, T)] {
  override def map(input: CPair[K, JIterable[V]]): CPair[S, T] = {
    val key = input.first()
    val values = iterableAsScalaIterable[V](input.second()).iterator
    val result = apply(key, values)
    CPair.of(result._1, result._2)
  }
}
/**
 * Factory functions that adapt Scala functions over (key, grouped values) to
 * the function traits and classes defined alongside this object.
 */
object PGroupedTable {
  /** Shorthand for a Hadoop task context with all four type parameters left open. */
  type TIOC = TaskInputOutputContext[_, _, _, _]

  /** Builds an [[SDoGroupedFn]] whose `apply` forwards to `fn`. */
  def flatMapFn[K, V, T](fn: (K, TraversableOnce[V]) => TraversableOnce[T]) = {
    new SDoGroupedFn[K, V, T] {
      def apply(key: K, values: TraversableOnce[V]) = fn(key, values)
    }
  }

  /** Builds an [[SMapGroupedFn]] whose `apply` forwards to `fn`. */
  def mapFn[K, V, T](fn: (K, TraversableOnce[V]) => T) = {
    new SMapGroupedFn[K, V, T] {
      def apply(key: K, values: TraversableOnce[V]) = fn(key, values)
    }
  }

  /** Builds an [[SFilterGroupedFn]] whose `apply` forwards to `fn`. */
  def filterFn[K, V](fn: (K, TraversableOnce[V]) => Boolean) = {
    new SFilterGroupedFn[K, V] {
      def apply(key: K, values: TraversableOnce[V]) = fn(key, values)
    }
  }

  /** Wraps `fn` in an [[SDoGroupedWithCtxtFn]]. */
  def flatMapWithCtxtFn[K, V, T](fn: (K, TraversableOnce[V], TIOC) => TraversableOnce[T]) = {
    new SDoGroupedWithCtxtFn[K, V, T](fn)
  }

  /** Wraps `fn` in an [[SMapGroupedWithCtxtFn]]. */
  def mapWithCtxtFn[K, V, T](fn: (K, TraversableOnce[V], TIOC) => T) = {
    new SMapGroupedWithCtxtFn[K, V, T](fn)
  }

  /** Wraps `fn` in an [[SFilterGroupedWithCtxtFn]]. */
  def filterWithCtxtFn[K, V](fn: (K, TraversableOnce[V], TIOC) => Boolean) = {
    new SFilterGroupedWithCtxtFn[K, V](fn)
  }

  /** Builds an [[SMapPairGroupedFn]] whose `apply` forwards to `fn`. */
  def pairMapFn[K, V, S, T](fn: (K, TraversableOnce[V]) => (S, T)) = {
    new SMapPairGroupedFn[K, V, S, T] {
      def apply(key: K, values: TraversableOnce[V]) = fn(key, values)
    }
  }

  /** Builds an [[SDoPairGroupedFn]] whose `apply` forwards to `fn`. */
  def pairFlatMapFn[K, V, S, T](fn: (K, TraversableOnce[V]) => TraversableOnce[(S, T)]) = {
    new SDoPairGroupedFn[K, V, S, T] {
      def apply(key: K, values: TraversableOnce[V]) = fn(key, values)
    }
  }
}