blob: cb468e4822cf38f70d1600e60a4efbfeedcbb8ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.debezium;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import io.debezium.data.Envelope;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
/**
* Base class for debezium endpoint implementation
*/
public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration> extends DefaultEndpoint {
protected DebeziumEndpoint(String uri, DebeziumComponent component) {
super(uri, component);
}
protected DebeziumEndpoint() {
}
@Override
public Producer createProducer() throws Exception {
throw new UnsupportedOperationException("Cannot produce from a DebeziumEndpoint: "
+ getEndpointUri());
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
return new DebeziumConsumer(this, processor);
}
public ExecutorService createExecutor() {
return getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
"DebeziumConsumer");
}
public Exchange createDbzExchange(final SourceRecord record) {
final Exchange exchange = super.createExchange();
final Message message = exchange.getIn();
final Schema valueSchema = record.valueSchema();
final Object value = record.value();
// extract values from SourceRecord
final Map<String, Object> sourceMetadata = extractSourceMetadataValueFromValueStruct(valueSchema, value);
final Object operation = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.OPERATION);
final Object before = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.BEFORE);
final Object body = extractBodyValueFromValueStruct(valueSchema, value);
final Object timestamp = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.TIMESTAMP);
// set message headers
message.setHeader(DebeziumConstants.HEADER_IDENTIFIER, record.topic());
message.setHeader(DebeziumConstants.HEADER_KEY, record.key());
message.setHeader(DebeziumConstants.HEADER_SOURCE_METADATA, sourceMetadata);
message.setHeader(DebeziumConstants.HEADER_OPERATION, operation);
message.setHeader(DebeziumConstants.HEADER_BEFORE, before);
message.setHeader(DebeziumConstants.HEADER_TIMESTAMP, timestamp);
message.setBody(body);
return exchange;
}
public abstract C getConfiguration();
public abstract void setConfiguration(C configuration);
protected Object extractBodyValueFromValueStruct(final Schema schema, final Object value) {
// by default, we will extract the value from field `after`, however if other connector needs different field, this method needs to be overriden
return extractFieldValueFromValueStruct(schema, value, Envelope.FieldName.AFTER);
}
protected Object extractFieldValueFromValueStruct(final Schema schema, final Object value, final String fieldName) {
// first we try with normal extraction from value struct
final Object valueExtracted = extractValueFromValueStruct(schema, value, fieldName);
if (valueExtracted == null && !isSchemaAStructSchema(schema)) { // we could have anything other than struct, we just return that
return value;
}
return valueExtracted;
}
private Map<String, Object> extractSourceMetadataValueFromValueStruct(final Schema schema, final Object value) {
// we want to convert metadata to map since it facilitate usage and also struct structure is not needed for the metadata
final Object valueExtracted = extractValueFromValueStruct(schema, value, Envelope.FieldName.SOURCE);
if (valueExtracted != null) {
return DebeziumTypeConverter.toMap((Struct) valueExtracted);
}
return null;
}
private Object extractValueFromValueStruct(final Schema schema, final Object value, final String fieldName) {
// first we check if we have a value and a schema of struct type
if (isSchemaAStructSchema(schema) && value != null) {
// now we return our desired fieldName
try {
final Struct valueStruct = (Struct) value;
return valueStruct.get(fieldName);
} catch (DataException e) {
// we return null instead since this exception thrown when no value set or field doesn't exist
return null;
}
}
return null;
}
private boolean isSchemaAStructSchema(final Schema schema) {
return schema != null && schema.type().equals(Schema.Type.STRUCT);
}
}