// blob: 1209abfbc07946ef06f08448c3331110ca7a4d4a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.bytebuffer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.ScatteringByteChannel;
import java.util.ArrayList;
import java.util.List;
import com.google.common.primitives.Chars;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.common.primitives.Shorts;
/**
 * A {@link QpidByteBuffer} made of an ordered array of {@link SingleQpidByteBuffer} fragments that
 * is presented to callers as one logical buffer.
 * <p>
 * The aggregate position, limit and remaining values are derived from the fragments on every call:
 * {@link #position()} sums fragment positions up to the first fragment that is not at its limit, and
 * {@link #limit()} sums fragment limits up to the first fragment whose limit differs from its
 * capacity. Several operations throw {@link IllegalStateException} when they find a fragment that
 * breaks this layout.
 * <p>
 * Multi-byte values are converted with Guava's {@code Shorts}/{@code Chars}/{@code Ints}/{@code Longs}
 * byte-array helpers (documented by Guava as big-endian).
 */
class MultiQpidByteBuffer implements QpidByteBuffer
{
    /** The fragments, in order, that together form the logical buffer. */
    private final SingleQpidByteBuffer[] _fragments;

    /** Index of the fragment that carries the mark, or -1 when no mark is set. */
    private volatile int _resetFragmentIndex = -1;

    /**
     * Wraps the given fragments. The array is stored as-is, without a defensive copy.
     *
     * @param fragments the fragments making up this buffer
     * @throws IllegalArgumentException if {@code fragments} is null
     */
    private MultiQpidByteBuffer(final SingleQpidByteBuffer... fragments)
    {
        if (fragments == null)
        {
            throw new IllegalArgumentException();
        }
        _fragments = fragments;
    }

    /**
     * Creates a buffer from a snapshot of the given list of fragments.
     *
     * @param fragments the fragments making up this buffer
     * @throws IllegalArgumentException if {@code fragments} is null
     */
    MultiQpidByteBuffer(final List<SingleQpidByteBuffer> fragments)
    {
        if (fragments == null)
        {
            throw new IllegalArgumentException();
        }
        _fragments = fragments.toArray(new SingleQpidByteBuffer[fragments.size()]);
    }

    //////////////////
    // Absolute puts
    //////////////////

    /** Writes one byte at the absolute {@code index}; the position is left unchanged. */
    @Override
    public QpidByteBuffer put(final int index, final byte b)
    {
        return put(index, new byte[]{b});
    }

    /** Writes a two-byte short at the absolute {@code index}; the position is left unchanged. */
    @Override
    public QpidByteBuffer putShort(final int index, final short value)
    {
        byte[] valueArray = Shorts.toByteArray(value);
        return put(index, valueArray);
    }

    /** Writes a two-byte char at the absolute {@code index}; the position is left unchanged. */
    @Override
    public QpidByteBuffer putChar(final int index, final char value)
    {
        byte[] valueArray = Chars.toByteArray(value);
        return put(index, valueArray);
    }

    /** Writes a four-byte int at the absolute {@code index}; the position is left unchanged. */
    @Override
    public QpidByteBuffer putInt(final int index, final int value)
    {
        byte[] valueArray = Ints.toByteArray(value);
        return put(index, valueArray);
    }

    /** Writes an eight-byte long at the absolute {@code index}; the position is left unchanged. */
    @Override
    public QpidByteBuffer putLong(final int index, final long value)
    {
        byte[] valueArray = Longs.toByteArray(value);
        return put(index, valueArray);
    }

    /** Writes the raw int bits of {@code value} at the absolute {@code index}. */
    @Override
    public QpidByteBuffer putFloat(final int index, final float value)
    {
        int intValue = Float.floatToRawIntBits(value);
        return putInt(index, intValue);
    }

    /** Writes the raw long bits of {@code value} at the absolute {@code index}. */
    @Override
    public QpidByteBuffer putDouble(final int index, final double value)
    {
        long longValue = Double.doubleToRawLongBits(value);
        return putLong(index, longValue);
    }

