| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.message; |
| |
| import org.apache.qpid.server.store.StorableMessageMetaData; |
| import org.apache.qpid.server.store.MessageMetaDataType; |
| import org.apache.qpid.transport.MessageTransfer; |
| import org.apache.qpid.transport.DeliveryProperties; |
| import org.apache.qpid.transport.MessageProperties; |
| import org.apache.qpid.transport.Header; |
| import org.apache.qpid.transport.MessageDeliveryMode; |
| import org.apache.qpid.transport.Struct; |
| import org.apache.qpid.transport.codec.BBEncoder; |
| import org.apache.qpid.transport.codec.BBDecoder; |
| |
| import java.nio.ByteBuffer; |
| import java.lang.ref.SoftReference; |
| |
| public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage |
| { |
| private Header _header; |
| private DeliveryProperties _deliveryProps; |
| private MessageProperties _messageProps; |
| private MessageTransferHeader _messageHeader; |
| private long _arrivalTime; |
| private int _bodySize; |
| private volatile SoftReference<ByteBuffer> _body; |
| |
| private static final int ENCODER_SIZE = 1 << 16; |
| |
| public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory(); |
| |
| private volatile ByteBuffer _encoded; |
| |
| |
| public MessageMetaData_0_10(MessageTransfer xfr) |
| { |
| this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis()); |
| } |
| |
| private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) |
| { |
| this(header, bodySize, null, arrivalTime); |
| } |
| |
| private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime) |
| { |
| _header = header; |
| if(_header != null) |
| { |
| _deliveryProps = _header.get(DeliveryProperties.class); |
| _messageProps = _header.get(MessageProperties.class); |
| } |
| else |
| { |
| _deliveryProps = null; |
| _messageProps = null; |
| } |
| _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps); |
| _arrivalTime = arrivalTime; |
| _bodySize = bodySize; |
| |
| |
| |
| if(xfrBody == null) |
| { |
| _body = null; |
| } |
| else |
| { |
| ByteBuffer body = ByteBuffer.allocate(_bodySize); |
| body.put(xfrBody); |
| body.flip(); |
| _body = new SoftReference<ByteBuffer>(body); |
| } |
| |
| |
| } |
| |
| |
| |
| public MessageMetaDataType getType() |
| { |
| return MessageMetaDataType.META_DATA_0_10; |
| } |
| |
| public int getStorableSize() |
| { |
| ByteBuffer buf = _encoded; |
| |
| if(buf == null) |
| { |
| buf = encodeAsBuffer(); |
| _encoded = buf; |
| } |
| |
| //TODO -- need to add stuff |
| return buf.limit(); |
| } |
| |
| private ByteBuffer encodeAsBuffer() |
| { |
| BBEncoder encoder = new BBEncoder(ENCODER_SIZE); |
| |
| encoder.writeInt64(_arrivalTime); |
| encoder.writeInt32(_bodySize); |
| Struct[] headers = _header == null ? new Struct[0] : _header.getStructs(); |
| encoder.writeInt32(headers.length); |
| |
| |
| for(Struct header : headers) |
| { |
| encoder.writeStruct32(header); |
| |
| } |
| |
| ByteBuffer buf = encoder.buffer(); |
| return buf; |
| } |
| |
| public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) |
| { |
| ByteBuffer buf = _encoded; |
| |
| if(buf == null) |
| { |
| buf = encodeAsBuffer(); |
| _encoded = buf; |
| } |
| |
| buf = buf.duplicate(); |
| |
| buf.position(offsetInMetaData); |
| |
| if(dest.remaining() < buf.limit()) |
| { |
| buf.limit(dest.remaining()); |
| } |
| dest.put(buf); |
| return buf.limit(); |
| } |
| |
| public int getContentSize() |
| { |
| return _bodySize; |
| } |
| |
| public boolean isPersistent() |
| { |
| return _deliveryProps == null ? false : _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT; |
| } |
| |
| public String getRoutingKey() |
| { |
| return _deliveryProps == null ? null : _deliveryProps.getRoutingKey(); |
| } |
| |
| public AMQMessageHeader getMessageHeader() |
| { |
| return _messageHeader; |
| } |
| |
| public long getSize() |
| { |
| |
| return _bodySize; |
| } |
| |
| public boolean isImmediate() |
| { |
| return _deliveryProps != null && _deliveryProps.getImmediate(); |
| } |
| |
| public long getExpiration() |
| { |
| return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); |
| } |
| |
| public boolean isRedelivered() |
| { |
| // The *Message* is never redelivered, only queue entries are... |
| return false; |
| } |
| |
| public long getArrivalTime() |
| { |
| return _arrivalTime; |
| } |
| |
| public Header getHeader() |
| { |
| return _header; |
| } |
| |
| public ByteBuffer getBody() |
| { |
| ByteBuffer body = _body == null ? null : _body.get(); |
| return body; |
| } |
| |
| public void setBody(ByteBuffer body) |
| { |
| _body = new SoftReference<ByteBuffer>(body); |
| } |
| |
| private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10> |
| { |
| public MessageMetaData_0_10 createMetaData(ByteBuffer buf) |
| { |
| BBDecoder decoder = new BBDecoder(); |
| decoder.init(buf); |
| |
| long arrivalTime = decoder.readInt64(); |
| int bodySize = decoder.readInt32(); |
| int headerCount = decoder.readInt32(); |
| |
| Struct[] headers = new Struct[headerCount]; |
| |
| for(int i = 0 ; i < headerCount; i++) |
| { |
| headers[i] = decoder.readStruct32(); |
| } |
| |
| Header header = new Header(headers); |
| |
| return new MessageMetaData_0_10(header, bodySize, arrivalTime); |
| |
| } |
| } |
| |
| |
| } |