blob: b8708e4850e57e21d7d75e12ba2a4e3dde3a8142 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.test.junit.categories.ClientServerTest;
@Category(ClientServerTest.class)
public class ServerConnectionIntegrationTest {
@Rule
public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
private AcceptorImpl acceptor;
private Socket socket;
private InternalCache cache;
private SecurityService securityService;
private CacheServerStats stats;
@Before
public void setUp() throws IOException {
InetAddress inetAddress = mock(InetAddress.class);
acceptor = mock(AcceptorImpl.class);
socket = mock(Socket.class);
cache = mock(InternalCache.class);
securityService = mock(SecurityService.class);
stats = mock(CacheServerStats.class);
when(inetAddress.getHostAddress()).thenReturn("localhost");
when(socket.getInetAddress()).thenReturn(inetAddress);
}
/**
* This test sets up a TestConnection which will register with the ClientHealthMonitor and then
* block waiting to receive a fake message. This message will arrive just after the health monitor
* times out this connection and kills it. The test then makes sure that the connection correctly
* handles the terminated state and exits.
*/
@Test
public void terminatingConnectionHandlesNewRequestsGracefully() {
ClientHealthMonitor.createInstance(cache, 100, mock(CacheClientNotifierStats.class));
ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance();
when(cache.getCacheTransactionManager()).thenReturn(mock(TXManagerImpl.class));
when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
TestServerConnection testServerConnection =
new TestServerConnection(socket, cache, mock(CachedRegionHelper.class), stats, 0, 0, null,
CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
assertThatCode(() -> testServerConnection.run()).doesNotThrowAnyException();
}
private static class TestMessage extends Message {
private final Lock lock = new ReentrantLock();
private final Condition testGate = lock.newCondition();
private volatile boolean signalled;
TestMessage() {
super(3, Version.CURRENT);
messageType = MessageType.REQUEST;
securePart = new Part();
}
@Override
public void receive() {
try {
lock.lock();
testGate.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
if (!signalled) {
fail("Message never received continueProcessing call");
}
}
}
void continueProcessing() {
lock.lock();
testGate.signalAll();
signalled = true;
lock.unlock();
}
}
private static class TestServerConnection extends OriginalServerConnection {
private volatile TestMessage testMessage;
/**
* Creates a new {@code ServerConnection} that processes messages received from an edge
* client over a given {@code Socket}.
*/
TestServerConnection(Socket socket, InternalCache internalCache,
CachedRegionHelper cachedRegionHelper, CacheServerStats stats, int hsTimeout,
int socketBufferSize, String communicationModeStr, byte communicationMode,
Acceptor acceptor, SecurityService securityService) {
super(socket, internalCache, cachedRegionHelper, stats, hsTimeout, socketBufferSize,
communicationModeStr, communicationMode, acceptor, securityService);
// Not clear where this is supposed to be set in the timeout path
setClientDisconnectCleanly();
}
@Override
protected void doHandshake() {
ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
ServerSideHandshake handshake = mock(ServerSideHandshake.class);
MessageIdExtractor extractor = mock(MessageIdExtractor.class);
when(handshake.getVersion()).thenReturn(Version.CURRENT);
when(proxyID.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
setHandshake(handshake);
setProxyId(proxyID);
processHandShake();
initializeCommands();
setFakeRequest();
setMessageIdExtractor(extractor);
}
@Override
void handleTermination(boolean timedOut) {
super.handleTermination(timedOut);
testMessage.continueProcessing();
}
private void setFakeRequest() {
testMessage = new TestMessage();
setRequestMessage(testMessage);
}
}
}