    /**
     * Copies all of {@code src} into the buffer starting at the absolute {@code index}, spilling over
     * fragment boundaries as needed. Each touched fragment has its position restored afterwards.
     *
     * @param index absolute index of the first byte to write
     * @param src   the bytes to write
     * @return this buffer
     * @throws IndexOutOfBoundsException if {@code [index, index + src.length)} is not within {@code [0, limit())}
     * @throws IllegalStateException     if a fragment that must be crossed has a limit below its capacity
     * @throws BufferOverflowException   if fewer than {@code src.length} bytes could be written
     */
    private QpidByteBuffer put(final int index, final byte[] src)
    {
        final int valueWidth = src.length;
        final int totalLimit = limit();
        // Subtraction form: "index + valueWidth" could overflow int and slip past the check.
        if (index < 0 || index > totalLimit - valueWidth)
        {
            throw new IndexOutOfBoundsException(String.format("index %d is out of bounds [%d, %d)", index, 0, totalLimit));
        }

        int written = 0;
        int bytesToSkip = index;
        for (int i = 0; i < _fragments.length && written != valueWidth; i++)
        {
            final SingleQpidByteBuffer buffer = _fragments[i];
            final int limit = buffer.limit();
            boolean isLastFragmentToConsider = valueWidth + bytesToSkip - written <= limit;
            // Every fragment except the last one involved must be usable up to its full capacity.
            if (!isLastFragmentToConsider && limit != buffer.capacity())
            {
                throw new IllegalStateException(String.format("Unexpected limit %d on fragment %d", limit, i));
            }

            if (bytesToSkip >= limit)
            {
                // The target index lies beyond this fragment.
                bytesToSkip -= limit;
            }
            else
            {
                final int bytesToCopy = Math.min(limit - bytesToSkip, valueWidth - written);
                final int originalPosition = buffer.position();
                buffer.position(bytesToSkip);
                buffer.put(src, written, bytesToCopy);
                buffer.position(originalPosition);
                written += bytesToCopy;
                bytesToSkip = 0;
            }
        }

        if (valueWidth != written)
        {
            throw new BufferOverflowException();
        }
        return this;
    }

    ////////////////
    // Relative Puts
    ////////////////

    /** Writes one byte at the current position and advances it. */
    @Override
    public final QpidByteBuffer put(final byte b)
    {
        return put(new byte[]{b});
    }

    /** Writes the low eight bits of {@code s} at the current position. */
    @Override
    public final QpidByteBuffer putUnsignedByte(final short s)
    {
        put((byte) s);
        return this;
    }

    /** Writes a two-byte short at the current position. */
    @Override
    public final QpidByteBuffer putShort(final short value)
    {
        byte[] valueArray = Shorts.toByteArray(value);
        return put(valueArray);
    }

    /** Writes the low sixteen bits of {@code i} at the current position. */
    @Override
    public final QpidByteBuffer putUnsignedShort(final int i)
    {
        putShort((short) i);
        return this;
    }

    /** Writes a two-byte char at the current position. */
    @Override
    public final QpidByteBuffer putChar(final char value)
    {
        byte[] valueArray = Chars.toByteArray(value);
        return put(valueArray);
    }

    /** Writes a four-byte int at the current position. */
    @Override
    public final QpidByteBuffer putInt(final int value)
    {
        byte[] valueArray = Ints.toByteArray(value);
        return put(valueArray);
    }

    /** Writes the low thirty-two bits of {@code value} at the current position. */
    @Override
    public final QpidByteBuffer putUnsignedInt(final long value)
    {
        putInt((int) value);
        return this;
    }

    /** Writes an eight-byte long at the current position. */
    @Override
    public final QpidByteBuffer putLong(final long value)
    {
        byte[] valueArray = Longs.toByteArray(value);
        return put(valueArray);
    }

    /** Writes the raw int bits of {@code value} at the current position. */
    @Override
    public final QpidByteBuffer putFloat(final float value)
    {
        int intValue = Float.floatToRawIntBits(value);
        return putInt(intValue);
    }

    /** Writes the raw long bits of {@code value} at the current position. */
    @Override
    public final QpidByteBuffer putDouble(final double value)
    {
        long longValue = Double.doubleToRawLongBits(value);
        return putLong(longValue);
    }

    /** Writes the whole of {@code src} at the current position. */
    @Override
    public final QpidByteBuffer put(byte[] src)
    {
        return put(src, 0, src.length);
    }

