| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.protocol.v1_0; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler; |
| import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; |
| import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; |
| import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; |
| import org.apache.qpid.server.protocol.v1_0.type.Section; |
| import org.apache.qpid.server.protocol.v1_0.type.Symbol; |
| import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; |
| import org.apache.qpid.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.server.message.AMQMessageHeader; |
| import org.apache.qpid.server.plugin.MessageMetaDataType; |
| import org.apache.qpid.server.store.StorableMessageMetaData; |
| import org.apache.qpid.server.util.ConnectionScopedRuntimeException; |
| |
| public class MessageMetaData_1_0 implements StorableMessageMetaData |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(MessageMetaData_1_0.class); |
| // TODO move to somewhere more useful |
| public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); |
| public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory(); |
| private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0(); |
| |
| |
| private Header _header; |
| private Properties _properties; |
| private Map _deliveryAnnotations; |
| private Map _messageAnnotations; |
| private Map _appProperties; |
| private Map _footer; |
| |
| private volatile List<QpidByteBuffer> _encodedSections = new ArrayList<>(3); |
| |
| private volatile QpidByteBuffer _encoded; |
| private MessageHeader_1_0 _messageHeader; |
| |
| |
| public MessageMetaData_1_0(List<Section> sections, SectionEncoder encoder) |
| { |
| this(sections, encodeSections(sections, encoder)); |
| } |
| |
| public Properties getPropertiesSection() |
| { |
| return _properties; |
| } |
| |
| |
| public Header getHeaderSection() |
| { |
| return _header; |
| } |
| |
| private static ArrayList<QpidByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) |
| { |
| ArrayList<QpidByteBuffer> encodedSections = new ArrayList<QpidByteBuffer>(sections.size()); |
| for(Section section : sections) |
| { |
| encoder.encodeObject(section); |
| encodedSections.add(QpidByteBuffer.wrap(encoder.getEncoding().asByteBuffer())); |
| encoder.reset(); |
| } |
| return encodedSections; |
| } |
| |
| public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder) |
| { |
| this(fragments, decoder, new ArrayList<QpidByteBuffer>(3)); |
| } |
| |
| public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<QpidByteBuffer> immutableSections) |
| { |
| this(constructSections(fragments, decoder,immutableSections), immutableSections); |
| } |
| |
| private MessageMetaData_1_0(List<Section> sections, List<QpidByteBuffer> encodedSections) |
| { |
| _encodedSections = encodedSections; |
| |
| Iterator<Section> sectIter = sections.iterator(); |
| |
| Section section = sectIter.hasNext() ? sectIter.next() : null; |
| if(section instanceof Header) |
| { |
| _header = (Header) section; |
| section = sectIter.hasNext() ? sectIter.next() : null; |
| } |
| |
| if(section instanceof DeliveryAnnotations) |
| { |
| _deliveryAnnotations = ((DeliveryAnnotations) section).getValue(); |
| section = sectIter.hasNext() ? sectIter.next() : null; |
| } |
| |
| if(section instanceof MessageAnnotations) |
| { |
| _messageAnnotations = ((MessageAnnotations) section).getValue(); |
| section = sectIter.hasNext() ? sectIter.next() : null; |
| } |
| |
| if(section instanceof Properties) |
| { |
| _properties = (Properties) section; |
| section = sectIter.hasNext() ? sectIter.next() : null; |
| } |
| |
| if(section instanceof ApplicationProperties) |
| { |
| _appProperties = ((ApplicationProperties) section).getValue(); |
| section = sectIter.hasNext() ? sectIter.next() : null; |
| } |
| |
| if(section instanceof Footer) |
| { |
| _footer = ((Footer) section).getValue(); |
| section = sectIter.hasNext() ? sectIter.next() : null; |
| } |
| |
| _messageHeader = new MessageHeader_1_0(); |
| |
| } |
| |
| private static List<Section> constructSections(final QpidByteBuffer[] fragments, final SectionDecoder decoder, List<QpidByteBuffer> encodedSections) |
| { |
| List<Section> sections = new ArrayList<Section>(3); |
| |
| QpidByteBuffer src; |
| if(fragments.length == 1) |
| { |
| src = fragments[0].duplicate(); |
| } |
| else |
| { |
| int size = 0; |
| for(QpidByteBuffer buf : fragments) |
| { |
| size += buf.remaining(); |
| } |
| src = QpidByteBuffer.allocateDirect(size); |
| for(QpidByteBuffer buf : fragments) |
| { |
| QpidByteBuffer duplicate = buf.duplicate(); |
| src.put(duplicate); |
| duplicate.dispose(); |
| } |
| src.flip(); |
| |
| } |
| |
| try |
| { |
| int startBarePos = -1; |
| int lastPos = src.position(); |
| Section s = decoder.readSection(src); |
| |
| |
| |
| if(s instanceof Header) |
| { |
| sections.add(s); |
| lastPos = src.position(); |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } |
| |
| if(s instanceof DeliveryAnnotations) |
| { |
| sections.add(s); |
| lastPos = src.position(); |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } |
| |
| if(s instanceof MessageAnnotations) |
| { |
| sections.add(s); |
| lastPos = src.position(); |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } |
| |
| if(s instanceof Properties) |
| { |
| sections.add(s); |
| if(startBarePos == -1) |
| { |
| startBarePos = lastPos; |
| } |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } |
| |
| if(s instanceof ApplicationProperties) |
| { |
| sections.add(s); |
| if(startBarePos == -1) |
| { |
| startBarePos = lastPos; |
| } |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } |
| |
| if(s instanceof AmqpValue) |
| { |
| if(startBarePos == -1) |
| { |
| startBarePos = lastPos; |
| } |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } |
| else if(s instanceof Data) |
| { |
| if(startBarePos == -1) |
| { |
| startBarePos = lastPos; |
| } |
| do |
| { |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } while(s instanceof Data); |
| } |
| else if(s instanceof AmqpSequence) |
| { |
| if(startBarePos == -1) |
| { |
| startBarePos = lastPos; |
| } |
| do |
| { |
| s = src.hasRemaining() ? decoder.readSection(src) : null; |
| } |
| while(s instanceof AmqpSequence); |
| } |
| |
| if(s instanceof Footer) |
| { |
| sections.add(s); |
| } |
| |
| |
| for(QpidByteBuffer buf : fragments) |
| { |
| encodedSections.add(buf.duplicate()); |
| } |
| |
| return sections; |
| } |
| catch (AmqpErrorException e) |
| { |
| _logger.error("Decoding read section error", e); |
| throw new IllegalArgumentException(e); |
| } |
| finally |
| { |
| src.dispose(); |
| } |
| } |
| |
| |
| public MessageMetaDataType getType() |
| { |
| return TYPE; |
| } |
| |
| |
| public int getStorableSize() |
| { |
| int size = 0; |
| |
| for(QpidByteBuffer bin : _encodedSections) |
| { |
| size += bin.limit(); |
| } |
| |
| return size; |
| } |
| |
| private QpidByteBuffer encodeAsBuffer() |
| { |
| QpidByteBuffer buf = QpidByteBuffer.allocateDirect(getStorableSize()); |
| |
| for(QpidByteBuffer bin : _encodedSections) |
| { |
| QpidByteBuffer duplicate = bin.duplicate(); |
| buf.put(duplicate); |
| duplicate.dispose(); |
| } |
| buf.flip(); |
| |
| return buf; |
| } |
| |
| public int writeToBuffer(QpidByteBuffer dest) |
| { |
| QpidByteBuffer buf = _encoded; |
| |
| if(buf == null) |
| { |
| buf = encodeAsBuffer(); |
| _encoded = buf; |
| } |
| |
| buf = buf.duplicate(); |
| |
| buf.position(0); |
| |
| if(dest.remaining() < buf.limit()) |
| { |
| buf.limit(dest.remaining()); |
| } |
| final int length = buf.limit(); |
| dest.putCopyOf(buf); |
| buf.dispose(); |
| return length; |
| } |
| |
| public int getContentSize() |
| { |
| QpidByteBuffer buf = _encoded; |
| |
| if(buf == null) |
| { |
| buf = encodeAsBuffer(); |
| _encoded = buf; |
| } |
| return buf.remaining(); |
| } |
| |
| public boolean isPersistent() |
| { |
| return _header != null && Boolean.TRUE.equals(_header.getDurable()); |
| } |
| |
| public MessageHeader_1_0 getMessageHeader() |
| { |
| return _messageHeader; |
| } |
| |
| @Override |
| public void dispose() |
| { |
| for(QpidByteBuffer bin : _encodedSections) |
| { |
| bin.dispose(); |
| } |
| _encodedSections = null; |
| _encoded.dispose(); |
| _encoded = null; |
| } |
| |
| @Override |
| public void clearEncodedForm() |
| { |
| |
| } |
| |
| private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0> |
| { |
| private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance(); |
| |
| private MetaDataFactory() |
| { |
| _typeRegistry.registerTransportLayer(); |
| _typeRegistry.registerMessagingLayer(); |
| _typeRegistry.registerTransactionLayer(); |
| _typeRegistry.registerSecurityLayer(); |
| } |
| |
| public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf) |
| { |
| ValueHandler valueHandler = new ValueHandler(_typeRegistry); |
| |
| ArrayList<Section> sections = new ArrayList<Section>(3); |
| ArrayList<QpidByteBuffer> encodedSections = new ArrayList<>(3); |
| |
| while(buf.hasRemaining()) |
| { |
| try |
| { |
| int start = buf.position(); |
| QpidByteBuffer encodedBuf = buf.slice(); |
| Object parse = valueHandler.parse(buf); |
| sections.add((Section) parse); |
| encodedBuf.limit(buf.position()-start); |
| encodedSections.add(encodedBuf); |
| |
| } |
| catch (AmqpErrorException e) |
| { |
| //TODO |
| throw new ConnectionScopedRuntimeException(e); |
| } |
| |
| } |
| |
| return new MessageMetaData_1_0(sections,encodedSections); |
| |
| } |
| } |
| |
| public class MessageHeader_1_0 implements AMQMessageHeader |
| { |
| |
| public String getCorrelationId() |
| { |
| if(_properties == null || _properties.getCorrelationId() == null) |
| { |
| return null; |
| } |
| else |
| { |
| return _properties.getCorrelationId().toString(); |
| } |
| } |
| |
| public long getExpiration() |
| { |
| return 0; //TODO |
| } |
| |
| public String getMessageId() |
| { |
| if(_properties == null || _properties.getMessageId() == null) |
| { |
| return null; |
| } |
| else |
| { |
| return _properties.getMessageId().toString(); |
| } |
| } |
| |
| public String getMimeType() |
| { |
| |
| if(_properties == null || _properties.getContentType() == null) |
| { |
| return null; |
| } |
| else |
| { |
| return _properties.getContentType().toString(); |
| } |
| } |
| |
| public String getEncoding() |
| { |
| return null; //TODO |
| } |
| |
| public byte getPriority() |
| { |
| if(_header == null || _header.getPriority() == null) |
| { |
| return 4; //javax.jms.Message.DEFAULT_PRIORITY; |
| } |
| else |
| { |
| return _header.getPriority().byteValue(); |
| } |
| } |
| |
| public long getTimestamp() |
| { |
| if(_properties == null || _properties.getCreationTime() == null) |
| { |
| return 0L; |
| } |
| else |
| { |
| return _properties.getCreationTime().getTime(); |
| } |
| |
| } |
| |
| |
| @Override |
| public long getNotValidBefore() |
| { |
| long notValidBefore; |
| Object annotation; |
| if(_messageAnnotations != null && (annotation = _messageAnnotations.get(Symbol.valueOf("x-qpid-not-valid-before"))) instanceof Number) |
| { |
| notValidBefore = ((Number)annotation).longValue(); |
| } |
| else |
| { |
| notValidBefore = 0L; |
| } |
| return notValidBefore; |
| } |
| |
| public String getType() |
| { |
| String subject = getSubject(); |
| if(subject != null) |
| { |
| return subject; |
| } |
| |
| // Use legacy annotation if present and there was no subject |
| if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null) |
| { |
| return null; |
| } |
| else |
| { |
| return _messageAnnotations.get(JMS_TYPE).toString(); |
| } |
| } |
| |
| public String getReplyTo() |
| { |
| if(_properties == null || _properties.getReplyTo() == null) |
| { |
| return null; |
| } |
| else |
| { |
| return _properties.getReplyTo(); |
| } |
| } |
| |
| public String getAppId() |
| { |
| //TODO |
| return null; |
| } |
| |
| public String getUserId() |
| { |
| // TODO |
| return null; |
| } |
| |
| public Object getHeader(final String name) |
| { |
| return _appProperties == null ? null : _appProperties.get(name); |
| } |
| |
| public boolean containsHeaders(final Set<String> names) |
| { |
| if(_appProperties == null) |
| { |
| return false; |
| } |
| |
| for(String key : names) |
| { |
| if(!_appProperties.containsKey(key)) |
| { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| @Override |
| public Collection<String> getHeaderNames() |
| { |
| if(_appProperties == null) |
| { |
| return Collections.emptySet(); |
| } |
| return Collections.unmodifiableCollection(_appProperties.keySet()); |
| } |
| |
| public boolean containsHeader(final String name) |
| { |
| return _appProperties != null && _appProperties.containsKey(name); |
| } |
| |
| public String getSubject() |
| { |
| return _properties == null ? null : _properties.getSubject(); |
| } |
| |
| public String getTo() |
| { |
| return _properties == null ? null : _properties.getTo(); |
| } |
| |
| public Map<String, Object> getHeadersAsMap() |
| { |
| return _appProperties == null ? new HashMap<String,Object>() : new HashMap<String,Object>(_appProperties); |
| } |
| } |
| |
| } |