Merge branch 'ignite-1.3.3' into ignite-1.3.3-sslbench
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 166eed5..e8d29b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -146,7 +146,7 @@
GridNioFilter codecFilter = new GridNioCodecFilter(new GridTcpRestParser(routerClient), gridLog, false);
if (sslCtx != null) {
- GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog);
+ GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
sslFilter.directMode(false);
sslFilter.clientMode(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
index cd4c607..7b1cd8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
@@ -228,7 +228,7 @@
GridNioFilter[] filters;
if (sslCtx != null) {
- GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log);
+ GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, false, ByteOrder.nativeOrder(), log);
sslFilter.wantClientAuth(wantClientAuth);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index c37c17d..689e050 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -215,7 +215,8 @@
GridNioFilter[] filters;
if (sslCtx != null) {
- GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log);
+ GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx,
+ cfg.isDirectBuffer(), ByteOrder.nativeOrder(), log);
sslFilter.directMode(false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index d3f439a..b57bf22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -70,6 +70,9 @@
/** SSL system data buffer metadata key. */
private static final int BUF_SSL_SYSTEM_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+ /** SSL write buf limit. */
+ private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
+
/** Accept worker thread. */
@GridToStringExclude
private final IgniteThread acceptThread;
@@ -920,6 +923,10 @@
}
ByteBuffer buf = ses.writeBuffer();
+
+ if (ses.meta(WRITE_BUF_LIMIT) != null)
+ buf.limit((int)ses.meta(WRITE_BUF_LIMIT));
+
NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
List<NioOperationFuture<?>> doneFuts = null;
@@ -971,19 +978,24 @@
writer.reset();
}
+ int sesBufLimit = buf.limit();
+ int sesCap = buf.capacity();
+
buf.flip();
+ buf = sslFilter.encrypt(ses, buf);
+
ByteBuffer sesBuf = ses.writeBuffer();
- buf = sslFilter.encrypt(ses, sesBuf);
-
- int expand = sesBuf.limit() - buf.limit();
-
sesBuf.clear();
- // SSL data more then socket buffer size
- if (expand < 0)
- sesBuf.limit(sesBuf.limit() + expand - 100);
+ if (sesCap - buf.limit() < 0) {
+ int limit = sesBufLimit + (sesCap - buf.limit()) - 100;
+
+ ses.addMeta(WRITE_BUF_LIMIT, limit);
+
+ sesBuf.limit(limit);
+ }
assert buf.hasRemaining();
@@ -1022,8 +1034,12 @@
break;
}
- else
+ else {
buf = ses.writeBuffer();
+
+ if (ses.meta(WRITE_BUF_LIMIT) != null)
+ buf.limit((int)ses.meta(WRITE_BUF_LIMIT));
+ }
}
}
finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
index 9890efe..ee95308 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.util.nio.ssl;
import org.apache.ignite.*;
-import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.nio.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -30,7 +29,6 @@
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
-import static org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.*;
/**
*
@@ -483,4 +481,39 @@
throw new IgniteCheckedException("Failed to write byte to socket.", e);
}
}
+
+ /**
+ * Expands the given byte buffer to the requested capacity.
+ *
+ * @param original Original byte buffer.
+ * @param cap Requested capacity.
+ * @return Expanded byte buffer.
+ */
+ private ByteBuffer expandBuffer(ByteBuffer original, int cap) {
+ ByteBuffer res = ByteBuffer.allocate(cap);
+
+ res.order(ByteOrder.nativeOrder());
+
+ original.flip();
+
+ res.put(original);
+
+ return res;
+ }
+
+ /**
+ * Copies the given byte buffer.
+ *
+ * @param original Byte buffer to copy.
+ * @return Copy of the original byte buffer.
+ */
+ private ByteBuffer copy(ByteBuffer original) {
+ ByteBuffer cp = ByteBuffer.allocate(original.remaining());
+
+ cp.put(original);
+
+ cp.flip();
+
+ return cp;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index a05135f..c3cb084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -52,6 +52,12 @@
/** SSL context to use. */
private SSLContext sslCtx;
+ /** Order. */
+ private ByteOrder order;
+
+ /** Allocate direct buffer or heap buffer. */
+ private boolean directBuf;
+
/** Whether SSLEngine should use client mode. */
private boolean clientMode;
@@ -62,13 +68,17 @@
* Creates SSL filter.
*
* @param sslCtx SSL context.
+ * @param directBuf Direct buffer flag.
+ * @param order Byte order.
* @param log Logger to use.
*/
- public GridNioSslFilter(SSLContext sslCtx, IgniteLogger log) {
+ public GridNioSslFilter(SSLContext sslCtx, boolean directBuf, ByteOrder order, IgniteLogger log) {
super("SSL filter");
this.log = log;
this.sslCtx = sslCtx;
+ this.directBuf = directBuf;
+ this.order = order;
}
/**
@@ -151,7 +161,7 @@
engine.setEnabledProtocols(enabledProtos);
try {
- GridNioSslHandler hnd = new GridNioSslHandler(this, ses, engine, log);
+ GridNioSslHandler hnd = new GridNioSslHandler(this, ses, engine, directBuf, order, log);
ses.addMeta(SSL_HANDLER.ordinal(), hnd);
@@ -182,7 +192,8 @@
}
/** {@inheritDoc} */
- @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+ @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex)
+ throws IgniteCheckedException {
proceedExceptionCaught(ses, ex);
}
@@ -327,7 +338,8 @@
* @throws GridNioException If failed to forward requests to filter chain.
* @return Close future.
*/
- private GridNioFuture<Boolean> shutdownSession(GridNioSession ses, GridNioSslHandler hnd) throws IgniteCheckedException {
+ private GridNioFuture<Boolean> shutdownSession(GridNioSession ses, GridNioSslHandler hnd)
+ throws IgniteCheckedException {
try {
hnd.closeOutbound();
@@ -382,39 +394,4 @@
return (ByteBuffer)msg;
}
-
- /**
- * Expands the given byte buffer to the requested capacity.
- *
- * @param original Original byte buffer.
- * @param cap Requested capacity.
- * @return Expanded byte buffer.
- */
- public static ByteBuffer expandBuffer(ByteBuffer original, int cap) {
- ByteBuffer res = ByteBuffer.allocate(cap);
-
- res.order(ByteOrder.nativeOrder());
-
- original.flip();
-
- res.put(original);
-
- return res;
- }
-
- /**
- * Copies the given byte buffer.
- *
- * @param original Byte buffer to copy.
- * @return Copy of the original byte buffer.
- */
- public static ByteBuffer copy(ByteBuffer original) {
- ByteBuffer cp = ByteBuffer.allocate(original.remaining());
-
- cp.put(original);
-
- cp.flip();
-
- return cp;
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index dc3d870..8c1ac5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -45,6 +45,12 @@
/** SSL engine. */
private SSLEngine sslEngine;
+ /** Order. */
+ private ByteOrder order;
+
+ /** Allocate direct buffer or heap buffer. */
+ private boolean directBuf;
+
/** Session of this handler. */
private GridNioSession ses;
@@ -82,10 +88,16 @@
* @param ses Session for which this handler was created.
* @param engine SSL engine instance for this handler.
* @param log Logger to use.
+ * @param directBuf Direct buffer flag.
+ * @param order Byte order.
* @throws SSLException If exception occurred when starting SSL handshake.
*/
- GridNioSslHandler(GridNioSslFilter parent, GridNioSession ses, SSLEngine engine, IgniteLogger log)
- throws SSLException {
+ GridNioSslHandler(GridNioSslFilter parent,
+ GridNioSession ses,
+ SSLEngine engine,
+ boolean directBuf,
+ ByteOrder order,
+ IgniteLogger log) throws SSLException {
assert parent != null;
assert ses != null;
assert engine != null;
@@ -93,6 +105,8 @@
this.parent = parent;
this.ses = ses;
+ this.order = order;
+ this.directBuf = directBuf;
this.log = log;
sslEngine = engine;
@@ -111,8 +125,13 @@
// Allocate a little bit more so SSL engine would not return buffer overflow status.
int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
- outNetBuf = ByteBuffer.allocate(netBufSize);
- inNetBuf = ByteBuffer.allocate(netBufSize);
+ outNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
+
+ outNetBuf.order(order);
+
+ inNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
+
+ inNetBuf.order(order);
// Initially buffer is empty.
outNetBuf.position(0);
@@ -120,9 +139,9 @@
int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2);
- appBuf = ByteBuffer.allocate(appBufSize);
+ appBuf = directBuf ? ByteBuffer.allocateDirect(appBufSize) : ByteBuffer.allocate(appBufSize);
- appBuf.order(ByteOrder.nativeOrder());
+ appBuf.order(order);
if (log.isDebugEnabled())
log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBufSize + ']');
@@ -588,6 +607,44 @@
}
/**
+ * Expands the given byte buffer to the requested capacity.
+ *
+ * @param original Original byte buffer.
+ * @param cap Requested capacity.
+ * @return Expanded byte buffer.
+ */
+ private ByteBuffer expandBuffer(ByteBuffer original, int cap) {
+ ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(cap) : ByteBuffer.allocate(cap);
+
+ res.order(order);
+
+ original.flip();
+
+ res.put(original);
+
+ return res;
+ }
+
+ /**
+ * Copies the given byte buffer.
+ *
+ * @param original Byte buffer to copy.
+ * @return Copy of the original byte buffer.
+ */
+ private ByteBuffer copy(ByteBuffer original) {
+ ByteBuffer cp = directBuf ? ByteBuffer.allocateDirect(original.remaining()) :
+ ByteBuffer.allocate(original.remaining());
+
+ cp.order(order);
+
+ cp.put(original);
+
+ cp.flip();
+
+ return cp;
+ }
+
+ /**
* Write request for cases while handshake is not finished yet.
*/
private static class WriteRequest {
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a665a3a..a0acb5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1478,7 +1478,8 @@
if (isSslEnabled()) {
GridNioSslFilter sslFilter =
- new GridNioSslFilter(ignite.configuration().getSslContextFactory().create(), log);
+ new GridNioSslFilter(ignite.configuration().getSslContextFactory().create(),
+ true, ByteOrder.nativeOrder(), log);
sslFilter.directMode(true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
index 2a71f28..2ec4dec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java
@@ -124,7 +124,7 @@
assertTrue(F.forAny(log.logs(), new IgnitePredicate<String>() {
@Override public boolean apply(String s) {
- return s.contains("Topology snapshot [ver=2, server nodes=2, client nodes=0,")
+ return s.contains("Topology snapshot [ver=2, servers=2, clients=0,")
|| (s.contains(">>> Number of server nodes: 2") && s.contains(">>> Number of client nodes: 0"));
}
}));
@@ -174,7 +174,7 @@
assertTrue(F.forAny(log.logs(), new IgnitePredicate<String>() {
@Override public boolean apply(String s) {
- return s.contains("Topology snapshot [ver=4, server nodes=2, client nodes=2,")
+ return s.contains("Topology snapshot [ver=4, servers=2, clients=2,")
|| (s.contains(">>> Number of server nodes: 2") && s.contains(">>> Number of client nodes: 2"));
}
}));
@@ -225,7 +225,7 @@
assertTrue(F.forAny(log.logs(), new IgnitePredicate<String>() {
@Override public boolean apply(String s) {
- return s.contains("Topology snapshot [ver=5, server nodes=2, client nodes=3,")
+ return s.contains("Topology snapshot [ver=5, servers=2, clients=3,")
|| (s.contains(">>> Number of server nodes: 2") && s.contains(">>> Number of client nodes: 3"));
}
}));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCache150ClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCache150ClientsTest.java
index 282c7c8..3fc44c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCache150ClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCache150ClientsTest.java
@@ -51,6 +51,7 @@
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ cfg.setLocalHost("127.0.0.1");
cfg.setNetworkTimeout(30_000);
cfg.setConnectorConfiguration(null);
cfg.setPeerClassLoadingEnabled(false);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java
index 930b5d1..ba22395 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java
@@ -69,7 +69,7 @@
.sendQueueLimit(0)
.filters(
new GridNioCodecFilter(parser, log, false),
- new GridNioSslFilter(sslCtx, log))
+ new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), log))
.build();
srvr.start();
diff --git a/modules/core/src/test/java/org/apache/ignite/startup/cmdline/GridCommandLineTransformerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/startup/cmdline/GridCommandLineTransformerSelfTest.java
index ec85532..dafc649 100644
--- a/modules/core/src/test/java/org/apache/ignite/startup/cmdline/GridCommandLineTransformerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/startup/cmdline/GridCommandLineTransformerSelfTest.java
@@ -32,7 +32,7 @@
public void testTransformIfNoArguments() throws Exception {
assertEquals(
"\"INTERACTIVE=0\" \"QUIET=-DIGNITE_QUIET=true\" \"NO_PAUSE=0\" " +
- "\"JVM_XOPTS=\" \"CONFIG=\"",
+ "\"NO_JMX=0\" \"JVM_XOPTS=\" \"CONFIG=\"",
CommandLineTransformer.transform());
}
@@ -101,7 +101,7 @@
*/
public void testTransformIfOnlyPathToConfigSpecified() throws Exception {
assertEquals(
- "\"INTERACTIVE=0\" \"QUIET=-DIGNITE_QUIET=true\" \"NO_PAUSE=0\" " +
+ "\"INTERACTIVE=0\" \"QUIET=-DIGNITE_QUIET=true\" \"NO_PAUSE=0\" \"NO_JMX=0\" " +
"\"JVM_XOPTS=\" \"CONFIG=c:\\qw.xml\"",
CommandLineTransformer.transform("c:\\qw.xml"));
}
@@ -111,10 +111,10 @@
*/
public void testTransformIfAllSupportedArguments() throws Exception {
assertEquals(
- "\"INTERACTIVE=1\" \"QUIET=-DIGNITE_QUIET=false\" \"NO_PAUSE=1\" " +
- "\"JVM_XOPTS=-Xmx1g -Xms1m\" " +
- "\"CONFIG=\"c:\\path to\\русский каталог\"\"",
- CommandLineTransformer.transform("-i", "-np", "-v", "-J-Xmx1g", "-J-Xms1m",
+ "\"INTERACTIVE=1\" \"QUIET=-DIGNITE_QUIET=false\" \"NO_PAUSE=1\" \"NO_JMX=1\" " +
+ "\"JVM_XOPTS=-Xmx1g -Xms1m\" " +
+ "\"CONFIG=\"c:\\path to\\русский каталог\"\"",
+ CommandLineTransformer.transform("-i", "-np", "-v", "-J-Xmx1g", "-J-Xms1m", "-nojmx",
"\"c:\\path to\\русский каталог\""));
}
}