| /* Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.client; |
| |
| import java.io.IOException; |
| import java.net.URISyntaxException; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.nio.ByteBuffer; |
| |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.DeliveryMode; |
| |
| import org.apache.qpid.client.message.AbstractJMSMessage; |
| import org.apache.qpid.client.message.FiledTableSupport; |
| import org.apache.qpid.client.message.AMQMessageDelegate_0_10; |
| import org.apache.qpid.client.protocol.AMQProtocolHandler; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.framing.BasicContentHeaderProperties; |
| import org.apache.qpid.url.AMQBindingURL; |
| import org.apache.qpid.util.Strings; |
| import org.apache.qpid.njms.ExceptionHelper; |
| import org.apache.qpid.transport.*; |
| import static org.apache.qpid.transport.Option.*; |
| |
| /** |
| * This is a 0_10 message producer. |
| */ |
| public class BasicMessageProducer_0_10 extends BasicMessageProducer |
| { |
| private byte[] userIDBytes; |
| |
| BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, |
| AMQSession session, AMQProtocolHandler protocolHandler, long producerId, |
| boolean immediate, boolean mandatory, boolean waitUntilSent) |
| { |
| super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, |
| mandatory, waitUntilSent); |
| |
| userIDBytes = Strings.toUTF8(_userID); |
| } |
| |
| void declareDestination(AMQDestination destination) |
| { |
| ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(), |
| destination.getExchangeClass().toString(), |
| null, null); |
| } |
| |
| //--- Overwritten methods |
| |
| /** |
| * Sends a message to a given destination |
| */ |
| void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, |
| UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, |
| boolean immediate, boolean wait) throws JMSException |
| { |
| message.prepareForSending(); |
| |
| AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate(); |
| |
| DeliveryProperties deliveryProp = delegate.getDeliveryProperties(); |
| MessageProperties messageProps = delegate.getMessageProperties(); |
| |
| // On the receiving side, this will be read in to the JMSXUserID as well. |
| messageProps.setUserId(userIDBytes); |
| |
| if (messageId != null) |
| { |
| messageProps.setMessageId(messageId); |
| } |
| else if (messageProps.hasMessageId()) |
| { |
| messageProps.clearMessageId(); |
| } |
| |
| if (!_disableTimestamps) |
| { |
| final long currentTime = System.currentTimeMillis(); |
| deliveryProp.setTimestamp(currentTime); |
| if (timeToLive > 0) |
| { |
| deliveryProp.setExpiration(currentTime + timeToLive); |
| message.setJMSExpiration(currentTime + timeToLive); |
| } |
| else |
| { |
| deliveryProp.setExpiration(0); |
| message.setJMSExpiration(0); |
| } |
| message.setJMSTimestamp(currentTime); |
| } |
| |
| if (!deliveryProp.hasDeliveryMode() || deliveryProp.getDeliveryMode().getValue() != deliveryMode) |
| { |
| MessageDeliveryMode mode; |
| switch (deliveryMode) |
| { |
| case DeliveryMode.PERSISTENT: |
| mode = MessageDeliveryMode.PERSISTENT; |
| break; |
| case DeliveryMode.NON_PERSISTENT: |
| mode = MessageDeliveryMode.NON_PERSISTENT; |
| break; |
| default: |
| throw new IllegalArgumentException("illegal delivery mode: " + deliveryMode); |
| } |
| deliveryProp.setDeliveryMode(mode); |
| message.setJMSDeliveryMode(deliveryMode); |
| } |
| if (!deliveryProp.hasPriority() || deliveryProp.getPriority().getValue() != priority) |
| { |
| deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority)); |
| message.setJMSPriority(priority); |
| } |
| String exchangeName = destination.getExchangeName().toString(); |
| if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName)) |
| { |
| deliveryProp.setExchange(exchangeName); |
| } |
| String routingKey = destination.getRoutingKey().toString(); |
| if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey)) |
| { |
| deliveryProp.setRoutingKey(routingKey); |
| } |
| |
| messageProps.setContentLength(message.getContentLength()); |
| |
| // send the message |
| try |
| { |
| org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session) |
| ((AMQSession_0_10) getSession()).getQpidSession(); |
| |
| // if true, we need to sync the delivery of this message |
| boolean sync = (deliveryMode == DeliveryMode.PERSISTENT && |
| getSession().getAMQConnection().getSyncPersistence()); |
| |
| org.apache.mina.common.ByteBuffer data = message.getData(); |
| ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); |
| |
| ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE, |
| MessageAcquireMode.PRE_ACQUIRED, |
| new Header(deliveryProp, messageProps), |
| buffer, sync ? SYNC : NONE); |
| if (sync) |
| { |
| ssn.sync(); |
| } |
| |
| |
| } |
| catch (RuntimeException rte) |
| { |
| JMSException ex = new JMSException("Exception when sending message"); |
| rte.printStackTrace(); |
| ex.setLinkedException(rte); |
| throw ex; |
| } |
| } |
| |
| |
| public boolean isBound(AMQDestination destination) throws JMSException |
| { |
| return _session.isQueueBound(destination); |
| } |
| } |
| |