blob: d3611c278a4a20889b947eeee5b2c02f534ab629 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.types.orc;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import org.apache.crunch.MapFn;
import org.apache.crunch.Tuple;
import org.apache.crunch.TupleN;
import org.apache.crunch.fn.CompositeMapFn;
import org.apache.crunch.io.orc.OrcWritable;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.TupleFactory;
import org.apache.crunch.types.writable.WritableType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
* Utilities to create PTypes for ORC serialization / deserialization
*
*/
public class Orcs {
/**
 * Builds a {@link PType} whose in-memory representation is the raw
 * {@link OrcStruct}. No conversion to another Java object is involved, which
 * makes this the fastest option for serialization/deserialization; the
 * trade-off is that callers must use ObjectInspectors to work with the
 * structs. Void and union types are currently not supported.
 *
 * @param typeInfo the type information used to build the ORC object
 *          inspector and the binary serde for the struct
 * @return a PType that maps between {@link OrcWritable} and {@link OrcStruct}
 */
public static final PType<OrcStruct> orcs(TypeInfo typeInfo) {
  MapFn<OrcWritable, OrcStruct> unwrapFn = new OrcInFn(typeInfo);
  MapFn<OrcStruct, OrcWritable> wrapFn = new OrcOutFn(typeInfo);
  PType<OrcWritable> writableType = Writables.writables(OrcWritable.class);
  return Writables.derived(OrcStruct.class, unwrapFn, wrapFn, writableType);
}
/**
 * Builds a {@link PType} that moves plain Java objects to and from ORC by
 * means of reflection. The POJO class is subject to these restrictions:
 * 1) it must have a default, no-arg constructor; 2) all of its fields must
 * be Hive primitive types or collection types that have Hive equivalents;
 * 3) Void and Union are not supported yet.
 *
 * @param clazz the POJO class to serialize/deserialize
 * @return a PType for {@code clazz}, derived from the {@link #orcs(TypeInfo)}
 *         type built for the class's reflected type information
 */
public static final <T> PType<T> reflects(Class<T> clazz) {
  TypeInfo pojoTypeInfo = createReflectTypeInfo(clazz);
  MapFn<OrcStruct, T> structToPojo = new ReflectInFn<T>(clazz);
  MapFn<T, OrcStruct> pojoToStruct = new ReflectOutFn<T>(clazz);
  PType<OrcStruct> structType = orcs(pojoTypeInfo);
  return Writables.derived(clazz, structToPojo, pojoToStruct, structType);
}
/**
 * Derives the Hive {@link TypeInfo} of a class from its reflection-based
 * (Java-flavoured) object inspector.
 *
 * @param clazz the class to inspect
 * @return the type information reported by the reflection object inspector
 */
private static TypeInfo createReflectTypeInfo(Class<?> clazz) {
  return TypeInfoUtils.getTypeInfoFromObjectInspector(
      ObjectInspectorFactory.getReflectionObjectInspector(clazz, ObjectInspectorOptions.JAVA));
}
/**
 * Builds a tuple-based {@link PType}. The element types may be any other
 * Crunch PTypes (for example Writables.ints(), Orcs.reflects(),
 * Writables.pairs(), ...). Currently, nulls and unions are not supported.
 *
 * @param ptypes the PTypes of the tuple elements, in order
 * @return a PType of {@link TupleN} that has {@code ptypes} as its sub-types
 */
public static final PType<TupleN> tuples(PType... ptypes) {
  TypeInfo structInfo = createTupleTypeInfo(ptypes);
  MapFn<OrcStruct, TupleN> structToTuple = new TupleInFn<TupleN>(TupleFactory.TUPLEN, ptypes);
  MapFn<TupleN, OrcStruct> tupleToStruct = new TupleOutFn<TupleN>(ptypes);
  PType<OrcStruct> structType = orcs(structInfo);
  return derived(TupleN.class, structToTuple, tupleToStruct, structType, ptypes);
}
/*
 * Builds a derived type on top of a writable-backed base type, with the
 * sub-types of the resulting PType supplied explicitly by the caller.
 *
 * The base type is cast to WritableType, so passing any other PType
 * implementation fails with a ClassCastException.
 */
static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
    PType<S> base, PType[] subTypes) {
  WritableType<S, ?> writableBase = (WritableType<S, ?>) base;
  // Reading: the base type's input function runs first, then inputFn.
  MapFn readChain = new CompositeMapFn(writableBase.getInputMapFn(), inputFn);
  // Writing: outputFn runs first, then the base type's output function.
  MapFn writeChain = new CompositeMapFn(outputFn, writableBase.getOutputMapFn());
  return new WritableType(clazz, writableBase.getSerializationClass(), readChain, writeChain,
      subTypes);
}
/**
 * Derives the Hive {@link TypeInfo} for a tuple made up of the given
 * element types. No tuple factory is needed here, so {@code null} is passed
 * to the inspector.
 *
 * @param ptypes the PTypes of the tuple elements
 * @return the type information reported by a TupleObjectInspector over
 *         {@code ptypes}
 */
