// blob: de8005257af81033d3865406f2e9e81a2bfca282 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.upgrade;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.audit.AuditLoggerModule;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.ControllerModule;
import org.apache.ambari.server.ldap.LdapModule;
import org.apache.ambari.server.orm.DBAccessor;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.apache.ambari.server.utils.VersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.support.JdbcUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.persist.PersistService;
public class SchemaUpgradeHelper {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger
(SchemaUpgradeHelper.class);
// Every upgrade catalog handed to the constructor; the upgrade path is selected from this set.
private Set<UpgradeCatalog> allUpgradeCatalogs;
// Persistence service started/stopped by startPersistenceService()/stopPersistenceService().
private PersistService persistService;
// Accessor used for the direct SQL reads and updates performed by this helper.
private DBAccessor dbAccessor;
// Server configuration: source of the server version, database type/name and RCA database settings.
private Configuration configuration;
// Names of the RCA tables that cleanUpRCATables() empties.
private static final String[] rcaTableNames = {"workflow", "job", "task", "taskAttempt", "hdfsEvent", "mapreduceEvent", "clusterEvent"};
// Shared Gson instance; outputUpgradeJsonOutput() uses it to serialize the combined catalog output.
static final Gson gson = new GsonBuilder().create();
/**
 * Creates the helper from its injected collaborators.
 *
 * @param allUpgradeCatalogs every upgrade catalog available to this helper
 * @param persistService     service controlled by {@link #startPersistenceService()} and
 *                           {@link #stopPersistenceService()}
 * @param dbAccessor         accessor used for direct database queries and updates
 * @param configuration      server configuration
 */
@Inject
public SchemaUpgradeHelper(Set<UpgradeCatalog> allUpgradeCatalogs,
                           PersistService persistService,
                           DBAccessor dbAccessor,
                           Configuration configuration) {
  this.configuration = configuration;
  this.dbAccessor = dbAccessor;
  this.persistService = persistService;
  this.allUpgradeCatalogs = allUpgradeCatalogs;
}
/**
 * Starts the injected {@link PersistService}.
 */
public void startPersistenceService() {
  this.persistService.start();
}
/**
 * Stops the injected {@link PersistService}.
 */
public void stopPersistenceService() {
  this.persistService.stop();
}
/**
 * Returns the set of upgrade catalogs this helper was constructed with.
 * The returned set is the internal instance, not a copy.
 *
 * @return all known upgrade catalogs
 */
public Set<UpgradeCatalog> getAllUpgradeCatalogs() {
  return this.allUpgradeCatalogs;
}
/**
 * Reads the schema version currently stored in the database, i.e. the
 * {@code metainfo_value} of the {@code metainfo} row whose {@code metainfo_key}
 * is {@code 'version'}.
 *
 * @return the stored version string, or {@code "1.2.0"} when no such row exists
 * @throws RuntimeException if the query fails or the JDBC resources cannot be
 *         closed; the underlying {@link SQLException} is attached as the cause
 */
public String readSourceVersion() {
  // try-with-resources closes the result set first and then the statement, even when
  // one of the close() calls fails, and never hides an exception thrown by the query.
  try (Statement statement = dbAccessor.getConnection().createStatement()) {
    if (statement != null) {
      String versionQuery = "SELECT " + dbAccessor.quoteObjectName("metainfo_value") +
          " from metainfo WHERE " + dbAccessor.quoteObjectName("metainfo_key") + "='version'";
      try (ResultSet rs = statement.executeQuery(versionQuery)) {
        if (rs != null && rs.next()) {
          return rs.getString(1);
        }
      }
    }
  } catch (SQLException e) {
    throw new RuntimeException("Unable to read database version", e);
  }
  // Not found: assume the oldest version.
  // The exact value doesn't matter, as there is a single upgrade catalog
  // for the 1.2.0 - 1.5.0 and 1.4.4 - 1.5.0 upgrades.
  return "1.2.0";
}
/**
 * Returns the Ambari server version as reported by the server configuration.
 *
 * @return the value of {@code configuration.getServerVersion()}
 */
protected String getAmbariServerVersion() {
  String serverVersion = configuration.getServerVersion();
  return serverVersion;
}
/**
 * Selects the upgrade catalogs that have to be applied to move from
 * {@code sourceVersion} to {@code targetVersion}.
 * <p>
 * Catalogs are first ordered with {@link AbstractUpgradeCatalog.VersionComparator}.
 * A catalog is kept when its target version is strictly newer than the source
 * version (or the source version is {@code null}) and not newer than the target
 * version. Versions are compared on their first four components.
 *
 * @param sourceVersion version currently stored in the database, may be {@code null}
 * @param targetVersion version to upgrade to
 * @return the selected catalogs, in comparator order
 * @throws org.apache.ambari.server.AmbariException declared for callers; not thrown directly here
 */
protected List<UpgradeCatalog> getUpgradePath(String sourceVersion,
                                              String targetVersion) throws AmbariException {
  List<UpgradeCatalog> path = new ArrayList<>();
  List<UpgradeCatalog> sortedCatalogs = new ArrayList<>(allUpgradeCatalogs);
  Collections.sort(sortedCatalogs, new AbstractUpgradeCatalog.VersionComparator());

  for (UpgradeCatalog catalog : sortedCatalogs) {
    // Skip catalogs that are not newer than the source version.
    boolean newerThanSource = sourceVersion == null
        || VersionUtils.compareVersions(sourceVersion, catalog.getTargetVersion(), 4) < 0;
    if (!newerThanSource) {
      continue;
    }
    // Keep only catalogs that are older than or equal to the target version.
    boolean withinTarget =
        VersionUtils.compareVersions(catalog.getTargetVersion(), targetVersion, 4) <= 0;
    if (withinTarget) {
      path.add(catalog);
    }
  }

  LOG.info("Upgrade path: " + path);
  return path;
}
/**
 * Extension of the main controller module that additionally registers the
 * {@link UpgradeCatalog} implementations in a Guice set binder.
 */
public static class UpgradeHelperModule extends ControllerModule {

