blob: d561141309b3a7bfc83f82c5b8cdc881aa06f337 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import com.zaxxer.hikari.HikariDataSource;
/**
* SQL implementation of {@link FederationStateStore}.
*/
public class SQLFederationStateStore implements FederationStateStore {
// Logger used by every operation of this store.
public static final Logger LOG =
LoggerFactory.getLogger(SQLFederationStateStore.class);
// JDBC call patterns for the stored procedures. Each '?' is a positional
// parameter that the calling method binds by index, either as an input
// (setString/setLong) or as an output (registerOutParameter).
// Sub-cluster membership procedures.
private static final String CALL_SP_REGISTER_SUBCLUSTER =
"{call sp_registerSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
private static final String CALL_SP_DEREGISTER_SUBCLUSTER =
"{call sp_deregisterSubCluster(?, ?, ?)}";
private static final String CALL_SP_GET_SUBCLUSTER =
"{call sp_getSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
private static final String CALL_SP_GET_SUBCLUSTERS =
"{call sp_getSubClusters()}";
private static final String CALL_SP_SUBCLUSTER_HEARTBEAT =
"{call sp_subClusterHeartbeat(?, ?, ?, ?)}";
// Application home sub-cluster procedures.
private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER =
"{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}";
private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER =
"{call sp_updateApplicationHomeSubCluster(?, ?, ?)}";
private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER =
"{call sp_deleteApplicationHomeSubCluster(?, ?)}";
private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER =
"{call sp_getApplicationHomeSubCluster(?, ?)}";
private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
"{call sp_getApplicationsHomeSubCluster()}";
// Policy configuration procedures.
private static final String CALL_SP_SET_POLICY_CONFIGURATION =
"{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
private static final String CALL_SP_GET_POLICY_CONFIGURATION =
"{call sp_getPolicyConfiguration(?, ?, ?)}";
private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS =
"{call sp_getPoliciesConfigurations()}";
// Reservation home sub-cluster procedures (declared protected; the code
// using them is outside this part of the file).
protected static final String CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER =
"{call sp_addReservationHomeSubCluster(?, ?, ?, ?)}";
protected static final String CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER =
"{call sp_getReservationHomeSubCluster(?, ?)}";
protected static final String CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER =
"{call sp_getReservationsHomeSubCluster()}";
protected static final String CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER =
"{call sp_deleteReservationHomeSubCluster(?, ?)}";
protected static final String CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER =
"{call sp_updateReservationHomeSubCluster(?, ?, ?)}";
// Calendar passed to getTimestamp(...) so that heartbeat timestamps are
// interpreted in UTC.
// NOTE(review): java.util.Calendar is not thread-safe and this instance is
// shared by all calls - confirm concurrent use is safe with the JDBC driver.
private Calendar utcCalendar =
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
// SQL database configurations, read from the Configuration in init()
private String userName;
private String password;
private String driverClass;
private String url;
private int maximumPoolSize;
// Connection pool, created in init()
private HikariDataSource dataSource = null;
// Clock used to time the stored procedure calls reported to the metrics
private final Clock clock = new MonotonicClock();
// Connection obtained at the end of init(); package-visible for tests
@VisibleForTesting
Connection conn = null;
/**
 * Initializes the store: reads the JDBC settings from the configuration,
 * creates the Hikari connection pool and obtains a first connection.
 *
 * @param conf configuration holding the JDBC driver class, the maximum pool
 *          size, the user name, the password and the URL of the database
 * @throws YarnException if the driver class cannot be loaded or the first
 *           connection cannot be obtained
 */
@Override
public void init(Configuration conf) throws YarnException {
  driverClass =
      conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS,
          YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS);
  maximumPoolSize =
      conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS,
          YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS);

  // These settings have no default here and may be absent; helper methods of
  // FederationStateStoreUtils are used below to avoid assigning a null value
  // to the corresponding data source properties.
  userName = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME);
  password = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD);
  url = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL);

  try {
    Class.forName(driverClass);
  } catch (ClassNotFoundException e) {
    FederationStateStoreUtils.logAndThrowException(LOG,
        "Driver class not found.", e);
  }

  // Create the data source to pool connections in a thread-safe manner
  dataSource = new HikariDataSource();
  dataSource.setDataSourceClassName(driverClass);
  FederationStateStoreUtils.setUsername(dataSource, userName);
  FederationStateStoreUtils.setPassword(dataSource, password);
  FederationStateStoreUtils.setProperty(dataSource,
      FederationStateStoreUtils.FEDERATION_STORE_URL, url);
  dataSource.setMaximumPoolSize(maximumPoolSize);
  LOG.info("Initialized connection pool to the Federation StateStore "
      + "database at address: {}", url);

  try {
    conn = getConnection();
    LOG.debug("Connection created");
  } catch (SQLException e) {
    // The pool was created by this call; release it before reporting the
    // failure so that a failed (or retried) init does not leak it.
    dataSource.close();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Not able to get Connection", e);
  }
}
/**
 * Registers a sub-cluster in the state store by calling the
 * {@code sp_registerSubCluster} stored procedure.
 *
 * @param registerSubClusterRequest request carrying the
 *          {@link SubClusterInfo} to store
 * @return an empty {@link SubClusterRegisterResponse}
 * @throws YarnException if the request fails validation or the state store
 *           call does not succeed
 */
