blob: 82604ac0662832dff771b4330d19c38e27ebdd83 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.types;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.UUID;
import com.google.protobuf.ExtensionRegistry;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.util.SerializableSupplier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
/**
* Utility functions for creating common types of derived PTypes, e.g., for JSON
* data, protocol buffers, and Thrift records.
*
*/
public class PTypes {
/**
* A PType for Java's {@link BigInteger} type.
*/
public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
}
/**
* A PType for Java's {@link UUID} type.
*/
public static PType<UUID> uuid(PTypeFamily ptf) {
return ptf.derivedImmutable(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes());
}
/**
* Constructs a PType for reading a Java type from a JSON string using Jackson's {@link ObjectMapper}.
*/
public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily
.derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
}
/**
* Constructs a PType for the given protocol buffer.
*/
public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
}
/**
* Constructs a PType for a protocol buffer, using the given {@code SerializableSupplier} to provide
* an {@link ExtensionRegistry} to use in reading the given protobuf.
*/
public static <T extends Message> PType<T> protos(
Class<T> clazz,
PTypeFamily typeFamily,
SerializableSupplier<ExtensionRegistry> supplier) {
return typeFamily.derivedImmutable(clazz,
new ProtoInputMapFn<T>(clazz, supplier),
new ProtoOutputMapFn<T>(),
typeFamily.bytes());
}
/**
* Constructs a PType for a Thrift record.
*/
public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
}
/**
* Constructs a PType for a Java {@code Enum} type.
*/
public static <T extends Enum> PType<T> enums(Class<T> type, PTypeFamily typeFamily) {
return typeFamily.derivedImmutable(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(),
typeFamily.strings());
}
public static final MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
@Override
public BigInteger map(ByteBuffer input) {
return input == null ? null : new BigInteger(input.array());
}
};
public static final MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
@Override
public ByteBuffer map(BigInteger input) {
return input == null ? null : ByteBuffer.wrap(input.toByteArray());
}
};
private static class JacksonInputMapFn<T> extends MapFn<String, T> {
private final Class<T> clazz;
private transient ObjectMapper mapper;
JacksonInputMapFn(Class<T> clazz) {
this.clazz = clazz;
}
@Override
public void initialize() {
this.mapper = new ObjectMapper();
}
@Override
public T map(String input) {
try {
return mapper.readValue(input, clazz);
} catch (Exception e) {
throw new CrunchRuntimeException(e);
}
}
}
private static class JacksonOutputMapFn<T> extends MapFn<T, String> {
private transient ObjectMapper mapper;
@Override
public void initialize() {
this.mapper = new ObjectMapper();
}
@Override
public String map(T input) {
try {
return mapper.writeValueAsString(input);
} catch (Exception e) {
throw new CrunchRuntimeException(e);
}
}
}
private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
private final Class<T> clazz;
private final SerializableSupplier<ExtensionRegistry> extensionSupplier;
private transient T instance;
private transient ExtensionRegistry registry;
ProtoInputMapFn(Class<T> clazz) {
this(clazz, null);
}
ProtoInputMapFn(Class<T> clazz, SerializableSupplier<ExtensionRegistry> extensionSupplier) {
this.clazz = clazz;
this.extensionSupplier = extensionSupplier;
}
@Override
public void initialize() {
this.instance = Protos.getDefaultInstance(clazz);
if (this.extensionSupplier != null) {
this.registry = extensionSupplier.get();
} else {
this.registry = ExtensionRegistry.getEmptyRegistry();
}
}
@Override
public T map(ByteBuffer bb) {
try {
return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit(), registry).build();
} catch (InvalidProtocolBufferException e) {
throw new CrunchRuntimeException(e);
}
}
}
private static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
ProtoOutputMapFn() {
}
@Override
public ByteBuffer map(T proto) {
return ByteBuffer.wrap(proto.toByteArray());
}
}
private static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
private final Class<T> clazz;
private transient T instance;
private transient TDeserializer deserializer;
private transient byte[] bytes;
ThriftInputMapFn(Class<T> clazz) {
this.clazz = clazz;
}
@Override
public void initialize() {
this.instance = ReflectionUtils.newInstance(clazz, null);
this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
this.bytes = new byte[0];
}
@Override
public T map(ByteBuffer bb) {
T next = (T) instance.deepCopy();
int len = bb.limit() - bb.position();
if (len != bytes.length) {
bytes = new byte[len];
}
System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
try {
deserializer.deserialize(next, bytes);
} catch (TException e) {
throw new CrunchRuntimeException(e);
}
return next;
}
}
private static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
private transient TSerializer serializer;
ThriftOutputMapFn() {
}
@Override
public void initialize() {
this.serializer = new TSerializer(new TBinaryProtocol.Factory());
}
@Override
public ByteBuffer map(T t) {
try {
return ByteBuffer.wrap(serializer.serialize(t));
} catch (TException e) {
throw new CrunchRuntimeException(e);
}
}
}
private static class EnumInputMapper<T extends Enum> extends MapFn<String, T> {
private final Class<T> type;
EnumInputMapper(Class<T> type) {
this.type = type;
}
@Override
public T map(String input) {
return (T) Enum.valueOf(type, input);
}
}
private static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> {
@Override
public String map(T input) {
return input.name();
}
}
private static final MapFn<ByteBuffer, UUID> BYTE_TO_UUID = new MapFn<ByteBuffer, UUID>() {
@Override
public UUID map(ByteBuffer input) {
return new UUID(input.getLong(), input.getLong());
}
};
private static final MapFn<UUID, ByteBuffer> UUID_TO_BYTE = new MapFn<UUID, ByteBuffer>() {
@Override
public ByteBuffer map(UUID input) {
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
bb.asLongBuffer().put(input.getMostSignificantBits()).put(input.getLeastSignificantBits());
return bb;
}
};
}