KAFKA-19972: Bump delivery count on session release (#21092)

If a client application crashes then it can go in infinte loop where
same records will be delivered. Though previously we chose to decrease
the delivery count on session release as we didn't have throttling
support. Now when we do then it makes sense to bump the delivery count
on session close. Also as share-groups clients should ideally not have
pre-fetched data hence it's safe to bump the delivery count on session
release.

I have not removed the code to decrease the delivery count as that
functionality is well tested and we might need at the time of
pre-fetching support or in cases where we do need not to bump the
delivery count, in future.

Reviewers: Andrew Schofield <aschofield@confluent.io>
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index d40f223..85fa049 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2219,29 +2219,6 @@
     }
 
     @ClusterTest
-    public void testDeliveryCountNotIncreaseAfterSessionClose() {
-        alterShareAutoOffsetReset("group1", "earliest");
-        try (Producer<byte[], byte[]> producer = createProducer()) {
-            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
-            // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
-            for (int i = 0; i < 10; i++) {
-                assertDoesNotThrow(() -> producer.send(record).get(), "Failed to send records");
-            }
-        }
-
-        // Perform the fetch, close in a loop.
-        for (int count = 0; count < ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT; count++) {
-            consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, false);
-        }
-
-        // If the delivery count is increased, consumer will get nothing.
-        int consumedMessageCount = consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, true);
-        // The records returned belong to offsets 0-9.
-        assertEquals(10, consumedMessageCount);
-        verifyShareGroupStateTopicRecordsProduced();
-    }
-
-    @ClusterTest
     public void testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAcknowledgement() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
@@ -2270,13 +2247,13 @@
             ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 2);
             assertEquals(2, records.count());
             assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
-            assertEquals((short) 1, records.records(tp).get(1).deliveryCount().get());
+            assertEquals((short) 2, records.records(tp).get(1).deliveryCount().get());
         }
     }
 
     @ClusterTest(
         serverProperties = {
-            @ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "2"),
+            @ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "3"),
         }
     )
     public void testBehaviorOnDeliveryCountBoundary() {
@@ -2304,7 +2281,6 @@
             records = waitedPoll(shareConsumer, 2500L, 1);
             assertEquals(1, records.count());
             assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
-
         }
 
         // Start again and same record should be delivered
@@ -2312,7 +2288,7 @@
             shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
             assertEquals(1, records.count());
-            assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
+            assertEquals((short) 3, records.records(tp).get(0).deliveryCount().get());
         }
     }
 
@@ -2369,9 +2345,9 @@
         // Let the complex consumer read the messages.
         service.schedule(() -> prodState.done().set(true), 5L, TimeUnit.SECONDS);
 
-        // All messages which can be read are read, some would be redelivered (roughly 3 times the records produced).
+        // All messages which can be read are read, some would be redelivered (roughly 2 times the records produced).
         TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
-        int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 3 * 0.95);    // 3 times with margin of error (5%).
+        int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 2 * 0.95);    // 2 times with margin of error (5%).
 
         assertTrue(delta > 0,
             String.format("Producer (%d) and share consumer (%d) record count mismatch.", prodState.count().get(), complexCons1.recordsRead()));
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 1043adb..243a740 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1116,7 +1116,7 @@
                 // These records were fetched but they were not actually delivered to the client.
                 InFlightState updateResult = offsetState.getValue().startStateTransition(
                         offsetState.getKey() < startOffset ? RecordState.ARCHIVED : recordState,
-                        DeliveryCountOps.DECREASE,
+                        DeliveryCountOps.NO_OP,
                         this.maxDeliveryCount,
                         EMPTY_MEMBER_ID
                 );
@@ -1158,7 +1158,7 @@
         if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
             InFlightState updateResult = inFlightBatch.startBatchStateTransition(
                     inFlightBatch.lastOffset() < startOffset ? RecordState.ARCHIVED : recordState,
-                    DeliveryCountOps.DECREASE,
+                    DeliveryCountOps.NO_OP,
                     this.maxDeliveryCount,
                     EMPTY_MEMBER_ID
             );
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index beae9e4..f210ab2 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -5034,7 +5034,7 @@
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).batchState());
         // Release delivery count.
