/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.json.debezium;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * {@link DecodingFormat} for Debezium using JSON encoding. The format produces a changelog
 * stream of {@link RowData} (inserts, update-before/update-after pairs, and deletes) and can
 * expose Debezium metadata as additional columns (see {@link ReadableMetadata}).
 */
public class DebeziumJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {

// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------

    private List<String> metadataKeys;

// --------------------------------------------------------------------------------------------
// Debezium-specific attributes
// --------------------------------------------------------------------------------------------

    private final boolean schemaInclude;
private final boolean ignoreParseErrors;
private final TimestampFormat timestampFormat;

    public DebeziumJsonDecodingFormat(
boolean schemaInclude,
boolean ignoreParseErrors,
TimestampFormat timestampFormat) {
this.schemaInclude = schemaInclude;
this.ignoreParseErrors = ignoreParseErrors;
this.timestampFormat = timestampFormat;
this.metadataKeys = Collections.emptyList();
}

    @Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context,
DataType physicalDataType) {
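        // Resolve each requested metadata key to its ReadableMetadata descriptor. The planner only
        // passes keys advertised by listReadableMetadata(), so a missing match is an internal error.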
final List<ReadableMetadata> readableMetadata = metadataKeys.stream()
.map(k ->
Stream.of(ReadableMetadata.values())
.filter(rm -> rm.key.equals(k))
.findFirst()
.orElseThrow(IllegalStateException::new))
.collect(Collectors.toList());
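        // Append one field per requested metadata key to the physical row type; the runtime decoder
        // emits the metadata columns after the physical columns in this order.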
final List<DataTypes.Field> metadataFields = readableMetadata.stream()
.map(m -> DataTypes.FIELD(m.key, m.dataType))
.collect(Collectors.toList());
final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
return new DebeziumJsonDeserializationSchema(
physicalDataType,
readableMetadata,
producedTypeInfo,
schemaInclude,
ignoreParseErrors,
timestampFormat);
}

    @Override
public Map<String, DataType> listReadableMetadata() {
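        // Use a LinkedHashMap so that the advertised metadata keys keep their declaration order.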
final Map<String, DataType> metadataMap = new LinkedHashMap<>();
Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
return metadataMap;
}

    @Override
public void applyReadableMetadata(List<String> metadataKeys) {
this.metadataKeys = metadataKeys;
}

    @Override
public ChangelogMode getChangelogMode() {
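        // Debezium records every row-level change, so the produced changelog contains inserts,
        // update-before/update-after pairs, and deletes.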
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
    }

// --------------------------------------------------------------------------------------------
// Metadata handling
// --------------------------------------------------------------------------------------------

    /**
* List of metadata that can be read with this format.
*/
enum ReadableMetadata {
SCHEMA(
"schema",
DataTypes.STRING().nullable(),
false,
DataTypes.FIELD("schema", DataTypes.STRING()),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(GenericRowData row, int pos) {
return row.getString(pos);
}
}
),

        INGESTION_TIMESTAMP(
"ingestion-timestamp",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
true,
DataTypes.FIELD("ts_ms", DataTypes.BIGINT()),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(GenericRowData row, int pos) {
if (row.isNullAt(pos)) {
return null;
}
return TimestampData.fromEpochMillis(row.getLong(pos));
}
}
),

        SOURCE_TIMESTAMP(
"source.timestamp",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
true,
DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(GenericRowData row, int pos) {
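                    // The "source" object is deserialized as a map with string values, so "ts_ms"
                    // arrives as text and is parsed into epoch milliseconds here.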
final StringData timestamp = (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
if (timestamp == null) {
return null;
}
return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
}
}
),

        SOURCE_DATABASE(
"source.database",
DataTypes.STRING().nullable(),
true,
DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(GenericRowData row, int pos) {
return readProperty(row, pos, KEY_SOURCE_DATABASE);
}
}
),

        SOURCE_SCHEMA(
"source.schema",
DataTypes.STRING().nullable(),
true,
DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(GenericRowData row, int pos) {
return readProperty(row, pos, KEY_SOURCE_SCHEMA);
}
}
),

        SOURCE_TABLE(
"source.table",
DataTypes.STRING().nullable(),
true,
DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(GenericRowData row, int pos) {
return readProperty(row, pos, KEY_SOURCE_TABLE);
}
}
),

        SOURCE_PROPERTIES(
"source.properties",
// key and value of the map are nullable to make handling easier in queries
DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
true,
DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(GenericRowData row, int pos) {
return row.getMap(pos);
}
}
);

        // Key under which the metadata is exposed to the user.
        final String key;
        // Data type of the exposed metadata column.
        final DataType dataType;
        // Whether the required JSON field is part of the Debezium payload.
        final boolean isJsonPayload;
        // JSON field that must be present in the deserialized record to derive this metadata.
        final DataTypes.Field requiredJsonField;
        final MetadataConverter converter;

        ReadableMetadata(
String key,
DataType dataType,
boolean isJsonPayload,
DataTypes.Field requiredJsonField,
MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.isJsonPayload = isJsonPayload;
this.requiredJsonField = requiredJsonField;
this.converter = converter;
}
}

    // Keys of the entries inside Debezium's "source" metadata object.
    private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("ts_ms");
private static final StringData KEY_SOURCE_DATABASE = StringData.fromString("db");
private static final StringData KEY_SOURCE_SCHEMA = StringData.fromString("schema");
private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");
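
    // Reads a single entry from the generic "source" map; returns null if the map itself is
    // missing or the key is not present.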
private static Object readProperty(GenericRowData row, int pos, StringData key) {
final GenericMapData map = (GenericMapData) row.getMap(pos);
if (map == null) {
return null;
}
return map.get(key);
}
}