blob: 79172a7f7bf7e2ec126de1a26a92919db3726d83 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.server.util.GZIPUtils;
@PluggableService
public class MessageConverter_0_10_to_1_0 extends MessageConverter_to_1_0<MessageTransferMessage>
{
@Override
public Class<MessageTransferMessage> getInputClass()
{
return MessageTransferMessage.class;
}
@Override
protected MessageMetaData_1_0 convertMetaData(MessageTransferMessage serverMessage,
final EncodingRetainingSection<?> bodySection,
SectionEncoder sectionEncoder)
{
Properties props = new Properties();
props.setCreationTime(new Date(serverMessage.getArrivalTime()));
final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties();
Header header = new Header();
if(deliveryProps != null)
{
header.setDurable(deliveryProps.hasDeliveryMode()
&& deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
if (deliveryProps.hasPriority())
{
header.setPriority(UnsignedByte.valueOf((byte) deliveryProps.getPriority().getValue()));
}
if (deliveryProps.hasTtl())
{
header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl()));
}
else if (deliveryProps.hasExpiration())
{
long ttl = Math.max(0, deliveryProps.getExpiration() - serverMessage.getArrivalTime());
header.setTtl(UnsignedInteger.valueOf(ttl));
}
if(deliveryProps.hasTimestamp())
{
props.setCreationTime(new Date(deliveryProps.getTimestamp()));
}
String to = deliveryProps.getExchange();
if (deliveryProps.getRoutingKey() != null)
{
String routingKey = deliveryProps.getRoutingKey();
if (to != null && !"".equals(to))
{
to += "/" + routingKey;
}
else
{
to = routingKey;
}
}
props.setTo(to);
}
ApplicationProperties applicationProperties = null;
String originalContentMimeType = null;
if(msgProps != null)
{
if(msgProps.hasContentEncoding()
&& !GZIPUtils.GZIP_CONTENT_ENCODING.equals(msgProps.getContentEncoding())
&& bodySection instanceof DataSection)
{
props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding()));
}
if(msgProps.hasCorrelationId())
{
CharsetDecoder charsetDecoder = StandardCharsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPORT)
.onUnmappableCharacter(CodingErrorAction.REPORT);
try
{
String correlationIdAsString = charsetDecoder.decode(ByteBuffer.wrap(msgProps.getCorrelationId())).toString();
props.setCorrelationId(correlationIdAsString);
}
catch (CharacterCodingException e)
{
props.setCorrelationId(new Binary(msgProps.getCorrelationId()));
}
}
if(msgProps.hasMessageId())
{
props.setMessageId(msgProps.getMessageId());
}
if(msgProps.hasReplyTo())
{
ReplyTo replyTo = msgProps.getReplyTo();
String to = null;
if (replyTo.hasExchange() && !"".equals(replyTo.getExchange()))
{
to = replyTo.getExchange();
}
if (replyTo.hasRoutingKey())
{
if (to != null)
{
to += "/" + replyTo.getRoutingKey();
}
else
{
to = replyTo.getRoutingKey();
}
}
props.setReplyTo(to);
}
if(msgProps.hasContentType())
{
originalContentMimeType = msgProps.getContentType();
final Symbol contentType =
MessageConverter_to_1_0.getContentType(originalContentMimeType);
props.setContentType(contentType);
}
if(msgProps.hasUserId())
{
props.setUserId(new Binary(msgProps.getUserId()));
}
Map<String, Object> applicationPropertiesMap = msgProps.getApplicationHeaders();
if(applicationPropertiesMap != null)
{
applicationPropertiesMap = new LinkedHashMap<>(applicationPropertiesMap);
if (applicationPropertiesMap.containsKey("x-jms-type"))
{
props.setSubject(String.valueOf(applicationPropertiesMap.get("x-jms-type")));
applicationPropertiesMap.remove("x-jms-type");
}
if(applicationPropertiesMap.containsKey("qpid.subject"))
{
props.setSubject(String.valueOf(applicationPropertiesMap.get("qpid.subject")));
applicationPropertiesMap.remove("qpid.subject");
}
if(applicationPropertiesMap.containsKey("JMSXGroupID"))
{
props.setGroupId(String.valueOf(applicationPropertiesMap.get("JMSXGroupID")));
applicationPropertiesMap.remove("JMSXGroupID");
}
if(applicationPropertiesMap.containsKey("JMSXGroupSeq"))
{
Object jmsxGroupSeq = applicationPropertiesMap.get("JMSXGroupSeq");
if (jmsxGroupSeq instanceof Integer)
{
props.setGroupSequence(UnsignedInteger.valueOf((Integer)jmsxGroupSeq));
applicationPropertiesMap.remove("JMSXGroupSeq");
}
}
try
{
applicationProperties = new ApplicationProperties(applicationPropertiesMap);
}
catch (IllegalArgumentException e)
{
throw new MessageConversionException("Could not convert message from 0-10 to 1.0 because application headers conversion failed.", e);
}
}
}
final MessageAnnotations messageAnnotation =
MessageConverter_to_1_0.createMessageAnnotation(bodySection, originalContentMimeType);
return new MessageMetaData_1_0(header.createEncodingRetainingSection(),
null,
messageAnnotation == null ? null : messageAnnotation.createEncodingRetainingSection(),
props.createEncodingRetainingSection(),
applicationProperties == null ? null : applicationProperties.createEncodingRetainingSection(),
null,
serverMessage.getArrivalTime(),
bodySection.getEncodedSize());
}
@Override
public String getType()
{
return "v0-10 to v1-0";
}
}