@Override
public SubClusterRegisterResponse registerSubCluster(
    SubClusterRegisterRequest registerSubClusterRequest)
    throws YarnException {
  // Validate before any statement is created
  FederationMembershipStateStoreInputValidator
      .validate(registerSubClusterRequest);

  CallableStatement statement = null;
  final SubClusterInfo info = registerSubClusterRequest.getSubClusterInfo();
  final SubClusterId subClusterId = info.getSubClusterId();

  try {
    statement = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER);

    // Parameters 1-8 are inputs describing the sub-cluster; parameter 9 is
    // the row count returned by the procedure
    statement.setString(1, subClusterId.getId());
    statement.setString(2, info.getAMRMServiceAddress());
    statement.setString(3, info.getClientRMServiceAddress());
    statement.setString(4, info.getRMAdminServiceAddress());
    statement.setString(5, info.getRMWebServiceAddress());
    statement.setString(6, info.getState().toString());
    statement.setLong(7, info.getLastStartTime());
    statement.setString(8, info.getCapability());
    statement.registerOutParameter(9, java.sql.Types.INTEGER);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    statement.executeUpdate();
    final long stopTime = clock.getTime();

    final int rowCount = statement.getInt(9);

    // A row count of 0 means no sub-cluster was added
    if (rowCount == 0) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + subClusterId
              + " was not registered into the StateStore");
    }
    // Anything other than exactly one row is unexpected; the database may
    // not be set up correctly
    if (rowCount != 1) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during registration of SubCluster "
              + subClusterId + " into the StateStore");
    }

    LOG.info(
        "Registered the SubCluster " + subClusterId + " into the StateStore");
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to register the SubCluster " + subClusterId
            + " into the StateStore",
        e);
  } finally {
    // Hand the statement back to the pool in every case
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return SubClusterRegisterResponse.newInstance();
}
/**
 * Deregisters a sub-cluster by calling the {@code sp_deregisterSubCluster}
 * stored procedure with the sub-cluster id and the requested state.
 *
 * @param subClusterDeregisterRequest request carrying the sub-cluster id and
 *          the state to record
 * @return an empty {@link SubClusterDeregisterResponse}
 * @throws YarnException if the request fails validation or the state store
 *           call does not succeed
 */
@Override
public SubClusterDeregisterResponse deregisterSubCluster(
    SubClusterDeregisterRequest subClusterDeregisterRequest)
    throws YarnException {
  // Validate before any statement is created
  FederationMembershipStateStoreInputValidator
      .validate(subClusterDeregisterRequest);

  CallableStatement statement = null;
  final SubClusterId subClusterId =
      subClusterDeregisterRequest.getSubClusterId();
  final SubClusterState state = subClusterDeregisterRequest.getState();

  try {
    statement = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER);

    // Inputs: sub-cluster id and state; output: row count
    statement.setString(1, subClusterId.getId());
    statement.setString(2, state.toString());
    statement.registerOutParameter(3, java.sql.Types.INTEGER);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    statement.executeUpdate();
    final long stopTime = clock.getTime();

    final int rowCount = statement.getInt(3);

    // A row count of 0 means no sub-cluster was deregistered
    if (rowCount == 0) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + subClusterId + " not found");
    }
    // Anything other than exactly one row is unexpected; the database may
    // not be set up correctly
    if (rowCount != 1) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during deregistration of SubCluster "
              + subClusterId + " from the StateStore");
    }

    LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
        + state.toString());
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to deregister the sub-cluster " + subClusterId + " state to "
            + state.toString(),
        e);
  } finally {
    // Hand the statement back to the pool in every case
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return SubClusterDeregisterResponse.newInstance();
}
/**
 * Records a heartbeat for a sub-cluster by calling the
 * {@code sp_subClusterHeartbeat} stored procedure with the sub-cluster id,
 * its state and its capability.
 *
 * @param subClusterHeartbeatRequest request carrying the sub-cluster id, the
 *          state and the capability
 * @return an empty {@link SubClusterHeartbeatResponse}
 * @throws YarnException if the request fails validation or the state store
 *           call does not succeed
 */
@Override
public SubClusterHeartbeatResponse subClusterHeartbeat(
    SubClusterHeartbeatRequest subClusterHeartbeatRequest)
    throws YarnException {
  // Validate before any statement is created
  FederationMembershipStateStoreInputValidator
      .validate(subClusterHeartbeatRequest);

  CallableStatement statement = null;
  final SubClusterId subClusterId =
      subClusterHeartbeatRequest.getSubClusterId();
  final SubClusterState state = subClusterHeartbeatRequest.getState();

  try {
    statement = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT);

    // Inputs: sub-cluster id, state and capability; output: row count
    statement.setString(1, subClusterId.getId());
    statement.setString(2, state.toString());
    statement.setString(3, subClusterHeartbeatRequest.getCapability());
    statement.registerOutParameter(4, java.sql.Types.INTEGER);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    statement.executeUpdate();
    final long stopTime = clock.getTime();

    final int rowCount = statement.getInt(4);

    // A row count of 0 means no sub-cluster was updated
    if (rowCount == 0) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + subClusterId.toString()
              + " does not exist; cannot heartbeat");
    }
    // Anything other than exactly one row is unexpected; the database may
    // not be set up correctly
    if (rowCount != 1) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during the heartbeat of SubCluster " + subClusterId);
    }

    LOG.info("Heartbeated the StateStore for the specified SubCluster "
        + subClusterId);
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to heartbeat the StateStore for the specified SubCluster "
            + subClusterId,
        e);
  } finally {
    // Hand the statement back to the pool in every case
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return SubClusterHeartbeatResponse.newInstance();
}
/**
 * Looks up a single sub-cluster through the {@code sp_getSubCluster} stored
 * procedure and rebuilds its {@link SubClusterInfo} from the output
 * parameters.
 *
 * @param subClusterRequest request carrying the id of the sub-cluster
 * @return the response wrapping the sub-cluster information, or
 *         {@code null} when the AMRM or client RM address returned by the
 *         procedure is null (the sub-cluster does not exist)
 * @throws YarnException if the request fails validation, the returned
 *           information is not valid, or the state store call fails
 */
