| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.transport; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Map; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.AMQUnknownExchangeType; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.framing.FieldTable; |
| import org.apache.qpid.server.exchange.Exchange; |
| import org.apache.qpid.server.exchange.ExchangeFactory; |
| import org.apache.qpid.server.exchange.ExchangeInUseException; |
| import org.apache.qpid.server.exchange.ExchangeRegistry; |
| import org.apache.qpid.server.exchange.ExchangeType; |
| import org.apache.qpid.server.exchange.HeadersExchange; |
| import org.apache.qpid.server.filter.FilterManager; |
| import org.apache.qpid.server.filter.FilterManagerFactory; |
| import org.apache.qpid.server.flow.FlowCreditManager_0_10; |
| import org.apache.qpid.server.flow.WindowCreditManager; |
| import org.apache.qpid.server.logging.messages.ExchangeMessages; |
| import org.apache.qpid.server.message.MessageMetaData_0_10; |
| import org.apache.qpid.server.message.MessageTransferMessage; |
| import org.apache.qpid.server.queue.AMQQueue; |
| import org.apache.qpid.server.queue.AMQQueueFactory; |
| import org.apache.qpid.server.queue.BaseQueue; |
| import org.apache.qpid.server.queue.QueueRegistry; |
| import org.apache.qpid.server.registry.ApplicationRegistry; |
| import org.apache.qpid.server.security.SecurityManager; |
| import org.apache.qpid.server.store.DurableConfigurationStore; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.StoredMessage; |
| import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; |
| import org.apache.qpid.server.subscription.Subscription_0_10; |
| import org.apache.qpid.server.virtualhost.VirtualHost; |
| import org.apache.qpid.transport.Acquired; |
| import org.apache.qpid.transport.DeliveryProperties; |
| import org.apache.qpid.transport.ExchangeBind; |
| import org.apache.qpid.transport.ExchangeBound; |
| import org.apache.qpid.transport.ExchangeBoundResult; |
| import org.apache.qpid.transport.ExchangeDeclare; |
| import org.apache.qpid.transport.ExchangeDelete; |
| import org.apache.qpid.transport.ExchangeQuery; |
| import org.apache.qpid.transport.ExchangeQueryResult; |
| import org.apache.qpid.transport.ExchangeUnbind; |
| import org.apache.qpid.transport.ExecutionErrorCode; |
| import org.apache.qpid.transport.ExecutionException; |
| import org.apache.qpid.transport.MessageAccept; |
| import org.apache.qpid.transport.MessageAcceptMode; |
| import org.apache.qpid.transport.MessageAcquire; |
| import org.apache.qpid.transport.MessageAcquireMode; |
| import org.apache.qpid.transport.MessageCancel; |
| import org.apache.qpid.transport.MessageFlow; |
| import org.apache.qpid.transport.MessageFlowMode; |
| import org.apache.qpid.transport.MessageFlush; |
| import org.apache.qpid.transport.MessageReject; |
| import org.apache.qpid.transport.MessageRejectCode; |
| import org.apache.qpid.transport.MessageRelease; |
| import org.apache.qpid.transport.MessageResume; |
| import org.apache.qpid.transport.MessageSetFlowMode; |
| import org.apache.qpid.transport.MessageStop; |
| import org.apache.qpid.transport.MessageSubscribe; |
| import org.apache.qpid.transport.MessageTransfer; |
| import org.apache.qpid.transport.Method; |
| import org.apache.qpid.transport.QueueDeclare; |
| import org.apache.qpid.transport.QueueDelete; |
| import org.apache.qpid.transport.QueuePurge; |
| import org.apache.qpid.transport.QueueQuery; |
| import org.apache.qpid.transport.QueueQueryResult; |
| import org.apache.qpid.transport.RangeSet; |
| import org.apache.qpid.transport.Session; |
| import org.apache.qpid.transport.SessionDelegate; |
| import org.apache.qpid.transport.TxCommit; |
| import org.apache.qpid.transport.TxRollback; |
| import org.apache.qpid.transport.TxSelect; |
| |
| public class ServerSessionDelegate extends SessionDelegate |
| { |
| private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); |
| |
| public ServerSessionDelegate() |
| { |
| |
| } |
| |
| @Override |
| public void command(Session session, Method method) |
| { |
| try |
| { |
| setThreadSubject(session); |
| |
| if(!session.isClosing()) |
| { |
| super.command(session, method); |
| if (method.isSync()) |
| { |
| session.flushProcessed(); |
| } |
| } |
| } |
| catch(RuntimeException e) |
| { |
| LOGGER.error("Exception processing command", e); |
| exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e); |
| } |
| } |
| |
| @Override |
| public void messageAccept(Session session, MessageAccept method) |
| { |
| ((ServerSession)session).accept(method.getTransfers()); |
| } |
| |
| @Override |
| public void messageReject(Session session, MessageReject method) |
| { |
| ((ServerSession)session).reject(method.getTransfers()); |
| } |
| |
| @Override |
| public void messageRelease(Session session, MessageRelease method) |
| { |
| ((ServerSession)session).release(method.getTransfers()); |
| } |
| |
| @Override |
| public void messageAcquire(Session session, MessageAcquire method) |
| { |
| RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers()); |
| |
| Acquired result = new Acquired(acquiredRanges); |
| |
| |
| session.executionResult((int) method.getId(), result); |
| |
| |
| } |
| |
| @Override |
| public void messageResume(Session session, MessageResume method) |
| { |
| super.messageResume(session, method); |
| } |
| |
| @Override |
| public void messageSubscribe(Session session, MessageSubscribe method) |
| { |
| //TODO - work around broken Python tests |
| if(!method.hasAcceptMode()) |
| { |
| method.setAcceptMode(MessageAcceptMode.EXPLICIT); |
| } |
| if(!method.hasAcquireMode()) |
| { |
| method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED); |
| |
| } |
| |
| /* if(!method.hasAcceptMode()) |
| { |
| exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Accept-mode not supplied"); |
| } |
| else if(!method.hasAcquireMode()) |
| { |
| exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied"); |
| } |
| else */if(!method.hasQueue()) |
| { |
| exception(session,method,ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied"); |
| } |
| else |
| { |
| String destination = method.getDestination(); |
| |
| if(((ServerSession)session).getSubscription(destination)!=null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destaination: '"+destination+"'"); |
| } |
| else |
| { |
| String queueName = method.getQueue(); |
| QueueRegistry queueRegistry = getQueueRegistry(session); |
| |
| |
| final AMQQueue queue = queueRegistry.getQueue(queueName); |
| |
| if(queue == null) |
| { |
| exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); |
| } |
| else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) |
| { |
| exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); |
| } |
| else |
| { |
| if(queue.isExclusive()) |
| { |
| ServerSession s = (ServerSession) session; |
| queue.setExclusiveOwningSession(s); |
| if(queue.getAuthorizationHolder() == null) |
| { |
| queue.setAuthorizationHolder(s); |
| queue.setExclusiveOwningSession(s); |
| ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() |
| { |
| public void doTask(ServerSession session) |
| { |
| if(queue.getAuthorizationHolder() == session) |
| { |
| queue.setAuthorizationHolder(null); |
| queue.setExclusiveOwningSession(null); |
| } |
| } |
| }); |
| } |
| |
| } |
| |
| FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); |
| |
| FilterManager filterManager = null; |
| try |
| { |
| filterManager = FilterManagerFactory.createManager(method.getArguments()); |
| } |
| catch (AMQException amqe) |
| { |
| exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager"); |
| return; |
| } |
| |
| Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session, |
| destination, |
| method.getAcceptMode(), |
| method.getAcquireMode(), |
| MessageFlowMode.WINDOW, |
| creditManager, |
| filterManager, |
| method.getArguments()); |
| |
| ((ServerSession)session).register(destination, sub); |
| try |
| { |
| queue.registerSubscription(sub, method.getExclusive()); |
| } |
| catch (AMQQueue.ExistingExclusiveSubscription existing) |
| { |
| exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer"); |
| } |
| catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive) |
| { |
| exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot subscribe to '" + destination); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void messageTransfer(Session ssn, MessageTransfer xfr) |
| { |
| final Exchange exchange = getExchangeForMessage(ssn, xfr); |
| |
| DeliveryProperties delvProps = null; |
| if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) |
| { |
| delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); |
| } |
| |
| final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); |
| |
| if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) |
| { |
| ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; |
| String description = "Permission denied: exchange-name '" + exchange.getName() + "'"; |
| exception(ssn, xfr, errorCode, description); |
| |
| return; |
| } |
| |
| final Exchange exchangeInUse; |
| ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData); |
| if(queues.isEmpty() && exchange.getAlternateExchange() != null) |
| { |
| final Exchange alternateExchange = exchange.getAlternateExchange(); |
| queues = alternateExchange.route(messageMetaData); |
| if (!queues.isEmpty()) |
| { |
| exchangeInUse = alternateExchange; |
| } |
| else |
| { |
| exchangeInUse = exchange; |
| } |
| } |
| else |
| { |
| exchangeInUse = exchange; |
| } |
| |
| if(!queues.isEmpty()) |
| { |
| final MessageStore store = getVirtualHost(ssn).getMessageStore(); |
| final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store); |
| MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); |
| ((ServerSession) ssn).enqueue(message, queues); |
| } |
| else |
| { |
| if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) |
| { |
| RangeSet rejects = new RangeSet(); |
| rejects.add(xfr.getId()); |
| MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); |
| ssn.invoke(reject); |
| } |
| else |
| { |
| ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); |
| } |
| } |
| |
| ssn.processed(xfr); |
| } |
| |
| private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr, |
| final MessageMetaData_0_10 messageMetaData, final MessageStore store) |
| { |
| final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); |
| ByteBuffer body = xfr.getBody(); |
| if(body != null) |
| { |
| storeMessage.addContent(0, body); |
| } |
| storeMessage.flushToStore(); |
| return storeMessage; |
| } |
| |
| @Override |
| public void messageCancel(Session session, MessageCancel method) |
| { |
| String destination = method.getDestination(); |
| |
| Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); |
| |
| if(sub == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); |
| } |
| else |
| { |
| AMQQueue queue = sub.getQueue(); |
| ((ServerSession)session).unregister(sub); |
| if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) |
| { |
| queue.setAuthorizationHolder(null); |
| } |
| } |
| } |
| |
| @Override |
| public void messageFlush(Session session, MessageFlush method) |
| { |
| String destination = method.getDestination(); |
| |
| Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); |
| |
| if(sub == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); |
| } |
| else |
| { |
| |
| try |
| { |
| sub.flush(); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot flush subscription '" + destination); |
| } |
| } |
| } |
| |
| @Override |
| public void txSelect(Session session, TxSelect method) |
| { |
| // TODO - check current tx mode |
| ((ServerSession)session).selectTx(); |
| } |
| |
| @Override |
| public void txCommit(Session session, TxCommit method) |
| { |
| // TODO - check current tx mode |
| ((ServerSession)session).commit(); |
| } |
| |
| @Override |
| public void txRollback(Session session, TxRollback method) |
| { |
| // TODO - check current tx mode |
| ((ServerSession)session).rollback(); |
| } |
| |
| |
| @Override |
| public void exchangeDeclare(Session session, ExchangeDeclare method) |
| { |
| String exchangeName = method.getExchange(); |
| VirtualHost virtualHost = getVirtualHost(session); |
| Exchange exchange = getExchange(session, exchangeName); |
| |
| //we must check for any unsupported arguments present and throw not-implemented |
| if(method.hasArguments()) |
| { |
| Map<String,Object> args = method.getArguments(); |
| |
| //QPID-3392: currently we don't support any! |
| if(!args.isEmpty()) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString()); |
| return; |
| } |
| } |
| |
| if(method.getPassive()) |
| { |
| if(exchange == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'"); |
| |
| } |
| else |
| { |
| if(!exchange.getTypeShortString().toString().equals(method.getType())) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); |
| } |
| } |
| |
| } |
| else |
| { |
| if (exchange == null) |
| { |
| ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); |
| ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); |
| |
| |
| |
| try |
| { |
| |
| exchange = exchangeFactory.createExchange(method.getExchange(), |
| method.getType(), |
| method.getDurable(), |
| method.getAutoDelete()); |
| |
| String alternateExchangeName = method.getAlternateExchange(); |
| if(alternateExchangeName != null && alternateExchangeName.length() != 0) |
| { |
| Exchange alternate = getExchange(session, alternateExchangeName); |
| exchange.setAlternateExchange(alternate); |
| } |
| |
| if (exchange.isDurable()) |
| { |
| DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); |
| store.createExchange(exchange); |
| } |
| |
| exchangeRegistry.registerExchange(exchange); |
| } |
| catch(AMQUnknownExchangeType e) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot declare exchange '" + exchangeName); |
| } |
| } |
| else |
| { |
| if(!exchange.getTypeShortString().toString().equals(method.getType())) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); |
| } |
| } |
| |
| } |
| } |
| |
| // TODO decouple AMQException and AMQConstant error codes |
| private void exception(Session session, Method method, AMQException exception, String message) |
| { |
| ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; |
| if (exception.getErrorCode() != null) |
| { |
| try |
| { |
| errorCode = ExecutionErrorCode.get(exception.getErrorCode().getCode()); |
| } |
| catch (IllegalArgumentException iae) |
| { |
| // ignore, already set to INTERNAL_ERROR |
| } |
| } |
| String description = message + "': " + exception.getMessage(); |
| |
| exception(session, method, errorCode, description); |
| } |
| |
| private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description) |
| { |
| ExecutionException ex = new ExecutionException(); |
| ex.setErrorCode(errorCode); |
| ex.setCommandId(method.getId()); |
| ex.setDescription(description); |
| |
| session.invoke(ex); |
| |
| session.close(); |
| } |
| |
| private Exchange getExchange(Session session, String exchangeName) |
| { |
| ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); |
| return exchangeRegistry.getExchange(exchangeName); |
| } |
| |
| private ExchangeRegistry getExchangeRegistry(Session session) |
| { |
| VirtualHost virtualHost = getVirtualHost(session); |
| return virtualHost.getExchangeRegistry(); |
| |
| } |
| |
| private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) |
| { |
| final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn); |
| Exchange exchange; |
| if(xfr.hasDestination()) |
| { |
| exchange = exchangeRegistry.getExchange(xfr.getDestination()); |
| if(exchange == null) |
| { |
| exchange = exchangeRegistry.getDefaultExchange(); |
| } |
| } |
| else |
| { |
| exchange = exchangeRegistry.getDefaultExchange(); |
| } |
| return exchange; |
| } |
| |
| private VirtualHost getVirtualHost(Session session) |
| { |
| ServerConnection conn = getServerConnection(session); |
| VirtualHost vhost = conn.getVirtualHost(); |
| return vhost; |
| } |
| |
| private ServerConnection getServerConnection(Session session) |
| { |
| ServerConnection conn = (ServerConnection) session.getConnection(); |
| return conn; |
| } |
| |
| @Override |
| public void exchangeDelete(Session session, ExchangeDelete method) |
| { |
| VirtualHost virtualHost = getVirtualHost(session); |
| ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); |
| |
| try |
| { |
| if (nameNullOrEmpty(method.getExchange())) |
| { |
| exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Delete not allowed for default exchange"); |
| return; |
| } |
| |
| Exchange exchange = getExchange(session, method.getExchange()); |
| |
| if(exchange == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "No such exchange '" + method.getExchange() + "'"); |
| } |
| else if(exchange.hasReferrers()) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); |
| } |
| else if(isStandardExchange(exchange, virtualHost.getExchangeFactory().getRegisteredTypes())) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted"); |
| } |
| else |
| { |
| exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); |
| |
| if (exchange.isDurable() && !exchange.isAutoDelete()) |
| { |
| DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); |
| store.removeExchange(exchange); |
| } |
| } |
| } |
| catch (ExchangeInUseException e) |
| { |
| exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot delete exchange '" + method.getExchange() ); |
| } |
| } |
| |
| private boolean nameNullOrEmpty(String name) |
| { |
| if(name == null || name.length() == 0) |
| { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes) |
| { |
| for(ExchangeType type : registeredTypes) |
| { |
| if(type.getDefaultExchangeName().toString().equals( exchange.getName() )) |
| { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void exchangeQuery(Session session, ExchangeQuery method) |
| { |
| |
| ExchangeQueryResult result = new ExchangeQueryResult(); |
| |
| Exchange exchange = getExchange(session, method.getName()); |
| |
| if(exchange != null) |
| { |
| result.setDurable(exchange.isDurable()); |
| result.setType(exchange.getTypeShortString().toString()); |
| result.setNotFound(false); |
| } |
| else |
| { |
| result.setNotFound(true); |
| } |
| |
| session.executionResult((int) method.getId(), result); |
| } |
| |
| @Override |
| public void exchangeBind(Session session, ExchangeBind method) |
| { |
| |
| VirtualHost virtualHost = getVirtualHost(session); |
| ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); |
| QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); |
| |
| if (!method.hasQueue()) |
| { |
| exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); |
| } |
| else if (nameNullOrEmpty(method.getExchange())) |
| { |
| exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange"); |
| } |
| /* |
| else if (!method.hasBindingKey()) |
| { |
| exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set"); |
| } |
| */ |
| else |
| { |
| //TODO - here because of non-compiant python tests |
| if (!method.hasBindingKey()) |
| { |
| method.setBindingKey(method.getQueue()); |
| } |
| AMQQueue queue = queueRegistry.getQueue(method.getQueue()); |
| Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); |
| if(queue == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); |
| } |
| else if(exchange == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); |
| } |
| else if(exchange.getTypeShortString().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) |
| { |
| exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header"); |
| } |
| else |
| { |
| AMQShortString routingKey = new AMQShortString(method.getBindingKey()); |
| FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments()); |
| |
| if (!exchange.isBound(routingKey, fieldTable, queue)) |
| { |
| try |
| { |
| virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments()); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot add binding '" + method.getBindingKey()); |
| } |
| } |
| else |
| { |
| // todo |
| } |
| } |
| |
| |
| } |
| |
| |
| |
| } |
| |
| @Override |
| public void exchangeUnbind(Session session, ExchangeUnbind method) |
| { |
| VirtualHost virtualHost = getVirtualHost(session); |
| ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); |
| QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); |
| |
| if (!method.hasQueue()) |
| { |
| exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); |
| } |
| else if (nameNullOrEmpty(method.getExchange())) |
| { |
| exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Unbind not allowed for default exchange"); |
| } |
| else if (!method.hasBindingKey()) |
| { |
| exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set"); |
| } |
| else |
| { |
| AMQQueue queue = queueRegistry.getQueue(method.getQueue()); |
| Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); |
| if(queue == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); |
| } |
| else if(exchange == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); |
| } |
| else |
| { |
| try |
| { |
| virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot remove binding '" + method.getBindingKey()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void exchangeBound(Session session, ExchangeBound method) |
| { |
| |
| ExchangeBoundResult result = new ExchangeBoundResult(); |
| Exchange exchange; |
| AMQQueue queue; |
| if(method.hasExchange()) |
| { |
| exchange = getExchange(session, method.getExchange()); |
| |
| if(exchange == null) |
| { |
| result.setExchangeNotFound(true); |
| } |
| } |
| else |
| { |
| exchange = getExchangeRegistry(session).getDefaultExchange(); |
| } |
| |
| |
| if(method.hasQueue()) |
| { |
| |
| queue = getQueue(session, method.getQueue()); |
| if(queue == null) |
| { |
| result.setQueueNotFound(true); |
| } |
| |
| |
| if(exchange != null && queue != null) |
| { |
| |
| boolean queueMatched = exchange.isBound(queue); |
| |
| result.setQueueNotMatched(!queueMatched); |
| |
| |
| if(method.hasBindingKey()) |
| { |
| |
| if(method.hasArguments()) |
| { |
| FieldTable args = FieldTable.convertToFieldTable(method.getArguments()); |
| |
| result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue)); |
| } |
| if(queueMatched) |
| { |
| result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); |
| } |
| else |
| { |
| result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); |
| } |
| } |
| else if (method.hasArguments()) |
| { |
| // TODO |
| |
| } |
| |
| result.setQueueNotMatched(!exchange.isBound(queue)); |
| |
| } |
| else if(exchange != null && method.hasBindingKey()) |
| { |
| if(method.hasArguments()) |
| { |
| // TODO |
| } |
| result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); |
| |
| } |
| |
| } |
| else if(exchange != null && method.hasBindingKey()) |
| { |
| if(method.hasArguments()) |
| { |
| // TODO |
| } |
| result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); |
| |
| } |
| |
| |
| session.executionResult((int) method.getId(), result); |
| |
| |
| } |
| |
| private AMQQueue getQueue(Session session, String queue) |
| { |
| QueueRegistry queueRegistry = getQueueRegistry(session); |
| return queueRegistry.getQueue(queue); |
| } |
| |
| private QueueRegistry getQueueRegistry(Session session) |
| { |
| return getVirtualHost(session).getQueueRegistry(); |
| } |
| |
| @Override |
| public void queueDeclare(Session session, final QueueDeclare method) |
| { |
| |
| VirtualHost virtualHost = getVirtualHost(session); |
| DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); |
| |
| String queueName = method.getQueue(); |
| AMQQueue queue; |
| QueueRegistry queueRegistry = getQueueRegistry(session); |
| //TODO: do we need to check that the queue already exists with exactly the same "configuration"? |
| |
| synchronized (queueRegistry) |
| { |
| |
| if (((queue = queueRegistry.getQueue(queueName)) == null)) |
| { |
| |
| if (method.getPassive()) |
| { |
| String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; |
| ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; |
| |
| exception(session, method, errorCode, description); |
| |
| return; |
| } |
| else |
| { |
| try |
| { |
| queue = createQueue(queueName, method, virtualHost, (ServerSession)session); |
| if(method.getExclusive()) |
| { |
| queue.setExclusive(true); |
| } |
| else if(method.getAutoDelete()) |
| { |
| queue.setDeleteOnNoConsumers(true); |
| } |
| |
| final String alternateExchangeName = method.getAlternateExchange(); |
| if(alternateExchangeName != null && alternateExchangeName.length() != 0) |
| { |
| Exchange alternate = getExchange(session, alternateExchangeName); |
| queue.setAlternateExchange(alternate); |
| } |
| |
| if(method.hasArguments() && method.getArguments() != null) |
| { |
| if(method.getArguments().containsKey("no-local")) |
| { |
| Object no_local = method.getArguments().get("no-local"); |
| if(no_local instanceof Boolean && ((Boolean)no_local)) |
| { |
| queue.setNoLocal(true); |
| } |
| } |
| } |
| |
| |
| if (queue.isDurable() && !queue.isAutoDelete()) |
| { |
| if(method.hasArguments() && method.getArguments() != null) |
| { |
| Map<String,Object> args = method.getArguments(); |
| FieldTable ftArgs = new FieldTable(); |
| for(Map.Entry<String, Object> entry : args.entrySet()) |
| { |
| ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); |
| } |
| store.createQueue(queue, ftArgs); |
| } |
| else |
| { |
| store.createQueue(queue); |
| } |
| } |
| queueRegistry.registerQueue(queue); |
| boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister(); |
| |
| if (autoRegister) |
| { |
| |
| ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); |
| |
| Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); |
| |
| virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null); |
| |
| } |
| |
| if (method.hasAutoDelete() |
| && method.getAutoDelete() |
| && method.hasExclusive() |
| && method.getExclusive()) |
| { |
| final AMQQueue q = queue; |
| final ServerSession.Task deleteQueueTask = new ServerSession.Task() |
| { |
| public void doTask(ServerSession session) |
| { |
| try |
| { |
| q.delete(); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot delete '" + method.getQueue()); |
| } |
| } |
| }; |
| final ServerSession s = (ServerSession) session; |
| s.addSessionCloseTask(deleteQueueTask); |
| queue.addQueueDeleteTask(new AMQQueue.Task() |
| { |
| public void doTask(AMQQueue queue) throws AMQException |
| { |
| s.removeSessionCloseTask(deleteQueueTask); |
| } |
| }); |
| } |
| if (method.hasExclusive() |
| && method.getExclusive()) |
| { |
| final AMQQueue q = queue; |
| final ServerSession.Task removeExclusive = new ServerSession.Task() |
| { |
| public void doTask(ServerSession session) |
| { |
| q.setAuthorizationHolder(null); |
| q.setExclusiveOwningSession(null); |
| } |
| }; |
| final ServerSession s = (ServerSession) session; |
| q.setExclusiveOwningSession(s); |
| s.addSessionCloseTask(removeExclusive); |
| queue.addQueueDeleteTask(new AMQQueue.Task() |
| { |
| public void doTask(AMQQueue queue) throws AMQException |
| { |
| s.removeSessionCloseTask(removeExclusive); |
| } |
| }); |
| } |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot declare queue '" + queueName); |
| } |
| } |
| } |
| else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) |
| { |
| String description = "Cannot declare queue('" + queueName + "')," |
| + " as exclusive queue with same name " |
| + "declared on another session"; |
| ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; |
| |
| exception(session, method, errorCode, description); |
| |
| return; |
| } |
| } |
| } |
| |
| protected AMQQueue createQueue(final String queueName, |
| final QueueDeclare body, |
| VirtualHost virtualHost, |
| final ServerSession session) |
| throws AMQException |
| { |
| String owner = body.getExclusive() ? session.getClientID() : null; |
| |
| final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), |
| body.getExclusive(), virtualHost, body.getArguments()); |
| |
| return queue; |
| } |
| |
| @Override |
| public void queueDelete(Session session, QueueDelete method) |
| { |
| String queueName = method.getQueue(); |
| if(queueName == null || queueName.length()==0) |
| { |
| exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied"); |
| |
| } |
| else |
| { |
| AMQQueue queue = getQueue(session, queueName); |
| |
| |
| if (queue == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); |
| } |
| else |
| { |
| if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) |
| { |
| exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); |
| } |
| else if (method.getIfEmpty() && !queue.isEmpty()) |
| { |
| exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty"); |
| } |
| else if (method.getIfUnused() && !queue.isUnused()) |
| { |
| // TODO - Error code |
| exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " in use"); |
| |
| } |
| else |
| { |
| VirtualHost virtualHost = getVirtualHost(session); |
| |
| try |
| { |
| queue.delete(); |
| if (queue.isDurable() && !queue.isAutoDelete()) |
| { |
| DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); |
| store.removeQueue(queue); |
| } |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot delete queue '" + queueName); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void queuePurge(Session session, QueuePurge method) |
| { |
| String queueName = method.getQueue(); |
| if(queueName == null || queueName.length()==0) |
| { |
| exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied"); |
| } |
| else |
| { |
| AMQQueue queue = getQueue(session, queueName); |
| |
| if (queue == null) |
| { |
| exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); |
| } |
| else |
| { |
| try |
| { |
| queue.clearQueue(); |
| } |
| catch (AMQException e) |
| { |
| exception(session, method, e, "Cannot purge queue '" + queueName); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void queueQuery(Session session, QueueQuery method) |
| { |
| QueueQueryResult result = new QueueQueryResult(); |
| |
| AMQQueue queue = getQueue(session, method.getQueue()); |
| |
| if(queue != null) |
| { |
| result.setQueue(queue.getNameShortString().toString()); |
| result.setDurable(queue.isDurable()); |
| result.setExclusive(queue.isExclusive()); |
| result.setAutoDelete(queue.isAutoDelete()); |
| result.setArguments(queue.getArguments()); |
| result.setMessageCount(queue.getMessageCount()); |
| result.setSubscriberCount(queue.getConsumerCount()); |
| |
| } |
| |
| |
| session.executionResult((int) method.getId(), result); |
| |
| } |
| |
| @Override |
| public void messageSetFlowMode(Session session, MessageSetFlowMode sfm) |
| { |
| String destination = sfm.getDestination(); |
| |
| Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); |
| |
| if(sub == null) |
| { |
| exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); |
| } |
| else if(sub.isStopped()) |
| { |
| sub.setFlowMode(sfm.getFlowMode()); |
| } |
| } |
| |
| @Override |
| public void messageStop(Session session, MessageStop stop) |
| { |
| String destination = stop.getDestination(); |
| |
| Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); |
| |
| if(sub == null) |
| { |
| exception(session, stop, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); |
| } |
| else |
| { |
| sub.stop(); |
| } |
| |
| } |
| |
| @Override |
| public void messageFlow(Session session, MessageFlow flow) |
| { |
| String destination = flow.getDestination(); |
| |
| Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); |
| |
| if(sub == null) |
| { |
| exception(session, flow, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); |
| } |
| else |
| { |
| sub.addCredit(flow.getUnit(), flow.getValue()); |
| } |
| |
| } |
| |
| @Override |
| public void closed(Session session) |
| { |
| setThreadSubject(session); |
| |
| for(Subscription_0_10 sub : getSubscriptions(session)) |
| { |
| ((ServerSession)session).unregister(sub); |
| } |
| ((ServerSession)session).onClose(); |
| } |
| |
| @Override |
| public void detached(Session session) |
| { |
| closed(session); |
| } |
| |
| public Collection<Subscription_0_10> getSubscriptions(Session session) |
| { |
| return ((ServerSession)session).getSubscriptions(); |
| } |
| |
| private void setThreadSubject(Session session) |
| { |
| final ServerConnection scon = (ServerConnection) session.getConnection(); |
| SecurityManager.setThreadSubject(scon.getAuthorizedSubject()); |
| } |
| } |