package org.apache.rocketmq.connect.kafka.util;

import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.SchemaBuilder;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
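
/**
 * Helpers for translating between Kafka Connect records ({@link SourceRecord}, {@link SinkRecord})
 * and RocketMQ Connect {@link ConnectRecord}s. Kafka-specific fields with no direct counterpart
 * (key, original topic, partition, headers) are carried as ConnectRecord extensions.
 *
 * <p>A typical round trip looks like the following (the converter instances are illustrative):
 * <pre>{@code
 * ConnectRecord record = RecordUtil.toConnectRecord(sourceRecord, keyConverter, valueConverter, headerConverter);
 * SourceRecord roundTripped = RecordUtil.toSourceRecord(record, keyConverter, valueConverter, headerConverter);
 * }</pre>
 */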
public class RecordUtil {

    public static final String BROKER_NAME = "brokerName";
    public static final String QUEUE_ID = "queueId";
    public static final String TOPIC = "topic";
    public static final String QUEUE_OFFSET = "queueOffset";

    public static final String KAFKA_MSG_KEY = "kafka_key";
    public static final String KAFKA_CONNECT_RECORD_TOPIC_KEY = "kafka_connect_record_topic";
    public static final String KAFKA_CONNECT_RECORD_PARTITION_KEY = "kafka_connect_record_partition";
    public static final String KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX = "kafka_connect_record_header_";
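
    /**
     * Reads the RocketMQ queue offset stored as a string under {@link #QUEUE_OFFSET}
     * in the given {@link RecordOffset} and returns it as a long.
     */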
    public static long getOffset(RecordOffset recordOffset) {
        return Long.valueOf(
            (String) recordOffset.getOffset().get(QUEUE_OFFSET)
        );
    }
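
    /**
     * Converts a Kafka Connect {@link SourceRecord} into a RocketMQ {@link ConnectRecord}.
     * The value is serialized with {@code valueConverter} and stored as a UTF-8 string payload;
     * the key, original topic, partition and headers are serialized and attached as extensions
     * so that {@link #toSourceRecord} can rebuild an equivalent record.
     */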
    public static ConnectRecord toConnectRecord(SourceRecord sourceRecord, Converter keyConverter, Converter valueConverter,
                                                HeaderConverter headerConverter) {
        Map<String, Object> partition = new HashMap<>();
        partition.put("topic", sourceRecord.topic());
        partition.putAll(sourceRecord.sourcePartition());
        RecordPartition recordPartition = new RecordPartition(partition);
        RecordOffset recordOffset = new RecordOffset(new HashMap<>(sourceRecord.sourceOffset()));
        Long timestamp = sourceRecord.timestamp();

        byte[] value = valueConverter.fromConnectData(
            sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value()
        );
        ConnectRecord connectRecord = new ConnectRecord(
            recordPartition, recordOffset, timestamp,
            SchemaBuilder.string().build(), new String(value, StandardCharsets.UTF_8)
        );

        if (sourceRecord.key() != null) {
            byte[] key = keyConverter.fromConnectData(
                sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key()
            );
            connectRecord.addExtension(RecordUtil.KAFKA_MSG_KEY, new String(key, StandardCharsets.UTF_8));
        }
        for (Header header : sourceRecord.headers()) {
            byte[] headerValue = headerConverter.fromConnectHeader(
                sourceRecord.topic(), header.key(), header.schema(), header.value()
            );
            connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX + header.key(),
                new String(headerValue, StandardCharsets.UTF_8));
        }
        if (sourceRecord.topic() != null) {
            connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY, sourceRecord.topic());
        }
        if (sourceRecord.kafkaPartition() != null) {
            connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY, sourceRecord.kafkaPartition().toString());
        }
        return connectRecord;
    }
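
    /**
     * Rebuilds a Kafka Connect {@link SourceRecord} from a {@link ConnectRecord} previously produced
     * by {@link #toConnectRecord}, restoring the topic, partition, key and headers from the record's
     * extensions and deserializing the payload with the supplied converters.
     */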
    public static SourceRecord toSourceRecord(ConnectRecord connectRecord, Converter keyConverter, Converter valueConverter,
                                              HeaderConverter headerConverter) {
        Map<String, ?> sourcePartition = new HashMap<>(connectRecord.getPosition().getPartition().getPartition());
        Map<String, ?> sourceOffset = new HashMap<>(connectRecord.getPosition().getOffset().getOffset());
        String topic = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY);
        String partitionStr = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY);
        Integer partition = null;
        if (partitionStr != null) {
            partition = Integer.valueOf(partitionStr);
        }

        String keyStr = connectRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
        Schema keySchema = null;
        Object key = null;
        if (keyStr != null) {
            SchemaAndValue keySchemaAndValue = keyConverter.toConnectData(topic, keyStr.getBytes(StandardCharsets.UTF_8));
            keySchema = keySchemaAndValue.schema();
            key = keySchemaAndValue.value();
        }

        ConnectHeaders headers = new ConnectHeaders();
        for (String extKey : connectRecord.getExtensions().keySet()) {
            if (!extKey.startsWith(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX)) {
                continue;
            }
            String header = extKey.substring(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX.length());
            SchemaAndValue headerSchemaAndValue = headerConverter
                .toConnectHeader(topic, header, connectRecord.getExtension(extKey).getBytes(StandardCharsets.UTF_8));
            headers.add(header, headerSchemaAndValue);
        }
        // The payload stored in the ConnectRecord is the serialized value, so decode it with the value converter.
        SchemaAndValue valueSchemaAndValue = valueConverter.toConnectData(topic, ((String) connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
        SourceRecord sourceRecord = new SourceRecord(
            sourcePartition, sourceOffset, topic, partition,
            keySchema, key, valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
            connectRecord.getTimestamp(), headers
        );
        return sourceRecord;
    }
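
    /**
     * Wraps a Kafka Connect {@link SinkRecord} in a {@link ConnectRecord}, mapping the Kafka
     * {@link TopicPartition} to a RocketMQ {@link RecordPartition} via the supplied mapper and
     * recording the Kafka offset under {@link #QUEUE_OFFSET}.
     */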
    public static ConnectRecord sinkRecordToConnectRecord(SinkRecord sinkRecord, RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper) {
        TopicPartition topicPartition = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
        RecordPartition recordPartition = kafkaTopicPartitionMapper.toRecordPartition(topicPartition);
        Map<String, String> offsetMap = new HashMap<>();
        offsetMap.put(RecordUtil.QUEUE_OFFSET, String.valueOf(sinkRecord.kafkaOffset()));
        RecordOffset recordOffset = new RecordOffset(offsetMap);
        ConnectRecord connectRecord = new ConnectRecord(
            recordPartition, recordOffset, sinkRecord.timestamp(),
            SchemaBuilder.string().build(), sinkRecord.value()
        );
        return connectRecord;
    }
}