private static TypeInfo createTupleTypeInfo(PType... ptypes) {
  return TypeInfoUtils.getTypeInfoFromObjectInspector(new TupleObjectInspector(null, ptypes));
}
/**
 * Input function of {@link #orcs(TypeInfo)}: configures an incoming
 * {@link OrcWritable} with an object inspector and serde, then returns the
 * {@link OrcStruct} it holds.
 */
private static class OrcInFn extends MapFn<OrcWritable, OrcStruct> {

  private TypeInfo typeInfo;

  // Both are marked transient and are (re)built from typeInfo in initialize().
  private transient ObjectInspector structInspector;
  private transient BinarySortableSerDe binarySerde;

  public OrcInFn(TypeInfo structTypeInfo) {
    this.typeInfo = structTypeInfo;
  }

  @Override
  public void initialize() {
    this.structInspector = OrcStruct.createObjectInspector(this.typeInfo);
    this.binarySerde = OrcUtils.createBinarySerde(this.typeInfo);
  }

  @Override
  public OrcStruct map(OrcWritable writable) {
    writable.setObjectInspector(this.structInspector);
    writable.setSerde(this.binarySerde);
    return writable.get();
  }
}
/**
 * Output function of {@link #orcs(TypeInfo)}: wraps an {@link OrcStruct} in
 * a newly created {@link OrcWritable} configured with an object inspector
 * and serde.
 */
private static class OrcOutFn extends MapFn<OrcStruct, OrcWritable> {

  private TypeInfo typeInfo;

  // Both are marked transient and are (re)built from typeInfo in initialize().
  private transient ObjectInspector structInspector;
  private transient BinarySortableSerDe binarySerde;

  public OrcOutFn(TypeInfo structTypeInfo) {
    this.typeInfo = structTypeInfo;
  }

  @Override
  public void initialize() {
    this.structInspector = OrcStruct.createObjectInspector(this.typeInfo);
    this.binarySerde = OrcUtils.createBinarySerde(this.typeInfo);
  }

  @Override
  public OrcWritable map(OrcStruct struct) {
    // A new writable is allocated for every record.
    OrcWritable wrapped = new OrcWritable();
    wrapped.setObjectInspector(this.structInspector);
    wrapped.setSerde(this.binarySerde);
    wrapped.set(struct);
    return wrapped;
  }
}
// NOTE(review): nothing in the code of this class visible here reads or writes
// this map; it looks like leftover state and is a candidate for removal.
// Confirm there are no other users before deleting.
private static Map<Class<?>, Field[]> fieldsCache = new HashMap<Class<?>, Field[]>();
/**
 * Input function of {@link #reflects(Class)}: converts an {@link OrcStruct}
 * into an instance of the target class by pairing the ORC object inspector
 * with a reflection-based one.
 */
private static class ReflectInFn<T> extends MapFn<OrcStruct, T> {

  private Class<T> typeClass;
  private TypeInfo typeInfo;

  // Both are marked transient and are (re)built in initialize().
  private transient ObjectInspector pojoInspector;
  private transient ObjectInspector structInspector;

  public ReflectInFn(Class<T> pojoClass) {
    this.typeClass = pojoClass;
    this.typeInfo = createReflectTypeInfo(pojoClass);
  }

