blob: ea4b49e79e2bb9566ef3c5755db8c9588f689505 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.jms.domain.message;
import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.rocketmq.jms.domain.JmsBaseConstant;
import org.apache.rocketmq.jms.util.ExceptionUtil;
public class JmsBaseMessage implements Message {
/**
* Message properties
*/
protected Map<String, Object> properties = Maps.newHashMap();
/**
* Message headers
*/
protected Map<String, Object> headers = Maps.newHashMap();
/**
* Message body
*/
protected Serializable body;
@Override
public String getJMSMessageID() {
return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID);
}
/**
* Sets the message ID.
* <p/>
* <P>JMS providers set this field when a message is sent. Do not allow User to set the message ID by yourself.
*
* @param id the ID of the message
* @see javax.jms.Message#getJMSMessageID()
*/
@Override
public void setJMSMessageID(String id) {
ExceptionUtil.handleUnSupportedException();
}
@Override
public long getJMSTimestamp() {
if (headers.containsKey(JmsBaseConstant.JMS_TIMESTAMP)) {
return (Long) headers.get(JmsBaseConstant.JMS_TIMESTAMP);
}
return 0;
}
@Override
public void setJMSTimestamp(long timestamp) {
ExceptionUtil.handleUnSupportedException();
}
@Override
public byte[] getJMSCorrelationIDAsBytes() {
String jmsCorrelationID = getJMSCorrelationID();
if (jmsCorrelationID != null) {
try {
return BaseEncoding.base64().decode(jmsCorrelationID);
}
catch (Exception e) {
return jmsCorrelationID.getBytes();
}
}
return null;
}
@Override
public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
String encodedText = BaseEncoding.base64().encode(correlationID);
setJMSCorrelationID(encodedText);
}
@Override
public String getJMSCorrelationID() {
if (headers.containsKey(JmsBaseConstant.JMS_CORRELATION_ID)) {
return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID);
}
return null;
}
@Override
public void setJMSCorrelationID(String correlationID) {
ExceptionUtil.handleUnSupportedException();
}
@Override
public Destination getJMSReplyTo() {
if (headers.containsKey(JmsBaseConstant.JMS_REPLY_TO)) {
return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO);
}
return null;
}
@Override
public void setJMSReplyTo(Destination replyTo) {
ExceptionUtil.handleUnSupportedException();
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
@Override
public Destination getJMSDestination() {
if (headers.containsKey(JmsBaseConstant.JMS_DESTINATION)) {
return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION);
}
return null;
}
@Override
public void setJMSDestination(Destination destination) {
ExceptionUtil.handleUnSupportedException();
}
@SuppressWarnings("unchecked")
public <T> T getBody(Class<T> clazz) throws JMSException {
if (clazz.isInstance(body)) {
return (T) body;
}
else {
throw new IllegalArgumentException("The class " + clazz
+ " is unknown to this implementation");
}
}
@Override
public int getJMSDeliveryMode() {
if (headers.containsKey(JmsBaseConstant.JMS_DELIVERY_MODE)) {
return (Integer) headers.get(JmsBaseConstant.JMS_DELIVERY_MODE);
}
return 0;
}
/**
* Sets the <CODE>DeliveryMode</CODE> value for this message.
* <p/>
* <P>JMS providers set this field when a message is sent. ROCKETMQ only support DeliveryMode.PERSISTENT mode. So do not
* allow User to set this by yourself, but you can get the default mode by <CODE>getJMSDeliveryMode</CODE> method.
*
* @param deliveryMode the delivery mode for this message
* @see javax.jms.Message#getJMSDeliveryMode()
* @see javax.jms.DeliveryMode
*/
@Override
public void setJMSDeliveryMode(int deliveryMode) {
ExceptionUtil.handleUnSupportedException();
}
public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException {
return clazz.isInstance(body);
}
@Override
public boolean getJMSRedelivered() {
return headers.containsKey(JmsBaseConstant.JMS_REDELIVERED)
&& (Boolean) headers.get(JmsBaseConstant.JMS_REDELIVERED);
}
@Override
public void setJMSRedelivered(boolean redelivered) {
ExceptionUtil.handleUnSupportedException();
}
/**
* copy meta data from source message
*
* @param sourceMessage source message
*/
public void copyMetaData(JmsBaseMessage sourceMessage) {
if (!sourceMessage.getHeaders().isEmpty()) {
for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) {
if (!headerExits(entry.getKey())) {
setHeader(entry.getKey(), entry.getValue());
}
}
}
if (!sourceMessage.getProperties().isEmpty()) {
for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) {
if (!propertyExists(entry.getKey())) {
setObjectProperty(entry.getKey(), entry.getValue());
}
}
}
}
@Override
public String getJMSType() {
return (String) headers.get(JmsBaseConstant.JMS_TYPE);
}
@Override
public void setJMSType(String type) {
ExceptionUtil.handleUnSupportedException();
}
public Map<String, Object> getHeaders() {
return this.headers;
}
@Override
public long getJMSExpiration() {
if (headers.containsKey(JmsBaseConstant.JMS_EXPIRATION)) {
return (Long) headers.get(JmsBaseConstant.JMS_EXPIRATION);
}
return 0;
}
@Override
public void setJMSExpiration(long expiration) {
ExceptionUtil.handleUnSupportedException();
}
public boolean headerExits(String name) {
return this.headers.containsKey(name);
}
@Override
public int getJMSPriority() {
if (headers.containsKey(JmsBaseConstant.JMS_PRIORITY)) {
return (Integer) headers.get(JmsBaseConstant.JMS_PRIORITY);
}
return 5;
}
@Override
public void setJMSPriority(int priority) {
ExceptionUtil.handleUnSupportedException();
}
public void setHeader(String name, Object value) {
this.headers.put(name, value);
}
public Map<String, Object> getProperties() {
return this.properties;
}
public void setProperties(Map<String, Object> properties) {
this.properties = properties;
}
@Override
public void acknowledge() throws JMSException {
throw new UnsupportedOperationException("Unsupported!");
}
@Override
public void clearProperties() {
this.properties.clear();
}
@Override
public void clearBody() {
this.body = null;
}
@Override
public boolean propertyExists(String name) {
return properties.containsKey(name);
}
@Override
public boolean getBooleanProperty(String name) throws JMSException {
if (propertyExists(name)) {
Object value = getObjectProperty(name);
return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString());
}
return false;
}
@Override
public byte getByteProperty(String name) throws JMSException {
if (propertyExists(name)) {
Object value = getObjectProperty(name);
return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString());
}
return 0;
}
@Override
public short getShortProperty(String name) throws JMSException {
if (propertyExists(name)) {
Object value = getObjectProperty(name);
return value instanceof Short ? (Short) value : Short.valueOf(value.toString());
}
return 0;
}
@Override
public int getIntProperty(String name) throws JMSException {
if (propertyExists(name)) {
Object value = getObjectProperty(name);
return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString());
}
return 0;
}
@Override
public long getLongProperty(String name) throws JMSException {
if (propertyExists(name)) {
Object value = getObjectProperty(name);
return value instanceof Long ? (Long) value : Long.valueOf(value.toString());
}
return 0L;
}
@Override
public float getFloatProperty(String name) throws JMSException {
if (propertyExists(name)) {
Object value = getObjectProperty(name);
return value instanceof Float ? (Float) value : Float.valueOf(value.toString());
}
return 0f;
}
@Override
public double getDoubleProperty(String name) throws JMSException {
if (propertyExists(name)) {
Object value = getObjectProperty(name);
return value instanceof Double ? (Double) value : Double.valueOf(value.toString());
}
return 0d;
}
@Override
public String getStringProperty(String name) throws JMSException {
if (propertyExists(name)) {
return getObjectProperty(name).toString();
}
return null;
}
@Override
public Object getObjectProperty(String name) throws JMSException {
return this.properties.get(name);
}
@Override
public Enumeration<?> getPropertyNames() throws JMSException {
final Object[] keys = this.properties.keySet().toArray();
return new Enumeration<Object>() {
int i;
@Override
public boolean hasMoreElements() {
return i < keys.length;
}
@Override
public Object nextElement() {
return keys[i++];
}
};
}
@Override
public void setBooleanProperty(String name, boolean value) {
setObjectProperty(name, value);
}
@Override
public void setByteProperty(String name, byte value) {
setObjectProperty(name, value);
}
@Override
public void setShortProperty(String name, short value) {
setObjectProperty(name, value);
}
@Override
public void setIntProperty(String name, int value) {
setObjectProperty(name, value);
}
@Override
public void setLongProperty(String name, long value) {
setObjectProperty(name, value);
}
public void setFloatProperty(String name, float value) {
setObjectProperty(name, value);
}
@Override
public void setDoubleProperty(String name, double value) {
setObjectProperty(name, value);
}
@Override
public void setStringProperty(String name, String value) {
setObjectProperty(name, value);
}
@Override
public void setObjectProperty(String name, Object value) {
if (value instanceof Number || value instanceof String || value instanceof Boolean) {
this.properties.put(name, value);
}
else {
throw new IllegalArgumentException(
"Value should be boolean, byte, short, int, long, float, double, and String.");
}
}
}