blob: 3ec6b3e0fe7fce3ea90a139e6df2ceaeb3560cbf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.exec.memory.Accountor;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.BufferManager;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.util.AssertionUtil;
import com.google.common.base.Preconditions;
public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
private static final boolean BOUNDS_CHECKING_ENABLED = AssertionUtil.BOUNDS_CHECKING_ENABLED;
private final ByteBuf b;
private final long addr;
private final int offset;
private final boolean rootBuffer;
private final AtomicLong rootRefCnt = new AtomicLong(1);
private volatile BufferAllocator allocator;
private volatile Accountor acct;
private volatile int length;
// TODO - cleanup
// The code is partly shared and partly copy-pasted between
// these three types. They should be unified under one interface
// to share code and to remove the hacky code here to use only
// one of these types at a time and use null checks to find out
// which.
private OperatorContext context;
private FragmentContext fContext;
private BufferManager bufManager;
public DrillBuf(BufferAllocator allocator, Accountor a, UnsafeDirectLittleEndian b) {
super(b.maxCapacity());
this.b = b;
this.addr = b.memoryAddress();
this.acct = a;
this.length = b.capacity();
this.offset = 0;
this.rootBuffer = true;
this.allocator = allocator;
}
private DrillBuf(BufferAllocator allocator, Accountor a) {
super(0);
this.b = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
this.allocator = allocator;
this.acct = a;
this.length = 0;
this.addr = 0;
this.rootBuffer = false;
this.offset = 0;
}
/**
* Special constructor used for RPC ownership transfer. Takes a snapshot slice of the current buf
* but points directly to the underlying UnsafeLittleEndian buffer. Does this by calling unwrap()
* twice on the provided DrillBuf and expecting an UnsafeDirectLittleEndian buffer. This operation
* includes taking a new reference count on the underlying buffer and maintaining returning with a
* current reference count for itself (masking the underlying reference count).
* @param allocator
* @param a Allocator used when users try to receive allocator from buffer.
* @param b Accountor used for accounting purposes.
*/
public DrillBuf(BufferAllocator allocator, Accountor a, DrillBuf b) {
this(allocator, a, getUnderlying(b), b, 0, b.length, true);
assert b.unwrap().unwrap() instanceof UnsafeDirectLittleEndian;
b.unwrap().unwrap().retain();
}
private DrillBuf(DrillBuf buffer, int index, int length) {
this(buffer.allocator, null, buffer, buffer, index, length, false);
}
private static ByteBuf getUnderlying(DrillBuf b){
ByteBuf underlying = b.unwrap().unwrap();
return underlying.slice((int) (b.memoryAddress() - underlying.memoryAddress()), b.length);
}
private DrillBuf(BufferAllocator allocator, Accountor a, ByteBuf replacement, DrillBuf buffer, int index, int length, boolean root) {
super(length);
if (index < 0 || index > buffer.capacity() - length) {
throw new IndexOutOfBoundsException(buffer.toString() + ".slice(" + index + ", " + length + ')');
}
this.length = length;
writerIndex(length);
this.b = replacement;
this.addr = buffer.memoryAddress() + index;
this.offset = index;
this.acct = a;
this.length = length;
this.rootBuffer = root;
this.allocator = allocator;
}
public void setOperatorContext(OperatorContext c) {
this.context = c;
}
public void setFragmentContext(FragmentContext c) {
this.fContext = c;
}
public void setBufferManager(BufferManager bufManager) {
this.bufManager = bufManager;
}
public BufferAllocator getAllocator() {
return allocator;
}
public DrillBuf reallocIfNeeded(int size) {
if (this.capacity() >= size) {
return this;
}
if (context != null) {
return context.replace(this, size);
} else if(fContext != null) {
return fContext.replace(this, size);
} else if (bufManager != null) {
return bufManager.replace(this, size);
} else {
throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs");
}
}
@Override
public int refCnt() {
if(rootBuffer){
return (int) this.rootRefCnt.get();
}else{
return b.refCnt();
}
}
private long addr(int index) {
return addr + index;
}
private final void checkIndexD(int index) {
ensureAccessible();
if (index < 0 || index >= capacity()) {
throw new IndexOutOfBoundsException(String.format(
"index: %d (expected: range(0, %d))", index, capacity()));
}
}
private final void checkIndexD(int index, int fieldLength) {
ensureAccessible();
if (fieldLength < 0) {
throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
}
if (index < 0 || index > capacity() - fieldLength) {
throw new IndexOutOfBoundsException(String.format(
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
}
}
/**
* Allows a function to determine whether not reading a particular string of bytes is valid.
*
* Will throw an exception if the memory is not readable for some reason. Only doesn't something in the
* case that AssertionUtil.BOUNDS_CHECKING_ENABLED is true.
*
* @param start The starting position of the bytes to be read.
* @param end The exclusive endpoint of the bytes to be read.
*/
public void checkBytes(int start, int end){
if (BOUNDS_CHECKING_ENABLED) {
checkIndexD(start, end - start);
}
}
private void chk(int index, int width) {
if (BOUNDS_CHECKING_ENABLED) {
checkIndexD(index, width);
}
}
private void chk(int index) {
if (BOUNDS_CHECKING_ENABLED) {
checkIndexD(index);
}
}
private void ensure(int width) {
if (BOUNDS_CHECKING_ENABLED) {
ensureWritable(width);
}
}
public boolean transferAccounting(Accountor target) {
if (rootBuffer) {
boolean outcome = acct.transferTo(target, this, length);
acct = target;
return outcome;
} else {
throw new UnsupportedOperationException();
}
}
@Override
public synchronized boolean release() {
return release(1);
}
/**
* Release the provided number of reference counts. If this is a root buffer, will decrease accounting if the local reference count returns to zero.
*/
@Override
public synchronized boolean release(int decrement) {
if(rootBuffer){
final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
if (newRefCnt == 0) {
b.release(decrement);
acct.release(this, length);
return true;
}else{
return false;
}
}else{
return b.release(decrement);
}
}
@Override
public int capacity() {
return length;
}
@Override
public synchronized ByteBuf capacity(int newCapacity) {
if (rootBuffer) {
if (newCapacity == length) {
return this;
} else if (newCapacity < length) {
b.capacity(newCapacity);
int diff = length - b.capacity();
acct.releasePartial(this, diff);
this.length = length - diff;
return this;
} else {
throw new UnsupportedOperationException("Accounting byte buf doesn't support increasing allocations.");
}
} else {
throw new UnsupportedOperationException("Non root bufs doen't support changing allocations.");
}
}
@Override
public int maxCapacity() {
return length;
}
@Override
public ByteBufAllocator alloc() {
return b.alloc();
}
@Override
public ByteOrder order() {
return ByteOrder.LITTLE_ENDIAN;
}
@Override
public ByteBuf order(ByteOrder endianness) {
// if(endianness != ByteOrder.LITTLE_ENDIAN) throw new
// UnsupportedOperationException("Drill buffers only support little endian.");
return this;
}
@Override
public ByteBuf unwrap() {
return b;
}
@Override
public boolean isDirect() {
return true;
}
@Override
public ByteBuf readBytes(int length) {
throw new UnsupportedOperationException();
}
@Override
public ByteBuf readSlice(int length) {
ByteBuf slice = slice(readerIndex(), length);
readerIndex(readerIndex() + length);
return slice;
}
@Override
public ByteBuf copy() {
throw new UnsupportedOperationException();
}
@Override
public ByteBuf copy(int index, int length) {
throw new UnsupportedOperationException();
}
@Override
public ByteBuf slice() {
return slice(readerIndex(), readableBytes());
}
@Override
public DrillBuf slice(int index, int length) {
DrillBuf buf = new DrillBuf(this, index, length);
buf.writerIndex = length;
return buf;
}
@Override
public DrillBuf duplicate() {
return new DrillBuf(this, 0, length);
}
@Override
public int nioBufferCount() {
return 1;
}
@Override
public ByteBuffer nioBuffer() {
return nioBuffer(readerIndex(), readableBytes());
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return b.nioBuffer(offset + index, length);
}
@Override
public ByteBuffer internalNioBuffer(int index, int length) {
return b.internalNioBuffer(offset + index, length);
}
@Override
public ByteBuffer[] nioBuffers() {
return new ByteBuffer[]{nioBuffer()};
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
return new ByteBuffer[]{nioBuffer(index, length)};
}
@Override
public boolean hasArray() {
return b.hasArray();
}
@Override
public byte[] array() {
return b.array();
}
@Override
public int arrayOffset() {
return b.arrayOffset();
}
@Override
public boolean hasMemoryAddress() {
return true;
}
@Override
public long memoryAddress() {
return this.addr;
}
@Override
public String toString(Charset charset) {
return toString(readerIndex, readableBytes(), charset);
}
@Override
public String toString(int index, int length, Charset charset) {
if (length == 0) {
return "";
}
ByteBuffer nioBuffer;
if (nioBufferCount() == 1) {
nioBuffer = nioBuffer(index, length);
} else {
nioBuffer = ByteBuffer.allocate(length);
getBytes(index, nioBuffer);
nioBuffer.flip();
}
return ByteBufUtil.decodeString(nioBuffer, charset);
}
@Override
public int hashCode() {
return System.identityHashCode(this);
}
@Override
public boolean equals(Object obj) {
// identity equals only.
return this == obj;
}
@Override
public ByteBuf retain(int increment) {
if(rootBuffer){
this.rootRefCnt.addAndGet(increment);
}else{
b.retain(increment);
}
return this;
}
@Override
public ByteBuf retain() {
return retain(1);
}
@Override
public long getLong(int index) {
chk(index, 8);
long v = PlatformDependent.getLong(addr(index));
return v;
}
@Override
public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}
@Override
public double getDouble(int index) {
return Double.longBitsToDouble(getLong(index));
}
@Override
public char getChar(int index) {
return (char) getShort(index);
}
@Override
public long getUnsignedInt(int index) {
return getInt(index) & 0xFFFFFFFFL;
}
@Override
public int getInt(int index) {
chk(index, 4);
int v = PlatformDependent.getInt(addr(index));
return v;
}
@Override
public int getUnsignedShort(int index) {
return getShort(index) & 0xFFFF;
}
@Override
public short getShort(int index) {
chk(index, 2);
short v = PlatformDependent.getShort(addr(index));
return v;
}
@Override
public ByteBuf setShort(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
return this;
}
@Override
public ByteBuf setInt(int index, int value) {
chk(index, 4);
PlatformDependent.putInt(addr(index), value);
return this;
}
@Override
public ByteBuf setLong(int index, long value) {
chk(index, 8);
PlatformDependent.putLong(addr(index), value);
return this;
}
@Override
public ByteBuf setChar(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
return this;
}
@Override
public ByteBuf setFloat(int index, float value) {
chk(index, 4);
PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value));
return this;
}
@Override
public ByteBuf setDouble(int index, double value) {
chk(index, 8);
PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value));
return this;
}
@Override
public ByteBuf writeShort(int value) {
ensure(2);
PlatformDependent.putShort(addr(writerIndex), (short) value);
writerIndex += 2;
return this;
}
@Override
public ByteBuf writeInt(int value) {
ensure(4);
PlatformDependent.putInt(addr(writerIndex), value);
writerIndex += 4;
return this;
}
@Override
public ByteBuf writeLong(long value) {
ensure(8);
PlatformDependent.putLong(addr(writerIndex), value);
writerIndex += 8;
return this;
}
@Override
public ByteBuf writeChar(int value) {
ensure(2);
PlatformDependent.putShort(addr(writerIndex), (short) value);
writerIndex += 2;
return this;
}
@Override
public ByteBuf writeFloat(float value) {
ensure(4);
PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value));
writerIndex += 4;
return this;
}
@Override
public ByteBuf writeDouble(double value) {
ensure(8);
PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value));
writerIndex += 8;
return this;
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
b.getBytes(index + offset, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
b.getBytes(index + offset, dst);
return this;
}
@Override
public ByteBuf setByte(int index, int value) {
chk(index, 1);
PlatformDependent.putByte(addr(index), (byte) value);
return this;
}
public void setByte(int index, byte b){
chk(index, 1);
PlatformDependent.putByte(addr(index), b);
}
public void writeByteUnsafe(byte b){
PlatformDependent.putByte(addr(readerIndex), b);
readerIndex++;
}
@Override
protected byte _getByte(int index) {
return getByte(index);
}
@Override
protected short _getShort(int index) {
return getShort(index);
}
@Override
protected int _getInt(int index) {
return getInt(index);
}
@Override
protected long _getLong(int index) {
return getLong(index);
}
@Override
protected void _setByte(int index, int value) {
setByte(index, value);
}
@Override
protected void _setShort(int index, int value) {
setShort(index, value);
}
@Override
protected void _setMedium(int index, int value) {
setMedium(index, value);
}
@Override
protected void _setInt(int index, int value) {
setInt(index, value);
}
@Override
protected void _setLong(int index, long value) {
setLong(index, value);
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
b.getBytes(index + offset, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
b.getBytes(index + offset, out, length);
return this;
}
@Override
protected int _getUnsignedMedium(int index) {
long addr = addr(index);
return (PlatformDependent.getByte(addr) & 0xff) << 16 |
(PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
PlatformDependent.getByte(addr + 2) & 0xff;
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
return b.getBytes(index + offset, out, length);
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
b.setBytes(index + offset, src, srcIndex, length);
return this;
}
public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
if (src.isDirect()) {
checkIndex(index, length);
PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
length);
} else {
if (srcIndex == 0 && src.capacity() == length) {
b.setBytes(index + offset, src);
} else {
ByteBuffer newBuf = src.duplicate();
newBuf.position(srcIndex);
newBuf.limit(srcIndex + length);
b.setBytes(index + offset, src);
}
}
return this;
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
b.setBytes(index + offset, src, srcIndex, length);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
b.setBytes(index + offset, src);
return this;
}
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
return b.setBytes(index + offset, in, length);
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
return b.setBytes(index + offset, in, length);
}
@Override
public byte getByte(int index) {
chk(index, 1);
return PlatformDependent.getByte(addr(index));
}
public static DrillBuf getEmpty(BufferAllocator allocator, Accountor a) {
return new DrillBuf(allocator, a);
}
public boolean isRootBuffer() {
return rootBuffer;
}
@Override
public void close() {
release();
}
}