  /**
   * Creates the module using the parent's no-argument constructor.
   *
   * @throws Exception propagated from the parent constructor
   */
  public UpgradeHelperModule() throws Exception {
    super();
  }

  /**
   * Creates the module from explicit properties.
   *
   * @param properties properties forwarded to the parent constructor
   * @throws Exception propagated from the parent constructor
   */
  public UpgradeHelperModule(Properties properties) throws Exception {
    super(properties);
  }

  /**
   * Runs the parent configuration and then binds every upgrade catalog.
   */
  @Override
  protected void configure() {
    super.configure();

    // Each newly created catalog must get its own binding here.
    Multibinder<UpgradeCatalog> catalogs = Multibinder.newSetBinder(binder(), UpgradeCatalog.class);
    catalogs.addBinding().to(UpgradeCatalog251.class);
    catalogs.addBinding().to(UpgradeCatalog252.class);
    catalogs.addBinding().to(UpgradeCatalog260.class);
    catalogs.addBinding().to(UpgradeCatalog261.class);
    catalogs.addBinding().to(UpgradeCatalog262.class);
    catalogs.addBinding().to(UpgradeCatalog270.class);
    catalogs.addBinding().to(UpdateAlertScriptPaths.class);
    catalogs.addBinding().to(FinalUpgradeCatalog.class);

    // NOTE(review): presumably makes Ambari event publishing synchronous for the upgrade - confirm.
    EventBusSynchronizer.synchronizeAmbariEventPublisher(binder());
  }
}
/**
 * Runs the schema (DDL) step, {@code upgradeSchema()}, of every given catalog in list order.
 * A {@code null} or empty list is a no-op apart from the log line.
 *
 * @param upgradeCatalogs catalogs to apply, may be {@code null}
 * @throws AmbariException wrapping the first failure; remaining catalogs are not run
 */
public void executeUpgrade(List<UpgradeCatalog> upgradeCatalogs) throws AmbariException {
  LOG.info("Executing DDL upgrade...");
  if (upgradeCatalogs == null || upgradeCatalogs.isEmpty()) {
    return;
  }
  for (UpgradeCatalog catalog : upgradeCatalogs) {
    try {
      catalog.upgradeSchema();
    } catch (Exception failure) {
      LOG.error("Upgrade failed. ", failure);
      throw new AmbariException(failure.getMessage(), failure);
    }
  }
}
/**
 * Runs the pre-DML step, {@code preUpgradeData()}, of every given catalog in list order.
 * A {@code null} or empty list is a no-op apart from the log line.
 *
 * @param upgradeCatalogs catalogs to apply, may be {@code null}
 * @throws AmbariException wrapping the first failure; remaining catalogs are not run
 */
public void executePreDMLUpdates(List<UpgradeCatalog> upgradeCatalogs) throws AmbariException {
  LOG.info("Executing Pre-DML changes.");
  if (upgradeCatalogs == null || upgradeCatalogs.isEmpty()) {
    return;
  }
  for (UpgradeCatalog catalog : upgradeCatalogs) {
    try {
      catalog.preUpgradeData();
    } catch (Exception failure) {
      LOG.error("Upgrade failed. ", failure);
      throw new AmbariException(failure.getMessage(), failure);
    }
  }
}
/**
 * Runs the data (DML) step of every given catalog in list order. Each catalog is
 * first told the config-updates file name and then asked to {@code upgradeData()}.
 * A {@code null} or empty list is a no-op apart from the log line.
 *
 * @param upgradeCatalogs                    catalogs to apply, may be {@code null}
 * @param ambariUpgradeConfigUpdatesFileName file name passed to each catalog via
 *                                           {@code setConfigUpdatesFileName}
 * @throws AmbariException wrapping the first failure; remaining catalogs are not run
 */
public void executeDMLUpdates(List<UpgradeCatalog> upgradeCatalogs, String ambariUpgradeConfigUpdatesFileName) throws AmbariException {
  LOG.info("Executing DML changes.");
  if (upgradeCatalogs == null || upgradeCatalogs.isEmpty()) {
    return;
  }
  for (UpgradeCatalog catalog : upgradeCatalogs) {
    try {
      catalog.setConfigUpdatesFileName(ambariUpgradeConfigUpdatesFileName);
      catalog.upgradeData();
    } catch (Exception failure) {
      LOG.error("Upgrade failed. ", failure);
      throw new AmbariException(failure.getMessage(), failure);
    }
  }
}
/**
 * Finalizes every given catalog in list order: calls {@code onPostUpgrade()} and then
 * {@code updateDatabaseSchemaVersion()} on each.
 * A {@code null} or empty list is a no-op apart from the log line.
 *
 * @param upgradeCatalogs catalogs to finalize, may be {@code null}
 * @throws AmbariException wrapping the first failure; remaining catalogs are not run
 */
public void executeOnPostUpgrade(List<UpgradeCatalog> upgradeCatalogs)
    throws AmbariException {
  LOG.info("Finalizing catalog upgrade.");
  if (upgradeCatalogs == null || upgradeCatalogs.isEmpty()) {
    return;
  }
  for (UpgradeCatalog catalog : upgradeCatalogs) {
    try {
      catalog.onPostUpgrade();
      catalog.updateDatabaseSchemaVersion();
    } catch (Exception failure) {
      LOG.error("Upgrade failed. ", failure);
      throw new AmbariException(failure.getMessage(), failure);
    }
  }
}
/**
 * Merges the {@code getUpgradeJsonOutput()} maps of all given catalogs into one map
 * (entries of later catalogs replace equal keys of earlier ones), serializes it with
 * Gson and prints the JSON to standard output. With a {@code null} or empty list an
 * empty JSON object is printed.
 *
 * @param upgradeCatalogs catalogs whose output is combined, may be {@code null}
 * @throws AmbariException wrapping any failure while collecting a catalog's output
 */
public void outputUpgradeJsonOutput(List<UpgradeCatalog> upgradeCatalogs)
    throws AmbariException {
  LOG.info("Combining upgrade json output.");
  Map<String, String> merged = new HashMap<>();
  boolean hasCatalogs = upgradeCatalogs != null && !upgradeCatalogs.isEmpty();
  if (hasCatalogs) {
    for (UpgradeCatalog catalog : upgradeCatalogs) {
      try {
        merged.putAll(catalog.getUpgradeJsonOutput());
      } catch (Exception failure) {
        LOG.error("Upgrade failed. ", failure);
        throw new AmbariException(failure.getMessage(), failure);
      }
    }
  }
  // The JSON goes to stdout (not the log) on purpose, exactly one line.
  System.out.println(gson.toJson(merged));
}
/**
 * Asks the {@link DBAccessor} to update table {@code key_value_store}: the quoted
 * {@code value} column is set to {@code {"clusterState":"CLUSTER_STARTED_5"}} for the
 * row whose quoted {@code key} column equals {@code CLUSTER_CURRENT_STATUS}.
 *
 * @throws AmbariException if the update fails with an {@link SQLException}
 */
public void resetUIState() throws AmbariException {
  LOG.info("Resetting UI state.");
  String valueColumn;
  String whereClause;
  try {
    valueColumn = dbAccessor.quoteObjectName("value");
    whereClause = "where " + dbAccessor.quoteObjectName("key") + "='CLUSTER_CURRENT_STATUS'";
    dbAccessor.updateTable("key_value_store", valueColumn,
        "{\"clusterState\":\"CLUSTER_STARTED_5\"}", whereClause);
  } catch (SQLException sqlFailure) {
    throw new AmbariException("Unable to reset UI state", sqlFailure);
  }
}
/**
 * Best-effort cleanup of the RCA tables. Every table named in {@code rcaTableNames}
 * that exists according to the {@link DBAccessor} is truncated, then the same tables
 * are cleared in the separately configured RCA database. Failures are only logged
 * as warnings and never propagate to the caller.
 */
public void cleanUpRCATables() {
  LOG.info("Cleaning up RCA tables.");

  for (String rcaTable : rcaTableNames) {
    try {
      boolean present = dbAccessor.tableExists(rcaTable);
      if (present) {
        dbAccessor.truncateTable(rcaTable);
      }
    } catch (Exception failure) {
      // One failing table must not prevent cleanup of the others.
      LOG.warn("Error cleaning rca table " + rcaTable, failure);
    }
  }

  try {
    cleanUpTablesFromRCADatabase();
  } catch (Exception failure) {
    LOG.warn("Error cleaning rca tables from ambarirca db", failure);
  }
}
/**
 * Opens a direct JDBC connection to the RCA database described by the configuration
 * and issues {@code DELETE FROM <table>} for every table in {@code rcaTableNames}.
 * A failing delete is logged as a warning and the remaining tables are still processed.
 *
 * @throws ClassNotFoundException if the configured JDBC driver class cannot be loaded
 * @throws SQLException if the connection cannot be opened, configured or closed
 */
private void cleanUpTablesFromRCADatabase() throws ClassNotFoundException, SQLException {
  String jdbcDriver = configuration.getRcaDatabaseDriver();
  // Replace the hostname macro (if present) with localhost; replace() leaves other URLs unchanged.
  String jdbcUrl = configuration.getRcaDatabaseUrl()
      .replace(Configuration.HOSTNAME_MACRO, "localhost");
  String user = configuration.getRcaDatabaseUser();
  String secret = configuration.getRcaDatabasePassword();

  Class.forName(jdbcDriver);

  try (Connection rcaConnection = DriverManager.getConnection(jdbcUrl, user, secret)) {
    rcaConnection.setAutoCommit(true);
    for (String rcaTable : rcaTableNames) {
      String query = "DELETE FROM " + rcaTable;
      try (Statement delete = rcaConnection.createStatement()) {
        delete.execute(query);
      } catch (Exception failure) {
        LOG.warn("Error while executing query: " + query, failure);
      }
    }
  }
}
/**
 * Returns the target version of the first {@link UpgradeCatalog} after ordering all
 * known catalogs with {@link AbstractUpgradeCatalog.VersionComparator}.
 *
 * @return string representation of the minimal catalog version, or {@code null}
 *         when there are no catalogs
 */
private String getMinimalUpgradeCatalogVersion(){
  List<UpgradeCatalog> ordered = new ArrayList<>(allUpgradeCatalogs);
  Collections.sort(ordered, new AbstractUpgradeCatalog.VersionComparator());
  return ordered.isEmpty() ? null : ordered.get(0).getTargetVersion();
}
/**
 * Checks whether the source version meets the minimal requirement for an upgrade.
 *
 * @param minUpgradeVersion minimal version allowed for the upgrade, could be obtained
 *                          via {@link #getMinimalUpgradeCatalogVersion()}; {@code null}
 *                          always yields {@code false}
 * @param sourceVersion     current version of the database that needs to be upgraded
 *
 * @return {@code true} if {@code sourceVersion} is not lower than
 *         {@code minUpgradeVersion}, {@code false} otherwise
 */
private boolean verifyUpgradePath(String minUpgradeVersion, String sourceVersion){
  return minUpgradeVersion != null
      && VersionUtils.compareVersions(sourceVersion, minUpgradeVersion) >= 0;
}
/**
 * Lists the base tables of the server database that use the MyISAM engine.
 * The lookup is performed only when the configured database type is MySQL;
 * for any other type an empty list is returned without touching the database.
 *
 * @return names of the MyISAM tables, possibly empty
 * @throws SQLException if the {@code information_schema} query fails
 */
private List<String> getMyISAMTables() throws SQLException {
  boolean isMySql = configuration.getDatabaseType().equals(Configuration.DatabaseType.MYSQL);
  if (!isMySql) {
    return Collections.emptyList();
  }

  List<String> tableNames = new ArrayList<>();
  String query = String.format("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s' " +
      "AND engine = 'MyISAM' AND table_type = 'BASE TABLE'", configuration.getServerDBName());

  Statement stmt = null;
  ResultSet rows = null;
  try {
    stmt = dbAccessor.getConnection().createStatement();
    rows = stmt.executeQuery(query);
    while (rows != null && rows.next()) {
      tableNames.add(rows.getString("table_name"));
    }
  } finally {
    // JdbcUtils tolerates null arguments, so no extra guards are needed here.
    JdbcUtils.closeResultSet(rows);
    JdbcUtils.closeStatement(stmt);
  }
  return tableNames;
}
/**
 * Upgrades the Ambari DB schema to the server version reported by the configuration.
 * <p>
 * Steps, in order: verify the running JDK version, build the Guice injector, reject
 * MySQL databases containing MyISAM tables, read the source version from the database,
 * verify that it is not older than the oldest available upgrade catalog, then run the
 * DDL, pre-DML, DML and post-upgrade steps of every catalog on the upgrade path, print
 * the combined JSON output, reset the UI state and clean up the RCA tables. The JVM is
 * terminated with exit code 0 on success and 1 when the JDK is too old.
 *
 * @param args command-line arguments; they are not read, the target version comes
 *             from the server configuration
 * @throws AmbariException if an upgrade step fails with an {@link AmbariException}
 * @throws Exception wrapping any other failure
 */
public static void main(String[] args) throws Exception {
  try {
    // Refuse to run on a JDK older than the configured minimum.
    float javaVersion = parseJavaVersion(System.getProperty("java.version"));
    if (javaVersion < Configuration.JDK_MIN_VERSION) {
      LOG.error(String.format("Oracle JDK version is lower than %.1f It can cause problems during upgrade process. Please," +
          " use 'ambari-server setup' command to upgrade JDK!", Configuration.JDK_MIN_VERSION));
      System.exit(1);
    }

    Injector injector = Guice.createInjector(new UpgradeHelperModule(), new AuditLoggerModule(), new LdapModule());

    // Startup the JPA infrastructure, but do not indicate it is initialized since the underlying
    // database schema may not be updated to meet the expectations of the Entity instances.
    GuiceJpaInitializer jpaInitializer = injector.getInstance(GuiceJpaInitializer.class);
    SchemaUpgradeHelper schemaUpgradeHelper = injector.getInstance(SchemaUpgradeHelper.class);

    // Fail if MySQL database has tables with MyISAM engine
    List<String> myISAMTables = schemaUpgradeHelper.getMyISAMTables();
    if (!myISAMTables.isEmpty()) {
      String errorMessage = String.format("Unsupported MyISAM table %s detected. " +
          "For correct upgrade database should be migrated to InnoDB engine.", myISAMTables.get(0));
      LOG.error(errorMessage);
      throw new AmbariException(errorMessage);
    }

    String targetVersion = schemaUpgradeHelper.getAmbariServerVersion();
    LOG.info("Upgrading schema to target version = " + targetVersion);

    UpgradeCatalog targetUpgradeCatalog = AbstractUpgradeCatalog
        .getUpgradeCatalog(targetVersion);
    LOG.debug("Target upgrade catalog. {}", targetUpgradeCatalog);

    // Read source version from DB
    String sourceVersion = schemaUpgradeHelper.readSourceVersion();
    LOG.info("Upgrading schema from source version = " + sourceVersion);

    String minimalRequiredUpgradeVersion = schemaUpgradeHelper.getMinimalUpgradeCatalogVersion();
    if (!schemaUpgradeHelper.verifyUpgradePath(minimalRequiredUpgradeVersion, sourceVersion)){
      throw new AmbariException(String.format("Database version does not meet minimal upgrade requirements. Expected version should be not less than %s, current version is %s",
          minimalRequiredUpgradeVersion, sourceVersion));
    }

    List<UpgradeCatalog> upgradeCatalogs =
        schemaUpgradeHelper.getUpgradePath(sourceVersion, targetVersion);

    String date = new SimpleDateFormat("MM-dd-yyyy_HH:mm:ss").format(new Date());
    String ambariUpgradeConfigUpdatesFileName = "ambari_upgrade_config_changes_" + date + ".json";

    schemaUpgradeHelper.executeUpgrade(upgradeCatalogs);

    // The DDL is expected to be updated, now send the JPA initialized event so Entity
    // implementations can be created.
    jpaInitializer.setInitialized();

    schemaUpgradeHelper.executePreDMLUpdates(upgradeCatalogs);
    schemaUpgradeHelper.executeDMLUpdates(upgradeCatalogs, ambariUpgradeConfigUpdatesFileName);
    schemaUpgradeHelper.executeOnPostUpgrade(upgradeCatalogs);
    schemaUpgradeHelper.outputUpgradeJsonOutput(upgradeCatalogs);
    schemaUpgradeHelper.resetUIState();

    LOG.info("Upgrade successful.");

    schemaUpgradeHelper.cleanUpRCATables();
    schemaUpgradeHelper.stopPersistenceService();

    // Signal all threads that we are ready to exit...
    System.exit(0);
  } catch (Throwable e) {
    if (e instanceof AmbariException) {
      LOG.error("Exception occurred during upgrade, failed", e);
      throw (AmbariException)e;
    }else{
      LOG.error("Unexpected error, upgrade failed", e);
      throw new Exception("Unexpected error, upgrade failed", e);
    }
  }
}

/**
 * Converts a {@code java.version} value into a {@code major.minor} float.
 * <p>
 * Handles the legacy scheme ({@code 1.8.0_181} gives 1.8) as well as the scheme used
 * since Java 9, where the value may have no minor component or carry a suffix
 * ({@code 17} gives 17.0, {@code 17-ea} gives 17.0, {@code 11.0.2} gives 11.0).
 *
 * @param rawVersion value of the {@code java.version} system property
 * @return the version as {@code major.minor}; a missing minor component counts as 0
 * @throws IllegalArgumentException if {@code rawVersion} is {@code null} or does not
 *         start with a number
 */
private static float parseJavaVersion(String rawVersion) {
  if (rawVersion == null) {
    throw new IllegalArgumentException("System property java.version is not set");
  }
  String[] components = rawVersion.trim().split("\\.");
  String major = components.length > 0 ? leadingDigits(components[0]) : "";
  if (major.isEmpty()) {
    throw new IllegalArgumentException("Unable to parse Java version: " + rawVersion);
  }
  String minor = components.length > 1 ? leadingDigits(components[1]) : "";
  return Float.parseFloat(major + "." + (minor.isEmpty() ? "0" : minor));
}

/**
 * Returns the longest prefix of {@code text} consisting only of ASCII digits.
 *
 * @param text text to scan
 * @return the leading digits, or an empty string when {@code text} does not start with a digit
 */
private static String leadingDigits(String text) {
  int end = 0;
  while (end < text.length() && text.charAt(end) >= '0' && text.charAt(end) <= '9') {
    end++;
  }
  return text.substring(0, end);
}
}