blob: 4015a3a368746e8e70a457416c0e32199ba3a23a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.server.url.AMQBindingURL;
import org.apache.qpid.server.util.GZIPUtils;
@PluggableService
public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
{
@Override
public Class<AMQMessage> getInputClass()
{
return AMQMessage.class;
}
@Override
protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage,
final EncodingRetainingSection<?> bodySection,
SectionEncoder sectionEncoder)
{
Header header = new Header();
Properties props = new Properties();
header.setDurable(serverMessage.isPersistent());
BasicContentHeaderProperties contentHeader = serverMessage.getContentHeaderBody().getProperties();
header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
if(contentHeader.hasExpiration())
{
final long expiration = serverMessage.getExpiration();
final long arrivalTime = serverMessage.getArrivalTime();
header.setTtl(UnsignedInteger.valueOf(Math.max(0, expiration - arrivalTime)));
}
if(!GZIPUtils.GZIP_CONTENT_ENCODING.equals(contentHeader.getEncodingAsString()) && bodySection instanceof DataSection)
{
props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
}
Symbol contentType = getContentType(contentHeader.getContentTypeAsString());
props.setContentType(contentType);
final AMQShortString correlationId = contentHeader.getCorrelationId();
if(correlationId != null)
{
final byte[] correlationIdAsBytes = correlationId.getBytes();
final String correlationIdAsString = contentHeader.getCorrelationIdAsString();
if (Arrays.equals(correlationIdAsBytes, correlationIdAsString.getBytes(StandardCharsets.UTF_8)))
{
props.setCorrelationId(convertMessageId(correlationId));
}
else
{
props.setCorrelationId(correlationIdAsBytes);
}
}
final AMQShortString messageId = contentHeader.getMessageId();
if(messageId != null)
{
props.setMessageId(convertMessageId(messageId));
}
if (contentHeader.getReplyTo() != null)
{
props.setReplyTo(convertReplyTo(contentHeader.getReplyTo()));
}
if(contentHeader.getUserId() != null)
{
props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
}
if (contentHeader.hasTimestamp())
{
props.setCreationTime(new Date(contentHeader.getTimestamp()));
}
else
{
props.setCreationTime(new Date(serverMessage.getArrivalTime()));
}
if (contentHeader.getType() != null)
{
props.setSubject(contentHeader.getType().toString());
}
Map<String, Object> applicationPropertiesMap = new LinkedHashMap<>(contentHeader.getHeadersAsMap());
if(applicationPropertiesMap.containsKey("qpid.subject"))
{
props.setSubject(String.valueOf(applicationPropertiesMap.get("qpid.subject")));
applicationPropertiesMap.remove("qpid.subject");
}
if(applicationPropertiesMap.containsKey("JMSXGroupID"))
{
props.setGroupId(String.valueOf(applicationPropertiesMap.get("JMSXGroupID")));
applicationPropertiesMap.remove("JMSXGroupID");
}
if(applicationPropertiesMap.containsKey("JMSXGroupSeq"))
{
Object jmsxGroupSeq = applicationPropertiesMap.get("JMSXGroupSeq");
if (jmsxGroupSeq instanceof Integer)
{
props.setGroupSequence(UnsignedInteger.valueOf((Integer)jmsxGroupSeq));
applicationPropertiesMap.remove("JMSXGroupSeq");
}
}
MessagePublishInfo messagePublishInfo = serverMessage.getMessagePublishInfo();
String to = AMQShortString.toString(messagePublishInfo.getExchange());
if (messagePublishInfo.getRoutingKey() != null)
{
String routingKey = AMQShortString.toString(messagePublishInfo.getRoutingKey());
if (to != null && !"".equals(to))
{
to += "/" + routingKey;
}
else
{
to = routingKey;
}
}
props.setTo(to);
final ApplicationProperties applicationProperties;
try
{
applicationProperties = new ApplicationProperties(applicationPropertiesMap);
}
catch (IllegalArgumentException e)
{
throw new MessageConversionException("Could not convert message from 0-8 to 1.0 because headers conversion failed.", e);
}
MessageAnnotations messageAnnotations = createMessageAnnotation(bodySection,
contentHeader.getContentTypeAsString());
return new MessageMetaData_1_0(header.createEncodingRetainingSection(),
null,
messageAnnotations == null ? null : messageAnnotations.createEncodingRetainingSection(),
props.createEncodingRetainingSection(),
applicationProperties.createEncodingRetainingSection(),
null,
serverMessage.getArrivalTime(),
bodySection.getEncodedSize());
}
private Object convertMessageId(final AMQShortString messageId)
{
try
{
String messageIdAsString = messageId.toString();
if(messageIdAsString.startsWith("ID:"))
{
messageIdAsString = messageIdAsString.substring(3);
}
return UUID.fromString(messageIdAsString);
}
catch (IllegalArgumentException e)
{
return messageId.toString();
}
}
private String convertReplyTo(final AMQShortString replyTo)
{
String convertedReplyTo;
final String originalReplyTo = String.valueOf(replyTo);
try
{
AMQBindingURL burl = new AMQBindingURL(originalReplyTo);
if (burl.getExchangeName() != null && !burl.getExchangeName().equals(""))
{
convertedReplyTo = burl.getExchangeName();
if (burl.getRoutingKey() != null)
{
convertedReplyTo += "/" + burl.getRoutingKey();
}
}
else if (burl.getQueueName() != null && !burl.getQueueName().equals(""))
{
convertedReplyTo = burl.getQueueName();
}
else if (burl.getRoutingKey() != null)
{
convertedReplyTo = burl.getRoutingKey();
}
else
{
convertedReplyTo = originalReplyTo;
}
}
catch (URISyntaxException e)
{
convertedReplyTo = originalReplyTo;
}
return convertedReplyTo;
}
@Override
public String getType()
{
return "v0-8 to v1-0";
}
}