blob: 289a3a61126007ac099bdac2bc51ee3b689b3f18 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HSQLDB implementation of {@link FederationStateStore}.
 *
 * <p>On {@link #init(Configuration)} this class first runs the parent
 * initialization and then creates the three federation tables
 * ({@code applicationsHomeSubCluster}, {@code membership}, {@code policies})
 * together with the stored procedures defined below. The connection used for
 * schema creation is closed before {@code init} returns.
 */
public class HSQLDBFederationStateStore extends SQLFederationStateStore {

  private static final Logger LOG =
      LoggerFactory.getLogger(HSQLDBFederationStateStore.class);

  /** Connection obtained by the most recent {@link #init(Configuration)}. */
  private Connection conn;

  /** DDL: application id to home sub-cluster mapping table. */
  private static final String TABLE_APPLICATIONSHOMESUBCLUSTER =
      " CREATE TABLE applicationsHomeSubCluster ("
          + " applicationId varchar(64) NOT NULL,"
          + " homeSubCluster varchar(256) NOT NULL,"
          + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";

  /** DDL: sub-cluster membership table, keyed by subClusterId. */
  private static final String TABLE_MEMBERSHIP =
      "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL,"
          + " amRMServiceAddress varchar(256) NOT NULL,"
          + " clientRMServiceAddress varchar(256) NOT NULL,"
          + " rmAdminServiceAddress varchar(256) NOT NULL,"
          + " rmWebServiceAddress varchar(256) NOT NULL,"
          + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL,"
          + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL,"
          + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))";

  /** DDL: per-queue policy configuration table. */
  private static final String TABLE_POLICIES =
      "CREATE TABLE policies ( queue varchar(256) NOT NULL,"
          + " policyType varchar(256) NOT NULL, params varbinary(512),"
          + " CONSTRAINT pk_queue PRIMARY KEY (queue))";

  /**
   * Procedure: deletes any membership row for the given sub-cluster and
   * inserts a new one, stamping lastHeartBeat with NOW() at a zero time-zone
   * offset. rowCount_OUT is read right after the INSERT.
   */
  private static final String SP_REGISTERSUBCLUSTER =
      "CREATE PROCEDURE sp_registerSubCluster("
          + " IN subClusterId_IN varchar(256),"
          + " IN amRMServiceAddress_IN varchar(256),"
          + " IN clientRMServiceAddress_IN varchar(256),"
          + " IN rmAdminServiceAddress_IN varchar(256),"
          + " IN rmWebServiceAddress_IN varchar(256),"
          + " IN state_IN varchar(256),"
          + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000),"
          + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC"
          + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);"
          + " INSERT INTO membership ( subClusterId,"
          + " amRMServiceAddress, clientRMServiceAddress,"
          + " rmAdminServiceAddress, rmWebServiceAddress,"
          + " lastHeartBeat, state, lastStartTime,"
          + " capability) VALUES ( subClusterId_IN,"
          + " amRMServiceAddress_IN, clientRMServiceAddress_IN,"
          + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN,"
          + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE,"
          + " state_IN, lastStartTime_IN, capability_IN);"
          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  /**
   * Procedure: sets the state of a sub-cluster, but only when the stored
   * state differs from the requested one.
   */
  private static final String SP_DEREGISTERSUBCLUSTER =
      "CREATE PROCEDURE sp_deregisterSubCluster("
          + " IN subClusterId_IN varchar(256),"
          + " IN state_IN varchar(64), OUT rowCount_OUT int)"
          + " MODIFIES SQL DATA BEGIN ATOMIC"
          + " UPDATE membership SET state = state_IN WHERE ("
          + " subClusterId = subClusterId_IN AND state != state_IN);"
          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  /**
   * Procedure: updates capability, state and lastHeartBeat (NOW() at a zero
   * time-zone offset) of one sub-cluster.
   */
  private static final String SP_SUBCLUSTERHEARTBEAT =
      "CREATE PROCEDURE sp_subClusterHeartbeat("
          + " IN subClusterId_IN varchar(256), IN state_IN varchar(64),"
          + " IN capability_IN varchar(6000), OUT rowCount_OUT int)"
          + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership"
          + " SET capability = capability_IN, state = state_IN,"
          + " lastHeartBeat = NOW() AT TIME ZONE INTERVAL '0:00'"
          + " HOUR TO MINUTE WHERE subClusterId = subClusterId_IN;"
          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  /**
   * Procedure: reads the membership row of one sub-cluster into OUT
   * parameters.
   */
  // NOTE(review): declared MODIFIES SQL DATA although the body only selects;
  // left unchanged on purpose.
  private static final String SP_GETSUBCLUSTER =
      "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256),"
          + " OUT amRMServiceAddress_OUT varchar(256),"
          + " OUT clientRMServiceAddress_OUT varchar(256),"
          + " OUT rmAdminServiceAddress_OUT varchar(256),"
          + " OUT rmWebServiceAddress_OUT varchar(256),"
          + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64),"
          + " OUT lastStartTime_OUT bigint,"
          + " OUT capability_OUT varchar(6000))"
          + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress,"
          + " clientRMServiceAddress,"
          + " rmAdminServiceAddress, rmWebServiceAddress,"
          + " lastHeartBeat, state, lastStartTime, capability"
          + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT,"
          + " rmAdminServiceAddress_OUT,"
          + " rmWebServiceAddress_OUT, lastHeartBeat_OUT,"
          + " state_OUT, lastStartTime_OUT, capability_OUT"
          + " FROM membership WHERE subClusterId = subClusterId_IN; END";

  /** Procedure: returns all membership rows as a single result set. */
  private static final String SP_GETSUBCLUSTERS =
      "CREATE PROCEDURE sp_getSubClusters()"
          + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
          + " DECLARE result CURSOR FOR"
          + " SELECT subClusterId, amRMServiceAddress, clientRMServiceAddress,"
          + " rmAdminServiceAddress, rmWebServiceAddress, lastHeartBeat,"
          + " state, lastStartTime, capability"
          + " FROM membership; OPEN result; END";

  /**
   * Procedure: inserts the application mapping only when no row exists yet
   * for the application id (the guarded SELECT yields a row only if
   * COUNT(*) = 0), then returns the home sub-cluster that is stored.
   */
  private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_addApplicationHomeSubCluster("
          + " IN applicationId_IN varchar(64),"
          + " IN homeSubCluster_IN varchar(256),"
          + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
          + " MODIFIES SQL DATA BEGIN ATOMIC"
          + " INSERT INTO applicationsHomeSubCluster "
          + " (applicationId,homeSubCluster) "
          + " (SELECT applicationId_IN, homeSubCluster_IN"
          + " FROM applicationsHomeSubCluster"
          + " WHERE applicationId = applicationId_IN"
          + " HAVING COUNT(*) = 0 );"
          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
          + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT"
          + " FROM applicationsHomeSubCluster"
          + " WHERE applicationId = applicationID_IN; END";

  /** Procedure: overwrites the home sub-cluster of one application. */
  private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_updateApplicationHomeSubCluster("
          + " IN applicationId_IN varchar(64),"
          + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
          + " MODIFIES SQL DATA BEGIN ATOMIC"
          + " UPDATE applicationsHomeSubCluster"
          + " SET homeSubCluster = homeSubCluster_IN"
          + " WHERE applicationId = applicationId_IN;"
          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  /** Procedure: reads the home sub-cluster of one application. */
  private static final String SP_GETAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_getApplicationHomeSubCluster("
          + " IN applicationId_IN varchar(64),"
          + " OUT homeSubCluster_OUT varchar(256))"
          + " MODIFIES SQL DATA BEGIN ATOMIC"
          + " SELECT homeSubCluster INTO homeSubCluster_OUT"
          + " FROM applicationsHomeSubCluster"
          + " WHERE applicationId = applicationID_IN; END";

  /** Procedure: returns all application mappings as a single result set. */
  private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()"
          + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
          + " DECLARE result CURSOR FOR"
          + " SELECT applicationId, homeSubCluster"
          + " FROM applicationsHomeSubCluster; OPEN result; END";

  /** Procedure: deletes the mapping of one application. */
  private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
          + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)"
          + " MODIFIES SQL DATA BEGIN ATOMIC"
          + " DELETE FROM applicationsHomeSubCluster"
          + " WHERE applicationId = applicationId_IN;"
          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  /**
   * Procedure: replaces the policy row of a queue (delete followed by
   * insert). rowCount_OUT is read right after the INSERT.
   */
  private static final String SP_SETPOLICYCONFIGURATION =
      "CREATE PROCEDURE sp_setPolicyConfiguration("
          + " IN queue_IN varchar(256), IN policyType_IN varchar(256),"
          + " IN params_IN varbinary(512), OUT rowCount_OUT int)"
          + " MODIFIES SQL DATA BEGIN ATOMIC"
          + " DELETE FROM policies WHERE queue = queue_IN;"
          + " INSERT INTO policies (queue, policyType, params)"
          + " VALUES (queue_IN, policyType_IN, params_IN);"
          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  /** Procedure: reads policy type and params of one queue. */
  private static final String SP_GETPOLICYCONFIGURATION =
      "CREATE PROCEDURE sp_getPolicyConfiguration("
          + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256),"
          + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC"
          + " SELECT policyType, params INTO policyType_OUT, params_OUT"
          + " FROM policies WHERE queue = queue_IN; END";

  /** Procedure: returns all policy rows as a single result set. */
  private static final String SP_GETPOLICIESCONFIGURATIONS =
      "CREATE PROCEDURE sp_getPoliciesConfigurations()"
          + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
          + " DECLARE result CURSOR FOR"
          + " SELECT * FROM policies; OPEN result; END";

  /**
   * Schema statements in execution order: tables first, because the
   * procedures reference them.
   */
  private static final String[] INIT_STATEMENTS = {
      TABLE_APPLICATIONSHOMESUBCLUSTER,
      TABLE_MEMBERSHIP,
      TABLE_POLICIES,
      SP_REGISTERSUBCLUSTER,
      SP_DEREGISTERSUBCLUSTER,
      SP_SUBCLUSTERHEARTBEAT,
      SP_GETSUBCLUSTER,
      SP_GETSUBCLUSTERS,
      SP_ADDAPPLICATIONHOMESUBCLUSTER,
      SP_UPDATEAPPLICATIONHOMESUBCLUSTER,
      SP_GETAPPLICATIONHOMESUBCLUSTER,
      SP_GETAPPLICATIONSHOMESUBCLUSTER,
      SP_DELETEAPPLICATIONHOMESUBCLUSTER,
      SP_SETPOLICYCONFIGURATION,
      SP_GETPOLICYCONFIGURATION,
      SP_GETPOLICIESCONFIGURATIONS,
  };

  /**
   * Initializes the parent store and then creates the HSQLDB tables and
   * stored procedures.
   *
   * <p>Failures are logged rather than propagated because this override
   * declares no checked exceptions. A failure of the parent initialization
   * does not prevent the schema creation from being attempted. The connection
   * used for schema creation is always closed before this method returns.
   *
   * @param conf configuration handed to the parent initialization
   */
  @Override
  public void init(Configuration conf) {
    try {
      super.init(conf);
    } catch (YarnException e1) {
      // Trailing throwable keeps the message text and adds the stack trace.
      LOG.error("ERROR: failed to init HSQLDB {}", e1.getMessage(), e1);
    }
    try {
      conn = getConnection();
      LOG.info("Database Init: Start");
      for (String sql : INIT_STATEMENTS) {
        executeStatement(conn, sql);
      }
      LOG.info("Database Init: Complete");
    } catch (SQLException e) {
      LOG.error("ERROR: failed to initialize HSQLDB {}", e.getMessage(), e);
    } finally {
      // Release the connection on the failure path too, not only on success.
      closeQuietly();
    }
  }

  /**
   * Closes the connection obtained by {@link #init(Configuration)}. Does
   * nothing when no connection was ever obtained; a failure to close is
   * logged and not propagated.
   */
  public void closeConnection() {
    closeQuietly();
  }

  /**
   * Prepares and executes one SQL statement, closing the statement
   * afterwards.
   */
  private static void executeStatement(Connection connection, String sql)
      throws SQLException {
    try (PreparedStatement statement = connection.prepareStatement(sql)) {
      statement.execute();
    }
  }

  /** Closes {@link #conn} if it was set, logging any close failure. */
  private void closeQuietly() {
    if (conn == null) {
      // init() was never called or failed before a connection was obtained.
      return;
    }
    try {
      conn.close();
    } catch (SQLException e) {
      LOG.error("ERROR: failed to close connection to HSQLDB DB {}",
          e.getMessage(), e);
    }
  }
}