blob: 460fafb4862e4dd8b28969703415cceaa024df8e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.sql.schema;
import java.io.File;
import java.io.IOException;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import org.codehaus.jettison.json.JSONException;
import org.apache.apex.malhar.lib.utils.ClassLoaderUtils;
import org.apache.apex.malhar.sql.codegen.BeanClassGenerator;
import org.apache.apex.malhar.sql.operators.OperatorUtils;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.commons.lang.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@InterfaceStability.Evolving
/**
* @since 3.6.0
*/
public class TupleSchemaRegistry
{
public static final String FQCN_PACKAGE = "org.apache.apex.generated.schema.";
private Map<String, Schema> schemas = new HashMap<>();
public Schema createNewSchema(String name)
{
if (schemas.containsKey(name)) {
return schemas.get(name);
}
Schema schema = new Schema();
schema.name = name;
schemas.put(name, schema);
return schema;
}
public Schema getSchemaDefinition(String name)
{
return schemas.get(name);
}
public String generateCommonJar() throws IOException
{
File file = File.createTempFile("schemaSQL", ".jar");
FileSystem fs = FileSystem.newInstance(file.toURI(), new Configuration());
FSDataOutputStream out = fs.create(new Path(file.getAbsolutePath()));
JarOutputStream jout = new JarOutputStream(out);
for (Schema schema : schemas.values()) {
jout.putNextEntry(new ZipEntry(schema.fqcn.replace(".", "/") + ".class"));
jout.write(schema.beanClassBytes);
jout.closeEntry();
}
jout.close();
out.close();
return file.getAbsolutePath();
}
public static Class getSchemaForRelDataType(TupleSchemaRegistry registry, String schemaName, RelDataType rowType)
{
if (rowType.isStruct()) {
TupleSchemaRegistry.Schema newSchema = registry.createNewSchema(schemaName);
for (RelDataTypeField field : rowType.getFieldList()) {
RelDataType type = field.getType();
newSchema.addField(OperatorUtils.getValidFieldName(field), convertPrimitiveToSqlType(type));
}
try {
newSchema.generateBean();
} catch (IOException | JSONException e) {
throw new RuntimeException("Failed to generate schema", e);
}
return newSchema.beanClass;
} else {
throw new UnsupportedOperationException("Non-struct row type is not implemented.");
}
}
private static Class convertPrimitiveToSqlType(RelDataType type)
{
/* I hope that following this method instead of calling value.value() is better
because we can catch any type mismatches. */
switch (type.getSqlTypeName()) {
case BOOLEAN:
return Boolean.class;
case TINYINT:
case SMALLINT:
case INTEGER:
return Integer.class;
case BIGINT:
return Long.class;
case REAL:
return Float.class;
case FLOAT:
case DOUBLE:
return Double.class;
case DATE:
return Date.class;
case TIME:
return Date.class;
case TIMESTAMP:
return Date.class;
case CHAR:
case VARCHAR:
return String.class;
case BINARY:
case VARBINARY:
return Byte.class;
case ANY:
case SYMBOL:
return Object.class;
default:
throw new RuntimeException(String.format("Unsupported type %s", type.getSqlTypeName()));
}
}
public enum Type
{
BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class),
FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class), OBJECT(Object.class),
DATE(Date.class), TIME(Time.class);
private Class javaType;
Type(Class javaType)
{
this.javaType = javaType;
}
public static Type getFromJavaType(Class type)
{
for (Type supportType : Type.values()) {
if (supportType.getJavaType() == ClassUtils.primitiveToWrapper(type)) {
return supportType;
}
}
return OBJECT;
}
public Class getJavaType()
{
return javaType;
}
}
public static class Schema
{
public String name;
public String fqcn;
public List<SQLFieldInfo> fieldList = new ArrayList<>();
public Class beanClass;
public byte[] beanClassBytes;
public Schema addField(String fieldName, Class fieldType)
{
fieldList.add(new SQLFieldInfo(fieldName, Type.getFromJavaType(fieldType)));
return this;
}
public Schema generateBean() throws IOException, JSONException
{
// Generate
this.fqcn = FQCN_PACKAGE + name;
// Use Bean Class generator to generate the class
this.beanClassBytes = BeanClassGenerator.createAndWriteBeanClass(this.fqcn, fieldList);
this.beanClass = ClassLoaderUtils.readBeanClass(fqcn, beanClassBytes);
return this;
}
}
public static class SQLFieldInfo
{
String columnName;
Type type;
public SQLFieldInfo(String columnName, Type type)
{
this.columnName = columnName;
this.type = type;
}
public String getColumnName()
{
return columnName;
}
public Type getType()
{
return type;
}
}
}