blob: d51de09bea8925ddb39b044043b87e1810b2056b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_HTTP_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.cache.GemFireCacheImpl.getInstance;
import static org.apache.geode.management.ManagementService.getManagementService;
import static org.apache.geode.management.internal.MBeanJMXAdapter.getClientServiceMBeanName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.client.internal.LocatorTestBase;
import org.apache.geode.cache.query.IndexExistsException;
import org.apache.geode.cache.query.IndexInvalidException;
import org.apache.geode.cache.query.IndexNameConflictException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.cq.dunit.CqQueryDUnitTest;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.management.internal.JmxManagerLocatorRequest;
import org.apache.geode.management.internal.MBeanJMXAdapter;
import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Cache Server related management test cases
*/
@Category({ClientSubscriptionTest.class})
public class CacheServerManagementDUnitTest extends LocatorTestBase {
private static final long serialVersionUID = 1L;
private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000;
private ManagementTestBase helper;
private static final String queryName = "testClientWithFeederAndCQ_0";
private static final String indexName = "testIndex";
private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();
public CacheServerManagementDUnitTest() {
super();
this.helper = new ManagementTestBase() {
{
}
};
}
@Override
public final void preSetUp() throws Exception {
disconnectAllFromDS();
}
@Override
protected final void postTearDownLocatorTestBase() throws Exception {
disconnectAllFromDS();
}
@Test
public void testCacheServerMBean() throws Exception {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
VM managingNode = host.getVM(2);
// Managing Node is created first
helper.createManagementCache(managingNode);
helper.startManagingNode(managingNode);
// helper.createCache(server);
int serverPort = AvailablePortHelper.getRandomAvailableTCPPort();
cqDUnitTest.createServer(server, serverPort);
DistributedMember member = helper.getMember(server);
verifyCacheServer(server, serverPort);
final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
// Create client.
cqDUnitTest.createClient(client, port, host0);
cqDUnitTest.createCQ(client, queryName, cqDUnitTest.cqs[0]);
cqDUnitTest.executeCQ(client, queryName, false, null);
final int size = 10;
cqDUnitTest.createValues(client, cqDUnitTest.regions[0], size);
cqDUnitTest.waitForCreated(client, queryName, CqQueryDUnitTest.KEY + size);
cqDUnitTest.validateCQ(client, queryName, /* resultSize: */CqQueryDUnitTest.noTest,
/* creates: */size, /* updates: */0, /* deletes; */0, /* queryInserts: */size,
/* queryUpdates: */0, /* queryDeletes: */0, /* totalEvents: */size);
// Close.
Wait.pause(2000);
checkNavigation(managingNode, member, serverPort);
verifyIndex(server, serverPort);
// This will test all CQs and will close the cq in its final step
verifyCacheServerRemote(managingNode, member, serverPort);
verifyClosedCQ(server);
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
helper.stopManagingNode(managingNode);
helper.closeCache(client);
helper.closeCache(server);
helper.closeCache(managingNode);
}
/**
* Test for client server connection related management artifacts like notifications
*
*/
@Test
public void testCacheClient() throws Exception {
final Host host = Host.getHost(0);
VM locator = host.getVM(0);
VM server = host.getVM(1);
VM client = host.getVM(2);
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, ""));
String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]";
int serverPort = server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators));
addClientNotifListener(server, serverPort);
// Start a client and make sure that proper notification is received
client.invoke("Start BridgeClient", () -> startBridgeClient(null,
NetworkUtils.getServerHostName(locator.getHost()), locatorPort));
// stop the client and make sure the cache server notifies
stopBridgeMemberVM(client);
helper.closeCache(locator);
helper.closeCache(server);
helper.closeCache(client);
}
/**
* Intention of this test is to check if a node becomes manager after all the nodes are alive it
* should have all the information of all the members.
* <p>
* Thats why used service.getLocalManager().runManagementTaskAdhoc() to make node ready for
* federation when manager node comes up
*
*/
// renable when bug 46138
@Ignore("Bug46049")
@Test
public void testBug46049() throws Exception {
final Host host = Host.getHost(0);
VM locator = host.getVM(0);
VM server = host.getVM(1);
// Step 1:
final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, ""));
String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]";
// Step 2:
server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators));
// Step 3:
server.invoke("Check Server", () -> {
Cache cache = GemFireCacheImpl.getInstance();
assertNotNull(cache);
SystemManagementService service =
(SystemManagementService) ManagementService.getExistingManagementService(cache);
assertNotNull(service);
assertFalse(service.isManager());
assertNotNull(service.getMemberMXBean());
service.getLocalManager().runManagementTaskAdhoc();
});
// Step 4:
JmxManagerLocatorRequest.send(locator.getHost().getHostName(), locatorPort,
CONNECT_LOCATOR_TIMEOUT_MS, new Properties());
// Step 5:
locator.invoke("Check locator", () -> {
Cache cache = GemFireCacheImpl.getInstance();
assertNotNull(cache);
ManagementService service = ManagementService.getExistingManagementService(cache);
assertNotNull(service);
assertTrue(service.isManager());
LocatorMXBean bean = service.getLocalLocatorMXBean();
assertEquals(locatorPort, bean.getPort());
DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
assertEquals(2, dsBean.listMemberObjectNames().length);
});
helper.closeCache(locator);
helper.closeCache(server);
}
protected void startLocator(Host vmHost, final int locatorPort, final String otherLocators) {
disconnectFromDS();
Properties props = new Properties();
props.setProperty(MCAST_PORT, String.valueOf(0));
props.setProperty(LOCATORS, otherLocators);
props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
props.setProperty(JMX_MANAGER_HTTP_PORT, "0");
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log");
try {
InetAddress bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost));
Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props);
remoteObjects.put(LOCATOR_KEY, locator);
} catch (UnknownHostException uhe) {
Assert.fail("While resolving bind address ", uhe);
} catch (IOException ex) {
Assert.fail("While starting locator on port " + locatorPort, ex);
}
}
protected void checkNavigation(final VM vm, final DistributedMember cacheServerMember,
final int serverPort) {
SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") {
@Override
public void run() {
final ManagementService service = helper.getManagementService();
DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean();
try {
ObjectName expected =
MBeanJMXAdapter.getClientServiceMBeanName(serverPort, cacheServerMember.getId());
ObjectName actual =
disMBean.fetchCacheServerObjectName(cacheServerMember.getId(), serverPort);
assertEquals(expected, actual);
} catch (Exception e) {
fail("Cache Server Navigation Failed " + e);
}
try {
assertEquals(1, disMBean.listCacheServerObjectNames().length);
} catch (Exception e) {
fail("Cache Server Navigation Failed " + e);
}
}
};
vm.invoke(checkNavigation);
}
/**
* Verify the Cache Server details
*
*/
@SuppressWarnings("serial")
protected void addClientNotifListener(final VM vm, final int serverPort) throws Exception {
SerializableRunnable addClientNotifListener =
new SerializableRunnable("Add Client Notif Listener") {
@Override
public void run() {
GemFireCacheImpl cache = getInstance();
ManagementService service = getManagementService(cache);
final CacheServerMXBean bean = service.getLocalCacheServerMXBean(serverPort);
assertNotNull(bean);
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
if (bean.isRunning())
return true;
return false;
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
assertTrue(bean.isRunning());
TestCacheServerNotif nt = new TestCacheServerNotif();
try {
mbeanServer.addNotificationListener(getClientServiceMBeanName(
serverPort, cache.getDistributedSystem().getMemberId()), nt, null, null);
} catch (InstanceNotFoundException e) {
throw new RuntimeException("Failed With Exception ", e);
}
}
};
vm.invoke(addClientNotifListener);
}
/**
* Verify the closed CQ which is closed from Managing Node
*
*/
@SuppressWarnings("serial")
protected void verifyIndex(final VM vm, final int serverPort) throws Exception {
SerializableRunnable verifyIndex = new SerializableRunnable("Verify Index ") {
@Override
public void run() {
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
ManagementService service = ManagementService.getManagementService(cache);
QueryService qs = cache.getQueryService();
try {
qs.createIndex(indexName, "p.ID", "/root/" + cqDUnitTest.regions[0]);
} catch (RegionNotFoundException e) {
fail("Failed With Exception " + e);
} catch (IndexInvalidException e) {
fail("Failed With Exception " + e);
} catch (IndexNameConflictException e) {
fail("Failed With Exception " + e);
} catch (IndexExistsException e) {
fail("Failed With Exception " + e);
} catch (UnsupportedOperationException e) {
fail("Failed With Exception " + e);
}
CacheServerMXBean bean = service.getLocalCacheServerMXBean(serverPort);
assertEquals(bean.getIndexCount(), 1);
LogWriterUtils.getLogWriter()
.info("<ExpectedString> Index is " + bean.getIndexList()[0] + "</ExpectedString> ");
try {
bean.removeIndex(indexName);
} catch (Exception e) {
fail("Failed With Exception " + e);
}
assertEquals(bean.getIndexCount(), 0);
}
};
vm.invoke(verifyIndex);
}
/**
* Verify the closed CQ which is closed from Managing Node
*
*/
@SuppressWarnings("serial")
protected void verifyClosedCQ(final VM vm) throws Exception {
SerializableRunnable verifyClosedCQ = new SerializableRunnable("Verify Closed CQ") {
@Override
public void run() {
CqService cqService = GemFireCacheImpl.getInstance().getCqService();
if (cqService != null) {
assertNull(cqService.getCq(queryName));
}
}
};
vm.invoke(verifyClosedCQ);
}
/**
* Verify the Cache Server details
*
*/
@SuppressWarnings("serial")
protected void verifyCacheServer(final VM vm, final int serverPort) throws Exception {
SerializableRunnable verifyCacheServer = new SerializableRunnable("Verify Cache Server") {
@Override
public void run() {
GemFireCacheImpl cache = getInstance();
ManagementService service = getManagementService(cache);
final CacheServerMXBean bean = service.getLocalCacheServerMXBean(serverPort);
assertNotNull(bean);
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
if (bean.isRunning())
return true;
return false;
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
assertTrue(bean.isRunning());
assertCacheServerConfig(bean);
}
};
vm.invoke(verifyCacheServer);
}
protected void assertCacheServerConfig(CacheServerMXBean bean) {
// assertIndexDetailsEquals(ServerInfo.getInstance().getServerPort(), bean.getPort());
assertEquals(CacheServer.DEFAULT_BIND_ADDRESS, bean.getBindAddress());
assertEquals(CacheServer.DEFAULT_HOSTNAME_FOR_CLIENTS, bean.getHostNameForClients());
assertEquals(CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, bean.getSocketBufferSize());
assertEquals(CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, bean.getMaximumTimeBetweenPings());
assertEquals(CacheServer.DEFAULT_MAX_CONNECTIONS, bean.getMaxConnections());
assertEquals(CacheServer.DEFAULT_MAX_THREADS, bean.getMaxThreads());
assertEquals(CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, bean.getMaximumMessageCount());
assertEquals(CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, bean.getMessageTimeToLive());
assertEquals(CacheServer.DEFAULT_LOAD_POLL_INTERVAL, bean.getLoadPollInterval());
LogWriterUtils.getLogWriter().info("<ExpectedString> LoadProbe of the Server is "
+ bean.fetchLoadProbe().toString() + "</ExpectedString> ");
}
/**
* Verify the Cache Server details
*
*/
@SuppressWarnings("serial")
protected void verifyCacheServerRemote(final VM vm, final DistributedMember serverMember,
final int serverPort) {
SerializableRunnable verifyCacheServerRemote =
new SerializableRunnable("Verify Cache Server Remote") {
@Override
public void run() {
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
try {
CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort);
// Check for bean configuration
assertCacheServerConfig(bean);
String clientId = bean.getClientIds()[0];
assertNotNull(clientId);
LogWriterUtils.getLogWriter().info(
"<ExpectedString> ClientId of the Server is " + clientId + "</ExpectedString> ");
LogWriterUtils.getLogWriter().info("<ExpectedString> Active Query Count "
+ bean.getActiveCQCount() + "</ExpectedString> ");
LogWriterUtils.getLogWriter().info("<ExpectedString> Registered Query Count "
+ bean.getRegisteredCQCount() + "</ExpectedString> ");
assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1);
int numQueues = bean.getNumSubscriptions();
assertEquals(numQueues, 1);
bean.getContinuousQueryList();
// Only temporarily stops the query
bean.stopContinuousQuery("testClientWithFeederAndCQ_0");
// Start a stopped query
bean.executeContinuousQuery("testClientWithFeederAndCQ_0");
// Close the continuous query
bean.closeContinuousQuery("testClientWithFeederAndCQ_0");
} catch (Exception e) {
fail("Error while verifying cache server from remote member " + e);
}
}
};
vm.invoke(verifyCacheServerRemote);
}
/**
* Notification handler
*/
private static class TestCacheServerNotif implements NotificationListener {
@Override
public void handleNotification(Notification notification, Object handback) {
assertNotNull(notification);
LogWriterUtils.getLogWriter().info("Expected String :" + notification.toString());
}
}
}