blob: 0c1bcaa735cb5abdcf448b155627e7db832af97d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.engine.impl;
import java.util.Set;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.engine.OutgoingAMQPEnvelope;
import org.apache.qpid.protonj2.engine.util.SplayMap;
import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.transport.Begin;
import org.apache.qpid.protonj2.types.transport.Disposition;
import org.apache.qpid.protonj2.types.transport.Flow;
import org.apache.qpid.protonj2.types.transport.Performative;
import org.apache.qpid.protonj2.types.transport.Role;
import org.apache.qpid.protonj2.types.transport.Transfer;
/**
* Holds Session level credit window information for outgoing transfers from this
* Session. The window is constrained by the remote incoming window and, when one
* has been configured, by the local outgoing capacity limit on pending transfer writes.
*/
public class ProtonSessionOutgoingWindow {
// The session that owns this outgoing window.
private final ProtonSession session;
// The engine obtained from the session's connection; used to wrap and write outgoing frames.
private final ProtonEngine engine;
// The session's local channel as captured at construction; used when wrapping Transfer frames.
private final int localChannel;
// This is used for the delivery-id actually stamped in each transfer frame of a given message delivery.
// The value is handed out and advanced by getAndIncrementNextDeliveryId().
private int outgoingDeliveryId = 0;
// Conceptual outgoing Transfer ID value, advanced once for every Transfer frame written by processSend
// and advertised to the remote in the outbound Begin.
private int nextOutgoingId = 0;
// Track outgoing windowing state information in order to stop outgoing writes if the high
// water mark is hit and restart later once the low water mark is hit. When outgoing capacity
// is at the default -1 value then no real limit is applied. If set to zero no writes are allowed.
private int outgoingCapacity = -1;
// Number of pending frame writes at or above which processSend marks the session as not writable.
private int outgoingWindowHighWaterMark = Integer.MAX_VALUE;
// Number of pending frame writes at or below which a non-writable session may become writable again.
private int outgoingWindowLowWaterMark = Integer.MAX_VALUE / 2;
// Count of Transfer frames handed to the engine whose write completion has not yet been signalled.
private int pendingOutgoingWrites;
// The current sendable state of the session as reported by isSendable().
private boolean writeable;
// The remaining remote incoming window: set from Begin / Flow and reduced by one per Transfer frame written.
private long remoteIncomingWindow;
// The most recent next-incoming-id value reported by the remote in a Flow.
private int remoteNextIncomingId = nextOutgoingId;
// Deliveries that were sent unsettled, keyed by delivery-id, so remote dispositions can be routed to them.
private final SplayMap<ProtonOutgoingDelivery> unsettled = new SplayMap<>();
public ProtonSessionOutgoingWindow(ProtonSession session) {
this.session = session;
this.engine = session.getConnection().getEngine();
this.localChannel = session.getLocalChannel();
}
/**
 * Stamps this session's outgoing window values onto a {@link Begin} that is about to
 * be written and then refreshes the locally tracked outgoing window state.
 *
 * @param begin
 *      The {@link Begin} performative that is about to be sent.
 *
 * @return the same {@link Begin} instance once it has been configured.
 */
Begin configureOutbound(Begin begin) {
    final int initialOutgoingId = getNextOutgoingId();
    final long advertisedWindow = getOutgoingWindow();

    begin.setNextOutgoingId(initialOutgoingId);
    begin.setOutgoingWindow(advertisedWindow);

    updateOutgoingWindowState();

    return begin;
}
/**
 * Hands out the delivery-id for the next outgoing delivery and advances the counter.
 *
 * @return the delivery-id value as it was before being advanced by one.
 */
int getAndIncrementNextDeliveryId() {
    final int assignedId = outgoingDeliveryId;
    outgoingDeliveryId = assignedId + 1;
    return assignedId;
}
/**
 * Sets the outgoing capacity limit for this session and immediately recomputes the
 * high / low water marks and the writable state that derive from it.
 *
 * @param outgoingCapacity
 *      The new capacity value: a negative value removes the limit, zero disables
 *      outgoing writes, and a positive value is divided by the outbound max frame
 *      size to derive the number of pending frame writes allowed.
 */
void setOutgoingCapacity(int outgoingCapacity) {
this.outgoingCapacity = outgoingCapacity;
updateOutgoingWindowState();
}
/**
 * @return the outgoing capacity value currently configured for this session
 *         (negative when no limit is being applied).
 */
int getOutgoingCapacity() {
    return this.outgoingCapacity;
}
/**
 * Computes the outgoing capacity that remains before the high water mark on pending
 * writes is reached, expressed as allowed frame writes multiplied by the outbound
 * max frame size.
 *
 * @return the remaining capacity, or {@link Integer#MAX_VALUE} when no outgoing capacity
 *         limit is configured or when the computed value does not fit in a positive int.
 */
int getRemainingOutgoingCapacity() {
    // If set to lower value after some writes are pending this calculation could go negative which
    // we don't want so ensure it never drops below zero.
    final int allowedWrites = Math.max(0, outgoingWindowHighWaterMark - pendingOutgoingWrites);
    final long maxFrameSize = session.getEngine().configuration().getOutboundMaxFrameSize();

    // Multiply in 64 bits and clamp. Truncating the product to an int before checking it can wrap
    // a very large result into a small (even zero) non-negative value which would then be reported
    // as the remaining capacity.
    final long remaining = allowedWrites * maxFrameSize;

    // Limit the max value to max positive int and hold there as it being more than that is a fairly
    // pointless value to try and convey.
    if (outgoingCapacity < 0 || remaining < 0 || remaining > Integer.MAX_VALUE) {
        return Integer.MAX_VALUE;
    }

    return (int) remaining;
}
/**
 * @return true if the session level outgoing window currently permits writing transfers.
 */
boolean isSendable() {
    return this.writeable;
}
/**
 * Recomputes the high and low water marks and the writable flag from the configured
 * outgoing capacity. When the session moves from not writable to writable each sender
 * of the session is notified until one of them leaves the session not writable again.
 */
private void updateOutgoingWindowState() {
    final boolean wasWritable = writeable;
    final int frameSizeLimit = (int) session.getEngine().configuration().getOutboundMaxFrameSize();
    final boolean remoteWindowOpen = remoteIncomingWindow > 0;

    if (outgoingCapacity < 0) {
        // Outgoing windowing is disabled so reset the marks to values that impose no
        // practical limit; only the remote incoming window gates writes.
        outgoingWindowHighWaterMark = Integer.MAX_VALUE;
        outgoingWindowLowWaterMark = Integer.MAX_VALUE / 2;
        writeable = remoteWindowOpen;
    } else if (outgoingCapacity == 0) {
        // A capacity of zero manually disables outgoing writes until it is raised above zero.
        outgoingWindowHighWaterMark = 0;
        outgoingWindowLowWaterMark = 0;
        writeable = false;
    } else {
        // Writable only when there is remote incoming window and the pending write
        // count has not risen above the low water mark.
        outgoingWindowHighWaterMark = Math.max(1, outgoingCapacity / frameSizeLimit);
        outgoingWindowLowWaterMark = outgoingWindowHighWaterMark / 2;
        writeable = remoteWindowOpen && pendingOutgoingWrites <= outgoingWindowLowWaterMark;
    }

    if (wasWritable || !writeable) {
        return;
    }

    final Set<ProtonSender> notifyTargets = session.senders();
    for (ProtonSender target : notifyTargets) {
        target.handleSessionCreditStateUpdate(this);
        // A notified sender may have used up the window; stop once we are no longer writable.
        if (!writeable) {
            break;
        }
    }
}
/**
 * Write completion callback that processSend installs on every Transfer frame it writes.
 * Reduces the pending write count (never below zero) and, if the session was not writable,
 * re-evaluates the writable state and notifies the session's senders when it becomes writable.
 */
private void handleOutgoingFrameWriteComplete() {
    pendingOutgoingWrites = Math.max(0, pendingOutgoingWrites - 1);

    // An already writable session is left untouched by a write completion.
    if (writeable) {
        return;
    }

    writeable = pendingOutgoingWrites <= outgoingWindowLowWaterMark && remoteIncomingWindow > 0;
    if (!writeable) {
        return;
    }

    final Set<ProtonSender> notifyTargets = session.senders();
    for (ProtonSender target : notifyTargets) {
        target.handleSessionCreditStateUpdate(this);
        // Stop notifying as soon as a sender has left the session not writable again.
        if (!writeable) {
            break;
        }
    }
}
//----- Handle incoming performatives relevant to the session.
/**
 * Records the incoming window that the remote advertised in its {@link Begin} as the
 * current remote incoming window for this session.
 *
 * @param begin
 *      The {@link Begin} performative received from the remote.
 *
 * @return the given performative for chaining
 */
Begin handleBegin(Begin begin) {
    final long advertisedWindow = begin.getIncomingWindow();

    this.remoteIncomingWindow = advertisedWindow;

    return begin;
}
/**
 * Update the session window state based on an incoming {@link Flow} performative.
 * <p>
 * When the flow carries a next-incoming-id the remote incoming window is computed as
 * <code>next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(local)</code>,
 * otherwise the incoming window from the flow is used as is. The writable state is then
 * re-evaluated against the new window and the low water mark on pending writes.
 *
 * @param flow
 *      the incoming {@link Flow} performative to process.
 *
 * @return the given performative for chaining
 */
Flow handleFlow(Flow flow) {
    final long incomingWindow = flow.getIncomingWindow();

    if (flow.hasNextIncomingId()) {
        remoteNextIncomingId = (int) flow.getNextIncomingId();

        // The ids are wrapping 32-bit sequence numbers held in ints, so the distance between
        // them must be taken with wrapping int subtraction before widening. Sign extending
        // each id to long on its own yields a result that is off by 2^32 whenever the two
        // values sit on opposite sides of the 0x7FFFFFFF / 0x80000000 boundary.
        final int idDistance = remoteNextIncomingId - nextOutgoingId;

        remoteIncomingWindow = incomingWindow + idDistance;
    } else {
        remoteIncomingWindow = incomingWindow;
    }

    writeable = remoteIncomingWindow > 0 && pendingOutgoingWrites <= outgoingWindowLowWaterMark;

    return flow;
}
/**
 * Update the session window state based on an incoming {@link Transfer} performative.
 * The current implementation makes no state changes and simply returns the given
 * performative.
 *
 * @param transfer
 *      the incoming {@link Transfer} performative to process.
 * @param payload
 *      the payload that arrived with the {@link Transfer}; not used here.
 *
 * @return the given performative for chaining
 */
Transfer handleTransfer(Transfer transfer, ProtonBuffer payload) {
return transfer;
}
/**
 * Routes a remote {@link Disposition} to the tracked outgoing deliveries it refers to.
 * A disposition whose last value is present and differs from its first value is handled
 * as a range; otherwise only the delivery identified by the first value is updated. A
 * delivery that the disposition marks as settled is removed from the tracked set.
 *
 * @param disposition
 *      The {@link Disposition} performative to process
 *
 * @return the {@link Disposition}
 */
Disposition handleDisposition(Disposition disposition) {
    final int deliveryId = (int) disposition.getFirst();
    final boolean ranged = disposition.hasLast() && disposition.getLast() != deliveryId;

    if (ranged) {
        handleRangedDisposition(disposition);
        return disposition;
    }

    final ProtonOutgoingDelivery target;
    if (disposition.getSettled()) {
        target = unsettled.remove(deliveryId);
    } else {
        target = unsettled.get(deliveryId);
    }

    // Dispositions for deliveries that are not being tracked are ignored.
    if (target != null) {
        target.getLink().remoteDisposition(disposition, target);
    }

    return disposition;
}
/**
 * Applies a {@link Disposition} covering a range of delivery ids to every tracked delivery
 * in that range, first and last inclusive. The walk ends on equality with the last id so a
 * range that passes through int overflow is still traversed.
 *
 * @param disposition
 *      The ranged {@link Disposition} performative to process
 */
private void handleRangedDisposition(Disposition disposition) {
    final int rangeStart = (int) disposition.getFirst();
    final int rangeEnd = (int) disposition.getLast();
    final boolean remotelySettled = disposition.getSettled();

    // TODO: If SplayMap gets a subMap that works we could get the ranged view which would
    //       be more efficient.
    for (int deliveryId = rangeStart; ; ++deliveryId) {
        final ProtonOutgoingDelivery target =
            remotelySettled ? unsettled.remove(deliveryId) : unsettled.get(deliveryId);

        if (target != null) {
            target.getLink().remoteDisposition(disposition, target);
        }

        if (deliveryId == rangeEnd) {
            break;
        }
    }
}
//----- Handle sender link actions in the session window context
// Reusable Disposition instance that processDisposition populates, writes and then resets.
private final Disposition cachedDisposition = new Disposition();
// Reusable Transfer instance that processSend and processAbort populate, write and then reset.
private final Transfer cachedTransfer = new Transfer();
/**
 * Callback that processSend installs as the payload-to-large handler on each outgoing
 * Transfer frame. It forces the more flag on the cached {@link Transfer} to true.
 * NOTE(review): presumably the engine invokes this when the payload cannot fit into a
 * single frame so that the frame being written is marked as having more to follow - confirm
 * against the frame writer.
 *
 * @param performative
 *      the performative being written; not used since the cached {@link Transfer} is updated directly.
 */
private void handlePayloadToLargeRequiresSplitFrames(Performative performative) {
cachedTransfer.setMore(true);
}
/**
 * Writes one or more Transfer frames for the given delivery using the cached {@link Transfer},
 * updating the session window tracking for every frame written. Frames continue to be written
 * while the payload still has readable bytes and the session remains sendable.
 *
 * @param sender
 *      the sender link whose handle is stamped on the transfers.
 * @param delivery
 *      the outgoing delivery being sent; tracked for remote dispositions if it is not settled.
 * @param payload
 *      the bytes to send, may be null.
 * @param complete
 *      when false the more flag is set on the transfers written.
 *
 * @return true if the session is still sendable once the write(s) have been fired.
 */
boolean processSend(ProtonSender sender, ProtonOutgoingDelivery delivery, ProtonBuffer payload, boolean complete) {
// Track any delivery that is not yet settled so that a later remote disposition can be
// routed to it.
if (!delivery.isSettled()) {
unsettled.put((int) delivery.getDeliveryId(), delivery);
}
try {
// Values that are the same for every frame of this send are set once up front.
cachedTransfer.setDeliveryId(delivery.getDeliveryId());
if (delivery.getMessageFormat() != 0) {
cachedTransfer.setMessageFormat(delivery.getMessageFormat());
} else {
cachedTransfer.clearMessageFormat();
}
cachedTransfer.setHandle(sender.getHandle());
cachedTransfer.setSettled(delivery.isSettled());
cachedTransfer.setState(delivery.getState());
// The loop body always runs at least once, so a transfer is still written when the payload
// is null or has no readable bytes. For a transfer that hasn't completed but has no bytes in
// the final transfer write we want to allow a transfer to go out with the more flag as false.
do {
// Update session window tracking for each transfer that ends up being sent.
++nextOutgoingId;
++pendingOutgoingWrites;
--remoteIncomingWindow;
writeable = pendingOutgoingWrites < outgoingWindowHighWaterMark && remoteIncomingWindow > 0;
// Only the first transfer requires the delivery tag, afterwards we can omit it for efficiency.
if (delivery.getTransferCount() == 0) {
cachedTransfer.setDeliveryTag(delivery.getTag());
} else {
cachedTransfer.setDeliveryTag((DeliveryTag) null);
}
// Reset the more flag on every pass; the payload-to-large handler installed below sets it
// back to true on the cached transfer when it is called.
cachedTransfer.setMore(!complete);
OutgoingAMQPEnvelope frame = engine.wrap(cachedTransfer, localChannel, payload);
frame.setPayloadToLargeHandler(this::handlePayloadToLargeRequiresSplitFrames);
frame.setFrameWriteCompletionHandler(this::handleOutgoingFrameWriteComplete);
engine.fireWrite(frame);
// NOTE(review): presumably this advances the delivery's transfer count so that later
// frames omit the delivery tag - confirm in ProtonOutgoingDelivery.
delivery.afterTransferWritten();
} while (payload != null && payload.isReadable() && isSendable());
} finally {
// The cached transfer is shared by all sends, so always clear it before returning.
cachedTransfer.reset();
}
return isSendable();
}
/**
 * Writes a sender-role {@link Disposition} for the given delivery unless the remote has
 * already settled it. A locally settled delivery is dropped from the tracked set before
 * the disposition is written.
 *
 * @param sender
 *      the sender link that the delivery belongs to.
 * @param delivery
 *      the outgoing delivery whose state and settlement are conveyed.
 */
void processDisposition(ProtonSender sender, ProtonOutgoingDelivery delivery) {
    // A remotely settled delivery is not tracked and needs no disposition written for it.
    if (delivery.isRemotelySettled()) {
        return;
    }

    if (delivery.isSettled()) {
        unsettled.remove((int) delivery.getDeliveryId());
    }

    cachedDisposition.setFirst(delivery.getDeliveryId());
    cachedDisposition.setRole(Role.SENDER);
    cachedDisposition.setSettled(delivery.isSettled());
    cachedDisposition.setState(delivery.getState());

    try {
        engine.fireWrite(cachedDisposition, session.getLocalChannel());
    } finally {
        // The cached disposition is reused for every write so clear it regardless of outcome.
        cachedDisposition.reset();
    }
}
/**
 * Stops tracking the given delivery and writes a settled {@link Transfer} with the
 * aborted flag set for it on the session's local channel.
 *
 * @param sender
 *      the sender link whose handle is stamped on the transfer.
 * @param delivery
 *      the outgoing delivery that is being aborted.
 */
void processAbort(ProtonSender sender, ProtonOutgoingDelivery delivery) {
    // Ensure we don't track the aborted delivery any longer.
    unsettled.remove((int) delivery.getDeliveryId());

    cachedTransfer.setDeliveryId(delivery.getDeliveryId());
    cachedTransfer.setDeliveryTag(delivery.getTag());
    cachedTransfer.setSettled(true);
    cachedTransfer.setAborted(true);
    cachedTransfer.setHandle(sender.getHandle());

    try {
        engine.fireWrite(cachedTransfer, session.getLocalChannel());
    } finally {
        // The cached transfer is shared with processSend so always leave it cleared.
        cachedTransfer.reset();
    }
}
//----- Access to internal state useful for tests
/**
 * @return the transfer id that will be assigned to the next Transfer frame written.
 */
int getNextOutgoingId() {
    return this.nextOutgoingId;
}
/**
 * @return the outgoing window value advertised in the outbound Begin, which is always
 *         {@link Integer#MAX_VALUE}.
 */
long getOutgoingWindow() {
return Integer.MAX_VALUE;
}
/**
 * @return the next-incoming-id value most recently recorded from a remote Flow.
 */
int getRemoteNextIncomingId() {
    return this.remoteNextIncomingId;
}
/**
 * @return the currently tracked remote incoming window for this session.
 */
long getRemoteIncomingWindow() {
    return this.remoteIncomingWindow;
}
}