@Override
public GetSubClusterInfoResponse getSubCluster(
    GetSubClusterInfoRequest subClusterRequest) throws YarnException {
  // Validate before any statement is created
  FederationMembershipStateStoreInputValidator.validate(subClusterRequest);

  CallableStatement statement = null;
  SubClusterInfo result = null;
  final SubClusterId subClusterId = subClusterRequest.getSubClusterId();

  try {
    statement = getCallableStatement(CALL_SP_GET_SUBCLUSTER);

    // Parameter 1 is the key, parameters 2-9 are filled in by the procedure
    statement.setString(1, subClusterId.getId());
    statement.registerOutParameter(2, java.sql.Types.VARCHAR);
    statement.registerOutParameter(3, java.sql.Types.VARCHAR);
    statement.registerOutParameter(4, java.sql.Types.VARCHAR);
    statement.registerOutParameter(5, java.sql.Types.VARCHAR);
    statement.registerOutParameter(6, java.sql.Types.TIMESTAMP);
    statement.registerOutParameter(7, java.sql.Types.VARCHAR);
    statement.registerOutParameter(8, java.sql.Types.BIGINT);
    statement.registerOutParameter(9, java.sql.Types.VARCHAR);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    statement.execute();
    final long stopTime = clock.getTime();

    final String amRMAddress = statement.getString(2);
    final String clientRMAddress = statement.getString(3);
    final String rmAdminAddress = statement.getString(4);
    final String webAppAddress = statement.getString(5);

    // Missing addresses mean that the sub-cluster is unknown
    if (amRMAddress == null || clientRMAddress == null) {
      LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
      return null;
    }

    // The heartbeat column may be NULL; fall back to 0 in that case
    final Timestamp heartBeat = statement.getTimestamp(6, utcCalendar);
    final long lastHeartBeat;
    if (heartBeat != null) {
      lastHeartBeat = heartBeat.getTime();
    } else {
      lastHeartBeat = 0;
    }
    final SubClusterState state =
        SubClusterState.fromString(statement.getString(7));
    final long lastStartTime = statement.getLong(8);
    final String capability = statement.getString(9);

    result = SubClusterInfo.newInstance(subClusterId, amRMAddress,
        clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
        lastStartTime, capability);

    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);

    // Make sure what came back from the database is a valid sub-cluster
    try {
      FederationMembershipStateStoreInputValidator
          .checkSubClusterInfo(result);
    } catch (FederationStateStoreInvalidInputException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + subClusterId.toString() + " does not exist");
    }

    LOG.debug("Got the information about the specified SubCluster {}",
        result);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to obtain the SubCluster information for " + subClusterId, e);
  } finally {
    // Hand the statement back to the pool in every case
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return GetSubClusterInfoResponse.newInstance(result);
}
/**
 * Lists the sub-clusters returned by the {@code sp_getSubClusters} stored
 * procedure, optionally leaving out the ones whose state is not active.
 *
 * @param subClustersRequest request telling whether inactive sub-clusters
 *          must be filtered out
 * @return the response wrapping the selected sub-clusters (possibly empty)
 * @throws YarnException if a returned sub-cluster is not valid or the state
 *           store call fails
 */
@Override
public GetSubClustersInfoResponse getSubClusters(
    GetSubClustersInfoRequest subClustersRequest) throws YarnException {
  CallableStatement cstmt = null;
  ResultSet rs = null;
  List<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();

  try {
    cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTERS);

    // Execute the query
    long startTime = clock.getTime();
    rs = cstmt.executeQuery();
    long stopTime = clock.getTime();

    while (rs.next()) {
      // Extract the output for each tuple
      String subClusterName = rs.getString(1);
      String amRMAddress = rs.getString(2);
      String clientRMAddress = rs.getString(3);
      String rmAdminAddress = rs.getString(4);
      String webAppAddress = rs.getString(5);
      // The heartbeat column may be NULL, in which case getTimestamp returns
      // null; use 0 like getSubCluster does instead of failing with an NPE
      Timestamp heartBeatTimeStamp = rs.getTimestamp(6, utcCalendar);
      long lastHeartBeat =
          heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0;
      SubClusterState state = SubClusterState.fromString(rs.getString(7));
      long lastStartTime = rs.getLong(8);
      String capability = rs.getString(9);

      SubClusterId subClusterId = SubClusterId.newInstance(subClusterName);
      SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
          amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
          lastHeartBeat, state, lastStartTime, capability);

      // Check if the output it is a valid subcluster
      try {
        FederationMembershipStateStoreInputValidator
            .checkSubClusterInfo(subClusterInfo);
      } catch (FederationStateStoreInvalidInputException e) {
        String errMsg =
            "SubCluster " + subClusterId.toString() + " is not valid";
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }

      // Filter the inactive
      if (!subClustersRequest.getFilterInactiveSubClusters()
          || subClusterInfo.getState().isActive()) {
        subClusters.add(subClusterInfo);
      }
    }

    // Record the call once, after all rows were read, rather than once per
    // row (which also skipped the metric entirely for an empty result)
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to obtain the information for all the SubClusters ", e);
  } finally {
    // Return to the pool the CallableStatement
    FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
  }

  return GetSubClustersInfoResponse.newInstance(subClusters);
}
/**
 * Stores the home sub-cluster of an application through the
 * {@code sp_addApplicationHomeSubCluster} stored procedure.
 *
 * The procedure returns the home sub-cluster recorded for the application
 * and a row count. When the returned sub-cluster differs from the requested
 * one, the application was already mapped and the existing mapping is
 * returned to the caller.
 *
 * @param request request carrying the application id and the proposed home
 *          sub-cluster
 * @return the response wrapping the home sub-cluster recorded in the store
 * @throws YarnException if the request fails validation, the row count is
 *           inconsistent with the returned sub-cluster, or the state store
 *           call fails
 */
