IGNITE-13515 Performance drop when there are many thin clients per server
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 10197d0..7aa506b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -53,7 +53,8 @@
 import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD;
 import static org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT;
 import static org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_PRECISION;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
 import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DFLT_DISCOVERY_HISTORY_SIZE;
 import static org.apache.ignite.internal.processors.affinity.AffinityAssignment.DFLT_AFFINITY_BACKUPS_THRESHOLD;
 import static org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache.DFLT_AFFINITY_HISTORY_SIZE;
@@ -529,6 +530,13 @@
     public static final String IGNITE_MARSHAL_BUFFERS_RECHECK = "IGNITE_MARSHAL_BUFFERS_RECHECK";
 
     /**
+     * System property to specify the size of the per-thread pool of binary allocator chunks. Default value is {@code 32}.
+     */
+    @SystemProperty(value = "Per thread binary allocator chunk pool size.",
+        type = Integer.class, defaults = "" + DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE)
+    public static final String IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = "IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE";
+
+    /**
      * System property to disable {@link HostnameVerifier} for SSL connections.
      * Can be used for development with self-signed certificates. Default value is {@code false}.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
index a6cfcbd..b1f0c68 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
@@ -45,6 +45,9 @@
     /** Default size of thread pool. */
     public static final int DFLT_THREAD_POOL_SIZE = IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT;
 
+    /** Default selector count. */
+    public static final int DFLT_SELECTOR_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
+
     /** Default handshake timeout. */
     public static final int DFLT_HANDSHAKE_TIMEOUT = 10_000;
 
@@ -78,6 +81,9 @@
     /** Thread pool size. */
     private int threadPoolSize = DFLT_THREAD_POOL_SIZE;
 
+    /** Selector count. */
+    private int selectorCnt = DFLT_SELECTOR_CNT;
+
     /** Idle timeout. */
     private long idleTimeout = DFLT_IDLE_TIMEOUT;
 
@@ -297,21 +303,21 @@
     }
 
     /**
-     * Size of thread pool that is in charge of processing SQL requests.
+     * Size of thread pool that is in charge of processing client requests.
      * <p>
      * Defaults {@link #DFLT_THREAD_POOL_SIZE}.
      *
-     * @return Thread pool that is in charge of processing SQL requests.
+     * @return Thread pool that is in charge of processing client requests.
      */
     public int getThreadPoolSize() {
         return threadPoolSize;
     }
 
     /**
-     * Sets thread pool that is in charge of processing SQL requests. See {@link #getThreadPoolSize()} for more
+     * Sets thread pool that is in charge of processing client requests. See {@link #getThreadPoolSize()} for more
      * information.
      *
-     * @param threadPoolSize Thread pool that is in charge of processing SQL requests.
+     * @param threadPoolSize Thread pool that is in charge of processing client requests.
      * @return This instance for chaining.
      */
     public ClientConnectorConfiguration setThreadPoolSize(int threadPoolSize) {
@@ -321,6 +327,30 @@
     }
 
     /**
+     * Gets count of selectors to use in TCP server.
+     * <p>
+     * Defaults {@link #DFLT_SELECTOR_CNT}.
+     *
+     * @return Count of selectors to use in TCP server.
+     */
+    public int getSelectorCount() {
+        return selectorCnt;
+    }
+
+    /**
+     * Sets count of selectors to use in TCP server. See {@link #getSelectorCount()} for more
+     * information.
+     *
+     * @param selectorCnt Count of selectors to use in TCP server.
+     * @return This instance for chaining.
+     */
+    public ClientConnectorConfiguration setSelectorCount(int selectorCnt) {
+        this.selectorCnt = selectorCnt;
+
+        return this;
+    }
+
+    /**
      * Gets idle timeout for client connections.
      * If no packets come within idle timeout, the connection is closed.
      * Zero or negative means no timeout.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
index f8da90b..1b68207 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
@@ -32,7 +32,7 @@
     };
 
     /** Memory chunk. */
-    private final BinaryMemoryAllocatorChunk chunk = BinaryMemoryAllocator.INSTANCE.chunk();
+    private final BinaryMemoryAllocatorChunk chunk = BinaryMemoryAllocator.THREAD_LOCAL.chunk();
 
     /** Schema holder. */
     private final BinaryWriterSchemaHolder schema = new BinaryWriterSchemaHolder();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
