blob: 6840c7344a0e54f548274d8116cebdb33bbe9b00 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import java.text.MessageFormat;
import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
{
private static final Logger _logger = Logger.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private final SessionEndpoint _endpoint;
private VirtualHost _vhost;
private AutoCommitTransaction _transaction;
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
new LinkedHashMap<Integer, ServerTransaction>();
private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList =
new CopyOnWriteArrayList<Action<? super Session_1_0>>();
private final Connection_1_0 _connection;
private UUID _id = UUID.randomUUID();
private AtomicBoolean _closed = new AtomicBoolean();
public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
{
_vhost = vhost;
_endpoint = endpoint;
_transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
}
public void remoteLinkCreation(final LinkEndpoint endpoint)
{
Destination destination;
Link_1_0 link = null;
Error error = null;
final
LinkRegistry
linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
if(endpoint.getRole() == Role.SENDER)
{
SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
if(previousLink == null)
{
Target target = (Target) endpoint.getTarget();
Source source = (Source) endpoint.getSource();
if(source != null)
{
if(Boolean.TRUE.equals(source.getDynamic()))
{
AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
MessageSource queue = _vhost.getMessageSource(addr);
if(queue != null)
{
destination = new MessageSourceDestination(queue);
}
else
{
Exchange exchg = _vhost.getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
}
else
{
endpoint.setSource(null);
destination = null;
}
}
}
else
{
destination = null;
}
if(destination != null)
{
final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
try
{
final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
_vhost,
(SendingDestination) destination
);
sendingLinkEndpoint.setLinkEventListener(sendingLink);
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
{
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
}
}
catch(AmqpErrorException e)
{
_logger.error("Error creating sending link", e);
destination = null;
sendingLinkEndpoint.setSource(null);
error = e.getError();
}
}
}
else
{
Source newSource = (Source) endpoint.getSource();
Source oldSource = (Source) previousLink.getEndpoint().getSource();
final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
if(newSourceDurable != null)
{
oldSource.setDurable(newSourceDurable);
if(newSourceDurable.equals(TerminusDurability.NONE))
{
linkRegistry.unregisterSendingLink(endpoint.getName());
}
}
endpoint.setSource(oldSource);
SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
sendingLinkEndpoint.setLinkEventListener(previousLink);
link = previousLink;
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
}
}
else
{
if(endpoint.getTarget() instanceof Coordinator)
{
Coordinator coordinator = (Coordinator) endpoint.getTarget();
TxnCapability[] capabilities = coordinator.getCapabilities();
boolean localTxn = false;
boolean multiplePerSession = false;
if(capabilities != null)
{
for(TxnCapability capability : capabilities)
{
if(capability.equals(TxnCapability.LOCAL_TXN))
{
localTxn = true;
}
else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
{
multiplePerSession = true;
}
else
{
error = new Error();
error.setCondition(AmqpError.NOT_IMPLEMENTED);
error.setDescription("Unsupported capability: " + capability);
break;
}
}
}
/* if(!localTxn)
{
capabilities.add(TxnCapabilities.LOCAL_TXN);
}*/
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final TxnCoordinatorLink_1_0 coordinatorLink =
new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
link = coordinatorLink;
}
else
{
ReceivingLink_1_0 previousLink =
(ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
if(previousLink == null)
{
Target target = (Target) endpoint.getTarget();
if(target != null)
{
if(Boolean.TRUE.equals(target.getDynamic()))
{
AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
target.setAddress(tempQueue.getName());
}
String addr = target.getAddress();
MessageDestination messageDestination = _vhost.getMessageDestination(addr);
if(messageDestination != null)
{
destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
target.getExpiryPolicy());
}
else
{
AMQQueue queue = _vhost.getQueue(addr);
if(queue != null)
{
destination = new QueueDestination(queue);
}
else
{
endpoint.setTarget(null);
destination = null;
}
}
}
else
{
destination = null;
}
if(destination != null)
{
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
(ReceivingDestination) destination);
receivingLinkEndpoint.setLinkEventListener(receivingLink);
link = receivingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
{
linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
}
}
}
else
{
ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
receivingLinkEndpoint.setLinkEventListener(previousLink);
link = previousLink;
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
}
}
}
endpoint.attach();
if(link == null)
{
if(error == null)
{
error = new Error();
error.setCondition(AmqpError.NOT_FOUND);
}
endpoint.detach(error);
}
else
{
link.start();
}
}
private AMQQueue createTemporaryQueue(Map properties)
{
final String queueName = UUID.randomUUID().toString();
AMQQueue queue = null;
try
{
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
Map<String,Object> attributes = new HashMap<String,Object>();
attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()));
attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
if(lifetimePolicy instanceof DeleteOnNoLinks)
{
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
}
else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
{
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
else if(lifetimePolicy instanceof DeleteOnClose)
{
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
else if(lifetimePolicy instanceof DeleteOnNoMessages)
{
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
else
{
attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
// TODO convert AMQP 1-0 node properties to queue attributes
final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes );
}
catch (QpidSecurityException e)
{
//TODO
_logger.info("Security error", e);
throw new ConnectionScopedRuntimeException(e);
}
catch (QueueExistsException e)
{
_logger.error("A temporary queue was created with a name which collided with an existing queue name");
throw new ConnectionScopedRuntimeException(e);
}
return queue;
}
public ServerTransaction getTransaction(Binary transactionId)
{
// TODO should treat invalid id differently to null
ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
return transaction == null ? _transaction : transaction;
}
public void remoteEnd(End end)
{
Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
while(iter.hasNext())
{
Map.Entry<Integer, ServerTransaction> entry = iter.next();
entry.getValue().rollback();
iter.remove();
}
_connection.sessionEnded(this);
}
Integer binaryToInteger(final Binary txnId)
{
if(txnId == null)
{
return null;
}
if(txnId.getLength() > 4)
throw new IllegalArgumentException();
int id = 0;
byte[] data = txnId.getArray();
for(int i = 0; i < txnId.getLength(); i++)
{
id <<= 8;
id += data[i+txnId.getArrayOffset()];
}
return id;
}
Binary integerToBinary(final int txnId)
{
byte[] data = new byte[4];
data[3] = (byte) (txnId & 0xff);
data[2] = (byte) ((txnId & 0xff00) >> 8);
data[1] = (byte) ((txnId & 0xff0000) >> 16);
data[0] = (byte) ((txnId & 0xff000000) >> 24);
return new Binary(data);
}
@Override
public UUID getId()
{
return _id;
}
@Override
public Connection_1_0 getConnectionModel()
{
return _connection;
}
@Override
public String getClientID()
{
// TODO
return "";
}
@Override
public void close()
{
performCloseTasks();
_endpoint.end();
}
protected void performCloseTasks()
{
if(_closed.compareAndSet(false, true))
{
List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList);
_taskList.clear();
for(Action<? super Session_1_0> task : taskList)
{
task.performAction(this);
}
}
}
@Override
public void close(AMQConstant cause, String message)
{
performCloseTasks();
final End end = new End();
final Error theError = new Error();
theError.setDescription(message);
theError.setCondition(ConnectionError.CONNECTION_FORCED);
end.setError(theError);
_endpoint.end(end);
}
@Override
public LogSubject getLogSubject()
{
return this;
}
@Override
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
{
// TODO - required for AMQSessionModel / long running transaction detection
}
@Override
public void block(AMQQueue queue)
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public void unblock(AMQQueue queue)
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public void block()
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public void unblock()
{
// TODO - required for AMQSessionModel / producer side flow control
}
@Override
public boolean getBlocking()
{
// TODO
return false;
}
@Override
public Object getConnectionReference()
{
return getConnection().getReference();
}
@Override
public int getUnacknowledgedMessageCount()
{
// TODO
return 0;
}
@Override
public Long getTxnCount()
{
// TODO
return 0l;
}
@Override
public Long getTxnStart()
{
// TODO
return 0l;
}
@Override
public Long getTxnCommits()
{
// TODO
return 0l;
}
@Override
public Long getTxnRejects()
{
// TODO
return 0l;
}
@Override
public int getChannelId()
{
return _endpoint.getSendingChannel();
}
@Override
public int getConsumerCount()
{
// TODO
return 0;
}
public String toLogString()
{
long connectionId = getConnectionModel().getConnectionId();
String remoteAddress = getConnectionModel().getRemoteAddressString();
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
connectionId,
getClientID(),
remoteAddress,
_vhost.getName(),
_endpoint.getSendingChannel()) + "] ";
}
@Override
public int compareTo(Session_1_0 o)
{
return getId().compareTo(o.getId());
}
public Connection_1_0 getConnection()
{
return _connection;
}
@Override
public void addDeleteTask(final Action<? super Session_1_0> task)
{
if(!_closed.get())
{
_taskList.add(task);
}
}
@Override
public void removeDeleteTask(final Action<? super Session_1_0> task)
{
_taskList.remove(task);
}
}