blob: 10ad070275237285cb8bb3f6a027825fb24403eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
/*
* Created on Feb 3, 2006
*
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.cache.client.PoolManager.createFactory;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.PutOp;
import org.apache.geode.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
*
* Tests the functionality of operations of AbstractConnectionProxy & its derived classes.
*/
@Category({ClientSubscriptionTest.class})
public class ConnectionProxyJUnitTest {
private static final String expectedRedundantErrorMsg =
"Could not find any server to host redundant client queue.";
private static final String expectedPrimaryErrorMsg =
"Could not find any server to host primary client queue.";
DistributedSystem system;
Cache cache;
PoolImpl proxy = null;
SequenceIdAndExpirationObject seo = null;
final Duration timeoutToVerifyExpiry = Duration.ofSeconds(30);
final Duration timeoutToVerifyAckSend = Duration.ofSeconds(30);
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty(MCAST_PORT, "0");
p.setProperty(LOCATORS, "");
this.system = DistributedSystem.connect(p);
this.cache = CacheFactory.create(system);
final String addExpectedPEM =
"<ExpectedException action=add>" + expectedPrimaryErrorMsg + "</ExpectedException>";
final String addExpectedREM =
"<ExpectedException action=add>" + expectedRedundantErrorMsg + "</ExpectedException>";
system.getLogWriter().info(addExpectedPEM);
system.getLogWriter().info(addExpectedREM);
}
@After
public void tearDown() throws Exception {
this.cache.close();
final String removeExpectedPEM =
"<ExpectedException action=remove>" + expectedPrimaryErrorMsg + "</ExpectedException>";
final String removeExpectedREM =
"<ExpectedException action=remove>" + expectedRedundantErrorMsg + "</ExpectedException>";
system.getLogWriter().info(removeExpectedPEM);
system.getLogWriter().info(removeExpectedREM);
this.system.disconnect();
if (proxy != null)
proxy.destroy();
}
/**
* This test verifies the behaviour of client request when the listener on the server sits
* forever. This is done in following steps:<br>
* 1)create server<br>
* 2)initialize proxy object and create region for client having a CacheListener and make
* afterCreate in the listener to wait infinitely<br>
* 3)perform a PUT on client by acquiring Connection through proxy<br>
* 4)Verify that exception occurs due to infinite wait in the listener<br>
* 5)Verify that above exception occurs sometime after the readTimeout configured for the client
* <br>
*
*/
@Ignore
@Test
public void testListenerOnServerSitForever() throws Exception {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
Region testRegion = null;
CacheServer server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(false);
pf.setSubscriptionRedundancy(-1);
pf.setReadTimeout(2000);
pf.setSocketBufferSize(32768);
pf.setRetryAttempts(1);
pf.setPingInterval(10000);
proxy = (PoolImpl) pf.create("clientPool");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setCacheListener(new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
synchronized (ConnectionProxyJUnitTest.this) {
try {
ConnectionProxyJUnitTest.this.wait();
} catch (InterruptedException e) {
fail("interrupted");
}
}
}
});
RegionAttributes attrs = factory.create();
testRegion = cache.createRegion("testregion", attrs);
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
Connection conn = (proxy).acquireConnection();
long t1 = 0;
try {
t1 = System.currentTimeMillis();
EntryEventImpl event = new EntryEventImpl((Object) null, false);
try {
event.setEventId(new EventID(new byte[] {1}, 1, 1));
PutOp.execute(conn, proxy, testRegion.getFullPath(), "key1", "val1", event, null, false);
} finally {
event.release();
}
fail("Test failed as exception was expected");
} catch (Exception e) {
long t2 = System.currentTimeMillis();
long net = (t2 - t1);
assertTrue(net / 1000 < 5);
}
synchronized (ConnectionProxyJUnitTest.this) {
ConnectionProxyJUnitTest.this.notify();
}
}
/**
* Tests the DeadServerMonitor when identifying an Endpoint as alive , does not create a
* persistent Ping connection ( i.e sends a CLOSE protocol , if the number of connections is zero.
*/
@Test
public void testDeadServerMonitorPingNature1() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
// final int maxWaitTime = 10000;
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(false);
pf.setReadTimeout(2000);
pf.setMinConnections(1);
pf.setSocketBufferSize(32768);
pf.setRetryAttempts(1);
pf.setPingInterval(500);
proxy = (PoolImpl) pf.create("clientPool");
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
try {
(proxy).acquireConnection();
} catch (Exception ok) {
ok.printStackTrace();
}
try {
(proxy).acquireConnection();
} catch (Exception ok) {
ok.printStackTrace();
}
// long start = System.currentTimeMillis();
assertEquals(0, proxy.getConnectedServerCount());
// start the server
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(15000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
GeodeAwaitility.await().untilAsserted(() -> {
assertEquals(1, proxy.getConnectedServerCount());
});
} finally {
if (server != null) {
server.stop();
}
}
}
/**
* Tests the DeadServerMonitor when identifying an Endpoint as alive , does creates a persistent
* Ping connection ( i.e sends a PING protocol , if the number of connections is more than zero.
*/
@Test
public void testDeadServerMonitorPingNature2() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
// final int maxWaitTime = 10000;
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(false);
pf.setReadTimeout(2000);
pf.setMinConnections(1);
pf.setSocketBufferSize(32768);
pf.setRetryAttempts(1);
pf.setPingInterval(500);
proxy = (PoolImpl) pf.create("clientPool");
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
// let LiveServerMonitor detect it as alive as the numConnection is more than zero
// long start = System.currentTimeMillis();
assertEquals(0, proxy.getConnectedServerCount());
// start the server
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(15000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
GeodeAwaitility.await().untilAsserted(() -> {
assertEquals(1, proxy.getConnectedServerCount());
});
} finally {
if (server != null) {
server.stop();
}
}
}
@Test
public void testThreadIdToSequenceIdMapCreation() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(-1);
proxy = (PoolImpl) pf.create("clientPool");
if (proxy.getThreadIdToSequenceIdMap() == null) {
fail(" ThreadIdToSequenceIdMap is null. ");
}
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
} finally {
if (server != null) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
fail("interrupted");
}
server.stop();
}
}
}
@Test
public void testThreadIdToSequenceIdMapExpiryPositive() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(-1);
pf.setSubscriptionMessageTrackingTimeout(4000);
pf.setSubscriptionAckInterval(2000);
proxy = (PoolImpl) pf.create("clientPool");
EventID eid = new EventID(new byte[0], 1, 1);
if (proxy.verifyIfDuplicate(eid)) {
fail(" eid should not be duplicate as it is a new entry");
}
verifyExpiry();
if (proxy.verifyIfDuplicate(eid)) {
fail(" eid should not be duplicate as the previous entry should have expired ");
}
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
} finally {
if (server != null) {
server.stop();
}
}
}
@Test
public void testThreadIdToSequenceIdMapExpiryNegative() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(-1);
pf.setSubscriptionMessageTrackingTimeout(10000);
proxy = (PoolImpl) pf.create("clientPool");
final EventID eid = new EventID(new byte[0], 1, 1);
if (proxy.verifyIfDuplicate(eid)) {
fail(" eid should not be duplicate as it is a new entry");
}
GeodeAwaitility.await().untilAsserted(() -> assertTrue(proxy.verifyIfDuplicate(eid)));
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
} finally {
if (server != null) {
server.stop();
}
}
}
@Test
public void testThreadIdToSequenceIdMapConcurrency() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(-1);
pf.setSubscriptionMessageTrackingTimeout(5000);
pf.setSubscriptionAckInterval(2000);
proxy = (PoolImpl) pf.create("clientPool");
final int EVENT_ID_COUNT = 10000; // why 10,000?
EventID[] eid = new EventID[EVENT_ID_COUNT];
for (int i = 0; i < EVENT_ID_COUNT; i++) {
eid[i] = new EventID(new byte[0], i, i);
if (proxy.verifyIfDuplicate(eid[i])) {
fail(" eid can never be duplicate, it is being created for the first time! ");
}
}
verifyExpiry();
for (int i = 0; i < EVENT_ID_COUNT; i++) {
if (proxy.verifyIfDuplicate(eid[i])) {
fail(
" eid can not be found to be duplicate since the entry should have expired! " + i);
}
}
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
} finally {
if (server != null) {
server.stop();
}
}
}
@Test
public void testDuplicateSeqIdLesserThanCurrentSeqIdBeingIgnored() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(-1);
pf.setSubscriptionMessageTrackingTimeout(100000);
proxy = (PoolImpl) pf.create("clientPool");
EventID eid1 = new EventID(new byte[0], 1, 5);
if (proxy.verifyIfDuplicate(eid1)) {
fail(" eid1 can never be duplicate, it is being created for the first time! ");
}
EventID eid2 = new EventID(new byte[0], 1, 2);
if (!proxy.verifyIfDuplicate(eid2)) {
fail(" eid2 should be duplicate, seqId is less than highest (5)");
}
EventID eid3 = new EventID(new byte[0], 1, 3);
if (!proxy.verifyIfDuplicate(eid3)) {
fail(" eid3 should be duplicate, seqId is less than highest (5)");
}
assertTrue(!proxy.getThreadIdToSequenceIdMap().isEmpty());
proxy.destroy();
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
} finally {
if (server != null) {
server.stop();
}
}
}
@Test
public void testCleanCloseOfThreadIdToSeqId() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(-1);
pf.setSubscriptionMessageTrackingTimeout(100000);
proxy = (PoolImpl) pf.create("clientPool");
EventID eid1 = new EventID(new byte[0], 1, 2);
if (proxy.verifyIfDuplicate(eid1)) {
fail(" eid can never be duplicate, it is being created for the first time! ");
}
EventID eid2 = new EventID(new byte[0], 1, 3);
if (proxy.verifyIfDuplicate(eid2)) {
fail(" eid can never be duplicate, since sequenceId is greater ");
}
if (!proxy.verifyIfDuplicate(eid2)) {
fail(" eid had to be a duplicate, since sequenceId is equal ");
}
EventID eid3 = new EventID(new byte[0], 1, 1);
if (!proxy.verifyIfDuplicate(eid3)) {
fail(" eid had to be a duplicate, since sequenceId is lesser ");
}
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
} finally {
if (server != null) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
fail("interrupted");
}
server.stop();
}
}
}
@Test
public void testTwoClientsHavingDifferentThreadIdMaps() {
int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setMaximumTimeBetweenPings(10000);
server.setPort(port3);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port3);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(-1);
pf.setSubscriptionMessageTrackingTimeout(100000);
PoolImpl proxy1 = (PoolImpl) pf.create("clientPool1");
try {
PoolImpl proxy2 = (PoolImpl) pf.create("clientPool2");
try {
Map map1 = proxy1.getThreadIdToSequenceIdMap();
Map map2 = proxy2.getThreadIdToSequenceIdMap();
assertTrue(!(map1 == map2));
} finally {
proxy2.destroy();
}
} finally {
proxy1.destroy();
}
} catch (Exception ex) {
ex.printStackTrace();
fail("Failed to initialize client");
}
} finally {
if (server != null) {
server.stop();
}
}
}
@Test
public void testPeriodicAckSendByClient() {
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setPort(port);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(1);
pf.setReadTimeout(20000);
pf.setSubscriptionMessageTrackingTimeout(15000);
pf.setSubscriptionAckInterval(5000);
proxy = (PoolImpl) pf.create("clientPool");
EventID eid = new EventID(new byte[0], 1, 1);
if (proxy.verifyIfDuplicate(eid)) {
fail(" eid should not be duplicate as it is a new entry");
}
seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
.get(new ThreadIdentifier(new byte[0], 1));
assertFalse(seo.getAckSend());
// should send the ack to server
seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
.get(new ThreadIdentifier(new byte[0], 1));
verifyAckSend(true);
// New update on same threadId
eid = new EventID(new byte[0], 1, 2);
if (proxy.verifyIfDuplicate(eid)) {
fail(" eid should not be duplicate as it is a new entry");
}
seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
.get(new ThreadIdentifier(new byte[0], 1));
assertFalse(seo.getAckSend());
// should send another ack to server
seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
.get(new ThreadIdentifier(new byte[0], 1));
verifyAckSend(true);
// should expire with the this mentioned.
verifyExpiry();
} catch (Exception ex) {
ex.printStackTrace();
fail("Test testPeriodicAckSendByClient Failed");
}
} finally {
if (server != null) {
server.stop();
}
}
}
// No ack will be send if Redundancy level = 0
@Test
public void testNoAckSendByClient() {
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server = null;
try {
try {
server = this.cache.addCacheServer();
server.setPort(port);
server.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to create server");
}
try {
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", port);
pf.setSubscriptionEnabled(true);
pf.setSubscriptionRedundancy(1);
pf.setReadTimeout(20000);
pf.setSubscriptionMessageTrackingTimeout(8000);
pf.setSubscriptionAckInterval(2000);
proxy = (PoolImpl) pf.create("clientPool");
EventID eid = new EventID(new byte[0], 1, 1);
if (proxy.verifyIfDuplicate(eid)) {
fail(" eid should not be duplicate as it is a new entry");
}
seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
.get(new ThreadIdentifier(new byte[0], 1));
assertFalse(seo.getAckSend());
// should not send an ack as redundancy level = 0;
seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
.get(new ThreadIdentifier(new byte[0], 1));
verifyAckSend(false);
// should expire without sending an ack as redundancy level = 0.
verifyExpiry();
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test testPeriodicAckSendByClient Failed");
}
} finally {
if (server != null) {
server.stop();
}
}
}
private void verifyAckSend(final boolean expectedAckSend) {
GeodeAwaitility.await().timeout(timeoutToVerifyAckSend).untilAsserted(() -> {
assertEquals(expectedAckSend, seo.getAckSend());
});
}
private void verifyExpiry() {
GeodeAwaitility.await().timeout(timeoutToVerifyExpiry).untilAsserted(() -> {
assertEquals(0, proxy.getThreadIdToSequenceIdMap().size());
});
}
}