Revert "GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)"
This reverts commit 08e9e9673d0ed05555a3d74c6d16e706817cab09.
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
deleted file mode 100644
index 77fe9bf..0000000
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.tcp;
-
-import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.NAME;
-import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
-import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
-import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
-import static org.apache.geode.test.dunit.VM.getVM;
-import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Fail.fail;
-
-import java.io.File;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.logging.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.distributed.DistributedSystemDisconnectedException;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DistributionMessage;
-import org.apache.geode.distributed.internal.DistributionMessageObserver;
-import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
-import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.DistributedBlackboard;
-import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
-import org.apache.geode.test.dunit.rules.DistributedRule;
-
-/**
- * It would be nice if this test didn't need to use the cache since the test's purpose is to test
- * that the {@link Connection} class can be closed while readers and writers hold locks on its
- * internal TLS {@link ByteBuffer}s
- *
- * But this test does use the cache (region) because it enabled us to use existing cache messaging
- * and to use the DistributionMessageObserver (observer) hooks.
- *
- * see also ClusterCommunicationsDUnitTest
- */
-public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
-
- private static final int SMALL_BUFFER_SIZE = 8000;
- private static final String UPDATE_ENTERED_GATE = "connectionCloseDUnitTest.regionUpdateEntered";
- private static final String SUSPEND_UPDATE_GATE = "connectionCloseDUnitTest.suspendRegionUpdate";
- private static final String regionName = "connectionCloseDUnitTestRegion";
- private static final Logger logger = LogService.getLogger();
-
- private static Cache cache;
-
- @Rule
- public DistributedRule distributedRule =
- DistributedRule.builder().withVMCount(3).build();
-
- @Rule
- public DistributedBlackboard blackboard = new DistributedBlackboard();
-
- @Rule
- public DistributedRestoreSystemProperties restoreSystemProperties =
- new DistributedRestoreSystemProperties();
-
- private VM locator;
- private VM sender;
- private VM receiver;
-
- @Before
- public void before() {
- locator = getVM(0);
- sender = getVM(1);
- receiver = getVM(2);
- }
-
- @After
- public void after() {
- receiver.invoke(() -> {
- DistributionMessageObserver.setInstance(null);
- });
- }
-
- @Test
- public void connectionWithHungReaderIsCloseableAndUnhangsReader()
- throws InterruptedException, TimeoutException {
-
- blackboard.clearGate(UPDATE_ENTERED_GATE);
- blackboard.clearGate(SUSPEND_UPDATE_GATE);
-
- final int locatorPort = createLocator(locator);
- createCacheAndRegion(sender, locatorPort);
- createCacheAndRegion(receiver, locatorPort);
-
- receiver
- .invoke("set up DistributionMessageObserver to 'hang' sender's put (on receiver)",
- () -> {
- final DistributionMessageObserver observer =
- new DistributionMessageObserver() {
-
- @Override
- public void beforeProcessMessage(final ClusterDistributionManager dm,
- final DistributionMessage message) {
- guardMessageProcessingHook(message, () -> {
- try {
- blackboard.signalGate(UPDATE_ENTERED_GATE);
- blackboard.waitForGate(SUSPEND_UPDATE_GATE);
- } catch (TimeoutException | InterruptedException e) {
- fail("message observus interruptus");
- }
- logger.info("BGB: got before process message: " + message);
- });
- }
- };
- DistributionMessageObserver.setInstance(observer);
- });
-
- final AsyncInvocation<Object> putInvocation = sender.invokeAsync("try a put", () -> {
- final Region<Object, Object> region = cache.getRegion(regionName);
- // test is going to close the cache while we are waiting for our ack
- assertThatThrownBy(() -> {
- region.put("hello", "world");
- }).isInstanceOf(DistributedSystemDisconnectedException.class);
- });
-
- // wait until our message observer is blocked
- blackboard.waitForGate(UPDATE_ENTERED_GATE);
-
- // at this point our put() is blocked waiting for a direct ack
- assertThat(putInvocation.isAlive()).as("put is waiting for remote region to ack").isTrue();
-
- /*
- * Now close the cache. The point of calling it is to test that we don't block while trying
- * to close connections. Cache.close() calls DistributedSystem.disconnect() which in turn
- * closes all the connections (and their sockets.) We want the sockets to close because that'll
- * cause our hung put() to see a DistributedSystemDisconnectedException.
- */
- sender.invoke("", () -> cache.close());
-
- // wait for put task to complete: with an exception, that is!
- putInvocation.get();
-
- // un-stick our message observer
- blackboard.signalGate(SUSPEND_UPDATE_GATE);
- }
-
- private void guardMessageProcessingHook(final DistributionMessage message,
- final Runnable runnable) {
- if (message instanceof UpdateMessage) {
- final UpdateMessage updateMessage = (UpdateMessage) message;
- if (updateMessage.getRegionPath().equals("/" + regionName)) {
- runnable.run();
- }
- }
- }
-
- private int createLocator(VM memberVM) {
- return memberVM.invoke("create locator", () -> {
- // if you need to debug SSL communications use this property:
- // System.setProperty("javax.net.debug", "all");
- System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
- return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties())
- .getPort();
- });
- }
-
- private void createCacheAndRegion(VM memberVM, int locatorPort) {
- memberVM.invoke("start cache and create region", () -> {
- cache = createCache(locatorPort);
- cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
- });
- }
-
- private Cache createCache(int locatorPort) {
- // if you need to debug SSL communications use this property:
- // System.setProperty("javax.net.debug", "all");
- Properties properties = getDistributedSystemProperties();
- properties.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
- return new CacheFactory(properties).create();
- }
-
- private Properties getDistributedSystemProperties() {
- Properties properties = new Properties();
- properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
- properties.setProperty(USE_CLUSTER_CONFIGURATION, "false");
- properties.setProperty(NAME, "vm" + VM.getCurrentVMNum());
- properties.setProperty(CONSERVE_SOCKETS, "false"); // we are testing direct ack
- properties.setProperty(SOCKET_LEASE_TIME, "10000");
- properties.setProperty(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);
-
- properties.setProperty(SSL_ENABLED_COMPONENTS, "cluster,locator");
- properties
- .setProperty(SSL_KEYSTORE, createTempFileFromResource(getClass(), "server.keystore")
- .getAbsolutePath());
- properties.setProperty(SSL_TRUSTSTORE,
- createTempFileFromResource(getClass(), "server.keystore")
- .getAbsolutePath());
- properties.setProperty(SSL_PROTOCOLS, "TLSv1.2");
- properties.setProperty(SSL_KEYSTORE_PASSWORD, "password");
- properties.setProperty(SSL_TRUSTSTORE_PASSWORD, "password");
- properties.setProperty(SSL_REQUIRE_AUTHENTICATION, "true");
- return properties;
- }
-
-}
diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore
deleted file mode 100644
index 8b5305f..0000000
--- a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore
+++ /dev/null
Binary files differ
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index a70f3b1..dc7df44 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -215,9 +215,7 @@
final NioSslEngine nioSslEngine = engine;
engine.close(socket.getChannel());
assertThatThrownBy(() -> {
- try (final ByteBufferSharing unused =
- nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
- }
+ nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
})
.isInstanceOf(IOException.class);
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index add6b9a..19eab4f 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -256,13 +256,11 @@
ByteBuffer buffer = bbos.getContentBuffer();
System.out.println(
"client buffer position is " + buffer.position() + " and limit is " + buffer.limit());
- try (final ByteBufferSharing outputSharing = engine.wrap(buffer)) {
- ByteBuffer wrappedBuffer = outputSharing.getBuffer();
- System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
- + " and limit is " + wrappedBuffer.limit());
- int bytesWritten = clientChannel.write(wrappedBuffer);
- System.out.println("client bytes written is " + bytesWritten);
- }
+ ByteBuffer wrappedBuffer = engine.wrap(buffer);
+ System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+ + " and limit is " + wrappedBuffer.limit());
+ int bytesWritten = clientChannel.write(wrappedBuffer);
+ System.out.println("client bytes written is " + bytesWritten);
}
private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis)
@@ -301,9 +299,7 @@
final NioSslEngine nioSslEngine = engine;
engine.close(socket.getChannel());
assertThatThrownBy(() -> {
- try (final ByteBufferSharing unused =
- nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
- }
+ nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
})
.isInstanceOf(IOException.class);
}
@@ -317,35 +313,24 @@
private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
throws IOException {
- try (final ByteBufferSharing sharedBuffer = engine.getUnwrappedBuffer()) {
- final ByteBuffer unwrapped = sharedBuffer.getBuffer();
- // if we already have unencrypted data skip unwrapping
- if (unwrapped.position() == 0) {
- int bytesRead;
- // if we already have encrypted data skip reading from the socket
- if (buffer.position() == 0) {
- bytesRead = socket.getChannel().read(buffer);
- buffer.flip();
- } else {
- bytesRead = buffer.remaining();
- }
- System.out.println("server bytes read is " + bytesRead + ": buffer position is "
- + buffer.position() + " and limit is " + buffer.limit());
- try (final ByteBufferSharing sharedBuffer2 = engine.unwrap(buffer)) {
- final ByteBuffer unwrapped2 = sharedBuffer2.getBuffer();
-
- unwrapped2.flip();
- System.out.println("server unwrapped buffer position is " + unwrapped2.position()
- + " and limit is " + unwrapped2.limit());
- finishReadMessageFromNIOSSLClient(unwrapped2);
- }
+ ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer);
+ // if we already have unencrypted data skip unwrapping
+ if (unwrapped.position() == 0) {
+ int bytesRead;
+ // if we already have encrypted data skip reading from the socket
+ if (buffer.position() == 0) {
+ bytesRead = socket.getChannel().read(buffer);
+ buffer.flip();
} else {
- finishReadMessageFromNIOSSLClient(unwrapped);
+ bytesRead = buffer.remaining();
}
+ System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+ + buffer.position() + " and limit is " + buffer.limit());
+ unwrapped = engine.unwrap(buffer);
+ unwrapped.flip();
+ System.out.println("server unwrapped buffer position is " + unwrapped.position()
+ + " and limit is " + unwrapped.limit());
}
- }
-
- private void finishReadMessageFromNIOSSLClient(final ByteBuffer unwrapped) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
DataInputStream dis = new DataInputStream(bbis);
String welcome = dis.readUTF();
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 33f43c3..a46d5fc 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,4 +104,3 @@
org/apache/geode/cache/query/internal/xml/ElementType$1
org/apache/geode/cache/query/internal/xml/ElementType$2
org/apache/geode/cache/query/internal/xml/ElementType$3
-org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
deleted file mode 100644
index cdfa897..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.net;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-
-/**
- * When a {@link ByteBufferSharing} is acquired in a try-with-resources the buffer is available (for
- * reading and modification) within the scope of that try block.
- *
- * Releases managed ByteBuffer back to pool after last reference is dropped.
- */
-public interface ByteBufferSharing extends AutoCloseable {
-
- /**
- * Call this method only within a try-with-resource in which this {@link ByteBufferSharing} was
- * acquired. Retain the reference only within the scope of that try-with-resources.
- *
- * @return the buffer: manipulable only within the scope of the try-with-resources
- * @throws IOException if the buffer is no longer accessible
- */
- ByteBuffer getBuffer() throws IOException;
-
- /**
- * Expand the buffer if needed. This may return a different object so be sure to pay attention to
- * the return value if you need access to the potentially- expanded buffer.
- *
- * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
- *
- * @return the same buffer or a different (bigger) buffer
- * @throws IOException if the buffer is no longer accessible
- */
- ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
-
- /**
- * Override {@link AutoCloseable#close()} without throws clause since we don't need one.
- */
- @Override
- void close();
-}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
deleted file mode 100644
index e9a941e..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.net;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.geode.annotations.VisibleForTesting;
-import org.apache.geode.internal.net.BufferPool.BufferType;
-
-/**
- * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
- * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
- * try-with-resources.
- */
-class ByteBufferSharingImpl implements ByteBufferSharing {
-
- static class OpenAttemptTimedOut extends Exception {
- }
-
- private final Lock lock;
- private final AtomicBoolean isClosed;
- // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
- private ByteBuffer buffer;
- private final BufferType bufferType;
- private final AtomicInteger counter;
- private final BufferPool bufferPool;
-
- /**
- * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
- *
- * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
- * to an external object or is returned to an external caller.)
- *
- * This constructor acquires no lock. The reference count will be 1 after this constructor
- * completes.
- */
- ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType,
- final BufferPool bufferPool) {
- this.buffer = buffer;
- this.bufferType = bufferType;
- this.bufferPool = bufferPool;
- lock = new ReentrantLock();
- counter = new AtomicInteger(1);
- isClosed = new AtomicBoolean(false);
- }
-
- /**
- * The destructor. Called by the resource owner to undo the work of the constructor.
- */
- void destruct() {
- if (isClosed.compareAndSet(false, true)) {
- dropReference();
- }
- }
-
- /**
- * This method is for use only by the owner of the shared resource. It's used for handing out
- * references to the shared resource. So it does reference counting and also acquires a lock.
- *
- * Resource owners call this method as the last thing before returning a reference to the caller.
- * That caller binds that reference to a variable in a try-with-resources statement and relies on
- * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
- */
- ByteBufferSharing open() {
- lock.lock();
- addReference();
- return this;
- }
-
- /**
- * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time.
- */
- ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut {
- try {
- if (!lock.tryLock(time, unit)) {
- throw new OpenAttemptTimedOut();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new OpenAttemptTimedOut();
- }
- addReference();
- return this;
- }
-
- @Override
- public ByteBuffer getBuffer() throws IOException {
- if (isClosed.get()) {
- throw new IOException("NioSslEngine has been closed");
- } else {
- return buffer;
- }
- }
-
- @Override
- public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
- return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
- }
-
- @Override
- public void close() {
- /*
- * We are counting on our ReentrantLock throwing an exception if the current thread
- * does not hold the lock. In that case dropReference() will not be called. This
- * prevents ill-behaved clients (clients that call close() too many times) from
- * corrupting our reference count.
- */
- lock.unlock();
- dropReference();
- }
-
- private int addReference() {
- return counter.incrementAndGet();
- }
-
- private int dropReference() {
- final int usages = counter.decrementAndGet();
- if (usages == 0) {
- bufferPool.releaseBuffer(bufferType, buffer);
- }
- return usages;
- }
-
- @VisibleForTesting
- public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
- buffer = newBufferForTesting;
- }
-
-}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
deleted file mode 100644
index bd707e3..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.net;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
- * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
- * try-with-resources.
- *
- * This implementation is a "no-op". It performs no actual locking and no reference counting. It's
- * meant for use with the {@link NioPlainEngine} only, since that engine keeps no buffers and so,
- * needs no reference counting on buffers, nor any synchronization around access to buffers.
- *
- * See also {@link ByteBufferSharingImpl}
- */
-class ByteBufferSharingNoOp implements ByteBufferSharing {
-
- private final ByteBuffer buffer;
-
- ByteBufferSharingNoOp(final ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- @Override
- public ByteBuffer getBuffer() {
- return buffer;
- }
-
- @Override
- public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
- throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine");
- }
-
- @Override
- public void close() {}
-}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index eb53f0e..9c437ad 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -19,53 +19,47 @@
import java.nio.channels.SocketChannel;
/**
- * Prior to transmitting a buffer or processing a received buffer a NioFilter should be called to
- * wrap (transmit) or unwrap (received) the buffer in case SSL is being used.<br>
- * Implementations of
- * this class may not be thread-safe in regard to the buffers their methods return. These may be
- * internal state that, if used concurrently by multiple threads could cause corruption. Appropriate
- * external synchronization must be used in order to provide thread-safety. Do this by invoking
- * getSynchObject() and synchronizing on the returned object while using the buffer.
+ * Prior to transmitting a buffer or processing a received buffer
+ * a NioFilter should be called to wrap (transmit) or unwrap (received)
+ * the buffer in case SSL is being used.<br>
+ * Implementations of this class may not be thread-safe in regard to
+ * the buffers their methods return. These may be internal state that,
+ * if used concurrently by multiple threads could cause corruption.
+ * Appropriate external synchronization must be used in order to provide
+ * thread-safety. Do this by invoking getSynchObject() and synchronizing on
+ * the returned object while using the buffer.
*/
public interface NioFilter {
/**
* wrap bytes for transmission to another process
- *
- * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
- * to call this method in a try-with-resources statement.
*/
- ByteBufferSharing wrap(ByteBuffer buffer) throws IOException;
+ ByteBuffer wrap(ByteBuffer buffer) throws IOException;
/**
- * unwrap bytes received from another process. The unwrapped buffer should be flipped before
- * reading. When done reading invoke doneReading() to reset for future read ops
- *
- * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
- * to call this method in a try-with-resources statement.
+ * unwrap bytes received from another process. The unwrapped
+ * buffer should be flipped before reading. When done reading invoke
+ * doneReading() to reset for future read ops
*/
- ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException;
+ ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException;
/**
- * ensure that the wrapped buffer has enough room to read the given amount of data. This must be
- * invoked before readAtLeast. A new buffer may be returned by this method.
+ * ensure that the wrapped buffer has enough room to read the given amount of data.
+ * This must be invoked before readAtLeast. A new buffer may be returned by this method.
*/
ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
BufferPool.BufferType bufferType);
/**
- * read at least the indicated amount of bytes from the given socket. The buffer position will be
- * ready for reading the data when this method returns. Note: you must invoke
- * ensureWrappedCapacity with the given amount prior to each invocation of this method.
+ * read at least the indicated amount of bytes from the given
+ * socket. The buffer position will be ready for reading
+ * the data when this method returns. Note: you must invoke ensureWrappedCapacity
+ * with the given amount prior to each invocation of this method.
* <br>
* wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
- * unwrappedBuffer
- * = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
- *
- * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
- * to call this method in a try-with-resources statement.
+ * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
*/
- ByteBufferSharing readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+ ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
throws IOException;
/**
@@ -87,19 +81,28 @@
}
}
+ default boolean isClosed() {
+ return false;
+ }
+
/**
* invoke this method when you are done using the NioFilter
+ *
*/
default void close(SocketChannel socketChannel) {
// nothing by default
}
/**
- * Returns the sharing object for the {@link NioFilter}'s unwrapped buffer, if one exists.
- *
- * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
- * to call this method in a try-with-resources statement.
+ * returns the unwrapped byte buffer associated with the given wrapped buffer.
*/
- ByteBufferSharing getUnwrappedBuffer();
+ ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
+ /**
+ * returns an object to be used in synchronizing on the use of buffers returned by
+ * a NioFilter.
+ */
+ default Object getSynchObject() {
+ return this;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 8b5df96..3ebce38 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.geode.annotations.internal.MakeImmutable;
import org.apache.geode.internal.Assert;
/**
@@ -28,12 +27,6 @@
* secure communications.
*/
public class NioPlainEngine implements NioFilter {
-
- // this variable requires the MakeImmutable annotation but the buffer is empty and
- // not really modifiable
- @MakeImmutable
- private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
private final BufferPool bufferPool;
int lastReadPosition;
@@ -45,14 +38,14 @@
}
@Override
- public ByteBufferSharing wrap(ByteBuffer buffer) {
- return shareBuffer(buffer);
+ public ByteBuffer wrap(ByteBuffer buffer) {
+ return buffer;
}
@Override
- public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) {
+ public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
wrappedBuffer.position(wrappedBuffer.limit());
- return shareBuffer(wrappedBuffer);
+ return wrappedBuffer;
}
@Override
@@ -89,7 +82,7 @@
}
@Override
- public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
+ public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
throws IOException {
ByteBuffer buffer = wrappedBuffer;
@@ -115,7 +108,7 @@
buffer.position(lastProcessedPosition);
lastProcessedPosition += bytes;
- return shareBuffer(buffer);
+ return buffer;
}
public void doneReading(ByteBuffer unwrappedBuffer) {
@@ -128,12 +121,8 @@
}
@Override
- public ByteBufferSharing getUnwrappedBuffer() {
- return shareBuffer(EMPTY_BUFFER);
- }
-
- private ByteBufferSharingNoOp shareBuffer(final ByteBuffer wrappedBuffer) {
- return new ByteBufferSharingNoOp(wrappedBuffer);
+ public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
+ return wrappedBuffer;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index d2415e1..bacd538 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -40,19 +40,24 @@
import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
-import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.annotations.internal.MakeImmutable;
import org.apache.geode.internal.net.BufferPool.BufferType;
-import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
- * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread safe.
- * Its use should be confined to one thread or should be protected by external synchronization.
+ * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
+ * safe. Its use should be confined to one thread or should be protected by external
+ * synchronization.
*/
public class NioSslEngine implements NioFilter {
private static final Logger logger = LogService.getLogger();
+ // this variable requires the MakeImmutable annotation but the buffer is empty and
+ // not really modifiable
+ @MakeImmutable
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
private final BufferPool bufferPool;
private boolean closed;
@@ -60,28 +65,23 @@
SSLEngine engine;
/**
- * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
+ * myNetData holds bytes wrapped by the SSLEngine
*/
- private final ByteBufferSharingImpl outputSharing;
+ ByteBuffer myNetData;
/**
- * holds the last unwrapped data from a peer; a.k.a. peerAppData
+ * peerAppData holds the last unwrapped data from a peer
*/
- private final ByteBufferSharingImpl inputSharing;
+ ByteBuffer peerAppData;
NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
SSLSession session = engine.getSession();
int appBufferSize = session.getApplicationBufferSize();
int packetBufferSize = engine.getSession().getPacketBufferSize();
- closed = false;
this.engine = engine;
this.bufferPool = bufferPool;
- outputSharing =
- new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
- TRACKED_SENDER, bufferPool);
- inputSharing =
- new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
- TRACKED_RECEIVER, bufferPool);
+ this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize);
+ this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize);
}
/**
@@ -135,65 +135,57 @@
switch (status) {
case NEED_UNWRAP:
- try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
- final ByteBuffer peerAppData = inputSharing.getBuffer();
+ // Receive handshaking data from peer
+ int dataRead = socketChannel.read(handshakeBuffer);
- // Receive handshaking data from peer
- int dataRead = socketChannel.read(handshakeBuffer);
+ // Process incoming handshaking data
+ handshakeBuffer.flip();
+ engineResult = engine.unwrap(handshakeBuffer, peerAppData);
+ handshakeBuffer.compact();
+ status = engineResult.getHandshakeStatus();
- // Process incoming handshaking data
- handshakeBuffer.flip();
-
-
- engineResult = engine.unwrap(handshakeBuffer, peerAppData);
- handshakeBuffer.compact();
- status = engineResult.getHandshakeStatus();
-
- // if we're not finished, there's nothing to process and no data was read let's hang out
- // for a little
- if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
- Thread.sleep(10);
- }
-
- if (engineResult.getStatus() == BUFFER_OVERFLOW) {
- inputSharing.expandWriteBufferIfNeeded(peerAppData.capacity() * 2);
- }
- break;
+ // if we're not finished, there's nothing to process and no data was read let's hang out
+ // for a little
+ if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
+ Thread.sleep(10);
}
+ if (engineResult.getStatus() == BUFFER_OVERFLOW) {
+ peerAppData =
+ expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2);
+ }
+ break;
+
case NEED_WRAP:
- try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
- final ByteBuffer myNetData = outputSharing.getBuffer();
+ // Empty the local network packet buffer.
+ myNetData.clear();
- // Empty the local network packet buffer.
- myNetData.clear();
+ // Generate handshaking data
+ engineResult = engine.wrap(myAppData, myNetData);
+ status = engineResult.getHandshakeStatus();
- // Generate handshaking data
- engineResult = engine.wrap(myAppData, myNetData);
- status = engineResult.getHandshakeStatus();
-
- // Check status
- switch (engineResult.getStatus()) {
- case BUFFER_OVERFLOW:
- // no need to assign return value because we will never reference it
- outputSharing.expandWriteBufferIfNeeded(myNetData.capacity() * 2);
- break;
- case OK:
- myNetData.flip();
- // Send the handshaking data to peer
- while (myNetData.hasRemaining()) {
- socketChannel.write(myNetData);
- }
- break;
- case CLOSED:
- break;
- default:
- logger.info("handshake terminated with illegal state due to {}", status);
- throw new IllegalStateException(
- "Unknown SSLEngineResult status: " + engineResult.getStatus());
- }
- break;
+ // Check status
+ switch (engineResult.getStatus()) {
+ case BUFFER_OVERFLOW:
+ myNetData =
+ expandWriteBuffer(TRACKED_SENDER, myNetData,
+ myNetData.capacity() * 2);
+ break;
+ case OK:
+ myNetData.flip();
+ // Send the handshaking data to peer
+ while (myNetData.hasRemaining()) {
+ socketChannel.write(myNetData);
+ }
+ break;
+ case CLOSED:
+ break;
+ default:
+ logger.info("handshake terminated with illegal state due to {}", status);
+ throw new IllegalStateException(
+ "Unknown SSLEngineResult status: " + engineResult.getStatus());
}
+ break;
case NEED_TASK:
// Handle blocking tasks
handleBlockingTasks();
@@ -221,6 +213,17 @@
return true;
}
+ ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
+ int desiredCapacity) {
+ return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
+ }
+
+ synchronized void checkClosed() throws IOException {
+ if (closed) {
+ throw new IOException("NioSslEngine has been closed");
+ }
+ }
+
void handleBlockingTasks() {
Runnable task;
while ((task = engine.getDelegatedTask()) != null) {
@@ -230,84 +233,79 @@
}
@Override
- public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
- try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+ public synchronized ByteBuffer wrap(ByteBuffer appData) throws IOException {
+ checkClosed();
- ByteBuffer myNetData = outputSharing.getBuffer();
+ myNetData.clear();
- myNetData.clear();
+ while (appData.hasRemaining()) {
+ // ensure we have lots of capacity since encrypted data might
+ // be larger than the app data
+ int remaining = myNetData.capacity() - myNetData.position();
- while (appData.hasRemaining()) {
- // ensure we have lots of capacity since encrypted data might
- // be larger than the app data
- int remaining = myNetData.capacity() - myNetData.position();
-
- if (remaining < (appData.remaining() * 2)) {
- int newCapacity = expandedCapacity(appData, myNetData);
- myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
- }
-
- SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
-
- if (wrapResult.getHandshakeStatus() == NEED_TASK) {
- handleBlockingTasks();
- }
-
- if (wrapResult.getStatus() != OK) {
- throw new SSLException("Error encrypting data: " + wrapResult);
- }
+ if (remaining < (appData.remaining() * 2)) {
+ int newCapacity = expandedCapacity(appData, myNetData);
+ myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity);
}
- myNetData.flip();
+ SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
- return shareOutputBuffer();
+ if (wrapResult.getHandshakeStatus() == NEED_TASK) {
+ handleBlockingTasks();
+ }
+
+ if (wrapResult.getStatus() != OK) {
+ throw new SSLException("Error encrypting data: " + wrapResult);
+ }
}
+
+ myNetData.flip();
+
+ return myNetData;
}
@Override
- public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
- try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+ public synchronized ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException {
+ checkClosed();
- ByteBuffer peerAppData = inputSharing.getBuffer();
+ // note that we do not clear peerAppData as it may hold a partial
+ // message. TcpConduit, for instance, uses message chunking to
+ // transmit large payloads and we may have read a partial chunk
+ // during the previous unwrap
- // note that we do not clear peerAppData as it may hold a partial
- // message. TcpConduit, for instance, uses message chunking to
- // transmit large payloads and we may have read a partial chunk
- // during the previous unwrap
-
- peerAppData.limit(peerAppData.capacity());
- boolean stopDecryption = false;
- while (wrappedBuffer.hasRemaining() && !stopDecryption) {
- SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
- switch (unwrapResult.getStatus()) {
- case BUFFER_OVERFLOW:
- // buffer overflow expand and try again - double the available decryption space
- int newCapacity =
- (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
- newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
- peerAppData = inputSharing.expandWriteBufferIfNeeded(newCapacity);
- peerAppData.limit(peerAppData.capacity());
- break;
- case BUFFER_UNDERFLOW:
- // partial data - need to read more. When this happens the SSLEngine will not have
- // changed the buffer position
- wrappedBuffer.compact();
- return shareInputBuffer();
- case OK:
- break;
- default:
- // if there is data in the decrypted buffer return it. Otherwise signal that we're
- // having trouble
- if (peerAppData.position() <= 0) {
- throw new SSLException("Error decrypting data: " + unwrapResult);
- }
- stopDecryption = true;
- break;
- }
+ peerAppData.limit(peerAppData.capacity());
+ boolean stopDecryption = false;
+ while (wrappedBuffer.hasRemaining() && !stopDecryption) {
+ SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
+ switch (unwrapResult.getStatus()) {
+ case BUFFER_OVERFLOW:
+ // buffer overflow expand and try again - double the available decryption space
+ int newCapacity =
+ (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
+ newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
+ peerAppData =
+ bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, newCapacity);
+ peerAppData.limit(peerAppData.capacity());
+ break;
+ case BUFFER_UNDERFLOW:
+ // partial data - need to read more. When this happens the SSLEngine will not have
+ // changed the buffer position
+ wrappedBuffer.compact();
+ return peerAppData;
+ case OK:
+ break;
+ default:
+ // if there is data in the decrypted buffer return it. Otherwise signal that we're
+ // having trouble
+ if (peerAppData.position() <= 0) {
+ throw new SSLException("Error decrypting data: " + unwrapResult);
+ }
+ stopDecryption = true;
+ break;
}
- wrappedBuffer.clear();
- return shareInputBuffer();
}
+ wrappedBuffer.clear();
+ return peerAppData;
}
@Override
@@ -324,45 +322,50 @@
}
@Override
- public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
+ public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
ByteBuffer wrappedBuffer) throws IOException {
- try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
-
- ByteBuffer peerAppData = inputSharing.getBuffer();
-
- if (peerAppData.capacity() > bytes) {
- // we already have a buffer that's big enough
- if (peerAppData.capacity() - peerAppData.position() < bytes) {
- peerAppData.compact();
- peerAppData.flip();
- }
+ if (peerAppData.capacity() > bytes) {
+ // we already have a buffer that's big enough
+ if (peerAppData.capacity() - peerAppData.position() < bytes) {
+ peerAppData.compact();
+ peerAppData.flip();
}
-
- while (peerAppData.remaining() < bytes) {
- wrappedBuffer.limit(wrappedBuffer.capacity());
- int amountRead = channel.read(wrappedBuffer);
- if (amountRead < 0) {
- throw new EOFException();
- }
- if (amountRead > 0) {
- wrappedBuffer.flip();
- // prep the decoded buffer for writing
- peerAppData.compact();
- try (final ByteBufferSharing inputSharing2 = unwrap(wrappedBuffer)) {
- // done writing to the decoded buffer - prep it for reading again
- final ByteBuffer peerAppDataNew = inputSharing2.getBuffer();
- peerAppDataNew.flip();
- peerAppData = peerAppDataNew; // loop needs new reference!
- }
- }
- }
- return shareInputBuffer();
}
+
+ while (peerAppData.remaining() < bytes) {
+ wrappedBuffer.limit(wrappedBuffer.capacity());
+ int amountRead = channel.read(wrappedBuffer);
+ if (amountRead < 0) {
+ throw new EOFException();
+ }
+ if (amountRead > 0) {
+ wrappedBuffer.flip();
+ // prep the decoded buffer for writing
+ peerAppData.compact();
+ peerAppData = unwrap(wrappedBuffer);
+ // done writing to the decoded buffer - prep it for reading again
+ peerAppData.flip();
+ }
+ }
+ return peerAppData;
}
@Override
- public ByteBufferSharing getUnwrappedBuffer() {
- return shareInputBuffer();
+ public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
+ return peerAppData;
+ }
+
+ /**
+ * ensures that the unwrapped buffer associated with the given wrapped buffer has
+ * sufficient capacity for the given amount of bytes. This may compact the
+ * buffer or it may return a new buffer.
+ */
+ public ByteBuffer ensureUnwrappedCapacity(int amount) {
+ // for TTLS the app-data buffers do not need to be tracked direct-buffers since we
+ // do not use them for I/O operations
+ peerAppData =
+ bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount);
+ return peerAppData;
}
@Override
@@ -373,14 +376,16 @@
}
@Override
+ public synchronized boolean isClosed() {
+ return closed;
+ }
+
+ @Override
public synchronized void close(SocketChannel socketChannel) {
if (closed) {
return;
}
- closed = true;
- inputSharing.destruct();
- try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
- final ByteBuffer myNetData = outputSharing.getBuffer();
+ try {
if (!engine.isOutboundDone()) {
ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
@@ -407,13 +412,14 @@
// we can't send a close message if the channel is closed
} catch (IOException e) {
throw new GemFireIOException("exception closing SSL session", e);
- } catch (final OpenAttemptTimedOut _unused) {
- logger.info(String.format("Couldn't get output lock in time, eliding TLS close message"));
- if (!engine.isOutboundDone()) {
- engine.closeOutbound();
- }
} finally {
- outputSharing.destruct();
+ ByteBuffer netData = myNetData;
+ ByteBuffer appData = peerAppData;
+ myNetData = null;
+ peerAppData = EMPTY_BUFFER;
+ bufferPool.releaseBuffer(TRACKED_SENDER, netData);
+ bufferPool.releaseBuffer(TRACKED_RECEIVER, appData);
+ this.closed = true;
}
}
@@ -422,17 +428,4 @@
targetBuffer.capacity() * 2);
}
- @VisibleForTesting
- public ByteBufferSharing shareOutputBuffer() {
- return outputSharing.open();
- }
-
- private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
- throws OpenAttemptTimedOut {
- return outputSharing.open(time, unit);
- }
-
- public ByteBufferSharing shareInputBuffer() {
- return inputSharing.open();
- }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 844ab11..29d15e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -78,7 +78,6 @@
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.ByteBufferSharing;
import org.apache.geode.internal.net.NioFilter;
import org.apache.geode.internal.net.NioPlainEngine;
import org.apache.geode.internal.net.SocketCreator;
@@ -803,12 +802,11 @@
@VisibleForTesting
void clearSSLInputBuffer() {
if (getConduit().useSSL() && ioFilter != null) {
- try (final ByteBufferSharing sharedBuffer = ioFilter.getUnwrappedBuffer()) {
- // clear out any remaining handshake bytes
- try {
- sharedBuffer.getBuffer().position(0).limit(0);
- } catch (IOException e) {
- // means the NioFilter was already closed
+ synchronized (ioFilter.getSynchObject()) {
+ if (!ioFilter.isClosed()) {
+ // clear out any remaining handshake bytes
+ ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
+ buffer.position(0).limit(0);
}
}
}
@@ -2455,9 +2453,8 @@
long queueTimeoutTarget = now + asyncQueueTimeout;
channel.configureBlocking(false);
try {
- try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
- final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
-
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
int waitTime = 1;
do {
owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2610,9 +2607,9 @@
}
// fall through
}
- try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
- final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
-
+ // synchronize on the ioFilter while using its network buffer
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
while (wrappedBuffer.remaining() > 0) {
int amtWritten = 0;
long start = stats.startSocketWrite(true);
@@ -2664,12 +2661,10 @@
final KnownVersion version = getRemoteVersion();
try {
msgReader = new MsgReader(this, ioFilter, version);
-
ReplyMessage msg;
int len;
- // (we have to lock here to protect between reading header and message body)
- try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) {
+ synchronized (ioFilter.getSynchObject()) {
Header header = msgReader.readHeader();
if (header.getMessageType() == NORMAL_MSG_TYPE) {
@@ -2686,7 +2681,7 @@
releaseMsgDestreamer(header.getMessageId(), destreamer);
len = destreamer.size();
}
- }
+ } // sync
// I'd really just like to call dispatchMessage here. However,
// that call goes through a bunch of checks that knock about
// 10% of the performance. Since this direct-ack stuff is all
@@ -2753,9 +2748,8 @@
private void processInputBuffer() throws ConnectionException, IOException {
inputBuffer.flip();
- try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
- final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
-
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
peerDataBuffer.flip();
boolean done = false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 42ecf04..48eb984 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -26,7 +26,6 @@
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.ByteBufferSharing;
import org.apache.geode.internal.net.NioFilter;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -55,8 +54,8 @@
}
Header readHeader() throws IOException {
- try (final ByteBufferSharing sharedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES)) {
- ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
@@ -90,8 +89,8 @@
*/
DistributionMessage readMessage(Header header)
throws IOException, ClassNotFoundException {
- try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
- ByteBuffer nioInputBuffer = sharedBuffer.getBuffer();
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
this.getStats().incMessagesBeingReceived(true, header.messageLength);
long startSer = this.getStats().startMsgDeserialization();
@@ -113,8 +112,8 @@
void readChunk(Header header, MsgDestreamer md)
throws IOException {
- try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
- ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
md.addChunk(unwrappedBuffer, header.messageLength);
// show that the bytes have been consumed by adjusting the buffer's position
@@ -124,7 +123,7 @@
- private ByteBufferSharing readAtLeast(int bytes) throws IOException {
+ private ByteBuffer readAtLeast(int bytes) throws IOException {
peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
BufferPool.BufferType.TRACKED_RECEIVER);
return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
deleted file mode 100644
index bb5a75f..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.net;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class ByteBufferSharingImplTest {
-
- private ByteBufferSharingImpl sharing;
- private BufferPool poolMock;
- private CountDownLatch clientHasOpenedResource;
- private CountDownLatch clientMayComplete;
-
- @Before
- public void before() {
- poolMock = mock(BufferPool.class);
- sharing =
- new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
- poolMock);
- clientHasOpenedResource = new CountDownLatch(1);
- clientMayComplete = new CountDownLatch(1);
- }
-
- @Test
- public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
- resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
- try (final ByteBufferSharing _unused = sharing.open()) {
- }
- });
- }
-
- @Test
- public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
- resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
- final ByteBufferSharing sharing2 = sharing.open();
- sharing2.close();
- verify(poolMock, times(0)).releaseBuffer(any(), any());
- assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
- verify(poolMock, times(0)).releaseBuffer(any(), any());
- });
- }
-
- @Test
- public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
- clientIsLastReferenceHolder("client with balanced close calls", () -> {
- try (final ByteBufferSharing _unused = sharing.open()) {
- clientHasOpenedResource.countDown();
- blockClient();
- }
- });
- }
-
- @Test
- public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
- clientIsLastReferenceHolder("client with extra close calls", () -> {
- final ByteBufferSharing sharing2 = sharing.open();
- clientHasOpenedResource.countDown();
- blockClient();
- sharing2.close();
- verify(poolMock, times(1)).releaseBuffer(any(), any());
- assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
- System.out.println("here");
- });
- }
-
- @Test
- public void extraCloseDoesNotPrematurelyReturnBufferToPool() {
- final ByteBufferSharing sharing2 = sharing.open();
- sharing2.close();
- assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
- verify(poolMock, times(0)).releaseBuffer(any(), any());
- sharing.destruct();
- verify(poolMock, times(1)).releaseBuffer(any(), any());
- }
-
- @Test
- public void extraCloseDoesNotDecrementRefCount() {
- final ByteBufferSharing sharing2 = sharing.open();
- sharing2.close();
- assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
- final ByteBufferSharing sharing3 = this.sharing.open();
- sharing.destruct();
- verify(poolMock, times(0)).releaseBuffer(any(), any());
- }
-
- private void resourceOwnerIsLastReferenceHolder(final String name, final Runnable client)
- throws InterruptedException {
- /*
- * Thread.currentThread() is thread is playing the role of the (ByteBuffer) resource owner
- */
-
- /*
- * clientThread thread is playing the role of the client (of the resource owner)
- */
- final Thread clientThread = new Thread(client, name);
- clientThread.start();
- clientThread.join();
-
- verify(poolMock, times(0)).releaseBuffer(any(), any());
-
- sharing.destruct();
-
- verify(poolMock, times(1)).releaseBuffer(any(), any());
- }
-
- private void clientIsLastReferenceHolder(final String name, final Runnable client)
- throws InterruptedException {
- /*
- * Thread.currentThread() is thread is playing the role of the (ByteBuffer) resource owner
- */
-
- /*
- * clientThread thread is playing the role of the client (of the resource owner)
- */
- final Thread clientThread = new Thread(client, name);
- clientThread.start();
-
- clientHasOpenedResource.await();
-
- sharing.destruct();
-
- verify(poolMock, times(0)).releaseBuffer(any(), any());
-
- clientMayComplete.countDown(); // let client finish
-
- clientThread.join();
-
- verify(poolMock, times(1)).releaseBuffer(any(), any());
- }
-
- private void blockClient() {
- try {
- clientMayComplete.await();
- } catch (InterruptedException e) {
- fail("test client thread interrupted: " + e);
- }
- }
-
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index 7ab838c..3d394fb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -50,8 +50,7 @@
public void unwrap() {
ByteBuffer buffer = ByteBuffer.allocate(100);
buffer.position(0).limit(buffer.capacity());
- try (final ByteBufferSharing unused = nioEngine.unwrap(buffer)) {
- }
+ nioEngine.unwrap(buffer);
assertThat(buffer.position()).isEqualTo(buffer.limit());
}
@@ -117,29 +116,23 @@
nioEngine.lastReadPosition = 10;
- try (final ByteBufferSharing sharedBuffer =
- nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
- ByteBuffer data = sharedBuffer.getBuffer();
- verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit()).isEqualTo(amountToRead);
- assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
- assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
- }
+ ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+ verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit()).isEqualTo(amountToRead);
+ assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
+ assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
- try (final ByteBufferSharing sharedBuffer =
- nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
- final ByteBuffer data = sharedBuffer.getBuffer();
- verify(mockChannel, times(5)).read(any(ByteBuffer.class));
- // at end of last readAtLeast data
- assertThat(data.position()).isEqualTo(amountToRead);
- // we read amountToRead bytes
- assertThat(data.limit()).isEqualTo(amountToRead * 2);
- // we did 2 more reads from the network
- assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
- // the next read will start at the end of consumed data
- assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
- }
+ data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+ verify(mockChannel, times(5)).read(any(ByteBuffer.class));
+ // at end of last readAtLeast data
+ assertThat(data.position()).isEqualTo(amountToRead);
+ // we read amountToRead bytes
+ assertThat(data.limit()).isEqualTo(amountToRead * 2);
+ // we did 2 more reads from the network
+ assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
+ // the next read will start at the end of consumed data
+ assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
}
@@ -154,9 +147,7 @@
nioEngine.lastReadPosition = 10;
- try (final ByteBufferSharing unused =
- nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
- }
+ nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index 2f9a88a..88e4f31 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -69,7 +69,6 @@
private DMStats mockStats;
private NioSslEngine nioSslEngine;
private NioSslEngine spyNioSslEngine;
- private BufferPool spyBufferPool;
@Before
public void setUp() throws Exception {
@@ -82,17 +81,13 @@
mockStats = mock(DMStats.class);
- final BufferPool bufferPool = new BufferPool(mockStats);
- spyBufferPool = spy(bufferPool);
- nioSslEngine = new NioSslEngine(mockEngine, spyBufferPool);
+ nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats));
spyNioSslEngine = spy(nioSslEngine);
}
@Test
- public void engineUsesDirectBuffers() throws IOException {
- try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
- assertThat(outputSharing.getBuffer().isDirect()).isTrue();
- }
+ public void engineUsesDirectBuffers() {
+ assertThat(nioSslEngine.myNetData.isDirect()).isTrue();
}
@Test
@@ -124,7 +119,7 @@
verify(mockEngine, atLeast(2)).getHandshakeStatus();
verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
- verify(spyBufferPool, times(2)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
+ verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class),
any(ByteBuffer.class), any(Integer.class));
verify(spyNioSslEngine, times(1)).handleBlockingTasks();
verify(mockChannel, times(3)).read(any(ByteBuffer.class));
@@ -188,173 +183,171 @@
.hasMessageContaining("SSL Handshake terminated with status");
}
+
@Test
- public void wrap() throws Exception {
- try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+ public void checkClosed() throws Exception {
+ nioSslEngine.checkClosed();
+ }
- // make the application data too big to fit into the engine's encryption buffer
- ByteBuffer appData =
- ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
- byte[] appBytes = new byte[appData.capacity()];
- Arrays.fill(appBytes, (byte) 0x1F);
- appData.put(appBytes);
- appData.flip();
-
- // create an engine that will transfer bytes from the application buffer to the encrypted
- // buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(
- new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
- spyNioSslEngine.engine = testEngine;
-
- try (final ByteBufferSharing outputSharing2 = spyNioSslEngine.wrap(appData)) {
- ByteBuffer wrappedBuffer = outputSharing2.getBuffer();
-
- verify(spyBufferPool, times(1)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
- any(ByteBuffer.class), any(Integer.class));
- appData.flip();
- assertThat(wrappedBuffer).isEqualTo(appData);
- }
- verify(spyNioSslEngine, times(1)).handleBlockingTasks();
- }
+ @Test(expected = IOException.class)
+ public void checkClosedThrows() throws Exception {
+ when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
+ new SSLEngineResult(CLOSED, FINISHED, 0, 100));
+ nioSslEngine.close(mock(SocketChannel.class));
+ nioSslEngine.checkClosed();
}
@Test
- public void wrapFails() throws IOException {
- try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
- // make the application data too big to fit into the engine's encryption buffer
- ByteBuffer appData =
- ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
- byte[] appBytes = new byte[appData.capacity()];
- Arrays.fill(appBytes, (byte) 0x1F);
- appData.put(appBytes);
- appData.flip();
+ public void synchObjectIsSelf() {
+ // for thread-safety the synchronization object given to outside entities
+ // must be the the engine itself. This allows external manipulation or
+ // use of the engine's buffers to be protected in the same way as its synchronized
+ // methods
+ assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine);
+ }
- // create an engine that will transfer bytes from the application buffer to the encrypted
- // buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(
- new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
- spyNioSslEngine.engine = testEngine;
+ @Test
+ public void wrap() throws Exception {
+ // make the application data too big to fit into the engine's encryption buffer
+ ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
+ byte[] appBytes = new byte[appData.capacity()];
+ Arrays.fill(appBytes, (byte) 0x1F);
+ appData.put(appBytes);
+ appData.flip();
- assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
- .hasMessageContaining("Error encrypting data");
- }
+ // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(
+ new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
+ spyNioSslEngine.engine = testEngine;
+
+ ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
+
+ verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class),
+ any(ByteBuffer.class), any(Integer.class));
+ appData.flip();
+ assertThat(wrappedBuffer).isEqualTo(appData);
+ verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+ }
+
+ @Test
+ public void wrapFails() {
+ // make the application data too big to fit into the engine's encryption buffer
+ ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
+ byte[] appBytes = new byte[appData.capacity()];
+ Arrays.fill(appBytes, (byte) 0x1F);
+ appData.put(appBytes);
+ appData.flip();
+
+ // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(
+ new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
+ spyNioSslEngine.engine = testEngine;
+
+ assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+ .hasMessageContaining("Error encrypting data");
}
@Test
public void unwrapWithBufferOverflow() throws Exception {
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- // make the application data too big to fit into the engine's encryption buffer
- final ByteBuffer peerAppData = inputSharing.getBuffer();
+ // make the application data too big to fit into the engine's encryption buffer
+ int originalPeerAppDataCapacity = nioSslEngine.peerAppData.capacity();
+ int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
+ nioSslEngine.peerAppData.position(originalPeerAppDataPosition);
+ ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
+ byte[] netBytes = new byte[wrappedData.capacity()];
+ Arrays.fill(netBytes, (byte) 0x1F);
+ wrappedData.put(netBytes);
+ wrappedData.flip();
- int originalPeerAppDataCapacity = peerAppData.capacity();
- int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
- peerAppData.position(originalPeerAppDataPosition);
- ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
- byte[] netBytes = new byte[wrappedData.capacity()];
- Arrays.fill(netBytes, (byte) 0x1F);
- wrappedData.put(netBytes);
- wrappedData.flip();
+ // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ spyNioSslEngine.engine = testEngine;
- // create an engine that will transfer bytes from the application buffer to the encrypted
- // buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- spyNioSslEngine.engine = testEngine;
+ testEngine.addReturnResult(
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes
+ new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
- testEngine.addReturnResult(
- new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer
- new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes
- new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes
- new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
-
- int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
- expectedCapacity =
- 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
- expectedCapacity =
- 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
- try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
- ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
- unwrappedBuffer.flip();
- assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
- }
- }
+ int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
+ expectedCapacity =
+ 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+ expectedCapacity =
+ 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+ ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
+ unwrappedBuffer.flip();
+ assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
}
@Test
public void unwrapWithBufferUnderflow() throws Exception {
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- ByteBuffer wrappedData =
- ByteBuffer.allocate(inputSharing.getBuffer().capacity());
- byte[] netBytes = new byte[wrappedData.capacity() / 2];
- Arrays.fill(netBytes, (byte) 0x1F);
- wrappedData.put(netBytes);
- wrappedData.flip();
+ ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
+ byte[] netBytes = new byte[wrappedData.capacity() / 2];
+ Arrays.fill(netBytes, (byte) 0x1F);
+ wrappedData.put(netBytes);
+ wrappedData.flip();
- // create an engine that will transfer bytes from the application buffer to the encrypted
- // buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
- spyNioSslEngine.engine = testEngine;
+ // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
+ spyNioSslEngine.engine = testEngine;
- try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
- ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
- unwrappedBuffer.flip();
- assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
- }
- assertThat(wrappedData.position()).isEqualTo(netBytes.length);
- }
+ ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
+ unwrappedBuffer.flip();
+ assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
+ assertThat(wrappedData.position()).isEqualTo(netBytes.length);
}
@Test
- public void unwrapWithDecryptionError() throws IOException {
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- // make the application data too big to fit into the engine's encryption buffer
- ByteBuffer wrappedData =
- ByteBuffer.allocate(inputSharing.getBuffer().capacity());
- byte[] netBytes = new byte[wrappedData.capacity() / 2];
- Arrays.fill(netBytes, (byte) 0x1F);
- wrappedData.put(netBytes);
- wrappedData.flip();
+ public void unwrapWithDecryptionError() {
+ // make the application data too big to fit into the engine's encryption buffer
+ ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
+ byte[] netBytes = new byte[wrappedData.capacity() / 2];
+ Arrays.fill(netBytes, (byte) 0x1F);
+ wrappedData.put(netBytes);
+ wrappedData.flip();
- // create an engine that will transfer bytes from the application buffer to the encrypted
- // buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
- spyNioSslEngine.engine = testEngine;
+ // create an engine that will transfer bytes from the application buffer to the encrypted buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+ spyNioSslEngine.engine = testEngine;
- assertThatThrownBy(() -> {
- try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) {
- }
- }).isInstanceOf(SSLException.class)
- .hasMessageContaining("Error decrypting data");
- }
+ assertThatThrownBy(() -> spyNioSslEngine.unwrap(wrappedData)).isInstanceOf(SSLException.class)
+ .hasMessageContaining("Error decrypting data");
+ }
+
+ @Test
+ public void ensureUnwrappedCapacity() {
+ ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize);
+ int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2;
+ ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity);
+ assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
}
@Test
public void unwrapWithClosedEngineButDataInDecryptedBuffer() throws IOException {
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- // make the application data too big to fit into the engine's encryption buffer
- ByteBuffer wrappedData =
- ByteBuffer.allocate(inputSharing.getBuffer().capacity());
- byte[] netBytes = new byte[wrappedData.capacity() / 2];
- Arrays.fill(netBytes, (byte) 0x1F);
- wrappedData.put(netBytes);
- wrappedData.flip();
- final int arbitraryAmountOfRealData = 31; // bytes
- inputSharing.getBuffer().position(arbitraryAmountOfRealData);
+ final ByteBuffer unwrappedBuffer = nioSslEngine.getUnwrappedBuffer(null);
+ // make the application data too big to fit into the engine's encryption buffer
+ ByteBuffer wrappedData =
+ ByteBuffer.allocate(unwrappedBuffer.capacity());
+ byte[] netBytes = new byte[wrappedData.capacity() / 2];
+ Arrays.fill(netBytes, (byte) 0x1F);
+ wrappedData.put(netBytes);
+ wrappedData.flip();
+ final int arbitraryAmountOfRealData = 31; // bytes
+ unwrappedBuffer.position(arbitraryAmountOfRealData);
- // create an engine that will transfer bytes from the application buffer to the encrypted
- // buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
- spyNioSslEngine.engine = testEngine;
+ // create an engine that will transfer bytes from the application buffer to the encrypted
+ // buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+ spyNioSslEngine.engine = testEngine;
- try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) {
- assertThat(inputSharing.getBuffer().position()).isEqualTo(arbitraryAmountOfRealData);
- }
- }
+ final ByteBuffer unwrappedBuffer2 = spyNioSslEngine.unwrap(wrappedData);
+ assertThat(unwrappedBuffer2.position()).isEqualTo(arbitraryAmountOfRealData);
}
@Test
@@ -368,11 +361,7 @@
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
new SSLEngineResult(CLOSED, FINISHED, 0, 0));
nioSslEngine.close(mockChannel);
- assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
- .isInstanceOf(IOException.class)
- .hasMessageContaining("NioSslEngine has been closed");
- assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
- .isInstanceOf(IOException.class)
+ assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IOException.class)
.hasMessageContaining("NioSslEngine has been closed");
nioSslEngine.close(mockChannel);
}
@@ -401,12 +390,10 @@
when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
- try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
- // give the NioSslEngine something to write on its socket channel, simulating a TLS close
- // message
- outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
- return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
- }
+ // give the NioSslEngine something to write on its socket channel, simulating a TLS close
+ // message
+ nioSslEngine.myNetData.put("Goodbye cruel world".getBytes());
+ return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
});
when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
nioSslEngine.close(mockChannel);
@@ -437,42 +424,37 @@
ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
SocketChannel mockChannel = mock(SocketChannel.class);
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- // force a compaction by making the decoded buffer appear near to being full
- ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
- unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
- unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
+ // force a compaction by making the decoded buffer appear near to being full
+ ByteBuffer unwrappedBuffer = nioSslEngine.peerAppData;
+ unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
+ unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
- // simulate some socket reads
- when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
- @Override
- public Integer answer(InvocationOnMock invocation) throws Throwable {
- ByteBuffer buffer = invocation.getArgument(0);
- buffer.position(buffer.position() + individualRead);
- return individualRead;
- }
- });
-
- TestSSLEngine testSSLEngine = new TestSSLEngine();
- testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
- nioSslEngine.engine = testSSLEngine;
-
- try (final ByteBufferSharing sharedBuffer =
- nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
- ByteBuffer data = sharedBuffer.getBuffer();
- verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+ // simulate some socket reads
+ when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ ByteBuffer buffer = invocation.getArgument(0);
+ buffer.position(buffer.position() + individualRead);
+ return individualRead;
}
- }
+ });
+
+ TestSSLEngine testSSLEngine = new TestSSLEngine();
+ testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
+ nioSslEngine.engine = testSSLEngine;
+
+ ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+ verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
}
/**
- * This tests the case where a message header has been read and part of a message has been read,
- * but the decoded buffer is too small to hold all of the message. In this case the readAtLeast
- * method will have to expand the capacity of the decoded buffer and return the new, expanded,
- * buffer as the method result.
+ * This tests the case where a message header has been read and part of a message has been
+ * read, but the decoded buffer is too small to hold all of the message. In this case
+ * the readAtLeast method will have to expand the capacity of the decoded buffer and return
+ * the new, expanded, buffer as the method result.
*/
@Test
public void readAtLeastUsingSmallAppBuffer() throws Exception {
@@ -486,11 +468,7 @@
int initialUnwrappedBufferSize = 100;
ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
- inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
- }
+ nioSslEngine.peerAppData = unwrappedBuffer;
// simulate some socket reads
when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -510,26 +488,22 @@
new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
nioSslEngine.engine = testSSLEngine;
- try (final ByteBufferSharing sharedBuffer =
- nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
- ByteBuffer data = sharedBuffer.getBuffer();
- verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
- // The initial available space in the unwrapped buffer should have doubled
- int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- assertThat(inputSharing.getBuffer().capacity())
- .isEqualTo(2 * initialFreeSpace + preexistingBytes);
- }
- }
+ ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+ verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+ // The initial available space in the unwrapped buffer should have doubled
+ int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
+ assertThat(nioSslEngine.peerAppData.capacity())
+ .isEqualTo(2 * initialFreeSpace + preexistingBytes);
}
/**
- * This tests the case where a message header has been read and part of a message has been read,
- * but the decoded buffer is too small to hold all of the message. In this case the buffer is
- * completely full and should only take one overflow response to resolve the problem.
+ * This tests the case where a message header has been read and part of a message has been
+ * read, but the decoded buffer is too small to hold all of the message. In this case
+ * the buffer is completely full and should only take one overflow response to resolve
+ * the problem.
*/
@Test
public void readAtLeastUsingSmallAppBufferAtWriteLimit() throws Exception {
@@ -544,10 +518,7 @@
// force buffer expansion by making a small decoded buffer appear near to being full
ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
- try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
- final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
- inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
- }
+ nioSslEngine.peerAppData = unwrappedBuffer;
// simulate some socket reads
when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -567,14 +538,11 @@
new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
nioSslEngine.engine = testSSLEngine;
- try (final ByteBufferSharing sharedBuffer =
- nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
- ByteBuffer data = sharedBuffer.getBuffer();
- verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit())
- .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
- }
+ ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+ verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit())
+ .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
}
@@ -713,8 +681,8 @@
}
/**
- * add an engine operation result to be returned by wrap or unwrap. Like Mockito's thenReturn(),
- * the last return result will repeat forever
+ * add an engine operation result to be returned by wrap or unwrap.
+ * Like Mockito's thenReturn(), the last return result will repeat forever
*/
void addReturnResult(SSLEngineResult... sslEngineResult) {
for (SSLEngineResult result : sslEngineResult) {