blob: 9bd89cbb307f97d2fcff2216b75e7559dc57407d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.server.protocol.v1_0;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
/**
 * A single delivery on a link, assembled from the {@link Transfer} frames that carry it.
 * <p>
 * The delivery id, delivery tag and message format are captured from the first transfer and never
 * change. Every transfer passed to {@link #addTransfer(Transfer)} is retained until the payload is
 * collected with {@link #getPayload()} or dropped with {@link #discard()}, and updates the
 * delivery's flags (complete, settled, aborted, resume), its delivery state, its receiver settle
 * mode and its running payload size.
 * <p>
 * NOTE(review): mutable fields are {@code volatile} and the transfer list is a
 * {@link CopyOnWriteArrayList}, which gives visibility across threads, but the compound updates in
 * {@link #addTransfer(Transfer)} are not atomic. The class appears to rely on transfers being
 * added from one thread at a time - confirm against callers.
 */
public class Delivery
{
    private final UnsignedInteger _deliveryId;
    private final Binary _deliveryTag;
    /** Transfers received so far and not yet handed out or discarded. */
    private final List<Transfer> _transfers = new CopyOnWriteArrayList<>();
    private final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> _linkEndpoint;
    private final UnsignedInteger _messageFormat;
    private volatile boolean _complete;
    private volatile boolean _settled;
    private volatile boolean _aborted;
    private volatile DeliveryState _state;
    private volatile ReceiverSettleMode _receiverSettleMode;
    private volatile boolean _resume;
    /** Sum of {@code remaining()} over the payloads of all transfers added so far. */
    private volatile long _totalPayloadSize;

    /**
     * Creates a delivery from its first transfer.
     *
     * @param transfer first transfer of the delivery; supplies the delivery id, the delivery tag
     *                 and the message format (a {@code null} format is treated as
     *                 {@link UnsignedInteger#ZERO}) and is then processed like any later transfer
     * @param endpoint link endpoint this delivery belongs to
     */
    public Delivery(Transfer transfer, final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint)
    {
        _totalPayloadSize = 0L;
        _deliveryId = transfer.getDeliveryId();
        _deliveryTag = transfer.getDeliveryTag();
        _linkEndpoint = endpoint;
        final UnsignedInteger messageFormat = transfer.getMessageFormat();
        _messageFormat = messageFormat != null ? messageFormat : UnsignedInteger.ZERO;
        // addTransfer is final, so calling it from the constructor cannot reach an override.
        addTransfer(transfer);
    }

    /** @return the delivery id taken from the first transfer */
    public UnsignedInteger getDeliveryId()
    {
        return _deliveryId;
    }

    /** @return the delivery tag taken from the first transfer */
    public Binary getDeliveryTag()
    {
        return _deliveryTag;
    }

    /** @return {@code true} once a transfer whose {@code more} flag is not {@code TRUE} was added */
    public boolean isComplete()
    {
        return _complete;
    }

    /** @return {@code true} once any added transfer had its {@code settled} flag set */
    public boolean isSettled()
    {
        return _settled;
    }

    /** @return {@code true} once any added transfer had its {@code aborted} flag set */
    public boolean isAborted()
    {
        return _aborted;
    }

    /** @return the delivery state recorded from the transfers, or {@code null} if none carried one */
    public DeliveryState getState()
    {
        return _state;
    }

    /** @return the first non-null receiver settle mode seen on a transfer, or {@code null} */
    public ReceiverSettleMode getReceiverSettleMode()
    {
        return _receiverSettleMode;
    }

    /** @return the message format of the first transfer, never {@code null} */
    public UnsignedInteger getMessageFormat()
    {
        return _messageFormat;
    }

    /** @return {@code true} once any added transfer had its {@code resume} flag set */
    public boolean getResume()
    {
        return _resume;
    }

    /** @return the accumulated number of payload bytes over all transfers added so far */
    public long getTotalPayloadSize()
    {
        return _totalPayloadSize;
    }