@Override
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
    AddApplicationHomeSubClusterRequest request) throws YarnException {
  // Input validator
  FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

  CallableStatement cstmt = null;
  String subClusterHome = null;
  ApplicationId appId =
      request.getApplicationHomeSubCluster().getApplicationId();
  SubClusterId subClusterId =
      request.getApplicationHomeSubCluster().getHomeSubCluster();

  try {
    cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);

    // Set the parameters for the stored procedure
    cstmt.setString(1, appId.toString());
    cstmt.setString(2, subClusterId.getId());
    cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
    cstmt.registerOutParameter(4, java.sql.Types.INTEGER);

    // Execute the query
    long startTime = clock.getTime();
    cstmt.executeUpdate();
    long stopTime = clock.getTime();

    subClusterHome = cstmt.getString(3);
    SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
    int rowCount = cstmt.getInt(4);

    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);

    // For failover reason, we check the returned SubClusterId.
    // If it is equal to the subclusterId we sent, the call added the new
    // application into FederationStateStore. If the call returns a different
    // SubClusterId it means we already tried to insert this application but a
    // component (Router/StateStore/RM) failed during the submission.
    if (subClusterId.equals(subClusterIdHome)) {
      if (rowCount == 0) {
        // No row written: the same mapping was already in the store
        LOG.info(
            "The application {} was not inserted in the StateStore because it"
                + " was already present in SubCluster {}",
            appId, subClusterHome);
      } else if (rowCount != 1) {
        // Check the ROWCOUNT value, if it is different from 1 it means the
        // call had a wrong behavior. Maybe the database is not set correctly.
        String errMsg = "Wrong behavior during the insertion of SubCluster "
            + subClusterId;
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      } else {
        // Exactly one row written: only now report the insertion, so that
        // the "not inserted" case above is not followed by this message
        LOG.info("Insert into the StateStore the application: " + appId
            + " in SubCluster: " + subClusterHome);
      }
    } else {
      // Check the ROWCOUNT value, if it is different from 0 it means the call
      // did edited the table
      if (rowCount != 0) {
        String errMsg =
            "The application " + appId + " does exist but was overwritten";
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }
      LOG.info("Application: " + appId + " already present with SubCluster: "
          + subClusterHome);
    }
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils
        .logAndThrowRetriableException(LOG,
            "Unable to insert the newly generated application "
                + request.getApplicationHomeSubCluster().getApplicationId(),
            e);
  } finally {
    // Return to the pool the CallableStatement
    FederationStateStoreUtils.returnToPool(LOG, cstmt);
  }

  return AddApplicationHomeSubClusterResponse
      .newInstance(SubClusterId.newInstance(subClusterHome));
}
/**
 * Changes the home sub-cluster of an application by calling the
 * {@code sp_updateApplicationHomeSubCluster} stored procedure.
 *
 * @param request request carrying the application id and its new home
 *          sub-cluster
 * @return an empty {@link UpdateApplicationHomeSubClusterResponse}
 * @throws YarnException if the request fails validation or the state store
 *           call does not succeed
 */
@Override
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
    UpdateApplicationHomeSubClusterRequest request) throws YarnException {
  // Validate before any statement is created
  FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

  CallableStatement statement = null;
  final ApplicationId appId =
      request.getApplicationHomeSubCluster().getApplicationId();
  final SubClusterId subClusterId =
      request.getApplicationHomeSubCluster().getHomeSubCluster();

  try {
    statement =
        getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);

    // Inputs: application id and new home sub-cluster; output: row count
    statement.setString(1, appId.toString());
    statement.setString(2, subClusterId.getId());
    statement.registerOutParameter(3, java.sql.Types.INTEGER);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    statement.executeUpdate();
    final long stopTime = clock.getTime();

    final int rowCount = statement.getInt(3);

    // A row count of 0 means no application was updated
    if (rowCount == 0) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + appId + " does not exist");
    }
    // Anything other than exactly one row is unexpected; the database may
    // not be set up correctly
    if (rowCount != 1) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during the update of SubCluster " + subClusterId);
    }

    LOG.info(
        "Update the SubCluster to {} for application {} in the StateStore",
        subClusterId, appId);
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils
        .logAndThrowRetriableException(LOG,
            "Unable to update the application "
                + request.getApplicationHomeSubCluster().getApplicationId(),
            e);
  } finally {
    // Hand the statement back to the pool in every case
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return UpdateApplicationHomeSubClusterResponse.newInstance();
}
/**
 * Retrieves the home sub-cluster of an application through the
 * {@code sp_getApplicationHomeSubCluster} stored procedure.
 *
 * @param request request carrying the application id
 * @return the response wrapping the application id and its home sub-cluster
 * @throws YarnException if the request fails validation, the procedure
 *           returns no sub-cluster for the application, or the state store
 *           call fails
 */
