| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.proton.engine.impl; |
| |
| import java.util.EnumSet; |
| import java.util.Map; |
| |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.UnsignedLong; |
| import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; |
| import org.apache.qpid.proton.amqp.transport.SenderSettleMode; |
| import org.apache.qpid.proton.amqp.transport.Source; |
| import org.apache.qpid.proton.amqp.transport.Target; |
| import org.apache.qpid.proton.engine.EndpointState; |
| import org.apache.qpid.proton.engine.Event; |
| import org.apache.qpid.proton.engine.Link; |
| |
| public abstract class LinkImpl extends EndpointImpl implements Link |
| { |
| |
| private final SessionImpl _session; |
| |
| DeliveryImpl _head; |
| DeliveryImpl _tail; |
| DeliveryImpl _current; |
| private String _name; |
| private Source _source; |
| private Source _remoteSource; |
| private Target _target; |
| private Target _remoteTarget; |
| private int _queued; |
| private int _credit; |
| private int _unsettled; |
| private int _drained; |
| private UnsignedLong _maxMessageSize; |
| private UnsignedLong _remoteMaxMessageSize; |
| |
| private SenderSettleMode _senderSettleMode; |
| private SenderSettleMode _remoteSenderSettleMode; |
| private ReceiverSettleMode _receiverSettleMode; |
| private ReceiverSettleMode _remoteReceiverSettleMode; |
| |
| |
| private LinkNode<LinkImpl> _node; |
| private boolean _drain; |
| private boolean _detached; |
| private Map<Symbol, Object> _properties; |
| private Map<Symbol, Object> _remoteProperties; |
| private Symbol[] _offeredCapabilities; |
| private Symbol[] _remoteOfferedCapabilities; |
| private Symbol[] _desiredCapabilities; |
| private Symbol[] _remoteDesiredCapabilities; |
| |
| LinkImpl(SessionImpl session, String name) |
| { |
| _session = session; |
| _session.incref(); |
| _name = name; |
| ConnectionImpl conn = session.getConnectionImpl(); |
| _node = conn.addLinkEndpoint(this); |
| conn.put(Event.Type.LINK_INIT, this); |
| } |
| |
| |
| @Override |
| public String getName() |
| { |
| return _name; |
| } |
| |
| @Override |
| public DeliveryImpl delivery(byte[] tag) |
| { |
| return delivery(tag, 0, tag.length); |
| } |
| |
| @Override |
| public DeliveryImpl delivery(byte[] tag, int offset, int length) |
| { |
| if (offset != 0 || length != tag.length) |
| { |
| throw new IllegalArgumentException("At present delivery tag must be the whole byte array"); |
| } |
| incrementQueued(); |
| try |
| { |
| DeliveryImpl delivery = new DeliveryImpl(tag, this, _tail); |
| if (_tail == null) |
| { |
| _head = delivery; |
| } |
| _tail = delivery; |
| if (_current == null) |
| { |
| _current = delivery; |
| } |
| getConnectionImpl().workUpdate(delivery); |
| return delivery; |
| } |
| catch (RuntimeException e) |
| { |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| @Override |
| void postFinal() { |
| _session.getConnectionImpl().put(Event.Type.LINK_FINAL, this); |
| _session.decref(); |
| } |
| |
| @Override |
| void doFree() |
| { |
| DeliveryImpl dlv = _head; |
| while (dlv != null) { |
| DeliveryImpl next = dlv.next(); |
| dlv.free(); |
| dlv = next; |
| } |
| |
| _session.getConnectionImpl().removeLinkEndpoint(_node); |
| _node = null; |
| } |
| |
| void modifyEndpoints() { |
| modified(); |
| } |
| |
| /* |
| * Called when settling a message to ensure that the head/tail refs of the link are updated. |
| * The caller ensures the delivery updates its own refs appropriately. |
| */ |
| void remove(DeliveryImpl delivery) |
| { |
| if(_head == delivery) |
| { |
| _head = delivery.getLinkNext(); |
| } |
| if(_tail == delivery) |
| { |
| _tail = delivery.getLinkPrevious(); |
| } |
| } |
| |
| @Override |
| public DeliveryImpl current() |
| { |
| return _current; |
| } |
| |
| @Override |
| public boolean advance() |
| { |
| if(_current != null ) |
| { |
| DeliveryImpl oldCurrent = _current; |
| _current = _current.getLinkNext(); |
| getConnectionImpl().workUpdate(oldCurrent); |
| |
| if(_current != null) |
| { |
| getConnectionImpl().workUpdate(_current); |
| } |
| return true; |
| } |
| else |
| { |
| return false; |
| } |
| |
| } |
| |
| @Override |
| protected ConnectionImpl getConnectionImpl() |
| { |
| return _session.getConnectionImpl(); |
| } |
| |
| @Override |
| public SessionImpl getSession() |
| { |
| return _session; |
| } |
| |
| @Override |
| public Source getRemoteSource() |
| { |
| return _remoteSource; |
| } |
| |
| void setRemoteSource(Source source) |
| { |
| _remoteSource = source; |
| } |
| |
| @Override |
| public Target getRemoteTarget() |
| { |
| return _remoteTarget; |
| } |
| |
| void setRemoteTarget(Target target) |
| { |
| _remoteTarget = target; |
| } |
| |
| @Override |
| public Source getSource() |
| { |
| return _source; |
| } |
| |
| @Override |
| public void setSource(Source source) |
| { |
| // TODO - should be an error if local state is ACTIVE |
| _source = source; |
| } |
| |
| @Override |
| public Target getTarget() |
| { |
| return _target; |
| } |
| |
| @Override |
| public void setTarget(Target target) |
| { |
| // TODO - should be an error if local state is ACTIVE |
| _target = target; |
| } |
| |
| @Override |
| public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) |
| { |
| LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote); |
| |
| LinkNode<LinkImpl> linkNode = _node.next(query); |
| |
| return linkNode == null ? null : linkNode.getValue(); |
| |
| } |
| |
| abstract TransportLink getTransportLink(); |
| |
| @Override |
| public int getCredit() |
| { |
| return _credit; |
| } |
| |
| public void addCredit(int credit) |
| { |
| _credit+=credit; |
| } |
| |
| public void setCredit(int credit) |
| { |
| _credit = credit; |
| } |
| |
| boolean hasCredit() |
| { |
| return _credit > 0; |
| } |
| |
| void incrementCredit() |
| { |
| _credit++; |
| } |
| |
| void decrementCredit() |
| { |
| _credit--; |
| } |
| |
| @Override |
| public int getQueued() |
| { |
| return _queued; |
| } |
| |
| void incrementQueued() |
| { |
| _queued++; |
| } |
| |
| void decrementQueued() |
| { |
| _queued--; |
| } |
| |
| @Override |
| public int getUnsettled() |
| { |
| return _unsettled; |
| } |
| |
| void incrementUnsettled() |
| { |
| _unsettled++; |
| } |
| |
| void decrementUnsettled() |
| { |
| _unsettled--; |
| } |
| |
| void setDrain(boolean drain) |
| { |
| _drain = drain; |
| } |
| |
| @Override |
| public boolean getDrain() |
| { |
| return _drain; |
| } |
| |
| @Override |
| public SenderSettleMode getSenderSettleMode() |
| { |
| return _senderSettleMode; |
| } |
| |
| @Override |
| public void setSenderSettleMode(SenderSettleMode senderSettleMode) |
| { |
| _senderSettleMode = senderSettleMode; |
| } |
| |
| @Override |
| public SenderSettleMode getRemoteSenderSettleMode() |
| { |
| return _remoteSenderSettleMode; |
| } |
| |
| @Override |
| public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) |
| { |
| _remoteSenderSettleMode = remoteSenderSettleMode; |
| } |
| |
| @Override |
| public ReceiverSettleMode getReceiverSettleMode() |
| { |
| return _receiverSettleMode; |
| } |
| |
| @Override |
| public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) |
| { |
| _receiverSettleMode = receiverSettleMode; |
| } |
| |
| @Override |
| public ReceiverSettleMode getRemoteReceiverSettleMode() |
| { |
| return _remoteReceiverSettleMode; |
| } |
| |
| void setRemoteReceiverSettleMode(ReceiverSettleMode remoteReceiverSettleMode) |
| { |
| _remoteReceiverSettleMode = remoteReceiverSettleMode; |
| } |
| |
| @Override |
| public Map<Symbol, Object> getProperties() |
| { |
| return _properties; |
| } |
| |
| @Override |
| public void setProperties(Map<Symbol, Object> properties) |
| { |
| _properties = properties; |
| } |
| |
| @Override |
| public Map<Symbol, Object> getRemoteProperties() |
| { |
| return _remoteProperties; |
| } |
| |
| void setRemoteProperties(Map<Symbol, Object> remoteProperties) |
| { |
| _remoteProperties = remoteProperties; |
| } |
| |
| @Override |
| public Symbol[] getDesiredCapabilities() |
| { |
| return _desiredCapabilities; |
| } |
| |
| @Override |
| public void setDesiredCapabilities(Symbol[] desiredCapabilities) |
| { |
| _desiredCapabilities = desiredCapabilities; |
| } |
| |
| @Override |
| public Symbol[] getRemoteDesiredCapabilities() |
| { |
| return _remoteDesiredCapabilities; |
| } |
| |
| void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities) |
| { |
| _remoteDesiredCapabilities = remoteDesiredCapabilities; |
| } |
| |
| @Override |
| public Symbol[] getOfferedCapabilities() |
| { |
| return _offeredCapabilities; |
| } |
| |
| @Override |
| public void setOfferedCapabilities(Symbol[] offeredCapabilities) |
| { |
| _offeredCapabilities = offeredCapabilities; |
| } |
| |
| @Override |
| public Symbol[] getRemoteOfferedCapabilities() |
| { |
| return _remoteOfferedCapabilities; |
| } |
| |
| void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities) |
| { |
| _remoteOfferedCapabilities = remoteOfferedCapabilities; |
| } |
| |
| @Override |
| public UnsignedLong getMaxMessageSize() |
| { |
| return _maxMessageSize; |
| } |
| |
| @Override |
| public void setMaxMessageSize(UnsignedLong maxMessageSize) |
| { |
| _maxMessageSize = maxMessageSize; |
| } |
| |
| @Override |
| public UnsignedLong getRemoteMaxMessageSize() |
| { |
| return _remoteMaxMessageSize; |
| } |
| |
| void setRemoteMaxMessageSize(UnsignedLong remoteMaxMessageSize) |
| { |
| _remoteMaxMessageSize = remoteMaxMessageSize; |
| } |
| |
| @Override |
| public int drained() |
| { |
| int drained = 0; |
| |
| if (this instanceof SenderImpl) { |
| if(getDrain() && hasCredit()) |
| { |
| _drained = getCredit(); |
| setCredit(0); |
| modified(); |
| drained = _drained; |
| } |
| } else { |
| drained = _drained; |
| _drained = 0; |
| } |
| |
| return drained; |
| } |
| |
| int getDrained() |
| { |
| return _drained; |
| } |
| |
| void setDrained(int value) |
| { |
| _drained = value; |
| } |
| |
| @Override |
| public DeliveryImpl head() |
| { |
| return _head; |
| } |
| |
| @Override |
| void localOpen() |
| { |
| getConnectionImpl().put(Event.Type.LINK_LOCAL_OPEN, this); |
| } |
| |
| @Override |
| void localClose() |
| { |
| getConnectionImpl().put(Event.Type.LINK_LOCAL_CLOSE, this); |
| } |
| |
| @Override |
| public void detach() |
| { |
| _detached = true; |
| getConnectionImpl().put(Event.Type.LINK_LOCAL_DETACH, this); |
| modified(); |
| } |
| |
| public boolean detached() |
| { |
| return _detached; |
| } |
| } |