blob: 89d0d40d0193272fff072f7fac7e0fb0a9bec92a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.engine.impl;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.buffer.ProtonCompositeBuffer;
import org.apache.qpid.protonj2.engine.EventHandler;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
/**
* Proton Incoming Delivery implementation
*/
public class ProtonIncomingDelivery implements IncomingDelivery {
private final DeliveryTag deliveryTag;
private final ProtonReceiver link;
private final long deliveryId;
private boolean complete;
private int messageFormat;
private boolean aborted;
private int transferCount;
private int claimedBytes;
private DeliveryState defaultDeliveryState;
private DeliveryState localState;
private boolean locallySettled;
private DeliveryState remoteState;
private boolean remotelySettled;
private ProtonBuffer payload;
private ProtonCompositeBuffer aggregate;
private ProtonAttachments attachments;
private Object linkedResource;
private EventHandler<IncomingDelivery> deliveryReadEventHandler = null;
private EventHandler<IncomingDelivery> deliveryAbortedEventHandler = null;
private EventHandler<IncomingDelivery> deliveryUpdatedEventHandler = null;
/**
* @param link
* The link that this delivery is associated with
* @param deliveryId
* The Delivery Id that is assigned to this delivery.
* @param deliveryTag
* The delivery tag assigned to this delivery
*/
public ProtonIncomingDelivery(ProtonReceiver link, long deliveryId, DeliveryTag deliveryTag) {
this.deliveryId = deliveryId;
this.deliveryTag = deliveryTag;
this.link = link;
}
@Override
public ProtonReceiver getLink() {
return link;
}
@Override
public ProtonAttachments getAttachments() {
return attachments == null ? attachments = new ProtonAttachments() : attachments;
}
@Override
public ProtonIncomingDelivery setLinkedResource(Object resource) {
this.linkedResource = resource;
return this;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getLinkedResource() {
return (T) linkedResource;
}
@Override
public <T> T getLinkedResource(Class<T> typeClass) {
return typeClass.cast(linkedResource);
}
@Override
public DeliveryTag getTag() {
return deliveryTag;
}
@Override
public DeliveryState getState() {
return localState;
}
@Override
public DeliveryState getRemoteState() {
return remoteState;
}
@Override
public int getMessageFormat() {
return messageFormat;
}
ProtonIncomingDelivery setMessageFormat(int messageFormat) {
this.messageFormat = messageFormat;
return this;
}
@Override
public boolean isPartial() {
return !complete || aborted;
}
@Override
public boolean isAborted() {
return aborted;
}
@Override
public boolean isSettled() {
return locallySettled;
}
@Override
public boolean isRemotelySettled() {
return remotelySettled;
}
@Override
public ProtonIncomingDelivery setDefaultDeliveryState(DeliveryState state) {
this.defaultDeliveryState = state;
return this;
}
@Override
public DeliveryState getDefaultDeliveryState() {
return defaultDeliveryState;
}
@Override
public IncomingDelivery disposition(DeliveryState state) {
return disposition(state, false);
}
@Override
public IncomingDelivery disposition(DeliveryState state, boolean settle) {
if (locallySettled) {
if ((localState != null && !localState.equals(state)) || localState != state) {
throw new IllegalStateException("Cannot update disposition on an already settled Delivery");
} else {
return this;
}
}
this.locallySettled = settle;
this.localState = state;
this.link.disposition(this);
return this;
}
@Override
public IncomingDelivery settle() {
return disposition(localState, true);
}
//----- Payload access
@Override
public int available() {
return payload == null ? 0 : payload.getReadableBytes();
}
@Override
public ProtonBuffer readAll() {
ProtonBuffer result = null;
if (payload != null) {
int bytesRead = claimedBytes -= payload.getReadableBytes();
result = payload;
payload = null;
aggregate = null;
if (bytesRead < 0) {
claimedBytes = 0;
link.deliveryRead(this, -bytesRead);
}
}
return result;
}
@Override
public ProtonIncomingDelivery readBytes(ProtonBuffer buffer) {
if (payload != null) {
int bytesRead = payload.getReadableBytes();
payload.readBytes(buffer);
bytesRead -= payload.getReadableBytes();
if (!payload.isReadable()) {
payload = null;
aggregate = null;
}
bytesRead = claimedBytes -= bytesRead;
if (bytesRead < 0) {
claimedBytes = 0;
link.deliveryRead(this, -bytesRead);
}
}
return this;
}
@Override
public ProtonIncomingDelivery readBytes(byte[] array, int offset, int length) {
if (payload != null) {
int bytesRead = payload.getReadableBytes();
payload.readBytes(array, offset, length);
bytesRead -= payload.getReadableBytes();
if (!payload.isReadable()) {
payload = null;
aggregate = null;
}
bytesRead = claimedBytes -= bytesRead;
if (bytesRead < 0) {
claimedBytes = 0;
link.deliveryRead(this, -bytesRead);
}
}
return this;
}
@Override
public IncomingDelivery claimAvailableBytes() {
long available = available();
if (available > 0) {
long unclaimed = available - claimedBytes;
if (unclaimed > 0) {
claimedBytes += unclaimed;
link.deliveryRead(this, (int) unclaimed);
}
}
return this;
}
//----- Incoming Delivery event handlers
@Override
public ProtonIncomingDelivery deliveryReadHandler(EventHandler<IncomingDelivery> handler) {
this.deliveryReadEventHandler = handler;
return this;
}
EventHandler<IncomingDelivery> deliveryReadHandler() {
return deliveryReadEventHandler;
}
@Override
public ProtonIncomingDelivery deliveryAbortedHandler(EventHandler<IncomingDelivery> handler) {
this.deliveryAbortedEventHandler = handler;
return this;
}
EventHandler<IncomingDelivery> deliveryAbortedHandler() {
return deliveryAbortedEventHandler;
}
@Override
public ProtonIncomingDelivery deliveryStateUpdatedHandler(EventHandler<IncomingDelivery> handler) {
this.deliveryUpdatedEventHandler = handler;
return this;
}
EventHandler<IncomingDelivery> deliveryStateUpdatedHandler() {
return deliveryUpdatedEventHandler;
}
//----- Internal methods to manage the Delivery
@Override
public int getTransferCount() {
return transferCount;
}
boolean isFirstTransfer() {
return transferCount <= 1;
}
long getDeliveryId() {
return deliveryId;
}
ProtonIncomingDelivery aborted() {
aborted = true;
if (payload != null) {
final int bytesRead = payload.getReadableBytes();
payload = null;
aggregate = null;
// Ensure Session no longer records these in the window metrics
link.deliveryRead(this, bytesRead);
}
return this;
}
ProtonIncomingDelivery completed() {
this.complete = true;
return this;
}
ProtonIncomingDelivery remotelySettled() {
this.remotelySettled = true;
return this;
}
ProtonIncomingDelivery remoteState(DeliveryState remoteState) {
this.remoteState = remoteState;
return this;
}
ProtonIncomingDelivery locallySettled() {
this.locallySettled = true;
return this;
}
ProtonIncomingDelivery localState(DeliveryState localState) {
this.localState = localState;
return this;
}
ProtonIncomingDelivery appendTransferPayload(ProtonBuffer buffer) {
transferCount++;
if (payload == null) {
payload = buffer;
} else if (aggregate != null) {
aggregate.append(buffer);
} else {
final ProtonBuffer previous = payload;
payload = aggregate = new ProtonCompositeBuffer();
aggregate.append(previous);
aggregate.append(buffer);
}
return this;
}
}