/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.formats.json.debezium;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.util.Objects;

import static java.lang.String.format;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;

/**
 * Deserialization schema from Debezium JSON to Flink Table/SQL internal data structure
 * {@link RowData}. The deserialization schema knows Debezium's schema definition and can extract
 * the database data and convert it into {@link RowData} with {@link RowKind}.
 *
 * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads
 * the specified fields.
 *
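 * <p>For illustration, a simplified Debezium JSON update message (without the "schema" part)
 * looks like the following; the column names and values inside "before" and "after" are
 * purely illustrative:
 *
 * <pre>
 * {
 *   "before": { "id": 1, "name": "old" },
 *   "after": { "id": 1, "name": "new" },
 *   "source": { ... },
 *   "op": "u",
 *   "ts_ms": 1589362330904
 * }
 * </pre>
 *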
 * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
 *
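 * <p>A minimal construction sketch (the {@code rowType} and {@code typeInfo} variables are
 * assumed to describe the physical schema of the consumed table):
 *
 * <pre>{@code
 * DeserializationSchema<RowData> schema = new DebeziumJsonDeserializationSchema(
 *     rowType,               // physical data type of the table, as a RowType
 *     typeInfo,              // TypeInformation of the produced RowData
 *     false,                 // schemaInclude: the JSON carries no "schema" part
 *     false,                 // ignoreParseErrors: fail on corrupt messages
 *     TimestampFormat.SQL);  // format of timestamps in the JSON data
 * }</pre>
 *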
 * @see <a href="https://debezium.io/">Debezium</a>
 */
@Internal
public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
	private static final long serialVersionUID = 1L;

	private static final String OP_READ = "r"; // snapshot read
	private static final String OP_CREATE = "c"; // insert
	private static final String OP_UPDATE = "u"; // update
	private static final String OP_DELETE = "d"; // delete

	private static final String REPLICA_IDENTITY_EXCEPTION = "The \"before\" field of %s message is null, " +
		"if you are using the Debezium Postgres Connector, " +
		"please check that REPLICA IDENTITY of the Postgres table has been set to FULL.";

	/** The deserializer to deserialize Debezium JSON data. */
	private final JsonRowDataDeserializationSchema jsonDeserializer;

	/** TypeInformation of the produced {@link RowData}. */
	private final TypeInformation<RowData> resultTypeInfo;

	/**
	 * Flag indicating whether the Debezium JSON data contains the schema part or not.
	 * When Debezium Kafka Connect enables "value.converter.schemas.enable", the JSON
	 * will contain "schema" information, but we just ignore "schema" and extract data
	 * from "payload".
	 */
	private final boolean schemaInclude;

	/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
	private final boolean ignoreParseErrors;
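
	/**
	 * Creates a Debezium JSON deserialization schema for the given physical row type.
	 *
	 * @param rowType physical data type of the table; the "before" and "after" fields are parsed against it
	 * @param resultTypeInfo TypeInformation of the produced {@link RowData}
	 * @param schemaInclude whether the Debezium JSON contains the "schema" part
	 * @param ignoreParseErrors whether to skip invalid fields/rows instead of failing
	 * @param timestampFormatOption the format of timestamps in the JSON data
	 */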
	public DebeziumJsonDeserializationSchema(
			RowType rowType,
			TypeInformation<RowData> resultTypeInfo,
			boolean schemaInclude,
			boolean ignoreParseErrors,
			TimestampFormat timestampFormatOption) {
		this.resultTypeInfo = resultTypeInfo;
		this.schemaInclude = schemaInclude;
		this.ignoreParseErrors = ignoreParseErrors;
		this.jsonDeserializer = new JsonRowDataDeserializationSchema(
			createJsonRowType(fromLogicalToDataType(rowType), schemaInclude),
			// the result type is never used, so it's fine to pass in Debezium's result type
			resultTypeInfo,
			false, // ignoreParseErrors already contains the functionality of failOnMissingField
			ignoreParseErrors,
			timestampFormatOption);
	}

	@Override
	public RowData deserialize(byte[] message) throws IOException {
		throw new RuntimeException(
			"Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
	}

	@Override
	public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
		if (message == null || message.length == 0) {
			// skip tombstone messages
			return;
		}
		try {
			GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
			GenericRowData payload;
			if (schemaInclude) {
				payload = (GenericRowData) row.getField(0);
			} else {
				payload = row;
			}
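			// createJsonRowType projects the payload onto (before, after, op),
			// so the three fields can be accessed by position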
			GenericRowData before = (GenericRowData) payload.getField(0);
			GenericRowData after = (GenericRowData) payload.getField(1);
			String op = payload.getField(2).toString();
			if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
				after.setRowKind(RowKind.INSERT);
				out.collect(after);
			} else if (OP_UPDATE.equals(op)) {
				if (before == null) {
					throw new IllegalStateException(String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
				}
				before.setRowKind(RowKind.UPDATE_BEFORE);
				after.setRowKind(RowKind.UPDATE_AFTER);
				out.collect(before);
				out.collect(after);
			} else if (OP_DELETE.equals(op)) {
				if (before == null) {
					throw new IllegalStateException(String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
				}
				before.setRowKind(RowKind.DELETE);
				out.collect(before);
			} else {
				if (!ignoreParseErrors) {
					throw new IOException(format(
						"Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'", op, new String(message)));
				}
			}
		} catch (Throwable t) {
			// a catch-all around the whole parsing logic; rethrown as an IOException
			// unless parse errors should be ignored
			if (!ignoreParseErrors) {
				throw new IOException(format(
					"Corrupt Debezium JSON message '%s'.", new String(message)), t);
			}
		}
	}

	@Override
	public boolean isEndOfStream(RowData nextElement) {
		return false;
	}

	@Override
	public TypeInformation<RowData> getProducedType() {
		return resultTypeInfo;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		DebeziumJsonDeserializationSchema that = (DebeziumJsonDeserializationSchema) o;
		return schemaInclude == that.schemaInclude &&
			ignoreParseErrors == that.ignoreParseErrors &&
			Objects.equals(jsonDeserializer, that.jsonDeserializer) &&
			Objects.equals(resultTypeInfo, that.resultTypeInfo);
	}

	@Override
	public int hashCode() {
		return Objects.hash(jsonDeserializer, resultTypeInfo, schemaInclude, ignoreParseErrors);
	}
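
	/**
	 * Builds the {@link RowType} that the wrapped {@link JsonRowDataDeserializationSchema} parses:
	 * the Debezium envelope projected onto the "before", "after" and "op" fields, optionally
	 * nested under "payload" when the schema part is included.
	 */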
	private static RowType createJsonRowType(DataType databaseSchema, boolean schemaInclude) {
		DataType payload = DataTypes.ROW(
			DataTypes.FIELD("before", databaseSchema),
			DataTypes.FIELD("after", databaseSchema),
			DataTypes.FIELD("op", DataTypes.STRING()));
		if (schemaInclude) {
			// when Debezium Kafka Connect enables "value.converter.schemas.enable",
			// the JSON will contain "schema" information, but we just ignore "schema"
			// and extract data from "payload".
			return (RowType) DataTypes.ROW(
				DataTypes.FIELD("payload", payload)).getLogicalType();
		} else {
			// payload contains some other information, e.g. "source" and "ts_ms",
			// but we don't need them.
			return (RowType) payload.getLogicalType();
		}
	}
}