blob: d5cbf1b3b022637eb479776c1ac870a7fbfd77d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import java.util.Properties;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.InternalCacheServer;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Class <code>DurableClientTestCase</code> tests durable client functionality.
*
* @since GemFire 5.2
*/
@Category({ClientSubscriptionTest.class})
public class DurableClientTestCase extends DurableClientTestBase {
/**
 * Starts a server and a durable client, passing the default durable client timeout, and then
 * shuts both of them down again (server first).
 */
@Test
public void testSimpleDurableClient() {
  startupDurableClientAndServer(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);

  // The server goes down first, the durable client afterwards.
  final SerializableRunnableIF closeCache = CacheServerTestUtil::closeCache;
  server1VM.invoke(closeCache);
  closeDurableClient();
}
/**
 * Test that a durable client started with the {@code gemfire.SPECIAL_DURABLE} system property set
 * to {@code "true"} is registered on the server under a durable id that differs from the
 * configured one. The server-side proxy is expected to report the configured id followed by the
 * {@code _gem_} separator and {@code CacheServerTestUtil}.
 */
@Test
public void testSpecialDurableProperty() throws InterruptedException {
  final Properties jp = new Properties();
  jp.setProperty(DistributionConfig.GEMFIRE_PREFIX + "SPECIAL_DURABLE", "true");
  try {
    server1Port = server1VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
    durableClientId = getName() + "_client";
    // Expected server-side id: configured id + "_gem_" separator + "CacheServerTestUtil"
    final String dId = durableClientId + "_gem_" + "CacheServerTestUtil";
    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
        getClientPool(NetworkUtils.getServerHostName(), server1Port, Boolean.TRUE),
        regionName, getClientDistributedSystemProperties(durableClientId,
            DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT),
        Boolean.TRUE, jp));
    // Wait until the client VM actually has a cache before going on
    durableClientVM.invoke(() -> {
      await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES)
          .pollInterval(100, MILLISECONDS)
          .until(CacheServerTestUtil::getCache, notNullValue());
    });
    // Send clientReady message
    sendClientReady(durableClientVM);
    // Verify durable client on server
    server1VM.invoke(() -> {
      // Find the proxy
      checkNumberOfClientProxies(1);
      CacheClientProxy proxy = getClientProxy();
      assertThat(proxy).isNotNull();
      // Verify that it is durable and its properties are correct
      assertThat(proxy.isDurable()).isTrue();
      // The id seen by the server must be the decorated one, not the plain configured id
      assertThat(proxy.getDurableId()).isNotEqualTo(durableClientId);
      assertThat(proxy.getDurableId()).isEqualTo(dId);
      assertThat(proxy.getDurableTimeout())
          .isEqualTo(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
    });
    // Stop the durable client
    disconnectDurableClient(false);
    // Verify the durable client is NOT present on the server after the disconnect above
    verifyDurableClientNotPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT,
        durableClientId, server1VM);
    // Stop the server
    server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
    closeDurableClient();
  } finally {
    // Always undo the system properties that were handed to the client VM
    durableClientVM.invoke(() -> CacheServerTestUtil.unsetJavaSystemProperties(jp));
  }
}
/**
 * Starts a durable client, disconnects it and starts it again, checking after the disconnect and
 * after the restart that the client is present on the server.
 */
@Test
public void testStartStopStartDurableClient() {
  startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS);

  // Disconnect the client; the server is expected to still know about it.
  disconnectDurableClient(true);
  verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);

  // Bring the client back and check the server side once more.
  restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.TRUE);
  verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);

  // Tear down: durable client first, then the server.
  final SerializableRunnableIF closeCache = CacheServerTestUtil::closeCache;
  durableClientVM.invoke(closeCache);
  server1VM.invoke(closeCache);
}
/**
 * Regression test for bug 39630: after a durable client disconnects, its proxy must still exist
 * on the server and the proxy's socket must end up closed.
 */
@Test
public void test39630() {
  startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS);

  // Disconnect the durable client.
  disconnectDurableClient(true);

  // On the server: the proxy is still there and its socket eventually reports closed.
  server1VM.invoke(() -> {
    CacheClientProxy clientProxy = getClientProxy();
    assertThat(clientProxy).isNotNull();
    assertThat(clientProxy._socket).isNotNull();
    await().untilAsserted(() -> assertThat(clientProxy._socket.isClosed()).isTrue());
  });

  // Re-start the durable client (this is necessary so the netDown test will set the
  // appropriate system properties).
  restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.TRUE);

  // Tear down: durable client first, then the server.
  final SerializableRunnableIF closeCache = CacheServerTestUtil::closeCache;
  durableClientVM.invoke(closeCache);
  server1VM.invoke(closeCache);
}
/**
 * Starts a durable client with a timeout of 5, disconnects it and checks that the server ends up
 * with no client proxy, then restarts the client with the same timeout.
 */
