| /* |
| * ========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| * ======================================================================== |
| */ |
| package com.gemstone.gemfire.management; |
| |
| import java.util.Properties; |
| |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.EntryEvent; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.RegionFactory; |
| import com.gemstone.gemfire.cache.RegionShortcut; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.client.ClientCache; |
| import com.gemstone.gemfire.cache.client.ClientCacheFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionShortcut; |
| import com.gemstone.gemfire.cache.client.PoolManager; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.cache.util.CacheListenerAdapter; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.OSProcess; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.ha.Bug48571DUnitTest; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.SerializableRunnable; |
| import dunit.VM; |
| import dunit.DistributedTestCase.WaitCriterion; |
| |
| /** |
| * Client health stats check |
| * |
| * @author rishim |
| * |
| */ |
| public class ClientHealthStatsDUnitTest extends DistributedTestCase { |
| |
| private static final String k1 = "k1"; |
| |
| private static final String k2 = "k2"; |
| |
| private static final String client_k1 = "client-k1"; |
| |
| private static final String client_k2 = "client-k2"; |
| |
| /** name of the test region */ |
| private static final String REGION_NAME = "ClientHealthStatsDUnitTest_Region"; |
| |
| private static VM server = null; |
| |
| private static VM client = null; |
| |
| private static VM client2 = null; |
| |
| private static VM managingNode = null; |
| |
| private static ManagementTestBase helper = new ManagementTestBase("ClientHealthStatsDUnitTest_Helper"); |
| |
| private static int numOfCreates = 0; |
| private static int numOfUpdates = 0; |
| private static int numOfInvalidates = 0; |
| private static boolean lastKeyReceived = false; |
| |
| private static GemFireCacheImpl cache = null; |
| |
| public ClientHealthStatsDUnitTest(String name) { |
| super(name); |
| } |
| |
| public void setUp() throws Exception { |
| disconnectAllFromDS(); |
| super.setUp(); |
| final Host host = Host.getHost(0); |
| managingNode = host.getVM(0); |
| server = host.getVM(1); |
| client = host.getVM(2); |
| client2 = host.getVM(3); |
| } |
| |
| public void tearDown2() throws Exception { |
| super.tearDown2(); |
| reset(); |
| helper.closeCache(managingNode); |
| helper.closeCache(client); |
| helper.closeCache(client2); |
| helper.closeCache(server); |
| |
| disconnectAllFromDS(); |
| } |
| |
| public static void reset() throws Exception { |
| lastKeyReceived = false; |
| numOfCreates = 0; |
| numOfUpdates = 0; |
| numOfInvalidates = 0; |
| } |
| |
| private static final long serialVersionUID = 1L; |
| |
| public void testClientHealthStats_SubscriptionEnabled() throws Exception { |
| |
| helper.createManagementCache(managingNode); |
| helper.startManagingNode(managingNode); |
| |
| int port = (Integer) server.invoke(ClientHealthStatsDUnitTest.class, "createServerCache"); |
| |
| DistributedMember serverMember = helper.getMember(server); |
| |
| client.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 1, true, false}); |
| |
| client2.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 2, true, false}); |
| |
| client.invoke(ClientHealthStatsDUnitTest.class, "put"); |
| client2.invoke(ClientHealthStatsDUnitTest.class, "put"); |
| |
| managingNode.invoke(ClientHealthStatsDUnitTest.class, "verifyClientStats", new Object[] {serverMember, port, 2}); |
| helper.stopManagingNode(managingNode); |
| } |
| |
| public void testClientHealthStats_SubscriptionDisabled() throws Exception { |
| |
| helper.createManagementCache(managingNode); |
| helper.startManagingNode(managingNode); |
| |
| int port = (Integer) server.invoke(ClientHealthStatsDUnitTest.class, "createServerCache"); |
| |
| DistributedMember serverMember = helper.getMember(server); |
| |
| client.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 1, false, false}); |
| |
| client2.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 2, false, false}); |
| |
| client.invoke(ClientHealthStatsDUnitTest.class, "put"); |
| client2.invoke(ClientHealthStatsDUnitTest.class, "put"); |
| |
| managingNode.invoke(ClientHealthStatsDUnitTest.class, "verifyClientStats", new Object[] {serverMember, port, 0}); |
| helper.stopManagingNode(managingNode); |
| } |
| |
| public void testClientHealthStats_DurableClient() throws Exception { |
| |
| helper.createManagementCache(managingNode); |
| helper.startManagingNode(managingNode); |
| |
| int port = (Integer) server.invoke(ClientHealthStatsDUnitTest.class, "createServerCache"); |
| |
| DistributedMember serverMember = helper.getMember(server); |
| |
| client.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 1, true, true}); |
| |
| client2.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 2, true, true}); |
| |
| client.invoke(ClientHealthStatsDUnitTest.class, "put"); |
| client2.invoke(ClientHealthStatsDUnitTest.class, "put"); |
| |
| client.invoke(ClientHealthStatsDUnitTest.class, "closeClientCache"); |
| |
| client2.invoke(ClientHealthStatsDUnitTest.class, "closeClientCache"); |
| |
| managingNode.invoke(ClientHealthStatsDUnitTest.class, "verifyClientStats", new Object[] {serverMember, port, 2}); |
| helper.stopManagingNode(managingNode); |
| } |
| |
| public void testStatsMatchWithSize() throws Exception { |
| // start a server |
| int port = (Integer) server.invoke(ClientHealthStatsDUnitTest.class, "createServerCache"); |
| // create durable client, with durable RI |
| client.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 1, true, false}); |
| // do puts on server from three different threads, pause after 500 puts each. |
| server.invoke(ClientHealthStatsDUnitTest.class, "doPuts"); |
| // close durable client |
| client.invoke(ClientHealthStatsDUnitTest.class, "closeClientCache"); |
| // resume puts on server, add another 100. |
| server.invokeAsync(ClientHealthStatsDUnitTest.class, "resumePuts"); |
| // start durable client |
| client.invoke(ClientHealthStatsDUnitTest.class, "createClientCache", new Object[] {server.getHost(), port, 1, true, false}); |
| // wait for full queue dispatch |
| client.invoke(ClientHealthStatsDUnitTest.class, "waitForLastKey"); |
| // verify the stats |
| server.invoke(ClientHealthStatsDUnitTest.class, "verifyStats",new Object[] {port}); |
| } |
| |
| public static int createServerCache() throws Exception { |
| Cache cache = helper.createCache(false); |
| |
| RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE); |
| rf.setConcurrencyChecksEnabled(false); |
| rf.create(REGION_NAME); |
| |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| CacheServer server1 = cache.addCacheServer(); |
| server1.setPort(port); |
| server1.start(); |
| return server1.getPort(); |
| } |
| |
| |
| |
| public static void closeClientCache() throws Exception { |
| cache.close(true); |
| } |
| |
| public static void createClientCache(Host host, Integer port, int clientNum, boolean subscriptionEnabled, boolean durable) throws Exception { |
| |
| Properties props = new Properties(); |
| props.setProperty(DistributionConfig.DURABLE_CLIENT_ID_NAME, "durable-"+clientNum); |
| props.setProperty(DistributionConfig.DURABLE_CLIENT_TIMEOUT_NAME, "300000"); |
| |
| props.setProperty("log-file", testName+"_client_" + clientNum + ".log"); |
| props.setProperty("log-level", "info"); |
| props.setProperty("statistic-archive-file", testName+"_client_" + clientNum |
| + ".gfs"); |
| props.setProperty("statistic-sampling-enabled", "true"); |
| |
| ClientCacheFactory ccf = new ClientCacheFactory(props); |
| if(subscriptionEnabled){ |
| ccf.setPoolSubscriptionEnabled(true); |
| ccf.setPoolSubscriptionAckInterval(50); |
| ccf.setPoolSubscriptionRedundancy(0); |
| } |
| |
| if(durable){ |
| ccf.set("durable-client-id", "DurableClientId_"+clientNum); |
| ccf.set("durable-client-timeout", "" + 300); |
| } |
| |
| ccf.addPoolServer(host.getHostName(), port); |
| cache = (GemFireCacheImpl) ccf.create(); |
| |
| ClientRegionFactory<String, String> crf = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY); |
| crf.setConcurrencyChecksEnabled(false); |
| |
| crf.addCacheListener(new CacheListenerAdapter<String, String>() { |
| public void afterInvalidate(EntryEvent<String, String> event) { |
| cache.getLoggerI18n().fine( |
| "Invalidate Event: " + event.getKey() + ", " + event.getNewValue()); |
| numOfInvalidates++; |
| } |
| public void afterCreate(EntryEvent<String, String> event) { |
| if (((String) event.getKey()).equals("last_key")) { |
| lastKeyReceived = true; |
| } |
| cache.getLoggerI18n().fine( |
| "Create Event: " + event.getKey() + ", " + event.getNewValue()); |
| numOfCreates++; |
| } |
| public void afterUpdate(EntryEvent<String, String> event) { |
| cache.getLoggerI18n().fine( |
| "Update Event: " + event.getKey() + ", " + event.getNewValue()); |
| numOfUpdates++; |
| } |
| }); |
| |
| Region<String, String> r = crf.create(REGION_NAME); |
| if(subscriptionEnabled){ |
| r.registerInterest("ALL_KEYS", true); |
| cache.readyForEvents(); |
| } |
| |
| } |
| |
| public static void doPuts() throws Exception { |
| Cache cache = GemFireCacheImpl.getInstance(); |
| final Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME); |
| Thread t1 = new Thread(new Runnable() { |
| public void run() { |
| for (int i = 0; i < 500; i++) { |
| r.put("T1_KEY_"+i, "VALUE_"+i); |
| } |
| } |
| }); |
| Thread t2 = new Thread(new Runnable() { |
| public void run() { |
| for (int i = 0; i < 500; i++) { |
| r.put("T2_KEY_"+i, "VALUE_"+i); |
| } |
| } |
| }); |
| Thread t3 = new Thread(new Runnable() { |
| public void run() { |
| for (int i = 0; i < 500; i++) { |
| r.put("T3_KEY_"+i, "VALUE_"+i); |
| } |
| } |
| }); |
| |
| t1.start(); |
| t2.start(); |
| t3.start(); |
| |
| t1.join(); |
| t2.join(); |
| t3.join(); |
| } |
| |
| public static void resumePuts() { |
| Cache cache = GemFireCacheImpl.getInstance(); |
| Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME); |
| for (int i = 0; i < 100; i++) { |
| r.put("NEWKEY_"+i, "NEWVALUE_"+i); |
| } |
| r.put("last_key", "last_value"); |
| } |
| |
| public static void waitForLastKey() { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return lastKeyReceived; |
| } |
| @Override |
| public String description() { |
| return "Did not receive last key."; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(wc, 60*1000, 500, true); |
| } |
| |
| |
| @SuppressWarnings("serial") |
| protected static DistributedMember getMember() throws Exception { |
| GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| return cache.getDistributedSystem().getDistributedMember(); |
| } |
| |
| protected static void verifyClientStats(DistributedMember serverMember, int serverPort, int numSubscriptions) { |
| GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| try { |
| ManagementService service = ManagementService.getExistingManagementService(cache); |
| CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort); |
| |
| String[] clientIds = bean.getClientIds(); |
| assertTrue(clientIds.length == 2); |
| getLogWriter().info("<ExpectedString> ClientId-1 of the Server is " + clientIds[0] + "</ExpectedString> "); |
| getLogWriter().info("<ExpectedString> ClientId-2 of the Server is " + clientIds[1] + "</ExpectedString> "); |
| |
| ClientHealthStatus[] clientStatuses = bean.showAllClientStats(); |
| |
| |
| |
| ClientHealthStatus clientStatus1 = bean.showClientStats(clientIds[0]); |
| ClientHealthStatus clientStatus2 = bean.showClientStats(clientIds[1]); |
| assertNotNull(clientStatus1); |
| assertNotNull(clientStatus2); |
| getLogWriter().info("<ExpectedString> ClientStats-1 of the Server is " + clientStatus1 + "</ExpectedString> "); |
| getLogWriter().info("<ExpectedString> ClientStats-2 of the Server is " + clientStatus2 + "</ExpectedString> "); |
| |
| getLogWriter().info("<ExpectedString> clientStatuses " + clientStatuses + "</ExpectedString> "); |
| assertNotNull(clientStatuses); |
| |
| assertTrue(clientStatuses.length == 2); |
| for (ClientHealthStatus status : clientStatuses) { |
| getLogWriter().info("<ExpectedString> ClientStats of the Server is " + status + "</ExpectedString> "); |
| |
| } |
| |
| |
| DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); |
| assertEquals(2, dsBean.getNumClients()); |
| assertEquals(numSubscriptions, dsBean.getNumSubscriptions()); |
| |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail("Error while verifying cache server from remote member " + e); |
| } |
| |
| } |
| |
| protected static void put() { |
| Cache cache = GemFireCacheImpl.getInstance(); |
| Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| |
| r1.put(k1, client_k1); |
| assertEquals(r1.getEntry(k1).getValue(), client_k1); |
| r1.put(k2, client_k2); |
| assertEquals(r1.getEntry(k2).getValue(), client_k2); |
| try { |
| Thread.sleep(10000); |
| } catch (Exception e) { |
| // sleep |
| } |
| r1.clear(); |
| |
| r1.put(k1, client_k1); |
| assertEquals(r1.getEntry(k1).getValue(), client_k1); |
| r1.put(k2, client_k2); |
| assertEquals(r1.getEntry(k2).getValue(), client_k2); |
| try { |
| Thread.sleep(10000); |
| } catch (Exception e) { |
| // sleep |
| } |
| r1.clear(); |
| |
| } |
| |
| |
| public static void verifyStats(int serverPort) throws Exception { |
| Cache cache = GemFireCacheImpl.getInstance(); |
| ManagementService service = ManagementService.getExistingManagementService(cache); |
| CacheServerMXBean serverBean = service.getLocalCacheServerMXBean(serverPort); |
| CacheClientNotifier ccn = CacheClientNotifier.getInstance(); |
| CacheClientProxy ccp = ccn.getClientProxies().iterator().next(); |
| cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getQueueSize() " + ccp.getQueueSize()); |
| cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getQueueSizeStat() " + ccp.getQueueSizeStat()); |
| cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getEventsEnqued() " + ccp.getHARegionQueue().getStatistics().getEventsEnqued()); |
| cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getEventsDispatched() " + ccp.getHARegionQueue().getStatistics().getEventsDispatched()); |
| cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getEventsRemoved() " + ccp.getHARegionQueue().getStatistics().getEventsRemoved()); |
| cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getNumVoidRemovals() " + ccp.getHARegionQueue().getStatistics().getNumVoidRemovals()); |
| assertEquals(ccp.getQueueSize(), ccp.getQueueSizeStat()); |
| ClientQueueDetail queueDetails = serverBean.showClientQueueDetails()[0]; |
| assertEquals(queueDetails.getQueueSize(), ccp.getQueueSizeStat()); |
| } |
| |
| } |