    /**
     * Writes {@code length} bytes of {@code src}, starting at {@code offset}, at the current position,
     * filling each fragment's remaining space in order.
     *
     * @throws BufferOverflowException if fewer than {@code length} bytes remain in this buffer
     * @throws IllegalStateException   if, unexpectedly, not all bytes could be written
     */
    @Override
    public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
    {
        if (!hasRemaining(length))
        {
            throw new BufferOverflowException();
        }

        int written = 0;
        for (int i = 0; i < _fragments.length && written != length; i++)
        {
            final SingleQpidByteBuffer buffer = _fragments[i];
            int bytesToWrite = Math.min(buffer.remaining(), length - written);
            buffer.put(src, offset + written, bytesToWrite);
            written += bytesToWrite;
        }

        if (written != length)
        {
            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, length));
        }
        return this;
    }

    /**
     * Writes all remaining bytes of {@code src} at the current position. When a destination fragment
     * is too small, the source limit is narrowed temporarily so that only the bytes that fit are
     * transferred; the original limit is restored even if the transfer throws.
     *
     * @throws BufferOverflowException if {@code src} has more bytes remaining than this buffer
     * @throws IllegalStateException   if, unexpectedly, not all bytes could be written
     */
    @Override
    public final QpidByteBuffer put(final ByteBuffer src)
    {
        final int valueWidth = src.remaining();
        if (!hasRemaining(valueWidth))
        {
            throw new BufferOverflowException();
        }

        int written = 0;
        for (int i = 0; i < _fragments.length && written != valueWidth; i++)
        {
            final SingleQpidByteBuffer dstFragment = _fragments[i];
            if (dstFragment.hasRemaining())
            {
                final int srcRemaining = src.remaining();
                final int dstFragmentRemaining = dstFragment.remaining();
                if (dstFragmentRemaining >= srcRemaining)
                {
                    dstFragment.put(src);
                    written += srcRemaining;
                }
                else
                {
                    final int srcOriginalLimit = src.limit();
                    src.limit(src.position() + dstFragmentRemaining);
                    try
                    {
                        dstFragment.put(src);
                    }
                    finally
                    {
                        src.limit(srcOriginalLimit);
                    }
                    written += dstFragmentRemaining;
                }
            }
        }

        if (written != valueWidth)
        {
            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth));
        }
        return this;
    }

    /**
     * Writes all remaining bytes of {@code qpidByteBuffer} at the current position. Both
     * {@link SingleQpidByteBuffer} and {@link MultiQpidByteBuffer} sources are supported.
     *
     * @throws BufferOverflowException if the source has more bytes remaining than this buffer
     * @throws IllegalStateException   if the source is of an unknown implementation or, unexpectedly,
     *                                 not all bytes could be written
     */
    @Override
    public final QpidByteBuffer put(final QpidByteBuffer qpidByteBuffer)
    {
        final int valueWidth = qpidByteBuffer.remaining();
        if (!hasRemaining(valueWidth))
        {
            throw new BufferOverflowException();
        }

        int written = 0;
        if (qpidByteBuffer instanceof SingleQpidByteBuffer)
        {
            final SingleQpidByteBuffer srcFragment = (SingleQpidByteBuffer) qpidByteBuffer;
            for (int i = 0; i < _fragments.length && written != valueWidth; i++)
            {
                final SingleQpidByteBuffer dstFragment = _fragments[i];
                if (dstFragment.hasRemaining())
                {
                    // The helper compares against what the source still holds, not the initial total.
                    written += transfer(srcFragment, dstFragment);
                }
            }
        }
        else if (qpidByteBuffer instanceof MultiQpidByteBuffer)
        {
            final SingleQpidByteBuffer[] srcFragments = ((MultiQpidByteBuffer) qpidByteBuffer)._fragments;
            // The destination index is shared across source fragments: a destination fragment that
            // still has room after one source fragment is reused for the next.
            int dstIndex = 0;
            for (int srcIndex = 0; srcIndex < srcFragments.length; srcIndex++)
            {
                final SingleQpidByteBuffer srcFragment = srcFragments[srcIndex];
                for (; dstIndex < _fragments.length; dstIndex++)
                {
                    final SingleQpidByteBuffer dstFragment = _fragments[dstIndex];
                    if (dstFragment.hasRemaining())
                    {
                        final int srcFragmentRemaining = srcFragment.remaining();
                        final int transferred = transfer(srcFragment, dstFragment);
                        written += transferred;
                        if (transferred == srcFragmentRemaining)
                        {
                            // Source fragment fully written; stay on this destination fragment.
                            break;
                        }
                    }
                }
            }
        }
        else
        {
            throw new IllegalStateException("unknown QBB implementation");
        }

        if (written != valueWidth)
        {
            throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.",
                                                          written,
                                                          valueWidth));
        }
        return this;
    }

    /**
     * Puts as many of the remaining bytes of {@code src} into {@code dst} as {@code dst} has room for.
     * If {@code src} does not fit entirely, its limit is narrowed for the duration of the put and
     * restored afterwards, even if the put throws.
     *
     * @return the number of bytes requested to be transferred: {@code min(src.remaining(), dst.remaining())}
     */
    private static int transfer(final SingleQpidByteBuffer src, final SingleQpidByteBuffer dst)
    {
        final int srcRemaining = src.remaining();
        final int dstRemaining = dst.remaining();
        if (dstRemaining >= srcRemaining)
        {
            dst.put(src);
            return srcRemaining;
        }

        final int srcOriginalLimit = src.limit();
        src.limit(src.position() + dstRemaining);
        try
        {
            dst.put(src);
        }
        finally
        {
            src.limit(srcOriginalLimit);
        }
        return dstRemaining;
    }

    ///////////////////
    // Absolute Gets
    ///////////////////

    /** Reads the byte at the absolute {@code index}; the position is left unchanged. */
    @Override
    public byte get(final int index)
    {
        final byte[] byteArray = getByteArray(index, 1);
        return byteArray[0];
    }

    /** Reads a two-byte short at the absolute {@code index}. */
    @Override
    public short getShort(final int index)
    {
        final byte[] byteArray = getByteArray(index, 2);
        return Shorts.fromByteArray(byteArray);
    }

    /** Reads a two-byte value at the absolute {@code index} as an unsigned number. */
    @Override
    public final int getUnsignedShort(int index)
    {
        return ((int) getShort(index)) & 0xFFFF;
    }

    /** Reads a two-byte char at the absolute {@code index}. */
    @Override
    public char getChar(final int index)
    {
        final byte[] byteArray = getByteArray(index, 2);
        return Chars.fromByteArray(byteArray);
    }

    /** Reads a four-byte int at the absolute {@code index}. */
    @Override
    public int getInt(final int index)
    {
        final byte[] byteArray = getByteArray(index, 4);
        return Ints.fromByteArray(byteArray);
    }

    /** Reads an eight-byte long at the absolute {@code index}. */
    @Override
    public long getLong(final int index)
    {
        final byte[] byteArray = getByteArray(index, 8);
        return Longs.fromByteArray(byteArray);
    }

    /** Reads four bytes at the absolute {@code index} and interprets them as float bits. */
    @Override
    public float getFloat(final int index)
    {
        final int intValue = getInt(index);
        return Float.intBitsToFloat(intValue);
    }

    /** Reads eight bytes at the absolute {@code index} and interprets them as double bits. */
    @Override
    public double getDouble(final int index)
    {
        final long longValue = getLong(index);
        return Double.longBitsToDouble(longValue);
    }

    /**
     * Copies {@code length} bytes starting at the absolute {@code index} into a new array, crossing
     * fragment boundaries as needed. Each touched fragment has its position restored afterwards.
     *
     * @throws IndexOutOfBoundsException if {@code [index, index + length)} is not within {@code [0, limit())}
     * @throws IllegalStateException     if a fragment that must be crossed has a limit below its
     *                                   capacity, or fewer than {@code length} bytes could be read
     */
    private byte[] getByteArray(final int index, final int length)
    {
        final int totalLimit = limit();
        // Subtraction form: "index + length" could overflow int and slip past the check.
        if (index < 0 || index > totalLimit - length)
        {
            throw new IndexOutOfBoundsException(String.format("%d bytes at index %d do not fit into bounds [%d, %d)", length, index, 0, totalLimit));
        }

        byte[] value = new byte[length];
        int consumed = 0;
        int bytesToSkip = index;
        for (int i = 0; i < _fragments.length && consumed != length; i++)
        {
            final SingleQpidByteBuffer buffer = _fragments[i];
            final int limit = buffer.limit();
            boolean isLastFragmentToConsider = length + bytesToSkip - consumed <= limit;
            // Every fragment except the last one involved must be usable up to its full capacity.
            if (!isLastFragmentToConsider && limit != buffer.capacity())
            {
                throw new IllegalStateException(String.format("Unexpectedly limit %d on fragment %d.", limit, i));
            }

            if (bytesToSkip >= limit)
            {
                // The requested index lies beyond this fragment.
                bytesToSkip -= limit;
            }
            else
            {
                final int bytesToCopy = Math.min(limit - bytesToSkip, length - consumed);
                final int originalPosition = buffer.position();
                buffer.position(bytesToSkip);
                buffer.get(value, consumed, bytesToCopy);
                buffer.position(originalPosition);
                consumed += bytesToCopy;
                bytesToSkip = 0;
            }
        }

        if (consumed != length)
        {
            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
        }
        return value;
    }

    //////////////////
    // Relative Gets
    //////////////////

    /** Reads one byte at the current position and advances it. */
    @Override
    public final byte get()
    {
        byte[] value = new byte[1];
        get(value, 0, 1);
        return value[0];
    }

    /** Reads one byte at the current position as an unsigned number. */
    @Override
    public final short getUnsignedByte()
    {
        return (short) (get() & 0xFF);
    }

    /** Reads a two-byte short at the current position. */
    @Override
    public final short getShort()
    {
        byte[] value = new byte[2];
        get(value, 0, value.length);
        return Shorts.fromByteArray(value);
    }

    /** Reads a two-byte value at the current position as an unsigned number. */
    @Override
    public final int getUnsignedShort()
    {
        return ((int) getShort()) & 0xFFFF;
    }

    /** Reads a two-byte char at the current position. */
    @Override
    public final char getChar()
    {
        byte[] value = new byte[2];
        get(value, 0, value.length);
        return Chars.fromByteArray(value);
    }

    /** Reads a four-byte int at the current position. */
    @Override
    public final int getInt()
    {
        byte[] value = new byte[4];
        get(value, 0, value.length);
        return Ints.fromByteArray(value);
    }

    /** Reads a four-byte value at the current position as an unsigned number. */
    @Override
    public final long getUnsignedInt()
    {
        return ((long) getInt()) & 0xFFFFFFFFL;
    }

    /** Reads an eight-byte long at the current position. */
    @Override
    public final long getLong()
    {
        byte[] value = new byte[8];
        get(value, 0, value.length);
        return Longs.fromByteArray(value);
    }

    /** Reads four bytes at the current position and interprets them as float bits. */
    @Override
    public final float getFloat()
    {
        final int intValue = getInt();
        return Float.intBitsToFloat(intValue);
    }

    /** Reads eight bytes at the current position and interprets them as double bits. */
    @Override
    public final double getDouble()
    {
        final long longValue = getLong();
        return Double.longBitsToDouble(longValue);
    }

    /** Fills the whole of {@code dst} from the current position. */
    @Override
    public final QpidByteBuffer get(final byte[] dst)
    {
        return get(dst, 0, dst.length);
    }

    /**
     * Reads {@code length} bytes from the current position into {@code dst} starting at
     * {@code offset}, draining each fragment's remaining bytes in order.
     *
     * @throws BufferUnderflowException if fewer than {@code length} bytes remain in this buffer
     * @throws IllegalStateException    if, unexpectedly, not all bytes could be read
     */
    @Override
    public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
    {
        if (!hasRemaining(length))
        {
            throw new BufferUnderflowException();
        }

        int consumed = 0;
        for (int i = 0; i < _fragments.length && consumed != length; i++)
        {
            final SingleQpidByteBuffer buffer = _fragments[i];
            int bytesToCopy = Math.min(buffer.remaining(), length - consumed);
            buffer.get(dst, offset + consumed, bytesToCopy);
            consumed += bytesToCopy;
        }

        if (consumed != length)
        {
            throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length));
        }
        return this;
    }

    ///////////////
    // Other stuff
    ////////////////

    /**
     * Copies the remaining bytes into {@code dst} without changing this buffer's position; the
     * fragments are read through duplicates of their underlying buffers.
     *
     * @throws BufferUnderflowException if fewer bytes remain than {@code dst.length}
     * @throws BufferOverflowException  if more bytes remain than {@code dst.length}
     */
    @Override
    public final void copyTo(final byte[] dst)
    {
        final int remaining = remaining();
        if (remaining < dst.length)
        {
            throw new BufferUnderflowException();
        }
        if (remaining > dst.length)
        {
            throw new BufferOverflowException();
        }

        int offset = 0;
        for (SingleQpidByteBuffer fragment : _fragments)
        {
            final int length = Math.min(fragment.remaining(), dst.length - offset);
            fragment.getUnderlyingBuffer().duplicate().get(dst, offset, length);
            offset += length;
        }
    }

    /**
     * Copies the remaining bytes into {@code dst} without changing this buffer's position; the
     * fragments are read through duplicates of their underlying buffers.
     *
     * @throws BufferOverflowException if {@code dst} has less room than this buffer has remaining
     */
    @Override
    public final void copyTo(final ByteBuffer dst)
    {
        if (dst.remaining() < remaining())
        {
            throw new BufferOverflowException();
        }

        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            dst.put(fragment.getUnderlyingBuffer().duplicate());
        }
    }

    /**
     * Writes the remaining bytes of {@code qpidByteBuffer} at the current position without consuming
     * the source; the source is read through duplicates of its underlying buffers.
     *
     * @throws BufferOverflowException if the source has more bytes remaining than this buffer
     * @throws IllegalStateException   if the source is of an unknown implementation
     */
    @Override
    public final void putCopyOf(final QpidByteBuffer qpidByteBuffer)
    {
        int sourceRemaining = qpidByteBuffer.remaining();
        if (!hasRemaining(sourceRemaining))
        {
            throw new BufferOverflowException();
        }

        if (qpidByteBuffer instanceof MultiQpidByteBuffer)
        {
            MultiQpidByteBuffer source = (MultiQpidByteBuffer) qpidByteBuffer;
            for (int i = 0, fragmentsSize = source._fragments.length; i < fragmentsSize; i++)
            {
                final SingleQpidByteBuffer srcFragment = source._fragments[i];
                put(srcFragment.getUnderlyingBuffer().duplicate());
            }
        }
        else if (qpidByteBuffer instanceof SingleQpidByteBuffer)
        {
            SingleQpidByteBuffer source = (SingleQpidByteBuffer) qpidByteBuffer;
            put(source.getUnderlyingBuffer().duplicate());
        }
        else
        {
            throw new IllegalStateException("unknown QBB implementation");
        }
    }

    /** Returns true only if every fragment is direct (vacuously true when there are no fragments). */
    @Override
    public final boolean isDirect()
    {
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            if (!fragment.isDirect())
            {
                return false;
            }
        }
        return true;
    }

    /** Equivalent to {@link #dispose()}. */
    @Override
    public final void close()
    {
        dispose();
    }

    /** Disposes every fragment, in order. */
    @Override
    public final void dispose()
    {
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            fragment.dispose();
        }
    }

    /** Returns a {@link QpidByteBufferInputStream} wrapping this buffer. */
    @Override
    public final InputStream asInputStream()
    {
        return new QpidByteBufferInputStream(this);
    }

    /**
     * Performs a scattering read from {@code channel} into the fragments' underlying buffers.
     *
     * @return the value returned by {@link ScatteringByteChannel#read(ByteBuffer[])}
     * @throws IOException if the channel read fails
     */
    @Override
    public final long read(ScatteringByteChannel channel) throws IOException
    {
        return channel.read(getUnderlyingBuffers());
    }

    @Override
    public String toString()
    {
        return "QpidByteBuffer{" + _fragments.length + " fragments}";
    }

    /**
     * Restores the position to the previously set mark: the marked fragment is reset and every
     * later fragment is moved back to position 0.
     *
     * @throws InvalidMarkException if no mark has been set
     */
    @Override
    public QpidByteBuffer reset()
    {
        // Read the volatile field once so that the check and the use see the same value.
        final int markedIndex = _resetFragmentIndex;
        if (markedIndex < 0)
        {
            throw new InvalidMarkException();
        }

        final SingleQpidByteBuffer fragment = _fragments[markedIndex];
        fragment.reset();
        for (int i = markedIndex + 1, size = _fragments.length; i < size; ++i)
        {
            _fragments[i].position(0);
        }
        return this;
    }

    /** Rewinds every fragment and discards the mark. */
    @Override
    public QpidByteBuffer rewind()
    {
        _resetFragmentIndex = -1;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            fragment.rewind();
        }
        return this;
    }

    /** Always false: a multi-fragment buffer is never backed by a single array. */
    @Override
    public final boolean hasArray()
    {
        return false;
    }

    /**
     * @throws UnsupportedOperationException always
     */
    @Override
    public byte[] array()
    {
        throw new UnsupportedOperationException("This QpidByteBuffer is not backed by an array.");
    }

    /** Clears every fragment and discards the mark. */
    @Override
    public QpidByteBuffer clear()
    {
        // Kept in step with rewind()/compact(), which also forget the marked fragment.
        _resetFragmentIndex = -1;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            _fragments[i].clear();
        }
        return this;
    }

    /**
     * Moves the bytes between the position and the limit to the start of the buffer, then sets the
     * position to the number of bytes moved and the limit to the capacity, as
     * {@link ByteBuffer#compact()} does. The mark is discarded.
     */
    @Override
    public QpidByteBuffer compact()
    {
        final int position = position();
        final int limit = limit();
        final int unread = limit - position;

        // With position == 0 the unread bytes are already at the front; only the cursor moves.
        if (position != 0)
        {
            for (int srcPos = position, dstPos = 0; srcPos < limit; ++srcPos, ++dstPos)
            {
                put(dstPos, get(srcPos));
            }
        }

        // Position is set before the limit is widened, while "unread" is still within the old limit.
        position(unread);
        limit(capacity());
        _resetFragmentIndex = -1;
        return this;
    }

    /**
     * Returns the aggregate position: the sum of fragment positions up to and including the first
     * fragment whose position is not at its limit.
     */
    @Override
    public int position()
    {
        int totalPosition = 0;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            totalPosition += fragment.position();
            if (fragment.position() != fragment.limit())
            {
                break;
            }
        }
        return totalPosition;
    }

    /**
     * Sets the aggregate position. Fragments before the target are moved to their limit, the target
     * fragment receives the residual position and later fragments are moved to 0.
     *
     * @throws IllegalArgumentException if {@code newPosition} is negative or greater than {@link #limit()}
     * @throws IllegalStateException    if a fragment that must be skipped has a limit below its capacity
     */
    @Override
    public QpidByteBuffer position(int newPosition)
    {
        if (newPosition < 0 || newPosition > limit())
        {
            throw new IllegalArgumentException(String.format("new position %d is out of bounds [%d, %d)", newPosition, 0, limit()));
        }

        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            final int fragmentLimit = fragment.limit();
            if (newPosition <= fragmentLimit)
            {
                fragment.position(newPosition);
                // Everything after the target fragment starts at 0.
                newPosition = 0;
            }
            else
            {
                if (fragmentLimit != fragment.capacity())
                {
                    throw new IllegalStateException(String.format("QBB Fragment %d has limit %d != capacity %d",
                                                                  i,
                                                                  fragmentLimit,
                                                                  fragment.capacity()));
                }
                fragment.position(fragmentLimit);
                newPosition -= fragmentLimit;
            }
        }
        return this;
    }

    /**
     * Returns the aggregate limit: the sum of fragment limits up to and including the first
     * fragment whose limit differs from its capacity.
     */
    @Override
    public int limit()
    {
        int totalLimit = 0;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            final int fragmentLimit = fragment.limit();
            totalLimit += fragmentLimit;
            if (fragmentLimit != fragment.capacity())
            {
                break;
            }
        }
        return totalLimit;
    }

    /**
     * Sets the aggregate limit by distributing {@code newLimit} over the fragments in order: each
     * fragment receives {@code min(what is left, its capacity)}. Any part of {@code newLimit} that
     * exceeds the total capacity is ignored.
     */
    @Override
    public QpidByteBuffer limit(int newLimit)
    {
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            final int fragmentCapacity = fragment.capacity();
            final int fragmentLimit = Math.min(newLimit, fragmentCapacity);
            fragment.limit(fragmentLimit);
            newLimit -= fragmentLimit;
        }
        return this;
    }

    /**
     * Marks the current position. The mark is placed on the first fragment that is not at its limit
     * or, if all fragments are exhausted, on the last fragment.
     */
    @Override
    public final QpidByteBuffer mark()
    {
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            if (fragment.position() != fragment.limit())
            {
                fragment.mark();
                _resetFragmentIndex = i;
                return this;
            }
        }

        _resetFragmentIndex = _fragments.length - 1;
        _fragments[_resetFragmentIndex].mark();
        return this;
    }

    /** Returns the sum of the remaining bytes of all fragments. */
    @Override
    public final int remaining()
    {
        int remaining = 0;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            remaining += fragment.remaining();
        }
        return remaining;
    }

    /** Returns true if at least one byte remains. */
    @Override
    public final boolean hasRemaining()
    {
        return hasRemaining(1);
    }

    /**
     * Returns true if at least {@code atLeast} bytes remain; stops summing fragments as soon as the
     * threshold is reached. A request for 0 bytes is always satisfied.
     */
    @Override
    public final boolean hasRemaining(int atLeast)
    {
        if (atLeast == 0)
        {
            return true;
        }

        int remaining = 0;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            remaining += fragment.remaining();
            if (remaining >= atLeast)
            {
                return true;
            }
        }
        return false;
    }

    /** Flips every fragment and discards the mark. */
    @Override
    public QpidByteBuffer flip()
    {
        // Kept in step with rewind()/compact(), which also forget the marked fragment.
        _resetFragmentIndex = -1;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            fragment.flip();
        }
        return this;
    }

    /** Returns the sum of the capacities of all fragments. */
    @Override
    public int capacity()
    {
        int totalCapacity = 0;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            totalCapacity += _fragments[i].capacity();
        }
        return totalCapacity;
    }

    /** Returns a new buffer built from a duplicate of every fragment, carrying over the mark index. */
    @Override
    public QpidByteBuffer duplicate()
    {
        final SingleQpidByteBuffer[] fragments = new SingleQpidByteBuffer[_fragments.length];
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            fragments[i] = _fragments[i].duplicate();
        }
        MultiQpidByteBuffer duplicate = new MultiQpidByteBuffer(fragments);
        duplicate._resetFragmentIndex = _resetFragmentIndex;
        return duplicate;
    }

    /** Returns a view of all remaining bytes. */
    @Override
    public QpidByteBuffer slice()
    {
        return view(0, remaining());
    }

    /**
     * Returns a buffer covering {@code length} bytes starting {@code offset} bytes after the current
     * position, assembled from views of the fragments involved.
     *
     * @throws IllegalArgumentException if {@code offset} or {@code length} is negative, or the range
     *                                  extends past the remaining bytes
     * @throws IllegalStateException    if a fragment after the first one with remaining bytes is not
     *                                  at position 0
     */
    @Override
    public QpidByteBuffer view(int offset, int length)
    {
        final int remaining = remaining();
        // Subtraction form avoids int overflow of "offset + length"; negative arguments are rejected.
        if (offset < 0 || length < 0 || offset > remaining - length)
        {
            throw new IllegalArgumentException(String.format("offset: %d, length: %d, remaining: %d", offset, length, remaining));
        }

        final List<SingleQpidByteBuffer> fragments = new ArrayList<>(_fragments.length);
        boolean firstFragmentToBeConsidered = true;
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize && length > 0; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            if (fragment.hasRemaining())
            {
                if (!firstFragmentToBeConsidered && fragment.position() != 0)
                {
                    throw new IllegalStateException(String.format("Unexpectedly position %d on fragment %d.", fragment.position(), i));
                }
                firstFragmentToBeConsidered = false;

                final int fragmentRemaining = fragment.remaining();
                if (fragmentRemaining > offset)
                {
                    final int fragmentViewLength = Math.min(fragmentRemaining - offset, length);
                    fragments.add(fragment.view(offset, fragmentViewLength));
                    length -= fragmentViewLength;
                    offset = 0;
                }
                else
                {
                    // The requested range starts beyond this fragment.
                    offset -= fragmentRemaining;
                }
            }
        }
        return QpidByteBufferFactory.createQpidByteBuffer(fragments);
    }

    /** Returns true if any fragment reports itself as sparse. */
    @Override
    public boolean isSparse()
    {
        for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++)
        {
            final SingleQpidByteBuffer fragment = _fragments[i];
            if (fragment.isSparse())
            {
                return true;
            }
        }
        return false;
    }

    /** Returns the internal fragment array itself (not a copy). */
    SingleQpidByteBuffer[] getFragments()
    {
        return _fragments;
    }

    /** Returns a new array holding each fragment's underlying {@link ByteBuffer}, in fragment order. */
    ByteBuffer[] getUnderlyingBuffers()
    {
        ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length];
        for (int i = 0; i < _fragments.length; i++)
        {
            byteBuffers[i] = _fragments[i].getUnderlyingBuffer();
        }
        return byteBuffers;
    }
}