Fix kafka_0_9 Consumer partition topic throw error (#7179)
Master Issue: #7178
## Motivation
When using pulsar-client-kafka_0_9 version to consume partition topic, will throw java.lang.ClassCastException,
because
```
org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
```
## Modifications
BrokerService
```java
public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
List<TopicPartition> topicPartitions = new ArrayList<>();
try {
for (String topic : topics) {
// Create individual subscription on each partition, that way we can keep using the
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
//consumerBuilder.topics(topics);
if (numberOfPartitions > 1) {
// Subscribe to each partition
consumerBuilder.consumerName(ConsumerName.generateRandomName());
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(
TopicName.get(topic).getPartitionedTopicName(),
partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
return consumer;
}));
topicPartitions.add(tp);
```
We should remove consumerBuilder.topics(topics)。
1 file changed