@Test
public void testStartStopTimeoutDurableClient() {
  final int shortTimeout = 5;
  startupDurableClientAndServer(shortTimeout);

  // Disconnect the durable client.
  disconnectDurableClient(true);

  // The server must no longer hold a proxy for it.
  server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
    @Override
    public void run2() throws CacheException {
      checkNumberOfClientProxies(0);
      assertThat(getClientProxy()).isNull();
    }
  });

  restartDurableClient(shortTimeout, Boolean.TRUE);

  // Tear down: durable client first, then the server.
  final SerializableRunnableIF closeCache = CacheServerTestUtil::closeCache;
  durableClientVM.invoke(closeCache);
  server1VM.invoke(closeCache);
}
/**
 * Publishes one entry while the durable client is connected and one while it is disconnected,
 * and checks the server-side queue and the client's listener before and after the client
 * reconnects.
 */
@Test
public void testDurableClientPrimaryUpdate() {
  startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS);
  registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);

  // Bring up an ordinary (non-durable) publisher client.
  publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(), server1Port, false), regionName));

  // First publication: wait for the server queue to drain, then check the listener.
  publishEntries(0, 1);
  waitUntilQueueContainsRequiredNumberOfEvents(server1VM, 0);
  checkListenerEvents(1, 1, -1, durableClientVM);

  // Disconnect the durable client and wait until its proxy is paused (not dispatching).
  disconnectDurableClient(true);
  server1VM.invoke(DurableClientTestBase::waitForCacheClientProxyPaused);

  // Second publication: it must stay in the server queue and must not reach the client.
  publishEntries(1, 1);
  waitUntilQueueContainsRequiredNumberOfEvents(server1VM, 1);
  verifyListenerUpdatesDisconnected(1);

  // Reconnect the durable client and verify it on the server.
  restartDurableClient(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, Boolean.TRUE);
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);

  // The update that was held on the server must now have been delivered.
  checkListenerEvents(1, 1, -1, durableClientVM);

  // Tear down: publisher, durable client, server.
  final SerializableRunnableIF closeCache = CacheServerTestUtil::closeCache;
  publisherClientVM.invoke(closeCache);
  durableClientVM.invoke(closeCache);
  server1VM.invoke(closeCache);
}
/**
 * Test that an entry published while a durable client is disconnected is held in the client's
 * queue on the server and is received by the client after it restarts.
 */
@Test
public void testStartStopStartDurableClientUpdate() {
  startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS);
  // Have the durable client register interest in all keys
  registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);
  // Start normal publisher client
  publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(), server1Port,
          false),
      regionName));
  // Publish some entries
  publishEntries(0, 1);
  // Verify the durable client received the updates
  checkListenerEvents(1, 1, -1, durableClientVM);
  // Stop the durable client
  disconnectDurableClient(true);
  // Make sure the proxy is actually paused, not dispatching
  server1VM.invoke(DurableClientTestBase::waitForCacheClientProxyPaused);
  // Publish some entries
  publishEntries(1, 1);
  // Wait until the durable client's queue on the server holds exactly the one new entry.
  // Awaiting (instead of asserting the size once) avoids racing with asynchronous queueing and
  // matches what testDurableClientPrimaryUpdate does for the same check.
  waitUntilQueueContainsRequiredNumberOfEvents(server1VM, 1);
  // Verify that disconnected client does not receive any events.
  verifyListenerUpdatesDisconnected(1);
  // Re-start the durable client
  restartDurableClient(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, Boolean.TRUE);
  // Verify durable client on server
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);
  // Verify the durable client received the updates held for it on the server
  checkListenerEvents(1, 1, -1, durableClientVM);
  // Stop the publisher client
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  // Stop the durable client VM
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  // Stop the server
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks that a durable client is present on the server again after the server has been closed
 * and re-created on the same port, and that the client then receives published entries.
 */
