OPENWIRE-12 Remove dependency on HawtBuf and replace with internal Buffer implementation.
diff --git a/openwire-core/pom.xml b/openwire-core/pom.xml
index 75549e7..8b6d99c 100644
--- a/openwire-core/pom.xml
+++ b/openwire-core/pom.xml
@@ -44,10 +44,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
- <dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf</artifactId>
- </dependency>
<!-- =================================== -->
<!-- Testing Dependencies -->
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/Buffer.java b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/Buffer.java
new file mode 100644
index 0000000..922bfea
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/Buffer.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.buffer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.openwire.utils.HexSupport;
+
+/**
+ * Wrapper for byte[] instances used to manage marshaled data
+ */
+public class Buffer implements Comparable<Buffer> {
+
+ public byte[] data;
+ public int offset;
+ public int length;
+
+ public Buffer(ByteBuffer other) {
+ this(other.array(), other.arrayOffset()+other.position(), other.remaining());
+ }
+
+ public Buffer(Buffer other) {
+ this(other.data, other.offset, other.length);
+ }
+
+ public Buffer(int size) {
+ this(new byte[size]);
+ }
+
+ public Buffer(byte data[]) {
+ this(data, 0, data.length);
+ }
+
+ public Buffer(byte data[], int offset, int length) {
+
+ if (data == null) {
+ throw new IllegalArgumentException("byte array value cannot by null");
+ }
+
+ if (offset + length > data.length) {
+ throw new IndexOutOfBoundsException(
+ String.format("offset %d + length %d must be <= the data.length %d", data, length, data.length));
+ }
+
+ this.data = data;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ //-----Implementation ----------------------------------------------------//
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ final public boolean isEmpty() {
+ return length == 0;
+ }
+
+ final public byte[] toByteArray() {
+ byte[] data = this.data;
+ int length = this.length;
+
+ if (length != data.length) {
+ byte t[] = new byte[length];
+ System.arraycopy(data, offset, t, 0, length);
+ data = t;
+ }
+
+ return data;
+ }
+
+ final public boolean equals(Buffer obj) {
+ byte[] data = this.data;
+ int offset = this.offset;
+ int length = this.length;
+
+ if (length != obj.length) {
+ return false;
+ }
+
+ byte[] objData = obj.data;
+ int objOffset = obj.offset;
+
+ for (int i = 0; i < length; i++) {
+ if (objData[objOffset + i] != data[offset + i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ //----- Platform overrides -----------------------------------------------//
+
+ @Override
+ public String toString() {
+ int size = length;
+ boolean asciiPrintable = true;
+
+ for (int i = 0; i < size; i++) {
+ int c = data[offset + i] & 0xFF;
+ if (c > 126 || c < 32) { // not a printable char
+ if (!(c == '\n' || c == '\r' | c == '\n' | c == 27)) {
+ // except these.
+ asciiPrintable = false;
+ break;
+ }
+ }
+ }
+
+ if (asciiPrintable) {
+ char decoded[] = new char[length];
+ for (int i = 0; i < size; i++) {
+ decoded[i] = (char) (data[offset + i] & 0xFF);
+ }
+ return "ascii: " + new String(decoded);
+ } else {
+ return "hex: " + HexSupport.toHexFromBuffer(this);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null || obj.getClass() != Buffer.class) {
+ return false;
+ }
+
+ return equals((Buffer) obj);
+ }
+
+ @Override
+ public int hashCode() {
+ byte[] data = this.data;
+ int offset = this.offset;
+ int length = this.length;
+
+ byte[] target = new byte[4];
+ for (int i = 0; i < length; i++) {
+ target[i % 4] ^= data[offset + i];
+ }
+
+ return target[0] << 24 | target[1] << 16 | target[2] << 8 | target[3];
+ }
+
+ @Override
+ public int compareTo(Buffer o) {
+ if (this == o) {
+ return 0;
+ }
+
+ byte[] data = this.data;
+ int offset = this.offset;
+ int length = this.length;
+
+ int oLength = o.length;
+ int oOffset = o.offset;
+ byte[] oData = o.data;
+
+ int minLength = Math.min(length, oLength);
+ if (offset == oOffset) {
+ int pos = offset;
+ int limit = minLength + offset;
+ while (pos < limit) {
+ int b1 = 0xFF & data[pos];
+ int b2 = 0xFF & oData[pos];
+ if (b1 != b2) {
+ return b1 - b2;
+ }
+ pos++;
+ }
+ } else {
+ int offset1 = offset;
+ int offset2 = oOffset;
+ while (minLength-- != 0) {
+ int b1 = 0xFF & data[offset1++];
+ int b2 = 0xFF & oData[offset2++];
+ if (b1 != b2) {
+ return b1 - b2;
+ }
+ }
+ }
+
+ return length - oLength;
+ }
+
+ //----- Utility Stream write methods -------------------------------------//
+
+ /**
+ * same as out.write(data, offset, length);
+ */
+ public void writeTo(DataOutput out) throws IOException {
+ out.write(data, offset, length);
+ }
+
+ /**
+ * same as out.write(data, offset, length);
+ */
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(data, offset, length);
+ }
+
+ /**
+ * same as in.readFully(data, offset, length);
+ */
+ public void readFrom(DataInput in) throws IOException {
+ in.readFully(data, offset, length);
+ }
+
+ /**
+ * same as in.read(data, offset, length);
+ */
+ public int readFrom(InputStream in) throws IOException {
+ return in.read(data, offset, length);
+ }
+}
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/DataByteArrayInputStream.java b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/DataByteArrayInputStream.java
new file mode 100644
index 0000000..2f5248f
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/DataByteArrayInputStream.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.buffer;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * Optimized ByteArrayInputStream that can be used more than once
+ */
+public final class DataByteArrayInputStream extends InputStream implements DataInput {
+
+ private byte[] buf;
+ private int pos;
+ private int offset;
+ private int length;
+
+ /**
+ * Creates a <code>StoreByteArrayInputStream</code>.
+ *
+ * @param buf
+ * the input buffer.
+ */
+ public DataByteArrayInputStream(byte buf[]) {
+ restart(buf);
+ }
+
+ /**
+ * Creates a <code>StoreByteArrayInputStream</code>.
+ *
+ * @param buffer
+ * the input buffer.
+ */
+ public DataByteArrayInputStream(Buffer buffer) {
+ restart(buffer);
+ }
+
+ /**
+ * reset the <code>StoreByteArrayInputStream</code> to use an new Buffer
+ *
+ * @param buffer
+ */
+ public void restart(Buffer buffer) {
+ this.buf = buffer.getData();
+ this.offset = buffer.getOffset();
+ this.pos = this.offset;
+ this.length = buffer.getLength();
+ }
+
+ /**
+ * re-start the input stream - reusing the current buffer
+ *
+ * @param size
+ */
+ public void restart(int size) {
+ if (buf == null || buf.length < size) {
+ buf = new byte[size];
+ }
+ restart(buf);
+ this.length = size;
+ }
+
+ /**
+ * Creates <code>WireByteArrayInputStream</code> with a minmalist byte array
+ */
+ public DataByteArrayInputStream() {
+ this(new byte[0]);
+ }
+
+ /**
+ * @return the size
+ */
+ public int size() {
+ return pos - offset;
+ }
+
+ /**
+ * @return the underlying data array
+ */
+ public byte[] getRawData() {
+ return buf;
+ }
+
+ public Buffer readBuffer(int len) {
+ int endpos = offset + length;
+ if (pos > endpos) {
+ return null;
+ }
+ if (pos + len > endpos) {
+ len = length - pos;
+ }
+ Buffer rc = new Buffer(buf, pos, len);
+ pos += len;
+ return rc;
+ }
+
+ /**
+ * reset the <code>StoreByteArrayInputStream</code> to use an new byte array
+ *
+ * @param newBuff
+ */
+ public void restart(byte[] newBuff) {
+ buf = newBuff;
+ pos = 0;
+ length = newBuff.length;
+ }
+
+ public void restart() {
+ pos = 0;
+ length = buf.length;
+ }
+
+ public int getPos() {
+ return pos;
+ }
+
+ public void setPos(int pos) {
+ this.pos = pos;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public void setLength(int length) {
+ this.length = length;
+ }
+
+ public int skip(int n) {
+ return skipBytes(n);
+ }
+
+ //----- InputStream implementation ---------------------------------------//
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is
+ * returned as an <code>int</code> in the range <code>0</code> to
+ * <code>255</code>. If no byte is available because the end of the stream
+ * has been reached, the value <code>-1</code> is returned.
+ * <p>
+ * This <code>read</code> method cannot block.
+ *
+ * @return the next byte of data, or <code>-1</code> if the end of the
+ * stream has been reached.
+ */
+ @Override
+ public int read() {
+ return (pos < offset + length) ? (buf[pos++] & 0xff) : -1;
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes of data into an array of bytes from
+ * this input stream.
+ *
+ * @param b
+ * the buffer into which the data is read.
+ * @param off
+ * the start offset of the data.
+ * @param len
+ * the maximum number of bytes read.
+ * @return the total number of bytes read into the buffer, or
+ * <code>-1</code> if there is no more data because the end of the
+ * stream has been reached.
+ */
+ @Override
+ public int read(byte b[], int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+
+ int endpos = offset + length;
+ if (pos >= endpos) {
+ return -1;
+ }
+ if (pos + len > endpos) {
+ len = length - pos;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+
+ System.arraycopy(buf, pos, b, off, len);
+ pos += len;
+ return len;
+ }
+
+ //----- DataInput Implementation -----------------------------------------//
+
+ /**
+ * @return the number of bytes that can be read from the input stream
+ * without blocking.
+ */
+ @Override
+ public int available() {
+ return offset + length - pos;
+ }
+
+ @Override
+ public void readFully(byte[] b) {
+ read(b, 0, b.length);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) {
+ read(b, off, len);
+ }
+
+ @Override
+ public int skipBytes(int n) {
+ int endpos = offset + length;
+ if (pos + n > endpos) {
+ n = endpos - pos;
+ }
+ if (n < 0) {
+ return 0;
+ }
+ pos += n;
+ return n;
+ }
+
+ @Override
+ public boolean readBoolean() {
+ return read() != 0;
+ }
+
+ @Override
+ public byte readByte() {
+ return (byte) read();
+ }
+
+ @Override
+ public int readUnsignedByte() {
+ return read();
+ }
+
+ @Override
+ public short readShort() {
+ int ch1 = read();
+ int ch2 = read();
+ return (short) ((ch1 << 8) + (ch2 << 0));
+ }
+
+ @Override
+ public int readUnsignedShort() {
+ int ch1 = read();
+ int ch2 = read();
+ return (ch1 << 8) + (ch2 << 0);
+ }
+
+ @Override
+ public char readChar() {
+ int ch1 = read();
+ int ch2 = read();
+ return (char) ((ch1 << 8) + (ch2 << 0));
+ }
+
+ @Override
+ public int readInt() {
+ int ch1 = read();
+ int ch2 = read();
+ int ch3 = read();
+ int ch4 = read();
+ return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0);
+ }
+
+ @Override
+ public long readLong() {
+ long rc = ((long) buf[pos++] << 56) + ((long) (buf[pos++] & 255) << 48) + ((long) (buf[pos++] & 255) << 40) + ((long) (buf[pos++] & 255) << 32);
+ return rc + ((long) (buf[pos++] & 255) << 24) + ((buf[pos++] & 255) << 16) + ((buf[pos++] & 255) << 8) + ((buf[pos++] & 255) << 0);
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ @Override
+ public String readLine() {
+ int start = pos;
+ while (pos < offset + length) {
+ int c = read();
+ if (c == '\n') {
+ break;
+ }
+ if (c == '\r') {
+ c = read();
+ if (c != '\n' && c != -1) {
+ pos--;
+ }
+ break;
+ }
+ }
+ return new String(buf, start, pos);
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ int length = readUnsignedShort();
+ char[] characters = new char[length];
+ int c;
+ int c2;
+ int c3;
+ int count = 0;
+ int total = pos + length;
+
+ while (pos < total) {
+ c = buf[pos] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ pos++;
+ characters[count++] = (char) c;
+ }
+
+ while (pos < total) {
+ c = buf[pos] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ pos++;
+ characters[count++] = (char) c;
+ break;
+ case 12:
+ case 13:
+ pos += 2;
+ if (pos > total) {
+ throw new UTFDataFormatException("bad string");
+ }
+ c2 = buf[pos - 1];
+ if ((c2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("bad string");
+ }
+ characters[count++] = (char) (((c & 0x1F) << 6) | (c2 & 0x3F));
+ break;
+ case 14:
+ pos += 3;
+ if (pos > total) {
+ throw new UTFDataFormatException("bad string");
+ }
+ c2 = buf[pos - 2];
+ c3 = buf[pos - 1];
+ if (((c2 & 0xC0) != 0x80) || ((c3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("bad string");
+ }
+ characters[count++] = (char) (((c & 0x0F) << 12) | ((c2 & 0x3F) << 6) | ((c3 & 0x3F) << 0));
+ break;
+ default:
+ throw new UTFDataFormatException("bad string");
+ }
+ }
+
+ return new String(characters, 0, count);
+ }
+}
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/DataByteArrayOutputStream.java b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/DataByteArrayOutputStream.java
new file mode 100644
index 0000000..01448bc
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/DataByteArrayOutputStream.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * Optimized ByteArrayOutputStream
+ */
+public final class DataByteArrayOutputStream extends OutputStream implements DataOutput {
+
+ private static final int DEFAULT_SIZE = 2048;
+
+ protected byte buf[];
+ protected int pos;
+
+ /**
+ * Creates a new byte array output stream, with a buffer capacity of the
+ * specified size, in bytes.
+ *
+ * @param size
+ * the initial size.
+ * @exception IllegalArgumentException
+ * if size is negative.
+ */
+ public DataByteArrayOutputStream(int size) {
+ if (size <= 0) {
+ throw new IllegalArgumentException("Invalid size: " + size);
+ }
+ buf = new byte[size];
+ }
+
+ public DataByteArrayOutputStream(byte buf[]) {
+ if (buf == null || buf.length == 0) {
+ throw new IllegalArgumentException("Invalid buffer");
+ }
+ this.buf = buf;
+ }
+
+ /**
+ * Creates a new byte array output stream.
+ */
+ public DataByteArrayOutputStream() {
+ this(DEFAULT_SIZE);
+ }
+
+ /**
+ * start using a fresh byte array
+ *
+ * @param size
+ */
+ public void restart(int size) {
+ buf = new byte[size];
+ pos = 0;
+ }
+
+ /**
+ * start using a fresh byte array
+ */
+ public void restart() {
+ restart(DEFAULT_SIZE);
+ }
+
+ /**
+ * Get a Buffer from the stream
+ *
+ * @return the byte sequence
+ */
+ public Buffer toBuffer() {
+ return new Buffer(buf, 0, pos);
+ }
+
+ public void write(Buffer data) throws IOException {
+ write(data.data, data.offset, data.length);
+ }
+
+ /**
+ * @return the underlying byte[] buffer
+ */
+ public byte[] getData() {
+ return buf;
+ }
+
+ /**
+ * reset the output stream
+ */
+ public void reset() {
+ pos = 0;
+ }
+
+ /**
+ * Set the current position for writing
+ *
+ * @param offset
+ * @throws IOException
+ */
+ public void position(int offset) throws IOException {
+ ensureEnoughBuffer(offset);
+ pos = offset;
+ }
+
+ public int position() {
+ return pos;
+ }
+
+ public int size() {
+ return pos;
+ }
+
+ public void skip(int size) throws IOException {
+ ensureEnoughBuffer(pos + size);
+ pos += size;
+ }
+
+ //----- Implementation of OutputStream -----------------------------------//
+
+ /**
+ * Writes the specified byte to this byte array output stream.
+ *
+ * @param b
+ * the byte to be written.
+ * @throws IOException
+ */
+ @Override
+ public void write(int b) throws IOException {
+ int newcount = pos + 1;
+ ensureEnoughBuffer(newcount);
+ buf[pos] = (byte) b;
+ pos = newcount;
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this byte array output stream.
+ *
+ * @param b
+ * the data.
+ * @param off
+ * the start offset in the data.
+ * @param len
+ * the number of bytes to write.
+ * @throws IOException
+ */
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if (len == 0) {
+ return;
+ }
+ int newcount = pos + len;
+ ensureEnoughBuffer(newcount);
+ System.arraycopy(b, off, buf, pos, len);
+ pos = newcount;
+ }
+
+ //----- Implementation of DataOutput -------------------------------------//
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ ensureEnoughBuffer(pos + 1);
+ buf[pos++] = (byte) (v ? 1 : 0);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ ensureEnoughBuffer(pos + 1);
+ buf[pos++] = (byte) (v >>> 0);
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ ensureEnoughBuffer(pos + 2);
+ buf[pos++] = (byte) (v >>> 8);
+ buf[pos++] = (byte) (v >>> 0);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ ensureEnoughBuffer(pos + 2);
+ buf[pos++] = (byte) (v >>> 8);
+ buf[pos++] = (byte) (v >>> 0);
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ ensureEnoughBuffer(pos + 4);
+ buf[pos++] = (byte) (v >>> 24);
+ buf[pos++] = (byte) (v >>> 16);
+ buf[pos++] = (byte) (v >>> 8);
+ buf[pos++] = (byte) (v >>> 0);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ ensureEnoughBuffer(pos + 8);
+ buf[pos++] = (byte) (v >>> 56);
+ buf[pos++] = (byte) (v >>> 48);
+ buf[pos++] = (byte) (v >>> 40);
+ buf[pos++] = (byte) (v >>> 32);
+ buf[pos++] = (byte) (v >>> 24);
+ buf[pos++] = (byte) (v >>> 16);
+ buf[pos++] = (byte) (v >>> 8);
+ buf[pos++] = (byte) (v >>> 0);
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ int length = s.length();
+ for (int i = 0; i < length; i++) {
+ write((byte) s.charAt(i));
+ }
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ int length = s.length();
+ for (int i = 0; i < length; i++) {
+ int c = s.charAt(i);
+ write((c >>> 8) & 0xFF);
+ write((c >>> 0) & 0xFF);
+ }
+ }
+
+ @Override
+ public void writeUTF(String str) throws IOException {
+ int strlen = str.length();
+ int encodedsize = 0;
+ int c;
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ encodedsize++;
+ } else if (c > 0x07FF) {
+ encodedsize += 3;
+ } else {
+ encodedsize += 2;
+ }
+ }
+ if (encodedsize > 65535) {
+ throw new UTFDataFormatException("encoded string too long: " + encodedsize + " bytes");
+ }
+ ensureEnoughBuffer(pos + encodedsize + 2);
+ writeShort(encodedsize);
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ buf[pos++] = (byte) c;
+ }
+ for (; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ buf[pos++] = (byte) c;
+ } else if (c > 0x07FF) {
+ buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ } else {
+ buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ }
+
+ //----- Indexed Write Operations -----------------------------------------//
+
+ /**
+ * Write the given int value starting at the given index in the internal
+ * data buffer, if there is not enough space in the current buffer or the
+ * index is beyond the current buffer capacity then the size of the buffer
+ * is increased to fit the value.
+ *
+ * This method does not modify the tracked position for non-index writes
+ * which means that a subsequent write operation can overwrite the value
+ * written by this operation if the index given is beyond the current
+ * write position.
+ *
+ * @param index
+ * @param value
+ * @throws IOException
+ */
+ public void writeInt(int index, int value) throws IOException {
+ ensureEnoughBuffer(index + 4);
+ buf[index++] = (byte) (value >>> 24);
+ buf[index++] = (byte) (value >>> 16);
+ buf[index++] = (byte) (value >>> 8);
+ buf[index++] = (byte) (value >>> 0);
+ }
+
+
+ //----- Internal implementation ------------------------------------------//
+
+ private void resize(int newcount) {
+ byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, pos);
+ buf = newbuf;
+ }
+
+ private void ensureEnoughBuffer(int newcount) {
+ if (newcount > buf.length) {
+ resize(newcount);
+ }
+ }
+}
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/UTF8Buffer.java b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/UTF8Buffer.java
new file mode 100644
index 0000000..28e094f
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/buffer/UTF8Buffer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.buffer;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.ref.SoftReference;
+
+/**
+ * Simple Buffer type class used to hold data that is know to be in UTF8
+ * format.
+ */
+final public class UTF8Buffer extends Buffer {
+
+ private SoftReference<String> value = new SoftReference<String>(null);
+ private int hashCode;
+
+ public UTF8Buffer(Buffer other) {
+ super(other);
+ }
+
+ public UTF8Buffer(byte[] data, int offset, int length) {
+ super(data, offset, length);
+ }
+
+ public UTF8Buffer(byte[] data) {
+ super(data);
+ }
+
+ public UTF8Buffer(String input) {
+ super(encode(input));
+ }
+
+ //----- Implementations --------------------------------------------------//
+
+ @Override
+ public int compareTo(Buffer other) {
+ // Do a char comparison.. not a byte for byte comparison.
+ return toString().compareTo(other.toString());
+ }
+
+ @Override
+ public String toString() {
+ String result = value.get();
+ if (result == null) {
+ result = decode(this);
+ value = new SoftReference<String>(result);
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null || obj.getClass() != UTF8Buffer.class) {
+ return false;
+ }
+
+ return equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ if (hashCode == 0) {
+ hashCode = super.hashCode();
+ }
+
+ return hashCode;
+ }
+
+ //----- static convenience methods ---------------------------------------//
+
+ public static final byte[] encode(String input) {
+ try {
+ return input.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("A UnsupportedEncodingException was thrown for teh UTF-8 encoding. (This should never happen)");
+ }
+ }
+
+ static public String decode(Buffer buffer) {
+ try {
+ return new String(buffer.getData(), buffer.getOffset(), buffer.getLength(), "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("A UnsupportedEncodingException was thrown for teh UTF-8 encoding. (This should never happen)");
+ }
+ }
+}
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java
index 6566f8f..418b1b9 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/BaseDataStreamMarshaller.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
+import org.apache.activemq.openwire.buffer.Buffer;
import org.apache.activemq.openwire.commands.DataStructure;
-import org.fusesource.hawtbuf.Buffer;
/**
* Root of all OpenWire marshalers.
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/OpenWireFormat.java b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/OpenWireFormat.java
index 9e0c063..e7c2e01 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/OpenWireFormat.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/OpenWireFormat.java
@@ -23,12 +23,12 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
import org.apache.activemq.openwire.commands.CommandTypes;
import org.apache.activemq.openwire.commands.DataStructure;
import org.apache.activemq.openwire.commands.WireFormatInfo;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.DataByteArrayInputStream;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
/**
* The OpenWire Protocol Encoder and Decoder implementation.
@@ -146,16 +146,13 @@
}
bytesOut.writeByte(type);
dsm.looseMarshal(this, c, bytesOut);
- sequence = bytesOut.toBuffer();
if (!sizePrefixDisabled) {
- size = sequence.getLength() - 4;
- int length = sequence.length;
- int offset = sequence.offset;
- sequence.bigEndianEditor().writeInt(size);
- sequence.length = length;
- sequence.offset = offset;
+ size = bytesOut.size() - 4;
+ bytesOut.writeInt(0, size);
}
+
+ sequence = bytesOut.toBuffer();
}
} else {
bytesOut.restart(5);
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/MessageMarshaller.java b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/MessageMarshaller.java
index f7b91f8..783789e 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/MessageMarshaller.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/MessageMarshaller.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.openwire.codec.universal;
-import org.fusesource.hawtbuf.Buffer;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/WireFormatInfoMarshaller.java b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/WireFormatInfoMarshaller.java
index ca4c1fe..0da19fe 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/WireFormatInfoMarshaller.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/codec/universal/WireFormatInfoMarshaller.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.openwire.codec.universal;
-import org.fusesource.hawtbuf.Buffer;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java
index 132ca54..422de3b 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java
@@ -18,6 +18,7 @@
import static org.apache.activemq.openwire.codec.OpenWireConstants.ADIVSORY_MESSAGE_TYPE;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,13 +32,13 @@
import org.apache.activemq.openwire.annotations.OpenWireExtension;
import org.apache.activemq.openwire.annotations.OpenWireProperty;
import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
+import org.apache.activemq.openwire.buffer.UTF8Buffer;
import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.utils.IOExceptionSupport;
import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
-import org.fusesource.hawtbuf.UTF8Buffer;
/**
* Represents an ActiveMQ message
@@ -265,14 +266,14 @@
}
private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
- return OpenWireMarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
+ return OpenWireMarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new DataByteArrayInputStream(marshalledProperties)));
}
@Override
public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
// Need to marshal the properties.
if (marshalledProperties == null && properties != null) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
OpenWireMarshallingSupport.marshalPrimitiveMap(properties, os);
os.close();
@@ -741,10 +742,13 @@
}
protected Buffer doDecompress() throws IOException {
+
+ // TODO
+
ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength());
InflaterInputStream inflater = new InflaterInputStream(input);
- ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
try {
byte[] buffer = new byte[8*1024];
int read = 0;
@@ -762,7 +766,7 @@
protected void doCompress() throws IOException {
compressed = true;
Buffer bytes = getContent();
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
OutputStream os = new DeflaterOutputStream(bytesOut);
os.write(bytes.data, bytes.offset, bytes.length);
os.close();
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java
index 1adbb0c..5a76dbe 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java
@@ -22,9 +22,9 @@
import java.util.zip.Inflater;
import org.apache.activemq.openwire.annotations.OpenWireType;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.BufferEditor;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
/**
* Provides an abstraction layer around the standard OpenWireMessage object for
@@ -74,7 +74,7 @@
if (compressed) {
return getBodyBytes().length;
} else if (content != null) {
- return content.length();
+ return content.getLength();
} else {
return 0;
}
@@ -143,11 +143,13 @@
protected Buffer doDecompress() throws IOException {
Buffer compressed = getContent();
Inflater inflater = new Inflater();
- ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+ DataByteArrayOutputStream decompressed = new DataByteArrayOutputStream();
try {
- BufferEditor editor = BufferEditor.big(compressed);
- int length = editor.readInt();
- compressed.offset = 0;
+ // Copy to avoid race on concurrent reads of compressed message payload.
+ compressed = new Buffer(compressed);
+ DataByteArrayInputStream compressedIn = new DataByteArrayInputStream(compressed);
+ int length = compressedIn.readInt();
+ compressedIn.close();
byte[] data = Arrays.copyOfRange(compressed.getData(), 4, compressed.getLength());
inflater.setInput(data);
byte[] buffer = new byte[length];
@@ -168,7 +170,7 @@
Buffer bytes = getContent();
if (bytes != null) {
int length = bytes.getLength();
- ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+ DataByteArrayOutputStream compressed = new DataByteArrayOutputStream();
compressed.write(new byte[4]);
Deflater deflater = new Deflater();
try {
@@ -180,11 +182,8 @@
compressed.write(buffer, 0, count);
}
- bytes = compressed.toBuffer();
- bytes.bigEndianEditor().writeInt(length);
- bytes.offset = 0;
- bytes.length += 4;
- setContent(bytes);
+ compressed.writeInt(0, length);
+ setContent(compressed.toBuffer());
} finally {
deflater.end();
compressed.close();
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java
index fbc5fa5..0fbc86e 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java
@@ -31,13 +31,13 @@
import org.apache.activemq.openwire.annotations.OpenWireExtension;
import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
+import org.apache.activemq.openwire.buffer.UTF8Buffer;
import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.utils.IOExceptionSupport;
import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
-import org.fusesource.hawtbuf.UTF8Buffer;
/**
* openwire:marshaller code="25"
@@ -92,7 +92,7 @@
public void storeContent() {
try {
if (getContent() == null && !map.isEmpty()) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
OutputStream os = bytesOut;
if (isUseCompression()) {
compressed = true;
@@ -117,7 +117,7 @@
try {
if (getContent() != null && map.isEmpty()) {
Buffer content = getContent();
- InputStream is = new ByteArrayInputStream(content);
+ InputStream is = new DataByteArrayInputStream(content);
if (isCompressed()) {
is = new InflaterInputStream(is);
}
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java
index e2b6a1f..4aff834 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java
@@ -25,9 +25,9 @@
import org.apache.activemq.openwire.annotations.OpenWireExtension;
import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.UTF8Buffer;
import org.apache.activemq.openwire.utils.IOExceptionSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.UTF8Buffer;
/**
* Base implementation of a JMS Message object.
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java
index 3214556..3f3a0fe 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.openwire.commands;
import java.io.DataInputStream;
@@ -29,12 +28,12 @@
import org.apache.activemq.openwire.annotations.OpenWireExtension;
import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.utils.IOExceptionSupport;
import org.apache.activemq.openwire.utils.ObjectMessageInputStream;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
/**
* openwire:marshaller code="26"
@@ -72,7 +71,7 @@
Buffer bodyAsBytes = getContent();
if (bodyAsBytes == null && object != null) {
try {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
OutputStream os = bytesOut;
if (isUseCompression()) {
compressed = true;
@@ -148,7 +147,7 @@
if (object == null && getContent() != null) {
try {
Buffer content = getContent();
- InputStream is = new ByteArrayInputStream(content);
+ InputStream is = new DataByteArrayInputStream(content);
if (isCompressed()) {
is = new InflaterInputStream(is);
}
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java
index 82db2e4..1f3c3ea 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java
@@ -25,11 +25,11 @@
import java.util.List;
import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
import org.apache.activemq.openwire.utils.IOExceptionSupport;
import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.DataByteArrayInputStream;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
@OpenWireType(typeCode = 27)
public class OpenWireStreamMessage extends OpenWireMessage {
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java
index 1003148..335e18c 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java
@@ -24,12 +24,12 @@
import org.apache.activemq.openwire.annotations.OpenWireExtension;
import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.utils.IOExceptionSupport;
import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
@OpenWireType(typeCode = 28)
public class OpenWireTextMessage extends OpenWireMessage {
@@ -79,7 +79,7 @@
if (hasContent()) {
InputStream is = null;
try {
- is = new ByteArrayInputStream(getPayload());
+ is = new DataByteArrayInputStream(getPayload());
DataInputStream dataIn = new DataInputStream(is);
text = OpenWireMarshallingSupport.readUTF8(dataIn);
dataIn.close();
@@ -115,7 +115,7 @@
try {
Buffer content = getContent();
if (content == null && text != null) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
OutputStream os = bytesOut;
DataOutputStream dataOut = new DataOutputStream(os);
OpenWireMarshallingSupport.writeUTF8(dataOut, this.text);
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java
index a205aeb..3f7e8ea 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java
@@ -27,12 +27,12 @@
import org.apache.activemq.openwire.annotations.OpenWireExtension;
import org.apache.activemq.openwire.annotations.OpenWireProperty;
import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
+import org.apache.activemq.openwire.buffer.UTF8Buffer;
import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
-import org.fusesource.hawtbuf.UTF8Buffer;
/**
* @openwire:marshaller code="1"
@@ -151,14 +151,14 @@
}
private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
- return OpenWireMarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
+ return OpenWireMarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new DataByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
}
@Override
public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
// Need to marshal the properties.
if (marshalledProperties == null && properties != null) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
OpenWireMarshallingSupport.marshalPrimitiveMap(properties, os);
os.close();
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java
index 6ed8551..cc80576 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java
@@ -21,11 +21,11 @@
import javax.transaction.xa.Xid;
-import org.apache.activemq.openwire.annotations.OpenWireType;
import org.apache.activemq.openwire.annotations.OpenWireExtension;
import org.apache.activemq.openwire.annotations.OpenWireProperty;
-import org.fusesource.hawtbuf.DataByteArrayInputStream;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.apache.activemq.openwire.annotations.OpenWireType;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
/**
* @openwire:marshaller code="112"
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/HexSupport.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/HexSupport.java
new file mode 100644
index 0000000..adf9710
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/HexSupport.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import org.apache.activemq.openwire.buffer.Buffer;
+
+/**
+ * Used to convert to hex from byte arrays and back.
+ */
+public final class HexSupport {
+
+ private static final String[] HEX_TABLE = new String[]{
+ "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "0a", "0b", "0c", "0d", "0e", "0f",
+ "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "1a", "1b", "1c", "1d", "1e", "1f",
+ "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "2a", "2b", "2c", "2d", "2e", "2f",
+ "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "3a", "3b", "3c", "3d", "3e", "3f",
+ "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "4a", "4b", "4c", "4d", "4e", "4f",
+ "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "5a", "5b", "5c", "5d", "5e", "5f",
+ "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "6a", "6b", "6c", "6d", "6e", "6f",
+ "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "7a", "7b", "7c", "7d", "7e", "7f",
+ "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "8a", "8b", "8c", "8d", "8e", "8f",
+ "90", "91", "92", "93", "94", "95", "96", "97", "98", "99", "9a", "9b", "9c", "9d", "9e", "9f",
+ "a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "aa", "ab", "ac", "ad", "ae", "af",
+ "b0", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "ba", "bb", "bc", "bd", "be", "bf",
+ "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "ca", "cb", "cc", "cd", "ce", "cf",
+ "d0", "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", "da", "db", "dc", "dd", "de", "df",
+ "e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "ea", "eb", "ec", "ed", "ee", "ef",
+ "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff",
+ };
+
+ private static final int[] INT_OFFSETS = new int[]{
+ 24,16,8,0
+ };
+
+ private HexSupport() {
+ }
+
+ /**
+ * Create a buffer from a previously encoded HEX String.
+ *
+ * @param hex
+ * the encoded string value of a buffer.
+ *
+ * @return a new Buffer instance with the decoded value.
+ */
+ public static Buffer toBufferFromHex(String hex) {
+ byte rc[] = new byte[hex.length() / 2];
+
+ for (int i = 0; i < rc.length; i++) {
+ String h = hex.substring(i * 2, i * 2 + 2);
+ int x = Integer.parseInt(h, 16);
+ rc[i] = (byte) x;
+ }
+
+ return new Buffer(rc);
+ }
+
+ /**
+ * Return a new String instance that represents the input buffer
+ * encoded in Hexadecimal form.
+ *
+ * @param buffer
+ * The Buffer to encode.
+ *
+ * @return the contents of the Buffer encoded as Hexadecimal String.
+ */
+ public static String toHexFromBuffer(Buffer buffer) {
+ byte[] data = buffer.data;
+ StringBuffer rc = new StringBuffer(buffer.length * 2);
+ int end = buffer.offset + buffer.length;
+
+ for (int i = buffer.offset; i < end; i++) {
+ rc.append(HEX_TABLE[0xFF & data[i]]);
+ }
+
+ return rc.toString();
+ }
+
+ /**
+ * Convert an Integer value into a string in Hexadecimal form.
+ *
+ * @param value
+ * The integer value to convert.
+ * @param trim
+ * True if the leading 0's should be trimmed off.
+ *
+ * @return a new String with the input value encoded as Hexadecimal.
+ */
+ public static String toHexFromInt(int value, boolean trim) {
+ StringBuffer rc = new StringBuffer(INT_OFFSETS.length*2);
+
+ for (int i = 0; i < INT_OFFSETS.length; i++) {
+ int b = 0xFF & (value>>INT_OFFSETS[i]);
+ if( !(trim && b == 0) ) {
+ rc.append(HEX_TABLE[b]);
+ trim=false;
+ }
+ }
+
+ return rc.toString();
+ }
+}
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java
index 4189d12..807072e 100644
--- a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java
@@ -26,9 +26,9 @@
import java.util.Map;
import java.util.Properties;
-import org.fusesource.hawtbuf.DataByteArrayInputStream;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
-import org.fusesource.hawtbuf.UTF8Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
+import org.apache.activemq.openwire.buffer.UTF8Buffer;
/**
* The fixed version of the UTF8 encoding function. Some older JVM's UTF8
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java
index 286fa22..d877415 100644
--- a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java
@@ -25,9 +25,8 @@
import junit.framework.AssertionFailedError;
+import org.apache.activemq.openwire.buffer.Buffer;
import org.apache.activemq.openwire.codec.OpenWireFormat;
-import org.apache.activemq.openwire.commands.Command;
-import org.fusesource.hawtbuf.Buffer;
import org.junit.Before;
public abstract class DataStructureTestSupport {
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireMessageTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireMessageTest.java
index 30f8084..3cd11c3 100644
--- a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireMessageTest.java
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireMessageTest.java
@@ -26,8 +26,8 @@
import java.io.IOException;
import java.util.Map;
+import org.apache.activemq.openwire.buffer.Buffer;
import org.apache.activemq.openwire.codec.OpenWireFormat;
-import org.fusesource.hawtbuf.Buffer;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireStreamMessageTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireStreamMessageTest.java
index cd3c299..a357045 100644
--- a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireStreamMessageTest.java
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireStreamMessageTest.java
@@ -26,15 +26,10 @@
import java.util.Collections;
import java.util.List;
-import org.apache.activemq.openwire.commands.CommandTypes;
-import org.apache.activemq.openwire.commands.OpenWireStreamMessage;
-import org.fusesource.hawtbuf.Buffer;
+import org.apache.activemq.openwire.buffer.Buffer;
import org.junit.Before;
import org.junit.Test;
-/**
- *
- */
public class OpenWireStreamMessageTest {
private final List<Object> elements = new ArrayList<Object>();
@@ -88,7 +83,7 @@
Buffer rawContent = message.getContent();
Buffer processedContent = message.getPayload();
- assertTrue(rawContent.length() < processedContent.length());
+ assertTrue(rawContent.getLength() < processedContent.getLength());
}
@Test
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireTextMessageTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireTextMessageTest.java
index 45a4487..18b7ca2 100644
--- a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireTextMessageTest.java
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireTextMessageTest.java
@@ -20,17 +20,13 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.activemq.openwire.buffer.Buffer;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
import org.apache.activemq.openwire.utils.MarshallingSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.junit.Test;
-/**
- *
- */
public class OpenWireTextMessageTest {
@Test
@@ -108,10 +104,9 @@
}
void setContent(Message message, String text) throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dataOut = new DataOutputStream(baos);
- MarshallingSupport.writeUTF8(dataOut, text);
- dataOut.close();
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+ MarshallingSupport.writeUTF8(baos, text);
+ baos.close();
message.setContent(baos.toBuffer());
}
}
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupportTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupportTest.java
index f40a31a..886c7ae 100644
--- a/openwire-core/src/test/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupportTest.java
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupportTest.java
@@ -21,9 +21,8 @@
import java.io.DataInputStream;
-import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
-import org.fusesource.hawtbuf.DataByteArrayInputStream;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayInputStream;
+import org.apache.activemq.openwire.buffer.DataByteArrayOutputStream;
import org.junit.Test;
public class OpenWireMarshallingSupportTest {
diff --git a/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/builtin/UniversalMarshallerGenerator.java b/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/builtin/UniversalMarshallerGenerator.java
index 2d0bec4..6ca91fb 100644
--- a/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/builtin/UniversalMarshallerGenerator.java
+++ b/openwire-generator/src/main/java/org/apache/activemq/openwire/generator/builtin/UniversalMarshallerGenerator.java
@@ -101,9 +101,10 @@
final Class<?> type = property.getType();
if (type.getCanonicalName().startsWith("java.util")) {
languageTypes.add(type.getCanonicalName());
- } else if (type.getCanonicalName().startsWith("org.fusesource.")) {
- languageTypes.add(type.getCanonicalName());
}
+// } else if (type.getCanonicalName().startsWith("org.fusesource.")) {
+// languageTypes.add(type.getCanonicalName());
+// }
}
for (String languageType : languageTypes) {
diff --git a/openwire-interop-tests/pom.xml b/openwire-interop-tests/pom.xml
index 0a8ff01..3a4a9d4 100644
--- a/openwire-interop-tests/pom.xml
+++ b/openwire-interop-tests/pom.xml
@@ -44,10 +44,6 @@
<artifactId>openwire-legacy</artifactId>
</dependency>
<dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf</artifactId>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java
index fed1abd..ff1a57c 100644
--- a/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java
+++ b/openwire-interop-tests/src/test/java/org/apache/activemq/openwire/codec/WireFormatInfoMarshaledSizeTest.java
@@ -25,11 +25,9 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import org.apache.activemq.openwire.codec.OpenWireFormat;
-import org.apache.activemq.openwire.codec.OpenWireFormatFactory;
+import org.apache.activemq.openwire.buffer.Buffer;
import org.apache.activemq.openwire.commands.WireFormatInfo;
import org.apache.activemq.util.ByteSequence;
-import org.fusesource.hawtbuf.Buffer;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
diff --git a/openwire-legacy/pom.xml b/openwire-legacy/pom.xml
index f5b974d..9695616 100644
--- a/openwire-legacy/pom.xml
+++ b/openwire-legacy/pom.xml
@@ -39,10 +39,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>openwire-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf</artifactId>
- </dependency>
</dependencies>
<build>
diff --git a/pom.xml b/pom.xml
index 85b0bec..755297d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,6 @@
<ant-version>1.9.6</ant-version>
<junit-version>4.12</junit-version>
<slf4j-version>1.7.13</slf4j-version>
- <hawtbuf-version>1.11</hawtbuf-version>
<activemq-version>5.12.1</activemq-version>
<jetty-version>8.1.15.v20140411</jetty-version>
<mockito-version>1.10.19</mockito-version>
@@ -166,11 +165,6 @@
<version>${slf4j-version}</version>
</dependency>
<dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf</artifactId>
- <version>${hawtbuf-version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
<version>${ant-version}</version>