| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.management; |
| |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION; |
| import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_HTTP_PORT; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; |
| import static org.apache.geode.internal.cache.GemFireCacheImpl.getInstance; |
| import static org.apache.geode.management.ManagementService.getManagementService; |
| import static org.apache.geode.management.internal.MBeanJMXAdapter.getClientServiceMBeanName; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.Properties; |
| |
| import javax.management.InstanceNotFoundException; |
| import javax.management.MBeanServer; |
| import javax.management.Notification; |
| import javax.management.NotificationListener; |
| import javax.management.ObjectName; |
| |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.client.internal.LocatorTestBase; |
| import org.apache.geode.cache.query.IndexExistsException; |
| import org.apache.geode.cache.query.IndexInvalidException; |
| import org.apache.geode.cache.query.IndexNameConflictException; |
| import org.apache.geode.cache.query.QueryService; |
| import org.apache.geode.cache.query.RegionNotFoundException; |
| import org.apache.geode.cache.query.cq.dunit.CqQueryDUnitTest; |
| import org.apache.geode.cache.query.internal.cq.CqService; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.Locator; |
| import org.apache.geode.internal.AvailablePortHelper; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.management.internal.JmxManagerLocatorRequest; |
| import org.apache.geode.management.internal.MBeanJMXAdapter; |
| import org.apache.geode.management.internal.SystemManagementService; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| /** |
| * Cache Server related management test cases |
| */ |
| @Category({ClientSubscriptionTest.class}) |
| public class CacheServerManagementDUnitTest extends LocatorTestBase { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000; |
| |
| private ManagementTestBase helper; |
| |
| private static final String queryName = "testClientWithFeederAndCQ_0"; |
| |
| private static final String indexName = "testIndex"; |
| |
| private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer; |
| |
| protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest(); |
| |
/**
 * Creates the test instance together with its management helper, which is an anonymous subclass
 * of {@link ManagementTestBase} that adds no members.
 */
public CacheServerManagementDUnitTest() {
  helper = new ManagementTestBase() {};
}
| |
| @Override |
| public final void preSetUp() throws Exception { |
| disconnectAllFromDS(); |
| } |
| |
| @Override |
| protected final void postTearDownLocatorTestBase() throws Exception { |
| disconnectAllFromDS(); |
| } |
| |
| @Test |
| public void testCacheServerMBean() throws Exception { |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| VM managingNode = host.getVM(2); |
| |
| // Managing Node is created first |
| helper.createManagementCache(managingNode); |
| helper.startManagingNode(managingNode); |
| // helper.createCache(server); |
| int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); |
| cqDUnitTest.createServer(server, serverPort); |
| |
| DistributedMember member = helper.getMember(server); |
| |
| verifyCacheServer(server, serverPort); |
| |
| final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server.getHost()); |
| |
| // Create client. |
| cqDUnitTest.createClient(client, port, host0); |
| |
| cqDUnitTest.createCQ(client, queryName, cqDUnitTest.cqs[0]); |
| cqDUnitTest.executeCQ(client, queryName, false, null); |
| |
| final int size = 10; |
| cqDUnitTest.createValues(client, cqDUnitTest.regions[0], size); |
| cqDUnitTest.waitForCreated(client, queryName, CqQueryDUnitTest.KEY + size); |
| |
| cqDUnitTest.validateCQ(client, queryName, /* resultSize: */CqQueryDUnitTest.noTest, |
| /* creates: */size, /* updates: */0, /* deletes; */0, /* queryInserts: */size, |
| /* queryUpdates: */0, /* queryDeletes: */0, /* totalEvents: */size); |
| |
| // Close. |
| |
| Wait.pause(2000); |
| checkNavigation(managingNode, member, serverPort); |
| verifyIndex(server, serverPort); |
| // This will test all CQs and will close the cq in its final step |
| verifyCacheServerRemote(managingNode, member, serverPort); |
| |
| verifyClosedCQ(server); |
| |
| cqDUnitTest.closeClient(client); |
| cqDUnitTest.closeServer(server); |
| helper.stopManagingNode(managingNode); |
| helper.closeCache(client); |
| helper.closeCache(server); |
| helper.closeCache(managingNode); |
| } |
| |
| /** |
| * Test for client server connection related management artifacts like notifications |
| * |
| */ |
| |
| @Test |
| public void testCacheClient() throws Exception { |
| |
| final Host host = Host.getHost(0); |
| VM locator = host.getVM(0); |
| VM server = host.getVM(1); |
| VM client = host.getVM(2); |
| |
| int locatorPort = getRandomAvailableTCPPort(); |
| locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, "")); |
| |
| String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]"; |
| |
| int serverPort = server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators)); |
| |
| addClientNotifListener(server, serverPort); |
| |
| // Start a client and make sure that proper notification is received |
| client.invoke("Start BridgeClient", () -> startBridgeClient(null, |
| NetworkUtils.getServerHostName(locator.getHost()), locatorPort)); |
| |
| // stop the client and make sure the cache server notifies |
| stopBridgeMemberVM(client); |
| helper.closeCache(locator); |
| helper.closeCache(server); |
| helper.closeCache(client); |
| |
| } |
| |
| /** |
| * Intention of this test is to check if a node becomes manager after all the nodes are alive it |
| * should have all the information of all the members. |
| * <p> |
| * Thats why used service.getLocalManager().runManagementTaskAdhoc() to make node ready for |
| * federation when manager node comes up |
| * |
| */ |
| |
| // renable when bug 46138 |
| @Ignore("Bug46049") |
| @Test |
| public void testBug46049() throws Exception { |
| final Host host = Host.getHost(0); |
| VM locator = host.getVM(0); |
| VM server = host.getVM(1); |
| |
| // Step 1: |
| final int locatorPort = getRandomAvailableTCPPort(); |
| locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, "")); |
| |
| String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]"; |
| |
| // Step 2: |
| server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators)); |
| |
| // Step 3: |
| server.invoke("Check Server", () -> { |
| Cache cache = GemFireCacheImpl.getInstance(); |
| assertNotNull(cache); |
| SystemManagementService service = |
| (SystemManagementService) ManagementService.getExistingManagementService(cache); |
| assertNotNull(service); |
| assertFalse(service.isManager()); |
| assertNotNull(service.getMemberMXBean()); |
| service.getLocalManager().runManagementTaskAdhoc(); |
| }); |
| |
| // Step 4: |
| JmxManagerLocatorRequest.send(locator.getHost().getHostName(), locatorPort, |
| CONNECT_LOCATOR_TIMEOUT_MS, new Properties()); |
| |
| // Step 5: |
| locator.invoke("Check locator", () -> { |
| Cache cache = GemFireCacheImpl.getInstance(); |
| assertNotNull(cache); |
| ManagementService service = ManagementService.getExistingManagementService(cache); |
| assertNotNull(service); |
| assertTrue(service.isManager()); |
| LocatorMXBean bean = service.getLocalLocatorMXBean(); |
| assertEquals(locatorPort, bean.getPort()); |
| DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); |
| |
| assertEquals(2, dsBean.listMemberObjectNames().length); |
| }); |
| |
| helper.closeCache(locator); |
| helper.closeCache(server); |
| |
| } |
| |
| protected void startLocator(Host vmHost, final int locatorPort, final String otherLocators) { |
| disconnectFromDS(); |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, String.valueOf(0)); |
| props.setProperty(LOCATORS, otherLocators); |
| props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); |
| props.setProperty(JMX_MANAGER_HTTP_PORT, "0"); |
| props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); |
| File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log"); |
| try { |
| InetAddress bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost)); |
| Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props); |
| remoteObjects.put(LOCATOR_KEY, locator); |
| } catch (UnknownHostException uhe) { |
| Assert.fail("While resolving bind address ", uhe); |
| } catch (IOException ex) { |
| Assert.fail("While starting locator on port " + locatorPort, ex); |
| } |
| } |
| |
| protected void checkNavigation(final VM vm, final DistributedMember cacheServerMember, |
| final int serverPort) { |
| SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") { |
| @Override |
| public void run() { |
| |
| final ManagementService service = helper.getManagementService(); |
| |
| DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean(); |
| try { |
| ObjectName expected = |
| MBeanJMXAdapter.getClientServiceMBeanName(serverPort, cacheServerMember.getId()); |
| ObjectName actual = |
| disMBean.fetchCacheServerObjectName(cacheServerMember.getId(), serverPort); |
| assertEquals(expected, actual); |
| } catch (Exception e) { |
| fail("Cache Server Navigation Failed " + e); |
| } |
| |
| try { |
| assertEquals(1, disMBean.listCacheServerObjectNames().length); |
| } catch (Exception e) { |
| fail("Cache Server Navigation Failed " + e); |
| } |
| |
| } |
| }; |
| vm.invoke(checkNavigation); |
| } |
| |
| /** |
| * Verify the Cache Server details |
| * |
| */ |
| @SuppressWarnings("serial") |
| protected void addClientNotifListener(final VM vm, final int serverPort) throws Exception { |
| SerializableRunnable addClientNotifListener = |
| new SerializableRunnable("Add Client Notif Listener") { |
| @Override |
| public void run() { |
| GemFireCacheImpl cache = getInstance(); |
| ManagementService service = getManagementService(cache); |
| final CacheServerMXBean bean = service.getLocalCacheServerMXBean(serverPort); |
| assertNotNull(bean); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| if (bean.isRunning()) |
| return true; |
| return false; |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertTrue(bean.isRunning()); |
| TestCacheServerNotif nt = new TestCacheServerNotif(); |
| try { |
| mbeanServer.addNotificationListener(getClientServiceMBeanName( |
| serverPort, cache.getDistributedSystem().getMemberId()), nt, null, null); |
| } catch (InstanceNotFoundException e) { |
| throw new RuntimeException("Failed With Exception ", e); |
| } |
| |
| } |
| }; |
| vm.invoke(addClientNotifListener); |
| } |
| |
| /** |
| * Verify the closed CQ which is closed from Managing Node |
| * |
| */ |
| @SuppressWarnings("serial") |
| protected void verifyIndex(final VM vm, final int serverPort) throws Exception { |
| SerializableRunnable verifyIndex = new SerializableRunnable("Verify Index ") { |
| @Override |
| public void run() { |
| GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| ManagementService service = ManagementService.getManagementService(cache); |
| QueryService qs = cache.getQueryService(); |
| try { |
| qs.createIndex(indexName, "p.ID", |
| SEPARATOR + "root" + SEPARATOR + cqDUnitTest.regions[0]); |
| } catch (RegionNotFoundException e) { |
| fail("Failed With Exception " + e); |
| } catch (IndexInvalidException e) { |
| fail("Failed With Exception " + e); |
| } catch (IndexNameConflictException e) { |
| fail("Failed With Exception " + e); |
| } catch (IndexExistsException e) { |
| fail("Failed With Exception " + e); |
| } catch (UnsupportedOperationException e) { |
| fail("Failed With Exception " + e); |
| } |
| |
| CacheServerMXBean bean = service.getLocalCacheServerMXBean(serverPort); |
| assertEquals(bean.getIndexCount(), 1); |
| LogWriterUtils.getLogWriter() |
| .info("<ExpectedString> Index is " + bean.getIndexList()[0] + "</ExpectedString> "); |
| try { |
| bean.removeIndex(indexName); |
| } catch (Exception e) { |
| fail("Failed With Exception " + e); |
| |
| } |
| assertEquals(bean.getIndexCount(), 0); |
| |
| } |
| }; |
| vm.invoke(verifyIndex); |
| } |
| |
| /** |
| * Verify the closed CQ which is closed from Managing Node |
| * |
| */ |
| @SuppressWarnings("serial") |
| protected void verifyClosedCQ(final VM vm) throws Exception { |
| SerializableRunnable verifyClosedCQ = new SerializableRunnable("Verify Closed CQ") { |
| @Override |
| public void run() { |
| CqService cqService = GemFireCacheImpl.getInstance().getCqService(); |
| if (cqService != null) { |
| assertNull(cqService.getCq(queryName)); |
| } |
| |
| } |
| }; |
| vm.invoke(verifyClosedCQ); |
| } |
| |
| /** |
| * Verify the Cache Server details |
| * |
| */ |
| @SuppressWarnings("serial") |
| protected void verifyCacheServer(final VM vm, final int serverPort) throws Exception { |
| SerializableRunnable verifyCacheServer = new SerializableRunnable("Verify Cache Server") { |
| @Override |
| public void run() { |
| GemFireCacheImpl cache = getInstance(); |
| ManagementService service = getManagementService(cache); |
| final CacheServerMXBean bean = service.getLocalCacheServerMXBean(serverPort); |
| assertNotNull(bean); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| if (bean.isRunning()) |
| return true; |
| return false; |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertTrue(bean.isRunning()); |
| assertCacheServerConfig(bean); |
| |
| } |
| }; |
| vm.invoke(verifyCacheServer); |
| } |
| |
| protected void assertCacheServerConfig(CacheServerMXBean bean) { |
| // assertIndexDetailsEquals(ServerInfo.getInstance().getServerPort(), bean.getPort()); |
| assertEquals(CacheServer.DEFAULT_BIND_ADDRESS, bean.getBindAddress()); |
| assertEquals(CacheServer.DEFAULT_HOSTNAME_FOR_CLIENTS, bean.getHostNameForClients()); |
| assertEquals(CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, bean.getSocketBufferSize()); |
| assertEquals(CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, bean.getMaximumTimeBetweenPings()); |
| assertEquals(CacheServer.DEFAULT_MAX_CONNECTIONS, bean.getMaxConnections()); |
| assertEquals(CacheServer.DEFAULT_MAX_THREADS, bean.getMaxThreads()); |
| assertEquals(CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, bean.getMaximumMessageCount()); |
| assertEquals(CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, bean.getMessageTimeToLive()); |
| assertEquals(CacheServer.DEFAULT_LOAD_POLL_INTERVAL, bean.getLoadPollInterval()); |
| LogWriterUtils.getLogWriter().info("<ExpectedString> LoadProbe of the Server is " |
| + bean.fetchLoadProbe().toString() + "</ExpectedString> "); |
| } |
| |
| /** |
| * Verify the Cache Server details |
| * |
| */ |
| @SuppressWarnings("serial") |
| protected void verifyCacheServerRemote(final VM vm, final DistributedMember serverMember, |
| final int serverPort) { |
| SerializableRunnable verifyCacheServerRemote = |
| new SerializableRunnable("Verify Cache Server Remote") { |
| @Override |
| public void run() { |
| GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| try { |
| |
| CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort); |
| |
| // Check for bean configuration |
| assertCacheServerConfig(bean); |
| |
| String clientId = bean.getClientIds()[0]; |
| assertNotNull(clientId); |
| LogWriterUtils.getLogWriter().info( |
| "<ExpectedString> ClientId of the Server is " + clientId + "</ExpectedString> "); |
| LogWriterUtils.getLogWriter().info("<ExpectedString> Active Query Count " |
| + bean.getActiveCQCount() + "</ExpectedString> "); |
| |
| LogWriterUtils.getLogWriter().info("<ExpectedString> Registered Query Count " |
| + bean.getRegisteredCQCount() + "</ExpectedString> "); |
| |
| assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1); |
| int numQueues = bean.getNumSubscriptions(); |
| assertEquals(numQueues, 1); |
| |
| bean.getContinuousQueryList(); |
| // Only temporarily stops the query |
| bean.stopContinuousQuery("testClientWithFeederAndCQ_0"); |
| |
| // Start a stopped query |
| bean.executeContinuousQuery("testClientWithFeederAndCQ_0"); |
| |
| // Close the continuous query |
| bean.closeContinuousQuery("testClientWithFeederAndCQ_0"); |
| } catch (Exception e) { |
| fail("Error while verifying cache server from remote member " + e); |
| } |
| |
| } |
| }; |
| vm.invoke(verifyCacheServerRemote); |
| } |
| |
| /** |
| * Notification handler |
| */ |
| private static class TestCacheServerNotif implements NotificationListener { |
| |
| @Override |
| public void handleNotification(Notification notification, Object handback) { |
| assertNotNull(notification); |
| LogWriterUtils.getLogWriter().info("Expected String :" + notification.toString()); |
| } |
| |
| } |
| |
| } |