@Override
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
    GetApplicationHomeSubClusterRequest request) throws YarnException {
  // Validate before any statement is created
  FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

  CallableStatement statement = null;
  SubClusterId homeRM = null;

  try {
    statement = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);

    // Input: application id; output: home sub-cluster
    statement.setString(1, request.getApplicationId().toString());
    statement.registerOutParameter(2, java.sql.Types.VARCHAR);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    statement.execute();
    final long stopTime = clock.getTime();

    final String homeSubCluster = statement.getString(2);
    if (homeSubCluster == null) {
      // No value returned: the application is unknown to the store
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + request.getApplicationId() + " does not exist");
    } else {
      homeRM = SubClusterId.newInstance(homeSubCluster);
    }

    LOG.debug("Got the information about the specified application {}."
        + " The AM is running in {}", request.getApplicationId(), homeRM);
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to obtain the application information "
            + "for the specified application " + request.getApplicationId(),
        e);
  } finally {
    // Hand the statement back to the pool in every case
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return GetApplicationHomeSubClusterResponse
      .newInstance(request.getApplicationId(), homeRM);
}
/**
 * Lists the application to home sub-cluster mappings returned by the
 * {@code sp_getApplicationsHomeSubCluster} stored procedure.
 *
 * @param request the request (not read by this implementation)
 * @return the response wrapping one {@link ApplicationHomeSubCluster} per
 *         returned row
 * @throws YarnException if the state store call fails
 */
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
    GetApplicationsHomeSubClusterRequest request) throws YarnException {
  CallableStatement statement = null;
  ResultSet rows = null;
  final List<ApplicationHomeSubCluster> mappings = new ArrayList<>();

  try {
    statement = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    rows = statement.executeQuery();
    final long stopTime = clock.getTime();

    // Column 1 is the application id, column 2 its home sub-cluster
    while (rows.next()) {
      final String applicationId = rows.getString(1);
      final String homeSubCluster = rows.getString(2);
      final ApplicationId appId = ApplicationId.fromString(applicationId);
      final SubClusterId home = SubClusterId.newInstance(homeSubCluster);
      mappings.add(ApplicationHomeSubCluster.newInstance(appId, home));
    }

    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to obtain the information for all the applications ", e);
  } finally {
    // Hand the statement and the result set back in every case
    FederationStateStoreUtils.returnToPool(LOG, statement, null, rows);
  }

  return GetApplicationsHomeSubClusterResponse.newInstance(mappings);
}
/**
 * Removes the home sub-cluster mapping of an application by calling the
 * {@code sp_deleteApplicationHomeSubCluster} stored procedure.
 *
 * @param request request carrying the id of the application to remove
 * @return an empty {@link DeleteApplicationHomeSubClusterResponse}
 * @throws YarnException if the request fails validation or the state store
 *           call does not succeed
 */
@Override
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
    DeleteApplicationHomeSubClusterRequest request) throws YarnException {
  // Validate before any statement is created
  FederationApplicationHomeSubClusterStoreInputValidator.validate(request);

  CallableStatement statement = null;

  try {
    statement =
        getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);

    // Input: application id; output: row count
    statement.setString(1, request.getApplicationId().toString());
    statement.registerOutParameter(2, java.sql.Types.INTEGER);

    // Only the execution itself is timed
    final long startTime = clock.getTime();
    statement.executeUpdate();
    final long stopTime = clock.getTime();

    final int rowCount = statement.getInt(2);

    // A row count of 0 means no application was deleted
    if (rowCount == 0) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + request.getApplicationId() + " does not exist");
    }
    // Anything other than exactly one row is unexpected; the database may
    // not be set up correctly
    if (rowCount != 1) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during deleting the application "
              + request.getApplicationId());
    }

    LOG.info("Delete from the StateStore the application: {}",
        request.getApplicationId());
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to delete the application " + request.getApplicationId(), e);
  } finally {
    // Hand the statement back to the pool in every case
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return DeleteApplicationHomeSubClusterResponse.newInstance();
}
/**
 * Look up the policy configured for a queue through the get-policy stored
 * procedure.
 *
 * @param request carries the queue name to look up.
 * @return a response wrapping the policy, or {@code null} when the stored
 *         procedure returned no type or no parameters for the queue. The
 *         success metric is recorded only when a policy was found.
 * @throws YarnException raised through the FederationStateStoreUtils helper
 *         when the database call fails with a SQLException.
 */
@Override
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
    GetSubClusterPolicyConfigurationRequest request) throws YarnException {

  // Reject malformed requests before any statement is created.
  FederationPolicyStoreInputValidator.validate(request);

  SubClusterPolicyConfiguration policy = null;
  CallableStatement statement = null;
  try {
    statement = getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION);

    // Parameter 1 is the queue; 2 and 3 receive the policy type and params.
    statement.setString(1, request.getQueue());
    statement.registerOutParameter(2, java.sql.Types.VARCHAR);
    statement.registerOutParameter(3, java.sql.Types.VARBINARY);

    // Time only the execution of the stored procedure.
    final long begin = clock.getTime();
    statement.executeUpdate();
    final long end = clock.getTime();

    // The parameters are only read when a type came back.
    final String policyType = statement.getString(2);
    final byte[] policyParams =
        (policyType != null) ? statement.getBytes(3) : null;

    if (policyType == null || policyParams == null) {
      LOG.warn("Policy for queue: {} does not exist.", request.getQueue());
      return null;
    }

    policy = SubClusterPolicyConfiguration.newInstance(request.getQueue(),
        policyType, ByteBuffer.wrap(policyParams));
    LOG.debug("Selected from StateStore the policy for the queue: {}",
        policy);

    FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to select the policy for the queue :" + request.getQueue(),
        e);
  } finally {
    // Always hand the statement to returnToPool, whatever the outcome.
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }
  return GetSubClusterPolicyConfigurationResponse.newInstance(policy);
}
/**
 * Store the policy configuration of a queue through the set-policy stored
 * procedure.
 *
 * @param request carries the policy configuration (queue, type, params).
 * @return an empty response when exactly one row was reported as written.
 * @throws YarnException raised through the FederationStateStoreUtils helpers
 *         when the reported row count is not 1 or when the database call
 *         fails with a SQLException.
 */
