Implement seekToBeginning()/end operations in Kafka consumer wrapper (#849)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 52fedcb..fcd40b6 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -48,6 +48,7 @@
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -59,7 +60,8 @@
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
@@ -179,7 +181,14 @@
@Override
public void subscribe(Collection<String> topics) {
+ subscribe(topics, null);
+ }
+
+ @Override
+ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
List<CompletableFuture<org.apache.pulsar.client.api.Consumer>> futures = new ArrayList<>();
+
+ List<TopicPartition> topicPartitions = new ArrayList<>();
try {
for (String topic : topics) {
// Create individual subscription on each partition, that way we can keep using the
@@ -197,25 +206,32 @@
CompletableFuture<org.apache.pulsar.client.api.Consumer> future = client
.subscribeAsync(partitionName, groupId, conf);
int partitionIndex = i;
- future.thenAccept(
- consumer -> consumers.putIfAbsent(new TopicPartition(topic, partitionIndex), consumer));
+ TopicPartition tp = new TopicPartition(topic, partitionIndex);
+ future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
+ topicPartitions.add(tp);
}
-
} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer> future = client.subscribeAsync(topic,
groupId, conf);
- future.thenAccept(consumer -> consumers.putIfAbsent(new TopicPartition(topic, 0), consumer));
+ TopicPartition tp = new TopicPartition(topic, 0);
+ future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
+ topicPartitions.add(tp);
}
}
// Wait for all consumers to be ready
futures.forEach(CompletableFuture::join);
+ // Notify the listener is now owning all topics/partitions
+ if (callback != null) {
+ callback.onPartitionsAssigned(topicPartitions);
+ }
+
} catch (Exception e) {
- // Close all consumer that might have been sucessfully created
+ // Close all consumer that might have been successfully created
futures.forEach(f -> {
try {
f.get().close();
@@ -229,11 +245,6 @@
}
@Override
- public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
- throw new UnsupportedOperationException("ConsumerRebalanceListener is not supported");
- }
-
- @Override
public void assign(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException("Cannot manually assign partitions");
}
@@ -383,17 +394,59 @@
@Override
public void seek(TopicPartition partition, long offset) {
- throw new UnsupportedOperationException();
+ MessageId msgId = MessageIdUtils.getMessageId(offset);
+ org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
+ if (c == null) {
+ throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
+ }
+
+ try {
+ c.seek(msgId);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ if (partitions.isEmpty()) {
+ partitions = consumers.keySet();
+ }
+
+ for (TopicPartition tp : partitions) {
+ org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+ if (c == null) {
+ futures.add(FutureUtil.failedFuture(
+ new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+ } else {
+ futures.add(c.seekAsync(MessageId.earliest));
+ }
+ }
+
+ FutureUtil.waitForAll(futures).join();
}
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ if (partitions.isEmpty()) {
+ partitions = consumers.keySet();
+ }
+
+ for (TopicPartition tp : partitions) {
+ org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+ if (c == null) {
+ futures.add(FutureUtil.failedFuture(
+ new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+ } else {
+ futures.add(c.seekAsync(MessageId.latest));
+ }
+ }
+
+ FutureUtil.waitForAll(futures).join();
}
@Override
@@ -472,4 +525,6 @@
public void wakeup() {
throw new UnsupportedOperationException();
}
+
+ private static final Logger log = LoggerFactory.getLogger(PulsarKafkaConsumer.class);
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
index 0766ea5..447d1bd 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/tests/KafkaConsumerTest.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -232,4 +233,109 @@
consumers.forEach(Consumer::close);
}
+
+ @Test
+ public void testConsumerSeek() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testSimpleConsumer";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerUrl.toString());
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+ .build();
+ pulsarProducer.send(msg);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ ConsumerRecords<String, String> records = consumer.poll(100);
+ assertEquals(records.count(), 1);
+ int idx = i;
+ records.forEach(record -> {
+ assertEquals(record.key(), Integer.toString(idx));
+ assertEquals(record.value(), "hello-" + idx);
+ });
+
+ consumer.commitSync();
+ }
+
+ consumer.seekToBeginning(Collections.emptyList());
+
+ Thread.sleep(500);
+
+ // Messages should be available again
+ for (int i = 0; i < 10; i++) {
+ ConsumerRecords<String, String> records = consumer.poll(100);
+ assertEquals(records.count(), 1);
+ int idx = i;
+ records.forEach(record -> {
+ assertEquals(record.key(), Integer.toString(idx));
+ assertEquals(record.value(), "hello-" + idx);
+ });
+
+ consumer.commitSync();
+ }
+
+ consumer.close();
+ }
+
+ @Test
+ public void testConsumerSeekToEnd() throws Exception {
+ String topic = "persistent://sample/standalone/ns/testSimpleConsumer";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerUrl.toString());
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ Producer pulsarProducer = pulsarClient.createProducer(topic);
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
+ .build();
+ pulsarProducer.send(msg);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ ConsumerRecords<String, String> records = consumer.poll(100);
+ assertEquals(records.count(), 1);
+ int idx = i;
+ records.forEach(record -> {
+ assertEquals(record.key(), Integer.toString(idx));
+ assertEquals(record.value(), "hello-" + idx);
+ });
+
+ consumer.commitSync();
+ }
+
+ consumer.seekToEnd(Collections.emptyList());
+ Thread.sleep(500);
+
+ consumer.close();
+
+ // Recreate the consumer
+ consumer = new PulsarKafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ ConsumerRecords<String, String> records = consumer.poll(100);
+ // Since we are at the end of the topic, there should be no messages
+ assertEquals(records.count(), 0);
+
+ consumer.close();
+ }
+
}