blob: ea6f28abaf8a3570b1fde27fce393b87424add2b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.engine.impl;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
import org.apache.qpid.protonj2.engine.util.SequenceNumber;
import org.apache.qpid.protonj2.engine.util.SplayMap;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.transport.Begin;
import org.apache.qpid.protonj2.types.transport.Disposition;
import org.apache.qpid.protonj2.types.transport.Flow;
import org.apache.qpid.protonj2.types.transport.Role;
import org.apache.qpid.protonj2.types.transport.Transfer;
/**
* Tracks the incoming window and provides management of that window in relation to receiver links.
* <p>
* The incoming window decreases as {@link Transfer} frames arrive and is replenished when the user reads the
* bytes received in the accumulated payload of a delivery. The window is expanded by sending a {@link Flow}
* frame to the remote with an updated incoming window value at configured intervals based on reads from the
* pending deliveries.
*/
public class ProtonSessionIncomingWindow {
private static final long DEFAULT_WINDOW_SIZE = Integer.MAX_VALUE; // biggest legal value
private final ProtonSession session;
private final ProtonEngine engine;
// User configured incoming capacity for the session used to compute the incoming window
private int incomingCapacity = 0;
// Computed incoming window based on the incoming capacity minus bytes not yet read from deliveries.
private long incomingWindow = 0;
// Tracks the next expected incoming transfer ID from the remote
private long nextIncomingId = 0;
// Tracks the most recent delivery Id for validation against the next incoming delivery
private SequenceNumber lastDeliveryid;
private long maxFrameSize;
private long incomingBytes;
private SplayMap<ProtonIncomingDelivery> unsettled = new SplayMap<>();
public ProtonSessionIncomingWindow(ProtonSession session) {
this.session = session;
this.engine = session.getConnection().getEngine();
this.maxFrameSize = session.getConnection().getMaxFrameSize();
}
public void setIncomingCapacity(int incomingCapacity) {
this.incomingCapacity = incomingCapacity;
}
public int getIncomingCapacity() {
return incomingCapacity;
}
public int getRemainingIncomingCapacity() {
// TODO: This is linked to below update of capacity which also needs more attention.
if (incomingCapacity <= 0 || maxFrameSize == UnsignedInteger.MAX_VALUE.longValue()) {
return (int) DEFAULT_WINDOW_SIZE;
} else {
return (int) (incomingCapacity - incomingBytes);
}
}
/**
* Initialize the session level window values on the outbound Begin
*
* @param begin
* The {@link Begin} performative that is about to be sent.
*
* @return the configured performative
*/
Begin configureOutbound(Begin begin) {
// Update as it might have changed if session created before connection open() called.
this.maxFrameSize = session.getConnection().getMaxFrameSize();
return begin.setIncomingWindow(updateIncomingWindow());
}
/**
* Update the session level window values based on remote information.
*
* @param begin
* The {@link Begin} performative received from the remote.
*
* @return the given performative for chaining
*/
Begin handleBegin(Begin begin) {
if (begin.hasNextOutgoingId()) {
this.nextIncomingId = begin.getNextOutgoingId();
}
return begin;
}
/**
* Update the session window state based on an incoming {@link Flow} performative
*
* @param flow
* the incoming {@link Flow} performative to process.
*/
Flow handleFlow(Flow flow) {
return flow;
}
/**
* Update the session window state based on an incoming {@link Transfer} performative
*
* @param transfer
* the incoming {@link Transfer} performative to process.
* @param payload
* the payload that was transmitted with the incoming {@link Transfer}
*/
Transfer handleTransfer(ProtonLink<?> link, Transfer transfer, ProtonBuffer payload) {
incomingBytes += payload != null ? payload.getReadableBytes() : 0;
incomingWindow--;
nextIncomingId++;
ProtonIncomingDelivery delivery = link.remoteTransfer(transfer, payload);
if (!delivery.isRemotelySettled() && delivery.isFirstTransfer()) {
unsettled.put((int) delivery.getDeliveryId(), delivery);
}
return transfer;
}
/**
* Update the state of any received Transfers that are indicated in the disposition
* with the state information conveyed therein.
*
* @param disposition
* The {@link Disposition} performative to process
*
* @return the {@link Disposition}
*/
Disposition handleDisposition(Disposition disposition) {
final int first = (int) disposition.getFirst();
if (disposition.hasLast() && disposition.getLast() != first) {
handleRangedDisposition(disposition);
} else {
final ProtonIncomingDelivery delivery = disposition.getSettled() ?
unsettled.remove(first) : unsettled.get(first);
if (delivery != null) {
delivery.getLink().remoteDisposition(disposition, delivery);
}
}
return disposition;
}
private void handleRangedDisposition(Disposition disposition) {
final int first = (int) disposition.getFirst();
final int last = (int) disposition.getLast();
final boolean settled = disposition.getSettled();
int index = first;
ProtonIncomingDelivery delivery;
// TODO: If SplayMap gets a subMap that works we could get the ranged view which would
// be more efficient.
do {
delivery = settled ? unsettled.remove(index) : unsettled.get(index);
if (delivery != null) {
delivery.getLink().remoteDisposition(disposition, delivery);
}
} while (index++ != last);
}
long updateIncomingWindow() {
// TODO - need to revisit this logic and decide on sane cutoff for capacity restriction.
if (incomingCapacity <= 0 || maxFrameSize == UnsignedInteger.MAX_VALUE.longValue()) {
incomingWindow = DEFAULT_WINDOW_SIZE;
} else {
// TODO - incomingWindow = Integer.divideUnsigned(incomingCapacity - incomingBytes, maxFrameSize);
incomingWindow = (incomingCapacity - incomingBytes) / maxFrameSize;
}
return incomingWindow;
}
void writeFlow(ProtonReceiver link) {
updateIncomingWindow();
session.writeFlow(link);
}
//----- Access to internal state useful for tests
public long getIncomingBytes() {
return incomingBytes;
}
public long getNextIncomingId() {
return nextIncomingId;
}
public long getIncomingWindow() {
return incomingWindow;
}
//----- Handle sender link actions in the session window context
private final Disposition cachedDisposition = new Disposition();
void processDisposition(ProtonReceiver receiver, ProtonIncomingDelivery delivery) {
if (!delivery.isRemotelySettled()) {
// Would only be tracked if not already remotely settled.
if (delivery.isSettled()) {
unsettled.remove((int) delivery.getDeliveryId());
}
cachedDisposition.reset();
cachedDisposition.setFirst(delivery.getDeliveryId());
cachedDisposition.setRole(Role.RECEIVER);
cachedDisposition.setSettled(delivery.isSettled());
cachedDisposition.setState(delivery.getState());
engine.fireWrite(cachedDisposition, session.getLocalChannel());
}
}
void deliveryRead(ProtonIncomingDelivery delivery, int bytesRead) {
this.incomingBytes -= bytesRead;
if (incomingWindow == 0) {
writeFlow(delivery.getLink());
}
}
void validateNextDeliveryId(long deliveryId) {
if (lastDeliveryid == null) {
lastDeliveryid = new SequenceNumber((int) deliveryId);
} else {
int previousId = lastDeliveryid.intValue();
if (lastDeliveryid.increment().compareTo((int) deliveryId) != 0) {
session.getConnection().getEngine().engineFailed(
new ProtocolViolationException("Expected delivery-id " + previousId + ", got " + deliveryId));
}
}
}
}