blob: db5aea66bd7e0c543f5ce94fc42d7e908702c3b4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
import org.apache.qpid.url.AMQBindingURL;
@PluggableService
public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
{
@Override
public Class<AMQMessage> getInputClass()
{
return AMQMessage.class;
}
protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage,
final Section bodySection,
SectionEncoder sectionEncoder)
{
List<Section> sections = new ArrayList<>(3);
Header header = new Header();
header.setDurable(serverMessage.isPersistent());
BasicContentHeaderProperties contentHeader =
serverMessage.getContentHeaderBody().getProperties();
header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
final long expiration = serverMessage.getExpiration();
final long arrivalTime = serverMessage.getArrivalTime();
if(expiration > arrivalTime)
{
header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
}
sections.add(header);
Properties props = new Properties();
/*
TODO: The following properties are not currently set:
creationTime
groupId
groupSequence
replyToGroupId
to
*/
props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
// Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
{
props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
}
final AMQShortString correlationId = contentHeader.getCorrelationId();
if(correlationId != null)
{
props.setCorrelationId(new Binary(correlationId.getBytes()));
}
final AMQShortString messageId = contentHeader.getMessageId();
if(messageId != null)
{
props.setMessageId(new Binary(messageId.getBytes()));
}
final String originalReplyTo = String.valueOf(contentHeader.getReplyTo());
try
{
AMQBindingURL burl = new AMQBindingURL(originalReplyTo);
String replyTo;
if(burl.getExchangeName() != null && !burl.getExchangeName().equals(""))
{
replyTo = burl.getExchangeName();
if(burl.getRoutingKey() != null)
{
replyTo += "/" + burl.getRoutingKey();
}
}
else if(burl.getQueueName() != null && !burl.getQueueName().equals(""))
{
replyTo = burl.getQueueName();
}
else if(burl.getRoutingKey() != null)
{
replyTo = burl.getRoutingKey();
}
else
{
replyTo = originalReplyTo;
}
props.setReplyTo(replyTo);
}
catch (URISyntaxException e)
{
props.setReplyTo(originalReplyTo);
}
props.setSubject(serverMessage.getInitialRoutingAddress());
if(contentHeader.getUserId() != null)
{
props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
}
sections.add(props);
Map<String, Object> applicationProperties = FieldTable.convertToMap(contentHeader.getHeaders());
if(applicationProperties.containsKey("qpid.subject"))
{
props.setSubject(String.valueOf(applicationProperties.get("qpid.subject")));
applicationProperties = new LinkedHashMap<>(applicationProperties);
applicationProperties.remove("qpid.subject");
}
// TODO: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application
// Adhere to "the values are restricted to be of simple types only, that is, excluding map, list, and array types".
// 0-8..0-91 for instance suppoorted field tables with maps as values.
sections.add(new ApplicationProperties(applicationProperties));
if(bodySection != null)
{
sections.add(bodySection);
}
return new MessageMetaData_1_0(sections, sectionEncoder);
}
@Override
public String getType()
{
return "v0-8 to v1-0";
}
}