@Test
public void testDurableClientConnectServerStopStart() {
  // Start server 1 and remember the first of the ports it returns.
  Integer[] ports = server1VM.invoke(
      () -> CacheServerTestUtil.createCacheServerReturnPorts(regionName, Boolean.TRUE));
  final int serverPort = ports[0];

  // Start the durable client against that port.
  final String clientId = getName() + "_client";
  durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(), serverPort, true),
      regionName, getClientDistributedSystemProperties(clientId), Boolean.TRUE));

  // Tell the server the client is ready for events.
  durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
    @Override
    public void run2() throws CacheException {
      CacheServerTestUtil.getClientCache().readyForEvents();
    }
  });
  registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE);

  // The server must know the durable client.
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, clientId,
      server1VM);

  // Close the server's cache and create the server again on the same port.
  final SerializableRunnableIF closeCache = CacheServerTestUtil::closeCache;
  server1VM.invoke(closeCache);
  server1VM.invoke(
      () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE, serverPort));

  // The durable client must be present on the re-created server as well.
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, clientId,
      server1VM);

  // Bring up a publisher, publish, and check what the durable client's listener saw.
  publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
      getClientPool(NetworkUtils.getServerHostName(), serverPort, false), regionName));
  publishEntries(0, 10);
  checkListenerEvents(10, 1, -1, durableClientVM);

  // Tear down: durable client, publisher, server.
  durableClientVM.invoke(closeCache);
  publisherClientVM.invoke(closeCache);
  server1VM.invoke(closeCache);
}
/**
 * Runs both failover scenarios with a redundancy level of 0.
 */
@Test
public void testDurableNonHAFailover() {
  final int redundancyLevel = 0;
  durableFailover(redundancyLevel);
  durableFailoverAfterReconnect(redundancyLevel);
}
/**
 * Runs both failover scenarios with a redundancy level of 1.
 */
@Test
public void testDurableHAFailover() {
  // Clients log this when the servers disconnect, so it is registered as an ignored exception.
  IgnoredException.addIgnoredException("Could not find any server");

  final int redundancyLevel = 1;
  durableFailover(redundancyLevel);
  durableFailoverAfterReconnect(redundancyLevel);
}
/**
* Test a durable client with 2 servers where the client fails over from one to another server
* with a publisher/feeder performing operations and the client verifying updates received.
*
* @param redundancyLevel when 1, the durable client's pool is built without an explicit trailing
*        redundancy argument; for any other value it is built with a trailing 0. When 0, the test
*        additionally waits, after server 1 is stopped, until server 2 reports one client proxy.
*/
private void durableFailover(int redundancyLevel) {
// Start server 1
server1Port = this.server1VM.invoke(
() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
// Start server 2 only to obtain its port; it is stopped again right below
final int server2Port = this.server2VM
.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
// Stop server 2
this.server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
// Start a durable client
durableClientId = getName() + "_client";
// Build the client pool over both server ports; only the non-1 case passes the extra 0
Pool clientPool;
if (redundancyLevel == 1) {
clientPool = getClientPool(NetworkUtils.getServerHostName(), server1Port,
server2Port, true);
} else {
clientPool = getClientPool(NetworkUtils.getServerHostName(), server1Port,
server2Port, true, 0);
}
this.durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(clientPool, regionName,
getClientDistributedSystemProperties(durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS),
Boolean.TRUE));
// Send clientReady message
this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@Override
public void run2() throws CacheException {
CacheServerTestUtil.getClientCache().readyForEvents();
}
});
registerInterest(this.durableClientVM, regionName, true, InterestResultPolicy.NONE);
// Re-start server2 on the port it used before
this.server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE,
server2Port));
// Start normal publisher client
this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
getClientPool(NetworkUtils.getServerHostName(), server1Port,
server2Port, false),
regionName));
// Publish some entries
publishEntries(0, 1);
// Verify the durable client received the updates
this.checkListenerEvents(1, 1, -1, this.durableClientVM);
// Stop the durable client (entry "0" is asserted to be absent from its region further down)
this.disconnectDurableClient(true);
// Publish updates during client downtime
publishEntries(1, 1);
// Re-start the durable client that is kept alive on the server
this.restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, clientPool, Boolean.TRUE);
registerInterest(this.durableClientVM, regionName, true, InterestResultPolicy.NONE);
publishEntries(2, 1);
// Verify the durable client received the updates before failover
this.checkListenerEvents(2, 1, -1, this.durableClientVM);
// In the restarted client's region, entry "0" must be absent and entry "2" must exist
this.durableClientVM.invoke(new CacheSerializableRunnable("Get") {
@Override
public void run2() throws CacheException {
Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName);
assertThat(region).isNotNull();
assertThat(region.getEntry("0")).isNull();
assertThat(region.getEntry("2")).isNotNull();
}
});
// Stop server 1
this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
// With redundancyLevel 0, wait until server 2 reports exactly one client proxy
if (redundancyLevel == 0) {
this.server2VM.invoke(() -> verifyClientHasConnected());
}
publishEntries(3, 1);
// Verify the durable client received the updates after failover
this.checkListenerEvents(3, 1, -1, this.durableClientVM);
// Stop the durable client
this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
// Stop the publisher client
this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
// Stop server 2
this.server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Waits, inside the given VM, until a client proxy exists and its queue size equals the
 * requested count.
 *
 * @param vm the VM in which the client proxy is looked up
 * @param requiredEntryCount the queue size to wait for
 */
