DRILL-7790 : Build Drill with Netty version 4.1.59.Final
diff --git a/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
index 66e04ac..458d833 100644
--- a/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
+++ b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
@@ -29,7 +29,6 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import io.netty.util.collection.IntObjectHashMap;
-import io.netty.util.collection.IntObjectMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,12 +128,7 @@
 
     @Override
     public Collection<V> values() {
-      return Lists.newArrayList(Iterables.transform(secondary.entries(), new Function<IntObjectMap.Entry<V>, V>() {
-        @Override
-        public V apply(IntObjectMap.Entry<V> entry) {
-          return Preconditions.checkNotNull(entry).value();
-        }
-      }));
+      return Lists.newArrayList(Iterables.transform(secondary.entries(), entry -> Preconditions.checkNotNull(entry).value()));
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
index 271da41..c3c3539 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
@@ -19,7 +19,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufProcessor;
+import io.netty.util.ByteProcessor;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
@@ -27,6 +27,7 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.charset.Charset;
@@ -222,36 +223,76 @@
   }
 
   @Override
+  public short getShortLE(int index) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public int getUnsignedShort(int index) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public int getUnsignedShortLE(int index) {
+      throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public int getMedium(int index) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public int getMediumLE(int index) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public int getUnsignedMedium(int index) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public int getUnsignedMediumLE(int index) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public int getInt(int index) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public int getIntLE(int index) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public long getUnsignedInt(int index) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public long getUnsignedIntLE(int index) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public long getLong(int index) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public long getLongLE(int index) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
+  public CharSequence getCharSequence(int index, int length, Charset charset) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public char getChar(int index) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
@@ -267,6 +308,11 @@
   }
 
   @Override
+  public int getBytes(int index, FileChannel out, long position, int length) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public ByteBuf getBytes(int index, ByteBuf dst) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
@@ -322,21 +368,46 @@
   }
 
   @Override