@Override
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
    SetSubClusterPolicyConfigurationRequest request) throws YarnException {

  // Input validator
  FederationPolicyStoreInputValidator.validate(request);

  CallableStatement cstmt = null;
  SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration();

  try {
    cstmt = getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION);

    // Parameters 1-3 are queue, type and params; 4 receives the row count.
    cstmt.setString(1, policyConf.getQueue());
    cstmt.setString(2, policyConf.getType());
    cstmt.setBytes(3, getByteArray(policyConf.getParams()));
    cstmt.registerOutParameter(4, java.sql.Types.INTEGER);

    // Execute the query, timing only the stored procedure call
    long startTime = clock.getTime();
    cstmt.executeUpdate();
    long stopTime = clock.getTime();

    // Read the ROWCOUNT once and classify it.
    int rowCount = cstmt.getInt(4);
    if (rowCount == 0) {
      // 0 means the call did not add the policy into FederationStateStore
      String errMsg = "The policy " + policyConf.getQueue()
          + " was not insert into the StateStore";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    } else if (rowCount != 1) {
      // Anything other than 1 means the call had a wrong behavior.
      // Maybe the database is not set correctly.
      String errMsg =
          "Wrong behavior during insert the policy " + policyConf.getQueue();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Parameterized logging, consistent with the other methods of the store.
    LOG.info("Insert into the state store the policy for the queue: {}",
        policyConf.getQueue());
    FederationStateStoreClientMetrics
        .succeededStateStoreCall(stopTime - startTime);

  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to insert the newly generated policy for the queue :"
            + policyConf.getQueue(),
        e);
  } finally {
    // Always hand the statement to returnToPool, whatever the outcome.
    FederationStateStoreUtils.returnToPool(LOG, cstmt);
  }
  return SetSubClusterPolicyConfigurationResponse.newInstance();
}
/**
 * Read every policy configuration returned by the get-policies stored
 * procedure.
 *
 * @param request the request; it is not inspected by this method.
 * @return a response holding one configuration per returned row (possibly
 *         none).
 * @throws YarnException raised through the FederationStateStoreUtils helper
 *         when the database call fails with a SQLException.
 */
@Override
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
    GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {

  List<SubClusterPolicyConfiguration> policies = new ArrayList<>();
  CallableStatement statement = null;
  ResultSet rows = null;
  try {
    statement = getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS);

    // Time only the execution of the stored procedure.
    final long begin = clock.getTime();
    rows = statement.executeQuery();
    final long end = clock.getTime();

    // Columns 1, 2 and 3 are read as queue, type and serialized params.
    while (rows.next()) {
      policies.add(SubClusterPolicyConfiguration.newInstance(
          rows.getString(1), rows.getString(2),
          ByteBuffer.wrap(rows.getBytes(3))));
    }

    FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to obtain the policy information for all the queues.", e);
  } finally {
    // Always hand statement and result set to returnToPool.
    FederationStateStoreUtils.returnToPool(LOG, statement, null, rows);
  }
  return GetSubClusterPoliciesConfigurationsResponse.newInstance(policies);
}
/**
 * Not supported by this store.
 *
 * @return never returns normally.
 * @throws NotImplementedException always.
 */
@Override
public Version getCurrentVersion() {
  final String reason = "Code is not implemented";
  throw new NotImplementedException(reason);
}
/**
 * Not supported by this store.
 *
 * @return never returns normally.
 * @throws NotImplementedException always.
 */
@Override
public Version loadVersion() {
  final String reason = "Code is not implemented";
  throw new NotImplementedException(reason);
}
/**
 * Close the data source, if one is set, and decrement the connection
 * counter of the client metrics.
 *
 * @throws Exception if closing the data source fails.
 */
@Override
public void close() throws Exception {
  // Nothing to release when no data source is set.
  if (dataSource == null) {
    return;
  }
  dataSource.close();
  LOG.debug("Connection closed");
  FederationStateStoreClientMetrics.decrConnections();
}
/**
 * Get a connection from the DataSource pool.
 *
 * <p>The connection counter of the client metrics is incremented before the
 * pool is asked, so it is bumped even when the acquisition throws.
 *
 * @return a connection from the DataSource pool.
 * @throws SQLException on failure
 */
@VisibleForTesting
protected Connection getConnection() throws SQLException {
  FederationStateStoreClientMetrics.incrConnections();
  final Connection pooled = dataSource.getConnection();
  return pooled;
}
/**
 * Prepare a callable statement for a stored procedure on the store's
 * connection.
 *
 * @param procedure the call string handed to {@code prepareCall}.
 * @return the prepared callable statement.
 * @throws SQLException if the statement cannot be prepared.
 */
@VisibleForTesting
protected CallableStatement getCallableStatement(String procedure)
    throws SQLException {
  final CallableStatement prepared = conn.prepareCall(procedure);
  return prepared;
}
/**
 * Copy the readable content of a buffer into a new byte array.
 *
 * <p>The bytes between the buffer's current position and its limit are
 * copied. The copy is read through a duplicate, so the position, limit and
 * mark of {@code bb} are left untouched and the same buffer can be passed
 * again.
 *
 * @param bb the buffer to copy from; must not be null.
 * @return a new array holding the remaining bytes of {@code bb}.
 */
private static byte[] getByteArray(ByteBuffer bb) {
  // Work on a duplicate: it shares the content but has its own position,
  // so draining it does not consume the caller's buffer.
  ByteBuffer view = bb.duplicate();
  // Size by remaining(), not limit(): with a non-zero position, limit() is
  // larger than what a relative get() can deliver and the read would fail
  // with BufferUnderflowException.
  byte[] ba = new byte[view.remaining()];
  view.get(ba);
  return ba;
}
/**
 * Register the home sub-cluster of a reservation through the
 * add-reservation stored procedure.
 *
 * <p>The procedure reports the sub-cluster that is stored for the
 * reservation together with a row count. The stored sub-cluster is what the
 * response carries, which may differ from the requested one when the
 * reservation was already registered.
 *
 * @param request carries the reservation id and the requested sub-cluster.
 * @return a response holding the sub-cluster reported by the procedure.
 * @throws YarnException raised through the FederationStateStoreUtils helpers
 *         when the reported row count is inconsistent with the reported
 *         sub-cluster or when the database call fails with a SQLException.
 */
