blob: a140acdf76d1f52aae0c47ee995a729bf0f6f642 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.scrunch
import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, TupleN, Union, MapFn}
import org.apache.crunch.types.{PType, PTypeFamily => PTF, PTypes}
import org.apache.crunch.types.writable.{WritableTypeFamily, Writables => CWritables}
import org.apache.crunch.types.avro.{AvroType, AvroTypeFamily, Avros => CAvros}
import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JFloat, Boolean => JBoolean}
import java.lang.reflect.{Array => RArray}
import java.util.{Collection => JCollection}
import scala.collection.JavaConversions._
import scala.collection.mutable.{ListBuffer, Set => MSet, Map => MMap}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import scala.reflect.runtime.currentMirror
import org.apache.hadoop.io.Writable
import org.apache.avro.specific.SpecificRecord
import java.nio.ByteBuffer
import com.google.common.collect.Lists
/**
 * Crunch [[MapFn]] that applies the Scala function `f` to every input value.
 *
 * When a PType `pt` is supplied and a configuration is available at initialization time,
 * `pt` is initialized and each input is passed through `pt.getDetachedValue` before `f`
 * is applied. Null inputs never reach `f`; they map to null.
 *
 * @param f    conversion applied to each non-null input
 * @param pt   optional PType used to detach inputs once initialized
 * @param init set to true by `initialize` after `pt` has been initialized
 */
class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] {
  override def initialize(): Unit = {
    pt match {
      case Some(ptype) if getConfiguration() != null =>
        ptype.initialize(getConfiguration())
        init = true
      case _ => // no PType or no configuration: inputs are used as-is
    }
  }

  override def map(input: S): T =
    if (input == null) null.asInstanceOf[T]
    else if (init) f(pt.get.getDetachedValue(input))
    else f(input)
}
/** Helper for building Crunch [[TupleN]] values from arbitrary Scala arguments. */
object GeneratedTupleHelper {
  /** Boxes each argument as an `AnyRef` and passes them, in order, to `TupleN.of`. */
  def tupleN(args: Any*) = {
    val boxed = args.map(arg => arg.asInstanceOf[AnyRef])
    TupleN.of(boxed: _*)
  }
}
/**
 * Crunch [[MapFn]] that rebuilds a `Product` from a [[TupleN]] by passing the tuple's values,
 * in order, to a public constructor of `rc`. Null tuples map to null.
 *
 * The constructor is looked up in `initialize` and kept in a `@transient` field.
 * NOTE(review): this takes element 0 of `rc.getConstructors()`, whose order the JDK does not
 * specify; it presumably assumes `rc` has exactly one public constructor — confirm.
 */
class TypeMapFn[P <: Product](val rc: Class[_], @transient var ctor: java.lang.reflect.Constructor[_] = null)
  extends MapFn[TupleN, P] {
  override def initialize: Unit = {
    val publicCtors = rc.getConstructors()
    ctor = publicCtors(0)
  }

  override def map(x: TupleN): P =
    if (x == null) null.asInstanceOf[P]
    else ctor.newInstance(x.getValues: _*).asInstanceOf[P]
}
/**
 * Mixin providing `derived`, which wraps Scala conversion functions in [[TMapFn]]s and
 * registers them with the Crunch type family returned by `ptf`.
 */
trait BasePTypeFamily {
  /** The Crunch type family backing this Scala wrapper. */
  def ptf: PTF

  /**
   * Derives a PType for `T` from an existing PType `pt` for `S`.
   *
   * The input-side TMapFn is given `pt`, so once initialized with a configuration it detaches
   * each `S` value before applying `in`. The output-side TMapFn applies `out` directly.
   */
  def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
    val inFn = new TMapFn[S, T](in, Some(pt))
    val outFn = new TMapFn[T, S](out)
    ptf.derived(cls, inFn, outFn, pt)
  }
}
/**
 * Scala-facing wrapper around a Crunch type family (`ptf`).
 *
 * It re-exposes the family's Java PTypes (`strings`, `bytes`, `jlongs`, `jints`, ...). It also
 * derives PTypes for Scala primitives, `Option`, `Either`, Scala tuples and collections, and
 * case classes. Implementations must supply `writables(Class)` and `namedTuples`. `ptf` and
 * `derived` are used here but declared outside this trait.
 */
trait PTypeFamily extends GeneratedTuplePTypeFamily {
  /** Returns this family's PType for the given Writable class. */
  def writables[T <: Writable](clazz: Class[T]): PType[T]

