blob: 74a2e4d536684315b40ee505c94430eabfafefdb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.net;
import java.lang.ref.SoftReference;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;
public class BufferPool {
private final DMStats stats;
private static final Logger logger = LogService.getLogger();
private Method parentOfSliceMethod;
/**
* Buffers may be acquired from the Buffers pool
* or they may be allocated using Buffer.allocate(). This enum is used
* to note the different types. Tracked buffers come from the Buffers pool
* and need to be released when we're done using them.
*/
public enum BufferType {
UNTRACKED, TRACKED_SENDER, TRACKED_RECEIVER
}
public BufferPool(DMStats stats) {
this.stats = stats;
}
/**
* A list of soft references to small byte buffers.
*/
private final ConcurrentLinkedQueue<BBSoftReference> bufferSmallQueue =
new ConcurrentLinkedQueue<>();
/**
* A list of soft references to middle byte buffers.
*/
private final ConcurrentLinkedQueue<BBSoftReference> bufferMiddleQueue =
new ConcurrentLinkedQueue<>();
/**
* A list of soft references to large byte buffers.
*/
private final ConcurrentLinkedQueue<BBSoftReference> bufferLargeQueue =
new ConcurrentLinkedQueue<>();
static final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE;
static final int MEDIUM_BUFFER_SIZE = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
/**
* use direct ByteBuffers instead of heap ByteBuffers for NIO operations
*/
public static final boolean useDirectBuffers = !Boolean.getBoolean("p2p.nodirectBuffers")
|| Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "BufferPool.useHeapBuffers");
/**
* Should only be called by threads that have currently acquired send permission.
*
* @return a byte buffer to be used for sending on this connection.
*/
public ByteBuffer acquireDirectSenderBuffer(int size) {
return acquireDirectBuffer(size, true);
}
public ByteBuffer acquireDirectReceiveBuffer(int size) {
return acquireDirectBuffer(size, false);
}
/**
* try to acquire direct buffer, if enabled by configuration
*/
private ByteBuffer acquireDirectBuffer(int size, boolean send) {
ByteBuffer result;
if (useDirectBuffers) {
if (size <= MEDIUM_BUFFER_SIZE) {
result = acquirePredefinedFixedBuffer(send, size);
} else {
result = acquireLargeBuffer(send, size);
}
if (result.capacity() > size) {
result.position(0).limit(size);
result = result.slice();
}
return result;
}
// if we are using heap buffers then don't bother with keeping them around
result = ByteBuffer.allocate(size);
updateBufferStats(size, send, false);
return result;
}
public ByteBuffer acquireNonDirectSenderBuffer(int size) {
ByteBuffer result = ByteBuffer.allocate(size);
stats.incSenderBufferSize(size, false);
return result;
}
public ByteBuffer acquireNonDirectReceiveBuffer(int size) {
ByteBuffer result = ByteBuffer.allocate(size);
stats.incReceiverBufferSize(size, false);
return result;
}
/**
* Acquire direct buffer with predefined default capacity (SMALL_BUFFER_SIZE or
* MEDIUM_BUFFER_SIZE)
*/
private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) {
// set
int defaultSize;
ConcurrentLinkedQueue<BBSoftReference> bufferTempQueue;
ByteBuffer result;
if (size <= SMALL_BUFFER_SIZE) {
defaultSize = SMALL_BUFFER_SIZE;
bufferTempQueue = bufferSmallQueue;
} else {
defaultSize = MEDIUM_BUFFER_SIZE;
bufferTempQueue = bufferMiddleQueue;
}
BBSoftReference ref = bufferTempQueue.poll();
while (ref != null) {
ByteBuffer bb = ref.getBB();
if (bb == null) {
// it was garbage collected
updateBufferStats(-defaultSize, ref.getSend(), true);
} else {
bb.clear();
if (defaultSize > size) {
bb.limit(size);
}
return bb;
}
ref = bufferTempQueue.poll();
}
result = ByteBuffer.allocateDirect(defaultSize);
updateBufferStats(defaultSize, send, true);
if (defaultSize > size) {
result.limit(size);
}
return result;
}
private ByteBuffer acquireLargeBuffer(boolean send, int size) {
// set
ByteBuffer result;
IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
// set
BBSoftReference ref = bufferLargeQueue.poll();
while (ref != null) {
ByteBuffer bb = ref.getBB();
if (bb == null) {
// it was garbage collected
int refSize = ref.consumeSize();
if (refSize > 0) {
updateBufferStats(-refSize, ref.getSend(), true);
}
} else if (bb.capacity() >= size) {
bb.clear();
if (bb.capacity() > size) {
bb.limit(size);
}
return bb;
} else {
// wasn't big enough so put it back in the queue
Assert.assertTrue(bufferLargeQueue.offer(ref));
if (alreadySeen == null) {
alreadySeen = new IdentityHashMap<>();
}
if (alreadySeen.put(ref, ref) != null) {
break;
}
}
ref = bufferLargeQueue.poll();
}
result = ByteBuffer.allocateDirect(size);
updateBufferStats(size, send, true);
return result;
}
public void releaseSenderBuffer(ByteBuffer bb) {
releaseBuffer(bb, true);
}
public void releaseReceiveBuffer(ByteBuffer bb) {
releaseBuffer(bb, false);
}
/**
* expand a buffer that's currently being read from
*/
ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
int desiredCapacity) {
if (existing.capacity() >= desiredCapacity) {
if (existing.position() > 0) {
existing.compact();
existing.flip();
}
return existing;
}
ByteBuffer newBuffer;
if (existing.isDirect()) {
newBuffer = acquireDirectBuffer(type, desiredCapacity);
} else {
newBuffer = acquireNonDirectBuffer(type, desiredCapacity);
}
newBuffer.clear();
newBuffer.put(existing);
newBuffer.flip();
releaseBuffer(type, existing);
return newBuffer;
}
/**
* expand a buffer that's currently being written to
*/
ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
int desiredCapacity) {
if (existing.capacity() >= desiredCapacity) {
return existing;
}
ByteBuffer newBuffer;
if (existing.isDirect()) {
newBuffer = acquireDirectBuffer(type, desiredCapacity);
} else {
newBuffer = acquireNonDirectBuffer(type, desiredCapacity);
}
newBuffer.clear();
existing.flip();
newBuffer.put(existing);
releaseBuffer(type, existing);
return newBuffer;
}
ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {
switch (type) {
case UNTRACKED:
return ByteBuffer.allocate(capacity);
case TRACKED_SENDER:
return acquireDirectSenderBuffer(capacity);
case TRACKED_RECEIVER:
return acquireDirectReceiveBuffer(capacity);
}
throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
}
ByteBuffer acquireNonDirectBuffer(BufferPool.BufferType type, int capacity) {
switch (type) {
case UNTRACKED:
return ByteBuffer.allocate(capacity);
case TRACKED_SENDER:
return acquireNonDirectSenderBuffer(capacity);
case TRACKED_RECEIVER:
return acquireNonDirectReceiveBuffer(capacity);
}
throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
}
void releaseBuffer(BufferPool.BufferType type, ByteBuffer buffer) {
switch (type) {
case UNTRACKED:
return;
case TRACKED_SENDER:
releaseSenderBuffer(buffer);
return;
case TRACKED_RECEIVER:
releaseReceiveBuffer(buffer);
return;
}
throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
}
/**
* Releases a previously acquired buffer.
*/
private void releaseBuffer(ByteBuffer buffer, boolean send) {
if (buffer.isDirect()) {
buffer = getPoolableBuffer(buffer);
BBSoftReference bbRef = new BBSoftReference(buffer, send);
if (buffer.capacity() <= SMALL_BUFFER_SIZE) {
bufferSmallQueue.offer(bbRef);
} else if (buffer.capacity() <= MEDIUM_BUFFER_SIZE) {
bufferMiddleQueue.offer(bbRef);
} else {
bufferLargeQueue.offer(bbRef);
}
} else {
updateBufferStats(-buffer.capacity(), send, false);
}
}
/**
* If we hand out a buffer that is larger than the requested size we create a
* "slice" of the buffer having the requested capacity and hand that out instead.
* When we put the buffer back in the pool we need to find the original, non-sliced,
* buffer. This is held in DirectBuffer in its "attachment" field, which is a public
* method, though DirectBuffer is package-private. This method is visible for use
* in debugging and testing. For debugging, invoke this method if you need to see
* the non-sliced buffer for some reason, such as logging its hashcode.
*/
@VisibleForTesting
public ByteBuffer getPoolableBuffer(ByteBuffer buffer) {
if (!buffer.isDirect()) {
return buffer;
}
ByteBuffer result = buffer;
if (parentOfSliceMethod == null) {
Class clazz = buffer.getClass();
try {
Method method = clazz.getMethod("attachment");
method.setAccessible(true);
parentOfSliceMethod = method;
} catch (Exception e) {
throw new InternalGemFireException("unable to retrieve underlying byte buffer", e);
}
}
try {
Object attachment = parentOfSliceMethod.invoke(buffer);
if (attachment instanceof ByteBuffer) {
result = (ByteBuffer) attachment;
} else if (attachment != null) {
throw new InternalGemFireException(
"direct byte buffer attachment was not a byte buffer but a " +
attachment.getClass().getName());
}
} catch (Exception e) {
throw new InternalGemFireException("unable to retrieve underlying byte buffer", e);
}
return result;
}
/**
* Update buffer stats.
*/
private void updateBufferStats(int size, boolean send, boolean direct) {
if (send) {
stats.incSenderBufferSize(size, direct);
} else {
stats.incReceiverBufferSize(size, direct);
}
}
/**
* A soft reference that remembers the size of the byte buffer it refers to. TODO Dan - I really
* think this should be a weak reference. The JVM doesn't seem to clear soft references if it is
* getting low on direct memory.
*/
private static class BBSoftReference extends SoftReference<ByteBuffer> {
private int size;
private final boolean send;
BBSoftReference(ByteBuffer bb, boolean send) {
super(bb);
this.size = bb.capacity();
this.send = send;
}
public int getSize() {
return this.size;
}
synchronized int consumeSize() {
int result = this.size;
this.size = 0;
return result;
}
public boolean getSend() {
return this.send;
}
public ByteBuffer getBB() {
return super.get();
}
}
}