    /**
     * Appends a transfer to this delivery and folds its fields into the delivery's state.
     * <p>
     * Flags are sticky: once set by any transfer they stay set. The receiver settle mode is taken
     * from the first transfer that specifies one. The delivery state is overwritten by a
     * transfer's state unless the state already held is an {@link Outcome}, either directly or
     * wrapped in a {@link TransactionalState}.
     *
     * @param transfer the next transfer belonging to this delivery
     * @throws IllegalStateException if the delivery is already aborted or already complete
     */
    final void addTransfer(Transfer transfer)
    {
        // NOTE(review): intValue() with %d renders the id as a signed int; presumably ids above
        // Integer.MAX_VALUE would show as negative here - confirm against UnsignedInteger.
        if (_aborted)
        {
            throw new IllegalStateException(String.format("Delivery '%s/%d' is already aborted",
                                                          _deliveryTag,
                                                          _deliveryId.intValue()));
        }
        if (_complete)
        {
            throw new IllegalStateException(String.format("Delivery '%s/%d' is already completed",
                                                          _deliveryTag,
                                                          _deliveryId.intValue()));
        }

        _transfers.add(transfer);

        if (Boolean.TRUE.equals(transfer.getAborted()))
        {
            _aborted = true;
            // Drops everything collected so far, including the transfer that was just added.
            discard();
        }
        // Anything other than an explicit TRUE (FALSE or null) ends the delivery.
        if (!Boolean.TRUE.equals(transfer.getMore()))
        {
            _complete = true;
        }
        if (Boolean.TRUE.equals(transfer.getSettled()))
        {
            _settled = true;
        }
        if (Boolean.TRUE.equals(transfer.getResume()))
        {
            _resume = true;
        }

        final DeliveryState incomingState = transfer.getState();
        if (incomingState != null)
        {
            final DeliveryState existingState = _state;
            final DeliveryState effectiveState;
            if (existingState instanceof TransactionalState)
            {
                // Look through the transactional wrapper at the outcome it carries.
                effectiveState = ((TransactionalState) existingState).getOutcome();
            }
            else
            {
                effectiveState = existingState;
            }
            // An outcome that was already recorded is never replaced by a later transfer.
            if (!(effectiveState instanceof Outcome))
            {
                _state = incomingState;
            }
        }

        final ReceiverSettleMode incomingSettleMode = transfer.getRcvSettleMode();
        if (incomingSettleMode != null && _receiverSettleMode == null)
        {
            _receiverSettleMode = incomingSettleMode;
        }

        // The handle returned by getPayload() is only needed to read its size, so it is closed
        // again straight away.
        // NOTE(review): for an aborted transfer this runs after discard() has disposed the
        // transfer; what getPayload() returns then is decided by Transfer - confirm.
        try (QpidByteBuffer payload = transfer.getPayload())
        {
            if (payload != null)
            {
                // Read-modify-write on a volatile field: not atomic under concurrent callers.
                _totalPayloadSize += payload.remaining();
            }
        }
    }

    /** @return the link endpoint this delivery was created for */
    public LinkEndpoint<? extends BaseSource, ? extends BaseTarget> getLinkEndpoint()
    {
        return _linkEndpoint;
    }

    /**
     * Collects the payloads of all retained transfers into a single buffer.
     * <p>
     * This consumes the delivery's content: every retained transfer is disposed and the transfer
     * list is emptied, so a subsequent call concatenates an empty list. The per-transfer buffers
     * obtained along the way are disposed before this method returns, including when collecting
     * or concatenating fails, so they cannot leak on an exception.
     *
     * @return the buffer produced by {@link QpidByteBuffer#concatenate} from the transfer payloads
     *         in arrival order; the caller is responsible for disposing it
     */
    public QpidByteBuffer getPayload()
    {
        final List<QpidByteBuffer> transferBuffers = new ArrayList<>(_transfers.size());
        try
        {
            for (Transfer t : _transfers)
            {
                final QpidByteBuffer payload = t.getPayload();
                if (payload != null)
                {
                    transferBuffers.add(payload);
                }
                t.dispose();
            }
            _transfers.clear();
            return QpidByteBuffer.concatenate(transferBuffers);
        }
        finally
        {
            // Runs after concatenate() has produced its result (or thrown) and before returning.
            transferBuffers.forEach(QpidByteBuffer::dispose);
        }
    }

    /**
     * Disposes every retained transfer and empties the transfer list without producing a payload.
     */
    public void discard()
    {
        for (Transfer transfer : _transfers)
        {
            transfer.dispose();
        }
        _transfers.clear();
    }
}