blob: 8715e2855b551fb9bb6da5704adfb2fab9fbf7bd [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.crunch.types;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.util.SerializableSupplier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
/**
 * Utility functions for creating common types of derived PTypes, e.g., for JSON
 * data, protocol buffers, and Thrift records.
 */
public class PTypes {
* A PType for Java's {@link BigInteger} type.
public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
* A PType for Java's {@link BigDecimal} type.
public static PType<BigDecimal> bigDecimal(PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(BigDecimal.class, BYTE_TO_BIGDECIMAL, BIGDECIMAL_TO_BYTE, typeFamily.bytes());
* A PType for Java's {@link UUID} type.
public static PType<UUID> uuid(PTypeFamily ptf) {
return ptf.derivedImmutable(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes());
* Constructs a PType for reading a Java type from a JSON string using Jackson's {@link ObjectMapper}.
public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily
.derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
* Constructs a PType for the given protocol buffer.
public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
* Constructs a PType for a protocol buffer, using the given {@code SerializableSupplier} to provide
* an {@link ExtensionRegistry} to use in reading the given protobuf.
public static <T extends Message> PType<T> protos(
Class<T> clazz,
PTypeFamily typeFamily,
SerializableSupplier<ExtensionRegistry> supplier) {
return typeFamily.derivedImmutable(clazz,
new ProtoInputMapFn<T>(clazz, supplier),
new ProtoOutputMapFn<T>(),
* Constructs a PType for a Thrift record.
public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
* Constructs a PType for a Java {@code Enum} type.
public static <T extends Enum> PType<T> enums(Class<T> type, PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(),
public static final MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
public BigInteger map(ByteBuffer input) {
return input == null ? null : new BigInteger(input.array());
public static final MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
public ByteBuffer map(BigInteger input) {
return input == null ? null : ByteBuffer.wrap(input.toByteArray());
public static final MapFn<ByteBuffer, BigDecimal> BYTE_TO_BIGDECIMAL = new MapFn<ByteBuffer, BigDecimal>() {
public BigDecimal map(ByteBuffer input) {
return input == null ? null : byteBufferToBigDecimal(input);
public static final MapFn<BigDecimal, ByteBuffer> BIGDECIMAL_TO_BYTE = new MapFn<BigDecimal, ByteBuffer>() {
public ByteBuffer map(BigDecimal input) {
return input == null ? null : bigDecimalToByteBuffer(input);
private static class JacksonInputMapFn<T> extends MapFn<String, T> {
private final Class<T> clazz;
private transient ObjectMapper mapper;
JacksonInputMapFn(Class<T> clazz) {
this.clazz = clazz;
public void initialize() {
this.mapper = new ObjectMapper();
public T map(String input) {
try {
return mapper.readValue(input, clazz);
} catch (Exception e) {
throw new CrunchRuntimeException(e);
private static class JacksonOutputMapFn<T> extends MapFn<T, String> {
private transient ObjectMapper mapper;
public void initialize() {
this.mapper = new ObjectMapper();
public String map(T input) {
try {
return mapper.writeValueAsString(input);
} catch (Exception e) {
throw new CrunchRuntimeException(e);
private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
private final Class<T> clazz;
private final SerializableSupplier<ExtensionRegistry> extensionSupplier;
private transient T instance;
private transient ExtensionRegistry registry;
ProtoInputMapFn(Class<T> clazz) {
this(clazz, null);
ProtoInputMapFn(Class<T> clazz, SerializableSupplier<ExtensionRegistry> extensionSupplier) {
this.clazz = clazz;
this.extensionSupplier = extensionSupplier;
public void initialize() {
this.instance = Protos.getDefaultInstance(clazz);
if (this.extensionSupplier != null) {
this.registry = extensionSupplier.get();
} else {
this.registry = ExtensionRegistry.getEmptyRegistry();
public T map(ByteBuffer bb) {
try {
return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit(), registry).build();
} catch (InvalidProtocolBufferException e) {
throw new CrunchRuntimeException(e);
private static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
ProtoOutputMapFn() {
public ByteBuffer map(T proto) {
return ByteBuffer.wrap(proto.toByteArray());
private static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
private final Class<T> clazz;
private transient T instance;
private transient TDeserializer deserializer;
private transient byte[] bytes;
ThriftInputMapFn(Class<T> clazz) {
this.clazz = clazz;
public void initialize() {
this.instance = ReflectionUtils.newInstance(clazz, null);
this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
this.bytes = new byte[0];
public T map(ByteBuffer bb) {
T next = (T) instance.deepCopy();
int len = bb.limit() - bb.position();
if (len != bytes.length) {
bytes = new byte[len];
System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
try {
deserializer.deserialize(next, bytes);
} catch (TException e) {
throw new CrunchRuntimeException(e);
return next;
private static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
private transient TSerializer serializer;
ThriftOutputMapFn() {
public void initialize() {
this.serializer = new TSerializer(new TBinaryProtocol.Factory());
public ByteBuffer map(T t) {
try {
return ByteBuffer.wrap(serializer.serialize(t));
} catch (TException e) {
throw new CrunchRuntimeException(e);
private static class EnumInputMapper<T extends Enum> extends MapFn<String, T> {
private final Class<T> type;
EnumInputMapper(Class<T> type) {
this.type = type;
public T map(String input) {
return (T) Enum.valueOf(type, input);
private static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> {
public String map(T input) {
private static final MapFn<ByteBuffer, UUID> BYTE_TO_UUID = new MapFn<ByteBuffer, UUID>() {
public UUID map(ByteBuffer input) {
return new UUID(input.getLong(), input.getLong());
private static final MapFn<UUID, ByteBuffer> UUID_TO_BYTE = new MapFn<UUID, ByteBuffer>() {
public ByteBuffer map(UUID input) {
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
return bb;
private static BigDecimal byteBufferToBigDecimal(ByteBuffer input) {
int scale = input.getInt();
byte[] bytes = new byte[input.remaining()];
input.get(bytes, 0, input.remaining());
BigInteger bi = new BigInteger(bytes);
BigDecimal bigDecValue = new BigDecimal(bi, scale);
return bigDecValue;
private static ByteBuffer bigDecimalToByteBuffer(BigDecimal input) {
byte[] unScaledBytes = input.unscaledValue().toByteArray();
byte[] scaleBytes = ByteBuffer.allocate(4).putInt(input.scale()).array();
byte[] bytes = new byte[scaleBytes.length + unScaledBytes.length];
System.arraycopy(scaleBytes, 0, bytes, 0, scaleBytes.length);
System.arraycopy(unScaledBytes, 0, bytes, scaleBytes.length, unScaledBytes.length);
return ByteBuffer.wrap(bytes);