  /** ClassTag-driven variant of `writables(clazz)`. */
  def writables[T <: Writable : ClassTag]: PType[T] = {
    writables[T](implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
  }

  /** Delegates to `ptf.as` to obtain this family's counterpart of `ptype`. */
  def as[T](ptype: PType[T]) = ptf.as(ptype)

  val strings = ptf.strings()
  val bytes = ptf.bytes()

  /** Record PType for the class identified by the implicit ClassTag. */
  def records[T: ClassTag]: PType[T] = records(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])

  /** Record PType for `clazz`, delegating to `ptf.records`. */
  def records[T](clazz: Class[T]): PType[T] = ptf.records(clazz)

  /**
   * Like `derived`, but registered through `ptf.derivedImmutable`.
   *
   * The input TMapFn is built without a PType, so values are not detached before `in` runs.
   */
  def derivedImmutable[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
    ptf.derivedImmutable(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
  }

  // Each `jX` is the family's boxed-Java PType.
  // The matching `X` converts it to/from the Scala primitive.
  val jlongs = ptf.longs()
  val longs = {
    val in = (x: JLong) => x.longValue()
    val out = (x: Long) => new JLong(x)
    derivedImmutable(classOf[Long], in, out, ptf.longs())
  }
  val jints = ptf.ints()
  val ints = {
    val in = (x: JInt) => x.intValue()
    val out = (x: Int) => new JInt(x)
    derivedImmutable(classOf[Int], in, out, ptf.ints())
  }
  val jfloats = ptf.floats()
  val floats = {
    val in = (x: JFloat) => x.floatValue()
    val out = (x: Float) => new JFloat(x)
    derivedImmutable(classOf[Float], in, out, ptf.floats())
  }
  val jdoubles = ptf.doubles()
  val doubles = {
    val in = (x: JDouble) => x.doubleValue()
    val out = (x: Double) => new JDouble(x)
    derivedImmutable(classOf[Double], in, out, ptf.doubles())
  }
  val jbooleans = ptf.booleans()
  val booleans = {
    val in = (x: JBoolean) => x.booleanValue()
    val out = (x: Boolean) => new JBoolean(x)
    derivedImmutable(classOf[Boolean], in, out, ptf.booleans())
  }

  /** PType for the Java enum `E`, built with `PTypes.enums` against this family. */
  def jenums[E <: java.lang.Enum[E] : ClassTag]: PType[E] = {
    PTypes.enums(implicitly[ClassTag[E]].runtimeClass.asInstanceOf[Class[E]], ptf).asInstanceOf[PType[E]]
  }

  /**
   * `Option[T]` encoded as a union of `ptf.nulls()` and `ptype`.
   * Union index 0 is `None`; index 1 carries the `Some` value.
   */
  def options[T](ptype: PType[T]) = {
    val in: Union => Option[T] = (x: Union) => { if (x.getIndex() == 0) None else Some(x.getValue.asInstanceOf[T]) }
    val out = (x: Option[T]) => { if (x.isEmpty) new Union(0, null) else new Union(1, x.get) }
    derived(classOf[Option[T]], in, out, ptf.unionOf(ptf.nulls(), ptype))
  }

  /**
   * `Either[L, R]` encoded as a union of `left` and `right`.
   * Union index 0 is `Left`; any other index is `Right`.
   */
  def eithers[L, R](left: PType[L], right: PType[R]): PType[Either[L, R]] = {
    val in: Union => Either[L, R] = (x: Union) => {
      if (x.getIndex() == 0) {
        Left[L, R](x.getValue.asInstanceOf[L])
      } else {
        Right[L, R](x.getValue.asInstanceOf[R])
      }
    }
    val out = (x: Either[L, R]) => { if (x.isLeft) new Union(0, x.left.get) else new Union(1, x.right.get) }
    derived(classOf[Either[L, R]], in, out, ptf.unionOf(left, right))
  }

  /** Table type for the given key/value PTypes, delegating to `ptf.tableOf`. */
  def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType)

