| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.protocol.v1_0; |
| |
| import java.text.MessageFormat; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; |
| import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; |
| import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; |
| import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; |
| import org.apache.qpid.amqp_1_0.transport.SessionEventListener; |
| import org.apache.qpid.amqp_1_0.type.*; |
| import org.apache.qpid.amqp_1_0.type.LifetimePolicy; |
| import org.apache.qpid.amqp_1_0.type.messaging.*; |
| import org.apache.qpid.amqp_1_0.type.messaging.Source; |
| import org.apache.qpid.amqp_1_0.type.messaging.Target; |
| import org.apache.qpid.amqp_1_0.type.transaction.Coordinator; |
| import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; |
| import org.apache.qpid.amqp_1_0.type.transport.*; |
| |
| import org.apache.qpid.amqp_1_0.type.transport.Error; |
| import org.apache.qpid.server.model.*; |
| import org.apache.qpid.server.security.QpidSecurityException; |
| import org.apache.qpid.protocol.AMQConstant; |
| import org.apache.qpid.server.exchange.Exchange; |
| import org.apache.qpid.server.logging.LogSubject; |
| import org.apache.qpid.server.message.MessageDestination; |
| import org.apache.qpid.server.message.MessageSource; |
| import org.apache.qpid.server.protocol.AMQConnectionModel; |
| import org.apache.qpid.server.protocol.AMQSessionModel; |
| import org.apache.qpid.server.protocol.LinkRegistry; |
| import org.apache.qpid.server.queue.AMQQueue; |
| import org.apache.qpid.server.txn.AutoCommitTransaction; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.server.util.Action; |
| import org.apache.qpid.server.util.ConnectionScopedRuntimeException; |
| import org.apache.qpid.server.virtualhost.VirtualHost; |
| import org.apache.qpid.server.virtualhost.QueueExistsException; |
| |
| import java.util.*; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; |
| |
| public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject |
| { |
| private static final Logger _logger = Logger.getLogger(Session_1_0.class); |
| private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); |
| private final SessionEndpoint _endpoint; |
| private VirtualHost _vhost; |
| private AutoCommitTransaction _transaction; |
| |
| private final LinkedHashMap<Integer, ServerTransaction> _openTransactions = |
| new LinkedHashMap<Integer, ServerTransaction>(); |
| |
| private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList = |
| new CopyOnWriteArrayList<Action<? super Session_1_0>>(); |
| |
| private final Connection_1_0 _connection; |
| private UUID _id = UUID.randomUUID(); |
| private AtomicBoolean _closed = new AtomicBoolean(); |
| |
| |
| public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) |
| { |
| _vhost = vhost; |
| _endpoint = endpoint; |
| _transaction = new AutoCommitTransaction(vhost.getMessageStore()); |
| _connection = connection; |
| |
| } |
| |
| public void remoteLinkCreation(final LinkEndpoint endpoint) |
| { |
| |
| |
| Destination destination; |
| Link_1_0 link = null; |
| Error error = null; |
| |
| final |
| LinkRegistry |
| linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId()); |
| |
| |
| if(endpoint.getRole() == Role.SENDER) |
| { |
| |
| SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); |
| |
| if(previousLink == null) |
| { |
| |
| Target target = (Target) endpoint.getTarget(); |
| Source source = (Source) endpoint.getSource(); |
| |
| |
| if(source != null) |
| { |
| if(Boolean.TRUE.equals(source.getDynamic())) |
| { |
| AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties()); |
| source.setAddress(tempQueue.getName()); |
| } |
| String addr = source.getAddress(); |
| MessageSource queue = _vhost.getMessageSource(addr); |
| if(queue != null) |
| { |
| |
| destination = new MessageSourceDestination(queue); |
| |
| |
| |
| } |
| else |
| { |
| Exchange exchg = _vhost.getExchange(addr); |
| if(exchg != null) |
| { |
| destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); |
| } |
| else |
| { |
| |
| endpoint.setSource(null); |
| destination = null; |
| } |
| } |
| |
| } |
| else |
| { |
| destination = null; |
| } |
| |
| if(destination != null) |
| { |
| final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; |
| try |
| { |
| final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint), |
| _vhost, |
| (SendingDestination) destination |
| ); |
| sendingLinkEndpoint.setLinkEventListener(sendingLink); |
| link = sendingLink; |
| if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) |
| { |
| linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); |
| } |
| } |
| catch(AmqpErrorException e) |
| { |
| _logger.error("Error creating sending link", e); |
| destination = null; |
| sendingLinkEndpoint.setSource(null); |
| error = e.getError(); |
| } |
| } |
| } |
| else |
| { |
| Source newSource = (Source) endpoint.getSource(); |
| |
| Source oldSource = (Source) previousLink.getEndpoint().getSource(); |
| final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable(); |
| if(newSourceDurable != null) |
| { |
| oldSource.setDurable(newSourceDurable); |
| if(newSourceDurable.equals(TerminusDurability.NONE)) |
| { |
| linkRegistry.unregisterSendingLink(endpoint.getName()); |
| } |
| } |
| endpoint.setSource(oldSource); |
| SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; |
| previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); |
| sendingLinkEndpoint.setLinkEventListener(previousLink); |
| link = previousLink; |
| endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); |
| } |
| } |
| else |
| { |
| if(endpoint.getTarget() instanceof Coordinator) |
| { |
| Coordinator coordinator = (Coordinator) endpoint.getTarget(); |
| TxnCapability[] capabilities = coordinator.getCapabilities(); |
| boolean localTxn = false; |
| boolean multiplePerSession = false; |
| if(capabilities != null) |
| { |
| for(TxnCapability capability : capabilities) |
| { |
| if(capability.equals(TxnCapability.LOCAL_TXN)) |
| { |
| localTxn = true; |
| } |
| else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN)) |
| { |
| multiplePerSession = true; |
| } |
| else |
| { |
| error = new Error(); |
| error.setCondition(AmqpError.NOT_IMPLEMENTED); |
| error.setDescription("Unsupported capability: " + capability); |
| break; |
| } |
| } |
| } |
| |
| /* if(!localTxn) |
| { |
| capabilities.add(TxnCapabilities.LOCAL_TXN); |
| }*/ |
| |
| final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; |
| final TxnCoordinatorLink_1_0 coordinatorLink = |
| new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions); |
| receivingLinkEndpoint.setLinkEventListener(coordinatorLink); |
| link = coordinatorLink; |
| |
| |
| } |
| else |
| { |
| |
| ReceivingLink_1_0 previousLink = |
| (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName()); |
| |
| if(previousLink == null) |
| { |
| |
| Target target = (Target) endpoint.getTarget(); |
| |
| if(target != null) |
| { |
| if(Boolean.TRUE.equals(target.getDynamic())) |
| { |
| |
| AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties()); |
| target.setAddress(tempQueue.getName()); |
| } |
| |
| String addr = target.getAddress(); |
| MessageDestination messageDestination = _vhost.getMessageDestination(addr); |
| if(messageDestination != null) |
| { |
| destination = new NodeReceivingDestination(messageDestination, target.getDurable(), |
| target.getExpiryPolicy()); |
| } |
| else |
| { |
| AMQQueue queue = _vhost.getQueue(addr); |
| if(queue != null) |
| { |
| |
| destination = new QueueDestination(queue); |
| } |
| else |
| { |
| endpoint.setTarget(null); |
| destination = null; |
| } |
| |
| } |
| |
| |
| } |
| else |
| { |
| destination = null; |
| } |
| if(destination != null) |
| { |
| final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; |
| final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost, |
| (ReceivingDestination) destination); |
| receivingLinkEndpoint.setLinkEventListener(receivingLink); |
| link = receivingLink; |
| if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) |
| { |
| linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); |
| } |
| } |
| } |
| else |
| { |
| ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; |
| previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint)); |
| receivingLinkEndpoint.setLinkEventListener(previousLink); |
| link = previousLink; |
| endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); |
| |
| } |
| } |
| } |
| |
| endpoint.attach(); |
| |
| if(link == null) |
| { |
| if(error == null) |
| { |
| error = new Error(); |
| error.setCondition(AmqpError.NOT_FOUND); |
| } |
| endpoint.detach(error); |
| } |
| else |
| { |
| link.start(); |
| } |
| } |
| |
| |
| private AMQQueue createTemporaryQueue(Map properties) |
| { |
| final String queueName = UUID.randomUUID().toString(); |
| AMQQueue queue = null; |
| try |
| { |
| LifetimePolicy lifetimePolicy = properties == null |
| ? null |
| : (LifetimePolicy) properties.get(LIFETIME_POLICY); |
| Map<String,Object> attributes = new HashMap<String,Object>(); |
| attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName())); |
| attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); |
| attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false); |
| |
| if(lifetimePolicy instanceof DeleteOnNoLinks) |
| { |
| attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, |
| org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS); |
| } |
| else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) |
| { |
| attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, |
| org.apache.qpid.server.model.LifetimePolicy.IN_USE); |
| } |
| else if(lifetimePolicy instanceof DeleteOnClose) |
| { |
| attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, |
| org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); |
| } |
| else if(lifetimePolicy instanceof DeleteOnNoMessages) |
| { |
| attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, |
| org.apache.qpid.server.model.LifetimePolicy.IN_USE); |
| } |
| else |
| { |
| attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, |
| org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); |
| } |
| |
| |
| // TODO convert AMQP 1-0 node properties to queue attributes |
| |
| final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); |
| } |
| catch (QpidSecurityException e) |
| { |
| //TODO |
| _logger.info("Security error", e); |
| throw new ConnectionScopedRuntimeException(e); |
| } |
| catch (QueueExistsException e) |
| { |
| _logger.error("A temporary queue was created with a name which collided with an existing queue name"); |
| throw new ConnectionScopedRuntimeException(e); |
| } |
| |
| return queue; |
| } |
| |
| public ServerTransaction getTransaction(Binary transactionId) |
| { |
| // TODO should treat invalid id differently to null |
| ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId)); |
| return transaction == null ? _transaction : transaction; |
| } |
| |
| public void remoteEnd(End end) |
| { |
| Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator(); |
| |
| while(iter.hasNext()) |
| { |
| Map.Entry<Integer, ServerTransaction> entry = iter.next(); |
| entry.getValue().rollback(); |
| iter.remove(); |
| } |
| |
| _connection.sessionEnded(this); |
| |
| } |
| |
| Integer binaryToInteger(final Binary txnId) |
| { |
| if(txnId == null) |
| { |
| return null; |
| } |
| |
| if(txnId.getLength() > 4) |
| throw new IllegalArgumentException(); |
| |
| int id = 0; |
| byte[] data = txnId.getArray(); |
| for(int i = 0; i < txnId.getLength(); i++) |
| { |
| id <<= 8; |
| id += data[i+txnId.getArrayOffset()]; |
| } |
| |
| return id; |
| |
| } |
| |
| Binary integerToBinary(final int txnId) |
| { |
| byte[] data = new byte[4]; |
| data[3] = (byte) (txnId & 0xff); |
| data[2] = (byte) ((txnId & 0xff00) >> 8); |
| data[1] = (byte) ((txnId & 0xff0000) >> 16); |
| data[0] = (byte) ((txnId & 0xff000000) >> 24); |
| return new Binary(data); |
| |
| } |
| |
| @Override |
| public UUID getId() |
| { |
| return _id; |
| } |
| |
| @Override |
| public Connection_1_0 getConnectionModel() |
| { |
| return _connection; |
| } |
| |
| @Override |
| public String getClientID() |
| { |
| // TODO |
| return ""; |
| } |
| |
| @Override |
| public void close() |
| { |
| performCloseTasks(); |
| _endpoint.end(); |
| } |
| |
| protected void performCloseTasks() |
| { |
| |
| if(_closed.compareAndSet(false, true)) |
| { |
| List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList); |
| _taskList.clear(); |
| for(Action<? super Session_1_0> task : taskList) |
| { |
| task.performAction(this); |
| } |
| } |
| } |
| |
| |
| @Override |
| public void close(AMQConstant cause, String message) |
| { |
| performCloseTasks(); |
| final End end = new End(); |
| final Error theError = new Error(); |
| theError.setDescription(message); |
| theError.setCondition(ConnectionError.CONNECTION_FORCED); |
| end.setError(theError); |
| _endpoint.end(end); |
| } |
| |
| @Override |
| public LogSubject getLogSubject() |
| { |
| return this; |
| } |
| |
| @Override |
| public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) |
| { |
| // TODO - required for AMQSessionModel / long running transaction detection |
| } |
| |
| @Override |
| public void block(AMQQueue queue) |
| { |
| // TODO - required for AMQSessionModel / producer side flow control |
| } |
| |
| @Override |
| public void unblock(AMQQueue queue) |
| { |
| // TODO - required for AMQSessionModel / producer side flow control |
| } |
| |
| @Override |
| public void block() |
| { |
| // TODO - required for AMQSessionModel / producer side flow control |
| } |
| |
| @Override |
| public void unblock() |
| { |
| // TODO - required for AMQSessionModel / producer side flow control |
| } |
| |
| @Override |
| public boolean getBlocking() |
| { |
| // TODO |
| return false; |
| } |
| |
| @Override |
| public Object getConnectionReference() |
| { |
| return getConnection().getReference(); |
| } |
| |
| @Override |
| public int getUnacknowledgedMessageCount() |
| { |
| // TODO |
| return 0; |
| } |
| |
| @Override |
| public Long getTxnCount() |
| { |
| // TODO |
| return 0l; |
| } |
| |
| @Override |
| public Long getTxnStart() |
| { |
| // TODO |
| return 0l; |
| } |
| |
| @Override |
| public Long getTxnCommits() |
| { |
| // TODO |
| return 0l; |
| } |
| |
| @Override |
| public Long getTxnRejects() |
| { |
| // TODO |
| return 0l; |
| } |
| |
| @Override |
| public int getChannelId() |
| { |
| return _endpoint.getSendingChannel(); |
| } |
| |
| @Override |
| public int getConsumerCount() |
| { |
| // TODO |
| return 0; |
| } |
| |
| |
| public String toLogString() |
| { |
| long connectionId = getConnectionModel().getConnectionId(); |
| |
| String remoteAddress = getConnectionModel().getRemoteAddressString(); |
| |
| return "[" + |
| MessageFormat.format(CHANNEL_FORMAT, |
| connectionId, |
| getClientID(), |
| remoteAddress, |
| _vhost.getName(), |
| _endpoint.getSendingChannel()) + "] "; |
| } |
| |
| @Override |
| public int compareTo(Session_1_0 o) |
| { |
| return getId().compareTo(o.getId()); |
| } |
| |
| public Connection_1_0 getConnection() |
| { |
| return _connection; |
| } |
| |
| @Override |
| public void addDeleteTask(final Action<? super Session_1_0> task) |
| { |
| if(!_closed.get()) |
| { |
| _taskList.add(task); |
| } |
| } |
| |
| @Override |
| public void removeDeleteTask(final Action<? super Session_1_0> task) |
| { |
| _taskList.remove(task); |
| } |
| } |