Fix kafka_0_9 Consumer partition topic throw error (#7179)

Master Issue: #7178
## Motivation

When using pulsar-client-kafka_0_9 version to consume partition topic, will throw java.lang.ClassCastException,  
because 
```
org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
```
## Modifications
BrokerService
```java
 public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
        List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();

        List<TopicPartition> topicPartitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                // Create individual subscription on each partition, that way we can keep using the
                // acknowledgeCumulative()
                int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
                consumerBuilder.subscriptionType(SubscriptionType.Failover);
                consumerBuilder.messageListener(this);
                consumerBuilder.subscriptionName(groupId);
                //consumerBuilder.topics(topics);
                if (numberOfPartitions > 1) {
                    // Subscribe to each partition
                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
                    for (int i = 0; i < numberOfPartitions; i++) {
                        String partitionName = TopicName.get(topic).getPartition(i).toString();
                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                                .topic(partitionName).subscribeAsync();
                        int partitionIndex = i;
                        TopicPartition tp = new TopicPartition(
                                TopicName.get(topic).getPartitionedTopicName(),
                                partitionIndex);
                        futures.add(future.thenApply(consumer -> {
                            log.info("Add consumer {} for partition {}", consumer, tp);
                            consumers.putIfAbsent(tp, consumer);
                            return consumer;
                        }));
                        topicPartitions.add(tp);
```
We should remove consumerBuilder.topics(topics)。
1 file changed