+  public ByteBuf setShortLE(int index, int value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public ByteBuf setMedium(int index, int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public ByteBuf setMediumLE(int index, int value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public ByteBuf setInt(int index, int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public ByteBuf setIntLE(int index, int value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public ByteBuf setLong(int index, long value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
+  public ByteBuf setLongLE(int index, long value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
+  public int setCharSequence(int index, CharSequence sequence, Charset charset) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public ByteBuf setChar(int index, int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
@@ -352,6 +423,11 @@
   }
 
   @Override
+  public int setBytes(int index, FileChannel in, long position, int length) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public ByteBuf setBytes(int index, ByteBuf src) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
@@ -418,7 +494,11 @@
   @Override
   public short readShort() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public short readShortLE() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
@@ -428,39 +508,68 @@
   }
 
   @Override
+  public int readUnsignedShortLE() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
   public int readMedium() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public int readMediumLE() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public int readUnsignedMedium() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public int readUnsignedMediumLE() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public int readInt() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public int readIntLE() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public long readUnsignedInt() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public long readUnsignedIntLE() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public long readLong() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public long readLongLE() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public char readChar() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public CharSequence readCharSequence(int length, Charset charset) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
@@ -472,310 +581,314 @@
   @Override
   public double readDouble() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public int readBytes(FileChannel out, long position, int length) throws IOException {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf readBytes(int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readSlice(int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readBytes(ByteBuf dst) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readBytes(ByteBuf dst, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readBytes(byte[] dst) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readBytes(ByteBuffer dst) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf readBytes(OutputStream out, int length) throws IOException {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public int readBytes(GatheringByteChannel out, int length) throws IOException {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf skipBytes(int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeBoolean(boolean value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeByte(int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeShort(int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public ByteBuf writeShortLE(int value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf writeMedium(int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public ByteBuf writeMediumLE(int value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf writeInt(int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public ByteBuf writeIntLE(int value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf writeLong(long value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public ByteBuf writeLongLE(long value) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf writeChar(int value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeFloat(float value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeDouble(double value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public int writeBytes(FileChannel in, long position, int length) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
+  public int writeCharSequence(CharSequence sequence, Charset charset) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf writeBytes(ByteBuf src) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeBytes(ByteBuf src, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeBytes(byte[] src) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeBytes(ByteBuffer src) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public int writeBytes(InputStream in, int length) throws IOException {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf writeZero(int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public int indexOf(int fromIndex, int toIndex, byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
 
   @Override
   public int bytesBefore(byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
     @Override
   public int bytesBefore(int length, byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
 
   @Override
   public int bytesBefore(int index, int length, byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
 
   @Override
   public ByteBuf copy() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuf copy(int index, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public ByteBuf readRetainedSlice(int length) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf slice() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public ByteBuf retainedSlice() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf slice(int index, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
+  @Override
+  public ByteBuf retainedSlice(int index, int length) {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
+  public ByteBuf retainedDuplicate() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
   public ByteBuf duplicate() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public int nioBufferCount() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuffer nioBuffer() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuffer nioBuffer(int index, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuffer[] nioBuffers() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public ByteBuffer[] nioBuffers(int index, int length) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public boolean hasArray() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public byte[] array() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public int arrayOffset() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public boolean hasMemoryAddress() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public long memoryAddress() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public String toString(Charset charset) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
   public String toString(int index, int length, Charset charset) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
-
   }
 
   @Override
@@ -793,33 +906,51 @@
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
+  @Override
+  public boolean isReadOnly() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  @Override
+  public ByteBuf asReadOnly() {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
 
   @Override
   public boolean equals(Object arg0) {
     return false;
   }
 
+  @Override
+  public ByteBuf touch() {
+    return this;
+  }
 
   @Override
-  public int forEachByte(ByteBufProcessor arg0) {
+  public ByteBuf touch(Object hint) {
+    return this;
+  }
+
+  @Override
+  public int forEachByte(ByteProcessor arg0) {
     return 0;
   }
 
 
   @Override
-  public int forEachByte(int arg0, int arg1, ByteBufProcessor arg2) {
+  public int forEachByte(int arg0, int arg1, ByteProcessor arg2) {
     return 0;
   }
 
 
   @Override
-  public int forEachByteDesc(ByteBufProcessor arg0) {
+  public int forEachByteDesc(ByteProcessor arg0) {
     return 0;
   }
 
 
   @Override
-  public int forEachByteDesc(int arg0, int arg1, ByteBufProcessor arg2) {
+  public int forEachByteDesc(int arg0, int arg1, ByteProcessor arg2) {
     return 0;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
index 179cc7c..85d14dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.rpc;
 
-import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
 
 import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -60,10 +60,10 @@
   void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data);
 
   /**
-   * Returns the {@link ChannelFuture} which will be notified when this
+   * Returns the {@link Future} which will be notified when this
    * channel is closed.  This method always returns the same future instance.
    */
-  ChannelFuture getChannelClosureFuture();
+  Future<Void> getClosureFuture();
 
   /**
    * @return Return the client node address.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index cb1db13..892636c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,14 +19,12 @@
 
 import com.google.protobuf.MessageLite;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.exception.DrillbitStartupException;
@@ -284,14 +282,9 @@
     }
 
     @Override
-    public ChannelFuture getChannelClosureFuture() {
+    public Future<Void> getClosureFuture() {
       return getChannel().closeFuture()
-          .addListener(new GenericFutureListener<Future<? super Void>>() {
-            @Override
-            public void operationComplete(Future<? super Void> future) throws Exception {
-              cleanup();
-            }
-          });
+          .addListener(future -> cleanup());
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
index f847323..c04a75d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
@@ -19,13 +19,12 @@
 
 import java.net.SocketAddress;
 
+import io.netty.util.concurrent.Future;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
 import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.UserSession;
 
-import io.netty.channel.ChannelFuture;
-
 public abstract class BaseWebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle {
 
   protected WebSessionResources webSessionResources;
@@ -40,7 +39,7 @@
   }
 
   @Override
-  public ChannelFuture getChannelClosureFuture() {
+  public Future<Void> getClosureFuture() {
     return webSessionResources.getCloseFuture();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index c345571..8473d64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -29,8 +29,8 @@
 import freemarker.cache.WebappTemplateLoader;
 import freemarker.core.HTMLOutputFormat;
 import freemarker.template.Configuration;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.EventExecutor;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
@@ -221,11 +221,11 @@
                 config.getLong(ExecConstants.HTTP_SESSION_MEMORY_RESERVATION),
                 config.getLong(ExecConstants.HTTP_SESSION_MEMORY_MAXIMUM));
 
-        // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close
+        // Create a future which is needed by Foreman only. Foreman uses this future to add a close
         // listener to known about channel close event from underlying layer. We use this future to notify Foreman
         // listeners when the Web session (not connection) between Web Client and WebServer is closed. This will help
         // Foreman to cancel all the running queries for this Web Client.
-        final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+        final Promise<Void> closeFuture = new DefaultPromise<>(executor);
 
         // Create a WebSessionResource instance which owns the lifecycle of all the session resources.
         // Set this instance as an attribute of HttpSession, since it will be used until session is destroyed
@@ -283,12 +283,12 @@
         logger.trace("Failed to get the remote address of the http session request", ex);
       }
 
-      // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close
+      // Create a close future which is needed by Foreman only. Foreman uses this future to add a close
       // listener to known about channel close event from underlying layer.
       //
       // The invocation of this close future is no-op as it will be triggered after query completion in unsecure case.
       // But we need this close future as it's expected by Foreman.
-      final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+      final Promise<Void> closeFuture = new DefaultPromise(executor);
 
       final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress,
           drillUserSession, closeFuture);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
index c06770e..e678923 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Promise;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.ChannelClosedException;
@@ -40,10 +40,10 @@
 
   private final UserSession webUserSession;
 
-  private ChannelPromise closeFuture;
+  private Promise<Void> closeFuture;
 
   WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress,
-                      UserSession userSession, ChannelPromise closeFuture) {
+                      UserSession userSession, Promise<Void> closeFuture) {
     this.allocator = allocator;
     this.remoteAddress = remoteAddress;
     this.webUserSession = userSession;
@@ -58,7 +58,7 @@
     return allocator;
   }
 
-  public ChannelPromise getCloseFuture() {
+  public Promise<Void> getCloseFuture() {
     return closeFuture;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
index c81e5b6..5e5f57e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
@@ -45,12 +45,8 @@
  * access to the {@code UserSession} executing the query. There is no actual physical
  * channel corresponding to this connection wrapper.
  *
- * It returns a close future with no actual underlying
- * {@link io.netty.channel.Channel} associated with it but do have an
- * {@code EventExecutor} out of BitServer EventLoopGroup. Since there is no actual
- * connection established using this class, hence the close event will never be
- * fired by underlying layer and close future is set only when the
- * {@link WebSessionResources} are closed.
+ * It returns a close future which do have an EventExecutor out of BitServer EventLoopGroup.
+ * Close future is set only when the {@link WebSessionResources} are closed.
  */
 public class WebUserConnection extends BaseWebUserConnection {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 38b85b6..64df6c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -22,7 +22,6 @@
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -116,7 +115,7 @@
 
   private final ResponseSendListener responseListener = new ResponseSendListener();
   private final GenericFutureListener<Future<Void>> closeListener = future -> cancel();
-  private final ChannelFuture closeFuture;
+  private final Future<Void> closeFuture;
   private final FragmentsRunner fragmentsRunner;
   private final QueryStateProcessor queryStateProcessor;
 
@@ -141,7 +140,7 @@
     this.queryRequest = queryRequest;
     this.drillbitContext = drillbitContext;
     this.initiatingClient = connection;
-    this.closeFuture = initiatingClient.getChannelClosureFuture();
+    this.closeFuture = initiatingClient.getClosureFuture();
     closeFuture.addListener(closeListener);
 
     // Apply AutoLimit on resultSet (Usually received via REST APIs)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
index 5de560a..9368a54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -19,7 +19,7 @@
 
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 
-import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
 import org.apache.drill.common.exceptions.ErrorHelper;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -244,8 +244,8 @@
     }
 
     @Override
-    public ChannelFuture getChannelClosureFuture() {
-      return inner.getChannelClosureFuture();
+    public Future<Void> getClosureFuture() {
+      return inner.getClosureFuture();
     }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
index 0e9e48d..0c5d3c9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Promise;
 import io.netty.channel.local.LocalAddress;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
@@ -30,6 +30,7 @@
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.test.ClusterTest;
+import org.mockito.Mockito;
 
 public class RestServerTest extends ClusterTest {
 
@@ -55,7 +56,7 @@
         .withOptionManager(systemOptions)
         .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName(principal.getName()).build())
         .build(),
-      new DefaultChannelPromise(null));
+      Mockito.mock(Promise.class));
     WebUserConnection connection = new WebUserConnection.AnonWebUserConnection(webSessionResources);
     return new RestQueryRunner(q, cluster.drillbit().getManager(), connection).run();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
index 69fa942..56eb84b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -33,7 +33,6 @@
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
@@ -65,15 +64,13 @@
   @Test
   public void testChannelPromiseWithNullExecutor() throws Exception {
     try {
-      ChannelPromise closeFuture = new DefaultChannelPromise(null);
+      Promise<Void> closeFuture = new DefaultPromise(null);
       webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
           (UserSession.class), closeFuture);
       webSessionResources.close();
       fail();
     } catch (Exception e) {
       assertTrue(e instanceof NullPointerException);
-      verify(webSessionResources.getAllocator()).close();
-      verify(webSessionResources.getSession()).close();
     }
   }
 
@@ -85,14 +82,12 @@
   public void testChannelPromiseWithValidExecutor() throws Exception {
     try {
       EventExecutor mockExecutor = mock(EventExecutor.class);
-      ChannelPromise closeFuture = new DefaultChannelPromise(null, mockExecutor);
+      Promise<Void> closeFuture = new DefaultPromise(mockExecutor);
       webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
           (UserSession.class), closeFuture);
       webSessionResources.close();
       verify(webSessionResources.getAllocator()).close();
       verify(webSessionResources.getSession()).close();
-      verify(mockExecutor).inEventLoop();
-      verify(mockExecutor).execute(any(Runnable.class));
       assertTrue(webSessionResources.getCloseFuture() == null);
       assertTrue(!listenerComplete);
     } catch (Exception e) {
@@ -107,7 +102,7 @@
   @Test
   public void testDoubleClose() throws Exception {
     try {
-      ChannelPromise closeFuture = new DefaultChannelPromise(null, mock(EventExecutor.class));
+      Promise<Void> closeFuture = new DefaultPromise(mock(EventExecutor.class));
       webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
           (UserSession.class), closeFuture);
       webSessionResources.close();
@@ -134,7 +129,7 @@
       GenericFutureListener<Future<Void>> closeListener = new TestClosedListener();
       latch = new CountDownLatch(1);
       executor = TransportCheck.createEventLoopGroup(1, "Test-Thread").next();
-      ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+      Promise<Void> closeFuture = new DefaultPromise(executor);
 
       // create WebSessionResources with above ChannelPromise to notify listener
       webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 3257d18..1d9e0a0 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -535,7 +535,7 @@
                   This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                   </message>
-                  <maxsize>46000000</maxsize>
+                  <maxsize>46300000</maxsize>
                   <minsize>15000000</minsize>
                   <files>
                    <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -595,7 +595,7 @@
                           This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                         </message>
-                        <maxsize>46000000</maxsize>
+                        <maxsize>46300000</maxsize>
                         <minsize>15000000</minsize>
                         <files>
                           <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 9e90d06..c055344 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -22,6 +22,7 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.charset.Charset;
@@ -471,12 +472,29 @@
   }
 
   @Override
+  public ByteBuf touch() {
+    return this;
+  }
+
+  @Override
+  public ByteBuf touch(Object hint) {
+    return this;
+  }
+
+  @Override
   public long getLong(int index) {
     chk(index, 8);
     return PlatformDependent.getLong(addr(index));
   }
 
   @Override
+  public long getLongLE(int index) {
+    chk(index, 8);
+    final long var = PlatformDependent.getLong(addr(index));
+    return Long.reverseBytes(var);
+  }
+
+  @Override
   public float getFloat(int index) {
     return Float.intBitsToFloat(getInt(index));
   }
@@ -503,6 +521,13 @@
   }
 
   @Override
+  public int getIntLE(int index) {
+    chk(index, 4);
+    final int var = PlatformDependent.getInt(addr(index));
+    return Integer.reverseBytes(var);
+  }
+
+  @Override
   public int getUnsignedShort(int index) {
     return getShort(index) & 0xFFFF;
   }
@@ -514,6 +539,28 @@
   }
 
   @Override
+  public short getShortLE(int index) {
+    chk(index, 2);
+    final short var = PlatformDependent.getShort(addr(index));
+    return Short.reverseBytes(var);
+  }
+
+  @Override
+  public int getUnsignedMedium(int index) {
+    final long addr = addr(index);
+    return (PlatformDependent.getByte(addr) & 0xff) << 16 |
+            (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+            PlatformDependent.getByte(addr + 2) & 0xff;
+  }
+
+  @Override
+  public int getUnsignedMediumLE(int index) {
+    final long addr = this.addr(index);
+    return PlatformDependent.getByte(addr) & 255 |
+            (Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8;
+  }
+
+  @Override
   public ByteBuf setShort(int index, int value) {
     chk(index, 2);
     PlatformDependent.putShort(addr(index), (short) value);
@@ -521,6 +568,31 @@
   }
 
   @Override
+  public ByteBuf setShortLE(int index, int value) {
+    chk(index, 2);
+    PlatformDependent.putShort(addr(index), Short.reverseBytes((short)value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf setMedium(int index, int value) {
+    chk(index, 3);
+    long addr = this.addr(index);
+    PlatformDependent.putByte(addr, (byte)(value >>> 16));
+    PlatformDependent.putShort(addr + 1L, (short)value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setMediumLE(int index, int value) {
+    chk(index, 3);
+    long addr = this.addr(index);
+    PlatformDependent.putByte(addr, (byte)value);
+    PlatformDependent.putShort(addr + 1L, Short.reverseBytes((short)(value >>> 8)));
+    return this;
+  }
+
+  @Override
   public ByteBuf setInt(int index, int value) {
     chk(index, 4);
     PlatformDependent.putInt(addr(index), value);
@@ -528,6 +600,13 @@
   }
 
   @Override
+  public ByteBuf setIntLE(int index, int value) {
+    chk(index, 4);
+    PlatformDependent.putInt(addr(index), Integer.reverseBytes(value));
+    return this;
+  }
+
+  @Override
   public ByteBuf setLong(int index, long value) {
     chk(index, 8);
     PlatformDependent.putLong(addr(index), value);
@@ -535,6 +614,13 @@
   }
 
   @Override
+  public ByteBuf setLongLE(int index, long value) {
+    chk(index, 4);
+    PlatformDependent.putLong(addr(index), Long.reverseBytes(value));
+    return this;
+  }
+
+  @Override
   public ByteBuf setChar(int index, int value) {
     chk(index, 2);
     PlatformDependent.putShort(addr(index), (short) value);
@@ -643,16 +729,31 @@
   }
 
   @Override
+  protected short _getShortLE(int index) {
+    return getShortLE(index);
+  }
+
+  @Override
   protected int _getInt(int index) {
     return getInt(index);
   }
 
   @Override
+  protected int _getIntLE(int index) {
+    return getIntLE(index);
+  }
+
+  @Override
   protected long _getLong(int index) {
     return getLong(index);
   }
 
   @Override
+  protected long _getLongLE(int index) {
+    return getLongLE(index);
+  }
+
+  @Override
   protected void _setByte(int index, int value) {
     setByte(index, value);
   }
@@ -663,21 +764,41 @@
   }
 
   @Override
+  protected void _setShortLE(int index, int value) {
+    setShortLE(index, value);
+  }
+
+  @Override
   protected void _setMedium(int index, int value) {
     setMedium(index, value);
   }
 
   @Override
+  protected void _setMediumLE(int index, int value) {
+    setMediumLE(index, value);
+  }
+
+  @Override
   protected void _setInt(int index, int value) {
     setInt(index, value);
   }
 
   @Override
+  protected void _setIntLE(int index, int value) {
+    setIntLE(index, value);
+  }
+
+  @Override
   protected void _setLong(int index, long value) {
     setLong(index, value);
   }
 
   @Override
+  protected void _setLongLE(int index, long value) {
+    setLongLE(index, value);
+  }
+
+  @Override
   public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
     final int BULK_COPY_THR = 16;
     // Performance profiling indicated that using the "putByte()" method is faster for short
@@ -703,11 +824,18 @@
   }
 
   @Override
+  public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+    return udle.getBytes(index + offset, out, position, length);
+  }
+
+  @Override
   protected int _getUnsignedMedium(int index) {
-    final long addr = addr(index);
-    return (PlatformDependent.getByte(addr) & 0xff) << 16 |
-        (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
-        PlatformDependent.getByte(addr + 2) & 0xff;
+    return getUnsignedMedium(index);
+  }
+
+  @Override
+  protected int _getUnsignedMediumLE(int index) {
+    return getUnsignedMediumLE(index);
   }
 
   @Override
@@ -763,6 +891,11 @@
   }
 
   @Override
+  public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+    return udle.setBytes(index + offset, in, position, length);
+  }
+
+  @Override
   public byte getByte(int index) {
     chk(index, 1);
     return PlatformDependent.getByte(addr(index));
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
index d69d17b..d29adfa 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -22,6 +22,7 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 
@@ -127,6 +128,11 @@
   }
 
   @Override
+  protected short _getShortLE(int index) {
+    return buffer.getShortLE(index);
+  }
+
+  @Override
   public int getUnsignedMedium(int index) {
     return _getUnsignedMedium(index);
   }
@@ -137,6 +143,11 @@
   }
 
   @Override
+  protected int _getUnsignedMediumLE(int index) {
+    return buffer.getUnsignedMediumLE(index);
+  }
+
+  @Override
   public int getInt(int index) {
     return _getInt(index);
   }
@@ -147,6 +158,11 @@
   }
 
   @Override
+  protected int _getIntLE(int index) {
+    return buffer.getIntLE(index);
+  }
+
+  @Override
   public long getLong(int index) {
     return _getLong(index);
   }
@@ -157,6 +173,11 @@
   }
 
   @Override
+  protected long _getLongLE(int index) {
+    return buffer.getLongLE(index);
+  }
+
+  @Override
   public abstract ByteBuf copy(int index, int length);
 
   @Override
@@ -205,6 +226,11 @@
   }
 
   @Override
+  protected void _setShortLE(int index, int value) {
+    buffer.setShortLE(index, value);
+  }
+
+  @Override
   public ByteBuf setMedium(int index, int value) {
     _setMedium(index, value);
     return this;
@@ -216,6 +242,11 @@
   }
 
   @Override
+  protected void _setMediumLE(int index, int value) {
+    buffer.setMediumLE(index, value);
+  }
+
+  @Override
   public ByteBuf setInt(int index, int value) {
     _setInt(index, value);
     return this;
@@ -227,6 +258,11 @@
   }
 
   @Override
+  protected void _setIntLE(int index, int value) {
+    buffer.setIntLE(index, value);
+  }
+
+  @Override
   public ByteBuf setLong(int index, long value) {
     _setLong(index, value);
     return this;
@@ -238,6 +274,11 @@
   }
 
   @Override
+  protected void _setLongLE(int index, long value) {
+    buffer.setLongLE(index, value);
+  }
+
+  @Override
   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
     buffer.setBytes(index, src, srcIndex, length);
     return this;
@@ -269,6 +310,11 @@
   }
 
   @Override
+  public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+    return buffer.getBytes(index, out, position, length);
+  }
+
+  @Override
   public int setBytes(int index, InputStream in, int length)
       throws IOException {
     return buffer.setBytes(index, in, length);
@@ -281,6 +327,11 @@
   }
 
   @Override
+  public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+    return buffer.setBytes(index, in, position, length);
+  }
+
+  @Override
   public int nioBufferCount() {
     return buffer.nioBufferCount();
   }
@@ -296,16 +347,6 @@
   }
 
   @Override
-  public int forEachByte(int index, int length, ByteBufProcessor processor) {
-    return buffer.forEachByte(index, length, processor);
-  }
-
-  @Override
-  public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
-    return buffer.forEachByteDesc(index, length, processor);
-  }
-
-  @Override
   public final int refCnt() {
     return unwrap().refCnt();
   }
@@ -317,6 +358,18 @@
   }
 
   @Override
+  public ByteBuf touch() {
+    buffer.touch();
+    return this;
+  }
+
+  @Override
+  public ByteBuf touch(Object hint) {
+    buffer.touch(hint);
+    return this;
+  }
+
+  @Override
   public final ByteBuf retain(int increment) {
     unwrap().retain(increment);
     return this;
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index 1e70216..bee3beb 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -68,6 +68,14 @@
     }
   }
 
+  public ByteBuf allocateHeap(int initialCapacity, int maxCapacity) {
+    try {
+      return allocator.heapBuffer(initialCapacity, maxCapacity);
+    } catch (OutOfMemoryError e) {
+      throw new OutOfMemoryException("Failure allocating heap buffer.", e);
+    }
+  }
+
   public int getChunkSize() {
     return allocator.chunkSize;
   }
@@ -201,12 +209,6 @@
       return newDirectBufferL(initialCapacity, maxCapacity);
     }
 
-    @Override
-    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
-      throw new UnsupportedOperationException(
-          "Drill doesn't support using heap buffers.");
-    }
-
     private void validate(int initialCapacity, int maxCapacity) {
       if (initialCapacity < 0) {
         throw new IllegalArgumentException(
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index f4d6b81..c6bc769 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -57,7 +57,7 @@
 
   private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
     super(buf);
-    if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+    if (!NATIVE_ORDER) {
       throw new IllegalStateException("Drill only runs on LittleEndian systems.");
     }
 
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
index 33325a5..81efdf0 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
@@ -17,22 +17,28 @@
  */
 package org.apache.drill.exec.memory;
 
+import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.ExpandableByteBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
 
 /**
- * An implementation of ByteBufAllocator that wraps a Drill BufferAllocator. This allows the RPC layer to be accounted
+ * An extend of AbstractByteBufAllocator that wraps a Drill BufferAllocator. This allows the RPC layer to be accounted
  * and managed using Drill's BufferAllocator infrastructure. The only thin different from a typical BufferAllocator is
  * the signature and the fact that this Allocator returns ExpandableByteBufs which enable otherwise non-expandable
  * DrillBufs to be expandable.
+ *
+ * Beside it, DrillByteBufAllocator uses PooledByteBufAllocatorL.InnerAllocator as allocator only for heapBuffer's for
+ * netty's purposes, when it directly calls heapBuffer methods.
  */
-public class DrillByteBufAllocator implements ByteBufAllocator {
+public class DrillByteBufAllocator extends AbstractByteBufAllocator {
 
   private static final int DEFAULT_BUFFER_SIZE = 4096;
   private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16;
 
+  // used to let netty properly work in case when it directly calls heapBuffer methods
+  private static final PooledByteBufAllocatorL HEAP_ALLOCATOR = AllocationManager.INNER_ALLOCATOR;
   private final BufferAllocator allocator;
 
   public DrillByteBufAllocator(BufferAllocator allocator) {
@@ -105,37 +111,28 @@
   }
 
   @Override
+  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+    return HEAP_ALLOCATOR.allocateHeap(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+    return buffer(initialCapacity, maxCapacity);
+  }
+
+  @Override
   public boolean isDirectBufferPooled() {
     return false;
   }
 
   @Override
-  public ByteBuf heapBuffer() {
-    throw fail();
-  }
-
-  @Override
-  public ByteBuf heapBuffer(int initialCapacity) {
-    throw fail();
-  }
-
-  @Override
-  public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
-    throw fail();
-  }
-
-  @Override
   public CompositeByteBuf compositeHeapBuffer() {
-    throw fail();
+    return compositeHeapBuffer(DEFAULT_MAX_COMPOSITE_COMPONENTS);
   }
 
   @Override
   public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
-    throw fail();
-  }
-
-  private RuntimeException fail() {
-    throw new UnsupportedOperationException("Allocator doesn't support heap-based memory.");
+    return new CompositeByteBuf(this, false, maxNumComponents);
   }
 
 }
diff --git a/pom.xml b/pom.xml
index 1bd1dac..28dd9d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
     <surefire.version>3.0.0-M4</surefire.version>
     <commons.compress.version>1.20</commons.compress.version>
     <hikari.version>3.4.2</hikari.version>
-    <netty.version>4.0.48.Final</netty.version>
+    <netty.version>4.1.59.Final</netty.version>
     <httpclient.version>4.5.12</httpclient.version>
     <libthrift.version>0.13.0</libthrift.version>
     <derby.version>10.14.2.0</derby.version>
@@ -1822,6 +1822,10 @@
             <groupId>io.netty</groupId>
             <artifactId>netty</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>