  /** Scala `Iterable[T]` backed by `ptf.collections(ptype)`. */
  def collections[T](ptype: PType[T]) = {
    derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype))
  }

  /** Immutable `Map[String, T]` with values of type `ptype`. */
  def maps[T](ptype: PType[T]): PType[Map[String, T]] = maps(strings, ptype)

  /**
   * Immutable `Map[K, V]`.
   *
   * If the key type's class is `String`, this uses `ptf.maps(valueType)`. Any other key type is
   * stored as a collection of Crunch `Pair[K, V]`.
   */
  def maps[K, V](keyType: PType[K], valueType: PType[V]): PType[Map[K, V]] = {
    if (classOf[String].equals(keyType.getTypeClass)) {
      derived(classOf[Map[String, V]],
        { x: java.util.Map[String, V] => mapAsScalaMap(x).toMap},
        mapAsJavaMap[String, V],
        ptf.maps(valueType)).asInstanceOf[PType[Map[K, V]]]
    } else {
      derived(classOf[Map[K, V]],
        {x: JCollection[CPair[K, V]] => Map[K, V](x.map(y => (y.first(), y.second())).toArray : _*)},
        {x: Map[K, V] => asJavaCollection(x.toIterable.map(y => CPair.of(y._1, y._2)))},
        ptf.collections(ptf.pairs(keyType, valueType)))
    }
  }

  /** Mutable `Map[K, V]`, always stored as a collection of Crunch `Pair[K, V]`. */
  def mutableMaps[K, V](keyType: PType[K], valueType: PType[V]): PType[MMap[K, V]] = {
    derived(classOf[MMap[K, V]],
      {x: JCollection[CPair[K, V]] => MMap[K, V](x.map(y => (y.first(), y.second())).toArray : _*)},
      {x: MMap[K, V] => asJavaCollection(x.toIterable.map(y => CPair.of(y._1, y._2)))},
      ptf.collections(ptf.pairs(keyType, valueType)))
  }

  /**
   * `Array[T]` stored as `ptf.collections(ptype)`.
   *
   * On read, the array is allocated reflectively with `ptype.getTypeClass` as its component type.
   * For a primitive class, this therefore yields a primitive array.
   */
  def arrays[T](ptype: PType[T]): PType[Array[T]] = {
    val in = (x: JCollection[_]) => {
      val ret = RArray.newInstance(ptype.getTypeClass, x.size())
      var i = 0
      val iter = x.iterator()
      while (iter.hasNext) {
        RArray.set(ret, i, iter.next())
        i += 1
      }
      ret.asInstanceOf[Array[T]]
    }
    val out = (x: Array[T]) => Lists.newArrayList(x: _*).asInstanceOf[JCollection[_]]
    derived(classOf[Array[T]],
      in, out,
      ptf.collections(ptype).asInstanceOf[PType[JCollection[_]]])
      .asInstanceOf[PType[Array[T]]]
  }

  /** Immutable `List[T]` backed by `ptf.collections(ptype)`. */
  def lists[T](ptype: PType[T]) = {
    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toList
    val out = (x: List[T]) => asJavaCollection[T](x)
    derived(classOf[List[T]], in, out, ptf.collections(ptype))
  }

  /** `ListBuffer[T]` backed by `ptf.collections(ptype)`. */
  def listbuffers[T](ptype: PType[T]) = {
    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).to[ListBuffer]
    val out = (x: ListBuffer[T]) => asJavaCollection[T](x)
    derived(classOf[ListBuffer[T]], in, out, ptf.collections(ptype))
  }

  /** Immutable `Set[T]` backed by `ptf.collections(ptype)`. */
  def sets[T](ptype: PType[T]) = {
    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toSet
    val out = (x: Set[T]) => asJavaCollection[T](x)
    derived(classOf[Set[T]], in, out, ptf.collections(ptype))
  }

  /** Mutable `Set[T]` backed by `ptf.collections(ptype)`. */
  def mutableSets[T](ptype: PType[T]) = {
    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).to[MSet]
    val out = (x: MSet[T]) => asJavaCollection[T](x)
    derived(classOf[MSet[T]], in, out, ptf.collections(ptype))
  }

  /** Scala `(T1, T2)` stored as a Crunch `Pair`. */
  def tuple2[T1, T2](p1: PType[T1], p2: PType[T2]) = {
    val in = (x: CPair[T1, T2]) => (x.first(), x.second())
    val out = (x: (T1, T2)) => CPair.of(x._1, x._2)
    derived(classOf[(T1, T2)], in, out, ptf.pairs(p1, p2))
  }

  /** Scala `(T1, T2, T3)` stored as a Crunch `Tuple3`. */
  def tuple3[T1, T2, T3](p1: PType[T1], p2: PType[T2], p3: PType[T3]) = {
    val in = (x: CTuple3[T1, T2, T3]) => (x.first(), x.second(), x.third())
    val out = (x: (T1, T2, T3)) => CTuple3.of(x._1, x._2, x._3)
    derived(classOf[(T1, T2, T3)], in, out, ptf.triples(p1, p2, p3))
  }

  /** Scala `(T1, T2, T3, T4)` stored as a Crunch `Tuple4`. */
  def tuple4[T1, T2, T3, T4](p1: PType[T1], p2: PType[T2], p3: PType[T3], p4: PType[T4]) = {
    val in = (x: CTuple4[T1, T2, T3, T4]) => (x.first(), x.second(), x.third(), x.fourth())
    val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4)
    derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4))
  }

  /**
   * Builds a `TupleN` PType named `tupleName` from ordered `(fieldName, fieldPType)` pairs.
   * Each family decides how names are used.
   */
  def namedTuples(tupleName: String, fields: List[(String, PType[_])]): PType[TupleN]

  /**
   * PType for case class `T`.
   *
   * The field PTypes come from the first parameter list of `T`'s constructor. Each field is
   * resolved through `typeToPType`, and instances are stored as a named `TupleN`.
   */
  def caseClasses[T <: Product : TypeTag]: PType[T] = products[T](implicitly[TypeTag[T]])

  private def products[T <: Product](typeTag: TypeTag[T]): PType[T] = {
    products(typeTag.tpe, typeTag.mirror)
  }

  /**
   * Reflects over `tpe`'s constructor to build a PType for a `Product`.
   * Products are written through `productIterator` and read back via [[TypeMapFn]].
   */
  private def products[T <: Product](tpe: Type, mirror: Mirror): PType[T] = {
    val ctor = tpe.member(termNames.CONSTRUCTOR).asMethod
    val args = ctor.paramLists.head.map(x => (x.name.toString,
      typeToPType(x.typeSignature, mirror)))
    val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]] : _*)
    val rtc = mirror.runtimeClass(tpe)
    val base = namedTuples(getName(rtc) + "_", args) // See CRUNCH-495
    ptf.derived(rtc.asInstanceOf[Class[T]], new TypeMapFn[T](rtc), new TMapFn[T, TupleN](out), base)
  }

  /**
   * Canonical class name, used to name the tuple for a case class.
   * Falls back to the binary name with every `$` removed when `getCanonicalName` throws
   * `InternalError` (see CRUNCH-561).
   */
  private def getName(rtc: RuntimeClass): String = {
    try {
      rtc.getCanonicalName
    } catch {
      case e: InternalError => rtc.getName.replaceAllLiterally("$", "") // see CRUNCH-561
    }
  }

  /** Runtime classes (Scala primitives, boxed Java types, String, ByteBuffer) mapped to PTypes. */
  private val classToPrimitivePType = Map(
    classOf[Int] -> ints,
    classOf[java.lang.Integer] -> jints,
    classOf[Long] -> longs,
    classOf[java.lang.Long] -> jlongs,
    classOf[Boolean] -> booleans,
    classOf[java.lang.Boolean] -> jbooleans,
    classOf[Double] -> doubles,
    classOf[java.lang.Double] -> jdoubles,
    classOf[Float] -> floats,
    classOf[java.lang.Float] -> jfloats,
    classOf[String] -> strings,
    classOf[ByteBuffer] -> bytes
  )

  /** PTypes already built by `typeToPType`, keyed by reflected type (plain, unsynchronized HashMap). */
  private val typeToPTypeCache: collection.mutable.Map[Type, PType[_]] = new collection.mutable.HashMap()

  /** Stores `pt` in the cache under `tpe` and returns it cast to `PType[T]`. */
  private def encache[T](tpe: Type, pt: PType[_]) = {
    typeToPTypeCache.put(tpe, pt)
    pt.asInstanceOf[PType[T]]
  }

  /**
   * Resolves a PType for a reflected Scala type.
   *
   * Lookup order:
   *  1. the cache
   *  2. the primitive/String/ByteBuffer table
   *  3. Writable subclasses
   *  4. case classes
   *  5. types without type arguments, treated as records
   *  6. one-argument types: Array, List, Set, Option, or Iterable
   *  7. two-argument types: Either or Map
   *
   * Results from steps 4-7 are cached; primitives and Writables are not.
   *
   * @throws IllegalArgumentException if no rule matches `tpe`
   */
  private def typeToPType[T](tpe: Type, mirror: Mirror): PType[T] = {
    val cpt = typeToPTypeCache.get(tpe)
    if (cpt.isDefined) {
      return cpt.get.asInstanceOf[PType[T]]
    }
    val rtc = mirror.runtimeClass(tpe)
    // Map.get returns an Option that is never null, so test isDefined and unwrap the
    // PType before casting. Casting the Option itself would throw ClassCastException.
    val ret = classToPrimitivePType.get(rtc)
    if (ret.isDefined) {
      return ret.get.asInstanceOf[PType[T]]
    } else if (classOf[Writable].isAssignableFrom(rtc)) {
      return writables(rtc.asInstanceOf[Class[Writable]]).asInstanceOf[PType[T]]
    } else if (tpe.typeSymbol.asClass.isCaseClass) {
      return encache(tpe, products(tpe, mirror))
    } else {
      val targs = if (tpe.isInstanceOf[TypeRefApi]) {
        tpe.asInstanceOf[TypeRefApi].args
      } else {
        List()
      }
      if (targs.isEmpty) {
        return encache(tpe, records(rtc))
      } else if (targs.size == 1) {
        if (rtc.isArray) {
          return encache(tpe, arrays(typeToPType(targs(0), mirror)))
        } else if (classOf[List[_]].isAssignableFrom(rtc)) {
          return encache(tpe, lists(typeToPType(targs(0), mirror)))
        } else if (classOf[Set[_]].isAssignableFrom(rtc)) {
          return encache(tpe, sets(typeToPType(targs(0), mirror)))
        } else if (classOf[Option[_]].isAssignableFrom(rtc)) {
          return encache(tpe, options(typeToPType(targs(0), mirror)))
        } else if (classOf[Iterable[_]].isAssignableFrom(rtc)) {
          return encache(tpe, collections(typeToPType(targs(0), mirror)))
        }
      } else if (targs.size == 2) {
        if (classOf[Either[_, _]].isAssignableFrom(rtc)) {
          return encache(tpe, eithers(typeToPType(targs(0), mirror), typeToPType(targs(1), mirror)))
        } else if (classOf[Map[_, _]].isAssignableFrom(rtc)) {
          return encache(tpe, maps(typeToPType(targs(0), mirror), typeToPType(targs(1), mirror)))
        }
      }
    }
    throw new IllegalArgumentException("Could not handle class type = " + tpe)
  }
}
/** Scala PTypes backed by Crunch's Writable type family. */
object Writables extends PTypeFamily {
  override def ptf = WritableTypeFamily.getInstance()

