blob: 616158525583fa280fda71f810754641d7bc07da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
/**
* Base type used to provide some common plumbing for Tracker types
*
* @param <SenderType> The client sender type that created this tracker
* @param <TrackerType> The actual type of tracker that is being implemented
*/
public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>, TrackerType> {
protected final SenderType sender;
protected final OutgoingDelivery delivery;
@SuppressWarnings("rawtypes")
protected static final AtomicIntegerFieldUpdater<ClientTrackable> REMOTELY_SETTLED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ClientTrackable.class, "remotelySettled");
@SuppressWarnings("rawtypes")
protected static final AtomicReferenceFieldUpdater<ClientTrackable, DeliveryState> REMOTEL_DELIVERY_STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ClientTrackable.class, DeliveryState.class, "remoteDeliveryState");
private ClientFuture<TrackerType> remoteSettlementFuture;
private volatile int remotelySettled;
private volatile DeliveryState remoteDeliveryState;
/**
* Create an instance of a client outgoing delivery tracker.
*
* @param sender
* The sender that was used to send the delivery
* @param delivery
* The proton outgoing delivery object that backs this tracker.
*/
ClientTrackable(SenderType sender, OutgoingDelivery delivery) {
this.sender = sender;
this.delivery = delivery;
this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
}
protected abstract TrackerType self();
OutgoingDelivery delivery() {
return delivery;
}
public synchronized DeliveryState state() {
return ClientDeliveryState.fromProtonType(delivery.getState());
}
public DeliveryState remoteState() {
return remoteDeliveryState;
}
public boolean remoteSettled() {
return remotelySettled > 0;
}
public TrackerType disposition(DeliveryState state, boolean settle) throws ClientException {
try {
sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
} finally {
if (settle) {
synchronized (this) {
if (remoteSettlementFuture == null) {
remoteSettlementFuture = sender.session.connection().getFutureFactory().createFuture();
}
remoteSettlementFuture.complete(self());
}
}
}
return self();
}
public TrackerType settle() throws ClientException {
try {
sender.disposition(delivery, null, true);
} finally {
synchronized (this) {
if (remoteSettlementFuture == null) {
remoteSettlementFuture = sender.session.connection().getFutureFactory().createFuture();
}
remoteSettlementFuture.complete(self());
}
}
return self();
}
public synchronized boolean settled() {
return delivery.isSettled();
}
public ClientFuture<TrackerType> settlementFuture() {
synchronized (this) {
if (remoteSettlementFuture == null) {
remoteSettlementFuture = sender.session.connection().getFutureFactory().createFuture();
}
}
if (delivery.isSettled() || delivery.isRemotelySettled()) {
remoteSettlementFuture.complete(self());
}
return remoteSettlementFuture;
}
public TrackerType awaitSettlement() throws ClientException {
try {
if (settled()) {
return self();
} else {
return settlementFuture().get();
}
} catch (ExecutionException exe) {
throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
} catch (InterruptedException e) {
Thread.interrupted();
throw new ClientException("Wait for settlement was interrupted", e);
}
}
public TrackerType awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
try {
if (settled()) {
return self();
} else {
return settlementFuture().get(timeout, unit);
}
} catch (InterruptedException ie) {
Thread.interrupted();
throw new ClientException("Wait for settlement was interrupted", ie);
} catch (ExecutionException exe) {
throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
} catch (TimeoutException te) {
throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te);
}
}
public TrackerType awaitAccepted() throws ClientException {
try {
if (settled() && !remoteSettled()) {
return self();
} else {
settlementFuture().get();
if (remoteState() != null && remoteState().isAccepted()) {
return self();
} else {
throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
}
}
} catch (ExecutionException exe) {
throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
} catch (InterruptedException ie) {
Thread.interrupted();
throw new ClientException("Wait for Accepted outcome was interrupted", ie);
}
}
public TrackerType awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
try {
if (settled() && !remoteSettled()) {
return self();
} else {
settlementFuture().get(timeout, unit);
if (remoteState() != null && remoteState().isAccepted()) {
return self();
} else {
throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
}
}
} catch (InterruptedException ie) {
Thread.interrupted();
throw new ClientException("Wait for Accepted outcome was interrupted", ie);
} catch (ExecutionException exe) {
throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
} catch (TimeoutException te) {
throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te);
}
}
//----- Internal Event hooks for delivery updates
private void processDeliveryUpdated(OutgoingDelivery delivery) {
if (delivery.isRemotelySettled()) {
REMOTELY_SETTLED_UPDATER.lazySet(this, 1);
REMOTEL_DELIVERY_STATE_UPDATER.lazySet(this, ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
if (sender.options.autoSettle()) {
delivery.settle();
}
synchronized (this) {
if (remoteSettlementFuture != null) {
remoteSettlementFuture.complete(self());
}
}
} else {
REMOTEL_DELIVERY_STATE_UPDATER.set(this, ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
}
}
}