blob: 8ce07b9275d26b9da1d6fa2c324f80ea2006615f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message;
import java.io.BufferedWriter;
import java.util.Iterator;
/**
* Generates Kafka MessageData classes.
*/
public final class JsonConverterGenerator implements MessageClassGenerator {
private final static String SUFFIX = "JsonConverter";
private final String packageName;
private final StructRegistry structRegistry;
private final HeaderGenerator headerGenerator;
private final CodeBuffer buffer;
JsonConverterGenerator(String packageName) {
this.packageName = packageName;
this.structRegistry = new StructRegistry();
this.headerGenerator = new HeaderGenerator(packageName);
this.buffer = new CodeBuffer();
}
@Override
public String outputName(MessageSpec spec) {
return spec.dataClassName() + SUFFIX;
}
@Override
public void generateAndWrite(MessageSpec message, BufferedWriter writer)
throws Exception {
structRegistry.register(message);
headerGenerator.addStaticImport(String.format("%s.%s.*",
packageName, message.dataClassName()));
buffer.printf("public class %s {%n",
MessageGenerator.capitalizeFirst(outputName(message)));
buffer.incrementIndent();
generateConverters(message.dataClassName(), message.struct(),
message.validVersions());
for (Iterator<StructRegistry.StructInfo> iter = structRegistry.structs();
iter.hasNext(); ) {
StructRegistry.StructInfo info = iter.next();
buffer.printf("%n");
buffer.printf("public static class %s {%n",
MessageGenerator.capitalizeFirst(info.spec().name() + SUFFIX));
buffer.incrementIndent();
generateConverters(MessageGenerator.capitalizeFirst(info.spec().name()),
info.spec(), info.parentVersions());
buffer.decrementIndent();
buffer.printf("}%n");
}
buffer.decrementIndent();
buffer.printf("}%n");
headerGenerator.generate();
headerGenerator.buffer().write(writer);
buffer.write(writer);
}
private void generateConverters(String name,
StructSpec spec,
Versions parentVersions) {
generateRead(name, spec, parentVersions);
generateWrite(name, spec, parentVersions);
generateOverloadWrite(name);
}
private void generateRead(String className,
StructSpec struct,
Versions parentVersions) {
headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
buffer.printf("public static %s read(JsonNode _node, short _version) {%n",
className);
buffer.incrementIndent();
buffer.printf("%s _object = new %s();%n", className, className);
VersionConditional.forVersions(struct.versions(), parentVersions).
allowMembershipCheckAlwaysFalse(false).
ifNotMember(__ -> {
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Can't read " +
"version \" + _version + \" of %s\");%n", className);
}).
generate(buffer);
Versions curVersions = parentVersions.intersect(struct.versions());
for (FieldSpec field : struct.fields()) {
String sourceVariable = String.format("_%sNode", field.camelCaseName());
buffer.printf("JsonNode %s = _node.get(\"%s\");%n",
sourceVariable,
field.camelCaseName());
buffer.printf("if (%s == null) {%n", sourceVariable);
buffer.incrementIndent();
Versions mandatoryVersions = field.versions().subtract(field.taggedVersions());
VersionConditional.forVersions(mandatoryVersions, curVersions).
ifMember(__ -> {
buffer.printf("throw new RuntimeException(\"%s: unable to locate " +
"field \'%s\', which is mandatory in version \" + _version);%n",
className, field.camelCaseName());
}).
ifNotMember(__ -> {
buffer.printf("_object.%s = %s;%n", field.camelCaseName(),
field.fieldDefault(headerGenerator, structRegistry));
}).
generate(buffer);
buffer.decrementIndent();
buffer.printf("} else {%n");
buffer.incrementIndent();
VersionConditional.forVersions(struct.versions(), curVersions).
ifMember(presentVersions -> {
generateTargetFromJson(new Target(field,
sourceVariable,
className,
input -> String.format("_object.%s = %s", field.camelCaseName(), input)),
curVersions);
}).ifNotMember(__ -> {
buffer.printf("throw new RuntimeException(\"%s: field \'%s\' is not " +
"supported in version \" + _version);%n",
className, field.camelCaseName());
}).generate(buffer);
buffer.decrementIndent();
buffer.printf("}%n");
}
buffer.printf("return _object;%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateTargetFromJson(Target target, Versions curVersions) {
if (target.field().type() instanceof FieldType.BoolFieldType) {
buffer.printf("if (!%s.isBoolean()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected Boolean type, " +
"but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%s;%n", target.assignmentStatement(
target.sourceVariable() + ".asBoolean()"));
} else if (target.field().type() instanceof FieldType.Int8FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToByte(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int16FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToShort(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Uint16FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToUnsignedShort(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Uint32FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToUnsignedInt(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int32FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToInt(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int64FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToLong(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.UUIDFieldType) {
buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected a JSON string " +
"type, but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
headerGenerator.addImport(MessageGenerator.UUID_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(String.format(
"Uuid.fromString(%s.asText())", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Float64FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToDouble(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else {
// Handle the variable length types. All of them are potentially
// nullable, so handle that here.
IsNullConditional.forName(target.sourceVariable()).
nullableVersions(target.field().nullableVersions()).
possibleVersions(curVersions).
conditionalGenerator((name, negated) ->
String.format("%s%s.isNull()", negated ? "!" : "", name)).
ifNull(() -> {
buffer.printf("%s;%n", target.assignmentStatement("null"));
}).
ifShouldNotBeNull(() -> {
generateVariableLengthTargetFromJson(target, curVersions);
}).
generate(buffer);
}
}
private void generateVariableLengthTargetFromJson(Target target, Versions curVersions) {
if (target.field().type().isString()) {
buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected a string " +
"type, but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%s;%n", target.assignmentStatement(
String.format("%s.asText()", target.sourceVariable())));
} else if (target.field().type().isBytes()) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
if (target.field().zeroCopy()) {
headerGenerator.addImport(MessageGenerator.BYTE_BUFFER_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\"))",
target.sourceVariable(), target.humanReadableName())));
} else {
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToBinary(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
}
} else if (target.field().type().isRecords()) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
headerGenerator.addImport(MessageGenerator.BYTE_BUFFER_CLASS);
headerGenerator.addImport(MessageGenerator.MEMORY_RECORDS_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\")))",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type().isArray()) {
buffer.printf("if (!%s.isArray()) {%n", target.sourceVariable());
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"%s expected a JSON " +
"array, but got \" + _node.getNodeType());%n", target.humanReadableName());
buffer.decrementIndent();
buffer.printf("}%n");
String type = target.field().concreteJavaType(headerGenerator, structRegistry);
buffer.printf("%s _collection = new %s(%s.size());%n", type, type, target.sourceVariable());
buffer.printf("%s;%n", target.assignmentStatement("_collection"));
headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
buffer.printf("for (JsonNode _element : %s) {%n", target.sourceVariable());
buffer.incrementIndent();
generateTargetFromJson(target.arrayElementTarget(
input -> String.format("_collection.add(%s)", input)),
curVersions);
buffer.decrementIndent();
buffer.printf("}%n");
} else if (target.field().type().isStruct()) {
buffer.printf("%s;%n", target.assignmentStatement(
String.format("%s%s.read(%s, _version)",
target.field().type().toString(), SUFFIX, target.sourceVariable())));
} else {
throw new RuntimeException("Unexpected type " + target.field().type());
}
}
private void generateOverloadWrite(String className) {
buffer.printf("public static JsonNode write(%s _object, short _version) {%n",
className);
buffer.incrementIndent();
buffer.printf("return write(_object, _version, true);%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateWrite(String className,
StructSpec struct,
Versions parentVersions) {
headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
buffer.printf("public static JsonNode write(%s _object, short _version, boolean _serializeRecords) {%n",
className);
buffer.incrementIndent();
VersionConditional.forVersions(struct.versions(), parentVersions).
allowMembershipCheckAlwaysFalse(false).
ifNotMember(__ -> {
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Can't write " +
"version \" + _version + \" of %s\");%n", className);
}).
generate(buffer);
Versions curVersions = parentVersions.intersect(struct.versions());
headerGenerator.addImport(MessageGenerator.OBJECT_NODE_CLASS);
headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
buffer.printf("ObjectNode _node = new ObjectNode(JsonNodeFactory.instance);%n");
for (FieldSpec field : struct.fields()) {
Target target = new Target(field,
String.format("_object.%s", field.camelCaseName()),
field.camelCaseName(),
input -> String.format("_node.set(\"%s\", %s)", field.camelCaseName(), input));
VersionConditional cond = VersionConditional.forVersions(field.versions(), curVersions).
ifMember(presentVersions -> {
VersionConditional.forVersions(field.taggedVersions(), presentVersions).
ifMember(presentAndTaggedVersions -> {
field.generateNonDefaultValueCheck(headerGenerator,
structRegistry, buffer, "_object.", field.nullableVersions());
buffer.incrementIndent();
if (field.defaultString().equals("null")) {
// If the default was null, and we already checked that this field was not
// the default, we can omit further null checks.
generateTargetToJson(target.nonNullableCopy(), presentAndTaggedVersions);
} else {
generateTargetToJson(target, presentAndTaggedVersions);
}
buffer.decrementIndent();
buffer.printf("}%n");
}).
ifNotMember(presentAndNotTaggedVersions -> {
generateTargetToJson(target, presentAndNotTaggedVersions);
}).
generate(buffer);
});
if (!field.ignorable()) {
cond.ifNotMember(__ -> {
field.generateNonIgnorableFieldCheck(headerGenerator,
structRegistry, "_object.", buffer);
});
}
cond.generate(buffer);
}
buffer.printf("return _node;%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
private void generateTargetToJson(Target target, Versions versions) {
if (target.field().type() instanceof FieldType.BoolFieldType) {
headerGenerator.addImport(MessageGenerator.BOOLEAN_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("BooleanNode.valueOf(%s)", target.sourceVariable())));
} else if ((target.field().type() instanceof FieldType.Int8FieldType) ||
(target.field().type() instanceof FieldType.Int16FieldType)) {
headerGenerator.addImport(MessageGenerator.SHORT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new ShortNode(%s)", target.sourceVariable())));
} else if ((target.field().type() instanceof FieldType.Int32FieldType) ||
(target.field().type() instanceof FieldType.Uint16FieldType)) {
headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new IntNode(%s)", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Int64FieldType ||
(target.field().type() instanceof FieldType.Uint32FieldType)) {
headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new LongNode(%s)", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.UUIDFieldType) {
headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new TextNode(%s.toString())", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Float64FieldType) {
headerGenerator.addImport(MessageGenerator.DOUBLE_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new DoubleNode(%s)", target.sourceVariable())));
} else {
// Handle the variable length types. All of them are potentially
// nullable, so handle that here.
IsNullConditional.forName(target.sourceVariable()).
nullableVersions(target.field().nullableVersions()).
possibleVersions(versions).
conditionalGenerator((name, negated) ->
String.format("%s %s= null", name, negated ? "!" : "=")).
ifNull(() -> {
headerGenerator.addImport(MessageGenerator.NULL_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement("NullNode.instance"));
}).
ifShouldNotBeNull(() -> {
generateVariableLengthTargetToJson(target, versions);
}).
generate(buffer);
}
}
private void generateVariableLengthTargetToJson(Target target, Versions versions) {
if (target.field().type().isString()) {
headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new TextNode(%s)", target.sourceVariable())));
} else if (target.field().type().isBytes()) {
headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
if (target.field().zeroCopy()) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new BinaryNode(MessageUtil.byteBufferToArray(%s))",
target.sourceVariable())));
} else {
headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))",
target.sourceVariable(), target.sourceVariable())));
}
} else if (target.field().type().isRecords()) {
headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
// KIP-673: When logging requests/responses, we do not serialize the record, instead we
// output its sizeInBytes, because outputting the bytes is not very useful and can be
// quite expensive. Otherwise, we will serialize the record.
buffer.printf("if (_serializeRecords) {%n");
buffer.incrementIndent();
buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
buffer.decrementIndent();
buffer.printf("} else {%n");
buffer.incrementIndent();
buffer.printf("_node.set(\"%sSizeInBytes\", new IntNode(%s.sizeInBytes()));%n",
target.field().camelCaseName(),
target.sourceVariable());
buffer.decrementIndent();
buffer.printf("}%n");
} else if (target.field().type().isArray()) {
headerGenerator.addImport(MessageGenerator.ARRAY_NODE_CLASS);
headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
FieldType.ArrayType arrayType = (FieldType.ArrayType) target.field().type();
FieldType elementType = arrayType.elementType();
String arrayInstanceName = String.format("_%sArray",
target.field().camelCaseName());
buffer.printf("ArrayNode %s = new ArrayNode(JsonNodeFactory.instance);%n",
arrayInstanceName);
buffer.printf("for (%s _element : %s) {%n",
elementType.getBoxedJavaType(headerGenerator), target.sourceVariable());
buffer.incrementIndent();
generateTargetToJson(target.arrayElementTarget(
input -> String.format("%s.add(%s)", arrayInstanceName, input)),
versions);
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%s;%n", target.assignmentStatement(arrayInstanceName));
} else if (target.field().type().isStruct()) {
buffer.printf("%s;%n", target.assignmentStatement(
String.format("%sJsonConverter.write(%s, _version, _serializeRecords)",
target.field().type().toString(), target.sourceVariable())));
} else {
throw new RuntimeException("unknown type " + target.field().type());
}
}
}