blob: 49e339c0740452f4693290d50d80bce7dc4a0d7c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.knox.gateway.topology.discovery.cm.monitor;
import com.cloudera.api.swagger.client.ApiClient;
import com.cloudera.api.swagger.model.ApiEvent;
import com.cloudera.api.swagger.model.ApiEventAttribute;
import com.cloudera.api.swagger.model.ApiEventCategory;
import org.apache.commons.io.FileUtils;
import org.apache.knox.gateway.GatewayServer;
import org.apache.knox.gateway.services.GatewayServices;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.topology.TopologyService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.discovery.cm.model.cm.ClouderaManagerAPIServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
import org.easymock.EasyMock;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor.ConfigurationChangeListener;
import static org.easymock.EasyMock.getCurrentArguments;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class PollingConfigurationAnalyzerTest {
@Test(expected = IllegalArgumentException.class)
public void testRestartEventWithWrongApiEventCategory() {
doTestRestartEvent(ApiEventCategory.LOG_EVENT);
}
@Test
public void testRestartEvent() {
doTestRestartEvent(ApiEventCategory.AUDIT_EVENT);
}
private void doTestRestartEvent(final ApiEventCategory category) {
final String clusterName = "My Cluster";
final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
final String service = NameNodeServiceModelGenerator.SERVICE;
List<ApiEventAttribute> apiEventAttrs = new ArrayList<>();
apiEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
apiEventAttrs.add(createEventAttribute("SERVICE_TYPE", serviceType));
apiEventAttrs.add(createEventAttribute("SERVICE", service));
ApiEvent apiEvent = createApiEvent(category, apiEventAttrs);
PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
assertNotNull(restartEvent);
assertEquals(clusterName, restartEvent.getClusterName());
assertEquals(serviceType, restartEvent.getServiceType());
assertEquals(service, restartEvent.getService());
assertNotNull(restartEvent.getTimestamp());
}
@Test
public void testPollingConfigChangeNotificationForChangedPropertyValue() {
final String address = "http://host1:1234";
final String clusterName = "Cluster 5";
final String failoverPropertyName = "autofailover_enabled";
final String nsPropertyName = "dfs_federation_namenode_nameservice";
final String portPropertyName = "namenode_port";
// Mock the service discovery details
ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
EasyMock.replay(sdc);
final Map<String, List<String>> clusterNames = new HashMap<>();
clusterNames.put(address, Collections.singletonList(clusterName));
// Create the original ServiceConfigurationModel details
final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
final Map<String, String> nnServiceConf = new HashMap<>();
final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
final Map<String, String> nnRoleProps = new HashMap<>();
nnRoleProps.put(failoverPropertyName, "false");
nnRoleProps.put(nsPropertyName, "");
nnRoleProps.put(portPropertyName, "54321");
nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, nnRoleProps);
serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", createModel(nnServiceConf, nnRoleConf));
// Mock a ClusterConfigurationCache for the monitor to use
ClusterConfigurationCache configCache = EasyMock.createNiceMock(ClusterConfigurationCache.class);
EasyMock.expect(configCache.getDiscoveryConfig(address, clusterName)).andReturn(sdc).anyTimes();
EasyMock.expect(configCache.getClusterNames()).andReturn(clusterNames).anyTimes();
EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName))
.andReturn(serviceConfigurationModels)
.anyTimes();
EasyMock.replay(configCache);
// Create the monitor, registering a listener so we can verify that change notification works
ChangeListener listener = new ChangeListener();
TestablePollingConfigAnalyzer pca = new TestablePollingConfigAnalyzer(configCache, listener);
pca.setInterval(5);
// Create another version of the same ServiceConfigurationModel with a modified property value
ServiceConfigurationModel updatedNNModel = new ServiceConfigurationModel();
updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, failoverPropertyName, "false");
updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, nsPropertyName, "");
updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, portPropertyName, "12345");
pca.addCurrentServiceConfigModel(address, clusterName, NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", updatedNNModel);
// Start the polling thread
ExecutorService pollingThreadExecutor = Executors.newSingleThreadExecutor();
pollingThreadExecutor.execute(pca);
pollingThreadExecutor.shutdown();
// Simulate a service restart event
List<ApiEventAttribute> restartEventAttrs = new ArrayList<>();
restartEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
restartEventAttrs.add(createEventAttribute("SERVICE_TYPE", NameNodeServiceModelGenerator.SERVICE_TYPE));
restartEventAttrs.add(createEventAttribute("SERVICE", NameNodeServiceModelGenerator.SERVICE));
restartEventAttrs.add(createEventAttribute("COMMAND", "Restart"));
restartEventAttrs.add(createEventAttribute("COMMAND_STATUS", "SUCCEEDED"));
ApiEvent restartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, restartEventAttrs);
pca.addRestartEvent(clusterName, restartEvent);
// Simulate a service Start event
List<ApiEventAttribute> startEventAttrs = new ArrayList<>();
startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
startEventAttrs.add(createEventAttribute("SERVICE_TYPE", ClouderaManagerAPIServiceModelGenerator.SERVICE_TYPE));
startEventAttrs.add(createEventAttribute("SERVICE", ClouderaManagerAPIServiceModelGenerator.SERVICE));
startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "STARTED"));
ApiEvent startEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
pca.addRestartEvent(clusterName, startEvent);
// Simulate a failed service Start event
startEventAttrs = new ArrayList<>();
startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
startEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
startEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "FAILED"));
ApiEvent failedStartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
pca.addRestartEvent(clusterName, failedStartEvent);
// Simulate an event w/o COMMAND and/or COMMAND_STATUS attributes
final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
revisionEventAttrs.add(createEventAttribute("EVENTCODE", "EV_REVISION_CREATED"));
final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs);
pca.addRestartEvent(clusterName, revisionEvent);
try {
pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//
}
// Stop the config analyzer thread
pca.stop();
assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
assertEquals(2, listener.howManyNotifications(address, clusterName));
}
@Test
public void testClusterConfigMonitorTerminationForNoLongerReferencedClusters() {
final String address = "http://host1:1234";
final String clusterName = "Cluster 5";
final String updatedAddress = "http://host2:1234";
final String descContent =
"{\n" +
" \"discovery-type\": \"ClouderaManager\",\n" +
" \"discovery-address\": \"" + updatedAddress + "\",\n" +
" \"cluster\": \"" + clusterName + "\",\n" +
" \"provider-config-ref\": \"ldap\",\n" +
" \"services\": [\n" +
" {\n" +
" \"name\": \"WEBHDFS\"\n" +
" }\n" +
" ]\n" +
"}";
File descriptor = null;
try {
descriptor = File.createTempFile("test", ".json");
FileUtils.writeStringToFile(descriptor, descContent, StandardCharsets.UTF_8);
} catch (IOException e) {
e.printStackTrace();
}
// Mock the service discovery details
ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
EasyMock.replay(sdc);
final Map<String, List<String>> clusterNames = new HashMap<>();
clusterNames.put(address, Collections.singletonList(clusterName));
// Create the original ServiceConfigurationModel details
final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
final Map<String, String> nnServiceConf = new HashMap<>();
final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, Collections.emptyMap());
serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", createModel(nnServiceConf, nnRoleConf));
// Create a ClusterConfigurationCache for the monitor to use
final ClusterConfigurationCache configCache = new ClusterConfigurationCache();
configCache.addDiscoveryConfig(sdc);
configCache.addServiceConfiguration(address, clusterName, serviceConfigurationModels);
assertEquals(1, configCache.getClusterNames().get(address).size());
// Set up GatewayServices
// TopologyService mock
TopologyService ts = EasyMock.createNiceMock(TopologyService.class);
EasyMock.expect(ts.getDescriptors()).andReturn(Collections.singletonList(descriptor)).anyTimes();
// ClusterConfigurationMonitorService mock
ClusterConfigurationMonitorService ccms = EasyMock.createNiceMock(ClusterConfigurationMonitorService.class);
// Implement the clearing of the cache for the mock
ccms.clearCache(address, clusterName);
EasyMock.expectLastCall().andAnswer(() -> {
Object[] args = getCurrentArguments();
configCache.removeServiceConfiguration((String)args[0], (String)args[1]);
return null;
}).once();
// GatewayServices mock
GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
EasyMock.expect(gws.getService(ServiceType.TOPOLOGY_SERVICE)).andReturn(ts).anyTimes();
EasyMock.expect(gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE)).andReturn(ccms).anyTimes();
EasyMock.replay(ts, ccms, gws);
try {
setGatewayServices(gws);
// Create the monitor
TestablePollingConfigAnalyzer pca = new TestablePollingConfigAnalyzer(configCache);
pca.setInterval(5);
// Start the polling thread
ExecutorService pollingThreadExecutor = Executors.newSingleThreadExecutor();
pollingThreadExecutor.execute(pca);
pollingThreadExecutor.shutdown();
try {
pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//
}
// Stop the config analyzer thread
pca.stop();
if (descriptor != null && descriptor.exists()) {
descriptor.deleteOnExit();
}
assertEquals("Expected the config cache entry for " + clusterName + " to have been removed.",
0,
configCache.getClusterNames().get(address).size());
} finally {
// Reset the GatewayServices field of GatewayServer
setGatewayServices(null);
}
}
/**
* Set the static GatewayServices field to the specified value.
*
* @param gws A GatewayServices object, or null.
*/
private void setGatewayServices(final GatewayServices gws) {
try {
Field gwsField = GatewayServer.class.getDeclaredField("services");
gwsField.setAccessible(true);
gwsField.set(null, gws);
} catch (Exception e) {
e.printStackTrace();
}
}
private ApiEvent createApiEvent(final ApiEventCategory category, final List<ApiEventAttribute> attrs) {
ApiEvent event = EasyMock.createNiceMock(ApiEvent.class);
EasyMock.expect(event.getTimeOccurred()).andReturn(Instant.now().toString()).anyTimes();
EasyMock.expect(event.getCategory()).andReturn(category).anyTimes();
EasyMock.expect(event.getAttributes()).andReturn(attrs).anyTimes();
EasyMock.replay(event);
return event;
}
private ApiEventAttribute createEventAttribute(final String name, final String value) {
ApiEventAttribute attr = EasyMock.createNiceMock(ApiEventAttribute.class);
EasyMock.expect(attr.getName()).andReturn(name).anyTimes();
EasyMock.expect(attr.getValues()).andReturn(Collections.singletonList(value)).anyTimes();
EasyMock.replay(attr);
return attr;
}
private ServiceConfigurationModel createModel(Map<String, String> serviceConfig,
Map<String, Map<String, String>> roleConfig) {
ServiceConfigurationModel model = new ServiceConfigurationModel();
for (Map.Entry<String, String> entry : serviceConfig.entrySet()) {
model.addServiceProperty(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, Map<String, String>> entry : roleConfig.entrySet()) {
String roleType = entry.getKey();
for (Map.Entry<String, String> prop : entry.getValue().entrySet()) {
model.addRoleProperty(roleType, prop.getKey(), prop.getValue());
}
}
return model;
}
/**
* PollingConfigurationAnalyzer extension to override CM API invocations.
*/
private static class TestablePollingConfigAnalyzer extends PollingConfigurationAnalyzer {
private Map<String, List<ApiEvent>> restartEvents = new HashMap<>();
private Map<String, ServiceConfigurationModel> serviceConfigModels = new HashMap<>();
TestablePollingConfigAnalyzer(ClusterConfigurationCache cache) {
this(cache, null);
}
TestablePollingConfigAnalyzer(ClusterConfigurationCache cache,
ConfigurationChangeListener listener) {
super(cache, null, null, listener);
}
TestablePollingConfigAnalyzer(ClusterConfigurationCache cache,
ConfigurationChangeListener listener,
int interval) {
super(cache, null, null, listener, interval);
}
void addRestartEvent(final String service, final ApiEvent restartEvent) {
restartEvents.computeIfAbsent(service, l -> new ArrayList<>()).add(restartEvent);
}
void addCurrentServiceConfigModel(final String address, final String clusterName, final String service, final ServiceConfigurationModel model) {
serviceConfigModels.put(getServiceConfigModelKey(address, clusterName, service), model);
}
@Override
protected List<ApiEvent> queryEvents(ApiClient client, String clusterName, String since) {
return restartEvents.computeIfAbsent(clusterName, l -> new ArrayList<>());
}
@Override
protected ServiceConfigurationModel getCurrentServiceConfiguration(String address,
String clusterName,
String service) {
return serviceConfigModels.get(getServiceConfigModelKey(address, clusterName, service));
}
static String getServiceConfigModelKey(final String address, final String clusterName, final String service) {
return address + ":" + clusterName + ":" + service;
}
}
private static class ChangeListener implements ConfigurationChangeListener {
private final Map<String, String> notifications = new HashMap<>();
private final List<String> events = new ArrayList<>();
@Override
public void onConfigurationChange(String source, String clusterName) {
notifications.put(source, clusterName);
events.add(source + "+" + clusterName);
}
boolean wasNotified(final String source, final String clusterName) {
return clusterName.equals(notifications.get(source));
}
int howManyNotifications(final String source, final String clusterName) {
return events.size();
}
}
}