@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
    AddReservationHomeSubClusterRequest request) throws YarnException {

  // Reject malformed requests before any statement is created.
  FederationReservationHomeSubClusterStoreInputValidator.validate(request);

  ReservationHomeSubCluster mapping = request.getReservationHomeSubCluster();
  ReservationId reservationId = mapping.getReservationId();
  SubClusterId subClusterId = mapping.getHomeSubCluster();

  SubClusterId storedHome = null;
  CallableStatement statement = null;
  try {
    // sp_addReservationHomeSubCluster takes 4 parameters:
    //   IN  reservationId_IN         varchar(128)
    //   IN  homeSubCluster_IN        varchar(256)
    //   OUT storedHomeSubCluster_OUT varchar(256)
    //   OUT rowCount_OUT             int
    statement = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
    statement.setString("reservationId_IN", reservationId.toString());
    statement.setString("homeSubCluster_IN", subClusterId.getId());
    statement.registerOutParameter("storedHomeSubCluster_OUT",
        java.sql.Types.VARCHAR);
    statement.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

    // Time only the execution of the stored procedure.
    final long begin = clock.getTime();
    statement.executeUpdate();
    final long end = clock.getTime();

    // Collect both outputs of the procedure.
    storedHome =
        SubClusterId.newInstance(statement.getString("storedHomeSubCluster_OUT"));
    final int insertedRows = statement.getInt("rowCount_OUT");

    // For failover reasons the returned sub-cluster is compared with the
    // one that was sent.
    if (!subClusterId.equals(storedHome)) {
      // A different sub-cluster is stored: the reservation had been
      // inserted before. No row may have been written by this call.
      if (insertedRows != 0) {
        FederationStateStoreUtils.logAndThrowStoreException(LOG,
            "The reservation %s does exist but was overwritten.", reservationId);
      }
      LOG.info("Reservation: {} already present with subCluster: {}.",
          reservationId, storedHome);
    } else if (insertedRows == 0) {
      // Same sub-cluster and nothing written: the mapping already existed.
      LOG.info("The reservation {} was not inserted in the StateStore because it" +
          " was already present in subCluster {}", reservationId, storedHome);
    } else if (insertedRows != 1) {
      // Same sub-cluster but an unexpected count: the database may not be
      // set up correctly.
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during the insertion of subCluster %s according to reservation %s. " +
          "The database expects to insert 1 record, but the number of " +
          "inserted changes is greater than 1, " +
          "please check the records of the database.",
          subClusterId, reservationId);
    }

    FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
        "Unable to insert the newly generated reservation %s to subCluster %s.",
        reservationId, subClusterId);
  } finally {
    // Always hand the statement to returnToPool, whatever the outcome.
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  return AddReservationHomeSubClusterResponse.newInstance(storedHome);
}
/**
 * Look up the home sub-cluster of a reservation through the
 * get-reservation stored procedure.
 *
 * @param request carries the reservation id to look up.
 * @return a response holding the reservation and its home sub-cluster.
 * @throws YarnException raised through the FederationStateStoreUtils helpers
 *         when the procedure returns a blank sub-cluster or when the
 *         database call fails with a SQLException; also thrown as a
 *         fallback at the end of the method.
 */
@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
    GetReservationHomeSubClusterRequest request) throws YarnException {

  // Reject malformed requests before any statement is created.
  FederationReservationHomeSubClusterStoreInputValidator.validate(request);

  ReservationId reservationId = request.getReservationId();
  SubClusterId homeId = null;
  CallableStatement statement = null;
  try {
    // sp_getReservationHomeSubCluster takes 2 parameters:
    //   IN  reservationId_IN   varchar(128)
    //   OUT homeSubCluster_OUT varchar(256)
    statement = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
    statement.setString("reservationId_IN", reservationId.toString());
    statement.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);

    // Time only the execution of the stored procedure.
    final long begin = clock.getTime();
    statement.execute();
    final long end = clock.getTime();

    final String storedHome = statement.getString("homeSubCluster_OUT");
    if (StringUtils.isBlank(storedHome)) {
      // A blank result is treated as "reservation unknown".
      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
          "Reservation %s does not exist", reservationId);
    } else {
      homeId = SubClusterId.newInstance(storedHome);
    }

    LOG.info("Got the information about the specified reservation {} in subCluster = {}.",
        reservationId, homeId);
    FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);

    return GetReservationHomeSubClusterResponse.newInstance(
        ReservationHomeSubCluster.newInstance(reservationId, homeId));
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
        "Unable to obtain the reservation information according to %s.", reservationId);
  } finally {
    // Always hand the statement to returnToPool, whatever the outcome.
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  // Only reached if the helper in the catch block returns normally.
  throw new YarnException(
      "Unable to obtain the reservation information according to " + reservationId);
}
/**
 * Read every reservation-to-sub-cluster mapping returned by the
 * get-reservations stored procedure.
 *
 * @param request the request; it is not inspected by this method.
 * @return a response holding one mapping per returned row (possibly none).
 * @throws YarnException raised through the FederationStateStoreUtils helper
 *         when anything fails while querying or converting the rows; also
 *         thrown as a fallback at the end of the method.
 */
@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
    GetReservationsHomeSubClusterRequest request) throws YarnException {

  List<ReservationHomeSubCluster> mappings = new ArrayList<>();
  CallableStatement statement = null;
  ResultSet rows = null;
  try {
    // sp_getReservationsHomeSubCluster takes no input and yields rows with
    // the columns reservationId and homeSubCluster.
    statement = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);

    // Time only the execution of the stored procedure.
    final long begin = clock.getTime();
    rows = statement.executeQuery();
    final long end = clock.getTime();

    while (rows.next()) {
      // Read both columns first, then convert them to record types.
      final String rawReservationId = rows.getString("reservationId");
      final String rawHomeSubCluster = rows.getString("homeSubCluster");
      mappings.add(ReservationHomeSubCluster.newInstance(
          ReservationId.parseReservationId(rawReservationId),
          SubClusterId.newInstance(rawHomeSubCluster)));
    }

    FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
    return GetReservationsHomeSubClusterResponse.newInstance(mappings);
  } catch (Exception e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(LOG,
        "Unable to obtain the information for all the reservations.", e);
  } finally {
    // Always hand statement and result set to returnToPool.
    FederationStateStoreUtils.returnToPool(LOG, statement, null, rows);
  }

  // Only reached if the helper in the catch block returns normally.
  throw new YarnException("Unable to obtain the information for all the reservations.");
}
/**
 * Remove the home sub-cluster mapping of a reservation through the
 * delete-reservation stored procedure.
 *
 * @param request carries the id of the reservation to remove.
 * @return an empty response when exactly one row was reported as deleted.
 * @throws YarnException raised through the FederationStateStoreUtils helpers
 *         when the reported row count is not 1 or when the database call
 *         fails with a SQLException; also thrown as a fallback at the end of
 *         the method.
 */
