blob: ae7793c489c28462e7e3deb597a19931ff1b219a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.jms;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.StreamMessage;
import org.apache.activemq.openwire.commands.OpenWireStreamMessage;
import org.apache.activemq.openwire.utils.ExceptionSupport;
import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
/**
 * Wrapper class that exposes an {@link OpenWireStreamMessage} through the JMS
 * {@link StreamMessage} API.
 */
public class OpenWireJMSStreamMessage extends OpenWireJMSMessage implements StreamMessage {
// The wrapped OpenWire message whose payload is read and written by this class.
private final OpenWireStreamMessage message;
// Output stream used by the write methods; created on demand by initializeWriting().
protected transient DataOutputStream dataOut;
// In-memory buffer behind dataOut; its contents become the payload in storeContent().
protected transient ByteArrayOutputStream bytesOut;
// Input stream used by the read methods; created on demand by initializeReading().
protected transient DataInputStream dataIn;
// Bytes left in the byte-array entry currently being consumed by readBytes();
// -1 means no byte-array entry is in progress.
protected transient int remainingBytes = -1;
/**
 * Creates a new instance that wraps a newly created {@link OpenWireStreamMessage}.
 */
public OpenWireJMSStreamMessage() {
this(new OpenWireStreamMessage());
}
/**
 * Creates a new instance that wraps the given {@link OpenWireStreamMessage}.
 *
 * @param message
 *        the OpenWireStreamMessage to wrap; the reference is stored as-is, not copied.
 */
public OpenWireJMSStreamMessage(OpenWireStreamMessage message) {
this.message = message;
}
/**
 * Creates a new wrapper around a copy of the wrapped message.
 * <p>
 * Data written through this wrapper that has not yet been stored is flushed into the
 * wrapped message first, so that the copy of the wrapped message includes it.
 *
 * @return a new wrapper backed by {@code message.copy()}.
 *
 * @throws JMSException if storing the pending content or copying fails.
 */
@Override
public OpenWireJMSStreamMessage copy() throws JMSException {
    // Flush before copying: the wrapped message only sees written data once
    // storeContent() has handed it the buffered bytes.
    storeContent();
    OpenWireJMSStreamMessage other = new OpenWireJMSStreamMessage(message.copy());
    copy(other);
    return other;
}
/**
 * Stores any pending written content, delegates to the superclass copy logic and
 * clears the stream fields of the target wrapper.
 *
 * @param copy the wrapper that receives the copied state.
 *
 * @throws JMSException if storing the pending content or the superclass copy fails.
 */
private void copy(OpenWireJMSStreamMessage copy) throws JMSException {
    this.storeContent();
    super.copy(copy);
    // The target starts without streams; they are created on first read or write.
    copy.dataIn = null;
    copy.bytesOut = null;
    copy.dataOut = null;
}
/**
 * Clears the body of both this wrapper and the wrapped message and discards all
 * stream state, including any partially consumed byte-array entry.
 *
 * @throws JMSException if the superclass or the wrapped message fails to clear.
 */
@Override
public void clearBody() throws JMSException {
    super.clearBody();
    message.clearBody();
    // Drop every stream and return the byte-array cursor to its idle value.
    this.remainingBytes = -1;
    this.bytesOut = null;
    this.dataOut = null;
    this.dataIn = null;
}
/**
 * Reads the next entry of the stream as a {@code boolean}.
 * <p>
 * Accepts an entry tagged as boolean, or a string entry which is parsed as a boolean.
 * For any other tag the read position is restored before the exception is thrown.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unsupported type.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws JMSException for any other read failure.
 */
@Override
public boolean readBoolean() throws JMSException {
    initializeReading();
    try {
        dataIn.mark(10);
        final int marker = dataIn.read();
        if (marker == -1) {
            throw new MessageEOFException("reached end of data");
        }
        if (marker == OpenWireMarshallingSupport.BOOLEAN_TYPE) {
            return dataIn.readBoolean();
        }
        if (marker == OpenWireMarshallingSupport.STRING_TYPE) {
            return Boolean.parseBoolean(dataIn.readUTF());
        }
        // Not convertible: rewind so the entry can be re-read as another type.
        dataIn.reset();
        if (marker == OpenWireMarshallingSupport.NULL) {
            throw new NullPointerException("Cannot convert NULL value to boolean.");
        }
        throw new MessageFormatException(" not a boolean type");
    } catch (EOFException eof) {
        throw ExceptionSupport.createMessageEOFException(eof);
    } catch (IOException failure) {
        throw ExceptionSupport.createMessageFormatException(failure);
    }
}
/**
 * Reads the next entry of the stream as a {@code byte}.
 * <p>
 * Accepts an entry tagged as byte, or a string entry which is parsed as a byte.
 * When the conversion is rejected the read position is restored first.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unsupported type.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws NumberFormatException if a string entry cannot be parsed as a byte.
 * @throws JMSException for any other read failure.
 */
@Override
public byte readByte() throws JMSException {
    initializeReading();
    try {
        dataIn.mark(10);
        final int marker = dataIn.read();
        if (marker == -1) {
            throw new MessageEOFException("reached end of data");
        }
        if (marker == OpenWireMarshallingSupport.BYTE_TYPE) {
            return dataIn.readByte();
        }
        if (marker == OpenWireMarshallingSupport.STRING_TYPE) {
            return Byte.parseByte(dataIn.readUTF());
        }
        // Not convertible: rewind so the entry can be re-read as another type.
        dataIn.reset();
        if (marker == OpenWireMarshallingSupport.NULL) {
            throw new NullPointerException("Cannot convert NULL value to byte.");
        }
        throw new MessageFormatException(" not a byte type");
    } catch (NumberFormatException badNumber) {
        // The string was consumed but could not be parsed; rewind before rethrowing.
        try {
            dataIn.reset();
        } catch (IOException resetFailure) {
            throw ExceptionSupport.create(resetFailure);
        }
        throw badNumber;
    } catch (EOFException eof) {
        throw ExceptionSupport.createMessageEOFException(eof);
    } catch (IOException failure) {
        throw ExceptionSupport.createMessageFormatException(failure);
    }
}
/**
 * Reads the next entry of the stream as a {@code short}.
 * <p>
 * Accepts entries tagged as short or byte, or a string entry which is parsed as a
 * short. When the conversion is rejected the read position is restored first.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unsupported type.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws NumberFormatException if a string entry cannot be parsed as a short.
 * @throws JMSException for any other read failure.
 */
@Override
public short readShort() throws JMSException {
    initializeReading();
    try {
        dataIn.mark(17);
        final int marker = dataIn.read();
        if (marker == -1) {
            throw new MessageEOFException("reached end of data");
        }
        if (marker == OpenWireMarshallingSupport.SHORT_TYPE) {
            return dataIn.readShort();
        }
        if (marker == OpenWireMarshallingSupport.BYTE_TYPE) {
            return dataIn.readByte();
        }
        if (marker == OpenWireMarshallingSupport.STRING_TYPE) {
            return Short.parseShort(dataIn.readUTF());
        }
        // Not convertible: rewind so the entry can be re-read as another type.
        dataIn.reset();
        if (marker == OpenWireMarshallingSupport.NULL) {
            throw new NullPointerException("Cannot convert NULL value to short.");
        }
        throw new MessageFormatException(" not a short type");
    } catch (NumberFormatException badNumber) {
        // The string was consumed but could not be parsed; rewind before rethrowing.
        try {
            dataIn.reset();
        } catch (IOException resetFailure) {
            throw ExceptionSupport.create(resetFailure);
        }
        throw badNumber;
    } catch (EOFException eof) {
        throw ExceptionSupport.createMessageEOFException(eof);
    } catch (IOException failure) {
        throw ExceptionSupport.createMessageFormatException(failure);
    }
}
/**
 * Reads the next entry of the stream as a {@code char}.
 * <p>
 * Only an entry tagged as char is accepted; no string conversion is attempted. For
 * any other tag the read position is restored before the exception is thrown.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry is not a char.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws JMSException for any other read failure.
 */
@Override
public char readChar() throws JMSException {
    initializeReading();
    try {
        this.dataIn.mark(17);
        int type = this.dataIn.read();
        if (type == -1) {
            throw new MessageEOFException("reached end of data");
        } else if (type == OpenWireMarshallingSupport.CHAR_TYPE) {
            return this.dataIn.readChar();
        } else if (type == OpenWireMarshallingSupport.NULL) {
            this.dataIn.reset();
            throw new NullPointerException("Cannot convert NULL value to char.");
        } else {
            this.dataIn.reset();
            throw new MessageFormatException(" not a char type");
        }
        // No NumberFormatException handler: nothing in this method parses text.
    } catch (EOFException e) {
        throw ExceptionSupport.createMessageEOFException(e);
    } catch (IOException e) {
        throw ExceptionSupport.createMessageFormatException(e);
    }
}
/**
 * Reads the next entry of the stream as an {@code int}.
 * <p>
 * Accepts entries tagged as int, short or byte, or a string entry which is parsed as
 * an int. When the conversion is rejected the read position is restored first.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unsupported type.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws NumberFormatException if a string entry cannot be parsed as an int.
 * @throws JMSException for any other read failure.
 */
@Override
public int readInt() throws JMSException {
    initializeReading();
    try {
        dataIn.mark(33);
        final int marker = dataIn.read();
        if (marker == -1) {
            throw new MessageEOFException("reached end of data");
        }
        if (marker == OpenWireMarshallingSupport.INTEGER_TYPE) {
            return dataIn.readInt();
        }
        if (marker == OpenWireMarshallingSupport.SHORT_TYPE) {
            return dataIn.readShort();
        }
        if (marker == OpenWireMarshallingSupport.BYTE_TYPE) {
            return dataIn.readByte();
        }
        if (marker == OpenWireMarshallingSupport.STRING_TYPE) {
            return Integer.parseInt(dataIn.readUTF());
        }
        // Not convertible: rewind so the entry can be re-read as another type.
        dataIn.reset();
        if (marker == OpenWireMarshallingSupport.NULL) {
            throw new NullPointerException("Cannot convert NULL value to int.");
        }
        throw new MessageFormatException(" not an int type");
    } catch (NumberFormatException badNumber) {
        // The string was consumed but could not be parsed; rewind before rethrowing.
        try {
            dataIn.reset();
        } catch (IOException resetFailure) {
            throw ExceptionSupport.create(resetFailure);
        }
        throw badNumber;
    } catch (EOFException eof) {
        throw ExceptionSupport.createMessageEOFException(eof);
    } catch (IOException failure) {
        throw ExceptionSupport.createMessageFormatException(failure);
    }
}
/**
 * Reads the next entry of the stream as a {@code long}.
 * <p>
 * Accepts entries tagged as long, int, short or byte, or a string entry which is
 * parsed as a long. When the conversion is rejected the read position is restored
 * first.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unsupported type.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws NumberFormatException if a string entry cannot be parsed as a long.
 * @throws JMSException for any other read failure.
 */
@Override
public long readLong() throws JMSException {
    initializeReading();
    try {
        dataIn.mark(65);
        final int marker = dataIn.read();
        if (marker == -1) {
            throw new MessageEOFException("reached end of data");
        }
        if (marker == OpenWireMarshallingSupport.LONG_TYPE) {
            return dataIn.readLong();
        }
        if (marker == OpenWireMarshallingSupport.INTEGER_TYPE) {
            return dataIn.readInt();
        }
        if (marker == OpenWireMarshallingSupport.SHORT_TYPE) {
            return dataIn.readShort();
        }
        if (marker == OpenWireMarshallingSupport.BYTE_TYPE) {
            return dataIn.readByte();
        }
        if (marker == OpenWireMarshallingSupport.STRING_TYPE) {
            return Long.parseLong(dataIn.readUTF());
        }
        // Not convertible: rewind so the entry can be re-read as another type.
        dataIn.reset();
        if (marker == OpenWireMarshallingSupport.NULL) {
            throw new NullPointerException("Cannot convert NULL value to long.");
        }
        throw new MessageFormatException(" not a long type");
    } catch (NumberFormatException badNumber) {
        // The string was consumed but could not be parsed; rewind before rethrowing.
        try {
            dataIn.reset();
        } catch (IOException resetFailure) {
            throw ExceptionSupport.create(resetFailure);
        }
        throw badNumber;
    } catch (EOFException eof) {
        throw ExceptionSupport.createMessageEOFException(eof);
    } catch (IOException failure) {
        throw ExceptionSupport.createMessageFormatException(failure);
    }
}
/**
 * Reads the next entry of the stream as a {@code float}.
 * <p>
 * Accepts an entry tagged as float, or a string entry which is parsed as a float.
 * When the conversion is rejected the read position is restored first.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unsupported type.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws NumberFormatException if a string entry cannot be parsed as a float.
 * @throws JMSException for any other read failure.
 */
@Override
public float readFloat() throws JMSException {
    initializeReading();
    try {
        dataIn.mark(33);
        final int marker = dataIn.read();
        if (marker == -1) {
            throw new MessageEOFException("reached end of data");
        }
        if (marker == OpenWireMarshallingSupport.FLOAT_TYPE) {
            return dataIn.readFloat();
        }
        if (marker == OpenWireMarshallingSupport.STRING_TYPE) {
            return Float.parseFloat(dataIn.readUTF());
        }
        // Not convertible: rewind so the entry can be re-read as another type.
        dataIn.reset();
        if (marker == OpenWireMarshallingSupport.NULL) {
            throw new NullPointerException("Cannot convert NULL value to float.");
        }
        throw new MessageFormatException(" not a float type");
    } catch (NumberFormatException badNumber) {
        // The string was consumed but could not be parsed; rewind before rethrowing.
        try {
            dataIn.reset();
        } catch (IOException resetFailure) {
            throw ExceptionSupport.create(resetFailure);
        }
        throw badNumber;
    } catch (EOFException eof) {
        throw ExceptionSupport.createMessageEOFException(eof);
    } catch (IOException failure) {
        throw ExceptionSupport.createMessageFormatException(failure);
    }
}
/**
 * Reads the next entry of the stream as a {@code double}.
 * <p>
 * Accepts entries tagged as double or float, or a string entry which is parsed as a
 * double. When the conversion is rejected the read position is restored first.
 *
 * @return the value of the entry.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unsupported type.
 * @throws NullPointerException if the entry is a NULL marker.
 * @throws NumberFormatException if a string entry cannot be parsed as a double.
 * @throws JMSException for any other read failure.
 */
@Override
public double readDouble() throws JMSException {
    initializeReading();
    try {
        dataIn.mark(65);
        final int marker = dataIn.read();
        if (marker == -1) {
            throw new MessageEOFException("reached end of data");
        }
        if (marker == OpenWireMarshallingSupport.DOUBLE_TYPE) {
            return dataIn.readDouble();
        }
        if (marker == OpenWireMarshallingSupport.FLOAT_TYPE) {
            return dataIn.readFloat();
        }
        if (marker == OpenWireMarshallingSupport.STRING_TYPE) {
            return Double.parseDouble(dataIn.readUTF());
        }
        // Not convertible: rewind so the entry can be re-read as another type.
        dataIn.reset();
        if (marker == OpenWireMarshallingSupport.NULL) {
            throw new NullPointerException("Cannot convert NULL value to double.");
        }
        throw new MessageFormatException(" not a double type");
    } catch (NumberFormatException badNumber) {
        // The string was consumed but could not be parsed; rewind before rethrowing.
        try {
            dataIn.reset();
        } catch (IOException resetFailure) {
            throw ExceptionSupport.create(resetFailure);
        }
        throw badNumber;
    } catch (EOFException eof) {
        throw ExceptionSupport.createMessageEOFException(eof);
    } catch (IOException failure) {
        throw ExceptionSupport.createMessageFormatException(failure);
    }
}
/**
 * Reads the next entry of the stream as a {@code String}.
 * <p>
 * A NULL marker yields {@code null}. String entries (regular and big) are returned
 * as-is, and long, int, short, byte, float, double, boolean and char entries are
 * converted to their string form. For any other tag the read position is restored
 * before the exception is thrown.
 *
 * @return the string value of the entry, or {@code null} for a NULL marker.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry cannot be represented as a string.
 * @throws JMSException for any other read failure.
 */
@Override
public String readString() throws JMSException {
    initializeReading();
    try {
        this.dataIn.mark(65);
        int type = this.dataIn.read();
        if (type == -1) {
            throw new MessageEOFException("reached end of data");
        } else if (type == OpenWireMarshallingSupport.NULL) {
            return null;
        } else if (type == OpenWireMarshallingSupport.BIG_STRING_TYPE) {
            return OpenWireMarshallingSupport.readUTF8(dataIn);
        } else if (type == OpenWireMarshallingSupport.STRING_TYPE) {
            return this.dataIn.readUTF();
        } else if (type == OpenWireMarshallingSupport.LONG_TYPE) {
            // Static toString avoids the deprecated boxing constructors.
            return Long.toString(this.dataIn.readLong());
        } else if (type == OpenWireMarshallingSupport.INTEGER_TYPE) {
            return Integer.toString(this.dataIn.readInt());
        } else if (type == OpenWireMarshallingSupport.SHORT_TYPE) {
            return Short.toString(this.dataIn.readShort());
        } else if (type == OpenWireMarshallingSupport.BYTE_TYPE) {
            return Byte.toString(this.dataIn.readByte());
        } else if (type == OpenWireMarshallingSupport.FLOAT_TYPE) {
            return Float.toString(this.dataIn.readFloat());
        } else if (type == OpenWireMarshallingSupport.DOUBLE_TYPE) {
            return Double.toString(this.dataIn.readDouble());
        } else if (type == OpenWireMarshallingSupport.BOOLEAN_TYPE) {
            return Boolean.toString(this.dataIn.readBoolean());
        } else if (type == OpenWireMarshallingSupport.CHAR_TYPE) {
            return Character.toString(this.dataIn.readChar());
        } else {
            this.dataIn.reset();
            throw new MessageFormatException(" not a String type");
        }
    } catch (NumberFormatException mfe) {
        // Kept defensively: rewind so the entry can be re-read as another type.
        try {
            this.dataIn.reset();
        } catch (IOException ioe) {
            throw ExceptionSupport.create(ioe);
        }
        throw mfe;
    } catch (EOFException e) {
        throw ExceptionSupport.createMessageEOFException(e);
    } catch (IOException e) {
        throw ExceptionSupport.createMessageFormatException(e);
    }
}
/**
 * Reads a byte-array entry from the stream into the given buffer, possibly across
 * several calls.
 * <p>
 * The first call for an entry reads its type tag and length. Each call then copies
 * up to {@code value.length} bytes. Once all bytes of the entry have been returned,
 * the next call returns -1 and the method is ready for the next entry.
 * <p>
 * If the next entry is not a byte array, or carries a negative length, the read
 * position is restored before the exception is thrown so the entry can be re-read
 * as a different type.
 *
 * @param value the buffer to fill; must not be {@code null}.
 *
 * @return the number of bytes copied into {@code value}, or -1 when the current
 *         byte-array entry has been fully consumed.
 *
 * @throws NullPointerException if {@code value} is {@code null}.
 * @throws MessageEOFException if the data ends before the entry is complete.
 * @throws MessageFormatException if the next entry is not a valid byte array.
 * @throws JMSException for any other read failure.
 */
@Override
public int readBytes(byte[] value) throws JMSException {
    initializeReading();
    try {
        if (value == null) {
            throw new NullPointerException();
        }
        if (remainingBytes == -1) {
            this.dataIn.mark(value.length + 1);
            int type = this.dataIn.read();
            if (type == -1) {
                throw new MessageEOFException("reached end of data");
            }
            if (type != OpenWireMarshallingSupport.BYTE_ARRAY_TYPE) {
                // A failed conversion must not advance the read position.
                this.dataIn.reset();
                throw new MessageFormatException("Not a byte array");
            }
            int length = this.dataIn.readInt();
            if (length < 0) {
                // A negative length would be mistaken for the idle marker or break the
                // bounded reads below; treat the entry as malformed instead.
                this.dataIn.reset();
                throw new MessageFormatException("Invalid byte array length: " + length);
            }
            remainingBytes = length;
        } else if (remainingBytes == 0) {
            remainingBytes = -1;
            return -1;
        }
        if (value.length <= remainingBytes) {
            // Small buffer: fill it completely and keep the rest for later calls.
            this.dataIn.readFully(value);
            remainingBytes -= value.length;
            return value.length;
        } else {
            // Big buffer: the rest of the entry fits. readFully makes truncated data
            // surface as an EOF instead of a silent short read.
            int count = remainingBytes;
            this.dataIn.readFully(value, 0, count);
            remainingBytes = 0;
            return count;
        }
    } catch (EOFException e) {
        throw ExceptionSupport.createMessageEOFException(e);
    } catch (IOException e) {
        throw ExceptionSupport.createMessageFormatException(e);
    }
}
/**
 * Reads the next entry of the stream and returns it as an object matching its tag.
 * <p>
 * A NULL marker yields {@code null}. String entries yield a {@code String}, numeric,
 * boolean and char entries yield the corresponding wrapper object, and a byte-array
 * entry yields a new {@code byte[]}. For an unknown tag or a byte array with a
 * negative length the read position is restored before the exception is thrown.
 *
 * @return the value of the entry, or {@code null} for a NULL marker.
 *
 * @throws MessageEOFException if the end of the data has been reached.
 * @throws MessageFormatException if the entry has an unknown type or is malformed.
 * @throws JMSException for any other read failure.
 */
@Override
public Object readObject() throws JMSException {
    initializeReading();
    try {
        this.dataIn.mark(65);
        int type = this.dataIn.read();
        if (type == -1) {
            throw new MessageEOFException("reached end of data");
        } else if (type == OpenWireMarshallingSupport.NULL) {
            return null;
        } else if (type == OpenWireMarshallingSupport.BIG_STRING_TYPE) {
            return OpenWireMarshallingSupport.readUTF8(dataIn);
        } else if (type == OpenWireMarshallingSupport.STRING_TYPE) {
            return this.dataIn.readUTF();
        } else if (type == OpenWireMarshallingSupport.LONG_TYPE) {
            return Long.valueOf(this.dataIn.readLong());
        } else if (type == OpenWireMarshallingSupport.INTEGER_TYPE) {
            return Integer.valueOf(this.dataIn.readInt());
        } else if (type == OpenWireMarshallingSupport.SHORT_TYPE) {
            return Short.valueOf(this.dataIn.readShort());
        } else if (type == OpenWireMarshallingSupport.BYTE_TYPE) {
            return Byte.valueOf(this.dataIn.readByte());
        } else if (type == OpenWireMarshallingSupport.FLOAT_TYPE) {
            // valueOf replaces the deprecated boxing constructors.
            return Float.valueOf(this.dataIn.readFloat());
        } else if (type == OpenWireMarshallingSupport.DOUBLE_TYPE) {
            return Double.valueOf(this.dataIn.readDouble());
        } else if (type == OpenWireMarshallingSupport.BOOLEAN_TYPE) {
            return Boolean.valueOf(this.dataIn.readBoolean());
        } else if (type == OpenWireMarshallingSupport.CHAR_TYPE) {
            return Character.valueOf(this.dataIn.readChar());
        } else if (type == OpenWireMarshallingSupport.BYTE_ARRAY_TYPE) {
            int len = this.dataIn.readInt();
            if (len < 0) {
                // Report malformed data as a JMS error rather than letting a
                // NegativeArraySizeException escape.
                this.dataIn.reset();
                throw new MessageFormatException("Invalid byte array length: " + len);
            }
            byte[] value = new byte[len];
            this.dataIn.readFully(value);
            return value;
        } else {
            this.dataIn.reset();
            throw new MessageFormatException("unknown type");
        }
    } catch (NumberFormatException mfe) {
        // Kept defensively: rewind so the entry can be re-read as another type.
        try {
            this.dataIn.reset();
        } catch (IOException ioe) {
            throw ExceptionSupport.create(ioe);
        }
        throw mfe;
    } catch (EOFException e) {
        throw ExceptionSupport.createMessageEOFException(e);
    } catch (IOException e) {
        throw ExceptionSupport.createMessageFormatException(e);
    }
}
/**
 * Writes a {@code boolean} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeBoolean(boolean value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalBoolean(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes a {@code byte} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeByte(byte value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalByte(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes a {@code short} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeShort(short value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalShort(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes a {@code char} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeChar(char value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalChar(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes an {@code int} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeInt(int value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalInt(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes a {@code long} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeLong(long value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalLong(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes a {@code float} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeFloat(float value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalFloat(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes a {@code double} entry to the stream.
 *
 * @param value the value to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeDouble(double value) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalDouble(this.dataOut, value);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes a string entry to the stream; a {@code null} value is written as a NULL
 * marker.
 *
 * @param value the string to marshal, may be {@code null}.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeString(String value) throws JMSException {
    initializeWriting();
    try {
        if (value != null) {
            OpenWireMarshallingSupport.marshalString(this.dataOut, value);
        } else {
            OpenWireMarshallingSupport.marshalNull(this.dataOut);
        }
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes the whole given array as a byte-array entry by delegating to
 * {@link #writeBytes(byte[], int, int)} with offset 0 and the array's length.
 *
 * @param value the bytes to write; a {@code null} array results in a
 *        NullPointerException when its length is read.
 *
 * @throws JMSException if the delegated write fails.
 */
@Override
public void writeBytes(byte[] value) throws JMSException {
    final int count = value.length;
    this.writeBytes(value, 0, count);
}
/**
 * Writes a section of the given array as a byte-array entry.
 *
 * @param value the array holding the bytes.
 * @param offset the index of the first byte to marshal.
 * @param length the number of bytes to marshal.
 *
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeBytes(byte[] value, int offset, int length) throws JMSException {
    initializeWriting();
    try {
        OpenWireMarshallingSupport.marshalByteArray(this.dataOut, value, offset, length);
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
}
/**
 * Writes the given object using the typed write method that matches its runtime
 * class. A {@code null} value is written as a NULL marker.
 * <p>
 * Supported types are String, Character, Boolean, Byte, Short, Integer, Long, Float,
 * Double and {@code byte[]}.
 *
 * @param value the object to write, may be {@code null}.
 *
 * @throws MessageFormatException if the object's type is not supported.
 * @throws JMSException if the write is rejected by initializeWriting() or marshalling
 *         fails with an I/O error.
 */
@Override
public void writeObject(Object value) throws JMSException {
    initializeWriting();
    if (value == null) {
        try {
            OpenWireMarshallingSupport.marshalNull(this.dataOut);
        } catch (IOException failure) {
            throw ExceptionSupport.create(failure);
        }
        return;
    }
    if (value instanceof String) {
        writeString((String) value);
        return;
    }
    if (value instanceof Character) {
        writeChar((Character) value);
        return;
    }
    if (value instanceof Boolean) {
        writeBoolean((Boolean) value);
        return;
    }
    if (value instanceof Byte) {
        writeByte((Byte) value);
        return;
    }
    if (value instanceof Short) {
        writeShort((Short) value);
        return;
    }
    if (value instanceof Integer) {
        writeInt((Integer) value);
        return;
    }
    if (value instanceof Float) {
        writeFloat((Float) value);
        return;
    }
    if (value instanceof Double) {
        writeDouble((Double) value);
        return;
    }
    if (value instanceof byte[]) {
        writeBytes((byte[]) value);
        return;
    }
    if (value instanceof Long) {
        writeLong((Long) value);
        return;
    }
    throw new MessageFormatException("Unsupported Object type: " + value.getClass());
}
/**
 * Stores any pending written content in the wrapped message, marks the body
 * read-only and discards all stream state so that reading starts afresh.
 *
 * @throws JMSException if storing the pending content fails.
 */
@Override
public void reset() throws JMSException {
    storeContent();
    setReadOnlyBody(true);
    // Forget every stream and any partially consumed byte-array entry.
    remainingBytes = -1;
    dataOut = null;
    dataIn = null;
    bytesOut = null;
}
/**
 * Prepares this wrapper for writing: checks that the body may be written, creates
 * the output streams if needed and, when the wrapped message already holds content,
 * moves that content into the output stream.
 *
 * @throws JMSException if the body check fails, the existing payload cannot be
 *         obtained or copying it into the output stream fails.
 */
private void initializeWriting() throws JMSException {
    checkReadOnlyBody();
    if (dataOut == null) {
        bytesOut = new ByteArrayOutputStream();
        dataOut = new DataOutputStream(bytesOut);
    }
    if (!message.hasContent()) {
        return;
    }
    // A message that already had a body and was sent must have that content restored
    // when it is written to again without clearBody() having been called.
    final Buffer existing = message.getPayload();
    try {
        dataOut.write(existing.getData(), existing.getOffset(), existing.getLength());
    } catch (IOException failure) {
        throw ExceptionSupport.create(failure);
    }
    message.setContent(null);
}
/**
 * Prepares this wrapper for reading: checks that the body may be read and, if no
 * input stream exists yet, creates one over the payload of the wrapped message.
 *
 * @throws MessageNotReadableException if the body check fails or the payload cannot
 *         be obtained; in the latter case the original failure is attached as both
 *         linked exception and cause.
 */
private void initializeReading() throws MessageNotReadableException {
    checkWriteOnlyBody();
    if (this.dataIn != null) {
        return;
    }
    Buffer data;
    try {
        data = message.getPayload();
    } catch (JMSException e) {
        // Keep the original failure reachable instead of discarding it.
        MessageNotReadableException failure =
            new MessageNotReadableException("Failed to read content from message.");
        failure.setLinkedException(e);
        failure.initCause(e);
        throw failure;
    }
    InputStream is = new ByteArrayInputStream(data);
    this.dataIn = new DataInputStream(is);
}
/**
 * Hands any content written so far to the wrapped message as its payload and
 * releases the output streams. Does nothing when no output stream exists.
 *
 * @throws JMSException if closing the output stream fails with an I/O error or the
 *         wrapped message rejects the payload.
 */
private void storeContent() throws JMSException {
    if (dataOut == null) {
        return;
    }
    try {
        dataOut.close();
        message.setPayload(bytesOut.toBuffer());
        bytesOut = null;
        dataOut = null;
    } catch (IOException ioe) {
        // Report I/O failures as JMSException like the write methods do, rather than
        // as an undeclared RuntimeException.
        throw ExceptionSupport.create(ioe);
    }
}
}