blob: a0ed824c58fa4cccdf3af89eb49a89652a8e1b7b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import java.text.MessageFormat;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
{
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private VirtualHost _vhost;
private AutoCommitTransaction _transaction;
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
new LinkedHashMap<Integer, ServerTransaction>();
private final Connection_1_0 _connection;
private UUID _id = UUID.randomUUID();
public Session_1_0(VirtualHost vhost, final Connection_1_0 connection)
{
_vhost = vhost;
_transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
}
public void remoteLinkCreation(final LinkEndpoint endpoint)
{
Destination destination;
Link_1_0 link = null;
Error error = null;
final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
if(endpoint.getRole() == Role.SENDER)
{
SendingLink_1_0 previousLink = linkRegistry.getDurableSendingLink(endpoint.getName());
if(previousLink == null)
{
Target target = (Target) endpoint.getTarget();
Source source = (Source) endpoint.getSource();
if(source != null)
{
if(Boolean.TRUE.equals(source.getDynamic()))
{
AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
if(queue != null)
{
destination = new QueueDestination(queue);
}
else
{
Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
}
else
{
endpoint.setSource(null);
destination = null;
}
}
}
else
{
destination = null;
}
if(destination != null)
{
final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
try
{
final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
_vhost,
(SendingDestination) destination
);
sendingLinkEndpoint.setLinkEventListener(sendingLink);
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
{
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
}
}
catch(AmqpErrorException e)
{
e.printStackTrace();
destination = null;
sendingLinkEndpoint.setSource(null);
error = e.getError();
}
}
}
else
{
Source newSource = (Source) endpoint.getSource();
Source oldSource = (Source) previousLink.getEndpoint().getSource();
final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
if(newSourceDurable != null)
{
oldSource.setDurable(newSourceDurable);
if(newSourceDurable.equals(TerminusDurability.NONE))
{
linkRegistry.unregisterSendingLink(endpoint.getName());
}
}
endpoint.setSource(oldSource);
SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
sendingLinkEndpoint.setLinkEventListener(previousLink);
link = previousLink;
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
}
}
else
{
if(endpoint.getTarget() instanceof Coordinator)
{
Coordinator coordinator = (Coordinator) endpoint.getTarget();
TxnCapability[] capabilities = coordinator.getCapabilities();
boolean localTxn = false;
boolean multiplePerSession = false;
if(capabilities != null)
{
for(TxnCapability capability : capabilities)
{
if(capability.equals(TxnCapability.LOCAL_TXN))
{
localTxn = true;
}
else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
{
multiplePerSession = true;
}
else
{
error = new Error();
error.setCondition(AmqpError.NOT_IMPLEMENTED);
error.setDescription("Unsupported capability: " + capability);
break;
}
}
}
/* if(!localTxn)
{
capabilities.add(TxnCapabilities.LOCAL_TXN);
}*/
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final TxnCoordinatorLink_1_0 coordinatorLink =
new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
link = coordinatorLink;
}
else
{
ReceivingLink_1_0 previousLink = linkRegistry.getDurableReceivingLink(endpoint.getName());
if(previousLink == null)
{
Target target = (Target) endpoint.getTarget();
if(target != null)
{
if(Boolean.TRUE.equals(target.getDynamic()))
{
AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
target.setAddress(tempQueue.getName());
}
String addr = target.getAddress();
Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, target.getDurable(),
target.getExpiryPolicy());
}
else
{
AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
if(queue != null)
{
destination = new QueueDestination(queue);
}
else
{
endpoint.setTarget(null);
destination = null;
}
}
}
else
{
destination = null;
}
if(destination != null)
{
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
(ReceivingDestination) destination);
receivingLinkEndpoint.setLinkEventListener(receivingLink);
link = receivingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
{
linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
}
}
}
else
{
ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
receivingLinkEndpoint.setLinkEventListener(previousLink);
link = previousLink;
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
}
}
}
endpoint.attach();
if(link == null)
{
if(error == null)
{
error = new Error();
error.setCondition(AmqpError.NOT_FOUND);
}
endpoint.detach(error);
}
else
{
link.start();
}
}
private AMQQueue createTemporaryQueue(Map properties)
{
final String queueName = UUID.randomUUID().toString();
AMQQueue queue = null;
try
{
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
queueName,
false, // durable
null, // owner
false, // autodelete
false, // exclusive
_vhost,
properties);
if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
{
final Connection_1_0.Task deleteQueueTask =
new Connection_1_0.Task()
{
public void doTask(Connection_1_0 session)
{
if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
{
try
{
tempQueue.delete();
}
catch (AMQException e)
{
e.printStackTrace(); //TODO.
}
}
}
};
_connection.addConnectionCloseTask(deleteQueueTask);
queue.addQueueDeleteTask(new AMQQueue.Task()
{
public void doTask(AMQQueue queue)
{
_connection.removeConnectionCloseTask(deleteQueueTask);
}
});
}
else if(lifetimePolicy instanceof DeleteOnNoLinks)
{
}
else if(lifetimePolicy instanceof DeleteOnNoMessages)
{
}
else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
{
}
}
catch (AMQSecurityException e)
{
e.printStackTrace(); //TODO.
} catch (AMQException e)
{
e.printStackTrace(); //TODO
}
return queue;
}
public ServerTransaction getTransaction(Binary transactionId)
{
// TODO should treat invalid id differently to null
ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
return transaction == null ? _transaction : transaction;
}
public void remoteEnd(End end)
{
Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
while(iter.hasNext())
{
Map.Entry<Integer, ServerTransaction> entry = iter.next();
entry.getValue().rollback();
iter.remove();
}
_connection.sessionEnded(this);
}
Integer binaryToInteger(final Binary txnId)
{
if(txnId == null)
{
return null;
}
if(txnId.getLength() > 4)
throw new IllegalArgumentException();
int id = 0;
byte[] data = txnId.getArray();
for(int i = 0; i < txnId.getLength(); i++)
{
id <<= 8;
id += data[i+txnId.getArrayOffset()];
}
return id;
}
Binary integerToBinary(final int txnId)
{
byte[] data = new byte[4];
data[3] = (byte) (txnId & 0xff);
data[2] = (byte) ((txnId & 0xff00) >> 8);
data[1] = (byte) ((txnId & 0xff0000) >> 16);
data[0] = (byte) ((txnId & 0xff000000) >> 24);
return new Binary(data);
}
public void forceEnd()
{
}
@Override
public UUID getId()
{
return _id;
}
@Override
public AMQConnectionModel getConnectionModel()
{
return _connection.getModel();
}
@Override
public String getClientID()
{
// TODO
return "";
}
@Override
public void close() throws AMQException
{
// TODO - required for AMQSessionModel / management initiated closing
}
@Override
public LogSubject getLogSubject()
{
return this;
}
@Override
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
// TODO - required for AMQSessionModel / long running transaction detection
}
@Override
public void block(AMQQueue queue)
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public void unblock(AMQQueue queue)
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public void block()
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public void unblock()
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public boolean getBlocking()
{
// TODO
return false;
}
@Override
public boolean onSameConnection(InboundMessage inbound)
{
// TODO
return false;
}
@Override
public int getUnacknowledgedMessageCount()
{
// TODO
return 0;
}
@Override
public Long getTxnCount()
{
// TODO
return 0l;
}
@Override
public Long getTxnStart()
{
// TODO
return 0l;
}
@Override
public Long getTxnCommits()
{
// TODO
return 0l;
}
@Override
public Long getTxnRejects()
{
// TODO
return 0l;
}
@Override
public int getChannelId()
{
// TODO
return 0;
}
@Override
public int getConsumerCount()
{
// TODO
return 0;
}
public String toLogString()
{
long connectionId = getConnectionModel().getConnectionId();
String remoteAddress = getConnectionModel().getRemoteAddressString();
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
connectionId,
getClientID(),
remoteAddress,
_vhost.getName(), // TODO - virtual host
0) // TODO - channel)
+ "] ";
}
@Override
public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}
}