blob: 86fda5bb2cad5884cda11d05fea4e8c881e6c621 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extends AbstractLinkEndpoint<Source, T>
{
private final SectionDecoder _sectionDecoder;
final Map<Binary, DeliveryState> _unsettled = Collections.synchronizedMap(new LinkedHashMap<>());
private volatile boolean _creditWindow;
private volatile Delivery _currentDelivery;
public AbstractReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, T> link)
{
super(session, link);
_sectionDecoder = new SectionDecoderImpl(session.getConnection()
.getDescribedTypeRegistry()
.getSectionDecoderRegistry());
}
@Override
protected Map<Symbol, Object> initProperties(final Attach attach)
{
return Collections.emptyMap();
}
@Override public Role getRole()
{
return Role.RECEIVER;
}
void receiveTransfer(final Transfer transfer)
{
if (!isErrored())
{
Error error = validateTransfer(transfer);
if (error != null)
{
transfer.dispose();
if (_currentDelivery != null)
{
_currentDelivery.discard();
_currentDelivery = null;
}
close(error);
return;
}
if (_currentDelivery == null)
{
error = validateNewTransfer(transfer);
if (error != null)
{
transfer.dispose();
close(error);
return;
}
_currentDelivery = new Delivery(transfer, this);
setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
getDeliveryCount().incr();
getSession().getIncomingDeliveryRegistry()
.addDelivery(transfer.getDeliveryId(),
new UnsettledDelivery(transfer.getDeliveryTag(), this));
}
else
{
error = validateSubsequentTransfer(transfer);
if (error != null)
{
transfer.dispose();
_currentDelivery.discard();
_currentDelivery = null;
close(error);
return;
}
_currentDelivery.addTransfer(transfer);
}
if (_currentDelivery.getTotalPayloadSize() > getSession().getConnection().getMaxMessageSize())
{
error = new Error(LinkError.MESSAGE_SIZE_EXCEEDED,
String.format("delivery '%s' exceeds max-message-size %d",
_currentDelivery.getDeliveryTag(),
getSession().getConnection().getMaxMessageSize()));
_currentDelivery.discard();
_currentDelivery = null;
close(error);
return;
}
if (!_currentDelivery.getResume())
{
_unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
}
if (_currentDelivery.isAborted() || (_currentDelivery.getResume() && !_unsettled.containsKey(_currentDelivery.getDeliveryTag())))
{
_unsettled.remove(_currentDelivery.getDeliveryTag());
getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
_currentDelivery = null;
setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
getDeliveryCount().decr();
}
else if (_currentDelivery.isComplete())
{
try
{
if (_currentDelivery.isSettled())
{
_unsettled.remove(_currentDelivery.getDeliveryTag());
getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
}
error = receiveDelivery(_currentDelivery);
if (error != null)
{
close(error);
}
}
finally
{
_currentDelivery = null;
}
}
else
{
getSession().sendFlowConditional();
}
}
else
{
End end = new End();
end.setError(new Error(SessionError.ERRANT_LINK,
String.format("Received TRANSFER for link handle %s which is in errored state.",
transfer.getHandle())));
getSession().end(end);
}
}
private Error validateTransfer(final Transfer transfer)
{
Error error = null;
if (!ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
&& ReceiverSettleMode.SECOND.equals(transfer.getRcvSettleMode()))
{
error = new Error(AmqpError.INVALID_FIELD,
"Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
}
else if (transfer.getState() instanceof TransactionalState)
{
final Binary txnId = ((TransactionalState) transfer.getState()).getTxnId();
try
{
getSession().getTransaction(txnId);
}
catch (UnknownTransactionException e)
{
error = new Error(TransactionError.UNKNOWN_ID,
String.format("Transfer has an unknown transaction-id '%s'.", txnId));
}
}
return error;
}
private Error validateNewTransfer(final Transfer transfer)
{
Error error = null;
if (transfer.getDeliveryId() == null)
{
error = new Error(AmqpError.INVALID_FIELD,
"Transfer \"delivery-id\" is required for a new delivery.");
}
else if (transfer.getDeliveryTag() == null)
{
error = new Error(AmqpError.INVALID_FIELD,
"Transfer \"delivery-tag\" is required for a new delivery.");
}
else if (!Boolean.TRUE.equals(transfer.getResume()))
{
if (_unsettled.containsKey(transfer.getDeliveryTag()))
{
error = new Error(AmqpError.ILLEGAL_STATE,
String.format("Delivery-tag '%s' is used by another unsettled delivery."
+ " The delivery-tag MUST be unique amongst all deliveries that"
+ " could be considered unsettled by either end of the link.",
transfer.getDeliveryTag()));
}
else if (_localIncompleteUnsettled || _remoteIncompleteUnsettled)
{
error = new Error(AmqpError.ILLEGAL_STATE,
"Cannot accept new deliveries while incomplete-unsettled is true.");
}
}
return error;
}
private Error validateSubsequentTransfer(final Transfer transfer)
{
Error error = null;
if (transfer.getDeliveryId() != null && !_currentDelivery.getDeliveryId()
.equals(transfer.getDeliveryId()))
{
error = new Error(AmqpError.INVALID_FIELD,
String.format(
"Unexpected transfer \"delivery-id\" for multi-transfer delivery: found '%s', expected '%s'.",
transfer.getDeliveryId(),
_currentDelivery.getDeliveryId()));
}
else if (transfer.getDeliveryTag() != null && !_currentDelivery.getDeliveryTag()
.equals(transfer.getDeliveryTag()))
{
error = new Error(AmqpError.INVALID_FIELD,
String.format(
"Unexpected transfer \"delivery-tag\" for multi-transfer delivery: found '%s', expected '%s'.",
transfer.getDeliveryTag(),
_currentDelivery.getDeliveryTag()));
}
else if (_currentDelivery.getReceiverSettleMode() != null && transfer.getRcvSettleMode() != null
&& !_currentDelivery.getReceiverSettleMode().equals(transfer.getRcvSettleMode()))
{
error = new Error(AmqpError.INVALID_FIELD,
"Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
}
else if (transfer.getMessageFormat() != null && !_currentDelivery.getMessageFormat()
.equals(transfer.getMessageFormat()))
{
error = new Error(AmqpError.INVALID_FIELD,
"Transfer \"message-format\" is set to different value than on previous transfer.");
}
return error;
}
protected abstract Error receiveDelivery(final Delivery delivery);
@Override
public void receiveFlow(final Flow flow)
{
setAvailable(flow.getAvailable());
setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
if (Boolean.TRUE.equals(flow.getEcho()))
{
sendFlow();
}
}
private boolean settled(final Binary deliveryTag)
{
return _unsettled.remove(deliveryTag) != null;
}
void updateDisposition(final Binary deliveryTag,
final DeliveryState state,
final boolean settled)
{
updateDispositions(Collections.singleton(deliveryTag), state, settled);
}
void updateDispositions(final Set<Binary> deliveryTags,
final DeliveryState state,
final boolean settled)
{
final Set<Binary> unsettledKeys = new HashSet<>(_unsettled.keySet());
unsettledKeys.retainAll(deliveryTags);
final int settledDeliveryCount = deliveryTags.size() - unsettledKeys.size();
if (!unsettledKeys.isEmpty())
{
boolean outcomeUpdate = false;
Outcome outcome = null;
if (state instanceof Outcome)
{
outcome = (Outcome) state;
}
else if (state instanceof TransactionalState)
{
outcome = ((TransactionalState) state).getOutcome();
}
if (outcome != null)
{
for (final Binary deliveryTag : unsettledKeys)
{
if (!(_unsettled.get(deliveryTag) instanceof Outcome))
{
Object oldOutcome = _unsettled.put(deliveryTag, outcome);
outcomeUpdate = outcomeUpdate || !outcome.equals(oldOutcome);
}
}
}
if (outcomeUpdate || settled)
{
getSession().updateDisposition(this, deliveryTags, state, settled);
}
if (settled)
{
int credit = 0;
for (final Binary deliveryTag : unsettledKeys)
{
if (settled(deliveryTag))
{
if (!isDetached() && _creditWindow)
{
credit++;
}
}
}
if (credit > 0)
{
setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(credit)));
sendFlowConditional();
}
else
{
getSession().sendFlowConditional();
}
}
}
if (settledDeliveryCount > 0 && _creditWindow)
{
setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
sendFlowConditional();
}
}
void setCreditWindow()
{
setCreditWindow(true);
}
private void setCreditWindow(boolean window)
{
_creditWindow = window;
sendFlowConditional();
}
SectionDecoder getSectionDecoder()
{
return _sectionDecoder;
}
@Override
public void settle(Binary deliveryTag)
{
super.settle(deliveryTag);
_unsettled.remove(deliveryTag);
if (_creditWindow)
{
setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
sendFlowConditional();
}
}
@Override
public void flowStateChanged()
{
}
@Override
protected void detach(final Error error, final boolean close)
{
try
{
super.detach(error, close);
}
finally
{
if (_currentDelivery != null)
{
_currentDelivery.discard();
_currentDelivery = null;
}
}
}
@Override
protected void handleDeliveryState(Binary deliveryTag, DeliveryState state, Boolean settled)
{
if(Boolean.TRUE.equals(settled))
{
_unsettled.remove(deliveryTag);
}
}
}