blob: 248e926b24d0d739b8c632f746bd68163a689804 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.scrunch
import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
import org.apache.crunch.{CombineFn, PGroupedTable => JGroupedTable, PTable => JTable, Pair => CPair}
import java.lang.{Iterable => JIterable}
import scala.collection.{Iterable, Iterator}
import scala.collection.JavaConversions._
import Conversions._
class PGroupedTable[K, V](val native: JGroupedTable[K, V])
extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] {
import PGroupedTable._
def filter(f: (K, Iterable[V]) => Boolean) = {
parallelDo(filterFn[K, V](f), native.getPType())
}
def map[T, To](f: (K, Iterable[V]) => T)
(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
b(this, mapFn(f), pt.get(getTypeFamily()))
}
def flatMap[T, To](f: (K, Iterable[V]) => Traversable[T])
(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
b(this, flatMapFn(f), pt.get(getTypeFamily()))
}
def combine(f: Iterable[V] => V) = combineValues(new IterableCombineFn[K, V](f))
def combineValues(fn: CombineFn[K, V]) = new PTable[K, V](native.combineValues(fn))
def ungroup() = new PTable[K, V](native.ungroup())
def wrap(newNative: AnyRef): PGroupedTable[K, V] = {
new PGroupedTable[K, V](newNative.asInstanceOf[JGroupedTable[K, V]])
}
}
class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] {
override def process(input: CPair[K, JIterable[V]], emitfn: Emitter[CPair[K, V]]) = {
emitfn.emit(CPair.of(input.first(), f(iterableAsScalaIterable[V](input.second()))))
}
}
trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
override def accept(input: CPair[K, JIterable[V]]) = apply(input.first(), iterableAsScalaIterable[V](input.second()))
}
trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], Traversable[T]] {
override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
emitter.emit(v)
}
}
}
trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], T] {
override def map(input: CPair[K, JIterable[V]]) = {
apply(input.first(), iterableAsScalaIterable[V](input.second()))
}
}
object PGroupedTable {
def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = {
new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
}
def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = {
new SMapGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
}
def flatMapFn[K, V, T](fn: (K, Iterable[V]) => Traversable[T]) = {
new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
}
}