[feat][client] PCIP-5 Support Pull Consumer (#24)


### Motivation
Our organization is currently planning a migration from RocketMQ to Pulsar. To facilitate this transition, we aim to implement a standardized abstraction layer for MQ clients that encapsulates implementation details of specific messaging systems. This abstraction layer will allow seamless engine replacement while maintaining consistent client interfaces. However, two critical compatibility issues hinder the unification of message fetching patterns between RocketMQ/Kafka and Pulsar:

**1. Interface Disparity:**

- Pulsar currently lacks native support for offset-based message fetching (fetch/pull paradigm) that allows specifying exact message positions and batch sizes.

**2. Positioning Mechanism Mismatch:**

- RocketMQ/Kafka: Utilize monotonically increasing numerical offsets for message positioning and acknowledgment
- Pulsar: Relies on composite MessageID (ledgerId + entryId + batchIndex) for message identification.

Therefore, I propose to contribute a customized feature implementation to the Pulsar contributor repository that leverages existing Pulsar APIs to achieve functionality similar to RocketMQ's pull consumer. This implementation could serve companies like ours with standardized client requirements, allowing them to reuse this solution.


### Modifications
1. Upgrade Pulsar version to 4.0.8(features include PIP-415 a new API to get the message id by offset) #23 
2. Implement RocketMQ-like pull consumer functionality using existing Pulsar APIs.
diff --git a/pcip/pcip-5.md b/pcip/pcip-5.md
new file mode 100644
index 0000000..33d4be3
--- /dev/null
+++ b/pcip/pcip-5.md
@@ -0,0 +1,179 @@
+# PCIP-5: Pull Consumer Implementation for Apache Pulsar
+
+# Background knowledge
+
+- **Pulsar Consumers**: Pulsar currently supports push-based consumption models (exclusive/shared/failover/key-shared).
+  This proposal adds pull-based consumption.
+- **Message Positioning**: Pulsar uses composite MessageIDs (ledgerId + entryId + partitionId), while systems like
+  Kafka/RocketMQ use monotonic offsets.
+- **Offset Mapping**: https://github.com/apache/pulsar/pull/24220 can be used to convert between offsets and Pulsar's
+  MessageIDs.
+
+# Motivation
+
+System Migration Requirement: The organization plans to migrate from RocketMQ to Pulsar, requiring a unified MQ client
+abstraction layer to conceal implementation details and support seamless engine replacement.
+
+Interface Compatibility Issues:
+
+- Pulsar lacks a native offset retrieval interface (pull/fetch model).
+- RocketMQ/Kafka use monotonically increasing numeric offsets to locate messages, whereas Pulsar employs a composite
+  MessageID (ledgerId + entryId + partitionId).
+
+Objective: Implement a RocketMQ-like Pull Consumer to support precise offset control and reduce migration costs.
+
+# Goals
+
+## In Scope
+
+| Goal                       | Description                                                               |  
+|----------------------------|---------------------------------------------------------------------------|  
+| **Precise Offset Control** | Supports specifying partition, offset, pull count, and byte size.         |  
+| **Resource Efficiency**    | Reuses Reader connections with LRU cache management.                      |  
+| **Easy Integration**       | Compatible with Pulsar’s existing API, requiring no Broker modifications. |  
+
+## Out of Scope
+
+NA
+
+# High Level Design
+
+```mermaid
+graph TD
+    A[PullConsumer] -->|pull| B[Offset Mapping]
+    B -->|convert| C[MessageID]
+    C -->|seek| D[Reader Pool]
+    D -->|fetch| E[Broker]
+    A -->|ack| F[Offset-Based Ack]
+    F -->|convert| G[MessageID]
+    G -->|cumulative ack| H[Consumer]
+```
+
+Key components:
+
+1. **`PulsarPullConsumer` interface**: Standard pull consumer API
+2. **Offset ↔ MessageID Cache**: Partition-scoped mapping layer
+3. **Reader Pool**: Managed resource pool with LRU eviction
+4. **Partition Locking**: Thread-safe access coordination
+
+## Detailed Design
+
+### Design & Implementation Details
+
+**Core Interface** `PulsarPullConsumer`:
+
+```java
+public interface PulsarPullConsumer<T> extends AutoCloseable {
+    void start() throws PulsarClientException;
+
+    List<Message<T>> pull(PullRequest request);
+
+    void ack(long offset, int partition) throws PulsarClientException;
+
+    long searchOffset(int partition, long timestamp) throws PulsarAdminException;
+
+    long getConsumeStats(int partition) throws PulsarAdminException;
+
+    class PullRequest {
+        private long offset;
+        private int partition;
+        private int maxMessages;
+        private int maxBytes;
+        private Duration timeout;
+    }
+}
+```
+
+**Reader Management** :
+
+```java
+public class ReaderCache<T> {
+    private final Map<String, LoadingCache<Long, Reader<T>>> readerCacheMap;
+
+    public Reader<T> getReader(String topic, long offset) {
+        // 1. Acquire partition lock
+        // 2. Get/create LRU cache (default: max 100 readers/partition)
+        // 3. Remove reader from cache for exclusive use
+    }
+
+    public void releaseReader(String topic, long nextOffset, Reader<T> reader) {
+        // Return reader to cache if still connected
+    }
+}
+```
+
+**Offset Mapping**:
+
+```java
+public class OffsetToMessageIdCache {
+    private final Map<String, LoadingCache<Long, MessageId>> partitionCaches;
+
+    public MessageId getMessageIdByOffset(String topic, long offset) {
+        // 1. Check caffeine cache (default: max 1000 entries/partition)
+        // 2. On cache miss: pulsarAdmin.topics().getMessageIDByOffset()
+        // 3. Populate cache
+    }
+}
+```
+
+### Public-facing Changes
+
+#### Public API
+
+**New Interfaces**:
+
+```java
+// Entry point
+PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
+        nonPartitionedTopic, subscription,
+        brokerCluster, Schema.BYTES,
+        pulsarClient, pulsarAdmin);
+
+// Usage
+List<Message<byte[]>> messages = pullConsumer1.pull(
+        PulsarPullConsumer.PullRequest.builder()
+                .offset(offset)
+                .partition(PulsarPullConsumer.PARTITION_NONE)
+                .maxMessages(10)
+                .maxBytes(1024 * 1024)
+                .timeout(java.time.Duration.ofSeconds(10))
+                .build());
+```
+
+## Get Started
+
+### Quick Start
+
+```java
+// 1. Create pull consumer
+PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
+        nonPartitionedTopic, subscription,
+        brokerCluster, Schema.BYTES,
+        pulsarClient, pulsarAdmin);
+
+consumer.start();  // Initialize connections
+
+// 2. Pull messages from partition 0 starting at offset 200
+List<Message<byte[]>> batch =  pullConsumer1.pull(
+        PulsarPullConsumer.PullRequest.builder()
+                .offset(offset)
+                .partition(PulsarPullConsumer.PARTITION_NONE)
+                .maxMessages(10)
+                .maxBytes(1024 * 1024)
+                .timeout(java.time.Duration.ofSeconds(10))
+                .build());
+);
+
+// 3. Process messages batch.
+
+forEach(msg ->{
+        System.out.println("Received: "+new String(msg.getData()));
+        // Store last offset
+        });
+
+// 4. Acknowledge up to last offset
+consumer.ack(250L,0);
+
+// 5. Close resources
+consumer.close();
+```
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e65edca..7573376 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
         <testcontainers.version>1.20.1</testcontainers.version>
         <junit.version>4.13.1</junit.version>
         <mockito.version>5.12.0</mockito.version>
+        <caffeine.version>3.2.0</caffeine.version>
     </properties>
 
     <dependencyManagement>
diff --git a/pulsar-client-common-contrib/pom.xml b/pulsar-client-common-contrib/pom.xml
index db36c7d..1e7cf3a 100644
--- a/pulsar-client-common-contrib/pom.xml
+++ b/pulsar-client-common-contrib/pom.xml
@@ -27,8 +27,25 @@
 
     <dependencies>
         <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>${caffeine.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-admin</artifactId>
+            <version>${pulsar.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
     <inceptionYear>2024</inceptionYear>
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java
new file mode 100644
index 0000000..ea63939
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.common.ConsumeStats;
+import org.apache.pulsar.client.common.PullRequest;
+import org.apache.pulsar.client.common.PullResponse;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ * Pull-based consumer interface with enhanced offset management capabilities.
+ *
+ * <p>Features:
+ *
+ * <ul>
+ *   <li>Precise offset control with partition-aware operations
+ *   <li>Thread-safe design for concurrent access
+ *   <li>Support for both partitioned and non-partitioned topics
+ *   <li>Built-in offset to message ID mapping
+ * </ul>
+ *
+ * @param <T> message payload type
+ */
+public interface PulsarPullConsumer<T> extends AutoCloseable {
+
+  /**
+   * Initializes consumer resources and establishes connections.
+   *
+   * @throws PulsarClientException if client initialization fails
+   */
+  void start() throws PulsarClientException;
+
+  /**
+   * Pulls messages from the specified partition starting from the given offset.
+   *
+   * @param request pull request configuration
+   * @return immutable list of messages starting from the specified offset
+   * @throws IllegalArgumentException for invalid request parameters
+   */
+  PullResponse<T> pull(PullRequest request);
+
+  /**
+   * Acknowledges all messages up to the specified offset (inclusive).
+   *
+   * @param offset target offset to acknowledge
+   * @param partition partition index (use -1 for non-partitioned topics)
+   * @throws PulsarClientException for acknowledgment failures
+   * @throws IllegalArgumentException for invalid partition index
+   */
+  void ack(long offset, int partition) throws PulsarClientException;
+
+  /**
+   * Acknowledges all messages up to the specified offset (inclusive).
+   *
+   * @param offset target offset to acknowledge
+   * @param partition partition index (use -1 for non-partitioned topics)
+   * @param ackType ackType Individual(0),Cumulative(1);
+   * @throws PulsarClientException for acknowledgment failures
+   * @throws IllegalArgumentException for invalid partition index
+   */
+  void ack(long offset, int partition, CommandAck.AckType ackType) throws PulsarClientException;
+
+  /**
+   * Finds the latest message offset before or at the specified timestamp.
+   *
+   * @param partition partition index (use -1 for non-partitioned topics)
+   * @param timestamp target timestamp in milliseconds
+   * @return corresponding message offset
+   * @throws PulsarAdminException for admin operation failures
+   * @throws IllegalArgumentException for invalid partition index
+   */
+  long searchOffset(int partition, long timestamp) throws PulsarAdminException;
+
+  /**
+   * Retrieves consumption statistics for the specified partition.
+   *
+   * @param partition partition index (use -1 for non-partitioned topics)
+   * @return current consumption offset
+   * @throws PulsarAdminException for stats retrieval failures
+   * @throws IllegalArgumentException for invalid partition index
+   */
+  ConsumeStats getConsumeStats(int partition) throws PulsarAdminException;
+
+  /** Releases all resources and closes connections gracefully. */
+  @Override
+  void close();
+}
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java
new file mode 100644
index 0000000..74bba7b
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.api.impl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarPullConsumer;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.common.ConsumeStats;
+import org.apache.pulsar.client.common.PullRequest;
+import org.apache.pulsar.client.common.PullResponse;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.client.util.OffsetToMessageIdCache;
+import org.apache.pulsar.client.util.OffsetToMessageIdCacheProvider;
+import org.apache.pulsar.client.util.PulsarAdminUtils;
+import org.apache.pulsar.client.util.ReaderCache;
+import org.apache.pulsar.client.util.ReaderCacheProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarPullConsumerImpl<T> implements PulsarPullConsumer<T> {
+  private static final Logger log = LoggerFactory.getLogger(PulsarPullConsumerImpl.class);
+  private static final String PARTITION_SPLICER = "-partition-";
+  private static final int DEFAULT_READ_TIMEOUT_MS = 30_000;
+
+  private final String topic;
+  private final String subscription;
+  private final String brokerCluster;
+  private final Schema<T> schema;
+  private final Map<String, Consumer<T>> consumerMap;
+  private final OffsetToMessageIdCache offsetToMessageIdCache;
+  private final ReaderCache<T> readerCache;
+  private final PulsarAdmin pulsarAdmin;
+  private final Supplier<PulsarClient> pulsarClientSupplier;
+  private final ConsumerBuilder<T> consumerBuilder;
+
+  private volatile int partitionCount;
+
+  public PulsarPullConsumerImpl(
+      String topic,
+      String subscription,
+      String brokerCluster,
+      Schema<T> schema,
+      Supplier<PulsarClient> clientSupplier,
+      PulsarAdmin admin,
+      ConsumerBuilder<T> consumerBuilder) {
+    this.topic = Objects.requireNonNull(topic, "Topic must not be null");
+    this.subscription = Objects.requireNonNull(subscription, "Subscription must not be null");
+    this.brokerCluster = Objects.requireNonNull(brokerCluster, "Broker cluster must not be null");
+    this.schema = Objects.requireNonNull(schema, "Schema must not be null");
+    this.pulsarClientSupplier =
+        Objects.requireNonNull(clientSupplier, "PulsarClient must not be null");
+    this.pulsarAdmin = Objects.requireNonNull(admin, "PulsarAdmin must not be null");
+    this.consumerMap = new ConcurrentHashMap<>();
+    this.offsetToMessageIdCache =
+        OffsetToMessageIdCacheProvider.getOrCreateCache(admin, brokerCluster);
+    this.readerCache =
+        ReaderCacheProvider.getOrCreateReaderCache(
+            this.subscription, brokerCluster, schema, clientSupplier.get(), offsetToMessageIdCache);
+    this.consumerBuilder =
+        consumerBuilder == null
+            ? getPulsarClient().newConsumer(schema).subscriptionType(SubscriptionType.Failover)
+            : consumerBuilder;
+  }
+
+  @Override
+  public void start() throws PulsarClientException {
+    try {
+      initializePartitions();
+    } catch (PulsarAdminException e) {
+      throw new PulsarClientException("Failed to initialize partitions", e);
+    }
+  }
+
+  private void initializePartitions() throws PulsarAdminException, PulsarClientException {
+    PartitionedTopicMetadata metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topic);
+    this.partitionCount = metadata.partitions;
+
+    if (partitionCount == 0) {
+      subscribeToTopic(topic);
+      return;
+    }
+
+    for (int i = 0; i < partitionCount; i++) {
+      String partitionTopic = buildPartitionTopic(topic, i);
+      subscribeToTopic(partitionTopic);
+    }
+  }
+
+  private void subscribeToTopic(String topicName) throws PulsarClientException {
+    Consumer<T> consumer =
+        consumerBuilder.topic(topicName).subscriptionName(subscription).subscribe();
+    consumerMap.put(topicName, consumer);
+    log.debug("Subscribed to topic: {}", topicName);
+  }
+
+  private PulsarClient getPulsarClient() {
+    return Objects.requireNonNull(
+        pulsarClientSupplier.get(),
+        "PulsarClient supplier returned null. Ensure PulsarClient is properly initialized.");
+  }
+
+  @Override
+  public PullResponse<T> pull(PullRequest request) {
+    validatePullParameters(request.getMaxMessages(), request.getMaxBytes());
+
+    String partitionTopic = buildPartitionTopic(topic, request.getPartition());
+    Reader<T> reader = null;
+    Message<T> lastMessage = null;
+    try {
+      reader = readerCache.getReader(partitionTopic, request.getOffset());
+      List<Message<T>> messages =
+          readMessages(
+              reader, request.getMaxMessages(), request.getMaxBytes(), request.getTimeout());
+      lastMessage = messages.isEmpty() ? null : messages.get(messages.size() - 1);
+      return new PullResponse<>(reader.hasMessageAvailable(), messages);
+    } catch (PulsarClientException e) {
+      log.error(
+          "Failed to pull messages from topic {} at offset {}",
+          partitionTopic,
+          request.getOffset(),
+          e);
+      return new PullResponse<>(false, Collections.emptyList());
+    } finally {
+      if (reader != null) {
+        releaseReader(
+            partitionTopic,
+            reader,
+            lastMessage != null ? lastMessage.getIndex().get() + 1 : request.getOffset());
+      }
+    }
+  }
+
+  private List<Message<T>> readMessages(
+      Reader<T> reader, int maxMessages, int maxBytes, Duration timeout) {
+    List<Message<T>> messages = new ArrayList<>(Math.min(maxMessages, 1024));
+    int totalBytes = 0;
+    long deadline = System.nanoTime() + timeout.toNanos();
+
+    while (messages.size() < maxMessages && totalBytes < maxBytes) {
+      long remaining = deadline - System.nanoTime();
+      if (remaining <= 0) {
+        break;
+      }
+
+      try {
+        Message<T> msg =
+            reader.readNext(
+                (int) Math.min(TimeUnit.NANOSECONDS.toMillis(remaining), DEFAULT_READ_TIMEOUT_MS),
+                TimeUnit.MILLISECONDS);
+        if (msg == null) {
+          break;
+        }
+
+        messages.add(msg);
+        totalBytes += msg.getData().length;
+      } catch (PulsarClientException e) {
+        log.warn("Error reading message from {}", reader.getTopic(), e);
+        break;
+      }
+    }
+    return Collections.unmodifiableList(messages);
+  }
+
+  @Override
+  public void ack(long offset, int partition) throws PulsarClientException {
+    ack(offset, partition, CommandAck.AckType.Cumulative);
+  }
+
+  @Override
+  public void ack(long offset, int partition, CommandAck.AckType ackType)
+      throws PulsarClientException {
+    String partitionTopic = buildPartitionTopic(topic, partition);
+    Consumer<T> consumer = consumerMap.get(partitionTopic);
+    if (consumer == null) {
+      throw new PulsarClientException("Consumer not found for partition: " + partition);
+    }
+
+    MessageId messageId = offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset);
+    if (messageId == null) {
+      throw new PulsarClientException("MessageID not found for offset: " + offset);
+    }
+
+    if (ackType == CommandAck.AckType.Individual) {
+      TopicMessageIdImpl topicMessageId =
+          new TopicMessageIdImpl(partitionTopic, (MessageIdAdv) messageId);
+      consumer.acknowledge(topicMessageId);
+    } else {
+      if (consumer instanceof MultiTopicsConsumerImpl) {
+        TopicMessageIdImpl topicMessageId =
+            new TopicMessageIdImpl(partitionTopic, (MessageIdAdv) messageId);
+        consumer.acknowledgeCumulative(topicMessageId);
+      } else {
+        consumer.acknowledgeCumulative(messageId);
+      }
+    }
+  }
+
+  @Override
+  public long searchOffset(int partition, long timestamp) throws PulsarAdminException {
+    String partitionTopic = buildPartitionTopic(topic, partition);
+    return PulsarAdminUtils.searchOffset(partitionTopic, timestamp, brokerCluster, pulsarAdmin);
+  }
+
+  @Override
+  public ConsumeStats getConsumeStats(int partition) throws PulsarAdminException {
+    String partitionTopic = buildPartitionTopic(topic, partition);
+    return PulsarAdminUtils.getConsumeStats(
+        partitionTopic, partition, subscription, brokerCluster, pulsarAdmin);
+  }
+
+  @Override
+  public void close() {
+    closeResources();
+  }
+
+  private void closeResources() {
+    consumerMap
+        .values()
+        .forEach(
+            consumer -> {
+              try {
+                consumer.close();
+              } catch (PulsarClientException e) {
+                log.warn("Failed to close consumer for topic {}", consumer.getTopic(), e);
+              }
+            });
+
+    try {
+      offsetToMessageIdCache.cleanup();
+    } catch (Exception e) {
+      log.warn("Error cleaning offset cache", e);
+    }
+  }
+
+  private void validatePullParameters(int maxMessages, int maxBytes) {
+    if (maxMessages <= 0) {
+      throw new IllegalArgumentException("maxMessages must be positive");
+    }
+    if (maxBytes <= 0) {
+      throw new IllegalArgumentException("maxBytes must be positive");
+    }
+  }
+
+  private String buildPartitionTopic(String baseTopic, int partition) {
+    if (partitionCount > 0 && partition >= partitionCount) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Invalid partition %d for topic %s with %d partitions",
+              partition, baseTopic, partitionCount));
+    }
+    return partitionCount == 0 ? baseTopic : baseTopic + PARTITION_SPLICER + partition;
+  }
+
+  private void releaseReader(String topicPartition, Reader<T> reader, long nextOffset) {
+    try {
+      if (reader.isConnected()) {
+        readerCache.releaseReader(topicPartition, nextOffset, reader);
+      }
+    } catch (Exception e) {
+      log.warn("Error releasing reader for {}", topicPartition, e);
+    }
+  }
+}
diff --git a/pulsar-client-common-contrib/src/test/java/DemoTest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java
similarity index 64%
copy from pulsar-client-common-contrib/src/test/java/DemoTest.java
copy to pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java
index 7b2a7c4..a40fc05 100644
--- a/pulsar-client-common-contrib/src/test/java/DemoTest.java
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java
@@ -11,14 +11,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import lombok.extern.slf4j.Slf4j;
-import org.testng.annotations.Test;
+package org.apache.pulsar.client.common;
 
