blob: c9212b1a1e455874a5a6c56df87082a2fa8ec230 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.amqp_1_0.transport;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.amqp_1_0.type.transaction.*;
import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeoutException;
public class SessionEndpoint
{
private SessionState _state = SessionState.INACTIVE;
private final Map<String, LinkEndpoint> _linkMap = new HashMap<String, LinkEndpoint>();
private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<LinkEndpoint, UnsignedInteger>();
private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<UnsignedInteger, LinkEndpoint>();
private long _timeout;
private ConnectionEndpoint _connection;
private long _lastAttachedTime;
private short _receivingChannel;
private short _sendingChannel;
private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled;
private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled;
// has to be a power of two
private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
private SequenceNumber _nextIncomingTransferId;
private SequenceNumber _nextOutgoingTransferId;
private int _nextOutgoingDeliveryId;
//private SequenceNumber _incomingLWM;
//private SequenceNumber _outgoingLWM;
private UnsignedInteger _outgoingSessionCredit;
private UnsignedInteger _initialOutgoingId;
private SessionEventListener _sessionEventListener = SessionEventListener.DEFAULT;
private int _availableIncomingCredit;
private int _availableOutgoingCredit;
private UnsignedInteger _lastSentIncomingLimit;
public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
{
this(connectionEndpoint, UnsignedInteger.valueOf(0));
}
public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, Begin begin)
{
this(connectionEndpoint, UnsignedInteger.valueOf(0));
_state = SessionState.BEGIN_RECVD;
_nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
}
public SessionEndpoint(final ConnectionEndpoint connectionEndpoint, UnsignedInteger nextOutgoingId)
{
_connection = connectionEndpoint;
_initialOutgoingId = nextOutgoingId;
_nextOutgoingTransferId = new SequenceNumber(nextOutgoingId.intValue());
_outgoingUnsettled = new LinkedHashMap<UnsignedInteger,Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
_incomingUnsettled = new LinkedHashMap<UnsignedInteger, Delivery>(DEFAULT_SESSION_BUFFER_SIZE);
_availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
_availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
}
public void setReceivingChannel(final short receivingChannel)
{
_receivingChannel = receivingChannel;
switch(_state)
{
case INACTIVE:
_state = SessionState.BEGIN_RECVD;
break;
case BEGIN_SENT:
_state = SessionState.ACTIVE;
break;
case END_PIPE:
_state = SessionState.END_SENT;
break;
default:
// TODO error
}
}
public void setSendingChannel(final short sendingChannel)
{
_sendingChannel = sendingChannel;
switch(_state)
{
case INACTIVE:
_state = SessionState.BEGIN_SENT;
break;
case BEGIN_RECVD:
_state = SessionState.ACTIVE;
break;
default:
// TODO error
}
}
public SessionState getState()
{
return _state;
}
public void end()
{
end(new End());
}
public void end(End end)
{
synchronized(getLock())
{
switch(_state)
{
case BEGIN_SENT:
_connection.sendEnd(getSendingChannel(), end, false);
_state = SessionState.END_PIPE;
break;
case ACTIVE:
detachLinks();
short sendChannel = getSendingChannel();
_connection.sendEnd(sendChannel, end, true);
_state = SessionState.END_SENT;
break;
default:
sendChannel = getSendingChannel();
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
_connection.sendEnd(sendChannel, reply, true);
break;
}
getLock().notifyAll();
}
}
public void receiveEnd(final End end)
{
synchronized(getLock())
{
switch(_state)
{
case END_SENT:
_state = SessionState.ENDED;
break;
case ACTIVE:
detachLinks();
_sessionEventListener.remoteEnd(end);
short sendChannel = getSendingChannel();
_connection.sendEnd(sendChannel, new End(), true);
_state = SessionState.ENDED;
break;
default:
sendChannel = getSendingChannel();
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
_connection.sendEnd(sendChannel, reply, true);
break;
}
getLock().notifyAll();
}
}
private void detachLinks()
{
Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
Error error = new Error();
error.setCondition(LinkError.DETACH_FORCED);
error.setDescription("Force detach the link because the session is remotely ended.");
for(UnsignedInteger handle : handles)
{
Detach detach = new Detach();
detach.setClosed(false);
detach.setHandle(handle);
detach.setError(error);
detach(handle, detach);
}
}
public short getSendingChannel()
{
return _sendingChannel;
}
public void receiveAttach(final Attach attach)
{
if(_state == SessionState.ACTIVE)
{
UnsignedInteger handle = attach.getHandle();
if(_remoteLinkEndpoints.containsKey(handle))
{
// TODO - Error - handle busy?
}
else
{
LinkEndpoint endpoint = getLinkMap().get(attach.getName());
if(endpoint == null)
{
endpoint = attach.getRole() == Role.RECEIVER
? new SendingLinkEndpoint(this, attach)
: new ReceivingLinkEndpoint(this, attach);
// TODO : fix below - distinguish between local and remote owned
endpoint.setSource(attach.getSource());
endpoint.setTarget(attach.getTarget());
}
if(attach.getRole() == Role.SENDER)
{
endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
}
_remoteLinkEndpoints.put(handle, endpoint);
if(!_localLinkEndpoints.containsKey(endpoint))
{
UnsignedInteger localHandle = findNextAvailableHandle();
endpoint.setLocalHandle(localHandle);
_localLinkEndpoints.put(endpoint, localHandle);
_sessionEventListener.remoteLinkCreation(endpoint);
}
else
{
endpoint.receiveAttach(attach);
}
}
}
}
private void send(final FrameBody frameBody)
{
_connection.send(this.getSendingChannel(), frameBody);
}
private int send(final FrameBody frameBody, ByteBuffer payload)
{
return _connection.send(this.getSendingChannel(), frameBody, payload);
}
private UnsignedInteger findNextAvailableHandle()
{
int i = 0;
do
{
if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
{
return UnsignedInteger.valueOf(i);
}
} while(++i != 0);
// TODO
throw new RuntimeException();
}
public void receiveDetach(final Detach detach)
{
UnsignedInteger handle = detach.getHandle();
detach(handle, detach);
}
private void detach(UnsignedInteger handle, Detach detach)
{
if(_remoteLinkEndpoints.containsKey(handle))
{
LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
endpoint.remoteDetached(detach);
_localLinkEndpoints.remove(endpoint);
}
else
{
// TODO
}
}
public void receiveTransfer(final Transfer transfer)
{
synchronized(getLock())
{
_nextIncomingTransferId.incr();
/*
_availableIncomingCredit--;
*/
UnsignedInteger handle = transfer.getHandle();
LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
if(endpoint == null)
{
//TODO - error unknown link
System.err.println("Unknown endpoint " + transfer);
}
UnsignedInteger deliveryId = transfer.getDeliveryId();
if(deliveryId == null)
{
deliveryId = ((ReceivingLinkEndpoint)endpoint).getLastDeliveryId();
}
Delivery delivery = _incomingUnsettled.get(deliveryId);
if(delivery == null)
{
delivery = new Delivery(transfer, endpoint);
_incomingUnsettled.put(deliveryId,delivery);
if(delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
{
/*
_availableIncomingCredit++;
*/
}
if(Boolean.TRUE.equals(transfer.getMore()))
{
((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(transfer.getDeliveryId());
}
}
else
{
if(delivery.getDeliveryId().equals(deliveryId))
{
delivery.addTransfer(transfer);
if(delivery.isSettled())
{
/*
_availableIncomingCredit++;
*/
}
else if(Boolean.TRUE.equals(transfer.getAborted()))
{
/*
_availableIncomingCredit += delivery.getTransfers().size();
*/
}
if(!Boolean.TRUE.equals(transfer.getMore()))
{
((ReceivingLinkEndpoint)endpoint).setLastDeliveryId(null);
}
}
else
{
// TODO - error
System.err.println("Incorrect transfer id " + transfer);
}
}
if(endpoint != null)
{
endpoint.receiveTransfer(transfer, delivery);
}
if((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
{
_incomingUnsettled.remove(deliveryId);
}
}
}
public void receiveFlow(final Flow flow)
{
synchronized(getLock())
{
UnsignedInteger handle = flow.getHandle();
LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
_outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
if(endpoint != null)
{
endpoint.receiveFlow( flow );
}
else
{
for(LinkEndpoint le : _remoteLinkEndpoints.values())
{
le.flowStateChanged();
}
}
getLock().notifyAll();
}
}
public void receiveDisposition(final Disposition disposition)
{
Role dispositionRole = disposition.getRole();
LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
if(dispositionRole == Role.RECEIVER)
{
unsettledTransfers = _outgoingUnsettled;
}
else
{
unsettledTransfers = _incomingUnsettled;
}
UnsignedInteger deliveryId = disposition.getFirst();
UnsignedInteger last = disposition.getLast();
if(last == null)
{
last = deliveryId;
}
while(deliveryId.compareTo(last)<=0)
{
Delivery delivery = unsettledTransfers.get(deliveryId);
if(delivery != null)
{
delivery.getLinkEndpoint().receiveDeliveryState(delivery,
disposition.getState(),
disposition.getSettled());
}
deliveryId = deliveryId.add(UnsignedInteger.ONE);
}
if(disposition.getSettled())
{
checkSendFlow();
}
}
private void checkSendFlow()
{
//TODO
}
public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr)
{
return createSendingLinkEndpoint(name, targetAddr, sourceAddr, null);
}
public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr, final String sourceAddr, Map<Binary, Outcome> unsettled)
{
return createSendingLinkEndpoint(name, targetAddr, sourceAddr, false, unsettled);
}
public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final String targetAddr,
final String sourceAddr, boolean durable,
Map<Binary, Outcome> unsettled)
{
Source source = new Source();
source.setAddress(sourceAddr);
Target target = new Target();
target.setAddress(targetAddr);
if(durable)
{
target.setDurable(TerminusDurability.UNSETTLED_STATE);
target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
}
return createSendingLinkEndpoint(name, source, target, unsettled);
}
public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target)
{
return createSendingLinkEndpoint(name, source, target, null);
}
public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target, Map<Binary, Outcome> unsettled)
{
return createSendingLinkEndpoint(name, source, target, unsettled, null);
}
public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source,
final org.apache.qpid.amqp_1_0.type.Target target,
Map<Binary, Outcome> unsettled,
DeliveryStateHandler deliveryStateHandler)
{
SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled, deliveryStateHandler);
endpoint.setSource(source);
endpoint.setTarget(target);
UnsignedInteger handle = findNextAvailableHandle();
_localLinkEndpoints.put(endpoint, handle);
endpoint.setLocalHandle(handle);
getLinkMap().put(name, endpoint);
return endpoint;
}
public void sendAttach(Attach attach)
{
send(attach);
}
public void sendTransfer(final Transfer xfr, SendingLinkEndpoint endpoint, boolean newDelivery)
{
_nextOutgoingTransferId.incr();
UnsignedInteger deliveryId;
if(newDelivery)
{
deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
endpoint.setLastDeliveryId(deliveryId);
}
else
{
deliveryId = endpoint.getLastDeliveryId();
}
xfr.setDeliveryId(deliveryId);
if(!Boolean.TRUE.equals(xfr.getSettled()))
{
Delivery delivery;
if((delivery = _outgoingUnsettled.get(deliveryId))== null)
{
delivery = new Delivery(xfr, endpoint);
_outgoingUnsettled.put(deliveryId, delivery);
}
else
{
delivery.addTransfer(xfr);
}
_outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
endpoint.addUnsettled(delivery);
}
try
{
ByteBuffer payload = xfr.getPayload();
int payloadSent = send(xfr, payload);
if(payload != null && payloadSent < payload.remaining())
{
payload = payload.duplicate();
payload.position(payload.position()+payloadSent);
Transfer secondTransfer = new Transfer();
secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
secondTransfer.setHandle(xfr.getHandle());
secondTransfer.setSettled(xfr.getSettled());
secondTransfer.setState(xfr.getState());
secondTransfer.setMessageFormat(xfr.getMessageFormat());
secondTransfer.setPayload(payload);
sendTransfer(secondTransfer, endpoint, false);
}
}
catch(OversizeFrameException e)
{
e.printStackTrace();
}
}
public Object getLock()
{
return _connection.getLock();
}
public long getSyncTimeout()
{
return _connection.getSyncTimeout();
}
public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException
{
_connection.waitUntil(predicate);
}
public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException
{
_connection.waitUntil(predicate, timeout);
}
public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
String targetAddr,
String sourceAddr,
UnsignedInteger initialCredit,
final DistributionMode distributionMode)
{
Source source = new Source();
source.setAddress(sourceAddr);
source.setDistributionMode(distributionMode);
Target target = new Target();
target.setAddress(targetAddr);
return createReceivingLinkEndpoint(name, target, source, initialCredit);
}
public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
Target target,
Source source,
UnsignedInteger initialCredit)
{
ReceivingLinkEndpoint endpoint = new ReceivingLinkEndpoint(this, name);
endpoint.setLinkCredit(initialCredit);
endpoint.setSource(source);
endpoint.setTarget(target);
UnsignedInteger handle = findNextAvailableHandle();
_localLinkEndpoints.put(endpoint, handle);
endpoint.setLocalHandle(handle);
getLinkMap().put(name, endpoint);
return endpoint;
}
public void updateDisposition(final Role role,
final UnsignedInteger first,
final UnsignedInteger last,
final DeliveryState state,
final boolean settled)
{
Disposition disposition = new Disposition();
disposition.setRole(role);
disposition.setFirst(first);
disposition.setLast(last);
disposition.setSettled(settled);
disposition.setState(state);
if(settled)
{
if(role == Role.RECEIVER)
{
SequenceNumber pos = new SequenceNumber(first.intValue());
SequenceNumber end = new SequenceNumber(last.intValue());
while(pos.compareTo(end)<=0)
{
Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
/*
_availableIncomingCredit += d.getTransfers().size();
*/
pos.incr();
}
}
}
send(disposition);
checkSendFlow();
}
public void settle(Role role, final UnsignedInteger deliveryId)
{
if(role == Role.RECEIVER)
{
Delivery d = _incomingUnsettled.remove(deliveryId);
if(d != null)
{
/*
_availableIncomingCredit += d.getTransfers().size();
*/
}
}
else
{
Delivery d = _outgoingUnsettled.remove(deliveryId);
/* if(d != null)
{
_availableOutgoingCredit += d.getTransfers().size();
}*/
}
}
public void sendFlow()
{
sendFlow(new Flow());
}
public void sendFlow(final Flow flow)
{
final int nextIncomingId = _nextIncomingTransferId.intValue();
flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
flow.setIncomingWindow(UnsignedInteger.valueOf(_availableIncomingCredit));
_lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _availableIncomingCredit);
flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
send(flow);
}
public void sendFlowConditional()
{
UnsignedInteger clientsCredit = _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
if(i >=0)
{
sendFlow();
}
}
public void sendDetach(Detach detach)
{
send(detach);
}
void doEnd(End end)
{
}
public void setNextIncomingId(final UnsignedInteger nextIncomingId)
{
_nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
}
public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
{
_outgoingSessionCredit = outgoingSessionCredit;
}
public UnsignedInteger getNextOutgoingId()
{
return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
}
public UnsignedInteger getOutgoingWindowSize()
{
return UnsignedInteger.valueOf(_availableOutgoingCredit);
}
public boolean hasCreditToSend()
{
boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
return b && b1;
}
public UnsignedInteger getIncomingWindowSize()
{
return UnsignedInteger.valueOf(_availableIncomingCredit);
}
public SessionEventListener getSessionEventListener()
{
return _sessionEventListener;
}
public void setSessionEventListener(final SessionEventListener sessionEventListener)
{
_sessionEventListener = sessionEventListener;
}
public ConnectionEndpoint getConnection()
{
return _connection;
}
public SendingLinkEndpoint createTransactionController(String name, TxnCapability... capabilities)
{
Coordinator coordinator = new Coordinator();
coordinator.setCapabilities(capabilities);
Source src = new Source();
return createSendingLinkEndpoint(name, src, coordinator);
}
Map<String, LinkEndpoint> getLinkMap()
{
return _linkMap;
}
public boolean isEnded()
{
return _state == SessionState.ENDED || _connection.isClosed();
}
public boolean isActive()
{
return _state == SessionState.ACTIVE;
}
}