@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
    DeleteReservationHomeSubClusterRequest request) throws YarnException {

  // Reject malformed requests before any statement is created.
  FederationReservationHomeSubClusterStoreInputValidator.validate(request);

  ReservationId reservationId = request.getReservationId();
  CallableStatement statement = null;
  try {
    // sp_deleteReservationHomeSubCluster takes 2 parameters:
    //   IN  reservationId_IN varchar(128)
    //   OUT rowCount_OUT     int
    statement = getCallableStatement(CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER);
    statement.setString("reservationId_IN", reservationId.toString());
    statement.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

    // Time only the execution of the stored procedure.
    final long begin = clock.getTime();
    statement.executeUpdate();
    final long end = clock.getTime();

    final int deletedRows = statement.getInt("rowCount_OUT");
    switch (deletedRows) {
    case 1:
      // Exactly one row removed: the expected outcome.
      break;
    case 0:
      // Nothing removed: the reservation is unknown to the store.
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation %s does not exist", reservationId);
      break;
    default:
      // Any other count is unexpected. Maybe the database is not set
      // correctly.
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during deleting the reservation %s. " +
          "The database is expected to delete 1 record, " +
          "but the number of deleted records returned by the database is greater than 1, " +
          "indicating that a duplicate reservationId occurred during the deletion process.",
          reservationId);
      break;
    }

    LOG.info("Delete from the StateStore the reservation: {}.", reservationId);
    FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
    return DeleteReservationHomeSubClusterResponse.newInstance();
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
        "Unable to delete the reservation %s.", reservationId);
  } finally {
    // Always hand the statement to returnToPool, whatever the outcome.
    FederationStateStoreUtils.returnToPool(LOG, statement);
  }

  // Only reached if the helper in the catch block returns normally.
  throw new YarnException("Unable to delete the reservation " + reservationId);
}
/**
 * Change the home sub-cluster of a reservation through the
 * update-reservation stored procedure.
 *
 * @param request carries the reservation id and its new home sub-cluster.
 * @return an empty response when exactly one row was reported as updated.
 * @throws YarnException raised through the FederationStateStoreUtils helpers
 *         when the reported row count is not 1 or when the database call
 *         fails with a SQLException; also thrown as a fallback at the end of
 *         the method.
 */
@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
    UpdateReservationHomeSubClusterRequest request) throws YarnException {

  // validate
  FederationReservationHomeSubClusterStoreInputValidator.validate(request);

  CallableStatement cstmt = null;
  ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
  ReservationId reservationId = reservationHomeSubCluster.getReservationId();
  SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();

  try {
    // sp_updateReservationHomeSubCluster takes 3 parameters:
    //   IN  reservationId_IN  varchar(128)
    //   IN  homeSubCluster_IN varchar(256)
    //   OUT rowCount_OUT      int
    cstmt = getCallableStatement(CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER);
    cstmt.setString("reservationId_IN", reservationId.toString());
    cstmt.setString("homeSubCluster_IN", subClusterId.getId());
    cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

    // Execute the query, timing only the stored procedure call
    long startTime = clock.getTime();
    cstmt.executeUpdate();
    long stopTime = clock.getTime();

    int rowCount = cstmt.getInt("rowCount_OUT");

    if (rowCount == 0) {
      // 0 means the call did not update the reservation in the store
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation %s does not exist", reservationId);
    } else if (rowCount != 1) {
      // Anything other than 1 means the call had a wrong behavior.
      // Maybe the database is not set correctly.
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Wrong behavior during update the subCluster %s according to reservation %s. " +
          "The database is expected to update 1 record, " +
          "but the number of database update records is greater than 1, " +
          "the records of the database should be checked.",
          subClusterId, reservationId);
    }

    LOG.info("Update the subCluster to {} for reservation {} in the StateStore.",
        subClusterId, reservationId);
    FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
    return UpdateReservationHomeSubClusterResponse.newInstance();
  } catch (SQLException e) {
    FederationStateStoreClientMetrics.failedStateStoreCall();
    FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
        "Unable to update the subCluster %s according to reservation %s.",
        subClusterId, reservationId);
  } finally {
    // Always hand the statement to returnToPool, whatever the outcome.
    FederationStateStoreUtils.returnToPool(LOG, cstmt);
  }

  // Only reached if the helper in the catch block returns normally. The
  // message keeps a space before the reservation id so the two values do
  // not run together.
  throw new YarnException(
      "Unable to update the subCluster " + subClusterId +
      " according to reservation " + reservationId);
}
/**
 * Not supported by this store.
 *
 * @param request the master key request; it is not inspected.
 * @return never returns normally.
 * @throws NotImplementedException always.
 */
@Override
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
    throws YarnException, IOException {
  final String reason = "Code is not implemented";
  throw new NotImplementedException(reason);
}
/**
 * Not supported by this store.
 *
 * @param request the master key request; it is not inspected.
 * @return never returns normally.
 * @throws NotImplementedException always.
 */
@Override
public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
    throws YarnException, IOException {
  final String reason = "Code is not implemented";
  throw new NotImplementedException(reason);
}
/**
 * Not supported by this store.
 *
 * @param request the master key request; it is not inspected.
 * @return never returns normally.
 * @throws NotImplementedException always.
 */
@Override
public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
    throws YarnException, IOException {
  final String reason = "Code is not implemented";
  throw new NotImplementedException(reason);
}
}