Implement seekToBeginning()/end operations in Kafka consumer wrapper  (#849)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 52fedcb..fcd40b6 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -59,7 +60,8 @@
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
@@ -179,7 +181,14 @@
 
     @Override
     public void subscribe(Collection<String> topics) {
+        subscribe(topics, null);
+    }
+
+    @Override
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
         List<CompletableFuture<org.apache.pulsar.client.api.Consumer>> futures = new ArrayList<>();
+
+        List<TopicPartition> topicPartitions = new ArrayList<>();
         try {
             for (String topic : topics) {
                 // Create individual subscription on each partition, that way we can keep using the
@@ -197,25 +206,32 @@
                         CompletableFuture<org.apache.pulsar.client.api.Consumer> future = client
                                 .subscribeAsync(partitionName, groupId, conf);
                         int partitionIndex = i;
-                        future.thenAccept(
-                                consumer -> consumers.putIfAbsent(new TopicPartition(topic, partitionIndex), consumer));
+                        TopicPartition tp = new TopicPartition(topic, partitionIndex);
+                        future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
                         futures.add(future);
+                        topicPartitions.add(tp);
                     }
-
                 } else {
                     // Topic has a single partition
                     CompletableFuture<org.apache.pulsar.client.api.Consumer> future = client.subscribeAsync(topic,
                             groupId, conf);
-                    future.thenAccept(consumer -> consumers.putIfAbsent(new TopicPartition(topic, 0), consumer));
+                    TopicPartition tp = new TopicPartition(topic, 0);
+                    future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
                     futures.add(future);
+                    topicPartitions.add(tp);
                 }
             }
 
             // Wait for all consumers to be ready
             futures.forEach(CompletableFuture::join);
 
+            // Notify the listener is now owning all topics/partitions
+            if (callback != null) {
+                callback.onPartitionsAssigned(topicPartitions);
+            }
+
         } catch (Exception e) {
-            // Close all consumer that might have been sucessfully created
+            // Close all consumer that might have been successfully created
             futures.forEach(f -> {
                 try {
                     f.get().close();
@@ -229,11 +245,6 @@
     }
 
     @Override
-    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
-        throw new UnsupportedOperationException("ConsumerRebalanceListener is not supported");
-    }
-
-    @Override
     public void assign(Collection<TopicPartition> partitions) {
         throw new UnsupportedOperationException("Cannot manually assign partitions");
     }
@@ -383,17 +394,59 @@
 
     @Override
     public void seek(TopicPartition partition, long offset) {
-        throw new UnsupportedOperationException();
+        MessageId msgId = MessageIdUtils.getMessageId(offset);
+        org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
+        if (c == null) {
+            throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
+        }
+
+        try {
+            c.seek(msgId);
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public void seekToBeginning(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        if (partitions.isEmpty()) {
+            partitions = consumers.keySet();
+        }
+
+        for (TopicPartition tp : partitions) {
+            org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+            if (c == null) {
+                futures.add(FutureUtil.failedFuture(
+                        new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+            } else {
+                futures.add(c.seekAsync(MessageId.earliest));
+            }
+        }
+
+        FutureUtil.waitForAll(futures).join();
     }
 
     @Override
     public void seekToEnd(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        if (partitions.isEmpty()) {
+            partitions = consumers.keySet();
+        }
+
+        for (TopicPartition tp : partitions) {
+            org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+            if (c == null) {
+                futures.add(FutureUtil.failedFuture(
+                        new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+            } else {
+                futures.add(c.seekAsync(MessageId.latest));
+            }
+        }
+
+        FutureUtil.waitForAll(futures).join();
     }
 
     @Override
@@ -472,4 +525,6 @@
     public void wakeup() {
         throw new UnsupportedOperationException();
     }
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarKafkaConsumer.class);
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
index 0766ea5..447d1bd 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
@@ -22,6 +22,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -232,4 +233,109 @@
 
         consumers.forEach(Consumer::close);
     }
+
+    @Test
+    public void testConsumerSeek() throws Exception {
+        String topic = "persistent://sample/standalone/ns/testSimpleConsumer";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", brokerUrl.toString());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+        for (int i = 0; i < 10; i++) {
+            Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+                    .build();
+            pulsarProducer.send(msg);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            ConsumerRecords<String, String> records = consumer.poll(100);
+            assertEquals(records.count(), 1);
+            int idx = i;
+            records.forEach(record -> {
+                assertEquals(record.key(), Integer.toString(idx));
+                assertEquals(record.value(), "hello-" + idx);
+            });
+
+            consumer.commitSync();
+        }
+
+        consumer.seekToBeginning(Collections.emptyList());
+
+        Thread.sleep(500);
+
+        // Messages should be available again
+        for (int i = 0; i < 10; i++) {
+            ConsumerRecords<String, String> records = consumer.poll(100);
+            assertEquals(records.count(), 1);
+            int idx = i;
+            records.forEach(record -> {
+                assertEquals(record.key(), Integer.toString(idx));
+                assertEquals(record.value(), "hello-" + idx);
+            });
+
+            consumer.commitSync();
+        }
+
+        consumer.close();
+    }
+
+    @Test
+    public void testConsumerSeekToEnd() throws Exception {
+        String topic = "persistent://sample/standalone/ns/testSimpleConsumer";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", brokerUrl.toString());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+        for (int i = 0; i < 10; i++) {
+            Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+                    .build();
+            pulsarProducer.send(msg);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            ConsumerRecords<String, String> records = consumer.poll(100);
+            assertEquals(records.count(), 1);
+            int idx = i;
+            records.forEach(record -> {
+                assertEquals(record.key(), Integer.toString(idx));
+                assertEquals(record.value(), "hello-" + idx);
+            });
+
+            consumer.commitSync();
+        }
+
+        consumer.seekToEnd(Collections.emptyList());
+        Thread.sleep(500);
+
+        consumer.close();
+
+        // Recreate the consumer
+        consumer = new PulsarKafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        ConsumerRecords<String, String> records = consumer.poll(100);
+        // Since we are at the end of the topic, there should be no messages
+        assertEquals(records.count(), 0);
+
+        consumer.close();
+    }
+
 }