| /* Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.client; |
| |
| import static org.apache.qpid.transport.Option.NONE; |
| import static org.apache.qpid.transport.Option.SYNC; |
| import static org.apache.qpid.transport.Option.UNRELIABLE; |
| |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import javax.jms.DeliveryMode; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.client.AMQDestination.DestSyntax; |
| import org.apache.qpid.client.message.AMQMessageDelegate_0_10; |
| import org.apache.qpid.client.message.AbstractJMSMessage; |
| import org.apache.qpid.client.message.QpidMessageProperties; |
| import org.apache.qpid.client.protocol.AMQProtocolHandler; |
| import org.apache.qpid.messaging.QpidDestination.CheckMode; |
| import org.apache.qpid.messaging.address.Link.Reliability; |
| import org.apache.qpid.transport.DeliveryProperties; |
| import org.apache.qpid.transport.Header; |
| import org.apache.qpid.transport.MessageAcceptMode; |
| import org.apache.qpid.transport.MessageAcquireMode; |
| import org.apache.qpid.transport.MessageDeliveryMode; |
| import org.apache.qpid.transport.MessageDeliveryPriority; |
| import org.apache.qpid.transport.MessageProperties; |
| import org.apache.qpid.transport.Option; |
| import org.apache.qpid.transport.TransportException; |
| import org.apache.qpid.util.Strings; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This is a 0_10 message producer. |
| */ |
| public class BasicMessageProducer_0_10 extends BasicMessageProducer |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class); |
| private byte[] userIDBytes; |
| |
| BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, |
| AMQSession session, AMQProtocolHandler protocolHandler, long producerId, |
| boolean immediate, boolean mandatory) throws AMQException |
| { |
| super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); |
| |
| userIDBytes = Strings.toUTF8(_userID); |
| } |
| |
| void declareDestination(AMQDestination destination) throws AMQException |
| { |
| if (destination.getDestSyntax() == DestSyntax.BURL) |
| { |
| if (getSession().isDeclareExchanges()) |
| { |
| String name = destination.getExchangeName().toString(); |
| ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare |
| (name, |
| destination.getExchangeClass().toString(), |
| null, null, |
| name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); |
| } |
| } |
| else |
| { |
| try |
| { |
| AddressBasedDestination addrDest = (AddressBasedDestination)destination; |
| addrDest.resolveAddress((AMQSession_0_10)getSession()); |
| addrDest.create((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); |
| addrDest.azzert((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); |
| } |
| catch(Exception e) |
| { |
| AMQException ex = new AMQException("Exception occured while verifying destination",e); |
| throw ex; |
| } |
| } |
| } |
| |
| //--- Overwritten methods |
| |
| /** |
| * Sends a message to a given destination |
| */ |
| void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, |
| UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, |
| boolean immediate) throws JMSException |
| { |
| message.prepareForSending(); |
| |
| AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate(); |
| |
| DeliveryProperties deliveryProp = delegate.getDeliveryProperties(); |
| MessageProperties messageProps = delegate.getMessageProperties(); |
| |
| // On the receiving side, this will be read in to the JMSXUserID as well. |
| messageProps.setUserId(userIDBytes); |
| |
| if (messageId != null) |
| { |
| messageProps.setMessageId(messageId); |
| } |
| else if (messageProps.hasMessageId()) |
| { |
| messageProps.clearMessageId(); |
| } |
| |
| long currentTime = 0; |
| if (timeToLive > 0 || !_disableTimestamps) |
| { |
| currentTime = System.currentTimeMillis(); |
| } |
| |
| if (timeToLive > 0) |
| { |
| deliveryProp.setTtl(timeToLive); |
| message.setJMSExpiration(currentTime + timeToLive); |
| } |
| |
| if (!_disableTimestamps) |
| { |
| |
| deliveryProp.setTimestamp(currentTime); |
| message.setJMSTimestamp(currentTime); |
| } |
| |
| if (!deliveryProp.hasDeliveryMode() || deliveryProp.getDeliveryMode().getValue() != deliveryMode) |
| { |
| MessageDeliveryMode mode; |
| switch (deliveryMode) |
| { |
| case DeliveryMode.PERSISTENT: |
| mode = MessageDeliveryMode.PERSISTENT; |
| break; |
| case DeliveryMode.NON_PERSISTENT: |
| mode = MessageDeliveryMode.NON_PERSISTENT; |
| break; |
| default: |
| throw new IllegalArgumentException("illegal delivery mode: " + deliveryMode); |
| } |
| deliveryProp.setDeliveryMode(mode); |
| message.setJMSDeliveryMode(deliveryMode); |
| } |
| if (!deliveryProp.hasPriority() || deliveryProp.getPriority().getValue() != priority) |
| { |
| deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority)); |
| message.setJMSPriority(priority); |
| } |
| |
| String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(); |
| if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName)) |
| { |
| deliveryProp.setExchange(exchangeName); |
| } |
| |
| String routingKey = destination.getRoutingKey().toString(); |
| if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey)) |
| { |
| deliveryProp.setRoutingKey(routingKey); |
| } |
| |
| if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && |
| (((AddressBasedDestination)destination).getAddress().getSubject() != null || |
| (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null)) |
| ) |
| { |
| Map<String,Object> appProps = messageProps.getApplicationHeaders(); |
| if (appProps == null) |
| { |
| appProps = new HashMap<String,Object>(); |
| messageProps.setApplicationHeaders(appProps); |
| } |
| |
| if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null) |
| { |
| // use default subject in address string |
| appProps.put(QpidMessageProperties.QPID_SUBJECT, |
| ((AddressBasedDestination)destination).getAddress().getSubject()); |
| } |
| |
| if (((AddressBasedDestination)destination).isTopic()) |
| { |
| deliveryProp.setRoutingKey((String) |
| messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT)); |
| } |
| } |
| |
| ByteBuffer data = message.getData(); |
| messageProps.setContentLength(data.remaining()); |
| |
| // send the message |
| try |
| { |
| org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session) |
| ((AMQSession_0_10) getSession()).getQpidSession(); |
| |
| // if true, we need to sync the delivery of this message |
| boolean sync = false; |
| |
| sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || |
| (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && |
| deliveryMode == DeliveryMode.PERSISTENT) |
| ); |
| |
| boolean unreliable = false; //(destination.getDestSyntax() == DestSyntax.ADDR) && |
| // (destination.getLink().getReliability() == Reliability.UNRELIABLE); |
| |
| |
| ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice(); |
| |
| ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(), |
| MessageAcceptMode.NONE, |
| MessageAcquireMode.PRE_ACQUIRED, |
| new Header(deliveryProp, messageProps), |
| buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE); |
| if (sync) |
| { |
| ssn.sync(); |
| ((AMQSession_0_10) getSession()).getCurrentException(); |
| } |
| |
| } |
| catch (Exception e) |
| { |
| JMSException jmse = new JMSException("Exception when sending message"); |
| jmse.setLinkedException(e); |
| jmse.initCause(e); |
| throw jmse; |
| } |
| } |
| |
| @Override |
| public boolean isBound(AMQDestination destination) throws JMSException |
| { |
| return _session.isQueueBound(destination); |
| } |
| |
| @Override |
| public void close() throws JMSException |
| { |
| super.close(); |
| AMQDestination dest = _destination; |
| if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) |
| { |
| try |
| { |
| ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); |
| } |
| catch(TransportException e) |
| { |
| throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); |
| } |
| |
| } |
| } |
| |
| } |
| |