blob: 3cf94aeddd35e5d433b659cc95a793f88a7c7d95 [file] [log] [blame]
package org.apache.rocketmq.connect.kafka.connector;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class KafkaRocketmqTask {
private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqTask.class);
private ClassLoader classLoader;
private Converter keyConverter;
private Converter valueConverter;
private HeaderConverter headerConverter;
public ClassLoader getClassLoader() {
return classLoader;
}
public void setClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
public Converter getKeyConverter() {
return keyConverter;
}
public Converter getValueConverter() {
return valueConverter;
}
public HeaderConverter getHeaderConverter() {
return headerConverter;
}
public void recoverClassLoader(){
if(this.classLoader != null){
Plugins.compareAndSwapLoaders(this.classLoader);
this.classLoader = null;
}
}
public void initConverter(Plugins plugins, Map<String, String> taskProps, String connectorName, String taskName){
Map<String, String> connProps = new HashMap<>(taskProps);
connProps.put(ConnectorConfig.NAME_CONFIG, connectorName);
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
Map<String, String> workerProps = new HashMap<>();
workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
ConfigDef converterConfigDef = new ConfigDef()
.define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
.define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
.define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
.CURRENT_CLASSLOADER);
valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
if (keyConverter == null) {
keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), taskName);
} else {
log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), taskName);
}
if (valueConverter == null) {
valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), taskName);
} else {
log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), taskName);
}
if (headerConverter == null) {
headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
.PLUGINS);
log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), taskName);
} else {
log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), taskName);
}
}
}