| /* |
| * ========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| * ======================================================================== |
| */ |
| package com.gemstone.gemfire.management.internal.pulse; |
| |
| import java.util.Properties; |
| |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.client.PoolManager; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.management.CacheServerMXBean; |
| import com.gemstone.gemfire.management.MBeanUtil; |
| import com.gemstone.gemfire.management.ManagementTestBase; |
| import com.gemstone.gemfire.management.internal.cli.CliUtil; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.SerializableCallable; |
| import dunit.SerializableRunnable; |
| import dunit.VM; |
| import dunit.DistributedTestCase.WaitCriterion; |
| |
| /** |
| * This is for testing client IDs |
| * @author Ajay Pande |
| * |
| */ |
| |
| public class TestClientIdsDUnitTest extends DistributedTestCase { |
| |
| private static final String k1 = "k1"; |
| private static final String k2 = "k2"; |
| |
| private static final String client_k1 = "client-k1"; |
| |
| private static final String client_k2 = "client-k2"; |
| |
| /** name of the test region */ |
| private static final String REGION_NAME = "ClientHealthStatsDUnitTest_Region"; |
| |
| private static VM server = null; |
| |
| private static VM client = null; |
| |
| private static VM client2 = null; |
| |
| private static VM managingNode = null; |
| |
| private ManagementTestBase helper; |
| |
| public TestClientIdsDUnitTest(String name) { |
| super(name); |
| this.helper = new ManagementTestBase(name); |
| } |
| |
| public void setUp() throws Exception { |
| super.setUp(); |
| final Host host = Host.getHost(0); |
| managingNode = host.getVM(0); |
| server = host.getVM(1); |
| client = host.getVM(2); |
| client2 = host.getVM(3); |
| } |
| |
| public void tearDown2() throws Exception { |
| super.tearDown2(); |
| |
| helper.closeCache(managingNode); |
| helper.closeCache(server); |
| helper.closeCache(client); |
| helper.closeCache(client2); |
| |
| disconnectFromDS(); |
| |
| |
| } |
| |
| private static final long serialVersionUID = 1L; |
| |
| public void testClientIds() throws Exception { |
| helper.createManagementCache(managingNode); |
| helper.startManagingNode(managingNode); |
| int port = (Integer) createServerCache(server); |
| DistributedMember serverMember = helper.getMember(server); |
| createClientCache(client, getServerHostName(server.getHost()), port); |
| createClientCache(client2, getServerHostName(server.getHost()), port); |
| put(client); |
| put(client2); |
| verifyClientIds(managingNode, serverMember, port); |
| helper.stopManagingNode(managingNode); |
| } |
| |
| @SuppressWarnings("serial") |
| private Object createServerCache(VM vm) { |
| return vm.invoke(new SerializableCallable("Create Server Cache") { |
| public Object call() { |
| try { |
| return createServerCache(); |
| } catch (Exception e) { |
| fail("Error while createServerCache " + e); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| @SuppressWarnings("serial") |
| private void createClientCache(VM vm, final String host, final Integer port1) { |
| vm.invoke(new SerializableCallable("Create Client Cache") { |
| |
| public Object call() { |
| try { |
| createClientCache(host, port1); |
| } catch (Exception e) { |
| fail("Error while createClientCache " + e); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| private Cache createCache(Properties props) throws Exception { |
| DistributedSystem ds = getSystem(props); |
| ds.disconnect(); |
| ds = getSystem(props); |
| assertNotNull(ds); |
| Cache cache = (GemFireCacheImpl) CacheFactory.create(ds); |
| assertNotNull(cache); |
| return cache; |
| } |
| |
| private Integer createServerCache(DataPolicy dataPolicy) throws Exception { |
| Cache cache = helper.createCache(false); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(dataPolicy); |
| RegionAttributes attrs = factory.create(); |
| cache.createRegion(REGION_NAME, attrs); |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| CacheServer server1 = cache.addCacheServer(); |
| server1.setPort(port); |
| server1.setNotifyBySubscription(true); |
| server1.start(); |
| return new Integer(server1.getPort()); |
| } |
| |
| public Integer createServerCache() throws Exception { |
| return createServerCache(DataPolicy.REPLICATE); |
| } |
| |
| public Cache createClientCache(String host, Integer port1) throws Exception { |
| |
| Properties props = new Properties(); |
| props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); |
| props.setProperty(DistributionConfig.LOCATORS_NAME, ""); |
| Cache cache = createCache(props); |
| PoolImpl p = (PoolImpl) PoolManager.createFactory() |
| .addServer(host, port1.intValue()).setSubscriptionEnabled(false) |
| .setThreadLocalConnections(true).setMinConnections(1) |
| .setReadTimeout(20000).setPingInterval(10000).setRetryAttempts(1) |
| .setSubscriptionEnabled(true).setStatisticInterval(1000) |
| .create("CacheServerManagementDUnitTest"); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setPoolName(p.getName()); |
| |
| RegionAttributes attrs = factory.create(); |
| Region region = cache.createRegion(REGION_NAME, attrs); |
| return cache; |
| |
| } |
| |
| /** |
| * get member id |
| * |
| * @param vm |
| */ |
| @SuppressWarnings("serial") |
| protected static DistributedMember getMember() throws Exception { |
| GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| return cache.getDistributedSystem().getDistributedMember(); |
| } |
| |
| /** |
| * Verify the Cache Server details |
| * |
| * @param vm |
| */ |
| @SuppressWarnings("serial") |
| protected void verifyClientIds(final VM vm, |
| final DistributedMember serverMember, final int serverPort) { |
| SerializableRunnable verifyCacheServerRemote = new SerializableRunnable( |
| "Verify Cache Server Remote") { |
| public void run() { |
| try { |
| final WaitCriterion waitCriteria = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| CacheServerMXBean bean = null; |
| try { |
| bean = MBeanUtil.getCacheServerMbeanProxy( |
| serverMember, serverPort); |
| if (bean != null) { |
| if( bean.getClientIds().length > 0){ |
| return true; |
| } |
| } |
| }catch (Exception e) { |
| getLogWriter().info("exception occured " + e.getMessage() + CliUtil.stackTraceAsString((Throwable)e)); |
| } |
| return false; |
| } |
| @Override |
| public String description() { |
| return "wait for getNumOfClients bean to complete and get results"; |
| } |
| }; |
| waitForCriterion(waitCriteria, 2 * 60 * 1000, 3000, true); |
| |
| //Now it is sure that bean would be available |
| CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy( |
| serverMember, serverPort); |
| getLogWriter().info("verifyClientIds = " + bean.getClientIds().length); |
| assertEquals(true, bean.getClientIds().length > 0 ? true : false); |
| } catch (Exception e) { |
| fail("Error while verifying cache server from remote member " + e); |
| } |
| } |
| }; |
| vm.invoke(verifyCacheServerRemote); |
| } |
| |
| /** |
| * Verify the Cache Server details |
| * |
| * @param vm |
| */ |
| @SuppressWarnings("serial") |
| protected void put(final VM vm) { |
| SerializableRunnable put = new SerializableRunnable("put") { |
| public void run() { |
| try { |
| Cache cache = GemFireCacheImpl.getInstance(); |
| Region<Object, Object> r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| |
| r1.put(k1, client_k1); |
| assertEquals(r1.getEntry(k1).getValue(), client_k1); |
| r1.put(k2, client_k2); |
| assertEquals(r1.getEntry(k2).getValue(), client_k2); |
| try { |
| Thread.sleep(10000); |
| } catch (Exception e) { |
| // sleep |
| } |
| r1.clear(); |
| |
| r1.put(k1, client_k1); |
| assertEquals(r1.getEntry(k1).getValue(), client_k1); |
| r1.put(k2, client_k2); |
| assertEquals(r1.getEntry(k2).getValue(), client_k2); |
| try { |
| Thread.sleep(10000); |
| } catch (Exception e) { |
| // sleep |
| } |
| r1.clear(); |
| } catch (Exception ex) { |
| fail("failed while put", ex); |
| } |
| } |
| |
| }; |
| vm.invoke(put); |
| } |
| |
| } |