blob: fd74860abfec8e1474ac9a314043e32ab65a0f85 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.bytebuffer;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
final class QpidByteBufferFactory
{
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
private static final QpidByteBuffer EMPTY_QPID_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
private static final ThreadLocal<SingleQpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
private volatile static boolean _isPoolInitialized;
private volatile static BufferPool _bufferPool;
private volatile static int _pooledBufferSize;
private volatile static double _sparsityFraction;
private volatile static ByteBuffer _zeroed;
static QpidByteBuffer allocate(boolean direct, int size)
{
return direct ? allocateDirect(size) : allocate(size);
}
static QpidByteBuffer allocate(int size)
{
return new SingleQpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
}
static QpidByteBuffer allocateDirect(int size)
{
if (size < 0)
{
throw new IllegalArgumentException("Cannot allocate QpidByteBufferFragment with size "
+ size
+ " which is negative.");
}
if (_isPoolInitialized)
{
if (size <= _pooledBufferSize)
{
return allocateDirectSingle(size);
}
else
{
List<SingleQpidByteBuffer> fragments = new ArrayList<>();
int allocatedSize = 0;
while (size - allocatedSize >= _pooledBufferSize)
{
fragments.add(allocateDirectSingle(_pooledBufferSize));
allocatedSize += _pooledBufferSize;
}
if (allocatedSize != size)
{
fragments.add(allocateDirectSingle(size - allocatedSize));
}
return new MultiQpidByteBuffer(fragments);
}
}
else
{
return allocate(size);
}
}
static QpidByteBuffer asQpidByteBuffer(InputStream stream) throws IOException
{
final List<SingleQpidByteBuffer> fragments = new ArrayList<>();
final int pooledBufferSize = getPooledBufferSize();
byte[] transferBuf = new byte[pooledBufferSize];
int readFragment = 0;
int read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
while (read > 0)
{
readFragment += read;
if (readFragment == pooledBufferSize)
{
SingleQpidByteBuffer fragment = allocateDirectSingle(pooledBufferSize);
fragment.put(transferBuf, 0, pooledBufferSize);
fragment.flip();
fragments.add(fragment);
readFragment = 0;
}
read = stream.read(transferBuf, readFragment, pooledBufferSize - readFragment);
}
if (readFragment != 0)
{
SingleQpidByteBuffer fragment = allocateDirectSingle(readFragment);
fragment.put(transferBuf, 0, readFragment);
fragment.flip();
fragments.add(fragment);
}
return createQpidByteBuffer(fragments);
}
private static QpidByteBuffer asQpidByteBuffer(final byte[] data, final int offset, final int length)
{
try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, QpidByteBuffer.getPooledBufferSize()))
{
outputStream.write(data, offset, length);
return outputStream.fetchAccumulatedBuffer();
}
catch (IOException e)
{
throw new RuntimeException("unexpected Error converting array to QpidByteBuffers", e);
}
}
private static ByteBuffer[] getUnderlyingBuffers(QpidByteBuffer buffer)
{
if (buffer instanceof SingleQpidByteBuffer)
{
return new ByteBuffer[] {((SingleQpidByteBuffer) buffer).getUnderlyingBuffer()};
}
else if (buffer instanceof MultiQpidByteBuffer)
{
return ((MultiQpidByteBuffer) buffer).getUnderlyingBuffers();
}
else
{
throw new IllegalStateException("Unknown Buffer Implementation");
}
}
static SSLEngineResult encryptSSL(SSLEngine engine,
Collection<QpidByteBuffer> buffers,
QpidByteBuffer dest) throws SSLException
{
if (dest instanceof SingleQpidByteBuffer)
{
SingleQpidByteBuffer dst = (SingleQpidByteBuffer) dest;
final ByteBuffer[] src;
// QPID-7447: prevent unnecessary allocations
if (buffers.isEmpty())
{
src = EMPTY_BYTE_BUFFER_ARRAY;
}
else
{
List<ByteBuffer> buffers_ = new LinkedList<>();
for (QpidByteBuffer buffer : buffers)
{
Collections.addAll(buffers_, getUnderlyingBuffers(buffer));
}
src = buffers_.toArray(new ByteBuffer[buffers_.size()]);
}
return engine.wrap(src, dst.getUnderlyingBuffer());
}
else
{
throw new IllegalStateException("Expected a single fragment output buffer");
}
}
static SSLEngineResult decryptSSL(final SSLEngine engine, final QpidByteBuffer src, final QpidByteBuffer dst)
throws SSLException
{
if (src instanceof SingleQpidByteBuffer)
{
ByteBuffer underlying = ((SingleQpidByteBuffer)src).getUnderlyingBuffer();
if (dst instanceof SingleQpidByteBuffer)
{
return engine.unwrap(underlying, ((SingleQpidByteBuffer) dst).getUnderlyingBuffer());
}
else if (dst instanceof MultiQpidByteBuffer)
{
return engine.unwrap(underlying, ((MultiQpidByteBuffer) dst).getUnderlyingBuffers());
}
else
{
throw new IllegalStateException("unknown QBB implementation");
}
}
else
{
throw new IllegalStateException("Source QBB can only be single byte buffer");
}
}
static QpidByteBuffer inflate(QpidByteBuffer compressedBuffer) throws IOException
{
if (compressedBuffer == null)
{
throw new IllegalArgumentException("compressedBuffer cannot be null");
}
boolean isDirect = compressedBuffer.isDirect();
final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
List<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
try (GZIPInputStream gzipInputStream = new GZIPInputStream(compressedBuffer.asInputStream()))
{
byte[] buf = new byte[bufferSize];
int read;
while ((read = gzipInputStream.read(buf)) != -1)
{
uncompressedBuffers.add(asQpidByteBuffer(buf, 0, read));
}
return concatenate(uncompressedBuffers);
}
finally
{
uncompressedBuffers.forEach(QpidByteBuffer::dispose);
}
}
static QpidByteBuffer deflate(QpidByteBuffer uncompressedBuffer) throws IOException
{
if (uncompressedBuffer == null)
{
throw new IllegalArgumentException("uncompressedBuffer cannot be null");
}
boolean isDirect = uncompressedBuffer.isDirect();
final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
try (QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
InputStream compressedInput = uncompressedBuffer.asInputStream();
GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput,
bufferSize)))
{
byte[] buf = new byte[16384];
int read;
while ((read = compressedInput.read(buf)) > -1)
{
gzipStream.write(buf, 0, read);
}
gzipStream.finish();
gzipStream.flush();
return compressedOutput.fetchAccumulatedBuffer();
}
}
static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
throws IOException
{
List<ByteBuffer> byteBuffers = new ArrayList<>();
for (QpidByteBuffer qpidByteBuffer : qpidByteBuffers)
{
Collections.addAll(byteBuffers, getUnderlyingBuffers(qpidByteBuffer));
}
return channel.write(byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
}
static QpidByteBuffer wrap(ByteBuffer wrap)
{
return new SingleQpidByteBuffer(new NonPooledByteBufferRef(wrap));
}
static QpidByteBuffer wrap(byte[] data)
{
return wrap(ByteBuffer.wrap(data));
}
static QpidByteBuffer wrap(byte[] data, int offset, int length)
{
return wrap(ByteBuffer.wrap(data, offset, length));
}
static void initialisePool(int bufferSize, int maxPoolSize, double sparsityFraction)
{
if (_isPoolInitialized && (bufferSize != _pooledBufferSize
|| maxPoolSize != _bufferPool.getMaxSize()
|| sparsityFraction != _sparsityFraction))
{
final String errorMessage = String.format(
"QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f."
+
"Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
_pooledBufferSize,
_bufferPool.getMaxSize(),
_sparsityFraction,
bufferSize,
maxPoolSize);
throw new IllegalStateException(errorMessage);
}
if (bufferSize <= 0)
{
throw new IllegalArgumentException("Negative or zero bufferSize illegal : " + bufferSize);
}
_bufferPool = new BufferPool(maxPoolSize);
_pooledBufferSize = bufferSize;
_zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
_sparsityFraction = sparsityFraction;
_isPoolInitialized = true;
}
/**
* Test use only
*/
static void deinitialisePool()
{
if (_isPoolInitialized)
{
SingleQpidByteBuffer singleQpidByteBuffer = _cachedBuffer.get();
if (singleQpidByteBuffer != null)
{
singleQpidByteBuffer.dispose();
_cachedBuffer.remove();
}
_bufferPool = null;
_pooledBufferSize = -1;
_isPoolInitialized = false;
_sparsityFraction = 1.0;
_zeroed = null;
}
}
static void returnToPool(final ByteBuffer buffer)
{
buffer.clear();
if (_isPoolInitialized)
{
final ByteBuffer duplicate = _zeroed.duplicate();
duplicate.limit(buffer.capacity());
buffer.put(duplicate);
_bufferPool.returnBuffer(buffer);
}
}
static double getSparsityFraction()
{
return _sparsityFraction;
}
static int getPooledBufferSize()
{
return _pooledBufferSize;
}
static long getAllocatedDirectMemorySize()
{
return (long)_pooledBufferSize * getNumberOfBuffersInUse();
}
static int getNumberOfBuffersInUse()
{
return PooledByteBufferRef.getActiveBufferCount();
}
static int getNumberOfBuffersInPool()
{
return _bufferPool.size();
}
static long getPooledBufferDisposalCounter()
{
return PooledByteBufferRef.getDisposalCounter();
}
static QpidByteBuffer reallocateIfNecessary(QpidByteBuffer data)
{
if (data != null && data.isDirect() && data.isSparse())
{
QpidByteBuffer newBuf = allocateDirect(data.remaining());
newBuf.put(data);
newBuf.flip();
data.dispose();
return newBuf;
}
else
{
return data;
}
}
static QpidByteBuffer concatenate(List<QpidByteBuffer> buffers)
{
final List<SingleQpidByteBuffer> fragments = new ArrayList<>(buffers.size());
for (QpidByteBuffer buffer : buffers)
{
if (buffer instanceof SingleQpidByteBuffer)
{
if (buffer.hasRemaining())
{
fragments.add((SingleQpidByteBuffer) buffer.slice());
}
}
else if (buffer instanceof MultiQpidByteBuffer)
{
for (final SingleQpidByteBuffer fragment : ((MultiQpidByteBuffer) buffer).getFragments())
{
if (fragment.hasRemaining())
{
fragments.add(fragment.slice());
}
}
}
else
{
throw new IllegalStateException("unknown QBB implementation");
}
}
return createQpidByteBuffer(fragments);
}
static QpidByteBuffer createQpidByteBuffer(final List<SingleQpidByteBuffer> fragments)
{
if (fragments.size() == 0)
{
return emptyQpidByteBuffer();
}
else if (fragments.size() == 1)
{
return fragments.get(0);
}
else
{
return new MultiQpidByteBuffer(fragments);
}
}
static QpidByteBuffer concatenate(QpidByteBuffer... buffers)
{
return concatenate(Arrays.asList(buffers));
}
static QpidByteBuffer emptyQpidByteBuffer()
{
return EMPTY_QPID_BYTE_BUFFER.duplicate();
}
static ThreadFactory createQpidByteBufferTrackingThreadFactory(ThreadFactory factory)
{
return r -> factory.newThread(() -> {
try
{
r.run();
}
finally
{
final SingleQpidByteBuffer cachedThreadLocalBuffer = _cachedBuffer.get();
if (cachedThreadLocalBuffer != null)
{
cachedThreadLocalBuffer.dispose();
_cachedBuffer.remove();
}
}
});
}
private static SingleQpidByteBuffer allocateDirectSingle(int size)
{
if (size < 0)
{
throw new IllegalArgumentException("Cannot allocate SingleQpidByteBuffer with size "
+ size
+ " which is negative.");
}
final ByteBufferRef ref;
if (_isPoolInitialized && _pooledBufferSize >= size)
{
if (_pooledBufferSize == size)
{
ByteBuffer buf = _bufferPool.getBuffer();
if (buf == null)
{
buf = ByteBuffer.allocateDirect(size);
}
ref = new PooledByteBufferRef(buf);
}
else
{
SingleQpidByteBuffer buf = _cachedBuffer.get();
if (buf == null || buf.remaining() < size)
{
if (buf != null)
{
buf.dispose();
}
buf = allocateDirectSingle(_pooledBufferSize);
_cachedBuffer.set(buf);
}
SingleQpidByteBuffer rVal = buf.view(0, size);
buf.position(buf.position() + size);
return rVal;
}
}
else
{
ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
}
return new SingleQpidByteBuffer(ref);
}
}