blob: f1decc8fb45f1005dd326c6250e6086e81dfd2ef [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.message.internal;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
{
private static final String NON_AMQP_MESSAGE = "Non-AMQP Message";
private final Object _messageBody;
private InternalMessageHeader _header;
private String _initialRoutingAddress = "";
private final String _destinationName;
public InternalMessage(final StoredMessage<InternalMessageMetaData> handle,
final InternalMessageHeader header,
final Object messageBody,
final String destinationName)
{
super(handle, null);
_header = header;
_messageBody = messageBody;
_destinationName = destinationName;
}
// used by recovery path
InternalMessage(final StoredMessage<InternalMessageMetaData> msg,
final String destinationName)
{
super(msg, null);
_header = msg.getMetaData().getHeader();
long contentSize = getSize();
if (contentSize > 0)
{
try (QpidByteBuffer buf = msg.getContent(0, (int) contentSize);
ObjectInputStream is = new ObjectInputStream(buf.asInputStream()))
{
_messageBody = is.readObject();
}
catch (IOException e)
{
throw new ConnectionScopedRuntimeException("Unexpected IO Exception in operation in memory", e);
}
catch (ClassNotFoundException e)
{
throw new ConnectionScopedRuntimeException("Object message contained an object which could not " +
"be deserialized", e);
}
}
else
{
_messageBody = null;
}
_destinationName = destinationName;
}
@Override
public String getInitialRoutingAddress()
{
return _initialRoutingAddress;
}
@Override
public String getTo()
{
return _destinationName;
}
@Override
public InternalMessageHeader getMessageHeader()
{
return _header;
}
@Override
public long getExpiration()
{
return _header.getExpiration();
}
@Override
public String getMessageType()
{
return NON_AMQP_MESSAGE;
}
@Override
public long getArrivalTime()
{
return _header.getArrivalTime();
}
@Override
public boolean isResourceAcceptable(final TransactionLogResource resource)
{
return true;
}
public Object getMessageBody()
{
return _messageBody;
}
public static InternalMessage createMessage(final MessageStore store,
final AMQMessageHeader header,
final Serializable bodyObject,
final boolean persistent,
final String destinationName)
{
InternalMessageHeader internalHeader;
if(header instanceof InternalMessageHeader)
{
internalHeader = (InternalMessageHeader) header;
}
else
{
internalHeader = new InternalMessageHeader(header);
}
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (ObjectOutputStream os = new ObjectOutputStream(bytesOut))
{
os.writeObject(bodyObject);
os.close();
byte[] bytes = bytesOut.toByteArray();
final InternalMessageMetaData metaData = InternalMessageMetaData.create(persistent, internalHeader, bytes.length);
MessageHandle<InternalMessageMetaData> handle = store.addMessage(metaData);
final StoredMessage<InternalMessageMetaData> storedMessage;
try (QpidByteBuffer wrap = QpidByteBuffer.wrap(bytes))
{
handle.addContent(wrap);
}
storedMessage = handle.allContentAdded();
return new InternalMessage(storedMessage, internalHeader, bodyObject, destinationName);
}
catch (IOException e)
{
throw new ConnectionScopedRuntimeException("Unexpected IO Exception on operation in memory", e);
}
}
public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody)
{
return createStringMessage(store, header, messageBody, false);
}
public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody, boolean persistent)
{
return createMessage(store, header, messageBody, persistent, null);
}
public static InternalMessage createMapMessage(MessageStore store, AMQMessageHeader header, Map<? extends Object,? extends Object> messageBody)
{
return createMessage(store, header, new LinkedHashMap<Object,Object>(messageBody), false, null);
}
public static InternalMessage createListMessage(MessageStore store, AMQMessageHeader header, List<? extends Object> messageBody)
{
return createMessage(store, header, new ArrayList<Object>(messageBody), false, null);
}
public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody)
{
return createBytesMessage(store, header, messageBody, false);
}
public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody, boolean persist)
{
return createMessage(store, header, messageBody, persist, null);
}
public static InternalMessage convert(final ServerMessage serverMessage,
AMQMessageHeader header,
Object messageBody)
{
long messageNumber = serverMessage.getMessageNumber();
boolean persistent = serverMessage.isPersistent();
String destinationName = serverMessage.getTo();
InternalMessageHeader convertedHeader = new InternalMessageHeader(header, serverMessage.getArrivalTime());
StoredMessage<InternalMessageMetaData> handle = createReadOnlyHandle(messageNumber, persistent, convertedHeader, messageBody);
InternalMessage internalMessage = new InternalMessage(handle, convertedHeader, messageBody, destinationName);
internalMessage.setInitialRoutingAddress(serverMessage.getInitialRoutingAddress());
return internalMessage;
}
private static StoredMessage<InternalMessageMetaData> createReadOnlyHandle(final long messageNumber,
final boolean persistent,
final InternalMessageHeader header,
final Object messageBody)
{
try(ByteArrayOutputStream bytesOut = new ByteArrayOutputStream())
{
try(ObjectOutputStream os = new ObjectOutputStream(bytesOut))
{
os.writeObject(messageBody);
final byte[] bytes = bytesOut.toByteArray();
final InternalMessageMetaData metaData =
InternalMessageMetaData.create(persistent, header, bytes.length);
final int metadataSize = metaData.getStorableSize();
return new StoredMessage<InternalMessageMetaData>()
{
@Override
public InternalMessageMetaData getMetaData()
{
return metaData;
}
@Override
public long getMessageNumber()
{
return messageNumber;
}
@Override
public QpidByteBuffer getContent(final int offset, final int length)
{
return QpidByteBuffer.wrap(bytes, offset, length);
}
@Override
public int getContentSize()
{
return bytes.length;
}
@Override
public int getMetadataSize()
{
return metadataSize;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isInContentInMemory()
{
return true;
}
@Override
public long getInMemorySize()
{
return getContentSize() + getMetadataSize();
}
@Override
public boolean flowToDisk()
{
return false;
}
@Override
public void reallocate()
{
}
};
}
}
catch (IOException e)
{
throw new ConnectionScopedRuntimeException("Unexpected IO Exception on operation in memory", e);
}
}
public void setInitialRoutingAddress(final String initialRoutingAddress)
{
_initialRoutingAddress = initialRoutingAddress == null ? "" : initialRoutingAddress;
}
}