[Java Client] fixed Producer semaphore permit release issue (#13682)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 4e93124..78fc659 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -72,11 +72,13 @@
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
}
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages);
+ Assert.assertFalse(producer.isErrorStat());
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);
}
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ Assert.assertFalse(producer.isErrorStat());
futures.clear();
// Simulate replicator, non batching message but `numMessagesInBatch` of message metadata > 1
@@ -89,15 +91,18 @@
futures.add(producer.sendAsync(msg));
}
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages/2);
+ Assert.assertFalse(producer.isErrorStat());
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);
}
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ Assert.assertFalse(producer.isErrorStat());
futures.clear();
// Here must ensure that the semaphore available permits is 0
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ Assert.assertFalse(producer.isErrorStat());
// Acquire 5 and not wait the send ack call back
producer.getClientCnx().channel().config().setAutoRead(false);
@@ -108,12 +113,14 @@
// Here must ensure that the Semaphore a acquired 5
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages / 2);
+ Assert.assertFalse(producer.isErrorStat());
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);
}
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ Assert.assertFalse(producer.isErrorStat());
}
/**
@@ -138,6 +145,7 @@
// Test that when we fill the queue with "replicator" messages, we are notified
// (replicator itself would block)
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ Assert.assertFalse(producer.isErrorStat());
producer.getClientCnx().channel().config().setAutoRead(false);
try {
for (int i = 0; i < pendingQueueSize; i++) {
@@ -148,6 +156,7 @@
futures.add(producer.sendAsync(msg));
}
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+ Assert.assertFalse(producer.isErrorStat());
try {
MessageMetadata metadata = new MessageMetadata()
.setNumMessagesInBatch(10);
@@ -159,6 +168,7 @@
Assert.assertEquals(ee.getCause().getClass(),
PulsarClientException.ProducerQueueIsFullError.class);
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+ Assert.assertFalse(producer.isErrorStat());
}
} finally {
producer.getClientCnx().channel().config().setAutoRead(true);
@@ -168,12 +178,14 @@
// Test that when we fill the queue with normal messages, we get an error
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ Assert.assertFalse(producer.isErrorStat());
producer.getClientCnx().channel().config().setAutoRead(false);
try {
for (int i = 0; i < pendingQueueSize; i++) {
futures.add(producer.newMessage().value(("Semaphore-test-" + i).getBytes()).sendAsync());
}
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+ Assert.assertFalse(producer.isErrorStat());
try {
producer.newMessage().value(("Semaphore-test-Q-full").getBytes()).sendAsync().get();
@@ -181,6 +193,7 @@
Assert.assertEquals(ee.getCause().getClass(),
PulsarClientException.ProducerQueueIsFullError.class);
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
+ Assert.assertFalse(producer.isErrorStat());
}
} finally {
@@ -188,5 +201,6 @@
}
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ Assert.assertFalse(producer.isErrorStat());
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cf9a234..2f630ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -266,11 +266,12 @@
protected void semaphoreRelease(final int releaseCountRequest) {
if (semaphore.isPresent()) {
if (!errorState) {
- final int availablePermits = semaphore.get().availablePermits();
- if (availablePermits - releaseCountRequest < 0) {
- log.error("Semaphore permit release count request greater then availablePermits" +
- " : availablePermits={}, releaseCountRequest={}",
- availablePermits, releaseCountRequest);
+ final int availableReleasePermits =
+ conf.getMaxPendingMessages() - this.semaphore.get().availablePermits();
+ if (availableReleasePermits - releaseCountRequest < 0) {
+ log.error("Semaphore permit release count request greater then availableReleasePermits" +
+ " : availableReleasePermits={}, releaseCountRequest={}",
+ availableReleasePermits, releaseCountRequest);
errorState = true;
}
}
@@ -2107,5 +2108,10 @@
return semaphore;
}
+ @VisibleForTesting
+ boolean isErrorStat() {
+ return errorState;
+ }
+
private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
}