index 17bcdf6..0f411b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
@@ -37,7 +37,7 @@
      * @param cap Initial capacity.
      */
     public BinaryHeapOutputStream(int cap) {
-        this(cap, BinaryMemoryAllocator.INSTANCE.chunk());
+        this(cap, BinaryMemoryAllocator.THREAD_LOCAL.chunk());
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
index 5471bc5..8993fb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
@@ -17,41 +17,265 @@
 
 package org.apache.ignite.internal.binary.streams;
 
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
+
 /**
- * Thread-local memory allocator.
+ * On-heap memory allocator.
  */
-public final class BinaryMemoryAllocator {
-    /** Memory allocator instance. */
-    public static final BinaryMemoryAllocator INSTANCE = new BinaryMemoryAllocator();
+public abstract class BinaryMemoryAllocator {
+    /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */
+    public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000;
 
-    /** Holders. */
-    private static final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new ThreadLocal<>();
+    /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE */
+    public static final int DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = 32;
 
-    /**
-     * Ensures singleton.
-     */
-    private BinaryMemoryAllocator() {
-        // No-op.
+    /** Buffer size re-check frequency, in milliseconds. */
+    private static final long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK);
+
+    /** Maximum number of data holders kept in each per-thread pool. */
+    private static final int POOL_SIZE = Integer.getInteger(IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE, DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE);
+
+    /** Thread local allocator instance. */
+    public static final BinaryMemoryAllocator THREAD_LOCAL = new ThreadLocalAllocator();
+
+    /** Pooled allocator instance. */
+    public static final BinaryMemoryAllocator POOLED = new PooledAllocator();
+
+    /** @return Memory chunk to allocate byte arrays from. */
+    public abstract BinaryMemoryAllocatorChunk chunk();
+
+    /** @return {@code true} if the allocator reports its chunk as acquired (implementation-specific). */
+    public abstract boolean isAcquired();
+
+    /** Allocator that keeps one lazily created, reusable chunk per thread. */
+    private static class ThreadLocalAllocator extends BinaryMemoryAllocator {
+        /** Holders. */
+        private final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new ThreadLocal<>();
+
+        /** {@inheritDoc} */
+        @Override public BinaryMemoryAllocatorChunk chunk() {
+            BinaryMemoryAllocatorChunk holder = holders.get();
+
+            if (holder == null)
+                holders.set(holder = new Chunk());
+
+            return holder;
+        }
+
+        /**
+         * Checks whether a thread-local array is acquired or not.
+         * The function is used by Unit tests.
+         *
+         * @return {@code true} if acquired {@code false} otherwise.
+         */
+        @Override public boolean isAcquired() {
+            BinaryMemoryAllocatorChunk holder = holders.get();
+
+            return holder != null && holder.isAcquired();
+        }
+
+        /**
+         * Memory allocator chunk.
+         */
+        private static class Chunk implements BinaryMemoryAllocatorChunk {
+            /** Data array */
+            private byte[] data;
+
+            /** Max message size detected between checks. */
+            private int maxMsgSize;
+
+            /** Last time array size is checked. */
+            private long lastCheckNanos = System.nanoTime();
+
+            /** Whether the holder is acquired or not. */
+            private boolean acquired;
+
+            /** {@inheritDoc} */
+            @Override public byte[] allocate(int size) {
+                if (acquired)
+                    return new byte[size];
+
+                acquired = true;
+
+                if (data == null || size > data.length)
+                    data = new byte[size];
+
+                return data;
+            }
+
+            /** {@inheritDoc} */
+            @Override public byte[] reallocate(byte[] data, int size) {
+                byte[] newData = new byte[size];
+
+                if (this.data == data)
+                    this.data = newData;
+
+                System.arraycopy(data, 0, newData, 0, data.length);
+
+                return newData;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void release(byte[] data, int maxMsgSize) {
+                if (this.data != data)
+                    return;
+
+                if (maxMsgSize > this.maxMsgSize)
+                    this.maxMsgSize = maxMsgSize;
+
+                acquired = false;
+
+                long nowNanos = System.nanoTime();
+
+                if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) {
+                    int halfSize = data.length >> 1;
+
+                    if (this.maxMsgSize < halfSize)
+                        this.data = new byte[halfSize];
+
+                    lastCheckNanos = nowNanos;
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean isAcquired() {
+                return acquired;
+            }
+        }
     }
 
-    public BinaryMemoryAllocatorChunk chunk() {
-        BinaryMemoryAllocatorChunk holder = holders.get();
+    /** Allocator that hands out a new chunk per call, backed by a per-thread pool of data holders. */
+    private static class PooledAllocator extends BinaryMemoryAllocator {
+        /** Per-thread pools of reusable data holders. */
+        private final ThreadLocal<DataHoldersPool> holders = ThreadLocal.withInitial(DataHoldersPool::new);
 
-        if (holder == null)
-            holders.set(holder = new BinaryMemoryAllocatorChunk());
+        /** {@inheritDoc} */
+        @Override public BinaryMemoryAllocatorChunk chunk() {
+            return new Chunk(holders.get());
+        }
 
-        return holder;
-    }
+        /** {@inheritDoc} */
+        @Override public boolean isAcquired() {
+            return false;
+        }
 
-    /**
-     * Checks whether a thread-local array is acquired or not.
-     * The function is used by Unit tests.
-     *
-     * @return {@code true} if acquired {@code false} otherwise.
-     */
-    public boolean isAcquired() {
-        BinaryMemoryAllocatorChunk holder = holders.get();
+        /** Bounded pool of reusable {@link DataHolder}s. */
+        private static class DataHoldersPool {
+            /** Pooled holders; used as a stack via push/pop. */
+            private final ArrayDeque<DataHolder> pool = new ArrayDeque<>(POOL_SIZE);
 
-        return holder != null && holder.isAcquired();
+            /** @return Pooled holder if one is available, otherwise a newly created one. */
+            public synchronized DataHolder acquire() {
+                return pool.isEmpty() ? new DataHolder() : pool.pop();
+            }
+
+            /** Returns the holder to the pool, or drops it if the pool already has {@code POOL_SIZE} entries. */
+            public synchronized void release(DataHolder holder) {
+                if (pool.size() < POOL_SIZE) pool.push(holder);
+            }
+        }
+
+        /** Reusable byte array together with the message size statistics used to shrink it. */
+        private static class DataHolder {
+            /** Ring buffer of the most recently recorded message sizes. */
+            private final int[] history = new int[128];
+
+            /** Count of recorded sizes; indexes the ring buffer and restarts from 0 at {@code Integer.MAX_VALUE}. */
+            private int cntr;
+
+            /** Time of the last array size check, as returned by {@code System.nanoTime()}. */
+            private long lastCheckNanos = System.nanoTime();
+
+            /** Data array. */
+            private byte[] data;
+
+            /** Ensures the array holds at least {@code size} bytes, keeping old content if {@code copy} is set. */
+            public byte[] ensureCapacity(int size, boolean copy) {
+                if (data == null)
+                    data = new byte[size];
+                else if (data.length < size)
+                    data = copy ? Arrays.copyOf(data, size) : new byte[size];
+
+                return data;
+            }
+
+            /** @return {@code true} if the given non-null array is the very array held by this holder. */
+            public boolean corresponds(byte[] data) {
+                return data != null && this.data == data;
+            }
+
+            /** Records the message size and periodically shrinks the array toward the median of recent sizes. */
+            public void adjustSize(int msgSize) {
+                history[cntr % history.length] = msgSize;
+                cntr = cntr == Integer.MAX_VALUE ? 0 : cntr + 1;
+
+                long now = System.nanoTime();
+                if (U.nanosToMillis(now - lastCheckNanos) >= CHECK_FREQ && cntr > history.length) {
+                    lastCheckNanos = now;
+
+                    int[] tmp = Arrays.copyOf(history, history.length);
+                    Arrays.sort(tmp);
+                    int adjusted = U.nextPowerOf2(tmp[tmp.length / 2]);
+
+                    if (adjusted < data.length)
+                        data = new byte[adjusted];
+                }
+            }
+        }
+
+        /** Chunk that borrows its backing array from a {@link DataHoldersPool} while it is acquired. */
+        private static class Chunk implements BinaryMemoryAllocatorChunk {
+            /** Holder borrowed from the pool, or {@code null} while the chunk is not acquired. */
+            private volatile DataHolder holder;
+
+            /** Pool the holder is borrowed from and returned to. */
+            private final DataHoldersPool pool;
+
+            /** @param pool Pool to borrow data holders from. */
+            private Chunk(DataHoldersPool pool) {
+                this.pool = pool;
+            }
+
+            /** {@inheritDoc} */
+            @Override public byte[] allocate(int size) {
+                if (holder != null)
+                    return new byte[size];
+
+                holder = pool.acquire();
+                return holder.ensureCapacity(size, false);
+            }
+
+            /** {@inheritDoc} */
+            @Override public byte[] reallocate(byte[] data, int size) {
+                DataHolder holder0 = holder;
+                if (holder0 != null && holder0.corresponds(data))
+                    return holder0.ensureCapacity(size, true);
+                else
+                    return Arrays.copyOf(data, size);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void release(byte[] data, int msgSize) {
+                DataHolder holder0 = holder;
+                if (holder0 == null || !holder0.corresponds(data))
+                    return;
+
+                // Work on the validated snapshot rather than re-reading the volatile field.
+                holder0.adjustSize(msgSize);
+                pool.release(holder0);
+                holder = null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean isAcquired() {
+                return holder != null;
+            }
+        }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
index 27570cf..5f2dfee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
@@ -17,50 +17,17 @@
 
 package org.apache.ignite.internal.binary.streams;
 
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-
 /**
  * Memory allocator chunk.
  */
-public class BinaryMemoryAllocatorChunk {
-    /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */
-    public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000;
-
-    /** Buffer size re-check frequency. */
-    private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK);
-
-    /** Data array */
-    private byte[] data;
-
-    /** Max message size detected between checks. */
-    private int maxMsgSize;
-
-    /** Last time array size is checked. */
-    private long lastCheckNanos = System.nanoTime();
-
-    /** Whether the holder is acquired or not. */
-    private boolean acquired;
-
+public interface BinaryMemoryAllocatorChunk {
     /**
      * Allocate.
      *
      * @param size Desired size.
      * @return Data.
      */
-    public byte[] allocate(int size) {
-        if (acquired)
-            return new byte[size];
-
-        acquired = true;
-
-        if (data == null || size > data.length)
-            data = new byte[size];
-
-        return data;
-    }
+    public byte[] allocate(int size);
 
     /**
      * Reallocate.
@@ -69,45 +36,15 @@
      * @param size Size.
      * @return New data.
      */
-    public byte[] reallocate(byte[] data, int size) {
-        byte[] newData = new byte[size];
-
-        if (this.data == data)
-            this.data = newData;
-
-        System.arraycopy(data, 0, newData, 0, data.length);
-
-        return newData;
-    }
+    public byte[] reallocate(byte[] data, int size);
 
     /**
      * Shrinks array size if needed.
      */
-    public void release(byte[] data, int maxMsgSize) {
-        if (this.data != data)
-            return;
-
-        if (maxMsgSize > this.maxMsgSize)
-            this.maxMsgSize = maxMsgSize;
-
-        this.acquired = false;
-
-        long nowNanos = System.nanoTime();
-
-        if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) {
-            int halfSize = data.length >> 1;
-
-            if (this.maxMsgSize < halfSize)
-                this.data = new byte[halfSize];
-
-            lastCheckNanos = nowNanos;
-        }
-    }
+    public void release(byte[] data, int maxMsgSize);
 
     /**
      * @return {@code True} if acquired.
      */
-    public boolean isAcquired() {
-        return acquired;
-    }
+    public boolean isAcquired();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index acc0ffe..28330e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -410,14 +410,15 @@
      * Process next message from the input stream and complete corresponding future.
      */
     private void processNextMessage() throws ClientProtocolError, ClientConnectionException {
-        int msgSize = dataInput.readInt();
+        // Poll for the message header a bounded number of times, then block, so an idle channel does not busy-loop.
+        int msgSize = dataInput.readInt(2048);
 
         if (msgSize <= 0)
             throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize));
 
         long bytesReadOnStartMsg = dataInput.totalBytesRead();
 
-        long resId = dataInput.readLong();
+        long resId = dataInput.spinReadLong();
 
         int status = 0;
 
@@ -426,11 +427,11 @@
         BinaryInputStream resIn;
 
         if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
-            short flags = dataInput.readShort();
+            short flags = dataInput.spinReadShort();
 
             if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
-                long topVer = dataInput.readLong();
-                int minorTopVer = dataInput.readInt();
+                long topVer = dataInput.spinReadLong();
+                int minorTopVer = dataInput.spinReadInt();
 
                 srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
 
@@ -439,7 +440,7 @@
             }
 
             if ((flags & ClientFlag.NOTIFICATION) != 0) {
-                short notificationCode = dataInput.readShort();
+                short notificationCode = dataInput.spinReadShort();
 
                 notificationOp = ClientOperation.fromCode(notificationCode);
 
@@ -448,10 +449,10 @@
             }
 
             if ((flags & ClientFlag.ERROR) != 0)
-                status = dataInput.readInt();
+                status = dataInput.spinReadInt();
         }
         else
-            status = dataInput.readInt();
+            status = dataInput.spinReadInt();
 
         int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);
 
@@ -460,12 +461,12 @@
 
         if (status == 0) {
             if (msgSize > hdrSize)
-                res = dataInput.read(msgSize - hdrSize);
+                res = dataInput.spinRead(msgSize - hdrSize);
         }
         else if (status == ClientStatus.SECURITY_VIOLATION)
             err = new ClientAuthorizationException();
         else {
-            resIn = new BinaryHeapInputStream(dataInput.read(msgSize - hdrSize));
+            resIn = new BinaryHeapInputStream(dataInput.spinRead(msgSize - hdrSize));
 
             String errMsg = ClientUtils.createBinaryReader(null, resIn).readString();
 
@@ -713,47 +714,81 @@
             this.in = in;
         }
 
+        /** Read bytes from the input stream. */
+        public byte[] read(int len) throws ClientConnectionException {
+            byte[] bytes = new byte[len];
+
+            read(bytes, len, 0);
+
+            return bytes;
+        }
+
+        /** Read bytes from the input stream, busy-polling for available data rather than blocking. */
+        public byte[] spinRead(int len) {
+            byte[] bytes = new byte[len];
+
+            read(bytes, len, Integer.MAX_VALUE);
+
+            return bytes;
+        }
+
         /**
          * Read bytes from the input stream to the buffer.
          *
          * @param bytes Bytes buffer.
          * @param len Length.
+         * @param tryReadCnt Number of reads before falling into blocking read.
          */
-        private void read(byte[] bytes, int len) throws ClientConnectionException {
-            int bytesNum;
-            int readBytesNum = 0;
+        public void read(byte[] bytes, int len, int tryReadCnt) throws ClientConnectionException {
+            int offset = 0;
 
-            while (readBytesNum < len) {
-                try {
-                    bytesNum = in.read(bytes, readBytesNum, len - readBytesNum);
+            try {
+                while (offset < len) {
+                    int toRead;
+
+                    if (tryReadCnt == 0)
+                        toRead = len - offset;
+                    else if ((toRead = Math.min(in.available(), len - offset)) == 0) {
+                        tryReadCnt--;
+
+                        continue;
+                    }
+
+                    int read = in.read(bytes, offset, toRead);
+
+                    if (read < 0)
+                        throw handleIOError(null);
+
+                    offset += read;
+                    totalBytesRead += read;
                 }
-                catch (IOException e) {
-                    throw handleIOError(e);
-                }
-
-                if (bytesNum < 0)
-                    throw handleIOError(null);
-
-                readBytesNum += bytesNum;
             }
-
-            totalBytesRead += readBytesNum;
-        }
-
-        /** Read bytes from the input stream. */
-        public byte[] read(int len) throws ClientConnectionException {
-            byte[] bytes = new byte[len];
-
-            read(bytes, len);
-
-            return bytes;
+            catch (IOException e) {
+                throw handleIOError(e);
+            }
         }
 
         /**
          * Read long value from the input stream.
          */
         public long readLong() throws ClientConnectionException {
-            read(tmpBuf, Long.BYTES);
+            return readLong(0);
+        }
+
+        /**
+         * Read long value from the input stream, busy-polling for available data rather than blocking.
+         */
+        public long spinReadLong() throws ClientConnectionException {
+            return readLong(Integer.MAX_VALUE);
+        }
+
+        /**
+         * Read long value from the input stream.
+         *
+         * @param tryReadCnt Number of reads before falling into blocking read.
+         */
+        private long readLong(int tryReadCnt) throws ClientConnectionException {
+            read(tmpBuf, Long.BYTES, tryReadCnt);
 
             return BinaryPrimitives.readLong(tmpBuf, 0);
         }
@@ -762,7 +797,23 @@
          * Read int value from the input stream.
          */
         public int readInt() throws ClientConnectionException {
-            read(tmpBuf, Integer.BYTES);
+            return readInt(0);
+        }
+
+        /**
+         * Read int value from the input stream, busy-polling for available data rather than blocking.
+         */
+        public int spinReadInt() throws ClientConnectionException {
+            return readInt(Integer.MAX_VALUE);
+        }
+
+        /**
+         * Read int value from the input stream.
+         *
+         * @param tryReadCnt Number of reads before falling into blocking read.
+         */
+        private int readInt(int tryReadCnt) throws ClientConnectionException {
+            read(tmpBuf, Integer.BYTES, tryReadCnt);
 
             return BinaryPrimitives.readInt(tmpBuf, 0);
         }
@@ -771,7 +822,23 @@
          * Read short value from the input stream.
          */
         public short readShort() throws ClientConnectionException {
-            read(tmpBuf, Short.BYTES);
+            return readShort(0);
+        }
+
+        /**
+         * Read short value from the input stream, busy-polling for available data rather than blocking.
+         */
+        public short spinReadShort() throws ClientConnectionException {
+            return readShort(Integer.MAX_VALUE);
+        }
+
+        /**
+         * Read short value from the input stream.
+         *
+         * @param tryReadCnt Number of reads before falling into blocking read.
+         */
+        public short readShort(int tryReadCnt) throws ClientConnectionException {
+            read(tmpBuf, Short.BYTES, tryReadCnt);
 
             return BinaryPrimitives.readShort(tmpBuf, 0);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java
deleted file mode 100644
index 48c7367..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.util.nio.GridNioParser;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
-
-/**
- * This class implements stream parser based on {@link ClientListenerNioServerBuffer}.
- * <p>
- * The rule for this parser is that every message sent over the stream is prepended with
- * 4-byte integer header containing message size. So, the stream structure is as follows:
- * <pre>
- *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
- *     | MSG_SIZE  |   MESSAGE  | MSG_SIZE  |   MESSAGE  |
- *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
- * </pre>
- */
-public class ClientListenerBufferedParser implements GridNioParser {
-    /** Buffer metadata key. */
-    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** {@inheritDoc} */
-    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
-        ClientListenerNioServerBuffer nioBuf = ses.meta(BUF_META_KEY);
-
-        // Decode for a given session is called per one thread, so there should not be any concurrency issues.
-        // However, we make some additional checks.
-        if (nioBuf == null) {
-            nioBuf = new ClientListenerNioServerBuffer();
-
-            ClientListenerNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
-
-            assert old == null;
-        }
-
-        return nioBuf.read(buf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
-        byte[] msg0 = (byte[])msg;
-
-        ByteBuffer res = ByteBuffer.allocate(msg0.length + 4);
-
-        res.order(ByteOrder.LITTLE_ENDIAN);
-
-        res.putInt(msg0.length);
-        res.put(msg0);
-
-        res.flip();
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return ClientListenerBufferedParser.class.getSimpleName();
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
index c3782ef..4dfc117 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
@@ -27,7 +27,7 @@
      * @param msg Message.
      * @return Request.
      */
-    ClientListenerRequest decode(byte[] msg);
+    ClientListenerRequest decode(ClientMessage msg);
 
     /**
      * Encode response to byte array.
@@ -35,7 +35,7 @@
      * @param resp Response.
      * @return Message.
      */
-    byte[] encode(ClientListenerResponse resp);
+    ClientMessage encode(ClientListenerResponse resp);
 
     /**
      * Decode command type. Allows to recognize the command (message type) without decoding the entire message.
@@ -43,7 +43,7 @@
      * @param msg Message.
      * @return Command type.
      */
-    int decodeCommandType(byte[] msg);
+    int decodeCommandType(ClientMessage msg);
 
     /**
      * Decode request Id. Allows to recognize the request Id, if any, without decoding the entire message.
@@ -51,5 +51,5 @@
      * @param msg Message.
      * @return Request Id.
      */
-    long decodeRequestId(byte[] msg);
+    long decodeRequestId(ClientMessage msg);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 369eb2d..393383d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -53,7 +53,7 @@
 /**
  * Client message listener.
  */
-public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte[]> {
+public class ClientListenerNioListener extends GridNioServerListenerAdapter<ClientMessage> {
     /** ODBC driver handshake code. */
     public static final byte ODBC_CLIENT = 0;
 
@@ -139,7 +139,7 @@
     }
 
     /** {@inheritDoc} */
-    @Override public void onMessage(GridNioSession ses, byte[] msg) {
+    @Override public void onMessage(GridNioSession ses, ClientMessage msg) {
         assert msg != null;
 
         ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
@@ -214,9 +214,7 @@
                         ", resp=" + resp.status() + ']');
                 }
 
-                byte[] outMsg = parser.encode(resp);
-
-                GridNioFuture<?> fut = ses.send(outMsg);
+                    GridNioFuture<?> fut = ses.send(parser.encode(resp));
 
                 fut.listen(f -> {
                     if (f.error() == null)
@@ -289,7 +287,7 @@
      * @param ses Session.
      * @param msg Message bytes.
      */
-    private void onHandshake(GridNioSession ses, byte[] msg) {
+    private void onHandshake(GridNioSession ses, ClientMessage msg) {
         BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null);
 
         BinaryMarshaller marsh = new BinaryMarshaller();
@@ -298,7 +296,7 @@
 
         ctx.configure(marsh);
 
-        BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg), null, true);
+        BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg.payload()), null, true);
 
         byte cmd = reader.readByte();
 
@@ -373,7 +371,7 @@
                 writer.writeInt(ClientStatus.FAILED);
         }
 
-        ses.send(writer.array());
+        ses.send(new ClientMessage(writer.array()));
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
new file mode 100644
index 0000000..5ba8afc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * This class implements stream parser based on {@link ClientMessage}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is prepended with
+ * 4-byte integer header containing message size. So, the stream structure is as follows:
+ * <pre>
+ *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ *     | MSG_SIZE  |   MESSAGE  | MSG_SIZE  |   MESSAGE  |
+ *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ * </pre>
+ */
+public class ClientListenerNioMessageParser implements GridNioParser {
+    /** Session meta key under which a partially read message is kept between decode calls. */
+    static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Reader metadata key (not referenced within this class). */
+    static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Logger used to report message read failures. */
+    private final IgniteLogger log;
+
+    /** @param log Logger. */
+    public ClientListenerNioMessageParser(IgniteLogger log) {
+        this.log = log;
+    }
+
+    /** {@inheritDoc} Returns {@code null} while a message is incomplete; its state is kept in session meta. */
+    @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+        Message msg = ses.removeMeta(MSG_META_KEY);
+
+        try {
+            if (msg == null)
+                msg = new ClientMessage();
+
+            boolean finished = false;
+
+            if (buf.hasRemaining())
+                finished = msg.readFrom(buf, null);
+
+            if (finished)
+                return msg;
+            else {
+                ses.addMeta(MSG_META_KEY, msg);
+
+                return null;
+            }
+        }
+        catch (Throwable e) {
+            U.error(log, "Failed to read message [msg=" + msg +
+                    ", buf=" + buf + ", ses=" + ses + "]", e);
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} Not supported by this parser: always throws {@link UnsupportedOperationException}. */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return ClientListenerNioMessageParser.class.getSimpleName();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
index ecebc4a..517d213 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
@@ -86,13 +86,13 @@
     private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
 
     /** Default TCP direct buffer flag. */
-    private static final boolean DFLT_TCP_DIRECT_BUF = false;
+    private static final boolean DFLT_TCP_DIRECT_BUF = true;
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** TCP Server. */
-    private GridNioServer<byte[]> srv;
+    private GridNioServer<ClientMessage> srv;
 
     /** Executor service. */
     private ExecutorService execSvc;
@@ -158,14 +158,16 @@
 
                 long idleTimeout = cliConnCfg.getIdleTimeout();
 
+                int selectorCnt = cliConnCfg.getSelectorCount();
+
                 for (int port = cliConnCfg.getPort(); port <= portTo && port <= 65535; port++) {
                     try {
-                        GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
+                        srv = GridNioServer.<ClientMessage>builder()
                             .address(hostAddr)
                             .port(port)
                             .listener(new ClientListenerNioListener(ctx, busyLock, cliConnCfg))
                             .logger(log)
-                            .selectorCount(DFLT_SELECTOR_CNT)
+                            .selectorCount(selectorCnt)
                             .igniteInstanceName(ctx.igniteInstanceName())
                             .serverName("client-listener")
                             .tcpNoDelay(cliConnCfg.isTcpNoDelay())
@@ -174,12 +176,10 @@
                             .socketSendBufferSize(cliConnCfg.getSocketSendBufferSize())
                             .socketReceiveBufferSize(cliConnCfg.getSocketReceiveBufferSize())
                             .filters(filters)
-                            .directMode(false)
+                            .directMode(true)
                             .idleTimeout(idleTimeout > 0 ? idleTimeout : Long.MAX_VALUE)
                             .build();
 
-                        srv = srv0;
-
                         ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
 
                         if (log.isInfoEnabled())
@@ -287,14 +287,14 @@
                 ClientListenerConnectionContext connCtx = ses.meta(ClientListenerNioListener.CONN_CTX_META_KEY);
 
                 if (connCtx != null && connCtx.parser() != null && connCtx.handler().isCancellationSupported()) {
-                    byte[] inMsg;
+                    ClientMessage inMsg;
 
                     int cmdType;
 
                     long reqId;
 
                     try {
-                        inMsg = (byte[])msg;
+                        inMsg = (ClientMessage)msg;
 
                         cmdType = connCtx.parser().decodeCommandType(inMsg);
 
@@ -324,7 +324,7 @@
             }
         };
 
-        GridNioFilter codecFilter = new GridNioCodecFilter(new ClientListenerBufferedParser(), log, false);
+        GridNioFilter codecFilter = new GridNioCodecFilter(new ClientListenerNioMessageParser(log), log, true);
 
         if (cliConnCfg.isSslEnabled()) {
             Factory<SSLContext> sslCtxFactory = cliConnCfg.isUseIgniteSslContextFactory() ?
@@ -337,7 +337,7 @@
             GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtxFactory.create(),
                 true, ByteOrder.nativeOrder(), log);
 
-            sslFilter.directMode(false);
+            sslFilter.directMode(true);
 
             boolean auth = cliConnCfg.isSslClientAuth();
 
@@ -411,8 +411,7 @@
     private static String clientConnectionDescription(
         GridNioSession ses,
         ClientListenerConnectionContext ctx
-    )
-    {
+    ) {
         AuthorizationContext authCtx = ctx.authorizationContext();
 
         StringBuilder sb = new StringBuilder();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
new file mode 100644
index 0000000..f1176d6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Client connector message: a byte payload that is written to and read from a buffer with a
+ * 4-byte length prefix, least significant byte first.
+ * <p>
+ * The payload is backed either by a byte array or by a {@link BinaryHeapOutputStream}; the stream
+ * is closed once the message has been completely written or its payload has been extracted.
+ */
+@IgniteCodeGeneratingFail
+public class ClientMessage implements Message, Externalizable {
+    /** */
+    private static final long serialVersionUID = -4609408156037304495L;
+
+    /** Message payload, used when the message is not backed by {@link #stream}. */
+    private byte[] data;
+
+    /** Stream holding the payload of a message to be written, {@code null} if {@link #data} is used. */
+    private BinaryHeapOutputStream stream;
+
+    /** Transfer progress: negative while the 4-byte size header is incomplete, then the payload offset. */
+    private int cnt = -4;
+
+    /** Size of the message being read, accumulated from the size header. */
+    private int msgSize;
+
+    /** Default constructor, required by {@link Externalizable}. */
+    public ClientMessage() {}
+
+    /** @param data Message payload. */
+    public ClientMessage(byte[] data) {
+        this.data = data;
+    }
+
+    /** @param stream Stream holding the message payload. */
+    public ClientMessage(BinaryHeapOutputStream stream) {
+        this.stream = stream;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter ignored) {
+        assert stream != null || data != null;
+
+        byte[] data = stream != null ? stream.array() : this.data;
+        int msgSize = stream != null ? stream.position() : data.length;
+
+        if (cnt < 0) {
+            // Write the size header byte by byte, least significant byte first.
+            for (; cnt < 0 && buf.hasRemaining(); cnt++)
+                buf.put((byte) ((msgSize >> (8 * (4 + cnt))) & 0xFF));
+
+            if (cnt < 0)
+                return false;
+        }
+
+        assert cnt >= 0;
+        assert msgSize > 0;
+
+        int remaining = buf.remaining();
+
+        if (remaining > 0) {
+            int missing = msgSize - cnt;
+
+            if (missing > 0) {
+                int len = Math.min(missing, remaining);
+
+                buf.put(data, cnt, len);
+
+                cnt += len;
+            }
+        }
+
+        // Whole message is written: reset the progress counter and release the stream.
+        if (cnt == msgSize) {
+            cnt = -4;
+
+            if (stream != null) {
+                U.closeQuiet(stream);
+
+                stream = null;
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (cnt < 0) {
+            // Assemble the size header byte by byte, least significant byte first.
+            for (; cnt < 0 && buf.hasRemaining(); cnt++)
+                msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt));
+
+            if (cnt < 0)
+                return false;
+
+            data = new byte[msgSize];
+        }
+
+        assert data != null;
+        assert cnt >= 0;
+        assert msgSize > 0;
+
+        int remaining = buf.remaining();
+
+        if (remaining > 0) {
+            int missing = msgSize - cnt;
+
+            if (missing > 0) {
+                int len = Math.min(missing, remaining);
+
+                buf.get(data, cnt, len);
+
+                cnt += len;
+            }
+        }
+
+        // Whole message is read: reset the progress counter and the accumulated size.
+        if (cnt == msgSize) {
+            cnt = -4;
+            msgSize = 0;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return Short.MIN_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op
+    }
+
+    /**
+     * Returns the payload as a byte array. If the message is backed by a stream, the payload is
+     * obtained via {@code arrayCopy()} and the stream is closed.
+     *
+     * @return Message payload.
+     */
+    public byte[] payload() {
+        if (stream != null) {
+            data = stream.arrayCopy();
+            U.closeQuiet(stream);
+            stream = null;
+        }
+
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        byte[] data = payload();
+        out.writeInt(data.length);
+        out.write(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        data = new byte[in.readInt()];
+
+        // read() may return fewer bytes than requested; readFully() fills the whole array or fails.
+        in.readFully(data);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 0d9ea39..64cf609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -216,9 +216,7 @@
                     if (log.isDebugEnabled())
                         log.debug("Async response: [resp=" + resp.status() + ']');
 
-                    byte[] outMsg = parser.encode(resp);
-
-                    ses.send(outMsg);
+                    ses.send(parser.encode(resp));
                 }
             }
         };
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
index fdf7a12..255012b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
@@ -29,6 +29,7 @@
 import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
 
 /**
  * JDBC message parser.
@@ -60,8 +61,8 @@
      * @param msg Message.
      * @return Reader.
      */
-    protected BinaryReaderExImpl createReader(byte[] msg) {
-        BinaryInputStream stream = new BinaryHeapInputStream(msg);
+    protected BinaryReaderExImpl createReader(ClientMessage msg) {
+        BinaryInputStream stream = new BinaryHeapInputStream(msg.payload());
 
         return new BinaryReaderExImpl(binCtx, stream, ctx.config().getClassLoader(), true);
     }
@@ -76,7 +77,7 @@
     }
 
     /** {@inheritDoc} */
-    @Override public ClientListenerRequest decode(byte[] msg) {
+    @Override public ClientListenerRequest decode(ClientMessage msg) {
         assert msg != null;
 
         BinaryReaderExImpl reader = createReader(msg);
@@ -85,7 +86,7 @@
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] encode(ClientListenerResponse msg) {
+    @Override public ClientMessage encode(ClientListenerResponse msg) {
         assert msg != null;
 
         assert msg instanceof JdbcResponse;
@@ -96,20 +97,20 @@
 
         res.writeBinary(writer, protoCtx);
 
-        return writer.array();
+        return new ClientMessage(writer.array());
     }
 
     /** {@inheritDoc} */
-    @Override public int decodeCommandType(byte[] msg) {
+    @Override public int decodeCommandType(ClientMessage msg) {
         assert msg != null;
 
-        return JdbcRequest.readType(msg);
+        return JdbcRequest.readType(msg.payload());
     }
 
     /** {@inheritDoc} */
-    @Override public long decodeRequestId(byte[] msg) {
+    @Override public long decodeRequestId(ClientMessage msg) {
         assert msg != null;
 
-        return JdbcRequest.readRequestId(msg);
+        return JdbcRequest.readRequestId(msg.payload());
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index 0cc22d1..74def0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -157,9 +157,7 @@
                     if (log.isDebugEnabled())
                         log.debug("Async response: [resp=" + resp.status() + ']');
 
-                    byte[] outMsg = parser.encode(resp);
-
-                    ses.send(outMsg);
+                    ses.send(parser.encode(resp));
                 }
             }
         };
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
index 0e66c93..c9b779f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
@@ -34,6 +34,7 @@
 import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
 import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
 import org.jetbrains.annotations.NotNull;
 
@@ -78,10 +79,10 @@
     }
 
     /** {@inheritDoc} */
-    @Override public ClientListenerRequest decode(byte[] msg) {
+    @Override public ClientListenerRequest decode(ClientMessage msg) {
         assert msg != null;
 
-        BinaryInputStream stream = new BinaryHeapInputStream(msg);
+        BinaryInputStream stream = new BinaryHeapInputStream(msg.payload());
 
         BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true);
 
@@ -242,7 +243,7 @@
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] encode(ClientListenerResponse msg0) {
+    @Override public ClientMessage encode(ClientListenerResponse msg0) {
         assert msg0 != null;
 
         assert msg0 instanceof OdbcResponse;
@@ -263,13 +264,13 @@
         if (msg.status() != ClientListenerResponse.STATUS_SUCCESS) {
             writer.writeString(msg.error());
 
-            return writer.array();
+            return new ClientMessage(writer.array());
         }
 
         Object res0 = msg.response();
 
         if (res0 == null)
-            return writer.array();
+            return new ClientMessage(writer.array());
         else if (res0 instanceof OdbcQueryExecuteResult) {
             OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0;
 
@@ -406,19 +407,19 @@
         else
             assert false : "Should not reach here.";
 
-        return writer.array();
+        return new ClientMessage(writer.array());
     }
 
     /** {@inheritDoc} */
-    @Override public int decodeCommandType(byte[] msg) {
+    @Override public int decodeCommandType(ClientMessage msg) {
         assert msg != null;
 
-        return msg[0];
+        return msg.payload()[0];
     }
 
 
     /** {@inheritDoc} */
-    @Override public long decodeRequestId(byte[] msg) {
+    @Override public long decodeRequestId(ClientMessage msg) {
         return 0;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index d3b10c5..66b9e89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -23,10 +23,12 @@
 import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
 import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeGetRequest;
 import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNameGetRequest;
 import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNamePutRequest;
@@ -284,10 +286,10 @@
     }
 
     /** {@inheritDoc} */
-    @Override public ClientListenerRequest decode(byte[] msg) {
+    @Override public ClientListenerRequest decode(ClientMessage msg) {
         assert msg != null;
 
-        BinaryInputStream inStream = new BinaryHeapInputStream(msg);
+        BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
 
         // skipHdrCheck must be true (we have 103 op code).
         BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), inStream,
@@ -473,10 +475,10 @@
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] encode(ClientListenerResponse resp) {
+    @Override public ClientMessage encode(ClientListenerResponse resp) {
         assert resp != null;
 
-        BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32);
+        BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32, BinaryMemoryAllocator.POOLED.chunk());
 
         BinaryRawWriterEx writer = marsh.writer(outStream);
 
@@ -484,20 +486,20 @@
 
         ((ClientOutgoingMessage)resp).encode(ctx, writer);
 
-        return outStream.arrayCopy();
+        return new ClientMessage(outStream);
     }
 
     /** {@inheritDoc} */
-    @Override public int decodeCommandType(byte[] msg) {
+    @Override public int decodeCommandType(ClientMessage msg) {
         assert msg != null;
 
-        BinaryInputStream inStream = new BinaryHeapInputStream(msg);
+        BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
 
         return inStream.readShort();
     }
 
     /** {@inheritDoc} */
-    @Override public long decodeRequestId(byte[] msg) {
+    @Override public long decodeRequestId(ClientMessage msg) {
         return 0;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
index a914078..b3c8cda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
@@ -28,7 +28,7 @@
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
 import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
index 93f1a95..a165a07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
@@ -26,7 +26,7 @@
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
 import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9c0c6b1..41574ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1401,16 +1401,7 @@
 
             GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
-            MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-
-            if (writer == null) {
-                try {
-                    ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IOException("Failed to create message writer.", e);
-                }
-            }
+            MessageWriter writer = messageWriter(ses);
 
             boolean handshakeFinished = sslFilter.lock(ses);
 
@@ -1659,16 +1650,7 @@
             ByteBuffer buf = ses.writeBuffer();
             SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
-            MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-
-            if (writer == null) {
-                try {
-                    ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IOException("Failed to create message writer.", e);
-                }
-            }
+            MessageWriter writer = messageWriter(ses);
 
             if (req == null) {
                 req = systemMessage(ses);
@@ -1739,6 +1721,25 @@
                 buf.clear();
         }
 
+        /** @return Writer cached in session meta (created on first call), {@code null} if no writer factory. */
+        @Nullable private MessageWriter messageWriter(GridSelectorNioSessionImpl ses) throws IOException {
+            if (writerFactory == null)
+                return null;
+
+            MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+
+            if (writer == null) {
+                try {
+                    ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to create message writer.", e);
+                }
+            }
+
+            return writer;
+        }
+
         /**
          * @param writer Customizer of writing.
          * @param buf Buffer to write.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index 8e6c966..1c92e8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -107,7 +107,7 @@
 import org.junit.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.THREAD_LOCAL;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -2834,28 +2834,28 @@
     @Test
     public void testThreadLocalArrayReleased() throws Exception {
         // Checking the writer directly.
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
 
         BinaryMarshaller marsh = binaryMarshaller();
 
         try (BinaryWriterExImpl writer = new BinaryWriterExImpl(binaryContext(marsh))) {
-            assertEquals(true, INSTANCE.isAcquired());
+            assertEquals(true, THREAD_LOCAL.isAcquired());
 
             writer.writeString("Thread local test");
 
             writer.array();
 
-            assertEquals(true, INSTANCE.isAcquired());
+            assertEquals(true, THREAD_LOCAL.isAcquired());
         }
 
         // Checking the binary marshaller.
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
 
         marsh = binaryMarshaller();
 
         marsh.marshal(new SimpleObject());
 
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
 
         marsh = binaryMarshaller();
 
@@ -2867,7 +2867,7 @@
 
         BinaryObject binaryObj = builder.build();
 
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
     }
 
     /**