blob: 74fde35734109fda2b7acfa9520c6340877e58af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.protonj2.client.Link;
import org.apache.qpid.protonj2.client.LinkOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.engine.Receiver;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for client link types that wrap a proton receiver to provide
* delivery dispatch in some manner.
*
* @param <ReceiverType> The client receiver type that is being implemented.
*/
public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverType>> extends ClientLinkType<ReceiverType, Receiver> {

    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Future for a pending drain request, completed once the receiver's credit reaches zero.
     * This class never assigns it; presumably the concrete receiver sets it when a drain is
     * started (verify against subclasses).
     */
    protected ClientFuture<ReceiverType> drainingFuture;

    /**
     * Optional timeout task associated with the pending drain. It is treated as independent of
     * {@link #drainingFuture}: a drain may be pending while this field is <code>null</code>.
     */
    protected ScheduledFuture<?> drainingTimeout;

    /** The proton engine receiver that this client link wraps. */
    protected Receiver protonReceiver;

    /**
     * Creates the link wrapper and registers this client resource as the linked resource of the
     * given proton receiver.
     *
     * @param session
     *      The client session that owns this link
     * @param linkId
     *      The identifier assigned to this link
     * @param options
     *      The link options used to configure this link
     * @param protonReceiver
     *      The proton engine receiver being wrapped
     */
    protected ClientReceiverLinkType(ClientSession session, String linkId, LinkOptions<?> options, Receiver protonReceiver) {
        super(session, linkId, options);

        this.protonReceiver = protonReceiver;
        this.protonReceiver.setLinkedResource(self());
    }

    /**
     * @return the proton engine receiver wrapped by this link.
     */
    @Override
    protected Receiver protonLink() {
        return protonReceiver;
    }

    /**
     * Apply the given disposition and settlement state to the given incoming delivery instance.
     * The closed / failed check runs in the calling thread, the disposition itself is handed to
     * the link's executor and is followed by a credit replenish check.
     *
     * @param delivery
     *      The incoming delivery that will be acted upon
     * @param state
     *      The delivery state to apply to the given incoming delivery
     * @param settle
     *      The settlement state to apply to the given incoming delivery
     *
     * @throws ClientException if an error occurs while applying the disposition to the delivery.
     */
    void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
        checkClosedOrFailed();

        executor.execute(() -> {
            session.getTransactionContext().disposition(delivery, state, settle);
            replenishCreditIfNeeded();
        });
    }

    //----- Abstract API that receiver must implement as they are implementation specific

    /**
     * Grants additional link credit if the implementation decides more is needed. Called after a
     * disposition is applied, after a delivery is aborted and when the remote opens the link.
     */
    protected abstract void replenishCreditIfNeeded();

    /**
     * Handles delivery read events signaled by the proton receiver.
     *
     * @param delivery
     *      The incoming delivery that the event refers to
     */
    protected abstract void handleDeliveryRead(IncomingDelivery delivery);

    //----- API that receiver may override if they need additional handling

    /**
     * Handles an aborted delivery by settling it and then checking whether credit should be
     * replenished.
     *
     * @param delivery
     *      The incoming delivery that was aborted
     */
    protected void handleDeliveryAborted(IncomingDelivery delivery) {
        LOG.trace("Delivery data was aborted: {}", delivery);
        delivery.settle();
        replenishCreditIfNeeded();
    }

    /**
     * Handles a remote update of a delivery's state; the default implementation only logs it.
     *
     * @param delivery
     *      The incoming delivery whose remote state changed
     */
    protected void handleDeliveryStateRemotelyUpdated(IncomingDelivery delivery) {
        LOG.trace("Delivery remote state was updated: {}", delivery);
    }

    /**
     * Handles a credit state update on the receiver. If a drain is pending and the receiver
     * credit has reached zero the drain future is completed and any drain timeout is cancelled.
     *
     * @param receiver
     *      The proton receiver whose credit state was updated
     */
    protected void handleReceiverCreditUpdated(Receiver receiver) {
        LOG.trace("Receiver credit update by remote: {}", receiver);

        if (drainingFuture != null && receiver.getCredit() == 0) {
            drainingFuture.complete(self());

            // The timeout is optional, a drain can be pending without one.
            if (drainingTimeout != null) {
                drainingTimeout.cancel(false);
                drainingTimeout = null;
            }
        }
    }

    //----- Default receiver link handling of proton engine events

    /**
     * Registers this link's delivery and credit event handlers on the proton receiver.
     */
    @Override
    protected void linkSpecificLocalOpenHandler() {
        protonLink().deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
                    .deliveryReadHandler(this::handleDeliveryRead)
                    .deliveryAbortedHandler(this::handleDeliveryAborted)
                    .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
    }

    /**
     * Performs a credit replenish check once the remote has opened the link.
     */
    @Override
    protected void linkSpecificRemoteOpenHandler() {
        replenishCreditIfNeeded();
    }

    @Override
    protected void linkSpecificLocalCloseHandler() {
        // Nothing needed for local close handling
    }

    @Override
    protected void linkSpecificRemoteCloseHandler() {
        // Nothing needed for receiver link remote close
    }

    /**
     * Fails any tracked drain future and cancels any drain timeout when the link is cleaned up.
     *
     * @param failureCause
     *      The error that triggered the cleanup, or <code>null</code> in which case the drain is
     *      failed with a {@link ClientResourceRemotelyClosedException}.
     */
    @Override
    protected void linkSpecificCleanupHandler(ClientException failureCause) {
        // The future and its timeout are checked independently: a drain that was started without
        // a timeout task must still be failed here or its waiter would never be released, and a
        // timeout without a future must not trigger a NullPointerException.
        // NOTE(review): the future is not cleared after a successful drain, this assumes that
        // failing an already completed ClientFuture is a no-op - confirm.
        if (drainingFuture != null) {
            drainingFuture.failed(
                failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
        }

        if (drainingTimeout != null) {
            drainingTimeout.cancel(false);
            drainingTimeout = null;
        }
    }
}