private void waitUntilQueueContainsRequiredNumberOfEvents(final VM vm,
    final int requiredEntryCount) {
  vm.invoke(new CacheSerializableRunnable("Verify durable client") {
    @Override
    public void run2() throws CacheException {
      await().until(() -> {
        CacheClientProxy clientProxy = getClientProxy();
        // No proxy yet counts as "not there yet"; otherwise compare the queue size.
        return clientProxy != null && clientProxy.getQueueSize() == requiredEntryCount;
      });
    }
  });
}
/**
* Test a durable client with 2 servers where server 1 is stopped while the durable client is
* disconnected, so that the client has to come back against the remaining server.
*
* @param redundancyLevel when 1, the durable client's pool is built without an explicit trailing
*        redundancy argument and the listener checks pass 4 and then 5 as their first argument;
*        for any other value the pool is built with a trailing 0 and the checks pass 2 and then 3.
*/
private void durableFailoverAfterReconnect(int redundancyLevel) {
// Start server 1
server1Port = this.server1VM
.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, true));
// Start server 2
int server2Port = this.server2VM
.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, true));
// Start a durable client
durableClientId = getName() + "_client";
// Build the client pool over both server ports; only the non-1 case passes the extra 0
Pool clientPool;
if (redundancyLevel == 1) {
clientPool = getClientPool(NetworkUtils.getServerHostName(), server1Port,
server2Port, true);
} else {
clientPool = getClientPool(NetworkUtils.getServerHostName(), server1Port,
server2Port, true, 0);
}
this.durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(clientPool, regionName,
getClientDistributedSystemProperties(durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS),
Boolean.TRUE));
// Send clientReady message
// NOTE(review): durableFailover calls getClientCache() here, this method calls getCache() —
// confirm the difference is intended
this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@Override
public void run2() throws CacheException {
CacheServerTestUtil.getCache().readyForEvents();
}
});
registerInterest(this.durableClientVM, regionName, true, InterestResultPolicy.NONE);
// Start normal publisher client
this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
getClientPool(NetworkUtils.getServerHostName(), server1Port,
server2Port, false),
regionName));
// Publish some entries
publishEntries(0, 1);
// Verify the durable client received the updates
this.checkListenerEvents(1, 1, -1, this.durableClientVM);
verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM);
// Stop the durable client
this.disconnectDurableClient(true);
// Stop server 1 while the durable client is disconnected
this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
// Publish updates during client downtime
publishEntries(1, 1);
// Re-start the durable client that is kept alive on the server
this.restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, clientPool, Boolean.TRUE);
registerInterest(this.durableClientVM, regionName, true, InterestResultPolicy.NONE);
publishEntries(2, 2);
// Verify the durable client's listener after the restart; the first argument passed to the
// check depends on the redundancy level
if (redundancyLevel == 1) {
this.checkListenerEvents(4, 1, -1, this.durableClientVM);
} else {
this.checkListenerEvents(2, 1, -1, this.durableClientVM);
}
this.durableClientVM.invoke(new CacheSerializableRunnable("Get") {
@Override
public void run2() throws CacheException {
Region<Object, Object> region = CacheServerTestUtil.getCache().getRegion(regionName);
assertThat(region).isNotNull();
// Entry "0" must not be present in the restarted client's region
assertThat(region.getEntry("0")).isNull();
}
});
publishEntries(4, 1);
// Verify the durable client received the additional update as well
if (redundancyLevel == 1) {
this.checkListenerEvents(5, 1, -1, this.durableClientVM);
} else {
this.checkListenerEvents(3, 1, -1, this.durableClientVM);
}
// Stop the durable client
this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
// Stop the publisher client
this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
// Stop server 2
this.server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Waits until the first cache server of the current VM's cache reports exactly one client proxy.
 */
private void verifyClientHasConnected() {
  InternalCacheServer firstServer =
      (InternalCacheServer) CacheServerTestUtil.getCache().getCacheServers().get(0);
  CacheClientNotifier notifier = firstServer.getAcceptor().getCacheClientNotifier();
  await().until(() -> notifier.getClientProxies().size() == 1);
}
}