[Java Client] fixed Producer semaphore permit release issue (#13682)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 4e93124..78fc659 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -72,11 +72,13 @@
                 futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
             }
             Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         futures.clear();
 
         // Simulate replicator, non batching message but `numMessagesInBatch` of message metadata > 1
@@ -89,15 +91,18 @@
                 futures.add(producer.sendAsync(msg));
             }
             Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages/2);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         futures.clear();
 
         // Here must ensure that the semaphore available permits is 0
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
 
         // Acquire 5 and not wait the send ack call back
         producer.getClientCnx().channel().config().setAutoRead(false);
@@ -108,12 +113,14 @@
 
             // Here must ensure that the Semaphore a acquired 5
             Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages / 2);
+            Assert.assertFalse(producer.isErrorStat());
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
 
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
     }
 
     /**
@@ -138,6 +145,7 @@
         // Test that when we fill the queue with "replicator" messages, we are notified
         // (replicator itself would block)
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         producer.getClientCnx().channel().config().setAutoRead(false);
         try {
             for (int i = 0; i < pendingQueueSize; i++) {
@@ -148,6 +156,7 @@
                 futures.add(producer.sendAsync(msg));
             }
             Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+            Assert.assertFalse(producer.isErrorStat());
             try {
                 MessageMetadata metadata = new MessageMetadata()
                         .setNumMessagesInBatch(10);
@@ -159,6 +168,7 @@
                 Assert.assertEquals(ee.getCause().getClass(),
                                     PulsarClientException.ProducerQueueIsFullError.class);
                 Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+                Assert.assertFalse(producer.isErrorStat());
             }
         } finally {
             producer.getClientCnx().channel().config().setAutoRead(true);
@@ -168,12 +178,14 @@
 
         // Test that when we fill the queue with normal messages, we get an error
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
         producer.getClientCnx().channel().config().setAutoRead(false);
         try {
             for (int i = 0; i < pendingQueueSize; i++) {
                 futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
             }
             Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+            Assert.assertFalse(producer.isErrorStat());
 
             try {
                 producer.newMessage().value(("Semaphore-test-Q-full").getBytes()).sendAsync().get();
@@ -181,6 +193,7 @@
                 Assert.assertEquals(ee.getCause().getClass(),
                                     PulsarClientException.ProducerQueueIsFullError.class);
                 Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+                Assert.assertFalse(producer.isErrorStat());
 
             }
         } finally {
@@ -188,5 +201,6 @@
         }
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        Assert.assertFalse(producer.isErrorStat());
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cf9a234..2f630ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -266,11 +266,12 @@
     protected void semaphoreRelease(final int releaseCountRequest) {
         if (semaphore.isPresent()) {
             if (!errorState) {
-                final int availablePermits = semaphore.get().availablePermits();
-                if (availablePermits - releaseCountRequest < 0) {
-                    log.error("Semaphore permit release count request greater then availablePermits" +
-                                    " : availablePermits={}, releaseCountRequest={}",
-                            availablePermits, releaseCountRequest);
+                final int availableReleasePermits =
+                        conf.getMaxPendingMessages() - this.semaphore.get().availablePermits();
+                if (availableReleasePermits - releaseCountRequest < 0) {
+                    log.error("Semaphore permit release count request greater then availableReleasePermits" +
+                                    " : availableReleasePermits={}, releaseCountRequest={}",
+                            availableReleasePermits, releaseCountRequest);
                     errorState = true;
                 }
             }
@@ -2107,5 +2108,10 @@
         return semaphore;
     }
 
+    @VisibleForTesting
+    boolean isErrorStat() {
+        return errorState;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
 }