/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.json;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIME_FORMAT;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Serialization schema that serializes an object of Flink types into JSON bytes.
*
* <p>Serializes the input Flink object into a JSON string and converts it into <code>byte[]</code>.
*
* <p>Resulting <code>byte[]</code> messages can be deserialized using {@link
* JsonRowDeserializationSchema}.
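*
* <p>A minimal usage sketch; the field names and types below are illustrative only:
*
* <pre>{@code
* TypeInformation<Row> typeInfo =
*         Types.ROW_NAMED(new String[] {"id", "name"}, Types.LONG, Types.STRING);
* JsonRowSerializationSchema schema =
*         JsonRowSerializationSchema.builder().withTypeInfo(typeInfo).build();
* // open(...) is invoked by the Flink runtime; afterwards serialize(Row.of(1L, "Alice"))
* // yields the UTF-8 bytes of {"id":1,"name":"Alice"}
* }</pre>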
*
* @deprecated The format was developed for Table API users and will no longer be maintained for
* DataStream API users. Either use the Table API, or switch to the DataStream API and define your
* own {@link SerializationSchema}.
*/
@PublicEvolving
@Deprecated
public class JsonRowSerializationSchema implements SerializationSchema<Row> {
private static final long serialVersionUID = -2885556750743978636L;
/** Type information describing the input type. */
private final RowTypeInfo typeInfo;
/** Object mapper that is used to create output JSON objects. */
private transient ObjectMapper mapper;
private final SerializationRuntimeConverter runtimeConverter;
/** Reusable object node. */
private transient ObjectNode node;
private JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
Preconditions.checkNotNull(typeInfo, "Type information");
Preconditions.checkArgument(
typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
this.typeInfo = (RowTypeInfo) typeInfo;
this.runtimeConverter = createConverter(typeInfo);
}
@Override
public void open(InitializationContext context) throws Exception {
mapper = new ObjectMapper();
}
/** Builder for {@link JsonRowSerializationSchema}. */
@PublicEvolving
public static class Builder {
private RowTypeInfo typeInfo;
private Builder() {
// private constructor
}
/**
* Creates a JSON serialization schema for the given type information.
*
* @param typeInfo Type information describing the result type. The field names of {@link
* Row} are used as the JSON property names.
* @deprecated Use {@link JsonRowSerializationSchema#builder()} instead.
*/
@Deprecated
public Builder(TypeInformation<Row> typeInfo) {
checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
this.typeInfo = (RowTypeInfo) typeInfo;
}
/**
* Creates a JSON serialization schema for the given JSON schema.
*
* @param jsonSchema JSON schema describing the result type
* @see <a href="http://json-schema.org/">http://json-schema.org/</a>
* @deprecated Use {@link JsonRowSerializationSchema#builder()} instead.
*/
@Deprecated
public Builder(String jsonSchema) {
this(JsonRowSchemaConverter.convert(checkNotNull(jsonSchema)));
}
/**
* Sets the type information for the JSON serialization schema.
*
* @param typeInfo Type information describing the result type. The field names of {@link
* Row} are used as the JSON property names.
*/
public Builder withTypeInfo(TypeInformation<Row> typeInfo) {
checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
this.typeInfo = (RowTypeInfo) typeInfo;
return this;
}
/**
* Finalizes the configuration and checks validity.
*
* @return Configured {@link JsonRowSerializationSchema}
*/
public JsonRowSerializationSchema build() {
checkArgument(typeInfo != null, "typeInfo should be set.");
return new JsonRowSerializationSchema(typeInfo);
}
}
/** Creates a {@link Builder} for a {@link JsonRowSerializationSchema}. */
public static Builder builder() {
return new Builder();
}
@Override
public byte[] serialize(Row row) {
if (node == null) {
node = mapper.createObjectNode();
}
try {
runtimeConverter.convert(mapper, node, row);
return mapper.writeValueAsBytes(node);
} catch (Throwable t) {
throw new RuntimeException(
"Could not serialize row '"
+ row
+ "'. "
+ "Make sure that the schema matches the input.",
t);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final JsonRowSerializationSchema that = (JsonRowSerializationSchema) o;
return Objects.equals(typeInfo, that.typeInfo);
}
@Override
public int hashCode() {
return Objects.hash(typeInfo);
}
/*
Runtime converters
*/
/** Runtime converter that maps between Java objects and corresponding {@link JsonNode}s. */
@FunctionalInterface
private interface SerializationRuntimeConverter extends Serializable {
JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object object);
}
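/**
* Creates a converter for the given type information, trying simple types first, then container
* types, then a generic fallback, and wraps the result so that null values map to JSON null.
*/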
private SerializationRuntimeConverter createConverter(TypeInformation<?> typeInfo) {
SerializationRuntimeConverter baseConverter =
createConverterForSimpleType(typeInfo)
.orElseGet(
() ->
createContainerConverter(typeInfo)
.orElseGet(this::createFallbackConverter));
return wrapIntoNullableConverter(baseConverter);
}
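/** Wraps a converter so that a null input is serialized as a JSON null node. */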
private SerializationRuntimeConverter wrapIntoNullableConverter(
SerializationRuntimeConverter converter) {
return (mapper, reuse, object) -> {
if (object == null) {
return mapper.getNodeFactory().nullNode();
}
return converter.convert(mapper, reuse, object);
};
}
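/** Creates converters for container types: rows, object and basic arrays, and primitive byte arrays. */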
private Optional<SerializationRuntimeConverter> createContainerConverter(
TypeInformation<?> typeInfo) {
if (typeInfo instanceof RowTypeInfo) {
return Optional.of(createRowConverter((RowTypeInfo) typeInfo));
} else if (typeInfo instanceof ObjectArrayTypeInfo) {
return Optional.of(
createObjectArrayConverter(
((ObjectArrayTypeInfo) typeInfo).getComponentInfo()));
} else if (typeInfo instanceof BasicArrayTypeInfo) {
return Optional.of(
createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo()));
} else if (isPrimitiveByteArray(typeInfo)) {
return Optional.of(
(mapper, reuse, object) -> mapper.getNodeFactory().binaryNode((byte[]) object));
} else {
return Optional.empty();
}
}
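/** Checks whether the given type information describes a primitive byte array. */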
private boolean isPrimitiveByteArray(TypeInformation<?> typeInfo) {
return typeInfo instanceof PrimitiveArrayTypeInfo
&& ((PrimitiveArrayTypeInfo) typeInfo).getComponentType() == Types.BYTE;
}
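/** Creates a converter for arrays by assembling a converter for the element type. */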
private SerializationRuntimeConverter createObjectArrayConverter(
TypeInformation<?> elementTypeInfo) {
SerializationRuntimeConverter elementConverter = createConverter(elementTypeInfo);
return assembleArrayConverter(elementConverter);
}
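/** Creates a converter for {@link Row}s with one nested converter per field. */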
private SerializationRuntimeConverter createRowConverter(RowTypeInfo typeInfo) {
List<SerializationRuntimeConverter> fieldConverters =
Arrays.stream(typeInfo.getFieldTypes())
.map(this::createConverter)
.collect(Collectors.toList());
return assembleRowConverter(typeInfo.getFieldNames(), fieldConverters);
}
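/** Creates a fallback converter that serializes arbitrary objects (e.g. POJOs) via Jackson's {@code valueToTree}. */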
private SerializationRuntimeConverter createFallbackConverter() {
return (mapper, reuse, object) -> {
// for types that were specified without JSON schema
// e.g. POJOs
try {
return mapper.valueToTree(object);
} catch (IllegalArgumentException e) {
throw new WrappingRuntimeException(
format("Could not convert object: %s", object), e);
}
};
}
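/** Creates converters for simple types; returns an empty {@link Optional} for unsupported types. */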
private Optional<SerializationRuntimeConverter> createConverterForSimpleType(
TypeInformation<?> simpleTypeInfo) {
if (simpleTypeInfo == Types.VOID) {
return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().nullNode());
} else if (simpleTypeInfo == Types.BOOLEAN) {
return Optional.of(
(mapper, reuse, object) ->
mapper.getNodeFactory().booleanNode((Boolean) object));
} else if (simpleTypeInfo == Types.STRING) {
return Optional.of(
(mapper, reuse, object) -> mapper.getNodeFactory().textNode((String) object));
} else if (simpleTypeInfo == Types.INT) {
return Optional.of(
(mapper, reuse, object) ->
mapper.getNodeFactory().numberNode((Integer) object));
} else if (simpleTypeInfo == Types.LONG) {
return Optional.of(
(mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Long) object));
} else if (simpleTypeInfo == Types.DOUBLE) {
return Optional.of(
(mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Double) object));
} else if (simpleTypeInfo == Types.FLOAT) {
return Optional.of(
(mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Float) object));
} else if (simpleTypeInfo == Types.SHORT) {
return Optional.of(
(mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Short) object));
} else if (simpleTypeInfo == Types.BYTE) {
return Optional.of(
(mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Byte) object));
} else if (simpleTypeInfo == Types.BIG_DEC) {
return Optional.of(createBigDecimalConverter());
} else if (simpleTypeInfo == Types.BIG_INT) {
return Optional.of(createBigIntegerConverter());
} else if (simpleTypeInfo == Types.SQL_DATE) {
return Optional.of(this::convertDate);
} else if (simpleTypeInfo == Types.SQL_TIME) {
return Optional.of(this::convertTime);
} else if (simpleTypeInfo == Types.SQL_TIMESTAMP) {
return Optional.of(this::convertTimestamp);
} else if (simpleTypeInfo == Types.LOCAL_DATE) {
return Optional.of(this::convertLocalDate);
} else if (simpleTypeInfo == Types.LOCAL_TIME) {
return Optional.of(this::convertLocalTime);
} else if (simpleTypeInfo == Types.LOCAL_DATE_TIME) {
return Optional.of(this::convertLocalDateTime);
} else {
return Optional.empty();
}
}
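/** Serializes a {@link LocalDate} as an ISO-8601 local date string. */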
private JsonNode convertLocalDate(ObjectMapper mapper, JsonNode reuse, Object object) {
return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format((LocalDate) object));
}
private JsonNode convertDate(ObjectMapper mapper, JsonNode reuse, Object object) {
Date date = (Date) object;
return convertLocalDate(mapper, reuse, date.toLocalDate());
}
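/** Serializes a {@link LocalDateTime} using the RFC 3339 timestamp format. */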
private JsonNode convertLocalDateTime(ObjectMapper mapper, JsonNode reuse, Object object) {
return mapper.getNodeFactory()
.textNode(RFC3339_TIMESTAMP_FORMAT.format((LocalDateTime) object));
}
private JsonNode convertTimestamp(ObjectMapper mapper, JsonNode reuse, Object object) {
Timestamp timestamp = (Timestamp) object;
return convertLocalDateTime(mapper, reuse, timestamp.toLocalDateTime());
}
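/** Serializes a {@link LocalTime} using the RFC 3339 time format. */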
private JsonNode convertLocalTime(ObjectMapper mapper, JsonNode reuse, Object object) {
JsonNodeFactory nodeFactory = mapper.getNodeFactory();
return nodeFactory.textNode(RFC3339_TIME_FORMAT.format((LocalTime) object));
}
private JsonNode convertTime(ObjectMapper mapper, JsonNode reuse, Object object) {
final Time time = (Time) object;
return convertLocalTime(mapper, reuse, time.toLocalTime());
}
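/** Creates a converter that writes {@link BigDecimal} values directly and converts other {@link Number}s via their double value. */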
private SerializationRuntimeConverter createBigDecimalConverter() {
return (mapper, reuse, object) -> {
// convert to BigDecimal if necessary
JsonNodeFactory nodeFactory = mapper.getNodeFactory();
if (object instanceof BigDecimal) {
return nodeFactory.numberNode((BigDecimal) object);
}
return nodeFactory.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
};
}
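/** Creates a converter that writes {@link BigInteger} values directly and converts other {@link Number}s via their long value. */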
private SerializationRuntimeConverter createBigIntegerConverter() {
return (mapper, reuse, object) -> {
// convert to BigInteger if necessary
JsonNodeFactory nodeFactory = mapper.getNodeFactory();
if (object instanceof BigInteger) {
return nodeFactory.numberNode((BigInteger) object);
}
return nodeFactory.numberNode(BigInteger.valueOf(((Number) object).longValue()));
};
}
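/** Assembles a converter that writes a {@link Row} as a JSON object, reusing the field nodes of the previous record where possible. */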
private SerializationRuntimeConverter assembleRowConverter(
String[] fieldNames, List<SerializationRuntimeConverter> fieldConverters) {
return (mapper, reuse, object) -> {
ObjectNode node;
// reuse could be a NullNode if the last record was null.
if (reuse == null || reuse.isNull()) {
node = mapper.createObjectNode();
} else {
node = (ObjectNode) reuse;
}
Row row = (Row) object;
for (int i = 0; i < fieldNames.length; i++) {
String fieldName = fieldNames[i];
node.set(
fieldName,
fieldConverters
.get(i)
.convert(mapper, node.get(fieldName), row.getField(i)));
}
return node;
};
}
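/** Assembles a converter that writes an array as a JSON array node, clearing any reused node before adding the elements. */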
private SerializationRuntimeConverter assembleArrayConverter(
SerializationRuntimeConverter elementConverter) {
return (mapper, reuse, object) -> {
ArrayNode node;
// reuse could be a NullNode if the last record was null.
if (reuse == null || reuse.isNull()) {
node = mapper.createArrayNode();
} else {
node = (ArrayNode) reuse;
node.removeAll();
}
Object[] array = (Object[]) object;
for (Object element : array) {
node.add(elementConverter.convert(mapper, null, element));
}
return node;
};
}
}