blob: 76c87368e72f2a4b8c2e23f85f1a476971cb3366 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.engine.impl;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
public class DeliveryImpl implements Delivery
{
private DeliveryImpl _linkPrevious;
private DeliveryImpl _linkNext;
private DeliveryImpl _workNext;
private DeliveryImpl _workPrev;
private DeliveryImpl _transportWorkNext;
private DeliveryImpl _transportWorkPrev;
private Object _context;
private final byte[] _tag;
private final LinkImpl _link;
private DeliveryState _deliveryState;
private boolean _settled;
private boolean _remoteSettled;
private DeliveryState _remoteDeliveryState;
private static final int DELIVERY_STATE_CHANGED = 1;
private static final int ABLE_TO_SEND = 2;
private static final int IO_WORK = 4;
private int _flags = (byte) 0;
private int _transportFlags = (byte) 0;
private TransportDelivery _transportDelivery;
private byte[] _data;
private int _dataSize;
private boolean _complete;
private boolean _updated;
private boolean _done;
private int _offset;
DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous)
{
_tag = tag;
_link = link;
_link.incrementUnsettled();
_linkPrevious = previous;
if(previous != null)
{
previous._linkNext = this;
}
}
public byte[] getTag()
{
return _tag;
}
public LinkImpl getLink()
{
return _link;
}
public DeliveryState getLocalState()
{
return _deliveryState;
}
public DeliveryState getRemoteState()
{
return _remoteDeliveryState;
}
public boolean remotelySettled()
{
return _remoteSettled;
}
public int getMessageFormat()
{
return 0;
}
public void disposition(final DeliveryState state)
{
_deliveryState = state;
if(!_remoteSettled)
{
setTransportFlag(DELIVERY_STATE_CHANGED);
}
}
public void settle()
{
_settled = true;
_link.decrementUnsettled();
if(!_remoteSettled)
{
setTransportFlag(DELIVERY_STATE_CHANGED);
}
else
{
_transportDelivery.settled();
}
if(_link.current() == this)
{
_link.advance();
}
_link.remove(this);
if(_linkPrevious != null)
{
_linkPrevious._linkNext = _linkNext;
}
if(_linkNext != null)
{
_linkNext._linkPrevious = _linkPrevious;
}
clearWork();
}
DeliveryImpl getLinkNext()
{
return _linkNext;
}
public void free()
{
}
DeliveryImpl getLinkPrevious()
{
return _linkPrevious;
}
public DeliveryImpl getWorkNext()
{
return _workNext;
}
DeliveryImpl getWorkPrev()
{
return _workPrev;
}
void setWorkNext(DeliveryImpl workNext)
{
_workNext = workNext;
}
void setWorkPrev(DeliveryImpl workPrev)
{
_workPrev = workPrev;
}
int recv(byte[] bytes, int offset, int size)
{
final int consumed;
if(_data != null)
{
//TODO - should only be if no bytes left
consumed = Math.min(size, _dataSize);
System.arraycopy(_data, _offset, bytes, offset, consumed);
_offset += consumed;
_dataSize -= consumed;
}
else
{
_dataSize = consumed = 0;
}
if(_dataSize == 0)
{
clearFlag(IO_WORK);
}
return (_complete && consumed == 0) ? TransportImpl.END_OF_STREAM : consumed; //TODO - Implement
}
private void clearFlag(int ioWork)
{
_flags = _flags & (~IO_WORK);
if(_flags == 0)
{
clearWork();
}
}
void clearWork()
{
getLink().getConnectionImpl().removeWork(this);
if(_workPrev != null)
{
_workPrev.setWorkNext(_workNext);
}
if(_workNext != null)
{
_workNext.setWorkPrev(_workPrev);
}
_workPrev = null;
}
void addToWorkList()
{
getLink().getConnectionImpl().addWork(this);
}
void addIOWork()
{
setFlag(IO_WORK);
}
private void setFlag(int flag)
{
boolean addWork;
if(flag == IO_WORK && (_flags & flag) == 0)
{
clearWork();
addWork = true;
}
else
{
addWork = (_flags == 0);
}
_flags = _flags | flag;
if(addWork)
{
addToWorkList();
}
}
private void clearTransportFlag(int ioWork)
{
_flags = _flags & (~IO_WORK);
if(_flags == 0)
{
clearTransportWork();
}
}
DeliveryImpl clearTransportWork()
{
DeliveryImpl next = _transportWorkNext;
getLink().getConnectionImpl().removeTransportWork(this);
if(_transportWorkPrev != null)
{
_transportWorkPrev.setTransportWorkNext(_transportWorkNext);
}
if(_transportWorkNext != null)
{
_transportWorkNext.setTransportWorkPrev(_transportWorkPrev);
}
_transportWorkNext = null;
_transportWorkPrev = null;
return next;
}
void addToTransportWorkList()
{
if(_transportWorkNext == null
&& _transportWorkPrev == null
&& getLink().getConnectionImpl().getTransportWorkHead() != this)
{
getLink().getConnectionImpl().addTransportWork(this);
}
}
private void setTransportFlag(int flag)
{
boolean addWork = (_transportFlags == 0);
_transportFlags = _transportFlags | flag;
if(addWork)
{
addToTransportWorkList();
}
}
DeliveryImpl getTransportWorkNext()
{
return _transportWorkNext;
}
DeliveryImpl getTransportWorkPrev()
{
return _transportWorkPrev;
}
void setTransportWorkNext(DeliveryImpl transportWorkNext)
{
_transportWorkNext = transportWorkNext;
}
void setTransportWorkPrev(DeliveryImpl transportWorkPrev)
{
_transportWorkPrev = transportWorkPrev;
}
boolean isLocalStateChange()
{
return (_transportFlags & DELIVERY_STATE_CHANGED) != 0;
}
TransportDelivery getTransportDelivery()
{
return _transportDelivery;
}
void setTransportDelivery(TransportDelivery transportDelivery)
{
_transportDelivery = transportDelivery;
}
public boolean isSettled()
{
return _settled;
}
int send(byte[] bytes, int offset, int length)
{
if(_data == null)
{
_data = new byte[length];
}
else if(_data.length - _dataSize < length)
{
byte[] oldData = _data;
_data = new byte[oldData.length + _dataSize];
System.arraycopy(oldData,_offset,_data,0,_dataSize);
_offset = 0;
}
System.arraycopy(bytes,offset,_data,_dataSize+_offset,length);
_dataSize+=length;
addToTransportWorkList();
return length; //TODO - Implement.
}
byte[] getData()
{
return _data;
}
int getDataOffset()
{
return _offset;
}
int getDataLength()
{
return _dataSize; //TODO - Implement.
}
void setData(byte[] data)
{
_data = data;
}
void setDataLength(int length)
{
_dataSize = length;
}
public void setDataOffset(int arrayOffset)
{
_offset = arrayOffset;
}
public boolean isWritable()
{
return getLink() instanceof SenderImpl
&& getLink().current() == this
&& ((SenderImpl) getLink()).hasCredit();
}
public boolean isReadable()
{
return getLink() instanceof ReceiverImpl
&& getLink().current() == this
&& _dataSize > 0;
}
void setComplete()
{
_complete = true;
}
public boolean isPartial()
{
return !_complete;
}
void setRemoteDeliveryState(DeliveryState remoteDeliveryState)
{
_remoteDeliveryState = remoteDeliveryState;
_updated = true;
}
public boolean isUpdated()
{
return _updated;
}
void setDone()
{
_done = true;
}
boolean isDone()
{
return _done;
}
void setRemoteSettled(boolean remoteSettled)
{
_remoteSettled = remoteSettled;
_updated = true;
}
public Object getContext()
{
return _context;
}
public void setContext(Object context)
{
_context = context;
}
}