DRILL-7790 : Build Drill with Netty version 4.1.59.Final
diff --git a/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
index 66e04ac..458d833 100644
--- a/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
+++ b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
@@ -29,7 +29,6 @@
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import io.netty.util.collection.IntObjectHashMap;
-import io.netty.util.collection.IntObjectMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,12 +128,7 @@
@Override
public Collection<V> values() {
- return Lists.newArrayList(Iterables.transform(secondary.entries(), new Function<IntObjectMap.Entry<V>, V>() {
- @Override
- public V apply(IntObjectMap.Entry<V> entry) {
- return Preconditions.checkNotNull(entry).value();
- }
- }));
+ return Lists.newArrayList(Iterables.transform(secondary.entries(), entry -> Preconditions.checkNotNull(entry).value()));
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
index 271da41..c3c3539 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
@@ -19,7 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufProcessor;
+import io.netty.util.ByteProcessor;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -27,6 +27,7 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
@@ -222,36 +223,76 @@
}
@Override
+ public short getShortLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getUnsignedShort(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getUnsignedShortLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getMedium(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getMediumLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getUnsignedMedium(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getUnsignedMediumLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getInt(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getIntLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public long getUnsignedInt(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public long getUnsignedIntLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public long getLong(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public long getLongLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public CharSequence getCharSequence(int index, int length, Charset charset) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public char getChar(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -267,6 +308,11 @@
}
@Override
+ public int getBytes(int index, FileChannel out, long position, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf getBytes(int index, ByteBuf dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -322,21 +368,46 @@
}
@Override
+ public ByteBuf setShortLE(int index, int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setMedium(int index, int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public ByteBuf setMediumLE(int index, int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setInt(int index, int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public ByteBuf setIntLE(int index, int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setLong(int index, long value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public ByteBuf setLongLE(int index, long value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public int setCharSequence(int index, CharSequence sequence, Charset charset) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setChar(int index, int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -352,6 +423,11 @@
}
@Override
+ public int setBytes(int index, FileChannel in, long position, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setBytes(int index, ByteBuf src) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -418,7 +494,11 @@
@Override
public short readShort() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public short readShortLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
@@ -428,39 +508,68 @@
}
@Override
+ public int readUnsignedShortLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int readMedium() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readMediumLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public int readUnsignedMedium() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readUnsignedMediumLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public int readInt() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readIntLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public long readUnsignedInt() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public long readUnsignedIntLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public long readLong() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public long readLongLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public char readChar() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public CharSequence readCharSequence(int length, Charset charset) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
@@ -472,310 +581,314 @@
@Override
public double readDouble() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readBytes(FileChannel out, long position, int length) throws IOException {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf readBytes(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readSlice(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuf dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(byte[] dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuffer dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(OutputStream out, int length) throws IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int readBytes(GatheringByteChannel out, int length) throws IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf skipBytes(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBoolean(boolean value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeByte(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeShort(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeShortLE(int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeMedium(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeMediumLE(int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeInt(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeIntLE(int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeLong(long value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeLongLE(long value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeChar(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeFloat(float value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeDouble(double value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int writeBytes(FileChannel in, long position, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public int writeCharSequence(CharSequence sequence, Charset charset) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeBytes(ByteBuf src) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(byte[] src) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(ByteBuffer src) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int writeBytes(InputStream in, int length) throws IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeZero(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int bytesBefore(byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int bytesBefore(int length, byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int bytesBefore(int index, int length, byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf copy() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf copy(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf readRetainedSlice(int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf slice() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf retainedSlice() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf slice(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf retainedSlice(int index, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public ByteBuf retainedDuplicate() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf duplicate() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int nioBufferCount() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer nioBuffer() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer[] nioBuffers() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public boolean hasArray() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public byte[] array() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int arrayOffset() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public boolean hasMemoryAddress() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public long memoryAddress() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public String toString(Charset charset) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public String toString(int index, int length, Charset charset) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
@@ -793,33 +906,51 @@
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
+ @Override
+ public boolean isReadOnly() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public ByteBuf asReadOnly() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
@Override
public boolean equals(Object arg0) {
return false;
}
+ @Override
+ public ByteBuf touch() {
+ return this;
+ }
@Override
- public int forEachByte(ByteBufProcessor arg0) {
+ public ByteBuf touch(Object hint) {
+ return this;
+ }
+
+ @Override
+ public int forEachByte(ByteProcessor arg0) {
return 0;
}
@Override
- public int forEachByte(int arg0, int arg1, ByteBufProcessor arg2) {
+ public int forEachByte(int arg0, int arg1, ByteProcessor arg2) {
return 0;
}
@Override
- public int forEachByteDesc(ByteBufProcessor arg0) {
+ public int forEachByteDesc(ByteProcessor arg0) {
return 0;
}
@Override
- public int forEachByteDesc(int arg0, int arg1, ByteBufProcessor arg2) {
+ public int forEachByteDesc(int arg0, int arg1, ByteProcessor arg2) {
return 0;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
index 179cc7c..85d14dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.rpc;
-import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -60,10 +60,10 @@
void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data);
/**
- * Returns the {@link ChannelFuture} which will be notified when this
+ * Returns the {@link Future} which will be notified when this
* channel is closed. This method always returns the same future instance.
*/
- ChannelFuture getChannelClosureFuture();
+ Future<Void> getClosureFuture();
/**
* @return Return the client node address.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index cb1db13..892636c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,14 +19,12 @@
import com.google.protobuf.MessageLite;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.DrillbitStartupException;
@@ -284,14 +282,9 @@
}
@Override
- public ChannelFuture getChannelClosureFuture() {
+ public Future<Void> getClosureFuture() {
return getChannel().closeFuture()
- .addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws Exception {
- cleanup();
- }
- });
+ .addListener(future -> cleanup());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
index f847323..c04a75d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
@@ -19,13 +19,12 @@
import java.net.SocketAddress;
+import io.netty.util.concurrent.Future;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.UserSession;
-import io.netty.channel.ChannelFuture;
-
public abstract class BaseWebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle {
protected WebSessionResources webSessionResources;
@@ -40,7 +39,7 @@
}
@Override
- public ChannelFuture getChannelClosureFuture() {
+ public Future<Void> getClosureFuture() {
return webSessionResources.getCloseFuture();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index c345571..8473d64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -29,8 +29,8 @@
import freemarker.cache.WebappTemplateLoader;
import freemarker.core.HTMLOutputFormat;
import freemarker.template.Configuration;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.EventExecutor;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
@@ -221,11 +221,11 @@
config.getLong(ExecConstants.HTTP_SESSION_MEMORY_RESERVATION),
config.getLong(ExecConstants.HTTP_SESSION_MEMORY_MAXIMUM));
- // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close
+ // Create a future which is needed by Foreman only. Foreman uses this future to add a close
// listener to known about channel close event from underlying layer. We use this future to notify Foreman
// listeners when the Web session (not connection) between Web Client and WebServer is closed. This will help
// Foreman to cancel all the running queries for this Web Client.
- final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+ final Promise<Void> closeFuture = new DefaultPromise<>(executor);
// Create a WebSessionResource instance which owns the lifecycle of all the session resources.
// Set this instance as an attribute of HttpSession, since it will be used until session is destroyed
@@ -283,12 +283,12 @@
logger.trace("Failed to get the remote address of the http session request", ex);
}
- // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close
+ // Create a close future which is needed by Foreman only. Foreman uses this future to add a close
// listener to known about channel close event from underlying layer.
//
// The invocation of this close future is no-op as it will be triggered after query completion in unsecure case.
// But we need this close future as it's expected by Foreman.
- final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+ final Promise<Void> closeFuture = new DefaultPromise(executor);
final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress,
drillUserSession, closeFuture);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
index c06770e..e678923 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.server.rest;
-import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Promise;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.ChannelClosedException;
@@ -40,10 +40,10 @@
private final UserSession webUserSession;
- private ChannelPromise closeFuture;
+ private Promise<Void> closeFuture;
WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress,
- UserSession userSession, ChannelPromise closeFuture) {
+ UserSession userSession, Promise<Void> closeFuture) {
this.allocator = allocator;
this.remoteAddress = remoteAddress;
this.webUserSession = userSession;
@@ -58,7 +58,7 @@
return allocator;
}
- public ChannelPromise getCloseFuture() {
+ public Promise<Void> getCloseFuture() {
return closeFuture;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
index c81e5b6..5e5f57e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
@@ -45,12 +45,8 @@
* access to the {@code UserSession} executing the query. There is no actual physical
* channel corresponding to this connection wrapper.
*
- * It returns a close future with no actual underlying
- * {@link io.netty.channel.Channel} associated with it but do have an
- * {@code EventExecutor} out of BitServer EventLoopGroup. Since there is no actual
- * connection established using this class, hence the close event will never be
- * fired by underlying layer and close future is set only when the
- * {@link WebSessionResources} are closed.
+ * It returns a close future which do have an EventExecutor out of BitServer EventLoopGroup.
+ * Close future is set only when the {@link WebSessionResources} are closed.
*/
public class WebUserConnection extends BaseWebUserConnection {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 38b85b6..64df6c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -22,7 +22,6 @@
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -116,7 +115,7 @@
private final ResponseSendListener responseListener = new ResponseSendListener();
private final GenericFutureListener<Future<Void>> closeListener = future -> cancel();
- private final ChannelFuture closeFuture;
+ private final Future<Void> closeFuture;
private final FragmentsRunner fragmentsRunner;
private final QueryStateProcessor queryStateProcessor;
@@ -141,7 +140,7 @@
this.queryRequest = queryRequest;
this.drillbitContext = drillbitContext;
this.initiatingClient = connection;
- this.closeFuture = initiatingClient.getChannelClosureFuture();
+ this.closeFuture = initiatingClient.getClosureFuture();
closeFuture.addListener(closeListener);
// Apply AutoLimit on resultSet (Usually received via REST APIs)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
index 5de560a..9368a54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -19,7 +19,7 @@
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
import org.apache.drill.common.exceptions.ErrorHelper;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -244,8 +244,8 @@
}
@Override
- public ChannelFuture getChannelClosureFuture() {
- return inner.getChannelClosureFuture();
+ public Future<Void> getClosureFuture() {
+ return inner.getClosureFuture();
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
index 0e9e48d..0c5d3c9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.server.rest;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Promise;
import io.netty.channel.local.LocalAddress;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
@@ -30,6 +30,7 @@
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.test.ClusterTest;
+import org.mockito.Mockito;
public class RestServerTest extends ClusterTest {
@@ -55,7 +56,7 @@
.withOptionManager(systemOptions)
.withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName(principal.getName()).build())
.build(),
- new DefaultChannelPromise(null));
+ Mockito.mock(Promise.class));
WebUserConnection connection = new WebUserConnection.AnonWebUserConnection(webSessionResources);
return new RestQueryRunner(q, cluster.drillbit().getManager(), connection).run();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
index 69fa942..56eb84b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.drill.exec.server.rest;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -33,7 +33,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -65,15 +64,13 @@
@Test
public void testChannelPromiseWithNullExecutor() throws Exception {
try {
- ChannelPromise closeFuture = new DefaultChannelPromise(null);
+ Promise<Void> closeFuture = new DefaultPromise(null);
webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
(UserSession.class), closeFuture);
webSessionResources.close();
fail();
} catch (Exception e) {
assertTrue(e instanceof NullPointerException);
- verify(webSessionResources.getAllocator()).close();
- verify(webSessionResources.getSession()).close();
}
}
@@ -85,14 +82,12 @@
public void testChannelPromiseWithValidExecutor() throws Exception {
try {
EventExecutor mockExecutor = mock(EventExecutor.class);
- ChannelPromise closeFuture = new DefaultChannelPromise(null, mockExecutor);
+ Promise<Void> closeFuture = new DefaultPromise(mockExecutor);
webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
(UserSession.class), closeFuture);
webSessionResources.close();
verify(webSessionResources.getAllocator()).close();
verify(webSessionResources.getSession()).close();
- verify(mockExecutor).inEventLoop();
- verify(mockExecutor).execute(any(Runnable.class));
assertTrue(webSessionResources.getCloseFuture() == null);
assertTrue(!listenerComplete);
} catch (Exception e) {
@@ -107,7 +102,7 @@
@Test
public void testDoubleClose() throws Exception {
try {
- ChannelPromise closeFuture = new DefaultChannelPromise(null, mock(EventExecutor.class));
+ Promise<Void> closeFuture = new DefaultPromise(mock(EventExecutor.class));
webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
(UserSession.class), closeFuture);
webSessionResources.close();
@@ -134,7 +129,7 @@
GenericFutureListener<Future<Void>> closeListener = new TestClosedListener();
latch = new CountDownLatch(1);
executor = TransportCheck.createEventLoopGroup(1, "Test-Thread").next();
- ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+ Promise<Void> closeFuture = new DefaultPromise(executor);
// create WebSessionResources with above ChannelPromise to notify listener
webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 3257d18..1d9e0a0 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -535,7 +535,7 @@
This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>46000000</maxsize>
+ <maxsize>46300000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -595,7 +595,7 @@
This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>46000000</maxsize>
+ <maxsize>46300000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 9e90d06..c055344 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -22,6 +22,7 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
@@ -471,12 +472,29 @@
}
@Override
+ public ByteBuf touch() {
+ return this;
+ }
+
+ @Override
+ public ByteBuf touch(Object hint) {
+ return this;
+ }
+
+ @Override
public long getLong(int index) {
chk(index, 8);
return PlatformDependent.getLong(addr(index));
}
@Override
+ public long getLongLE(int index) {
+ chk(index, 8);
+ final long var = PlatformDependent.getLong(addr(index));
+ return Long.reverseBytes(var);
+ }
+
+ @Override
public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}
@@ -503,6 +521,13 @@
}
@Override
+ public int getIntLE(int index) {
+ chk(index, 4);
+ final int var = PlatformDependent.getInt(addr(index));
+ return Integer.reverseBytes(var);
+ }
+
+ @Override
public int getUnsignedShort(int index) {
return getShort(index) & 0xFFFF;
}
@@ -514,6 +539,28 @@
}
@Override
+ public short getShortLE(int index) {
+ chk(index, 2);
+ final short var = PlatformDependent.getShort(addr(index));
+ return Short.reverseBytes(var);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ final long addr = addr(index);
+ return (PlatformDependent.getByte(addr) & 0xff) << 16 |
+ (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+ PlatformDependent.getByte(addr + 2) & 0xff;
+ }
+
+ @Override
+ public int getUnsignedMediumLE(int index) {
+ final long addr = this.addr(index);
+ return PlatformDependent.getByte(addr) & 255 |
+ (Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8;
+ }
+
+ @Override
public ByteBuf setShort(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
@@ -521,6 +568,31 @@
}
@Override
+ public ByteBuf setShortLE(int index, int value) {
+ chk(index, 2);
+ PlatformDependent.putShort(addr(index), Short.reverseBytes((short)value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ chk(index, 3);
+ long addr = this.addr(index);
+ PlatformDependent.putByte(addr, (byte)(value >>> 16));
+ PlatformDependent.putShort(addr + 1L, (short)value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setMediumLE(int index, int value) {
+ chk(index, 3);
+ long addr = this.addr(index);
+ PlatformDependent.putByte(addr, (byte)value);
+ PlatformDependent.putShort(addr + 1L, Short.reverseBytes((short)(value >>> 8)));
+ return this;
+ }
+
+ @Override
public ByteBuf setInt(int index, int value) {
chk(index, 4);
PlatformDependent.putInt(addr(index), value);
@@ -528,6 +600,13 @@
}
@Override
+ public ByteBuf setIntLE(int index, int value) {
+ chk(index, 4);
+ PlatformDependent.putInt(addr(index), Integer.reverseBytes(value));
+ return this;
+ }
+
+ @Override
public ByteBuf setLong(int index, long value) {
chk(index, 8);
PlatformDependent.putLong(addr(index), value);
@@ -535,6 +614,13 @@
}
@Override
+ public ByteBuf setLongLE(int index, long value) {
+ chk(index, 4);
+ PlatformDependent.putLong(addr(index), Long.reverseBytes(value));
+ return this;
+ }
+
+ @Override
public ByteBuf setChar(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
@@ -643,16 +729,31 @@
}
@Override
+ protected short _getShortLE(int index) {
+ return getShortLE(index);
+ }
+
+ @Override
protected int _getInt(int index) {
return getInt(index);
}
@Override
+ protected int _getIntLE(int index) {
+ return getIntLE(index);
+ }
+
+ @Override
protected long _getLong(int index) {
return getLong(index);
}
@Override
+ protected long _getLongLE(int index) {
+ return getLongLE(index);
+ }
+
+ @Override
protected void _setByte(int index, int value) {
setByte(index, value);
}
@@ -663,21 +764,41 @@
}
@Override
+ protected void _setShortLE(int index, int value) {
+ setShortLE(index, value);
+ }
+
+ @Override
protected void _setMedium(int index, int value) {
setMedium(index, value);
}
@Override
+ protected void _setMediumLE(int index, int value) {
+ setMediumLE(index, value);
+ }
+
+ @Override
protected void _setInt(int index, int value) {
setInt(index, value);
}
@Override
+ protected void _setIntLE(int index, int value) {
+ setIntLE(index, value);
+ }
+
+ @Override
protected void _setLong(int index, long value) {
setLong(index, value);
}
@Override
+ protected void _setLongLE(int index, long value) {
+ setLongLE(index, value);
+ }
+
+ @Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
final int BULK_COPY_THR = 16;
// Performance profiling indicated that using the "putByte()" method is faster for short
@@ -703,11 +824,18 @@
}
@Override
+ public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+ return udle.getBytes(index + offset, out, position, length);
+ }
+
+ @Override
protected int _getUnsignedMedium(int index) {
- final long addr = addr(index);
- return (PlatformDependent.getByte(addr) & 0xff) << 16 |
- (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
- PlatformDependent.getByte(addr + 2) & 0xff;
+ return getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMediumLE(int index) {
+ return getUnsignedMediumLE(index);
}
@Override
@@ -763,6 +891,11 @@
}
@Override
+ public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+ return udle.setBytes(index + offset, in, position, length);
+ }
+
+ @Override
public byte getByte(int index) {
chk(index, 1);
return PlatformDependent.getByte(addr(index));
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
index d69d17b..d29adfa 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -22,6 +22,7 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
@@ -127,6 +128,11 @@
}
@Override
+ protected short _getShortLE(int index) {
+ return buffer.getShortLE(index);
+ }
+
+ @Override
public int getUnsignedMedium(int index) {
return _getUnsignedMedium(index);
}
@@ -137,6 +143,11 @@
}
@Override
+ protected int _getUnsignedMediumLE(int index) {
+ return buffer.getUnsignedMediumLE(index);
+ }
+
+ @Override
public int getInt(int index) {
return _getInt(index);
}
@@ -147,6 +158,11 @@
}
@Override
+ protected int _getIntLE(int index) {
+ return buffer.getIntLE(index);
+ }
+
+ @Override
public long getLong(int index) {
return _getLong(index);
}
@@ -157,6 +173,11 @@
}
@Override
+ protected long _getLongLE(int index) {
+ return buffer.getLongLE(index);
+ }
+
+ @Override
public abstract ByteBuf copy(int index, int length);
@Override
@@ -205,6 +226,11 @@
}
@Override
+ protected void _setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ }
+
+ @Override
public ByteBuf setMedium(int index, int value) {
_setMedium(index, value);
return this;
@@ -216,6 +242,11 @@
}
@Override
+ protected void _setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ }
+
+ @Override
public ByteBuf setInt(int index, int value) {
_setInt(index, value);
return this;
@@ -227,6 +258,11 @@
}
@Override
+ protected void _setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ }
+
+ @Override
public ByteBuf setLong(int index, long value) {
_setLong(index, value);
return this;
@@ -238,6 +274,11 @@
}
@Override
+ protected void _setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ }
+
+ @Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
buffer.setBytes(index, src, srcIndex, length);
return this;
@@ -269,6 +310,11 @@
}
@Override
+ public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+ return buffer.getBytes(index, out, position, length);
+ }
+
+ @Override
public int setBytes(int index, InputStream in, int length)
throws IOException {
return buffer.setBytes(index, in, length);
@@ -281,6 +327,11 @@
}
@Override
+ public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+ return buffer.setBytes(index, in, position, length);
+ }
+
+ @Override
public int nioBufferCount() {
return buffer.nioBufferCount();
}
@@ -296,16 +347,6 @@
}
@Override
- public int forEachByte(int index, int length, ByteBufProcessor processor) {
- return buffer.forEachByte(index, length, processor);
- }
-
- @Override
- public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
- return buffer.forEachByteDesc(index, length, processor);
- }
-
- @Override
public final int refCnt() {
return unwrap().refCnt();
}
@@ -317,6 +358,18 @@
}
@Override
+ public ByteBuf touch() {
+ buffer.touch();
+ return this;
+ }
+
+ @Override
+ public ByteBuf touch(Object hint) {
+ buffer.touch(hint);
+ return this;
+ }
+
+ @Override
public final ByteBuf retain(int increment) {
unwrap().retain(increment);
return this;
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index 1e70216..bee3beb 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -68,6 +68,14 @@
}
}
+ public ByteBuf allocateHeap(int initialCapacity, int maxCapacity) {
+ try {
+ return allocator.heapBuffer(initialCapacity, maxCapacity);
+ } catch (OutOfMemoryError e) {
+ throw new OutOfMemoryException("Failure allocating heap buffer.", e);
+ }
+ }
+
public int getChunkSize() {
return allocator.chunkSize;
}
@@ -201,12 +209,6 @@
return newDirectBufferL(initialCapacity, maxCapacity);
}
- @Override
- public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
- throw new UnsupportedOperationException(
- "Drill doesn't support using heap buffers.");
- }
-
private void validate(int initialCapacity, int maxCapacity) {
if (initialCapacity < 0) {
throw new IllegalArgumentException(
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index f4d6b81..c6bc769 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -57,7 +57,7 @@
private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
super(buf);
- if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+ if (!NATIVE_ORDER) {
throw new IllegalStateException("Drill only runs on LittleEndian systems.");
}
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
index 33325a5..81efdf0 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
@@ -17,22 +17,28 @@
*/
package org.apache.drill.exec.memory;
+import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ExpandableByteBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
/**
- * An implementation of ByteBufAllocator that wraps a Drill BufferAllocator. This allows the RPC layer to be accounted
+ * An extend of AbstractByteBufAllocator that wraps a Drill BufferAllocator. This allows the RPC layer to be accounted
* and managed using Drill's BufferAllocator infrastructure. The only thin different from a typical BufferAllocator is
* the signature and the fact that this Allocator returns ExpandableByteBufs which enable otherwise non-expandable
* DrillBufs to be expandable.
+ *
+ * Beside it, DrillByteBufAllocator uses PooledByteBufAllocatorL.InnerAllocator as allocator only for heapBuffer's for
+ * netty's purposes, when it directly calls heapBuffer methods.
*/
-public class DrillByteBufAllocator implements ByteBufAllocator {
+public class DrillByteBufAllocator extends AbstractByteBufAllocator {
private static final int DEFAULT_BUFFER_SIZE = 4096;
private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16;
+ // used to let netty properly work in case when it directly calls heapBuffer methods
+ private static final PooledByteBufAllocatorL HEAP_ALLOCATOR = AllocationManager.INNER_ALLOCATOR;
private final BufferAllocator allocator;
public DrillByteBufAllocator(BufferAllocator allocator) {
@@ -105,37 +111,28 @@
}
@Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+ return HEAP_ALLOCATOR.allocateHeap(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+ return buffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
public boolean isDirectBufferPooled() {
return false;
}
@Override
- public ByteBuf heapBuffer() {
- throw fail();
- }
-
- @Override
- public ByteBuf heapBuffer(int initialCapacity) {
- throw fail();
- }
-
- @Override
- public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
- throw fail();
- }
-
- @Override
public CompositeByteBuf compositeHeapBuffer() {
- throw fail();
+ return compositeHeapBuffer(DEFAULT_MAX_COMPOSITE_COMPONENTS);
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
- throw fail();
- }
-
- private RuntimeException fail() {
- throw new UnsupportedOperationException("Allocator doesn't support heap-based memory.");
+ return new CompositeByteBuf(this, false, maxNumComponents);
}
}
diff --git a/pom.xml b/pom.xml
index 1bd1dac..28dd9d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
<surefire.version>3.0.0-M4</surefire.version>
<commons.compress.version>1.20</commons.compress.version>
<hikari.version>3.4.2</hikari.version>
- <netty.version>4.0.48.Final</netty.version>
+ <netty.version>4.1.59.Final</netty.version>
<httpclient.version>4.5.12</httpclient.version>
<libthrift.version>0.13.0</libthrift.version>
<derby.version>10.14.2.0</derby.version>
@@ -1822,6 +1822,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>