blob: 07c7d3ed019a90f766e29fa48d69d44a3e92fa5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.upgrade;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.SecurityType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Injector;
/**
* The {@link UpgradeCatalog251} upgrades Ambari from 2.5.0 to 2.5.1.
*/
public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
static final String HOST_ROLE_COMMAND_TABLE = "host_role_command";
static final String HRC_IS_BACKGROUND_COLUMN = "is_background";
protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
private static final String STAGE_TABLE = "stage";
private static final String REQUEST_TABLE = "request";
private static final String CLUSTER_HOST_INFO_COLUMN = "cluster_host_info";
private static final String REQUEST_ID_COLUMN = "request_id";
protected static final String STORM_ENV_CONFIG = "storm-env";
/**
* Logger.
*/
private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog251.class);
/**
* Constructor.
*
* @param injector
*/
@Inject
public UpgradeCatalog251(Injector injector) {
super(injector);
}
/**
* {@inheritDoc}
*/
@Override
public String getSourceVersion() {
return "2.5.0";
}
/**
* {@inheritDoc}
*/
@Override
public String getTargetVersion() {
return "2.5.1";
}
/**
* {@inheritDoc}
*/
@Override
protected void executeDDLUpdates() throws AmbariException, SQLException {
addBackgroundColumnToHostRoleCommand();
moveClusterHostColumnFromStageToRequest();
}
/**
* {@inheritDoc}
*/
@Override
protected void executePreDMLUpdates() throws AmbariException, SQLException {
}
/**
* {@inheritDoc}
*/
@Override
protected void executeDMLUpdates() throws AmbariException, SQLException {
addNewConfigurationsFromXml();
updateKAFKAConfigs();
updateSTORMConfigs();
}
/**
* Ensure that the updates from Ambari 2.4.0 are applied in the event the initial version is
* Ambari 2.5.0, since this Kafka change failed to make it into Ambari 2.5.0.
*
* If the base version was before Ambari 2.5.0, this method should wind up doing nothing.
* @throws AmbariException
*/
protected void updateKAFKAConfigs() throws AmbariException {
AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
Clusters clusters = ambariManagementController.getClusters();
if (clusters != null) {
Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
if (clusterMap != null && !clusterMap.isEmpty()) {
for (final Cluster cluster : clusterMap.values()) {
Set<String> installedServices = cluster.getServices().keySet();
if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
if (kafkaBroker != null) {
String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
if (StringUtils.isNotEmpty(listenersPropertyValue)) {
String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
if(!newListenersPropertyValue.equals(listenersPropertyValue)) {
updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
}
}
}
}
}
}
}
}
/**
* Adds the {@value #HRC_IS_BACKGROUND_COLUMN} column to the
* {@value #HOST_ROLE_COMMAND_TABLE} table.
*
* @throws SQLException
*/
private void addBackgroundColumnToHostRoleCommand() throws SQLException {
dbAccessor.addColumn(HOST_ROLE_COMMAND_TABLE,
new DBColumnInfo(HRC_IS_BACKGROUND_COLUMN, Short.class, null, 0, false));
}
/**
* Moves the {@value #CLUSTER_HOST_INFO_COLUMN} column from {@value #STAGE_TABLE} table to the
* {@value #REQUEST_TABLE} table
*
*
* @throws SQLException
*/
private void moveClusterHostColumnFromStageToRequest() throws SQLException {
DBColumnInfo sourceColumn = new DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, byte[].class, null, null, false);
DBColumnInfo targetColumn = new DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, byte[].class, null, null, false);
dbAccessor.moveColumnToAnotherTable(STAGE_TABLE, sourceColumn, REQUEST_ID_COLUMN, REQUEST_TABLE, targetColumn, REQUEST_ID_COLUMN, "{}".getBytes());
}
/**
* Make sure storm-env changes are applied to anyone upgrading to HDP-2.6.1 Storm
* If the base version was before Ambari 2.5.0, this method should wind up doing nothing.
* @throws AmbariException
*/
protected void updateSTORMConfigs() throws AmbariException {
AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
Clusters clusters = ambariManagementController.getClusters();
if (clusters != null) {
Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
if (clusterMap != null && !clusterMap.isEmpty()) {
for (final Cluster cluster : clusterMap.values()) {
Set<String> installedServices = cluster.getServices().keySet();
// Technically, this should be added when the cluster is Kerberized on HDP 2.6.1, but is safe to add even
// without security or on an older stack version (such as HDP 2.5)
// The problem is that Kerberizing a cluster does not invoke Stack Advisor and has no easy way of setting
// these configs, so instead, add them as long as Storm is present.
if (installedServices.contains("STORM")) {
Config stormEnv = cluster.getDesiredConfigByType(STORM_ENV_CONFIG);
String content = stormEnv.getProperties().get("content");
if (content != null && !content.contains("STORM_AUTOCREDS_LIB_DIR")) {
Map<String, String> newProperties = new HashMap<>();
String stormEnvConfigs = "\n# set storm-auto creds\n" +
"# check if storm_jaas.conf in config, only enable storm_auto_creds in secure mode.\n" +
"STORM_JAAS_CONF=$STORM_HOME/conf/storm_jaas.conf\n" +
"STORM_AUTOCREDS_LIB_DIR=$STORM_HOME/external/storm-autocreds\n" +
"if [ -f $STORM_JAAS_CONF ] && [ -d $STORM_AUTOCREDS_LIB_DIR ]; then\n" +
" export STORM_EXT_CLASSPATH=$STORM_AUTOCREDS_LIB_DIR\n" +
"fi\n";
content += stormEnvConfigs;
newProperties.put("content", content);
updateConfigurationPropertiesForCluster(cluster, "storm-env", newProperties, true, false);
}
}
}
}
}
}
}