blob: df9090009e4024bcf5362346bf4ad060d18b67c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fury.format.encoder;
import static org.apache.fury.type.TypeUtils.getRawType;
import com.google.common.reflect.TypeToken;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.fury.Fury;
import org.apache.fury.codegen.CodeGenerator;
import org.apache.fury.codegen.CompileUnit;
import org.apache.fury.collection.Tuple2;
import org.apache.fury.exception.ClassNotCompatibleException;
import org.apache.fury.format.row.binary.BinaryArray;
import org.apache.fury.format.row.binary.BinaryMap;
import org.apache.fury.format.row.binary.BinaryRow;
import org.apache.fury.format.row.binary.writer.BinaryArrayWriter;
import org.apache.fury.format.row.binary.writer.BinaryRowWriter;
import org.apache.fury.format.type.DataTypes;
import org.apache.fury.format.type.TypeInference;
import org.apache.fury.logging.Logger;
import org.apache.fury.logging.LoggerFactory;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils;
import org.apache.fury.type.TypeUtils;
/**
* Factory to create {@link Encoder}.
*
* <p>Author: ganrunsheng
*/
public class Encoders {
private static final Logger LOG = LoggerFactory.getLogger(Encoders.class);
/**
 * Creates a row encoder for {@code beanClass} using an initial buffer size of 16.
 *
 * @param beanClass class of the bean to encode
 * @param <T> bean type
 * @return the encoder built by the {@code (Class, int)} overload
 */
public static <T> RowEncoder<T> bean(Class<T> beanClass) {
  final int defaultInitialBufferSize = 16;
  return bean(beanClass, defaultInitialBufferSize);
}
/**
 * Creates a row encoder for {@code beanClass} without a {@link Fury} instance.
 *
 * @param beanClass class of the bean to encode
 * @param initialBufferSize size forwarded to the {@code (Class, Fury, int)} overload
 * @param <T> bean type
 * @return the encoder built by the {@code (Class, Fury, int)} overload with a null fury
 */
public static <T> RowEncoder<T> bean(Class<T> beanClass, int initialBufferSize) {
  // A typed null keeps overload resolution on the (Class, Fury, int) variant.
  final Fury noFury = null;
  return bean(beanClass, noFury, initialBufferSize);
}
/**
 * Creates a row encoder for {@code beanClass} using an initial buffer size of 16.
 *
 * @param beanClass class of the bean to encode
 * @param fury fury instance forwarded to the {@code (Class, Fury, int)} overload
 * @param <T> bean type
 * @return the encoder built by the {@code (Class, Fury, int)} overload
 */
public static <T> RowEncoder<T> bean(Class<T> beanClass, Fury fury) {
  final int defaultInitialBufferSize = 16;
  return bean(beanClass, fury, defaultInitialBufferSize);
}
/**
 * Creates a row encoder for {@code beanClass} whose schema is inferred from the class.
 *
 * <p>The returned encoder wraps the one built by the {@code (Class, BinaryRowWriter, Fury)}
 * overload. Its {@code toRow} hands the writer a freshly created buffer of {@code
 * initialBufferSize} and resets the writer before delegating; every other method delegates
 * directly.
 *
 * @param beanClass class of the bean to encode
 * @param fury fury instance handed on to the wrapped encoder; other overloads pass {@code null}
 * @param initialBufferSize size passed to {@code MemoryUtils.buffer} on each {@code toRow} call
 * @param <T> bean type
 * @return the wrapping encoder
 */
public static <T> RowEncoder<T> bean(Class<T> beanClass, Fury fury, int initialBufferSize) {
  final BinaryRowWriter rowWriter = new BinaryRowWriter(TypeInference.inferSchema(beanClass));
  final RowEncoder<T> delegate = bean(beanClass, rowWriter, fury);
  return new RowEncoder<T>() {
    @Override
    public BinaryRow toRow(T obj) {
      // Give the writer a new buffer for this row, then start from a clean writer state.
      rowWriter.setBuffer(MemoryUtils.buffer(initialBufferSize));
      rowWriter.reset();
      return delegate.toRow(obj);
    }

    @Override
    public T fromRow(BinaryRow row) {
      return delegate.fromRow(row);
    }

    @Override
    public byte[] encode(T obj) {
      return delegate.encode(obj);
    }

    @Override
    public T decode(byte[] bytes) {
      return delegate.decode(bytes);
    }

    @Override
    public Schema schema() {
      return delegate.schema();
    }
  };
}
/**
 * Creates a row encoder for {@code beanClass} on top of {@code writer}, without a {@link Fury}
 * instance.
 *
 * @param beanClass class of the bean to encode
 * @param writer row writer whose schema the encoder uses
 * @param <T> bean type
 * @return the encoder built by the {@code (Class, BinaryRowWriter, Fury)} overload
 */
public static <T> RowEncoder<T> bean(Class<T> beanClass, BinaryRowWriter writer) {
  final Fury noFury = null;
  return bean(beanClass, writer, noFury);
}
/**
 * Creates an encoder for Java Bean of type T, backed by a generated {@link GeneratedRowEncoder}.
 *
 * <p>T must be publicly accessible.
 *
 * <p>Supported types for java bean fields:
 *
 * <ul>
 *   <li>primitive types: boolean, int, double, etc.
 *   <li>boxed types: Boolean, Integer, Double, etc.
 *   <li>String
 *   <li>java.math.BigDecimal, java.math.BigInteger
 *   <li>time related: java.sql.Date, java.sql.Timestamp, java.time.LocalDate, java.time.Instant
 *   <li>collection types: only array and java.util.List currently, map support is in progress
 *   <li>nested java bean
 * </ul>
 *
 * <p>{@code encode} writes the 8-byte schema hash followed by the row; {@code decode} reads the
 * hash back and rejects payloads whose hash differs from this encoder's schema hash.
 *
 * @param beanClass class of the bean to encode
 * @param writer row writer; its schema becomes the encoder schema
 * @param fury fury instance passed to the generated codec; other overloads pass {@code null}
 * @param <T> bean type
 * @return the row encoder
 * @throws EncoderException if generating, loading or instantiating the codec fails
 */
public static <T> RowEncoder<T> bean(Class<T> beanClass, BinaryRowWriter writer, Fury fury) {
  final Schema rowSchema = writer.getSchema();
  try {
    Class<? extends GeneratedRowEncoder> codecClass =
        loadOrGenRowCodecClass(beanClass).asSubclass(GeneratedRowEncoder.class);
    // Typed as Object on purpose: the varargs newInstance call must receive the array as the
    // single Object[] constructor argument instead of spreading its elements.
    Object ctorArgument = new Object[] {rowSchema, writer, fury};
    GeneratedRowEncoder generated =
        codecClass.getConstructor(Object[].class).newInstance(ctorArgument);
    final long expectedHash = DataTypes.computeSchemaHash(rowSchema);
    return new RowEncoder<T>() {
      // Reused by every encode call; encode rewinds it before writing.
      private final MemoryBuffer encodeBuffer = MemoryUtils.buffer(16);

      @Override
      public Schema schema() {
        return rowSchema;
      }

      @Override
      public BinaryRow toRow(T obj) {
        return generated.toRow(obj);
      }

      @SuppressWarnings("unchecked")
      @Override
      public T fromRow(BinaryRow row) {
        return (T) generated.fromRow(row);
      }

      @Override
      public byte[] encode(T obj) {
        // Layout: schema hash first, row bytes right after it.
        encodeBuffer.writerIndex(0);
        encodeBuffer.writeInt64(expectedHash);
        writer.setBuffer(encodeBuffer);
        writer.reset();
        BinaryRow encoded = toRow(obj);
        int totalLength = 8 + encoded.getSizeInBytes();
        return encodeBuffer.getBytes(0, totalLength);
      }

      @Override
      public T decode(byte[] bytes) {
        MemoryBuffer input = MemoryUtils.wrap(bytes);
        long peerSchemaHash = input.readInt64();
        if (peerSchemaHash == expectedHash) {
          BinaryRow decoded = new BinaryRow(rowSchema);
          decoded.pointTo(input, input.readerIndex(), input.size());
          return fromRow(decoded);
        }
        throw new ClassNotCompatibleException(
            String.format(
                "Schema is not consistent, encoder schema is %s. "
                    + "self/peer schema hash are %s/%s. "
                    + "Please check writer schema.",
                rowSchema, expectedHash, peerSchemaHash));
      }
    };
  } catch (Exception e) {
    throw new EncoderException(
        String.format("Create encoder failed, \nbeanClass: %s", beanClass), e);
  }
}
/**
 * Creates an array encoder for a, possibly nested, collection type without a {@link Fury}
 * instance. A nested collection is expressed as a Collection in a Collection. The type must be
 * given explicitly through a {@code TypeToken} instance.
 *
 * @param token TypeToken instance which explicitly specifies the type.
 * @param <T> T is an array type, can be a nested list type.
 * @return the encoder built by the {@code (TypeToken, Fury)} overload with a null fury
 */
public static <T extends Collection> ArrayEncoder<T> arrayEncoder(TypeToken<T> token) {
  final Fury noFury = null;
  return arrayEncoder(token, noFury);
}
/**
 * Creates an array encoder for the collection type described by {@code token}.
 *
 * <p>The field is taken from the schema inferred for {@code token}. Every bean type found inside
 * {@code token} gets its row codec generated or loaded up front, and the first bean found (in the
 * set's iteration order) is used as the element type for the array codec.
 *
 * @param token TypeToken instance which explicitly specifies the collection type
 * @param fury fury instance passed to the generated codec; other overloads pass {@code null}
 * @param <T> collection type, can be a nested list type
 * @return an encoder delegating to the generated array codec
 * @throws IllegalArgumentException if no bean type is found inside {@code token}
 */
public static <T extends Collection> ArrayEncoder<T> arrayEncoder(TypeToken<T> token, Fury fury) {
  Schema schema = TypeInference.inferSchema(token, false);
  Field field = DataTypes.fieldOfSchema(schema, 0);
  BinaryArrayWriter writer = new BinaryArrayWriter(field);
  Set<TypeToken<?>> set = new HashSet<>();
  findBeanToken(token, set);
  if (set.isEmpty()) {
    throw new IllegalArgumentException("can not find bean class.");
  }
  // The element token is always the first bean of the set; it does not depend on the loop
  // below, so it is picked once instead of re-creating an iterator on every pass.
  TypeToken<?> typeToken = set.iterator().next();
  for (TypeToken<?> tt : set) {
    Encoders.loadOrGenRowCodecClass(getRawType(tt));
  }
  ArrayEncoder<T> encoder = arrayEncoder(token, typeToken, writer, fury);
  return new ArrayEncoder<T>() {
    @Override
    public Field field() {
      return encoder.field();
    }

    @Override
    public T fromArray(BinaryArray array) {
      return encoder.fromArray(array);
    }

    @Override
    public BinaryArray toArray(T obj) {
      return encoder.toArray(obj);
    }

    @Override
    public T decode(byte[] bytes) {
      return encoder.decode(bytes);
    }

    @Override
    public byte[] encode(T obj) {
      return encoder.encode(obj);
    }
  };
}
/**
 * Creates an array encoder for a list of {@code elementType}. The underlying implementation uses
 * array and only supports the {@link Collection} format; because a generic type such as List is
 * erased to simply List, the element class must be supplied explicitly.
 *
 * @param arrayCls collection class; not read by this method
 * @param elementType class of the collection elements, must not be null
 * @return the encoder built for {@code TypeUtils.listOf(elementType)} with a null fury
 */
public static <T extends Collection, B> ArrayEncoder<T> arrayEncoder(
    Class<? extends Collection> arrayCls, Class<B> elementType) {
  Preconditions.checkNotNull(elementType);
  final Fury noFury = null;
  return (ArrayEncoder<T>) arrayEncoder(TypeUtils.listOf(elementType), noFury);
}
/**
 * Creates an array encoder backed by a generated {@link GeneratedArrayEncoder} for the given
 * collection and element types.
 *
 * @param arrayToken type of the collection to encode
 * @param elementType element type used when generating the array codec
 * @param writer array writer; its field becomes the encoder field
 * @param fury fury instance passed to the generated codec; other overloads pass {@code null}
 * @param <T> collection type
 * @param <B> element type
 * @return the array encoder
 * @throws EncoderException if generating, loading or instantiating the codec fails
 */
public static <T extends Collection, B> ArrayEncoder<T> arrayEncoder(
    TypeToken<? extends Collection> arrayToken,
    TypeToken<B> elementType,
    BinaryArrayWriter writer,
    Fury fury) {
  final Field arrayField = writer.getField();
  try {
    Class<? extends GeneratedArrayEncoder> codecClass =
        loadOrGenArrayCodecClass(arrayToken, elementType).asSubclass(GeneratedArrayEncoder.class);
    // Typed as Object on purpose: the varargs newInstance call must receive the array as the
    // single Object[] constructor argument instead of spreading its elements.
    Object ctorArgument = new Object[] {arrayField, writer, fury};
    GeneratedArrayEncoder generated =
        codecClass.getConstructor(Object[].class).newInstance(ctorArgument);
    return new ArrayEncoder<T>() {
      @Override
      public Field field() {
        return arrayField;
      }

      @Override
      public BinaryArray toArray(T obj) {
        return generated.toArray(obj);
      }

      @SuppressWarnings("unchecked")
      @Override
      public T fromArray(BinaryArray array) {
        return (T) generated.fromArray(array);
      }

      @Override
      public byte[] encode(T obj) {
        writer.reset(obj.size());
        BinaryArray encoded = toArray(obj);
        // NOTE(review): the extra 8 bytes mirror the row encoder, which prepends a schema hash;
        // no hash is written here - confirm the intended length.
        int length = 8 + encoded.getSizeInBytes();
        return writer.getBuffer().getBytes(0, length);
      }

      @Override
      public T decode(byte[] bytes) {
        MemoryBuffer input = MemoryUtils.wrap(bytes);
        BinaryArray decoded = new BinaryArray(arrayField);
        decoded.pointTo(input, input.readerIndex(), input.size());
        return fromArray(decoded);
      }
    };
  } catch (Exception e) {
    throw new EncoderException(
        String.format("Create encoder failed, \nelementType: %s", elementType), e);
  }
}
/**
 * Creates a map encoder for a, possibly nested, map type without a {@link Fury} instance. A
 * nested map is expressed as a Map in a Map. The type must be given explicitly through a {@code
 * TypeToken} instance.
 *
 * @param token TypeToken instance which explicitly specifies the type.
 * @param <T> T is a map type, can be a nested map type.
 * @return the encoder built by the {@code (TypeToken, Fury)} overload with a null fury
 */
public static <T extends Map> MapEncoder<T> mapEncoder(TypeToken<T> token) {
  final Fury noFury = null;
  return mapEncoder(token, noFury);
}
/**
 * Creates a map encoder for a map from {@code keyType} to {@code valueType}. Only the {@link Map}
 * format is supported; because generic type arguments are erased at runtime, the key and value
 * classes must be supplied explicitly.
 *
 * @param mapCls map class; not read by this method
 * @param keyType class of the map keys, must not be null
 * @param valueType class of the map values, must not be null
 * @return the encoder built for {@code TypeUtils.mapOf(keyType, valueType)} with a null fury
 */
public static <T extends Map, K, V> MapEncoder<T> mapEncoder(
    Class<? extends Map> mapCls, Class<K> keyType, Class<V> valueType) {
  Preconditions.checkNotNull(keyType);
  Preconditions.checkNotNull(valueType);
  final Fury noFury = null;
  return (MapEncoder<T>) mapEncoder(TypeUtils.mapOf(keyType, valueType), noFury);
}
/**
 * Creates a map encoder for the map type described by {@code token}.
 *
 * <p>Bean types are searched separately in the key type and in the value type; row codecs for
 * all of them are generated or loaded before the map codec itself is built.
 *
 * @param token TypeToken instance which explicitly specifies the map type, must not be null
 * @param fury fury instance passed on to the map codec; other overloads pass {@code null}
 * @param <T> map type
 * @return an encoder delegating to the generated map codec
 * @throws IllegalArgumentException if neither the key nor the value type contains a bean
 */
public static <T extends Map> MapEncoder<T> mapEncoder(TypeToken<T> token, Fury fury) {
  Preconditions.checkNotNull(token);
  Tuple2<TypeToken<?>, TypeToken<?>> keyValue = TypeUtils.getMapKeyValueType(token);
  Set<TypeToken<?>> keyBeans = beanSet(keyValue.f0);
  Set<TypeToken<?>> valueBeans = beanSet(keyValue.f1);
  LOG.info("Find beans to load: {}, {}", keyBeans, valueBeans);
  boolean noBeanFound = keyBeans.isEmpty() && valueBeans.isEmpty();
  if (noBeanFound) {
    throw new IllegalArgumentException("can not find bean class.");
  }
  // Loading the row codecs also selects the tokens handed to the map codec builder.
  TypeToken<?> keyToken = token4BeanLoad(keyBeans, keyValue.f0);
  TypeToken<?> valToken = token4BeanLoad(valueBeans, keyValue.f1);
  MapEncoder<T> generated = mapEncoder(token, keyToken, valToken, fury);
  return createMapEncoder(generated);
}
/**
 * Creates a map encoder backed by a generated {@link GeneratedMapEncoder}.
 *
 * <p>The map field is taken from the schema inferred for {@code mapToken}; separate array writers
 * are created for the key array field and the item array field derived from it.
 *
 * @param mapToken type of the map to encode, must not be null
 * @param keyToken key type used when generating the map codec, must not be null
 * @param valToken value type used when generating the map codec, must not be null
 * @param fury fury instance passed to the generated codec; other overloads pass {@code null}
 * @param <T> map type
 * @param <K> key type
 * @param <V> value type
 * @return the map encoder
 * @throws EncoderException if generating, loading or instantiating the codec fails
 */
public static <T extends Map, K, V> MapEncoder<T> mapEncoder(
    TypeToken<? extends Map> mapToken, TypeToken<K> keyToken, TypeToken<V> valToken, Fury fury) {
  Preconditions.checkNotNull(mapToken);
  Preconditions.checkNotNull(keyToken);
  Preconditions.checkNotNull(valToken);
  final Field mapField = DataTypes.fieldOfSchema(TypeInference.inferSchema(mapToken, false), 0);
  final Field keysField = DataTypes.keyArrayFieldForMap(mapField);
  final Field valuesField = DataTypes.itemArrayFieldForMap(mapField);
  final BinaryArrayWriter keysWriter = new BinaryArrayWriter(keysField);
  final BinaryArrayWriter valuesWriter = new BinaryArrayWriter(valuesField);
  try {
    Class<? extends GeneratedMapEncoder> codecClass =
        loadOrGenMapCodecClass(mapToken, keyToken, valToken).asSubclass(GeneratedMapEncoder.class);
    // Typed as Object on purpose: the varargs newInstance call must receive the array as the
    // single Object[] constructor argument instead of spreading its elements.
    Object ctorArgument =
        new Object[] {keysField, valuesField, keysWriter, valuesWriter, fury, mapField};
    GeneratedMapEncoder generated =
        codecClass.getConstructor(Object[].class).newInstance(ctorArgument);
    return new MapEncoder<T>() {
      @Override
      public Field keyField() {
        return keysField;
      }

      @Override
      public Field valueField() {
        return valuesField;
      }

      @Override
      public BinaryMap toMap(T obj) {
        return generated.toMap(obj);
      }

      @SuppressWarnings("unchecked")
      @Override
      public T fromMap(BinaryArray key, BinaryArray value) {
        return (T) generated.fromMap(key, value);
      }

      @Override
      public byte[] encode(T obj) {
        BinaryMap encoded = toMap(obj);
        return encoded.getBuf().readBytes(encoded.getBuf().size());
      }

      @Override
      public T decode(byte[] bytes) {
        MemoryBuffer input = MemoryUtils.wrap(bytes);
        BinaryMap decoded = new BinaryMap(mapField);
        decoded.pointTo(input, 0, input.size());
        return fromMap(decoded);
      }
    };
  } catch (Exception e) {
    throw new EncoderException(
        String.format("Create encoder failed, \nkeyType: %s, valueType: %s", keyToken, valToken),
        e);
  }
}
/**
 * Returns the bean types reachable from {@code token}: the token itself when it is a bean,
 * otherwise whatever {@code findBeanToken} collects from it.
 */
private static Set<TypeToken<?>> beanSet(TypeToken<?> token) {
  Set<TypeToken<?>> beans = new HashSet<>();
  if (TypeUtils.isBean(token)) {
    beans.add(token);
  } else {
    findBeanToken(token, beans);
  }
  return beans;
}
/**
 * Generates or loads the row codec for every bean token in {@code set}.
 *
 * @param set bean tokens whose row codecs are loaded
 * @param init token returned when {@code set} is empty
 * @return the last token visited while iterating {@code set}, or {@code init} if it is empty
 */
private static TypeToken<?> token4BeanLoad(Set<TypeToken<?>> set, TypeToken<?> init) {
  TypeToken<?> lastVisited = init;
  for (TypeToken<?> beanToken : set) {
    Class<?> beanClass = getRawType(beanToken);
    Encoders.loadOrGenRowCodecClass(beanClass);
    LOG.info("bean {} load finished", beanClass);
    lastVisited = beanToken;
  }
  return lastVisited;
}
/** Wraps {@code encoder} in a new {@link MapEncoder} that forwards every call to it. */
private static <T> MapEncoder<T> createMapEncoder(MapEncoder<T> encoder) {
  return new MapEncoder<T>() {
    @Override
    public byte[] encode(T obj) {
      return encoder.encode(obj);
    }

    @Override
    public T decode(byte[] bytes) {
      return encoder.decode(bytes);
    }

    @Override
    public BinaryMap toMap(T obj) {
      return encoder.toMap(obj);
    }

    @Override
    public T fromMap(BinaryArray key, BinaryArray value) {
      return encoder.fromMap(key, value);
    }

    @Override
    public Field keyField() {
      return encoder.keyField();
    }

    @Override
    public Field valueField() {
      return encoder.valueField();
    }
  };
}
/**
 * Collects the bean types nested inside an iterable or map type into {@code set}.
 *
 * <p>Nested iterables are unwrapped level by level, recording every element type that is a bean.
 * When a map type is reached, its key and value types are each either recorded (if a bean) or
 * searched recursively. Types that are neither iterable nor map contribute nothing.
 *
 * <p>Each level is visited exactly once. The earlier loop-based form never reassigned the
 * current type when both the key and the value of a map were beans, so it never terminated for
 * such maps.
 *
 * @param typeToken type to search
 * @param set receives the bean types that are found
 */
private static void findBeanToken(TypeToken<?> typeToken, java.util.Set<TypeToken<?>> set) {
  // Peel nested iterables; an iterable takes precedence over a map, as before.
  while (TypeUtils.ITERABLE_TYPE.isSupertypeOf(typeToken)) {
    typeToken = TypeUtils.getElementType(typeToken);
    if (TypeUtils.isBean(typeToken)) {
      set.add(typeToken);
    }
  }
  if (!TypeUtils.MAP_TYPE.isSupertypeOf(typeToken)) {
    return;
  }
  Tuple2<TypeToken<?>, TypeToken<?>> keyValue = TypeUtils.getMapKeyValueType(typeToken);
  if (TypeUtils.isBean(keyValue.f0)) {
    set.add(keyValue.f0);
  } else {
    findBeanToken(keyValue.f0, set);
  }
  if (TypeUtils.isBean(keyValue.f1)) {
    set.add(keyValue.f1);
  } else {
    findBeanToken(keyValue.f1, set);
  }
}
/**
 * Generates (or loads) row codec classes for {@code beanClass} and the bean classes listed by
 * {@code TypeUtils.listBeansRecursiveInclusive}.
 *
 * @param beanClass bean class to create the row codec for
 * @return the class loaded for the first compile unit, i.e. the first class in the listed set
 */
public static Class<?> loadOrGenRowCodecClass(Class<?> beanClass) {
  Set<Class<?>> classes = TypeUtils.listBeansRecursiveInclusive(beanClass);
  LOG.info("Create RowCodec for classes {}", classes);
  CompileUnit[] compileUnits = new CompileUnit[classes.size()];
  int next = 0;
  // One compile unit per class, in the set's iteration order.
  for (Class<?> cls : classes) {
    RowEncoderBuilder codecBuilder = new RowEncoderBuilder(cls);
    // The code is passed as a method reference so it is only generated on demand.
    compileUnits[next++] =
        new CompileUnit(
            CodeGenerator.getPackage(cls),
            codecBuilder.codecClassName(cls),
            codecBuilder::genCode);
  }
  return loadCls(compileUnits);
}
/**
 * Generates (or loads) the array codec class for the given collection and element types.
 *
 * <p>The codec is placed in the package derived from the element's raw class, and its class name
 * is prefixed with the type name inferred from {@code arrayCls}.
 */
private static <B> Class<?> loadOrGenArrayCodecClass(
    TypeToken<? extends Collection> arrayCls, TypeToken<B> elementType) {
  LOG.info("Create ArrayCodec for classes {}", elementType);
  Class<?> elementClass = getRawType(elementType);
  String classNamePrefix = TypeInference.inferTypeName(arrayCls);
  ArrayEncoderBuilder builder = new ArrayEncoderBuilder(arrayCls, elementType);
  String codecPackage = CodeGenerator.getPackage(elementClass);
  String codecName = builder.codecClassName(elementClass, classNamePrefix);
  return loadCls(new CompileUnit(codecPackage, codecName, builder::genCode));
}
/**
 * Generates (or loads) the map codec class for the given map, key and value types.
 *
 * <p>The key token is used as the bean token when it is a bean, otherwise the value token. The
 * codec is placed in the package derived from that bean's raw class, and its class name is
 * prefixed with the type name inferred from {@code mapCls}.
 *
 * @throws IllegalArgumentException if neither the key nor the value token is a bean
 */
private static <K, V> Class<?> loadOrGenMapCodecClass(
    TypeToken<? extends Map> mapCls, TypeToken<K> keyToken, TypeToken<V> valueToken) {
  LOG.info("Create MapCodec for classes {}, {}", keyToken, valueToken);
  boolean keyIsBean = TypeUtils.isBean(keyToken);
  boolean valIsBean = TypeUtils.isBean(valueToken);
  if (!keyIsBean && !valIsBean) {
    throw new IllegalArgumentException("not find bean class.");
  }
  // The key wins when both key and value are beans.
  TypeToken<?> beanToken = keyIsBean ? keyToken : valueToken;
  Class<?> beanClass = getRawType(beanToken);
  String classNamePrefix = TypeInference.inferTypeName(mapCls);
  MapEncoderBuilder builder = new MapEncoderBuilder(mapCls, beanToken);
  String codecPackage = CodeGenerator.getPackage(beanClass);
  String codecName = builder.codecClassName(beanClass, classNamePrefix);
  return loadCls(new CompileUnit(codecPackage, codecName, builder::genCode));
}
/**
 * Compiles the given units with the shared code generator of the current thread's context class
 * loader and loads the class of the first unit.
 *
 * @param compileUnit units to compile; the class of the first one is returned
 * @return the loaded class of {@code compileUnit[0]}
 * @throws IllegalStateException if the compiled class cannot be found afterwards
 */
private static Class<?> loadCls(CompileUnit... compileUnit) {
  ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
  CodeGenerator generator = CodeGenerator.getSharedCodeGenerator(contextLoader);
  ClassLoader generatedLoader = generator.compile(compileUnit);
  String qualifiedName = compileUnit[0].getQualifiedClassName();
  try {
    return generatedLoader.loadClass(qualifiedName);
  } catch (ClassNotFoundException e) {
    throw new IllegalStateException("Impossible because we just compiled class", e);
  }
}
}