blob: b3d5dc5336e503c379bf593ad621b9fbdd522f4e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static junit.framework.TestCase.assertNotNull;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.EndpointManager;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.apache.geode.util.internal.GeodeGlossary;
/**
* Start client 1 Start client 2 Start Server 1 Start Server 2 Register interest for client 1 on
* Server 1/2 Kill Server 1 Verify that interest fails over to Server 2 Restart Server 1 Do a put
* which goes against Server 1 Verify that Client 1 does not get the update Verify that Client 2
* does get the update
*
* The key is to verify that the memberid being used by the client to register with the server is
* the same across servers
*/
@Category({ClientSubscriptionTest.class})
public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
private static final String REGION_NAME = "UpdatePropagationDUnitTest_region";
private VM server1 = null;
private VM server2 = null;
private VM client1 = null;
private VM client2 = null;
private int PORT1;
private int PORT2;
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
final Host host = Host.getHost(0);
// Server1 VM
server1 = host.getVM(0);
// Server2 VM
server2 = host.getVM(1);
// Client 1 VM
client1 = host.getVM(2);
// client 2 VM
client2 = host.getVM(3);
PORT1 = server1.invoke(() -> createServerCache());
PORT2 = server2.invoke(() -> createServerCache());
client1.invoke(
() -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
client2.invoke(
() -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
IgnoredException.addIgnoredException("java.net.SocketException");
IgnoredException.addIgnoredException("Unexpected IOException");
}
/**
* This tests whether the updates are received by other clients or not , if there are situation of
* Interest List fail over
*/
@Test
public void updatesAreProgegatedAfterFailover() {
// First create entries on both servers via the two client
client1.invoke(() -> createEntriesK1andK2());
client2.invoke(() -> createEntriesK1andK2());
client1.invoke(() -> registerKeysK1andK2());
client2.invoke(() -> registerKeysK1andK2());
// Induce fail over of InteretsList Endpoint to Server 2 by killing server1
server1.invoke(() -> killServer(new Integer(PORT1)));
// Wait for 10 seconds to allow fail over. This would mean that Interstist has failed
// over to Server2.
final CacheSerializableRunnable waitToDetectDeadServer =
new CacheSerializableRunnable("Wait for server on port1 to be dead") {
@Override
public void run2() throws CacheException {
Region r = getCache().getRegion(REGION_NAME);
String poolName = r.getAttributes().getPoolName();
final PoolImpl pool = (PoolImpl) PoolManager.find(poolName);
await()
.until(() -> !hasEndPointWithPort(pool, PORT1));
}
};
client1.invoke(waitToDetectDeadServer);
client2.invoke(waitToDetectDeadServer);
// Start Server1 again so that both clients1 & Client 2 will establish connection to server1
// too.
server1.invoke(() -> startServer(new Integer(PORT1)));
final CacheSerializableRunnable waitToDetectLiveServer =
new CacheSerializableRunnable("Wait for servers to be alive") {
@Override
public void run2() throws CacheException {
Region r = getCache().getRegion(REGION_NAME);
String poolName = r.getAttributes().getPoolName();
final PoolImpl pool = (PoolImpl) PoolManager.find(poolName);
await()
.until(() -> hasEndPointWithPort(pool, PORT1));
}
};
client1.invoke(waitToDetectLiveServer);
client2.invoke(waitToDetectLiveServer);
// Do a put on Server1 via Connection object from client1.
// Client1 should not receive updated value while client2 should receive
client1.invoke(
() -> acquireConnectionsAndPutonK1andK2(NetworkUtils.getServerHostName(client1.getHost())));
// Check if both the puts ( on key1 & key2 ) have reached the servers
server1.invoke(() -> verifyUpdates());
server2.invoke(() -> verifyUpdates());
// verify updates to other client
client2.invoke(() -> verifyUpdates());
// verify no updates for update originator
client1.invoke(() -> verifySenderUpdateCount());
}
/**
* Check to see if a client is connected to an endpoint with a specific port
*/
private boolean hasEndPointWithPort(final PoolImpl pool, final int port) {
EndpointManager endpointManager = pool.getEndpointManager();
final Set<ServerLocationAndMemberId> slAndMemberIds = endpointManager.getEndpointMap().keySet();
return slAndMemberIds.stream()
.anyMatch(slAndMemberId -> slAndMemberId.getServerLocation().getPort() == port);
}
private void acquireConnectionsAndPutonK1andK2(String host) {
Region r1 = getCache().getRegion(SEPARATOR + REGION_NAME);
r1.put("key1", "server-value1");
r1.put("key2", "server-value2");
}
private void killServer(Integer port) {
Iterator iter = getCache().getCacheServers().iterator();
if (iter.hasNext()) {
CacheServer server = (CacheServer) iter.next();
if (server.getPort() == port.intValue()) {
server.stop();
}
}
}
private void startServer(Integer port) throws IOException {
CacheServer server1 = getCache().addCacheServer();
server1.setPort(port.intValue());
server1.setNotifyBySubscription(true);
server1.start();
}
/**
* Creates entries on the server
*/
private void createEntriesK1andK2() {
Region r1 = getCache().getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey("key1")) {
r1.put("key1", "key-1");
}
if (!r1.containsKey("key2")) {
r1.put("key2", "key-2");
}
assertEquals(r1.get("key1"), "key-1");
if (r1.getAttributes().getPartitionAttributes() == null) {
assertEquals(r1.getEntry("key1").getValue(), "key-1");
assertEquals(r1.getEntry("key2").getValue(), "key-2");
} else {
assertEquals(r1.get("key1"), "key-1");
assertEquals(r1.get("key2"), "key-2");
}
}
private void createClientCache(String host, Integer port1, Integer port2) throws Exception {
ClientCache cache;
try {
System.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "PoolImpl.DISABLE_RANDOM", "true");
int PORT1 = port1.intValue();
int PORT2 = port2.intValue();
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(host, PORT1).addPoolServer(host, PORT2).setPoolSubscriptionEnabled(true)
.setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000)
.setPoolReadTimeout(2000).setPoolPingInterval(300);
cache = getClientCache(cf);
} finally {
System.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "PoolImpl.DISABLE_RANDOM", "false");
CacheServerTestUtil.enableShufflingOfEndpoints();
}
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
.addCacheListener(new EventTrackingCacheListener()).create(REGION_NAME);
}
private Integer createServerCache() throws Exception {
Cache cache = getCache();
RegionAttributes attrs = createCacheServerAttributes();
cache.createRegion(REGION_NAME, attrs);
CacheServer server = cache.addCacheServer();
assertNotNull(server);
int port = getRandomAvailableTCPPort();
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
protected RegionAttributes createCacheServerAttributes() {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
return factory.create();
}
private void registerKeysK1andK2() {
Region r = getCache().getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
List list = new ArrayList();
list.add("key1");
list.add("key2");
r.registerInterest(list);
}
private void verifySenderUpdateCount() {
Region r = getCache().getRegion(SEPARATOR + REGION_NAME);
EventTrackingCacheListener listener =
(EventTrackingCacheListener) r.getAttributes().getCacheListeners()[0];
final List<EntryEvent> events = listener.receivedEvents;
// We only expect to see 1 create and 1 update from the original put
assertEquals("Expected only 2 events for key1", 2,
events.stream().filter(event -> event.getKey().equals("key1")).count());
assertEquals("Expected only 2 events for key2", 2,
events.stream().filter(event -> event.getKey().equals("key2")).count());
}
private void verifyUpdates() {
await().untilAsserted(() -> {
Region r = getCache().getRegion(SEPARATOR + REGION_NAME);
// verify updates
if (r.getAttributes().getPartitionAttributes() == null) {
assertEquals("server-value2", r.getEntry("key2").getValue());
assertEquals("server-value1", r.getEntry("key1").getValue());
} else {
assertEquals("server-value2", r.get("key2"));
assertEquals("server-value1", r.get("key1"));
}
});
}
private static class EventTrackingCacheListener extends CacheListenerAdapter {
List<EntryEvent> receivedEvents = new ArrayList<>();
@Override
public void afterCreate(final EntryEvent event) {
receivedEvents.add(event);
}
@Override
public void afterUpdate(final EntryEvent event) {
receivedEvents.add(event);
}
@Override
public void afterDestroy(final EntryEvent event) {
receivedEvents.add(event);
}
}
}