IGNITE-13515 Performance drop when there are many thin clients per server
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 10197d0..7aa506b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -53,7 +53,8 @@
import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD;
import static org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT;
import static org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_PRECISION;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DFLT_DISCOVERY_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.affinity.AffinityAssignment.DFLT_AFFINITY_BACKUPS_THRESHOLD;
import static org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache.DFLT_AFFINITY_HISTORY_SIZE;
@@ -529,6 +530,13 @@
public static final String IGNITE_MARSHAL_BUFFERS_RECHECK = "IGNITE_MARSHAL_BUFFERS_RECHECK";
/**
+ * System property to specify per thread binary allocator chunk pool size. Default value is {@code 32}.
+ */
+ @SystemProperty(value = "Per thread binary allocator chunk pool size.",
+ type = Integer.class, defaults = "" + DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE)
+ public static final String IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = "IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE";
+
+ /**
* System property to disable {@link HostnameVerifier} for SSL connections.
* Can be used for development with self-signed certificates. Default value is {@code false}.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
index a6cfcbd..b1f0c68 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
@@ -45,6 +45,9 @@
/** Default size of thread pool. */
public static final int DFLT_THREAD_POOL_SIZE = IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT;
+ /** Default selector count. */
+ public static final int DFLT_SELECTOR_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
+
/** Default handshake timeout. */
public static final int DFLT_HANDSHAKE_TIMEOUT = 10_000;
@@ -78,6 +81,9 @@
/** Thread pool size. */
private int threadPoolSize = DFLT_THREAD_POOL_SIZE;
+ /** Selector count. */
+ private int selectorCnt = DFLT_SELECTOR_CNT;
+
/** Idle timeout. */
private long idleTimeout = DFLT_IDLE_TIMEOUT;
@@ -297,21 +303,21 @@
}
/**
- * Size of thread pool that is in charge of processing SQL requests.
+ * Size of thread pool that is in charge of processing client requests.
* <p>
* Defaults {@link #DFLT_THREAD_POOL_SIZE}.
*
- * @return Thread pool that is in charge of processing SQL requests.
+ * @return Thread pool that is in charge of processing client requests.
*/
public int getThreadPoolSize() {
return threadPoolSize;
}
/**
- * Sets thread pool that is in charge of processing SQL requests. See {@link #getThreadPoolSize()} for more
+ * Sets thread pool that is in charge of processing client requests. See {@link #getThreadPoolSize()} for more
* information.
*
- * @param threadPoolSize Thread pool that is in charge of processing SQL requests.
+ * @param threadPoolSize Thread pool that is in charge of processing client requests.
* @return This instance for chaining.
*/
public ClientConnectorConfiguration setThreadPoolSize(int threadPoolSize) {
@@ -321,6 +327,30 @@
}
/**
+ * Get count of selectors to use in TCP server.
+ * <p>
+ * Defaults {@link #DFLT_SELECTOR_CNT}.
+ *
+ * @return Count of selectors to use in TCP server.
+ */
+ public int getSelectorCount() {
+ return selectorCnt;
+ }
+
+ /**
+ * Sets count of selectors to use in TCP server. See {@link #getSelectorCount()} for more
+ * information.
+ *
+ * @param selectorCnt Count of selectors to use in TCP server.
+ * @return This instance for chaining.
+ */
+ public ClientConnectorConfiguration setSelectorCount(int selectorCnt) {
+ this.selectorCnt = selectorCnt;
+
+ return this;
+ }
+
+ /**
* Gets idle timeout for client connections.
* If no packets come within idle timeout, the connection is closed.
* Zero or negative means no timeout.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
index f8da90b..1b68207 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
@@ -32,7 +32,7 @@
};
/** Memory chunk. */
- private final BinaryMemoryAllocatorChunk chunk = BinaryMemoryAllocator.INSTANCE.chunk();
+ private final BinaryMemoryAllocatorChunk chunk = BinaryMemoryAllocator.THREAD_LOCAL.chunk();
/** Schema holder. */
private final BinaryWriterSchemaHolder schema = new BinaryWriterSchemaHolder();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
index 17bcdf6..0f411b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
@@ -37,7 +37,7 @@
* @param cap Initial capacity.
*/
public BinaryHeapOutputStream(int cap) {
- this(cap, BinaryMemoryAllocator.INSTANCE.chunk());
+ this(cap, BinaryMemoryAllocator.THREAD_LOCAL.chunk());
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
index 5471bc5..8993fb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
@@ -17,41 +17,265 @@
package org.apache.ignite.internal.binary.streams;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
+
/**
- * Thread-local memory allocator.
+ * On-heap memory allocator.
*/
-public final class BinaryMemoryAllocator {
- /** Memory allocator instance. */
- public static final BinaryMemoryAllocator INSTANCE = new BinaryMemoryAllocator();
+public abstract class BinaryMemoryAllocator {
+ /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */
+ public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000;
- /** Holders. */
- private static final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new ThreadLocal<>();
+ /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE */
+ public static final int DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = 32;
- /**
- * Ensures singleton.
- */
- private BinaryMemoryAllocator() {
- // No-op.
+ /** Buffer size re-check frequency. */
+ private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK);
+
+ /** */
+ private static final int POOL_SIZE = Integer.getInteger(IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE, DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE);
+
+ /** Thread local allocator instance. */
+ public static final BinaryMemoryAllocator THREAD_LOCAL = new ThreadLocalAllocator();
+
+ /** Pooled allocator instance. */
+ public static final BinaryMemoryAllocator POOLED = new PooledAllocator();
+
+ /** */
+ public abstract BinaryMemoryAllocatorChunk chunk();
+
+ /** */
+ public abstract boolean isAcquired();
+
+ /** */
+ private static class ThreadLocalAllocator extends BinaryMemoryAllocator {
+ /** Holders. */
+ private final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new ThreadLocal<>();
+
+ /** {@inheritDoc} */
+ @Override public BinaryMemoryAllocatorChunk chunk() {
+ BinaryMemoryAllocatorChunk holder = holders.get();
+
+ if (holder == null)
+ holders.set(holder = new Chunk());
+
+ return holder;
+ }
+
+ /**
+ * Checks whether a thread-local array is acquired or not.
+ * The function is used by Unit tests.
+ *
+ * @return {@code true} if acquired {@code false} otherwise.
+ */
+ @Override public boolean isAcquired() {
+ BinaryMemoryAllocatorChunk holder = holders.get();
+
+ return holder != null && holder.isAcquired();
+ }
+
+ /**
+ * Memory allocator chunk.
+ */
+ private static class Chunk implements BinaryMemoryAllocatorChunk {
+ /** Data array */
+ private byte[] data;
+
+ /** Max message size detected between checks. */
+ private int maxMsgSize;
+
+ /** Last time array size is checked. */
+ private long lastCheckNanos = System.nanoTime();
+
+ /** Whether the holder is acquired or not. */
+ private boolean acquired;
+
+ /** {@inheritDoc} */
+ @Override public byte[] allocate(int size) {
+ if (acquired)
+ return new byte[size];
+
+ acquired = true;
+
+ if (data == null || size > data.length)
+ data = new byte[size];
+
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] reallocate(byte[] data, int size) {
+ byte[] newData = new byte[size];
+
+ if (this.data == data)
+ this.data = newData;
+
+ System.arraycopy(data, 0, newData, 0, data.length);
+
+ return newData;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release(byte[] data, int maxMsgSize) {
+ if (this.data != data)
+ return;
+
+ if (maxMsgSize > this.maxMsgSize)
+ this.maxMsgSize = maxMsgSize;
+
+ acquired = false;
+
+ long nowNanos = System.nanoTime();
+
+ if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) {
+ int halfSize = data.length >> 1;
+
+ if (this.maxMsgSize < halfSize)
+ this.data = new byte[halfSize];
+
+ lastCheckNanos = nowNanos;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isAcquired() {
+ return acquired;
+ }
+ }
}
- public BinaryMemoryAllocatorChunk chunk() {
- BinaryMemoryAllocatorChunk holder = holders.get();
+ /** */
+ private static class PooledAllocator extends BinaryMemoryAllocator {
+ /** */
+ private final ThreadLocal<DataHoldersPool> holders = ThreadLocal.withInitial(DataHoldersPool::new);
- if (holder == null)
- holders.set(holder = new BinaryMemoryAllocatorChunk());
+ /** {@inheritDoc} */
+ @Override public BinaryMemoryAllocatorChunk chunk() {
+ return new Chunk(holders.get());
+ }
- return holder;
- }
+ /** {@inheritDoc} */
+ @Override public boolean isAcquired() {
+ return false;
+ }
- /**
- * Checks whether a thread-local array is acquired or not.
- * The function is used by Unit tests.
- *
- * @return {@code true} if acquired {@code false} otherwise.
- */
- public boolean isAcquired() {
- BinaryMemoryAllocatorChunk holder = holders.get();
+ /** */
+ private static class DataHoldersPool {
+ /** */
+ private final ArrayDeque<DataHolder> pool = new ArrayDeque<>(POOL_SIZE);
- return holder != null && holder.isAcquired();
+ /** */
+ public synchronized DataHolder acquire() {
+ return pool.isEmpty() ? new DataHolder() : pool.pop();
+ }
+
+ /** */
+ public synchronized void release(DataHolder holder) {
+ if (pool.size() < POOL_SIZE) pool.push(holder);
+ }
+ }
+
+ /** */
+ private static class DataHolder {
+ /** Size history. */
+ private final int[] history = new int[128];
+
+ /** Size history cntr. */
+ private int cntr;
+
+ /** Last time array size is checked. */
+ private long lastCheckNanos = System.nanoTime();
+
+ /** Data array */
+ private byte[] data;
+
+ /** */
+ public byte[] ensureCapacity(int size, boolean copy) {
+ if (data == null)
+ data = new byte[size];
+ else if (data.length < size)
+ data = copy ? Arrays.copyOf(data, size) : new byte[size];
+
+ return data;
+ }
+
+ /** */
+ public boolean corresponds(byte[] data) {
+ return data != null && this.data == data;
+ }
+
+ /** */
+ public void adjustSize(int msgSize) {
+ history[cntr % history.length] = msgSize;
+ cntr = cntr == Integer.MAX_VALUE ? 0 : cntr + 1;
+
+ long now = System.nanoTime();
+ if (U.nanosToMillis(now - lastCheckNanos) >= CHECK_FREQ && cntr > history.length) {
+ lastCheckNanos = now;
+
+ int[] tmp = Arrays.copyOf(history, history.length);
+ Arrays.sort(tmp);
+ int adjusted = U.nextPowerOf2(tmp[tmp.length / 2]);
+
+ if (adjusted < data.length)
+ data = new byte[adjusted];
+ }
+ }
+ }
+
+ /** */
+ private static class Chunk implements BinaryMemoryAllocatorChunk {
+ /** */
+ private volatile DataHolder holder;
+
+ /** */
+ private final DataHoldersPool pool;
+
+ /** */
+ private Chunk(DataHoldersPool pool) {
+ this.pool = pool;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] allocate(int size) {
+ if (holder != null)
+ return new byte[size];
+
+ holder = pool.acquire();
+ return holder.ensureCapacity(size, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] reallocate(byte[] data, int size) {
+ DataHolder holder0 = holder;
+ if (holder0 != null && holder0.corresponds(data))
+ return holder0.ensureCapacity(size, true);
+ else
+ return Arrays.copyOf(data, size);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release(byte[] data, int msgSize) {
+ DataHolder holder0 = holder;
+ if (holder0 == null || !holder0.corresponds(data))
+ return;
+
+ holder.adjustSize(msgSize);
+
+ pool.release(holder);
+ holder = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isAcquired() {
+ return holder != null;
+ }
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
index 27570cf..5f2dfee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
@@ -17,50 +17,17 @@
package org.apache.ignite.internal.binary.streams;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-
/**
* Memory allocator chunk.
*/
-public class BinaryMemoryAllocatorChunk {
- /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */
- public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000;
-
- /** Buffer size re-check frequency. */
- private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK);
-
- /** Data array */
- private byte[] data;
-
- /** Max message size detected between checks. */
- private int maxMsgSize;
-
- /** Last time array size is checked. */
- private long lastCheckNanos = System.nanoTime();
-
- /** Whether the holder is acquired or not. */
- private boolean acquired;
-
+public interface BinaryMemoryAllocatorChunk {
/**
* Allocate.
*
* @param size Desired size.
* @return Data.
*/
- public byte[] allocate(int size) {
- if (acquired)
- return new byte[size];
-
- acquired = true;
-
- if (data == null || size > data.length)
- data = new byte[size];
-
- return data;
- }
+ public byte[] allocate(int size);
/**
* Reallocate.
@@ -69,45 +36,15 @@
* @param size Size.
* @return New data.
*/
- public byte[] reallocate(byte[] data, int size) {
- byte[] newData = new byte[size];
-
- if (this.data == data)
- this.data = newData;
-
- System.arraycopy(data, 0, newData, 0, data.length);
-
- return newData;
- }
+ public byte[] reallocate(byte[] data, int size);
/**
* Shrinks array size if needed.
*/
- public void release(byte[] data, int maxMsgSize) {
- if (this.data != data)
- return;
-
- if (maxMsgSize > this.maxMsgSize)
- this.maxMsgSize = maxMsgSize;
-
- this.acquired = false;
-
- long nowNanos = System.nanoTime();
-
- if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) {
- int halfSize = data.length >> 1;
-
- if (this.maxMsgSize < halfSize)
- this.data = new byte[halfSize];
-
- lastCheckNanos = nowNanos;
- }
- }
+ public void release(byte[] data, int maxMsgSize);
/**
* @return {@code True} if acquired.
*/
- public boolean isAcquired() {
- return acquired;
- }
+ public boolean isAcquired();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index acc0ffe..28330e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -410,14 +410,15 @@
* Process next message from the input stream and complete corresponding future.
*/
private void processNextMessage() throws ClientProtocolError, ClientConnectionException {
- int msgSize = dataInput.readInt();
+ // blocking read a message header not to fall into a busy loop
+ int msgSize = dataInput.readInt(2048);
if (msgSize <= 0)
throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize));
long bytesReadOnStartMsg = dataInput.totalBytesRead();
- long resId = dataInput.readLong();
+ long resId = dataInput.spinReadLong();
int status = 0;
@@ -426,11 +427,11 @@
BinaryInputStream resIn;
if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
- short flags = dataInput.readShort();
+ short flags = dataInput.spinReadShort();
if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
- long topVer = dataInput.readLong();
- int minorTopVer = dataInput.readInt();
+ long topVer = dataInput.spinReadLong();
+ int minorTopVer = dataInput.spinReadInt();
srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
@@ -439,7 +440,7 @@
}
if ((flags & ClientFlag.NOTIFICATION) != 0) {
- short notificationCode = dataInput.readShort();
+ short notificationCode = dataInput.spinReadShort();
notificationOp = ClientOperation.fromCode(notificationCode);
@@ -448,10 +449,10 @@
}
if ((flags & ClientFlag.ERROR) != 0)
- status = dataInput.readInt();
+ status = dataInput.spinReadInt();
}
else
- status = dataInput.readInt();
+ status = dataInput.spinReadInt();
int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);
@@ -460,12 +461,12 @@
if (status == 0) {
if (msgSize > hdrSize)
- res = dataInput.read(msgSize - hdrSize);
+ res = dataInput.spinRead(msgSize - hdrSize);
}
else if (status == ClientStatus.SECURITY_VIOLATION)
err = new ClientAuthorizationException();
else {
- resIn = new BinaryHeapInputStream(dataInput.read(msgSize - hdrSize));
+ resIn = new BinaryHeapInputStream(dataInput.spinRead(msgSize - hdrSize));
String errMsg = ClientUtils.createBinaryReader(null, resIn).readString();
@@ -713,47 +714,81 @@
this.in = in;
}
+ /** Read bytes from the input stream. */
+ public byte[] read(int len) throws ClientConnectionException {
+ byte[] bytes = new byte[len];
+
+ read(bytes, len, 0);
+
+ return bytes;
+ }
+
+ /** Read bytes from the input stream. */
+ public byte[] spinRead(int len) {
+ byte[] bytes = new byte[len];
+
+ read(bytes, len, Integer.MAX_VALUE);
+
+ return bytes;
+ }
+
/**
* Read bytes from the input stream to the buffer.
*
* @param bytes Bytes buffer.
* @param len Length.
+ * @param tryReadCnt Number of reads before falling into blocking read.
*/
- private void read(byte[] bytes, int len) throws ClientConnectionException {
- int bytesNum;
- int readBytesNum = 0;
+ public void read(byte[] bytes, int len, int tryReadCnt) throws ClientConnectionException {
+ int offset = 0;
- while (readBytesNum < len) {
- try {
- bytesNum = in.read(bytes, readBytesNum, len - readBytesNum);
+ try {
+ while (offset < len) {
+ int toRead;
+
+ if (tryReadCnt == 0)
+ toRead = len - offset;
+ else if ((toRead = Math.min(in.available(), len - offset)) == 0) {
+ tryReadCnt--;
+
+ continue;
+ }
+
+ int read = in.read(bytes, offset, toRead);
+
+ if (read < 0)
+ throw handleIOError(null);
+
+ offset += read;
+ totalBytesRead += read;
}
- catch (IOException e) {
- throw handleIOError(e);
- }
-
- if (bytesNum < 0)
- throw handleIOError(null);
-
- readBytesNum += bytesNum;
}
-
- totalBytesRead += readBytesNum;
- }
-
- /** Read bytes from the input stream. */
- public byte[] read(int len) throws ClientConnectionException {
- byte[] bytes = new byte[len];
-
- read(bytes, len);
-
- return bytes;
+ catch (IOException e) {
+ throw handleIOError(e);
+ }
}
/**
* Read long value from the input stream.
*/
public long readLong() throws ClientConnectionException {
- read(tmpBuf, Long.BYTES);
+ return readLong(0);
+ }
+
+ /**
+ * Read long value from the input stream.
+ */
+ public long spinReadLong() throws ClientConnectionException {
+ return readLong(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read long value from the input stream.
+ *
+ * @param tryReadCnt Number of reads before falling into blocking read.
+ */
+ private long readLong(int tryReadCnt) throws ClientConnectionException {
+ read(tmpBuf, Long.BYTES, tryReadCnt);
return BinaryPrimitives.readLong(tmpBuf, 0);
}
@@ -762,7 +797,23 @@
* Read int value from the input stream.
*/
public int readInt() throws ClientConnectionException {
- read(tmpBuf, Integer.BYTES);
+ return readInt(0);
+ }
+
+ /**
+ * Read int value from the input stream.
+ */
+ public int spinReadInt() throws ClientConnectionException {
+ return readInt(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read int value from the input stream.
+ *
+ * @param tryReadCnt Number of reads before falling into blocking read.
+ */
+ private int readInt(int tryReadCnt) throws ClientConnectionException {
+ read(tmpBuf, Integer.BYTES, tryReadCnt);
return BinaryPrimitives.readInt(tmpBuf, 0);
}
@@ -771,7 +822,23 @@
* Read short value from the input stream.
*/
public short readShort() throws ClientConnectionException {
- read(tmpBuf, Short.BYTES);
+ return readShort(0);
+ }
+
+ /**
+ * Read short value from the input stream.
+ */
+ public short spinReadShort() throws ClientConnectionException {
+ return readShort(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read short value from the input stream.
+ *
+ * @param tryReadCnt Number of reads before falling into blocking read.
+ */
+ public short readShort(int tryReadCnt) throws ClientConnectionException {
+ read(tmpBuf, Short.BYTES, tryReadCnt);
return BinaryPrimitives.readShort(tmpBuf, 0);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java
deleted file mode 100644
index 48c7367..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.util.nio.GridNioParser;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
-
-/**
- * This class implements stream parser based on {@link ClientListenerNioServerBuffer}.
- * <p>
- * The rule for this parser is that every message sent over the stream is prepended with
- * 4-byte integer header containing message size. So, the stream structure is as follows:
- * <pre>
- * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
- * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE |
- * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
- * </pre>
- */
-public class ClientListenerBufferedParser implements GridNioParser {
- /** Buffer metadata key. */
- private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
-
- /** {@inheritDoc} */
- @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
- ClientListenerNioServerBuffer nioBuf = ses.meta(BUF_META_KEY);
-
- // Decode for a given session is called per one thread, so there should not be any concurrency issues.
- // However, we make some additional checks.
- if (nioBuf == null) {
- nioBuf = new ClientListenerNioServerBuffer();
-
- ClientListenerNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
-
- assert old == null;
- }
-
- return nioBuf.read(buf);
- }
-
- /** {@inheritDoc} */
- @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
- byte[] msg0 = (byte[])msg;
-
- ByteBuffer res = ByteBuffer.allocate(msg0.length + 4);
-
- res.order(ByteOrder.LITTLE_ENDIAN);
-
- res.putInt(msg0.length);
- res.put(msg0);
-
- res.flip();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return ClientListenerBufferedParser.class.getSimpleName();
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
index c3782ef..4dfc117 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
@@ -27,7 +27,7 @@
* @param msg Message.
* @return Request.
*/
- ClientListenerRequest decode(byte[] msg);
+ ClientListenerRequest decode(ClientMessage msg);
/**
* Encode response to byte array.
@@ -35,7 +35,7 @@
* @param resp Response.
* @return Message.
*/
- byte[] encode(ClientListenerResponse resp);
+ ClientMessage encode(ClientListenerResponse resp);
/**
* Decode command type. Allows to recognize the command (message type) without decoding the entire message.
@@ -43,7 +43,7 @@
* @param msg Message.
* @return Command type.
*/
- int decodeCommandType(byte[] msg);
+ int decodeCommandType(ClientMessage msg);
/**
* Decode request Id. Allows to recognize the request Id, if any, without decoding the entire message.
@@ -51,5 +51,5 @@
* @param msg Message.
* @return Request Id.
*/
- long decodeRequestId(byte[] msg);
+ long decodeRequestId(ClientMessage msg);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 369eb2d..393383d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -53,7 +53,7 @@
/**
* Client message listener.
*/
-public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte[]> {
+public class ClientListenerNioListener extends GridNioServerListenerAdapter<ClientMessage> {
/** ODBC driver handshake code. */
public static final byte ODBC_CLIENT = 0;
@@ -139,7 +139,7 @@
}
/** {@inheritDoc} */
- @Override public void onMessage(GridNioSession ses, byte[] msg) {
+ @Override public void onMessage(GridNioSession ses, ClientMessage msg) {
assert msg != null;
ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
@@ -214,9 +214,7 @@
", resp=" + resp.status() + ']');
}
- byte[] outMsg = parser.encode(resp);
-
- GridNioFuture<?> fut = ses.send(outMsg);
+ GridNioFuture<?> fut = ses.send(parser.encode(resp));
fut.listen(f -> {
if (f.error() == null)
@@ -289,7 +287,7 @@
* @param ses Session.
* @param msg Message bytes.
*/
- private void onHandshake(GridNioSession ses, byte[] msg) {
+ private void onHandshake(GridNioSession ses, ClientMessage msg) {
BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null);
BinaryMarshaller marsh = new BinaryMarshaller();
@@ -298,7 +296,7 @@
ctx.configure(marsh);
- BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg), null, true);
+ BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg.payload()), null, true);
byte cmd = reader.readByte();
@@ -373,7 +371,7 @@
writer.writeInt(ClientStatus.FAILED);
}
- ses.send(writer.array());
+ ses.send(new ClientMessage(writer.array()));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
new file mode 100644
index 0000000..5ba8afc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * This class implements stream parser based on {@link ClientListenerNioServerBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is prepended with
+ * 4-byte integer header containing message size. So, the stream structure is as follows:
+ * <pre>
+ * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE |
+ * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ * </pre>
+ */
+public class ClientListenerNioMessageParser implements GridNioParser {
+ /** Message metadata key. */
+ static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Reader metadata key. */
+ static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ public ClientListenerNioMessageParser(IgniteLogger log) {
+ this.log = log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+ Message msg = ses.removeMeta(MSG_META_KEY);
+
+ try {
+ if (msg == null)
+ msg = new ClientMessage();
+
+ boolean finished = false;
+
+ if (buf.hasRemaining())
+ finished = msg.readFrom(buf, null);
+
+ if (finished)
+ return msg;
+ else {
+ ses.addMeta(MSG_META_KEY, msg);
+
+ return null;
+ }
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to read message [msg=" + msg +
+ ", buf=" + buf + ", ses=" + ses + "]", e);
+
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return ClientListenerNioMessageParser.class.getSimpleName();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
index ecebc4a..517d213 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
@@ -86,13 +86,13 @@
private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
/** Default TCP direct buffer flag. */
- private static final boolean DFLT_TCP_DIRECT_BUF = false;
+ private static final boolean DFLT_TCP_DIRECT_BUF = true;
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** TCP Server. */
- private GridNioServer<byte[]> srv;
+ private GridNioServer<ClientMessage> srv;
/** Executor service. */
private ExecutorService execSvc;
@@ -158,14 +158,16 @@
long idleTimeout = cliConnCfg.getIdleTimeout();
+ int selectorCnt = cliConnCfg.getSelectorCount();
+
for (int port = cliConnCfg.getPort(); port <= portTo && port <= 65535; port++) {
try {
- GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
+ srv = GridNioServer.<ClientMessage>builder()
.address(hostAddr)
.port(port)
.listener(new ClientListenerNioListener(ctx, busyLock, cliConnCfg))
.logger(log)
- .selectorCount(DFLT_SELECTOR_CNT)
+ .selectorCount(selectorCnt)
.igniteInstanceName(ctx.igniteInstanceName())
.serverName("client-listener")
.tcpNoDelay(cliConnCfg.isTcpNoDelay())
@@ -174,12 +176,10 @@
.socketSendBufferSize(cliConnCfg.getSocketSendBufferSize())
.socketReceiveBufferSize(cliConnCfg.getSocketReceiveBufferSize())
.filters(filters)
- .directMode(false)
+ .directMode(true)
.idleTimeout(idleTimeout > 0 ? idleTimeout : Long.MAX_VALUE)
.build();
- srv = srv0;
-
ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
if (log.isInfoEnabled())
@@ -287,14 +287,14 @@
ClientListenerConnectionContext connCtx = ses.meta(ClientListenerNioListener.CONN_CTX_META_KEY);
if (connCtx != null && connCtx.parser() != null && connCtx.handler().isCancellationSupported()) {
- byte[] inMsg;
+ ClientMessage inMsg;
int cmdType;
long reqId;
try {
- inMsg = (byte[])msg;
+ inMsg = (ClientMessage)msg;
cmdType = connCtx.parser().decodeCommandType(inMsg);
@@ -324,7 +324,7 @@
}
};
- GridNioFilter codecFilter = new GridNioCodecFilter(new ClientListenerBufferedParser(), log, false);
+ GridNioFilter codecFilter = new GridNioCodecFilter(new ClientListenerNioMessageParser(log), log, true);
if (cliConnCfg.isSslEnabled()) {
Factory<SSLContext> sslCtxFactory = cliConnCfg.isUseIgniteSslContextFactory() ?
@@ -337,7 +337,7 @@
GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtxFactory.create(),
true, ByteOrder.nativeOrder(), log);
- sslFilter.directMode(false);
+ sslFilter.directMode(true);
boolean auth = cliConnCfg.isSslClientAuth();
@@ -411,8 +411,7 @@
private static String clientConnectionDescription(
GridNioSession ses,
ClientListenerConnectionContext ctx
- )
- {
+ ) {
AuthorizationContext authCtx = ctx.authorizationContext();
StringBuilder sb = new StringBuilder();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
new file mode 100644
index 0000000..f1176d6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+@IgniteCodeGeneratingFail
+public class ClientMessage implements Message, Externalizable {
+ /** */
+ private static final long serialVersionUID = -4609408156037304495L;
+
+ /** */
+ private byte[] data;
+
+ /** */
+ private BinaryHeapOutputStream stream;
+
+ /** */
+ private int cnt = -4;
+
+ /** */
+ private int msgSize;
+
+ /** */
+ public ClientMessage() {}
+
+ /** */
+ public ClientMessage(byte[] data) {
+ this.data = data;
+ }
+
+ /** */
+ public ClientMessage(BinaryHeapOutputStream stream) {
+ this.stream = stream;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter ignored) {
+ assert stream != null || data != null;
+
+ byte[] data = stream != null ? stream.array() : this.data;
+ int msgSize = stream != null ? stream.position() : data.length;
+
+ if (cnt < 0) {
+ for (; cnt < 0 && buf.hasRemaining(); cnt++)
+ buf.put((byte) ((msgSize >> (8 * (4 + cnt))) & 0xFF));
+
+ if (cnt < 0)
+ return false;
+ }
+
+ assert cnt >= 0;
+ assert msgSize > 0;
+
+ int remaining = buf.remaining();
+
+ if (remaining > 0) {
+ int missing = msgSize - cnt;
+
+ if (missing > 0) {
+ int len = Math.min(missing, remaining);
+
+ buf.put(data, cnt, len);
+
+ cnt += len;
+ }
+ }
+
+ if (cnt == msgSize) {
+ cnt = -4;
+
+ if (stream != null) {
+ U.closeQuiet(stream);
+
+ stream = null;
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (cnt < 0) {
+ for (; cnt < 0 && buf.hasRemaining(); cnt++)
+ msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt));
+
+ if (cnt < 0)
+ return false;
+
+ data = new byte[msgSize];
+ }
+
+ assert data != null;
+ assert cnt >= 0;
+ assert msgSize > 0;
+
+ int remaining = buf.remaining();
+
+ if (remaining > 0) {
+ int missing = msgSize - cnt;
+
+ if (missing > 0) {
+ int len = Math.min(missing, remaining);
+
+ buf.get(data, cnt, len);
+
+ cnt += len;
+ }
+ }
+
+ if (cnt == msgSize) {
+ cnt = -4;
+ msgSize = 0;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return Short.MIN_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op
+ }
+
+ /**
+ * @return Message payload.
+ */
+ public byte[] payload() {
+ if (stream != null) {
+ data = stream.arrayCopy();
+ U.closeQuiet(stream);
+ stream = null;
+ }
+
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ byte[] data = payload();
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ data = new byte[in.readInt()];
+ in.read(data, 0, data.length);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 0d9ea39..64cf609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -216,9 +216,7 @@
if (log.isDebugEnabled())
log.debug("Async response: [resp=" + resp.status() + ']');
- byte[] outMsg = parser.encode(resp);
-
- ses.send(outMsg);
+ ses.send(parser.encode(resp));
}
}
};
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
index fdf7a12..255012b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
@@ -29,6 +29,7 @@
import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
/**
* JDBC message parser.
@@ -60,8 +61,8 @@
* @param msg Message.
* @return Reader.
*/
- protected BinaryReaderExImpl createReader(byte[] msg) {
- BinaryInputStream stream = new BinaryHeapInputStream(msg);
+ protected BinaryReaderExImpl createReader(ClientMessage msg) {
+ BinaryInputStream stream = new BinaryHeapInputStream(msg.payload());
return new BinaryReaderExImpl(binCtx, stream, ctx.config().getClassLoader(), true);
}
@@ -76,7 +77,7 @@
}
/** {@inheritDoc} */
- @Override public ClientListenerRequest decode(byte[] msg) {
+ @Override public ClientListenerRequest decode(ClientMessage msg) {
assert msg != null;
BinaryReaderExImpl reader = createReader(msg);
@@ -85,7 +86,7 @@
}
/** {@inheritDoc} */
- @Override public byte[] encode(ClientListenerResponse msg) {
+ @Override public ClientMessage encode(ClientListenerResponse msg) {
assert msg != null;
assert msg instanceof JdbcResponse;
@@ -96,20 +97,20 @@
res.writeBinary(writer, protoCtx);
- return writer.array();
+ return new ClientMessage(writer.array());
}
/** {@inheritDoc} */
- @Override public int decodeCommandType(byte[] msg) {
+ @Override public int decodeCommandType(ClientMessage msg) {
assert msg != null;
- return JdbcRequest.readType(msg);
+ return JdbcRequest.readType(msg.payload());
}
/** {@inheritDoc} */
- @Override public long decodeRequestId(byte[] msg) {
+ @Override public long decodeRequestId(ClientMessage msg) {
assert msg != null;
- return JdbcRequest.readRequestId(msg);
+ return JdbcRequest.readRequestId(msg.payload());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index 0cc22d1..74def0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -157,9 +157,7 @@
if (log.isDebugEnabled())
log.debug("Async response: [resp=" + resp.status() + ']');
- byte[] outMsg = parser.encode(resp);
-
- ses.send(outMsg);
+ ses.send(parser.encode(resp));
}
}
};
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
index 0e66c93..c9b779f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
@@ -34,6 +34,7 @@
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
import org.jetbrains.annotations.NotNull;
@@ -78,10 +79,10 @@
}
/** {@inheritDoc} */
- @Override public ClientListenerRequest decode(byte[] msg) {
+ @Override public ClientListenerRequest decode(ClientMessage msg) {
assert msg != null;
- BinaryInputStream stream = new BinaryHeapInputStream(msg);
+ BinaryInputStream stream = new BinaryHeapInputStream(msg.payload());
BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true);
@@ -242,7 +243,7 @@
}
/** {@inheritDoc} */
- @Override public byte[] encode(ClientListenerResponse msg0) {
+ @Override public ClientMessage encode(ClientListenerResponse msg0) {
assert msg0 != null;
assert msg0 instanceof OdbcResponse;
@@ -263,13 +264,13 @@
if (msg.status() != ClientListenerResponse.STATUS_SUCCESS) {
writer.writeString(msg.error());
- return writer.array();
+ return new ClientMessage(writer.array());
}
Object res0 = msg.response();
if (res0 == null)
- return writer.array();
+ return new ClientMessage(writer.array());
else if (res0 instanceof OdbcQueryExecuteResult) {
OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0;
@@ -406,19 +407,19 @@
else
assert false : "Should not reach here.";
- return writer.array();
+ return new ClientMessage(writer.array());
}
/** {@inheritDoc} */
- @Override public int decodeCommandType(byte[] msg) {
+ @Override public int decodeCommandType(ClientMessage msg) {
assert msg != null;
- return msg[0];
+ return msg.payload()[0];
}
/** {@inheritDoc} */
- @Override public long decodeRequestId(byte[] msg) {
+ @Override public long decodeRequestId(ClientMessage msg) {
return 0;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index d3b10c5..66b9e89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -23,10 +23,12 @@
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeGetRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNameGetRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNamePutRequest;
@@ -284,10 +286,10 @@
}
/** {@inheritDoc} */
- @Override public ClientListenerRequest decode(byte[] msg) {
+ @Override public ClientListenerRequest decode(ClientMessage msg) {
assert msg != null;
- BinaryInputStream inStream = new BinaryHeapInputStream(msg);
+ BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
// skipHdrCheck must be true (we have 103 op code).
BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), inStream,
@@ -473,10 +475,10 @@
}
/** {@inheritDoc} */
- @Override public byte[] encode(ClientListenerResponse resp) {
+ @Override public ClientMessage encode(ClientListenerResponse resp) {
assert resp != null;
- BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32);
+ BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32, BinaryMemoryAllocator.POOLED.chunk());
BinaryRawWriterEx writer = marsh.writer(outStream);
@@ -484,20 +486,20 @@
((ClientOutgoingMessage)resp).encode(ctx, writer);
- return outStream.arrayCopy();
+ return new ClientMessage(outStream);
}
/** {@inheritDoc} */
- @Override public int decodeCommandType(byte[] msg) {
+ @Override public int decodeCommandType(ClientMessage msg) {
assert msg != null;
- BinaryInputStream inStream = new BinaryHeapInputStream(msg);
+ BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
return inStream.readShort();
}
/** {@inheritDoc} */
- @Override public long decodeRequestId(byte[] msg) {
+ @Override public long decodeRequestId(ClientMessage msg) {
return 0;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
index a914078..b3c8cda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
@@ -28,7 +28,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
index 93f1a95..a165a07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
@@ -26,7 +26,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9c0c6b1..41574ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1401,16 +1401,7 @@
GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
- MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-
- if (writer == null) {
- try {
- ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to create message writer.", e);
- }
- }
+ MessageWriter writer = messageWriter(ses);
boolean handshakeFinished = sslFilter.lock(ses);
@@ -1659,16 +1650,7 @@
ByteBuffer buf = ses.writeBuffer();
SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
- MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-
- if (writer == null) {
- try {
- ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to create message writer.", e);
- }
- }
+ MessageWriter writer = messageWriter(ses);
if (req == null) {
req = systemMessage(ses);
@@ -1739,6 +1721,25 @@
buf.clear();
}
+ /** */
+ @Nullable private MessageWriter messageWriter(GridSelectorNioSessionImpl ses) throws IOException {
+ if (writerFactory == null)
+ return null;
+
+ MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+
+ if (writer == null) {
+ try {
+ ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to create message writer.", e);
+ }
+ }
+
+ return writer;
+ }
+
/**
* @param writer Customizer of writing.
* @param buf Buffer to write.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index 8e6c966..1c92e8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -107,7 +107,7 @@
import org.junit.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE;
+import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.THREAD_LOCAL;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals;
@@ -2834,28 +2834,28 @@
@Test
public void testThreadLocalArrayReleased() throws Exception {
// Checking the writer directly.
- assertEquals(false, INSTANCE.isAcquired());
+ assertEquals(false, THREAD_LOCAL.isAcquired());
BinaryMarshaller marsh = binaryMarshaller();
try (BinaryWriterExImpl writer = new BinaryWriterExImpl(binaryContext(marsh))) {
- assertEquals(true, INSTANCE.isAcquired());
+ assertEquals(true, THREAD_LOCAL.isAcquired());
writer.writeString("Thread local test");
writer.array();
- assertEquals(true, INSTANCE.isAcquired());
+ assertEquals(true, THREAD_LOCAL.isAcquired());
}
// Checking the binary marshaller.
- assertEquals(false, INSTANCE.isAcquired());
+ assertEquals(false, THREAD_LOCAL.isAcquired());
marsh = binaryMarshaller();
marsh.marshal(new SimpleObject());
- assertEquals(false, INSTANCE.isAcquired());
+ assertEquals(false, THREAD_LOCAL.isAcquired());
marsh = binaryMarshaller();
@@ -2867,7 +2867,7 @@
BinaryObject binaryObj = builder.build();
- assertEquals(false, INSTANCE.isAcquired());
+ assertEquals(false, THREAD_LOCAL.isAcquired());
}
/**