-@Slf4j
-public class DemoTest {
+import java.time.Duration;
 
-  @Test
-  public void testDemo() {
-    log.info("=== Test started ===");
-  }
+public class Constants {
+  public static final int PARTITION_NONE_INDEX = -1;
+  public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofSeconds(30);
+  public static final String PARTITIONED_TOPIC_SUFFIX = "-partitioned-";
 }
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java
new file mode 100644
index 0000000..58b19b8
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.common;
+
+import lombok.Data;
+
+@Data
+public class ConsumeStats {
+  private String topic;
+  private String group;
+  private long minOffset;
+  private long maxOffset;
+  private long lastConsumedOffset;
+
+  public ConsumeStats(
+      String topic, String group, long minOffset, long maxOffset, long lastConsumedOffset) {
+    this.topic = topic;
+    this.group = group;
+    this.minOffset = minOffset;
+    this.maxOffset = maxOffset;
+    this.lastConsumedOffset = lastConsumedOffset;
+  }
+}
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java
new file mode 100644
index 0000000..02179a6
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.common;
+
+import java.time.Duration;
+import lombok.Data;
+
+/** Configuration object for pull requests. */
+@Data
+public class PullRequest {
+  private final long offset;
+  private final int partition;
+  private final int maxMessages;
+  private final int maxBytes;
+  private final Duration timeout;
+
+  private PullRequest(Builder builder) {
+    this.offset = builder.offset;
+    this.partition = builder.partition;
+    this.maxMessages = builder.maxMessages;
+    this.maxBytes = builder.maxBytes;
+    this.timeout = builder.timeout;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private long offset = -1L;
+    private int partition = Constants.PARTITION_NONE_INDEX;
+    private int maxMessages = 100;
+    private int maxBytes = 10_485_760; // 10MB
+    private Duration timeout = Constants.DEFAULT_OPERATION_TIMEOUT;
+
+    public Builder offset(long offset) {
+      this.offset = offset;
+      return this;
+    }
+
+    public Builder partition(int partition) {
+      this.partition = partition;
+      return this;
+    }
+
+    public Builder maxMessages(int maxMessages) {
+      this.maxMessages = maxMessages;
+      return this;
+    }
+
+    public Builder maxBytes(int maxBytes) {
+      this.maxBytes = maxBytes;
+      return this;
+    }
+
+    public Builder timeout(Duration timeout) {
+      this.timeout = timeout;
+      return this;
+    }
+
+    public PullRequest build() {
+      validate();
+      return new PullRequest(this);
+    }
+
+    private void validate() {
+      if (maxMessages <= 0 || maxBytes <= 0) {
+        throw new IllegalArgumentException("Max messages/bytes must be positive");
+      }
+    }
+  }
+}
diff --git a/pulsar-client-common-contrib/src/test/java/DemoTest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java
similarity index 61%
copy from pulsar-client-common-contrib/src/test/java/DemoTest.java
copy to pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java
index 7b2a7c4..63cdd91 100644
--- a/pulsar-client-common-contrib/src/test/java/DemoTest.java
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java
@@ -11,14 +11,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import lombok.extern.slf4j.Slf4j;
-import org.testng.annotations.Test;
+package org.apache.pulsar.client.common;
 
-@Slf4j
-public class DemoTest {
+import java.util.List;
+import lombok.Data;
+import org.apache.pulsar.client.api.Message;
 
-  @Test
-  public void testDemo() {
-    log.info("=== Test started ===");
+@Data
+public class PullResponse<T> {
+  boolean hasMoreMsg;
+  List<Message<T>> messages;
+
+  public PullResponse(boolean hasMoreMsg, List<Message<T>> messages) {
+    this.hasMoreMsg = hasMoreMsg;
+    this.messages = messages;
   }
 }
diff --git a/pulsar-client-common-contrib/src/test/java/DemoTest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java
similarity index 74%
copy from pulsar-client-common-contrib/src/test/java/DemoTest.java
copy to pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java
index 7b2a7c4..4873d10 100644
--- a/pulsar-client-common-contrib/src/test/java/DemoTest.java
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java
@@ -11,14 +11,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import lombok.extern.slf4j.Slf4j;
-import org.testng.annotations.Test;
-
-@Slf4j
-public class DemoTest {
-
-  @Test
-  public void testDemo() {
-    log.info("=== Test started ===");
-  }
-}
+package org.apache.pulsar.client.common;
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java
new file mode 100644
index 0000000..f1ecc00
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * Cache for mapping offsets to MessageIds with the following features. - Per-partition caching with
+ * configurable size and expiration - Thread-safe cache initialization and access - Automatic cache
+ * cleanup
+ */
+public class OffsetToMessageIdCache {
+  private static final int DEFAULT_MAX_CACHE_SIZE = 1000;
+  private static final Duration DEFAULT_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5);
+
+  private final PulsarAdmin pulsarAdmin;
+  private final Map<String, LoadingCache<Long, MessageId>> partitionCaches =
+      new ConcurrentHashMap<>();
+  private final int maxCacheSize;
+  private final Duration expireAfterAccess;
+
+  public OffsetToMessageIdCache(PulsarAdmin pulsarAdmin) {
+    this(pulsarAdmin, DEFAULT_MAX_CACHE_SIZE, DEFAULT_EXPIRE_AFTER_ACCESS);
+  }
+
+  public OffsetToMessageIdCache(
+      PulsarAdmin pulsarAdmin, int maxCacheSize, Duration expireAfterAccess) {
+    this.pulsarAdmin = Objects.requireNonNull(pulsarAdmin, "PulsarAdmin must not be null");
+    this.maxCacheSize = validatePositive(maxCacheSize, "Cache size must be positive");
+    this.expireAfterAccess =
+        Objects.requireNonNull(expireAfterAccess, "Expire duration must not be null");
+  }
+
+  private LoadingCache<Long, MessageId> createCache(String partitionTopic) {
+    return Caffeine.newBuilder()
+        .maximumSize(maxCacheSize)
+        .expireAfterAccess(expireAfterAccess)
+        .recordStats()
+        .build(offset -> loadMessageId(partitionTopic, offset));
+  }
+
+  private MessageId loadMessageId(String partitionTopic, Long offset) throws PulsarAdminException {
+    if (offset < 0) {
+      return MessageId.earliest;
+    }
+    return pulsarAdmin.topics().getMessageIdByIndex(partitionTopic, offset);
+  }
+
+  public MessageId getMessageIdByOffset(String partitionTopic, long offset) {
+    return partitionCaches.computeIfAbsent(partitionTopic, this::createCache).get(offset);
+  }
+
+  public void putMessageIdByOffset(String partitionTopic, long offset, MessageId messageId) {
+    partitionCaches.computeIfAbsent(partitionTopic, this::createCache).put(offset, messageId);
+  }
+
+  /** Cleans up all cached entries and releases resources. */
+  public void cleanup() {
+    partitionCaches
+        .values()
+        .forEach(
+            cache -> {
+              cache.invalidateAll();
+              cache.cleanUp();
+            });
+    partitionCaches.clear();
+  }
+
+  /**
+   * Removes cache for specific partition topic.
+   *
+   * @param partitionTopic topic partition to remove
+   */
+  public void removePartitionCache(String partitionTopic) {
+    LoadingCache<Long, MessageId> cache = partitionCaches.remove(partitionTopic);
+    if (cache != null) {
+      cache.invalidateAll();
+      cache.cleanUp();
+    }
+  }
+
+  private static int validatePositive(int value, String errorMessage) {
+    if (value <= 0) {
+      throw new IllegalArgumentException(errorMessage);
+    }
+    return value;
+  }
+}
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java
new file mode 100644
index 0000000..dc2e740
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+public class OffsetToMessageIdCacheProvider {
+  private static final Map<String, OffsetToMessageIdCache> CACHE_MAP = new ConcurrentHashMap<>();
+
+  public static OffsetToMessageIdCache getOrCreateCache(
+      PulsarAdmin pulsarAdmin, String brokerCluster) {
+    return CACHE_MAP.computeIfAbsent(brokerCluster, key -> new OffsetToMessageIdCache(pulsarAdmin));
+  }
+}
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java
new file mode 100644
index 0000000..7f517d9
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.Optional;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.common.ConsumeStats;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+
+public class PulsarAdminUtils {
+
+  public static long searchOffset(
+      String partitionTopic, long timestamp, String brokerCluster, PulsarAdmin pulsarAdmin)
+      throws PulsarAdminException {
+    MessageIdAdv messageId =
+        (MessageIdAdv) pulsarAdmin.topics().getMessageIdByTimestamp(partitionTopic, timestamp);
+    return extractMessageIndex(partitionTopic, messageId, brokerCluster, pulsarAdmin);
+  }
+
+  public static ConsumeStats getConsumeStats(
+      String partitionTopic,
+      int partition,
+      String subscription,
+      String brokerCluster,
+      PulsarAdmin pulsarAdmin)
+      throws PulsarAdminException {
+    ConsumeStats consumeStats = new ConsumeStats(partitionTopic, subscription, -1L, -1L, -1L);
+
+    PersistentTopicInternalStats internalStats =
+        pulsarAdmin.topics().getInternalStats(partitionTopic);
+    if (internalStats == null || internalStats.ledgers.isEmpty()) {
+      return consumeStats;
+    }
+
+    String consumedPosition =
+        internalStats.cursors.containsKey(subscription)
+            ? internalStats.cursors.get(subscription).markDeletePosition
+            : "-1:-1";
+    MessageIdAdv consumedMessageId = parseMessageIdFromString(consumedPosition, partition);
+    String maxPosition = internalStats.lastConfirmedEntry;
+    MessageIdAdv maxMessageId = parseMessageIdFromString(maxPosition, partition);
+    String minPosition = internalStats.ledgers.get(0).ledgerId + ":-1";
+    MessageIdAdv minMessageId = parseMessageIdFromString(minPosition, partition);
+
+    // Ensure consumedMessageId is not less than minMessageId
+    if (consumedMessageId.compareTo(minMessageId) < 0) {
+      consumedMessageId = minMessageId;
+    }
+
+    consumeStats.setLastConsumedOffset(
+        extractMessageIndex(partitionTopic, consumedMessageId, brokerCluster, pulsarAdmin));
+    consumeStats.setMaxOffset(
+        extractMessageIndex(partitionTopic, maxMessageId, brokerCluster, pulsarAdmin));
+    consumeStats.setMinOffset(
+        extractMessageIndex(partitionTopic, minMessageId, brokerCluster, pulsarAdmin));
+    return consumeStats;
+  }
+
+  // Common message processing logic
+  private static long extractMessageIndex(
+      String topic, MessageIdAdv messageId, String brokerCluster, PulsarAdmin pulsarAdmin)
+      throws PulsarAdminException {
+    if (messageId == null || messageId.getLedgerId() < 0) {
+      return -1;
+    }
+    long ledgerId = messageId.getLedgerId();
+    long entryId = messageId.getEntryId();
+    if (ledgerId > 0 && entryId < 0) {
+      entryId = 0;
+      return getMessageIndex(
+              topic,
+              new MessageIdImpl(ledgerId, entryId, messageId.getPartitionIndex()),
+              brokerCluster,
+              pulsarAdmin)
+          - 1;
+    } else {
+      return getMessageIndex(topic, messageId, brokerCluster, pulsarAdmin);
+    }
+  }
+
+  private static long getMessageIndex(
+      String topic, MessageIdAdv messageId, String brokerCluster, PulsarAdmin pulsarAdmin)
+      throws PulsarAdminException {
+    Message<byte[]> message =
+        pulsarAdmin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());
+
+    if (message == null) {
+      throw new PulsarAdminException("No messages found for " + messageId + " in topic " + topic);
+    }
+
+    Optional<Long> indexOptional = message.getIndex();
+    if (indexOptional.isPresent()) {
+      long index = indexOptional.get();
+      OffsetToMessageIdCacheProvider.getOrCreateCache(pulsarAdmin, brokerCluster)
+          .putMessageIdByOffset(topic, index, messageId);
+      return index;
+    } else {
+      throw new PulsarAdminException(
+          "Message index not found for " + messageId + " in topic " + topic);
+    }
+  }
+
+  private static long processMessageId(
+      String topic, MessageIdAdv messageId, String brokerCluster, PulsarAdmin pulsarAdmin)
+      throws PulsarAdminException {
+    try {
+      return extractMessageIndex(topic, messageId, brokerCluster, pulsarAdmin);
+    } catch (NumberFormatException e) {
+      throw new PulsarAdminException("Invalid ID components: " + messageId, e);
+    }
+  }
+
+  private static MessageIdAdv parseMessageIdFromString(String messageIdStr, int partition)
+      throws PulsarAdminException {
+    String[] parts = messageIdStr.split(":");
+    if (parts.length < 2) {
+      throw new PulsarAdminException("Invalid message ID format: " + messageIdStr);
+    }
+    try {
+      long ledgerId = Long.parseLong(parts[0]);
+      long entryId = Long.parseLong(parts[1]);
+      return new MessageIdImpl(ledgerId, entryId, partition);
+    } catch (NumberFormatException e) {
+      throw new PulsarAdminException("Invalid message ID components: " + messageIdStr, e);
+    }
+  }
+}
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java
new file mode 100644
index 0000000..6c2ff4b
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ReaderCache provides thread-safe management of Pulsar Reader instances with the following
+ * features. - Partition-level locking for concurrent access - LRU eviction with size-based and
+ * access-time-based policies - Automatic resource cleanup
+ */
+public class ReaderCache<T> {
+  private static final Logger log = LoggerFactory.getLogger(ReaderCache.class);
+  private static final int DEFAULT_MAX_CACHE_SIZE = 100;
+  private static final Duration DEFAULT_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5);
+  private static final int MAX_RETRIES = 3;
+  private static final long RETRY_DELAY_MS = 100;
+
+  private final String subscription;
+  private final PulsarClient pulsarClient;
+  private final OffsetToMessageIdCache offsetToMessageIdCache;
+  private final Schema<T> schema;
+  private final Map<String, ReentrantLock> partitionLocks = new ConcurrentHashMap<>();
+  private final Map<String, LoadingCache<Long, Reader<T>>> readerCacheMap =
+      new ConcurrentHashMap<>();
+
+  private final int maxCacheSize;
+  private final Duration expireAfterAccess;
+
+  public ReaderCache(
+      String subscription,
+      PulsarClient pulsarClient,
+      OffsetToMessageIdCache offsetToMessageIdCache,
+      Schema<T> schema) {
+    this(
+        subscription,
+        pulsarClient,
+        offsetToMessageIdCache,
+        schema,
+        DEFAULT_MAX_CACHE_SIZE,
+        DEFAULT_EXPIRE_AFTER_ACCESS);
+  }
+
+  public ReaderCache(
+      String subscription,
+      PulsarClient pulsarClient,
+      OffsetToMessageIdCache offsetToMessageIdCache,
+      Schema<T> schema,
+      int maxCacheSize,
+      Duration expireAfterAccess) {
+    this.subscription = subscription;
+    this.pulsarClient = Objects.requireNonNull(pulsarClient);
+    this.offsetToMessageIdCache = Objects.requireNonNull(offsetToMessageIdCache);
+    this.schema = Objects.requireNonNull(schema);
+    this.maxCacheSize = maxCacheSize;
+    this.expireAfterAccess = expireAfterAccess;
+  }
+
+  private LoadingCache<Long, Reader<T>> createCache(String partitionTopic) {
+    return Caffeine.newBuilder()
+        .maximumSize(maxCacheSize)
+        .expireAfterAccess(expireAfterAccess)
+        .removalListener(
+            (RemovalListener<Long, Reader<T>>)
+                (key, reader, cause) -> {
+                  // Do not close reader on explicit removal
+                  if (reader != null && cause != RemovalCause.EXPLICIT) {
+                    closeReaderSilently(reader);
+                  }
+                })
+        .recordStats()
+        .build(key -> createReaderWithRetry(partitionTopic, key));
+  }
+
+  private Reader<T> createReaderWithRetry(String partitionTopic, Long offset)
+      throws PulsarClientException {
+    int attempts = 0;
+    while (attempts < MAX_RETRIES) {
+      try {
+        return pulsarClient
+            .newReader(schema)
+            .readerName("reader-" + subscription)
+            .startMessageId(offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset))
+            .topic(partitionTopic)
+            .create();
+      } catch (PulsarClientException e) {
+        if (++attempts >= MAX_RETRIES) {
+          throw e;
+        }
+        handleRetry(partitionTopic, offset, attempts, e);
+      }
+    }
+    throw new PulsarClientException("Failed to create reader after " + MAX_RETRIES + " attempts");
+  }
+
+  private void handleRetry(String partitionTopic, Long offset, int attempt, Exception e) {
+    log.warn(
+        "Reader creation failed [Topic: {}][Offset: {}] Attempt {}/{}: {}",
+        partitionTopic,
+        offset,
+        attempt,
+        MAX_RETRIES,
+        e.getMessage());
+    try {
+      Thread.sleep(RETRY_DELAY_MS * attempt);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during retry", ie);
+    }
+  }
+
+  public Reader<T> getReader(String partitionTopic, long offset) {
+    final ReentrantLock lock =
+        partitionLocks.computeIfAbsent(partitionTopic, k -> new ReentrantLock());
+    lock.lock();
+    try {
+      LoadingCache<Long, Reader<T>> cache =
+          readerCacheMap.computeIfAbsent(partitionTopic, pt -> createCache(pt));
+      Reader<T> reader = cache.get(offset);
+      // Ensure exclusive use: remove from cache but don't close
+      cache.invalidate(offset);
+      return reader;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  // Allows users to proactively return the reader
+  public void releaseReader(String partitionTopic, long offset, Reader<T> reader) {
+    readerCacheMap.computeIfPresent(
+        partitionTopic,
+        (pt, cache) -> {
+          if (reader.isConnected()) {
+            cache.put(offset, reader);
+          }
+          return cache;
+        });
+  }
+
+  public void cleanup() {
+    readerCacheMap.forEach(
+        (partition, cache) -> {
+          cache.invalidateAll();
+          cache.cleanUp();
+        });
+    readerCacheMap.clear();
+    partitionLocks.clear();
+  }
+
+  public CacheStats getCacheStats(String partitionTopic) {
+    LoadingCache<?, ?> cache = readerCacheMap.get(partitionTopic);
+    return cache != null ? new CacheStats(cache.stats()) : null;
+  }
+
+  private void closeReaderSilently(Reader<T> reader) {
+    try {
+      if (reader != null && reader.isConnected()) {
+        reader.close();
+      }
+    } catch (Exception e) {
+      log.error("Error closing reader for topic {}", reader.getTopic(), e);
+    }
+  }
+
+  public static class CacheStats {
+    private final com.github.benmanes.caffeine.cache.stats.CacheStats stats;
+
+    CacheStats(com.github.benmanes.caffeine.cache.stats.CacheStats stats) {
+      this.stats = stats;
+    }
+
+    public long hitCount() {
+      return stats.hitCount();
+    }
+
+    public long missCount() {
+      return stats.missCount();
+    }
+
+    public long loadSuccessCount() {
+      return stats.loadSuccessCount();
+    }
+
+    public long loadFailureCount() {
+      return stats.loadFailureCount();
+    }
+  }
+}
diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java
new file mode 100644
index 0000000..c8281ec
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public class ReaderCacheProvider {
+  private static final Map<String, Map<Schema<?>, ReaderCache<?>>> CACHE_MAP =
+      new ConcurrentHashMap<>();
+
+  public static <T> ReaderCache<T> getOrCreateReaderCache(
+      String subscription,
+      String brokerCluster,
+      Schema<T> schema,
+      PulsarClient client,
+      OffsetToMessageIdCache offsetToMessageIdCache) {
+    Map<Schema<?>, ReaderCache<?>> partitionReaderCache =
+        CACHE_MAP.computeIfAbsent(brokerCluster, key -> new ConcurrentHashMap<>());
+    @SuppressWarnings("unchecked")
+    ReaderCache<T> cache =
+        (ReaderCache<T>)
+            partitionReaderCache.computeIfAbsent(
+                schema,
+                key -> new ReaderCache<>(subscription, client, offsetToMessageIdCache, schema));
+    return cache;
+  }
+}
diff --git a/pulsar-client-common-contrib/src/test/java/DemoTest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java
similarity index 74%
rename from pulsar-client-common-contrib/src/test/java/DemoTest.java
rename to pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java
index 7b2a7c4..e53ef19 100644
--- a/pulsar-client-common-contrib/src/test/java/DemoTest.java
+++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java
@@ -11,14 +11,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import lombok.extern.slf4j.Slf4j;
-import org.testng.annotations.Test;
-
-@Slf4j
-public class DemoTest {
-
-  @Test
-  public void testDemo() {
-    log.info("=== Test started ===");
-  }
-}
+package org.apache.pulsar.client.util;
diff --git a/pulsar-client-common-contrib/src/main/resources/pulsar-container.properties b/pulsar-client-common-contrib/src/main/resources/pulsar-container.properties
new file mode 100644
index 0000000..2bd2e79
--- /dev/null
+++ b/pulsar-client-common-contrib/src/main/resources/pulsar-container.properties
@@ -0,0 +1,15 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pulsar.version=4.1.1
\ No newline at end of file
diff --git a/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java b/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java
new file mode 100644
index 0000000..dc49026
--- /dev/null
+++ b/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarPullConsumer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.impl.PulsarPullConsumerImpl;
+import org.apache.pulsar.client.common.Constants;
+import org.apache.pulsar.client.common.ConsumeStats;
+import org.apache.pulsar.client.common.PullRequest;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarPullConsumerTest {
+
+  String nonPartitionedTopic = "persistent://public/default/my-topic1";
+  String partitionedTopic = "persistent://public/default/my-topic2";
+  PulsarAdmin pulsarAdmin;
+  PulsarClient pulsarClient;
+
+  public PulsarPullConsumerTest() throws PulsarClientException {}
+
+  @BeforeClass
+  public void setup() throws Exception {
+    try {
+      pulsarAdmin = SingletonPulsarContainer.createPulsarAdmin();
+      pulsarClient = SingletonPulsarContainer.createPulsarClient();
+
+      pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+      pulsarAdmin.topics().createPartitionedTopic(partitionedTopic, 2);
+      log.info("Created topics: {}, {}", nonPartitionedTopic, partitionedTopic);
+    } catch (Exception e) {
+      log.info(
+          "Topics already exist, skipping creation: {}, {}", nonPartitionedTopic, partitionedTopic);
+    }
+  }
+
+  @AfterClass
+  public void cleanup() throws Exception {
+    pulsarAdmin.topics().delete(nonPartitionedTopic, true);
+    pulsarAdmin.topics().deletePartitionedTopic(partitionedTopic, true);
+    if (pulsarClient != null) {
+      pulsarClient.close();
+    }
+    if (pulsarAdmin != null) {
+      pulsarAdmin.close();
+    }
+  }
+
+  @DataProvider(name = "testData")
+  public Object[][] testData() {
+    return new Object[][] {
+      {nonPartitionedTopic, Constants.PARTITION_NONE_INDEX},
+      {partitionedTopic, 0},
+      {partitionedTopic, 1}
+    };
+  }
+
+  /**
+   * Case test design: 1. Single partition topicA 1. Send one thousand messages 2. Create a
+   * PullConsumer, subscribe to topicA, and pull messages. 3. Verify message Exactly-once 4. Verify
+   * message consumption status 2. Multi-partition topicB 1. Send one thousand messages 3. Create
+   * multiple PullConsumers and subscribe to each partition of topicB. 4. Each PullConsumer pulls
+   * messages and verifies that the message is Exactly-once. 5. Verify message consumption status
+   */
+  @Test(dataProvider = "testData")
+  public void testPullConsumer(String topic, int partitionIndex) throws Exception {
+    log.info("Starting testPullConsumer with topic: {}, partitionIndex: {}", topic, partitionIndex);
+    topic =
+        partitionIndex == Constants.PARTITION_NONE_INDEX
+            ? topic
+            : topic + "-partition-" + partitionIndex;
+    String subscription = "my-subscription";
+    String brokerCluster = "sit";
+    @Cleanup
+    PulsarPullConsumer<byte[]> pullConsumer =
+        new PulsarPullConsumerImpl<>(
+            topic,
+            subscription,
+            brokerCluster,
+            Schema.BYTES,
+            () -> pulsarClient,
+            pulsarAdmin,
+            null);
+    pullConsumer.start();
+
+    @Cleanup
+    Producer<byte[]> producer =
+        pulsarClient.newProducer(Schema.BYTES).topic(topic).enableBatching(false).create();
+
+    Set<String> sent = new HashSet<>();
+    for (int i = 0; i < 1000; i++) {
+      String message = "Hello-Pulsar-" + i;
+      MessageId messageId = producer.send(message.getBytes());
+      sent.add(message);
+      log.info("Sent message: {} with id: {}", message, messageId);
+    }
+
+    ConsumeStats consumeStats = pullConsumer.getConsumeStats(partitionIndex);
+    long offset = consumeStats.getLastConsumedOffset();
+    Set<String> received = new HashSet<>();
+    while (true) {
+      List<Message<byte[]>> messages =
+          pullConsumer
+              .pull(
+                  PullRequest.builder()
+                      .offset(offset)
+                      .partition(partitionIndex)
+                      .maxMessages(10)
+                      .maxBytes(1024 * 1024)
+                      .timeout(java.time.Duration.ofSeconds(10))
+                      .build())
+              .getMessages();
+      log.info("Pulled {} messages from topic {}", messages.size(), topic);
+      if (messages.isEmpty()) {
+        log.info("No more messages to pull, exiting...");
+        break;
+      }
+      for (Message<byte[]> message : messages) {
+        if (!received.add(new String(message.getData()))) {
+          log.error("Duplicate message detected: {}", new String(message.getData()));
+        }
+      }
+      long consumedIndex = messages.get(messages.size() - 1).getIndex().get();
+      pullConsumer.ack(consumedIndex, partitionIndex);
+      offset = consumedIndex + 1;
+      log.info("Acknowledged messages up to index: {}", consumedIndex);
+    }
+    offset = pullConsumer.getConsumeStats(partitionIndex).getLastConsumedOffset();
+    log.info("Final consume offset for non-partitioned topic: {}", offset);
+    log.info(
+        "received {} unique messages from non-partitioned topic, it is equals to sent {}",
+        received.size(),
+        received.equals(sent));
+    assert received.equals(sent) : "Received messages do not match sent messages";
+  }
+
+  @Test(dataProvider = "testData")
+  public void testSearchOffset(String topic, int partitionIndex) throws Exception {
+    topic =
+        partitionIndex == Constants.PARTITION_NONE_INDEX
+            ? topic
+            : topic + "-partition-" + partitionIndex;
+    String subscription = "my-subscription";
+    String brokerCluster = "sit";
+    @Cleanup
+    PulsarPullConsumer<byte[]> pullConsumer =
+        new PulsarPullConsumerImpl<>(
+            topic,
+            subscription,
+            brokerCluster,
+            Schema.BYTES,
+            () -> pulsarClient,
+            pulsarAdmin,
+            null);
+    pullConsumer.start();
+
+    @Cleanup
+    Producer<byte[]> producer =
+        pulsarClient.newProducer(Schema.BYTES).topic(topic).enableBatching(false).create();
+
+    long timestamp = 0;
+    MessageIdAdv messageId = null;
+    for (int i = 0; i < 10; i++) {
+      String message = "Hello-Pulsar-" + i;
+      timestamp = System.currentTimeMillis();
+      messageId = (MessageIdAdv) producer.send(message.getBytes());
+    }
+    for (int i = 0; i < 10; i++) {
+      String message = "Hello-Pulsar-" + i;
+      producer.send(message.getBytes());
+      System.currentTimeMillis();
+    }
+    long offset = pullConsumer.searchOffset(partitionIndex, timestamp);
+    MessageIdAdv searchedMessageId =
+        (MessageIdAdv) pulsarAdmin.topics().getMessageIdByIndex(topic, offset);
+    assert messageId.getEntryId() == searchedMessageId.getEntryId()
+            && messageId.getLedgerId() == searchedMessageId.getLedgerId()
+        : "Searched message ID does not match expected message ID";
+  }
+
+  @Test
+  public void testGetConsumeStats() {
+    try {
+      String subscription = "test-subscription";
+      String brokerCluster = "sit";
+      @Cleanup
+      PulsarPullConsumer<byte[]> pullConsumer =
+          new PulsarPullConsumerImpl<>(
+              nonPartitionedTopic,
+              subscription,
+              brokerCluster,
+              Schema.BYTES,
+              () -> pulsarClient,
+              pulsarAdmin,
+              null);
+      pullConsumer.start();
+
+      @Cleanup
+      Producer<byte[]> producer =
+          pulsarClient
+              .newProducer(Schema.BYTES)
+              .topic(nonPartitionedTopic)
+              .enableBatching(false)
+              .create();
+
+      for (int i = 0; i < 10; i++) {
+        String message = "Hello-Pulsar-" + i;
+        producer.send(message.getBytes());
+      }
+
+      ConsumeStats offset = pullConsumer.getConsumeStats(Constants.PARTITION_NONE_INDEX);
+      log.info("Initial consume offset for topic {}: {}", nonPartitionedTopic, offset);
+      Assert.assertEquals(offset.getLastConsumedOffset(), -1L);
+      Assert.assertEquals(offset.getMaxOffset(), 9L);
+      Assert.assertEquals(offset.getMinOffset(), -1L);
+      // Simulate some message processing
+      pullConsumer.ack(offset.getLastConsumedOffset() + 10, Constants.PARTITION_NONE_INDEX);
+      Thread.sleep(1000);
+      ConsumeStats newOffset = pullConsumer.getConsumeStats(Constants.PARTITION_NONE_INDEX);
+      Assert.assertEquals(newOffset.getLastConsumedOffset(), 9L);
+      Assert.assertEquals(newOffset.getMaxOffset(), 9L);
+      Assert.assertEquals(newOffset.getMinOffset(), -1L);
+      log.info("New consume offset after ack: {}", newOffset);
+    } catch (Exception e) {
+      log.error("Error during testExamineConsumeStats", e);
+    }
+  }
+}
diff --git a/pulsar-client-common-contrib/src/test/java/SingletonPulsarContainer.java b/pulsar-client-common-contrib/src/test/java/SingletonPulsarContainer.java
new file mode 100644
index 0000000..96976af
--- /dev/null
+++ b/pulsar-client-common-contrib/src/test/java/SingletonPulsarContainer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Slf4j
+public class SingletonPulsarContainer {
+
+  private static final PulsarContainer PULSAR_CONTAINER;
+
+  static {
+    PULSAR_CONTAINER =
+            new PulsarContainer(getPulsarImage())
+                    .withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true")
+                    .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true")
+                    .withEnv("PULSAR_PREFIX_brokerEntryMetadataInterceptors",
+                            "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor, "
+                                    + "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor")
+                    .withEnv("PULSAR_PREFIX_exposingBrokerEntryMetadataToClientEnabled", "true")
+                    .withStartupTimeout(Duration.ofMinutes(3));
+    PULSAR_CONTAINER.start();
+  }
+
+  private static DockerImageName getPulsarImage() {
+    return DockerImageName.parse("apachepulsar/pulsar:" + getPulsarImageVersion());
+  }
+
+  private static String getPulsarImageVersion() {
+    String pulsarVersion = "";
+    Properties properties = new Properties();
+    try {
+      properties.load(
+              SingletonPulsarContainer.class
+                      .getClassLoader()
+                      .getResourceAsStream("pulsar-container.properties"));
+      if (!properties.isEmpty()) {
+        pulsarVersion = properties.getProperty("pulsar.version");
+      }
+    } catch (IOException e) {
+      log.error("Failed to load pulsar version. " + e.getCause());
+    }
+    return pulsarVersion;
+  }
+
+  static PulsarClient createPulsarClient() throws PulsarClientException {
+    return PulsarClient.builder()
+            .serviceUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getPulsarBrokerUrl())
+            .enableTransaction(true)
+            .build();
+  }
+
+  static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+    return PulsarAdmin.builder()
+            .serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
+            .build();
+  }
+}