Revert "AMBARI-17929. Kafka brokers went down after Ambari upgrade due to IllegalArgumentException.(vbrodetskyi)"
This reverts commit d6b8617167484d22ceccfc6f1eb71d0b246392f4.
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
index a3d9c89..5495655 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
@@ -176,7 +176,6 @@
protected static final String SLIDER_SERVICE_NAME = "SLIDER";
private static final String OOZIE_ENV_CONFIG = "oozie-env";
- protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
private static final String SLIDER_CLIENT_CONFIG = "slider-client";
private static final String HIVE_ENV_CONFIG = "hive-env";
private static final String AMS_SITE = "ams-site";
@@ -394,7 +393,6 @@
addManageUserPersistedDataPermission();
allowClusterOperatorToManageCredentials();
updateHDFSConfigs();
- updateKAFKAConfigs();
updateHIVEConfigs();
updateAMSConfigs();
updateClusterEnv();
@@ -1902,27 +1900,6 @@
}
}
- protected void updateKAFKAConfigs() throws AmbariException {
- AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
- Clusters clusters = ambariManagementController.getClusters();
- Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
-
- for (final Cluster cluster : clusterMap.values()) {
- Set<String> installedServices = cluster.getServices().keySet();
-
- if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
- Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
- if (kafkaBroker != null) {
- String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
- if (StringUtils.isNotEmpty(listenersPropertyValue)) {
- String newListenersPropertyValue = listenersPropertyValue.replaceAll("PLAINTEXT", "PLAINTEXTSASL");
- updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
- }
- }
- }
- }
- }
-
protected void updateHIVEConfigs() throws AmbariException {
AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
index 5bbfebd..34ca199 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
@@ -56,6 +56,8 @@
import java.util.List;
import java.util.Map;
+import javax.persistence.EntityManager;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -116,12 +118,8 @@
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
import org.junit.rules.TemporaryFolder;
-import org.springframework.security.crypto.password.PasswordEncoder;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@@ -132,6 +130,8 @@
import com.google.inject.Module;
import com.google.inject.Provider;
+import org.springframework.security.crypto.password.PasswordEncoder;
+
public class UpgradeCatalog240Test {
private static final String CAPACITY_SCHEDULER_CONFIG_TYPE = "capacity-scheduler";
private static final String WEBHCAT_SITE_CONFIG_TYPE = "webhcat-site";
@@ -584,7 +584,6 @@
Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML");
Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert");
Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties");
- Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs");
Capture<String> capturedStatements = newCapture(CaptureType.ALL);
@@ -634,7 +633,6 @@
.addMockedMethod(updateRecoveryConfigurationDML)
.addMockedMethod(removeAtlasMetaserverAlert)
.addMockedMethod(updateRangerHbasePluginProperties)
- .addMockedMethod(updateKAFKAConfigs)
.createMock();
Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
@@ -676,7 +674,6 @@
upgradeCatalog240.updateRecoveryConfigurationDML();
upgradeCatalog240.removeAtlasMetaserverAlert();
upgradeCatalog240.updateRangerHbasePluginProperties();
- upgradeCatalog240.updateKAFKAConfigs();
replay(upgradeCatalog240, dbAccessor);
@@ -1111,63 +1108,6 @@
assertTrue(Maps.difference(newPropertiesYarnEnv, updatedProperties).areEqual());
}
- @Test
- public void testUpdateKAFKAConfigs() throws Exception{
- EasyMockSupport easyMockSupport = new EasyMockSupport();
- final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
- final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
- final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
-
- final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
- expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{
- put("listeners", "PLAINTEXT://localhost:PLAINTEXT6667,PLAINTEXTSSL://localhost:6666PLAINTEXT");
- }}
- ).anyTimes();
-
- final Injector mockInjector = Guice.createInjector(new AbstractModule() {
- @Override
- protected void configure() {
- bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
- bind(Clusters.class).toInstance(mockClusters);
- bind(EntityManager.class).toInstance(entityManager);
- bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
- bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
- bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
- }
- });
-
- expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
- expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
- put("normal", mockClusterExpected);
- }}).atLeastOnce();
- expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
- expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS);
- expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>() {
- {
- put("KAFKA", null);
- }
- }).atLeastOnce();
-
- UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class)
- .withConstructor(Injector.class)
- .withArgs(mockInjector)
- .addMockedMethod("updateConfigurationProperties", String.class,
- Map.class, boolean.class, boolean.class)
- .createMock();
-
- Map<String, String> expectedUpdates = new HashMap<>();
- expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:PLAINTEXTSASL6667,PLAINTEXTSASLSSL://localhost:6666PLAINTEXTSASL");
-
- upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates,
- true, false);
- expectLastCall().once();
-
- easyMockSupport.replayAll();
- replay(upgradeCatalog240);
- upgradeCatalog240.updateKAFKAConfigs();
- easyMockSupport.verifyAll();
- }
-
/**
* Test that queue names updated in mapred-site, webhcat-site, tez-site, yarn-env
* @throws Exception