PROTON-2541 Finish work on providing more common base receiver API
Pulls up most of the common and repeated bits in receiver
implementations into the base receiver link type.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
index a042c91..7bb52e0 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
@@ -17,6 +17,7 @@
package org.apache.qpid.protonj2.client.impl;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@@ -47,7 +48,7 @@
public abstract class ClientLinkType<LinkType extends Link<LinkType>,
ProtonType extends org.apache.qpid.protonj2.engine.Link<ProtonType>> implements Link<LinkType> {
- private static final Logger LOG = LoggerFactory.getLogger(ClientLinkType.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@SuppressWarnings("rawtypes")
protected static final AtomicIntegerFieldUpdater<ClientLinkType> CLOSED_UPDATER =
@@ -246,20 +247,20 @@
return openFuture;
}
- LinkType remotelyClosedHandler(Consumer<LinkType> handler) {
+ final LinkType remotelyClosedHandler(Consumer<LinkType> handler) {
this.linkRemotelyClosedHandler = handler;
return self();
}
- String getId() {
+ final String getId() {
return linkId;
}
- void setFailureCause(ClientException failureCause) {
+ final void setFailureCause(ClientException failureCause) {
this.failureCause = failureCause;
}
- ClientException getFailureCause() {
+ final ClientException getFailureCause() {
if (failureCause == null) {
return session.getFailureCause();
} else {
@@ -267,11 +268,11 @@
}
}
- boolean isClosed() {
+ final boolean isClosed() {
return closed > 0;
}
- boolean isDynamic() {
+ final boolean isDynamic() {
if (protonLink().isSender()) {
return protonLink().getTarget() != null && protonLink().<org.apache.qpid.protonj2.types.messaging.Target>getTarget().isDynamic();
} else {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index e025d35..91c7270 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -16,8 +16,8 @@
*/
package org.apache.qpid.protonj2.client.impl;
+import java.lang.invoke.MethodHandles;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.Delivery;
@@ -26,13 +26,11 @@
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.client.util.FifoDeliveryQueue;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.types.messaging.Accepted;
import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.DeliveryState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +39,7 @@
*/
public final class ClientReceiver extends ClientReceiverLinkType<Receiver> implements Receiver {
- private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
-
- private ClientFuture<Receiver> drainingFuture;
- private ScheduledFuture<?> drainingTimeout;
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ReceiverOptions options;
private final FifoDeliveryQueue messageQueue;
@@ -75,9 +70,9 @@
ClientDelivery delivery = messageQueue.dequeue(units.toMillis(timeout));
if (delivery != null) {
if (options.autoAccept()) {
- asyncApplyDisposition(delivery.protonDelivery(), Accepted.getInstance(), options.autoSettle());
- } else {
- asyncReplenishCreditIfNeeded();
+ disposition(delivery.protonDelivery(), Accepted.getInstance(), options.autoSettle());
+ } else if (options.creditWindow() > 0) {
+ executor.execute(() -> replenishCreditIfNeeded());
}
return delivery;
@@ -100,8 +95,8 @@
if (delivery != null) {
if (options.autoAccept()) {
delivery.disposition(org.apache.qpid.protonj2.client.DeliveryState.accepted(), options.autoSettle());
- } else {
- asyncReplenishCreditIfNeeded();
+ } else if (options.creditWindow() > 0) {
+ executor.execute(() -> replenishCreditIfNeeded());
}
} else {
checkClosedOrFailed();
@@ -172,24 +167,14 @@
//----- Internal API for the ClientReceiver and other Client objects
@Override
- void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
- checkClosedOrFailed();
- asyncApplyDisposition(delivery, state, settle);
- }
-
- @Override
- boolean isDynamic() {
- return protonReceiver.getSource() != null && protonReceiver.getSource().isDynamic();
- }
-
- @Override
protected Receiver self() {
return this;
}
//----- Handlers for proton receiver events
- private void handleDeliveryReceived(IncomingDelivery delivery) {
+ @Override
+ protected void handleDeliveryRead(IncomingDelivery delivery) {
LOG.trace("Delivery data was received: {}", delivery);
if (delivery.getDefaultDeliveryState() == null) {
@@ -197,47 +182,17 @@
}
if (!delivery.isPartial()) {
- LOG.trace("{} has incoming Message(s).", this);
+ LOG.trace("Receiver {} has incoming Message(s).", linkId);
messageQueue.enqueue(new ClientDelivery(this, delivery));
} else {
delivery.claimAvailableBytes();
}
}
- private void handleDeliveryAborted(IncomingDelivery delivery) {
- LOG.trace("Delivery data was aborted: {}", delivery);
- delivery.settle();
- replenishCreditIfNeeded();
- }
-
- private void handleDeliveryStateRemotelyUpdated(IncomingDelivery delivery) {
- LOG.trace("Delivery remote state was updated: {}", delivery);
- }
-
- private void handleReceiverCreditUpdated(org.apache.qpid.protonj2.engine.Receiver receiver) {
- LOG.trace("Receiver credit update by remote: {}", receiver);
-
- if (drainingFuture != null) {
- if (receiver.getCredit() == 0) {
- drainingFuture.complete(this);
- if (drainingTimeout != null) {
- drainingTimeout.cancel(false);
- drainingTimeout = null;
- }
- }
- }
- }
-
//----- Private implementation details
- private void asyncApplyDisposition(IncomingDelivery delivery, DeliveryState state, boolean settle) {
- executor.execute(() -> {
- session.getTransactionContext().disposition(delivery, state, settle);
- replenishCreditIfNeeded();
- });
- }
-
- private void replenishCreditIfNeeded() {
+ @Override
+ protected void replenishCreditIfNeeded() {
int creditWindow = options.creditWindow();
if (creditWindow > 0) {
int currentCredit = protonReceiver.getCredit();
@@ -247,7 +202,7 @@
if (potentialPrefetch <= creditWindow * 0.7) {
int additionalCredit = creditWindow - potentialPrefetch;
- LOG.trace("Consumer granting additional credit: {}", additionalCredit);
+ LOG.trace("Receiver {} granting additional credit: {}", linkId, additionalCredit);
try {
protonReceiver.addCredit(additionalCredit);
} catch (Exception ex) {
@@ -258,13 +213,6 @@
}
}
- private void asyncReplenishCreditIfNeeded() {
- int creditWindow = options.creditWindow();
- if (creditWindow > 0) {
- executor.execute(() -> replenishCreditIfNeeded());
- }
- }
-
@Override
protected void recreateLinkForReconnect() {
int previousCredit = protonReceiver.getCredit() + messageQueue.size();
@@ -288,35 +236,8 @@
}
@Override
- protected void linkSpecificCleanupHandler(ClientException failureCause) {
- if (drainingTimeout != null) {
- drainingFuture.failed(
- failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
- drainingTimeout.cancel(false);
- drainingTimeout = null;
- }
- }
-
- @Override
- protected void linkSpecificLocalOpenHandler() {
- protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
- .deliveryReadHandler(this::handleDeliveryReceived)
- .deliveryAbortedHandler(this::handleDeliveryAborted)
- .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
- }
-
- @Override
protected void linkSpecificLocalCloseHandler() {
messageQueue.stop(); // Ensure blocked receivers are all unblocked.
- }
-
- @Override
- protected void linkSpecificRemoteOpenHandler() {
- replenishCreditIfNeeded();
- }
-
- @Override
- protected void linkSpecificRemoteCloseHandler() {
- // Nothing needed for receiver link remote close
+ messageQueue.clear();
}
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
index a2a00fa..76df24e 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
@@ -17,12 +17,19 @@
package org.apache.qpid.protonj2.client.impl;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ScheduledFuture;
+
import org.apache.qpid.protonj2.client.Link;
import org.apache.qpid.protonj2.client.LinkOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
+import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.engine.Receiver;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base class for client link types that wrap a proton receiver to provide
@@ -30,6 +37,11 @@
*/
public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverType>> extends ClientLinkType<ReceiverType, Receiver> {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected ClientFuture<ReceiverType> drainingFuture;
+ protected ScheduledFuture<?> drainingTimeout;
+
protected Receiver protonReceiver;
protected ClientReceiverLinkType(ClientSession session, String linkId, LinkOptions<?> options, Receiver protonReceiver) {
@@ -55,6 +67,78 @@
*
* @throws ClientException if an error occurs while applying the disposition to the delivery.
*/
- abstract void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException;
+ void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
+ checkClosedOrFailed();
+ executor.execute(() -> {
+ session.getTransactionContext().disposition(delivery, state, settle);
+ replenishCreditIfNeeded();
+ });
+ }
+ //----- Abstract API that receiver must implement as they are implementation specific
+
+ protected abstract void replenishCreditIfNeeded();
+
+ protected abstract void handleDeliveryRead(IncomingDelivery delivery);
+
+ //----- API that receiver may override if they need additional handling
+
+ protected void handleDeliveryAborted(IncomingDelivery delivery) {
+ LOG.trace("Delivery data was aborted: {}", delivery);
+ delivery.settle();
+ replenishCreditIfNeeded();
+ }
+
+ protected void handleDeliveryStateRemotelyUpdated(IncomingDelivery delivery) {
+ LOG.trace("Delivery remote state was updated: {}", delivery);
+ }
+
+ protected void handleReceiverCreditUpdated(org.apache.qpid.protonj2.engine.Receiver receiver) {
+ LOG.trace("Receiver credit update by remote: {}", receiver);
+
+ if (drainingFuture != null) {
+ if (receiver.getCredit() == 0) {
+ drainingFuture.complete(self());
+ if (drainingTimeout != null) {
+ drainingTimeout.cancel(false);
+ drainingTimeout = null;
+ }
+ }
+ }
+ }
+
+ //----- Default receiver link handling of proton engine events
+
+ @Override
+ protected void linkSpecificLocalOpenHandler() {
+ protonLink().deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
+ .deliveryReadHandler(this::handleDeliveryRead)
+ .deliveryAbortedHandler(this::handleDeliveryAborted)
+ .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
+ }
+
+ @Override
+ protected void linkSpecificRemoteOpenHandler() {
+ replenishCreditIfNeeded();
+ }
+
+ @Override
+ protected void linkSpecificLocalCloseHandler() {
+ // Nothing needed for local close handling
+ }
+
+ @Override
+ protected void linkSpecificRemoteCloseHandler() {
+ // Nothing needed for receiver link remote close
+ }
+
+ @Override
+ protected void linkSpecificCleanupHandler(ClientException failureCause) {
+ if (drainingTimeout != null) {
+ drainingFuture.failed(
+ failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
+ drainingTimeout.cancel(false);
+ drainingTimeout = null;
+ }
+ }
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index 9ab35c8..067b4b8 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -16,6 +16,7 @@
*/
package org.apache.qpid.protonj2.client.impl;
+import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -34,7 +35,6 @@
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.DeliveryState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@
*/
public final class ClientStreamReceiver extends ClientReceiverLinkType<StreamReceiver> implements StreamReceiver {
- private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ClientFuture<StreamReceiver> drainingFuture;
private ScheduledFuture<?> drainingTimeout;
@@ -102,7 +102,9 @@
}
} else {
receive.complete(new ClientStreamDelivery(this, delivery));
- asyncReplenishCreditIfNeeded();
+ if (options.creditWindow() > 0) {
+ executor.execute(() -> replenishCreditIfNeeded());
+ }
}
}
});
@@ -177,17 +179,10 @@
executor.execute(() -> {
if (notClosedOrFailed(request)) {
- int queued = 0;
-
// Scan for an unsettled delivery that isn't yet assigned to a client delivery
// either it is a complete delivery or the initial stage of the next incoming
- for (IncomingDelivery unsettled : protonReceiver.unsettled()) {
- if (unsettled.getLinkedResource() == null) {
- queued++;
- }
- }
-
- request.complete(queued);
+ request.complete((int)
+ protonReceiver.unsettled().stream().filter(delivery -> delivery.getLinkedResource() == null).count());
}
});
@@ -200,9 +195,15 @@
return options;
}
+ @Override
+ protected StreamReceiver self() {
+ return this;
+ }
+
//----- Handlers for proton receiver events
- private void handleDeliveryRead(IncomingDelivery delivery) {
+ @Override
+ protected void handleDeliveryRead(IncomingDelivery delivery) {
LOG.trace("Delivery data was received: {}", delivery);
if (delivery.getDefaultDeliveryState() == null) {
delivery.setDefaultDeliveryState(Released.getInstance());
@@ -223,31 +224,9 @@
entry.getKey().complete(new ClientStreamDelivery(this, delivery));
} finally {
entries.remove();
- asyncReplenishCreditIfNeeded();
- }
- }
- }
- }
-
- private void handleDeliveryAborted(IncomingDelivery delivery) {
- LOG.trace("Delivery data was aborted: {}", delivery);
- delivery.settle();
- replenishCreditIfNeeded();
- }
-
- private void handleDeliveryStateRemotelyUpdated(IncomingDelivery delivery) {
- LOG.trace("Delivery remote state was updated: {}", delivery);
- }
-
- private void handleReceiverCreditUpdated(org.apache.qpid.protonj2.engine.Receiver receiver) {
- LOG.trace("Receiver credit update by remote: {}", receiver);
-
- if (drainingFuture != null) {
- if (receiver.getCredit() == 0) {
- drainingFuture.complete(this);
- if (drainingTimeout != null) {
- drainingTimeout.cancel(false);
- drainingTimeout = null;
+ if (options.creditWindow() > 0) {
+ executor.execute(() -> replenishCreditIfNeeded());
+ }
}
}
}
@@ -256,26 +235,7 @@
//----- Private implementation details
@Override
- void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
- checkClosedOrFailed();
- asyncApplyDisposition(delivery, state, settle);
- }
-
- private void asyncApplyDisposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
- executor.execute(() -> {
- session.getTransactionContext().disposition(delivery, state, settle);
- replenishCreditIfNeeded();
- });
- }
-
- private void asyncReplenishCreditIfNeeded() {
- int creditWindow = options.creditWindow();
- if (creditWindow > 0) {
- executor.execute(() -> replenishCreditIfNeeded());
- }
- }
-
- private void replenishCreditIfNeeded() {
+ protected void replenishCreditIfNeeded() {
int creditWindow = options.creditWindow();
if (creditWindow > 0) {
int currentCredit = protonReceiver.getCredit();
@@ -325,40 +285,7 @@
}
});
- if (drainingTimeout != null) {
- drainingFuture.failed(
- failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
- drainingTimeout.cancel(false);
- drainingTimeout = null;
- }
- }
-
- @Override
- protected StreamReceiver self() {
- return this;
- }
-
- @Override
- protected void linkSpecificLocalOpenHandler() {
- protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
- .deliveryReadHandler(this::handleDeliveryRead)
- .deliveryAbortedHandler(this::handleDeliveryAborted)
- .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
- }
-
- @Override
- protected void linkSpecificLocalCloseHandler() {
- // Nothing needed for local close handling
- }
-
- @Override
- protected void linkSpecificRemoteOpenHandler() {
- replenishCreditIfNeeded();
- }
-
- @Override
- protected void linkSpecificRemoteCloseHandler() {
- // Nothing needed for remote close handling
+ super.linkSpecificCleanupHandler(failureCause);
}
@Override