-        assertEquals(0, sharePartition.cachedState().get(0L).batchDeliveryCount());
+        assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(0L).offsetState());
         assertEquals(0, sharePartition.deliveryCompleteCount());
     }
@@ -5053,7 +5053,7 @@
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
-        assertEquals(0, sharePartition.cachedState().get(5L).batchDeliveryCount());
+        assertEquals(1, sharePartition.cachedState().get(5L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(5L).offsetState());
         assertEquals(0, sharePartition.deliveryCompleteCount());
     }
@@ -5126,7 +5126,7 @@
         assertEquals(5, sharePartition.nextFetchOffset());
         // Check cached state.
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
-        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
 
@@ -5140,8 +5140,8 @@
         expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
         assertEquals(10, sharePartition.deliveryCompleteCount());
     }
@@ -5192,8 +5192,8 @@
         expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
 
@@ -5216,8 +5216,8 @@
         expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
     }
@@ -5268,8 +5268,8 @@
         expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
 
@@ -5289,7 +5289,7 @@
         // Check cached state.
         expectedOffsetStateMap.clear();
         expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
         expectedOffsetStateMap.clear();
         expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
@@ -5301,8 +5301,8 @@
         expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
     }
@@ -5339,7 +5339,7 @@
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
@@ -5370,9 +5370,9 @@
 
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(2, sharePartition.cachedState().size());
-        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(10L).batchState());
         assertNull(sharePartition.cachedState().get(10L).offsetState());
-        assertEquals(0, sharePartition.deliveryCompleteCount());
+        assertEquals(5, sharePartition.deliveryCompleteCount());
     }
 
     @Test
@@ -5417,26 +5417,25 @@
         assertNotNull(sharePartition.cachedState().get(10L).offsetState());
         assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(15L).batchState());
         assertNotNull(sharePartition.cachedState().get(10L).offsetState());
-        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState());
-        assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(20L).batchMemberId());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
         assertNull(sharePartition.cachedState().get(20L).offsetState());
 
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
-        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
 
         expectedOffsetStateMap.clear();
-        expectedOffsetStateMap.put(15L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(16L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(15L).offsetState());
-        assertEquals(3, sharePartition.deliveryCompleteCount());
+        assertEquals(12, sharePartition.deliveryCompleteCount());
     }
 
     @Test
@@ -5604,7 +5603,7 @@
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
-        assertEquals(0, sharePartition.cachedState().get(5L).batchDeliveryCount());
+        assertEquals(1, sharePartition.cachedState().get(5L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(5L).offsetState());
         // Acquisition lock timer task would be cancelled by the release acquired records operation.
         assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
@@ -5649,7 +5648,7 @@
         assertEquals(5, sharePartition.nextFetchOffset());
         // Check cached state.
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
-        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState());
 
@@ -5663,8 +5662,8 @@
         expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
 
         // Acquisition lock timer task would be cancelled by the release acquired records operation.
@@ -6853,7 +6852,7 @@
         expectedOffsetStateMap.put(21L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(22L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(23L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(24L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(24L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
 
         assertEquals(0, sharePartition.deliveryCompleteCount());
 
@@ -6869,8 +6868,8 @@
         expectedOffsetStateMap.put(35L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(36L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(37L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(38L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(39L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(38L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(39L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(35L).offsetState());
     }
 
@@ -6932,10 +6931,10 @@
 
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
 
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
 
@@ -6944,7 +6943,7 @@
     }
 
     @Test
-    public void testReleaseAcquiredRecordsDecreaseDeliveryCount() {
+    public void testReleaseAcquiredRecordsDoNotDecreaseDeliveryCount() {
         SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
@@ -6986,10 +6985,10 @@
         // After release, the delivery count was decremented.
         expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
         assertEquals(2, sharePartition.deliveryCompleteCount());
     }
@@ -11252,8 +11251,8 @@
         expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(15L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(16L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));