package org.apache.rocketmq.connect.kafka.connector;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.RetriableException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
import org.apache.rocketmq.connect.kafka.util.RecordUtil;
import org.apache.rocketmq.connect.kafka.util.RocketmqRecordPartitionKafkaTopicPartitionMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
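
/**
 * Adapts a Kafka Connect {@link org.apache.kafka.connect.sink.SinkTask} to the RocketMQ Connect
 * (OpenMessaging) sink task API: records are converted with the configured Kafka converters,
 * partitions and offsets are mapped between the two runtimes, and the wrapped task runs under
 * its own plugin class loader.
 */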
public class KafkaRocketmqSinkTask extends SinkTask {

    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkTask.class);

    private org.apache.kafka.connect.sink.SinkTask kafkaSinkTask;
    private final KafkaRocketmqTask parentTask = new KafkaRocketmqTask();
    private RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper;
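
    /**
     * Converts each RocketMQ {@link ConnectRecord} into a Kafka {@link SinkRecord} and delegates
     * the batch to the wrapped Kafka sink task. Kafka retriable failures are rethrown as
     * OpenMessaging {@link RetriableException}s so the RocketMQ runtime retries the batch.
     */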
    @Override
    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
        Collection<SinkRecord> records = new ArrayList<>(sinkRecords.size());
        for (ConnectRecord sinkRecord : sinkRecords) {
            String topic = (String) sinkRecord.getPosition().getPartition().getPartition().get(RecordUtil.TOPIC);
            // The record body arrives as a UTF-8 string; run it through the configured Kafka value converter.
            SchemaAndValue valueSchemaAndValue = this.parentTask.getValueConverter()
                    .toConnectData(topic, ((String) sinkRecord.getData()).getBytes(StandardCharsets.UTF_8));
            // The Kafka message key, if present, is carried as a RocketMQ extension field.
            String key = sinkRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
            SchemaAndValue keySchemaAndValue = null;
            if (key != null) {
                keySchemaAndValue = this.parentTask.getKeyConverter()
                        .toConnectData(topic, key.getBytes(StandardCharsets.UTF_8));
            }
            TopicPartition topicPartition = this.kafkaTopicPartitionMapper
                    .toTopicPartition(sinkRecord.getPosition().getPartition());
            SinkRecord record = new SinkRecord(
                    topicPartition.topic(), topicPartition.partition(),
                    keySchemaAndValue == null ? null : keySchemaAndValue.schema(),
                    keySchemaAndValue == null ? null : keySchemaAndValue.value(),
                    valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
                    RecordUtil.getOffset(sinkRecord.getPosition().getOffset()),
                    sinkRecord.getTimestamp(), TimestampType.NO_TIMESTAMP_TYPE,
                    getHeaders(sinkRecord.getExtensions(), topic)
            );
            records.add(record);
        }
        try {
            this.kafkaSinkTask.put(records);
        } catch (org.apache.kafka.connect.errors.RetriableException e) {
            throw new RetriableException(e);
        }
    }
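
    /**
     * Rebuilds Kafka Connect headers from the RocketMQ record extensions, skipping the
     * reserved extension that carries the Kafka message key.
     */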
    private ConnectHeaders getHeaders(KeyValue extensions, String topic) {
        ConnectHeaders headers = new ConnectHeaders();
        for (String headerKey : extensions.keySet()) {
            // Not a header: this extension holds the Kafka message key and is handled in put().
            if (RecordUtil.KAFKA_MSG_KEY.equals(headerKey)) {
                continue;
            }
            // Use an explicit charset; the platform-default getBytes() was a latent portability bug.
            SchemaAndValue headerSchemaAndValue = parentTask.getHeaderConverter()
                    .toConnectHeader(topic, headerKey, extensions.getString(headerKey).getBytes(StandardCharsets.UTF_8));
            headers.add(headerKey, headerSchemaAndValue);
        }
        return headers;
    }
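
    /**
     * Loads the Kafka connector plugin, instantiates the wrapped sink task under the plugin's
     * class loader, initializes the converters and partition mapper, and starts the task.
     */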
    @Override
    public void start(KeyValue config) {
        Map<String, String> kafkaTaskProps = ConfigUtil.getKafkaConnectorConfigs(config);
        log.info("kafka connector task config is {}", kafkaTaskProps);
        Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(
                Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
        String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
        ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
        // Swap in the connector's plugin class loader, remembering the previous one so stop() can restore it.
        this.parentTask.setClassLoader(Plugins.compareAndSwapLoaders(connectorLoader));
        try {
            TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
            Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
            this.kafkaSinkTask = (org.apache.kafka.connect.sink.SinkTask) kafkaPlugins.newTask(taskClass);
            this.parentTask.initConverter(kafkaPlugins, kafkaTaskProps,
                    this.sinkTaskContext.getConnectorName(), this.sinkTaskContext.getTaskName());
            this.kafkaTopicPartitionMapper =
                    RocketmqRecordPartitionKafkaTopicPartitionMapper.newKafkaTopicPartitionMapper(kafkaTaskProps);
            this.kafkaSinkTask.initialize(new RocketmqKafkaSinkTaskContext(sinkTaskContext, this.kafkaTopicPartitionMapper));
            this.kafkaSinkTask.start(kafkaTaskProps);
        } catch (Throwable e) {
            // Startup failed: put the original class loader back before propagating the error.
            this.parentTask.recoverClassLoader();
            throw e;
        }
    }
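
    /**
     * Stops the wrapped Kafka sink task and restores the class loader saved in start().
     */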
    @Override
    public void stop() {
        try {
            this.kafkaSinkTask.stop();
        } finally {
            // Restore the class loader even if stop() throws.
            this.parentTask.recoverClassLoader();
        }
    }
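
    /**
     * Maps the RocketMQ partition/offset pairs to Kafka {@link TopicPartition} offsets and
     * forwards them to the wrapped task's flush.
     */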
    @Override
    public void flush(Map<RecordPartition, RecordOffset> currentOffsets) throws ConnectException {
        if (this.kafkaSinkTask == null) {
            log.warn("the task has not started, currentOffsets:{}", currentOffsets);
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets.size());
        for (Map.Entry<RecordPartition, RecordOffset> po : currentOffsets.entrySet()) {
            TopicPartition tp = this.kafkaTopicPartitionMapper.toTopicPartition(po.getKey());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(RecordUtil.getOffset(po.getValue()));
            offsets.put(tp, offsetAndMetadata);
        }
        this.kafkaSinkTask.flush(offsets);
    }
}