blob: 7a393cca86c25ed943c2b6307397d2ed82c8e4ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.flink.util;
import org.apache.seatunnel.flink.enums.FormatType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.DecimalNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.FloatNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.descriptors.Avro;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.utils.TypeStringUtils;
import org.apache.flink.types.Row;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
public final class SchemaUtil {
private static final Pattern DASH_COMPILE = Pattern.compile("-");
private SchemaUtil() {
}
public static void setSchema(Schema schema, Object info, FormatType format) {
switch (format) {
case JSON:
getJsonSchema(schema, (ObjectNode) info);
break;
case CSV:
getCsvSchema(schema, (ArrayNode) info);
break;
case ORC:
getOrcSchema(schema, (ObjectNode) info);
break;
case AVRO:
getAvroSchema(schema, (ObjectNode) info);
break;
case PARQUET:
getParquetSchema(schema, (ObjectNode) info);
break;
default:
}
}
public static FormatDescriptor setFormat(FormatType format, Config config) throws Exception {
FormatDescriptor formatDescriptor = null;
switch (format) {
case JSON:
formatDescriptor = new Json().failOnMissingField(false).deriveSchema();
break;
case CSV:
Csv csv = new Csv().deriveSchema();
Field interPro = csv.getClass().getDeclaredField("internalProperties");
interPro.setAccessible(true);
Object desc = interPro.get(csv);
Class<DescriptorProperties> descCls = DescriptorProperties.class;
Method putMethod = descCls.getDeclaredMethod("put", String.class, String.class);
putMethod.setAccessible(true);
for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
String key = entry.getKey();
if (key.startsWith("format.") && !StringUtils.equals(key, "format.type")) {
String value = config.getString(key);
putMethod.invoke(desc, key, value);
}
}
formatDescriptor = csv;
break;
case AVRO:
formatDescriptor = new Avro().avroSchema(config.getString("schema"));
break;
case ORC:
case PARQUET:
default:
break;
}
return formatDescriptor;
}
private static void getJsonSchema(Schema schema, ObjectNode json) {
Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
while (nodeIterator.hasNext()) {
Map.Entry<String, JsonNode> entry = nodeIterator.next();
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof TextNode) {
schema.field(key, Types.STRING());
} else if (value instanceof IntNode) {
schema.field(key, Types.INT());
} else if (value instanceof LongNode) {
schema.field(key, Types.LONG());
} else if (value instanceof DecimalNode) {
schema.field(key, Types.JAVA_BIG_DEC());
} else if (value instanceof FloatNode) {
schema.field(key, Types.FLOAT());
} else if (value instanceof DoubleNode) {
schema.field(key, Types.DOUBLE());
} else if (value instanceof ObjectNode) {
schema.field(key, getTypeInformation((ObjectNode) value));
} else if (value instanceof ArrayNode) {
Object obj = ((ArrayNode) value).get(0);
if (obj instanceof ObjectNode) {
schema.field(key, ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation((ObjectNode) obj)));
} else {
schema.field(key, ObjectArrayTypeInfo.getInfoFor(Object[].class, TypeInformation.of(Object.class)));
}
}
}
}
private static void getCsvSchema(Schema schema, ArrayNode schemaList) {
Iterator<JsonNode> iterator = schemaList.elements();
while (iterator.hasNext()) {
JsonNode jsonNode = iterator.next();
String field = jsonNode.get("field").textValue();
String type = jsonNode.get("type").textValue().toUpperCase();
schema.field(field, type);
}
}
public static TypeInformation<?>[] getCsvType(List<Map<String, String>> schemaList) {
TypeInformation<?>[] typeInformation = new TypeInformation[schemaList.size()];
int i = 0;
for (Map<String, String> map : schemaList) {
String type = map.get("type").toUpperCase();
typeInformation[i++] = TypeStringUtils.readTypeInfo(type);
}
return typeInformation;
}
/**
* todo
*
* @param schema schema
* @param json json
*/
private static void getOrcSchema(Schema schema, ObjectNode json) {
}
/**
* todo
*
* @param schema schema
* @param json json
*/
private static void getParquetSchema(Schema schema, ObjectNode json) {
}
private static void getAvroSchema(Schema schema, ObjectNode json) {
RowTypeInfo typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(json.toString());
String[] fieldNames = typeInfo.getFieldNames();
for (String name : fieldNames) {
schema.field(name, typeInfo.getTypeAt(name));
}
}
public static RowTypeInfo getTypeInformation(ObjectNode json) {
int size = json.size();
String[] fields = new String[size];
TypeInformation<?>[] informations = new TypeInformation[size];
int i = 0;
Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
while (nodeIterator.hasNext()) {
Map.Entry<String, JsonNode> entry = nodeIterator.next();
String key = entry.getKey();
Object value = entry.getValue();
fields[i] = key;
if (value instanceof TextNode) {
informations[i] = Types.STRING();
} else if (value instanceof IntNode) {
informations[i] = Types.INT();
} else if (value instanceof LongNode) {
informations[i] = Types.LONG();
} else if (value instanceof DecimalNode) {
informations[i] = Types.JAVA_BIG_DEC();
} else if (value instanceof FloatNode) {
informations[i] = Types.FLOAT();
} else if (value instanceof DoubleNode) {
informations[i] = Types.DOUBLE();
} else if (value instanceof ObjectNode) {
informations[i] = getTypeInformation((ObjectNode) value);
} else if (value instanceof ArrayNode) {
ObjectNode demo = (ObjectNode) ((ArrayNode) value).get(0);
informations[i] = ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation(demo));
}
i++;
}
return new RowTypeInfo(informations, fields);
}
public static String getUniqueTableName() {
return DASH_COMPILE.matcher(UUID.randomUUID().toString()).replaceAll("_");
}
}