blob: 794de225a5e67502508eea316ce65192cd1c9367 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.schema.converter;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.StringUtils.getSuffixBy;
import static org.apache.hudi.common.util.StringUtils.removeSuffixBy;
public class JsonToAvroSchemaConverter implements SchemaRegistryProvider.SchemaConverter {
private static final ObjectMapper MAPPER = JsonUtils.getObjectMapper();
private static final Map<String, String> JSON_TO_AVRO_TYPE = Stream.of(new String[][] {
{"string", "string"},
{"null", "null"},
{"boolean", "boolean"},
{"integer", "long"},
{"number", "double"}
}).collect(Collectors.collectingAndThen(Collectors.toMap(p -> p[0], p -> p[1]), Collections::<String, String>unmodifiableMap));
private static final Pattern SYMBOL_REGEX = Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");
@Override
public String convert(String jsonSchema) throws IOException {
JsonNode jsonNode = MAPPER.readTree(jsonSchema);
ObjectNode avroRecord = MAPPER.createObjectNode()
.put("type", "record")
.put("name", getAvroSchemaRecordName(jsonNode))
.put("doc", getAvroDoc(jsonNode));
Option<String> namespace = getAvroSchemaRecordNamespace(jsonNode);
if (namespace.isPresent()) {
avroRecord.put("namespace", namespace.get());
}
if (jsonNode.hasNonNull("properties")) {
avroRecord.set("fields", convertProperties(jsonNode.get("properties"), getRequired(jsonNode)));
} else {
avroRecord.set("fields", MAPPER.createArrayNode());
}
return avroRecord.toString();
}
private static ArrayNode convertProperties(JsonNode jsonProperties, Set<String> required) {
List<JsonNode> avroFields = new ArrayList<>();
jsonProperties.fieldNames().forEachRemaining(name ->
avroFields.add(tryConvertNestedProperty(name, jsonProperties.get(name))
.or(tryConvertArrayProperty(name, jsonProperties.get(name)))
.or(tryConvertEnumProperty(name, jsonProperties.get(name)))
.orElse(convertProperty(name, jsonProperties.get(name), required.contains(name)))));
return MAPPER.createArrayNode().addAll(avroFields);
}
private static Option<JsonNode> tryConvertNestedProperty(String name, JsonNode jsonProperty) {
if (!isJsonNestedType(jsonProperty)) {
return Option.empty();
}
JsonNode avroNode = MAPPER.createObjectNode()
.put("name", sanitizeAsAvroName(name))
.put("doc", getAvroDoc(jsonProperty))
.set("type", MAPPER.createObjectNode()
.put("type", "record")
.put("name", getAvroTypeName(jsonProperty, name))
.set("fields", convertProperties(jsonProperty.get("properties"), getRequired(jsonProperty))));
return Option.of(avroNode);
}
private static Option<JsonNode> tryConvertArrayProperty(String name, JsonNode jsonProperty) {
if (!isJsonArrayType(jsonProperty)) {
return Option.empty();
}
JsonNode avroItems;
JsonNode jsonItems = jsonProperty.get("items");
String itemName = getAvroTypeName(jsonItems, name) + "_child";
if (isJsonNestedType(jsonItems)) {
avroItems = MAPPER.createObjectNode()
.put("type", "record")
.put("name", itemName)
.set("fields", convertProperties(jsonItems.get("properties"), getRequired(jsonItems)));
} else {
avroItems = convertProperty(itemName, jsonItems, true);
}
JsonNode avroNode = MAPPER.createObjectNode()
.put("name", sanitizeAsAvroName(name))
.put("doc", getAvroDoc(jsonProperty))
.set("type", MAPPER.createObjectNode()
.put("type", "array")
.set("items", avroItems));
return Option.of(avroNode);
}
private static Option<JsonNode> tryConvertEnumProperty(String name, JsonNode jsonProperty) {
if (!isJsonEnumType(jsonProperty)) {
return Option.empty();
}
List<String> enums = new ArrayList<>();
jsonProperty.get("enum").iterator().forEachRemaining(e -> enums.add(e.asText()));
JsonNode avroType = enums.stream().allMatch(e -> SYMBOL_REGEX.matcher(e).matches())
? MAPPER.createObjectNode()
.put("type", "enum")
.put("name", getAvroTypeName(jsonProperty, name))
.set("symbols", jsonProperty.get("enum"))
: TextNode.valueOf("string");
JsonNode avroNode = MAPPER.createObjectNode()
.put("name", sanitizeAsAvroName(name))
.put("doc", getAvroDoc(jsonProperty))
.set("type", avroType);
return Option.of(avroNode);
}
private static JsonNode convertProperty(String name, JsonNode jsonProperty, boolean isRequired) {
ObjectNode avroNode = MAPPER.createObjectNode()
.put("name", sanitizeAsAvroName(name))
.put("doc", getAvroDoc(jsonProperty));
// infer `default`
boolean nullable = !isRequired;
if (jsonProperty.has("default")) {
avroNode.set("default", jsonProperty.get("default"));
} else if (nullable) {
avroNode.set("default", NullNode.getInstance());
}
// infer `types`
Set<String> avroTypeSet = new HashSet<>();
if (jsonProperty.hasNonNull("oneOf") || jsonProperty.hasNonNull("allOf")) {
// prefer to look for `oneOf` and `allOf` for types
Option<JsonNode> oneOfTypes = Option.ofNullable(jsonProperty.get("oneOf"));
if (oneOfTypes.isPresent()) {
oneOfTypes.get().elements().forEachRemaining(e -> avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.get("type").asText())));
}
Option<JsonNode> allOfTypes = Option.ofNullable(jsonProperty.get("allOf"));
if (allOfTypes.isPresent()) {
allOfTypes.get().elements().forEachRemaining(e -> avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.get("type").asText())));
}
} else if (jsonProperty.has("type")) {
// fall back to `type` parameter
JsonNode jsonType = jsonProperty.get("type");
if (jsonType.isArray()) {
jsonType.elements().forEachRemaining(e -> avroTypeSet.add(JSON_TO_AVRO_TYPE.get(e.asText())));
} else {
avroTypeSet.add(JSON_TO_AVRO_TYPE.get(jsonType.asText()));
}
}
List<String> avroTypes = new ArrayList<>();
if (nullable || avroTypeSet.contains("null")) {
avroTypes.add("null");
}
avroTypeSet.remove("null");
avroTypes.addAll(avroTypeSet);
avroNode.set("type", avroTypes.size() > 1
? MAPPER.createArrayNode().addAll(avroTypes.stream().map(TextNode::valueOf).collect(Collectors.toList()))
: TextNode.valueOf(avroTypes.get(0)));
return avroNode;
}
private static boolean isJsonNestedType(JsonNode jsonNode) {
return jsonNode.has("type") && Objects.equals(jsonNode.get("type").asText(), "object");
}
private static boolean isJsonArrayType(JsonNode jsonNode) {
return jsonNode.has("type") && Objects.equals(jsonNode.get("type").asText(), "array");
}
private static boolean isJsonEnumType(JsonNode jsonNode) {
return jsonNode.hasNonNull("enum") && jsonNode.get("enum").isArray();
}
private static Option<String> getAvroSchemaRecordNamespace(JsonNode jsonNode) {
if (jsonNode.hasNonNull("$id")) {
String host = URI.create(jsonNode.get("$id").asText()).getHost();
String avroNamespace = Stream.of(host.split("\\."))
.map(JsonToAvroSchemaConverter::sanitizeAsAvroName)
.collect(Collectors.joining("."));
return Option.of(avroNamespace);
}
return Option.empty();
}
private static String getAvroSchemaRecordName(JsonNode jsonNode) {
if (jsonNode.hasNonNull("title")) {
return sanitizeAsAvroName(jsonNode.get("title").asText());
}
if (jsonNode.hasNonNull("$id")) {
// infer name from host: http://www.my-example.com => "my_example"
String host = URI.create(jsonNode.get("$id").asText()).getHost();
String domain = removeSuffixBy(host, '.');
return sanitizeAsAvroName(getSuffixBy(domain, '.'));
}
// avro schema requires non-empty record name
return "no_name";
}
private static String sanitizeAsAvroName(String s) {
return s.replaceAll("[^A-Za-z0-9_]+", "_");
}
private static Set<String> getRequired(JsonNode jsonNode) {
if (!jsonNode.hasNonNull("required")) {
return Collections.emptySet();
}
JsonNode requiredNode = jsonNode.get("required");
Set<String> required = new HashSet<>(requiredNode.size());
jsonNode.get("required").elements().forEachRemaining(e -> required.add(e.asText()));
return required;
}
private static String getAvroTypeName(JsonNode jsonNode, String defaultName) {
return jsonNode.hasNonNull("title") ? jsonNode.get("title").asText() : defaultName;
}
private static String getAvroDoc(JsonNode jsonNode) {
return jsonNode.hasNonNull("description") ? jsonNode.get("description").asText() : "";
}
}