  override def writables[T <: Writable](clazz: Class[T]) = CWritables.writables(clazz)

  /**
   * Passes only the field PTypes, in order, to `ptf.tuples`.
   * `tupleName` and the field names are not used.
   */
  override def namedTuples(tupleName: String, fields: List[(String, PType[_])]) = {
    val fieldTypes: Array[PType[_]] = fields.map(_._2).toArray
    ptf.tuples(fieldTypes: _*)
  }
}
/** Scala PTypes backed by Crunch's Avro type family. */
object Avros extends PTypeFamily {
  override def ptf = AvroTypeFamily.getInstance()

  override def writables[T <: Writable](clazz: Class[T]) = CAvros.writables(clazz)

  /** In this family, ClassTag-driven records are reflection-based; delegates to `reflects`. */
  override def records[T: ClassTag] = reflects[T]()

  /** Avro PType for the SpecificRecord class identified by the implicit ClassTag. */
  def specifics[T <: SpecificRecord : ClassTag]() = {
    val recordClass = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
    CAvros.specifics(recordClass)
  }

  /** Reflection-based Avro type whose schema is produced by `ScalaSafeReflectData`. */
  def reflects[T: ClassTag](): AvroType[T] = {
    val recordClass = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
    CAvros.reflects(recordClass, ScalaSafeReflectData.getInstance().getSchema(recordClass))
  }

  /** Named Avro tuple built from the ordered field names and field PTypes. */
  override def namedTuples(tupleName: String, fields: List[(String, PType[_])]) = {
    val fieldNames: Array[String] = fields.map(_._1).toArray
    val fieldTypes: Array[PType[_]] = fields.map(_._2).toArray
    CAvros.namedTuples(tupleName, fieldNames, fieldTypes)
  }
}