blob: 3efb1e98f0ecfd78fd4221811bf51885065dcd3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.upgrade;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
import org.apache.ambari.server.orm.dao.ClusterStateDAO;
import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.KeyValueDAO;
import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO;
import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
import org.apache.ambari.server.orm.entities.ClusterConfigEntityPK;
import org.apache.ambari.server.orm.entities.ClusterConfigMappingEntity;
import org.apache.ambari.server.orm.entities.ClusterEntity;
import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
import org.apache.ambari.server.orm.entities.ClusterServiceEntityPK;
import org.apache.ambari.server.orm.entities.ClusterStateEntity;
import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntityPK;
import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.KeyValueEntity;
import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntityPK;
import org.apache.ambari.server.state.HostComponentAdminState;
import org.apache.ambari.server.state.PropertyInfo;
import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.State;
import org.eclipse.persistence.jpa.JpaEntityManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.*;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.HashMap;
public class UpgradeCatalog150 extends AbstractUpgradeCatalog {
private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog150.class);
private static final String quartzScriptFilePattern = "quartz.%s.sql";
private Injector injector;
@Inject
public UpgradeCatalog150(Injector injector) {
super(injector);
this.injector = injector;
}
@Override
public void executeDDLUpdates() throws AmbariException, SQLException {
LOG.debug("Upgrading schema...");
String dbType = getDbType();
List<DBColumnInfo> columns = new ArrayList<DBColumnInfo>();
// ========================================================================
// Create tables
// ClusterConfigMapping
columns.add(new DBColumnInfo("cluster_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("type_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("version_tag", String.class, 255, null, false));
columns.add(new DBColumnInfo("create_timestamp", Long.class, null, null, false));
columns.add(new DBColumnInfo("selected", Integer.class, 0, null, false));
columns.add(new DBColumnInfo("user_name", String.class, 255, "_db", false));
dbAccessor.createTable("clusterconfigmapping", columns, "cluster_id", "type_name", "create_timestamp");
// Request
columns.clear();
columns.add(new DBColumnInfo("request_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("cluster_id", Long.class, null, null, true));
columns.add(new DBColumnInfo("request_schedule_id", Long.class, null, null, true));
columns.add(new DBColumnInfo("command_name", String.class, 255, null, true));
columns.add(new DBColumnInfo("create_time", Long.class, null, null, true));
columns.add(new DBColumnInfo("end_time", Long.class, null, null, true));
columns.add(new DBColumnInfo("inputs", byte[].class, null, null, true));
columns.add(new DBColumnInfo("request_context", String.class, 255, null, true));
columns.add(new DBColumnInfo("request_type", String.class, 255, null, true));
columns.add(new DBColumnInfo("start_time", Long.class, null, null, false));
columns.add(new DBColumnInfo("status", String.class, 255));
dbAccessor.createTable("request", columns, "request_id");
// RequestSchedule
columns.clear();
columns.add(new DBColumnInfo("schedule_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("cluster_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("description", String.class, 255, null, true));
columns.add(new DBColumnInfo("status", String.class, 255, null, true));
columns.add(new DBColumnInfo("batch_separation_seconds", Integer.class, null, null, true));
columns.add(new DBColumnInfo("batch_toleration_limit", Integer.class, null, null, true));
columns.add(new DBColumnInfo("create_user", String.class, 255, null, true));
columns.add(new DBColumnInfo("create_timestamp", Long.class, null, null, true));
columns.add(new DBColumnInfo("update_user", String.class, 255, null, true));
columns.add(new DBColumnInfo("update_timestamp", Long.class, null, null, true));
columns.add(new DBColumnInfo("minutes", String.class, 10, null, true));
columns.add(new DBColumnInfo("hours", String.class, 10, null, true));
columns.add(new DBColumnInfo("days_of_month", String.class, 10, null, true));
columns.add(new DBColumnInfo("month", String.class, 10, null, true));
columns.add(new DBColumnInfo("day_of_week", String.class, 10, null, true));
columns.add(new DBColumnInfo("yearToSchedule", String.class, 10, null, true));
columns.add(new DBColumnInfo("startTime", String.class, 50, null, true));
columns.add(new DBColumnInfo("endTime", String.class, 50, null, true));
columns.add(new DBColumnInfo("last_execution_status", String.class, 255, null, true));
dbAccessor.createTable("requestschedule", columns, "schedule_id");
// RequestScheduleBatchRequest
columns.clear();
columns.add(new DBColumnInfo("schedule_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("batch_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("request_id", Long.class, null, null, true));
columns.add(new DBColumnInfo("request_type", String.class, 255, null, true));
columns.add(new DBColumnInfo("request_uri", String.class, 1024, null, true));
columns.add(new DBColumnInfo("request_body", byte[].class, null, null, true));
columns.add(new DBColumnInfo("request_status", String.class, 255, null, true));
columns.add(new DBColumnInfo("return_code", Integer.class, null, null, true));
columns.add(new DBColumnInfo("return_message", String.class, 2000, null, true));
dbAccessor.createTable("requestschedulebatchrequest", columns, "schedule_id", "batch_id");
// HostConfigMapping
columns.clear();
columns.add(new DBColumnInfo("cluster_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("host_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("type_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("version_tag", String.class, 255, null, true));
columns.add(new DBColumnInfo("service_name", String.class, 255, null, true));
columns.add(new DBColumnInfo("create_timestamp", Long.class, null, null, false));
columns.add(new DBColumnInfo("selected", Integer.class, 0, null, false));
dbAccessor.createTable("hostconfigmapping", columns, "cluster_id", "host_name", "type_name", "create_timestamp");
// Sequences
columns.clear();
columns.add(new DBColumnInfo("sequence_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("value", Long.class, null, null, false));
dbAccessor.createTable("ambari_sequences", columns, "sequence_name");
// Metainfo
columns.clear();
columns.add(new DBColumnInfo("metainfo_key", String.class, 255, null, false));
columns.add(new DBColumnInfo("metainfo_value", String.class, 255, null, false));
dbAccessor.createTable("metainfo", columns, "metainfo_key");
// ConfigGroup
columns.clear();
columns.add(new DBColumnInfo("group_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("cluster_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("group_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("tag", String.class, 1024, null, false));
columns.add(new DBColumnInfo("description", String.class, 1024, null, true));
columns.add(new DBColumnInfo("create_timestamp", Long.class, null, null, false));
dbAccessor.createTable("configgroup", columns, "group_id");
// ConfigGroupClusterConfigMapping
columns.clear();
columns.add(new DBColumnInfo("config_group_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("cluster_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("config_type", String.class, 255, null, false));
columns.add(new DBColumnInfo("version_tag", String.class, 255, null, false));
columns.add(new DBColumnInfo("user_name", String.class, 255, "_db", true));
columns.add(new DBColumnInfo("create_timestamp", Long.class, null, null, false));
dbAccessor.createTable("confgroupclusterconfigmapping", columns, "config_group_id", "cluster_id", "config_type");
// ConfigGroupHostMapping
columns.clear();
columns.add(new DBColumnInfo("config_group_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("host_name", String.class, 255, null, false));
dbAccessor.createTable("configgrouphostmapping", columns, "config_group_id", "host_name");
// Blueprint
columns.clear();
columns.add(new DBColumnInfo("blueprint_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("stack_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("stack_version", String.class, 255, null, false));
dbAccessor.createTable("blueprint", columns, "blueprint_name");
// Blueprint Config
columns.clear();
columns.add(new DBColumnInfo("blueprint_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("type_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("config_data", byte[].class, null, null, false));
dbAccessor.createTable("blueprint_configuration", columns, "blueprint_name", "type_name");
// HostGroup
columns.clear();
columns.add(new DBColumnInfo("blueprint_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("name", String.class, 255, null, false));
columns.add(new DBColumnInfo("cardinality", String.class, 255, null, false));
dbAccessor.createTable("hostgroup", columns, "blueprint_name", "name");
// HostGroupComponent
columns.clear();
columns.add(new DBColumnInfo("blueprint_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("hostgroup_name", String.class, 255, null, false));
columns.add(new DBColumnInfo("name", String.class, 255, null, false));
dbAccessor.createTable("hostgroup_component", columns, "blueprint_name", "hostgroup_name", "name");
// RequestResourceFilter
columns.clear();
columns.add(new DBColumnInfo("filter_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("request_id", Long.class, null, null, false));
columns.add(new DBColumnInfo("service_name", String.class, 255, null, true));
columns.add(new DBColumnInfo("component_name", String.class, 255, null, true));
columns.add(new DBColumnInfo("hosts", byte[].class, null, null, true));
dbAccessor.createTable("requestresourcefilter", columns, "filter_id");
createQuartzTables();
// ========================================================================
// Add columns
dbAccessor.addColumn("hostcomponentdesiredstate", new DBColumnInfo("maintenance_state", String.class, 32, "OFF", false));
dbAccessor.addColumn("servicedesiredstate", new DBColumnInfo("maintenance_state", String.class, 32, "OFF", false));
dbAccessor.addColumn("hoststate", new DBColumnInfo("maintenance_state", String.class, 512, null, true));
dbAccessor.addColumn("hostcomponentdesiredstate", new DBColumnInfo("admin_state", String.class, 32, null, true));
dbAccessor.addColumn("hosts", new DBColumnInfo("ph_cpu_count", Integer.class, 32, null, true));
dbAccessor.addColumn("clusterstate", new DBColumnInfo("current_stack_version", String.class, 255, null, false));
dbAccessor.addColumn("hostconfigmapping", new DBColumnInfo("user_name", String.class, 255, "_db", false));
dbAccessor.addColumn("stage", new DBColumnInfo("request_context", String.class, 255, null, true));
dbAccessor.addColumn("stage", new DBColumnInfo("cluster_host_info", byte[].class, null, null, true));
dbAccessor.addColumn("clusterconfigmapping", new DBColumnInfo("user_name", String.class, 255, "_db", false));
dbAccessor.addColumn("host_role_command", new DBColumnInfo("end_time", Long.class, null, null, true));
dbAccessor.addColumn("host_role_command", new DBColumnInfo("structured_out", byte[].class, null, null, true));
dbAccessor.addColumn("host_role_command", new DBColumnInfo("command_detail", String.class, 255, null, true));
dbAccessor.addColumn("host_role_command", new DBColumnInfo("custom_command_name", String.class, 255, null, true));
// Alter columns
if (dbType.equals(Configuration.POSTGRES_DB_NAME)) {
if (dbAccessor.tableExists("hostcomponentdesiredconfigmapping")) {
dbAccessor.executeQuery("ALTER TABLE hostcomponentdesiredconfigmapping rename to hcdesiredconfigmapping", true);
}
dbAccessor.executeQuery("ALTER TABLE users ALTER column user_id DROP DEFAULT", true);
dbAccessor.executeQuery("ALTER TABLE users ALTER column ldap_user TYPE INTEGER USING CASE WHEN ldap_user=true THEN 1 ELSE 0 END", true);
}
if (Configuration.ORACLE_DB_NAME.equals(dbType) ||
Configuration.POSTGRES_DB_NAME.equals(dbType)) {
if (dbAccessor.tableHasColumn("hosts", "disks_info")) {
dbAccessor.executeQuery("ALTER TABLE hosts DROP COLUMN disks_info", true);
}
}
//Move tables from ambarirca db to ambari db; drop ambarirca; Mysql
if (dbType.equals(Configuration.MYSQL_DB_NAME)) {
String dbName = configuration.getServerJDBCSchemaName();
moveRCATableInMySQL("workflow", dbName);
moveRCATableInMySQL("job", dbName);
moveRCATableInMySQL("task", dbName);
moveRCATableInMySQL("taskAttempt", dbName);
moveRCATableInMySQL("hdfsEvent", dbName);
moveRCATableInMySQL("mapreduceEvent", dbName);
moveRCATableInMySQL("clusterEvent", dbName);
dbAccessor.executeQuery("DROP DATABASE IF EXISTS ambarirca");
}
//Newly created tables should be filled before creating FKs
// Request Entries
String tableName = "request";
if (!dbAccessor.tableExists(tableName)) {
String msg = String.format("Table \"%s\" was not created during schema upgrade", tableName);
LOG.error(msg);
throw new AmbariException(msg);
} else if (!dbAccessor.tableHasData(tableName)) {
String query;
if (dbType.equals(Configuration.POSTGRES_DB_NAME)) {
query = getPostgresRequestUpgradeQuery();
} else if (dbType.equals(Configuration.ORACLE_DB_NAME)) {
query = getOracleRequestUpgradeQuery();
} else {
query = getMysqlRequestUpgradeQuery();
}
dbAccessor.executeQuery(query);
} else {
LOG.info("Table {} already filled", tableName);
}
// Drop old constraints
// ========================================================================
if (Configuration.POSTGRES_DB_NAME.equals(dbType)
|| Configuration.MYSQL_DB_NAME.equals(dbType)) {
//recreate old constraints to sync with oracle
dbAccessor.dropConstraint("clusterconfigmapping", "FK_clusterconfigmapping_cluster_id");
dbAccessor.dropConstraint("hostcomponentdesiredstate", "FK_hostcomponentdesiredstate_host_name");
dbAccessor.dropConstraint("hostcomponentdesiredstate", "FK_hostcomponentdesiredstate_component_name");
dbAccessor.dropConstraint("hostcomponentstate", "FK_hostcomponentstate_component_name");
dbAccessor.dropConstraint("hostcomponentstate", "FK_hostcomponentstate_host_name");
dbAccessor.dropConstraint("servicecomponentdesiredstate", "FK_servicecomponentdesiredstate_service_name");
dbAccessor.dropConstraint("servicedesiredstate", "FK_servicedesiredstate_service_name");
dbAccessor.dropConstraint("role_success_criteria", "FK_role_success_criteria_stage_id");
dbAccessor.dropConstraint("ClusterHostMapping", "FK_ClusterHostMapping_host_name");
dbAccessor.dropConstraint("ClusterHostMapping", "FK_ClusterHostMapping_cluster_id");
dbAccessor.addFKConstraint("clusterconfigmapping", "clusterconfigmappingcluster_id", "cluster_id", "clusters", "cluster_id", false);
dbAccessor.addFKConstraint("hostcomponentdesiredstate", "hstcmponentdesiredstatehstname", "host_name", "hosts", "host_name", false);
dbAccessor.addFKConstraint("hostcomponentdesiredstate", "hstcmpnntdesiredstatecmpnntnme",
new String[] {"component_name", "cluster_id", "service_name"}, "servicecomponentdesiredstate",
new String[] {"component_name", "cluster_id", "service_name"}, false);
dbAccessor.addFKConstraint("hostcomponentstate", "hstcomponentstatecomponentname",
new String[] {"component_name", "cluster_id", "service_name"}, "servicecomponentdesiredstate",
new String[] {"component_name", "cluster_id", "service_name"}, false);
dbAccessor.addFKConstraint("hostcomponentstate", "hostcomponentstate_host_name", "host_name", "hosts", "host_name", false);
dbAccessor.addFKConstraint("servicecomponentdesiredstate", "srvccmponentdesiredstatesrvcnm",
new String[] {"service_name", "cluster_id"}, "clusterservices",
new String[] {"service_name", "cluster_id"}, false);
dbAccessor.addFKConstraint("servicedesiredstate", "servicedesiredstateservicename",
new String[] {"service_name", "cluster_id"}, "clusterservices",
new String[] {"service_name", "cluster_id"}, false);
dbAccessor.addFKConstraint("role_success_criteria", "role_success_criteria_stage_id",
new String[] {"stage_id", "request_id"}, "stage",
new String[] {"stage_id", "request_id"}, false);
dbAccessor.addFKConstraint("ClusterHostMapping", "ClusterHostMapping_cluster_id", "cluster_id", "clusters", "cluster_id", false);
dbAccessor.addFKConstraint("ClusterHostMapping", "ClusterHostMapping_host_name", "host_name", "hosts", "host_name", false);
//drop new constraints with to sync with oracle
dbAccessor.dropConstraint("confgroupclusterconfigmapping", "FK_confgroupclusterconfigmapping_config_tag", true);
dbAccessor.dropConstraint("confgroupclusterconfigmapping", "FK_confgroupclusterconfigmapping_group_id", true);
dbAccessor.dropConstraint("configgrouphostmapping", "FK_configgrouphostmapping_configgroup_id", true);
dbAccessor.dropConstraint("configgrouphostmapping", "FK_configgrouphostmapping_host_name", true);
}
// ========================================================================
// Add constraints
dbAccessor.addFKConstraint("stage", "FK_stage_request_id", "request_id", "request", "request_id", true);
dbAccessor.addFKConstraint("request", "FK_request_cluster_id", "cluster_id", "clusters", "cluster_id", true);
dbAccessor.addFKConstraint("request", "FK_request_schedule_id", "request_schedule_id", "requestschedule", "schedule_id", true);
dbAccessor.addFKConstraint("requestschedulebatchrequest", "FK_rsbatchrequest_schedule_id", "schedule_id", "requestschedule", "schedule_id", true);
dbAccessor.addFKConstraint("hostconfigmapping", "FK_hostconfmapping_cluster_id", "cluster_id", "clusters", "cluster_id", true);
dbAccessor.addFKConstraint("hostconfigmapping", "FK_hostconfmapping_host_name", "host_name", "hosts", "host_name", true);
dbAccessor.addFKConstraint("configgroup", "FK_configgroup_cluster_id", "cluster_id", "clusters", "cluster_id", true);
dbAccessor.addFKConstraint("confgroupclusterconfigmapping", "FK_confg", new String[] {"version_tag", "config_type", "cluster_id"}, "clusterconfig", new String[] {"version_tag", "type_name", "cluster_id"}, true);
dbAccessor.addFKConstraint("confgroupclusterconfigmapping", "FK_cgccm_gid", "config_group_id", "configgroup", "group_id", true);
dbAccessor.addFKConstraint("configgrouphostmapping", "FK_cghm_cgid", "config_group_id", "configgroup", "group_id", true);
dbAccessor.addFKConstraint("configgrouphostmapping", "FK_cghm_hname", "host_name", "hosts", "host_name", true);
dbAccessor.addFKConstraint("clusterconfigmapping", "FK_clustercfgmap_cluster_id", "cluster_id", "clusters", "cluster_id", true);
dbAccessor.addFKConstraint("requestresourcefilter", "FK_reqresfilter_req_id", "request_id", "request", "request_id", true);
dbAccessor.addFKConstraint("hostgroup", "FK_hostgroup_blueprint_name", "blueprint_name", "blueprint", "blueprint_name", true);
dbAccessor.addFKConstraint("hostgroup_component", "FK_hg_blueprint_name", "blueprint_name", "hostgroup", "blueprint_name", true);
dbAccessor.addFKConstraint("hostgroup_component", "FK_hgc_blueprint_name", "hostgroup_name", "hostgroup", "name", true);
dbAccessor.addFKConstraint("blueprint_configuration", "FK_cfg_blueprint_name", "blueprint_name", "blueprint", "blueprint_name", true);
}
private void moveRCATableInMySQL(String tableName, String dbName) throws SQLException {
if (!dbAccessor.tableExists(tableName)) {
dbAccessor.executeQuery(String.format("RENAME TABLE ambarirca.%s TO %s.%s", tableName, dbName, tableName), true);
}
}
@Override
public void executeDMLUpdates() throws AmbariException, SQLException {
// Service Config mapping
String tableName = "serviceconfigmapping";
String dbType = getDbType();
EntityManager em = getEntityManagerProvider().get();
//unable to do via dao, as they were dropped
//TODO should we move this to ddl and drop unused tables then?
if (dbAccessor.tableExists(tableName)
&& dbAccessor.tableHasData(tableName)
&& dbAccessor.tableExists("clusterconfigmapping")) {
if (dbType.equals(Configuration.POSTGRES_DB_NAME)) {
// Service config mapping entity object will be deleted so need to
// proceed with executing as query
dbAccessor.executeQuery(getPostgresServiceConfigMappingQuery());
dbAccessor.truncateTable(tableName);
} else {
LOG.warn("Unsupported database for service config mapping query. " +
"database = " + dbType);
}
}
// TODO: Convert all possible native queries using Criteria builder
// Sequences
if (dbAccessor.tableExists("ambari_sequences")) {
if (dbType.equals(Configuration.POSTGRES_DB_NAME)) {
try {
dbAccessor.executeQuery(getPostgresSequenceUpgradeQuery());
// Deletes
dbAccessor.dropSequence("host_role_command_task_id_seq");
dbAccessor.dropSequence("users_user_id_seq");
dbAccessor.dropSequence("clusters_cluster_id_seq");
} catch (SQLException sql) {
LOG.warn("Sequence update threw exception. ", sql);
}
}
}
//add new sequences for config groups
//TODO evalate possibility to automatically wrap object names in DBAcessor
String valueColumnName = "\"value\"";
if (Configuration.ORACLE_DB_NAME.equals(dbType) || Configuration.MYSQL_DB_NAME.equals(dbType)) {
valueColumnName = "value";
}
dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, " + valueColumnName + ") " +
"VALUES('configgroup_id_seq', 1)", true);
dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, " + valueColumnName + ") " +
"VALUES('requestschedule_id_seq', 1)", true);
dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, " + valueColumnName + ") " +
"VALUES('resourcefilter_id_seq', 1)", true);
//clear cache due to direct table manipulation
((JpaEntityManager)em.getDelegate()).getServerSession().getIdentityMapAccessor().invalidateAll();
// Updates
// HostComponentState - reverted to native query due to incorrect criteria api usage
// (it forces us to use enums not strings, which were deleted)
executeInTransaction(new Runnable() {
@Override
public void run() {
EntityManager em = getEntityManagerProvider().get();
Query nativeQuery = em.createNativeQuery("UPDATE hostcomponentstate SET current_state=?1 WHERE current_state in (?2, ?3)");
nativeQuery.setParameter(1, "INSTALLED");
nativeQuery.setParameter(2, "STOP_FAILED");
nativeQuery.setParameter(3, "START_FAILED");
nativeQuery.executeUpdate();
}
});
// HostRoleCommand
executeInTransaction(new Runnable() {
@Override
public void run() {
EntityManager em = getEntityManagerProvider().get();
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<HostRoleCommandEntity> c2 = cb.createQuery(HostRoleCommandEntity.class);
Root<HostRoleCommandEntity> hrc = c2.from(HostRoleCommandEntity.class);
List<HostRoleStatus> statuses = new ArrayList<HostRoleStatus>() {{
add(HostRoleStatus.PENDING);
add(HostRoleStatus.QUEUED);
add(HostRoleStatus.IN_PROGRESS);
}};
Expression<String> exp = hrc.get("status");
Predicate predicate = exp.in(statuses);
c2.select(hrc).where(predicate);
TypedQuery<HostRoleCommandEntity> q2 = em.createQuery(c2);
List<HostRoleCommandEntity> r2 = q2.getResultList();
HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
if (r2 != null && !r2.isEmpty()) {
for (HostRoleCommandEntity entity : r2) {
entity.setStatus(HostRoleStatus.ABORTED);
hostRoleCommandDAO.merge(entity);
}
}
}
});
// Stack version changes from HDPLocal to HDP
stackUpgradeUtil.updateStackDetails("HDP", null);
//create cluster state entities if not present
executeInTransaction(new Runnable() {
@Override
public void run() {
ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
ClusterStateDAO clusterStateDAO = injector.getInstance(ClusterStateDAO.class);
List<ClusterEntity> clusterEntities = clusterDAO.findAll();
for (ClusterEntity clusterEntity : clusterEntities) {
if (clusterStateDAO.findByPK(clusterEntity.getClusterId()) == null) {
ClusterStateEntity clusterStateEntity = new ClusterStateEntity();
clusterStateEntity.setClusterEntity(clusterEntity);
clusterStateEntity.setCurrentStackVersion(clusterEntity.getDesiredStackVersion());
clusterStateDAO.create(clusterStateEntity);
clusterEntity.setClusterStateEntity(clusterStateEntity);
clusterDAO.merge(clusterEntity);
}
}
}
});
// add history server on the host where jobtracker is
executeInTransaction(new Runnable() {
@Override
public void run() {
addHistoryServer();
}
});
// Add default log4j configs if they are absent
executeInTransaction(new Runnable() {
@Override
public void run() {
addMissingLog4jConfigs();
}
});
// ========================================================================
// Finally update schema version
updateMetaInfoVersion(getTargetVersion());
// Move decommissioned datanode data to new table
executeInTransaction(new Runnable() {
@Override
public void run() {
try {
processDecommissionedDatanodes();
} catch (Exception e) {
LOG.warn("Updating decommissioned datanodes to new format threw " +
"exception. ", e);
}
}
});
}
protected void addHistoryServer() {
ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
ClusterServiceDAO clusterServiceDAO = injector.getInstance(ClusterServiceDAO.class);
ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO = injector.getInstance(ServiceComponentDesiredStateDAO.class);
List<ClusterEntity> clusterEntities = clusterDAO.findAll();
for (final ClusterEntity clusterEntity : clusterEntities) {
ServiceComponentDesiredStateEntityPK pkHS = new ServiceComponentDesiredStateEntityPK();
pkHS.setComponentName("HISTORYSERVER");
pkHS.setClusterId(clusterEntity.getClusterId());
pkHS.setServiceName("MAPREDUCE");
ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntityHS = serviceComponentDesiredStateDAO.findByPK(pkHS);
// already have historyserver
if(serviceComponentDesiredStateEntityHS != null)
continue;
ServiceComponentDesiredStateEntityPK pkJT = new ServiceComponentDesiredStateEntityPK();
pkJT.setComponentName("JOBTRACKER");
pkJT.setClusterId(clusterEntity.getClusterId());
pkJT.setServiceName("MAPREDUCE");
ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntityJT = serviceComponentDesiredStateDAO.findByPK(pkJT);
// no jobtracker present probably mapreduce is not installed
if(serviceComponentDesiredStateEntityJT == null)
continue;
HostComponentStateEntity jtHostComponentStateEntity = serviceComponentDesiredStateEntityJT.getHostComponentStateEntities().iterator().next();
HostComponentDesiredStateEntity jtHostComponentDesiredStateEntity = serviceComponentDesiredStateEntityJT.getHostComponentDesiredStateEntities().iterator().next();
String jtHostname = jtHostComponentStateEntity.getHostName();
State jtCurrState = jtHostComponentStateEntity.getCurrentState();
State jtHostComponentDesiredState = jtHostComponentDesiredStateEntity.getDesiredState();
State jtServiceComponentDesiredState = serviceComponentDesiredStateEntityJT.getDesiredState();
ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
pk.setClusterId(clusterEntity.getClusterId());
pk.setServiceName("MAPREDUCE");
ClusterServiceEntity clusterServiceEntity = clusterServiceDAO.findByPK(pk);
final ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity = new ServiceComponentDesiredStateEntity();
serviceComponentDesiredStateEntity.setComponentName("HISTORYSERVER");
serviceComponentDesiredStateEntity.setDesiredStackVersion(clusterEntity.getDesiredStackVersion());
serviceComponentDesiredStateEntity.setDesiredState(jtServiceComponentDesiredState);
serviceComponentDesiredStateEntity.setClusterServiceEntity(clusterServiceEntity);
serviceComponentDesiredStateEntity.setHostComponentDesiredStateEntities(new ArrayList<HostComponentDesiredStateEntity>());
final HostComponentStateEntity stateEntity = new HostComponentStateEntity();
stateEntity.setHostName(jtHostname);
stateEntity.setCurrentState(jtCurrState);
stateEntity.setCurrentStackVersion(clusterEntity.getDesiredStackVersion());
final HostComponentDesiredStateEntity desiredStateEntity = new HostComponentDesiredStateEntity();
desiredStateEntity.setDesiredState(jtHostComponentDesiredState);
desiredStateEntity.setDesiredStackVersion(clusterEntity.getDesiredStackVersion());
persistComponentEntities(stateEntity, desiredStateEntity, serviceComponentDesiredStateEntity);
}
}
private void persistComponentEntities(HostComponentStateEntity stateEntity, HostComponentDesiredStateEntity desiredStateEntity, ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity) {
ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO = injector.getInstance(ServiceComponentDesiredStateDAO.class);
HostComponentStateDAO hostComponentStateDAO = injector.getInstance(HostComponentStateDAO.class);
HostComponentDesiredStateDAO hostComponentDesiredStateDAO = injector.getInstance(HostComponentDesiredStateDAO.class);
HostDAO hostDAO = injector.getInstance(HostDAO.class);
HostEntity hostEntity = hostDAO.findByName(stateEntity.getHostName());
hostEntity.getHostComponentStateEntities().add(stateEntity);
hostEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
serviceComponentDesiredStateEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
desiredStateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
desiredStateEntity.setHostEntity(hostEntity);
stateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
stateEntity.setHostEntity(hostEntity);
hostComponentStateDAO.create(stateEntity);
hostComponentDesiredStateDAO.create(desiredStateEntity);
serviceComponentDesiredStateDAO.create(serviceComponentDesiredStateEntity);
hostDAO.merge(hostEntity);
}
protected void addMissingLog4jConfigs() {
final String log4jConfigTypeContains = "log4j";
final String defaultVersionTag = "version1";
final String defaultUser = "admin";
LOG.debug("Adding missing configs into Ambari DB.");
ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
ClusterServiceDAO clusterServiceDAO = injector.getInstance(ClusterServiceDAO.class);
AmbariMetaInfo ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
Gson gson = injector.getInstance(Gson.class);
List <ClusterEntity> clusterEntities = clusterDAO.findAll();
for (final ClusterEntity clusterEntity : clusterEntities) {
Long clusterId = clusterEntity.getClusterId();
String desiredStackVersion = clusterEntity.getDesiredStackVersion();
Map<String, String> clusterInfo =
gson.<Map<String, String>>fromJson(desiredStackVersion, Map.class);
String stackName = clusterInfo.get("stackName");
String stackVersion = clusterInfo.get("stackVersion");
List<ClusterServiceEntity> clusterServiceEntities = clusterServiceDAO.findAll();
for (final ClusterServiceEntity clusterServiceEntity : clusterServiceEntities) {
String serviceName = clusterServiceEntity.getServiceName();
ServiceInfo serviceInfo = null;
try {
serviceInfo = ambariMetaInfo.getService(stackName, stackVersion, serviceName);
} catch (AmbariException e) {
LOG.error("Service " + serviceName + " not found for " + stackName + stackVersion);
continue;
}
List<String> configTypes = serviceInfo.getConfigDependencies();
if (configTypes != null) {
for (String configType : configTypes) {
if (configType.contains(log4jConfigTypeContains)) {
ClusterConfigEntityPK configEntityPK = new ClusterConfigEntityPK();
configEntityPK.setClusterId(clusterId);
configEntityPK.setType(configType);
configEntityPK.setTag(defaultVersionTag);
ClusterConfigEntity configEntity = clusterDAO.findConfig(configEntityPK);
if (configEntity == null) {
String filename = configType + ".xml";
Map<String, String> properties = new HashMap<String, String>();
for (PropertyInfo propertyInfo : serviceInfo.getProperties()) {
if (filename.equals(propertyInfo.getFilename())) {
properties.put(propertyInfo.getName(), propertyInfo.getValue());
}
}
if (!properties.isEmpty()) {
String configData = gson.toJson(properties);
configEntity = new ClusterConfigEntity();
configEntity.setClusterId(clusterId);
configEntity.setType(configType);
configEntity.setTag(defaultVersionTag);
configEntity.setData(configData);
configEntity.setTimestamp(System.currentTimeMillis());
configEntity.setClusterEntity(clusterEntity);
LOG.debug("Creating new " + configType + " config...");
clusterDAO.createConfig(configEntity);
Collection<ClusterConfigMappingEntity> entities =
clusterEntity.getConfigMappingEntities();
ClusterConfigMappingEntity clusterConfigMappingEntity =
new ClusterConfigMappingEntity();
clusterConfigMappingEntity.setClusterEntity(clusterEntity);
clusterConfigMappingEntity.setClusterId(clusterId);
clusterConfigMappingEntity.setType(configType);
clusterConfigMappingEntity.setCreateTimestamp(
Long.valueOf(System.currentTimeMillis()));
clusterConfigMappingEntity.setSelected(1);
clusterConfigMappingEntity.setUser(defaultUser);
clusterConfigMappingEntity.setVersion(configEntity.getTag());
entities.add(clusterConfigMappingEntity);
clusterDAO.merge(clusterEntity);
}
}
}
}
}
}
}
LOG.debug("Missing configs have been successfully added into Ambari DB.");
}
protected void processDecommissionedDatanodes() {
KeyValueDAO keyValueDAO = injector.getInstance(KeyValueDAO.class);
ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
Gson gson = injector.getInstance(Gson.class);
HostComponentDesiredStateDAO desiredStateDAO = injector.getInstance
(HostComponentDesiredStateDAO.class);
KeyValueEntity keyValueEntity = keyValueDAO.findByKey("decommissionDataNodesTag");
String value = null;
if (keyValueEntity != null) {
value = keyValueEntity.getValue();
if (value != null && !value.isEmpty()) {
List<ClusterEntity> clusterEntities = clusterDAO.findAll();
for (ClusterEntity clusterEntity : clusterEntities) {
Long clusterId = clusterEntity.getClusterId();
ClusterConfigEntityPK configEntityPK = new ClusterConfigEntityPK();
configEntityPK.setClusterId(clusterId);
configEntityPK.setType("hdfs-exclude-file");
configEntityPK.setTag(value.trim());
ClusterConfigEntity configEntity = clusterDAO.findConfig(configEntityPK);
if (configEntity != null) {
String configData = configEntity.getData();
if (configData != null) {
Map<String, String> properties = gson.<Map<String, String>>fromJson(configData, Map.class);
if (properties != null && !properties.isEmpty()) {
String decommissionedNodes = properties.get("datanodes");
if (decommissionedNodes != null) {
String[] nodes = decommissionedNodes.split(",");
if (nodes.length > 0) {
for (String node : nodes) {
HostComponentDesiredStateEntityPK entityPK =
new HostComponentDesiredStateEntityPK();
entityPK.setClusterId(clusterId);
entityPK.setServiceName("HDFS");
entityPK.setComponentName("DATANODE");
entityPK.setHostName(node.trim());
HostComponentDesiredStateEntity desiredStateEntity =
desiredStateDAO.findByPK(entityPK);
desiredStateEntity.setAdminState(HostComponentAdminState.DECOMMISSIONED);
desiredStateDAO.merge(desiredStateEntity);
}
}
}
}
}
}
}
}
// Rename saved key value entity so that the move is finalized
KeyValueEntity newEntity = new KeyValueEntity();
newEntity.setKey("decommissionDataNodesTag-Moved");
newEntity.setValue(value);
keyValueDAO.create(newEntity);
keyValueDAO.remove(keyValueEntity);
}
}
private String getPostgresServiceConfigMappingQuery() {
return "INSERT INTO clusterconfigmapping " +
"(cluster_id, type_name, version_tag, create_timestamp, selected) " +
"(SELECT DISTINCT cluster_id, config_type, config_tag, " +
"cast(date_part('epoch', now()) as bigint), 1 " +
"FROM serviceconfigmapping scm " +
"WHERE timestamp = (SELECT max(timestamp) FROM serviceconfigmapping " +
"WHERE cluster_id = scm.cluster_id AND config_type = scm.config_type))";
}
private String getPostgresSequenceUpgradeQuery() {
return "INSERT INTO ambari_sequences(sequence_name, \"value\") " +
"SELECT 'cluster_id_seq', nextval('clusters_cluster_id_seq') " +
"UNION ALL " +
"SELECT 'user_id_seq', nextval('users_user_id_seq') " +
"UNION ALL " +
"SELECT 'host_role_command_id_seq', COALESCE((SELECT max(task_id) FROM host_role_command), 1) + 50 ";
}
private String getPostgresRequestUpgradeQuery() {
return "insert into ambari.request(request_id, cluster_id, request_context, start_time, end_time, create_time) (\n" +
" select distinct s.request_id, s.cluster_id, s.request_context, coalesce (cmd.start_time, -1), coalesce (cmd.end_time, -1), -1\n" +
" from\n" +
" (select distinct request_id, cluster_id, request_context from ambari.stage ) s\n" +
" left join\n" +
" (select request_id, min(start_time) as start_time, max(end_time) as end_time from ambari.host_role_command group by request_id) cmd\n" +
" on s.request_id=cmd.request_id\n" +
")";
}
private String getOracleRequestUpgradeQuery() {
return "INSERT INTO request" +
"(request_id, cluster_id, request_context, start_time, end_time, create_time) " +
"SELECT DISTINCT s.request_id, s.cluster_id, s.request_context, " +
"nvl(cmd.start_time, -1), nvl(cmd.end_time, -1), -1 " +
"FROM " +
"(SELECT DISTINCT request_id, cluster_id, request_context FROM stage ) s " +
"LEFT JOIN " +
"(SELECT request_id, min(start_time) as start_time, max(end_time) " +
"as end_time FROM host_role_command GROUP BY request_id) cmd " +
"ON s.request_id=cmd.request_id";
}
private String getMysqlRequestUpgradeQuery() {
return "insert into request" +
"(request_id, cluster_id, request_context, start_time, end_time, create_time) " +
"select distinct s.request_id, s.cluster_id, s.request_context, " +
"coalesce (cmd.start_time, -1), coalesce (cmd.end_time, -1), -1 " +
"from " +
"(select distinct request_id, cluster_id, request_context from stage ) s " +
"left join " +
"(select request_id, min(start_time) as start_time, max(end_time) " +
"as end_time from host_role_command group by request_id) cmd " +
"on s.request_id=cmd.request_id";
}
private void createQuartzTables() throws SQLException {
String dbType = getDbType();
// Run script to create quartz tables
String scriptPath = configuration.getResourceDirPath() +
File.separator + "upgrade" + File.separator + "ddl" +
File.separator + String.format(quartzScriptFilePattern, dbType);
try {
dbAccessor.executeScript(scriptPath);
} catch (IOException e) {
LOG.error("Error reading file.", e);
}
}
@Override
public String getTargetVersion() {
return "1.5.0";
}
}