  @Override
  public void initialize() {
    this.pojoInspector = ObjectInspectorFactory.getReflectionObjectInspector(
        this.typeClass, ObjectInspectorOptions.JAVA);
    this.structInspector = OrcStruct.createObjectInspector(this.typeInfo);
  }

  @Override
  public T map(OrcStruct struct) {
    // Source side is the ORC struct, target side is the reflected POJO.
    Object pojo = OrcUtils.convert(struct, this.structInspector, this.pojoInspector);
    return (T) pojo;
  }
}
/**
 * Output function of {@link #reflects(Class)}: converts an instance of the
 * target class into an {@link OrcStruct} by pairing a reflection-based
 * object inspector with the ORC one.
 */
private static class ReflectOutFn<T> extends MapFn<T, OrcStruct> {

  private Class<T> typeClass;
  private TypeInfo typeInfo;

  // Both are marked transient and are (re)built in initialize().
  private transient ObjectInspector pojoInspector;
  private transient SettableStructObjectInspector structInspector;

  public ReflectOutFn(Class<T> pojoClass) {
    this.typeClass = pojoClass;
    this.typeInfo = createReflectTypeInfo(pojoClass);
  }

  @Override
  public void initialize() {
    this.pojoInspector = ObjectInspectorFactory.getReflectionObjectInspector(
        this.typeClass, ObjectInspectorOptions.JAVA);
    this.structInspector =
        (SettableStructObjectInspector) OrcStruct.createObjectInspector(this.typeInfo);
  }

  @Override
  public OrcStruct map(T pojo) {
    // Source side is the reflected POJO, target side is the ORC struct.
    Object struct = OrcUtils.convert(pojo, this.pojoInspector, this.structInspector);
    return (OrcStruct) struct;
  }
}
/**
 * Input function of {@link #tuples(PType...)}: converts an {@link OrcStruct}
 * into a tuple by pairing the ORC object inspector with a
 * TupleObjectInspector built from the tuple factory and element types.
 */
private static class TupleInFn<T extends Tuple> extends MapFn<OrcStruct, T> {

  private PType[] ptypes;
  private TupleFactory<T> tupleFactory;

  // Both are marked transient and are (re)built in initialize().
  private transient ObjectInspector tupleInspector;
  private transient ObjectInspector structInspector;

  public TupleInFn(TupleFactory<T> factory, PType... elementTypes) {
    this.tupleFactory = factory;
    this.ptypes = elementTypes;
  }

  @Override
  public void initialize() {
    this.tupleInspector = new TupleObjectInspector<T>(this.tupleFactory, this.ptypes);
    // The ORC-side inspector is derived from the tuple inspector's type info.
    TypeInfo structInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(this.tupleInspector);
    this.structInspector = OrcStruct.createObjectInspector(structInfo);
  }

  @Override
  public T map(OrcStruct struct) {
    Object tuple = OrcUtils.convert(struct, this.structInspector, this.tupleInspector);
    return (T) tuple;
  }
}
/**
 * Output function of {@link #tuples(PType...)}: converts a tuple into an
 * {@link OrcStruct} by pairing a TupleObjectInspector built from the element
 * types with the ORC object inspector.
 */
private static class TupleOutFn<T extends Tuple> extends MapFn<T, OrcStruct> {

  private PType[] ptypes;

  // Both are marked transient and are (re)built in initialize().
  private transient ObjectInspector tupleInspector;
  private transient ObjectInspector structInspector;

  public TupleOutFn(PType... elementTypes) {
    this.ptypes = elementTypes;
  }

  @Override
  public void initialize() {
    // No tuple factory is supplied on the output side.
    this.tupleInspector = new TupleObjectInspector<T>(null, this.ptypes);
    TypeInfo structInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(this.tupleInspector);
    this.structInspector = OrcStruct.createObjectInspector(structInfo);
  }

  @Override
  public OrcStruct map(T tuple) {
    Object struct = OrcUtils.convert(tuple, this.tupleInspector, this.structInspector);
    return (OrcStruct) struct;
  }
}
}