blob: 0ec8e5de6f36136fc0428eee7b0c45896879b3e8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.utils;
import java.net.URI;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class to validate the inputs to
* {@code FederationMembershipStateStore}, allows a fail fast mechanism for
* invalid user inputs.
*
*/
public final class FederationMembershipStateStoreInputValidator {
private static final Logger LOG = LoggerFactory
.getLogger(FederationMembershipStateStoreInputValidator.class);
private FederationMembershipStateStoreInputValidator() {
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast). Check if the provided {@link SubClusterRegisterRequest} for
* registration a new subcluster is valid or not.
*
* @param request the {@link SubClusterRegisterRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
public static void validate(SubClusterRegisterRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
if (request == null) {
String message = "Missing SubClusterRegister Request."
+ " Please try again by specifying a"
+ " SubCluster Register Information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// validate subcluster info
checkSubClusterInfo(request.getSubClusterInfo());
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast). Check if the provided {@link SubClusterDeregisterRequest} for
* deregistration a subcluster is valid or not.
*
* @param request the {@link SubClusterDeregisterRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
public static void validate(SubClusterDeregisterRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
if (request == null) {
String message = "Missing SubClusterDeregister Request."
+ " Please try again by specifying a"
+ " SubCluster Deregister Information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// validate subcluster id
checkSubClusterId(request.getSubClusterId());
// validate subcluster state
checkSubClusterState(request.getState());
if (!request.getState().isFinal()) {
String message = "Invalid non-final state: " + request.getState();
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast). Check if the provided {@link SubClusterHeartbeatRequest} for
* heartbeating a subcluster is valid or not.
*
* @param request the {@link SubClusterHeartbeatRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
public static void validate(SubClusterHeartbeatRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
if (request == null) {
String message = "Missing SubClusterHeartbeat Request."
+ " Please try again by specifying a"
+ " SubCluster Heartbeat Information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// validate subcluster id
checkSubClusterId(request.getSubClusterId());
// validate last heartbeat timestamp
checkTimestamp(request.getLastHeartBeat());
// validate subcluster capability
checkCapability(request.getCapability());
// validate subcluster state
checkSubClusterState(request.getState());
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast). Check if the provided {@link GetSubClusterInfoRequest} for querying
* subcluster's information is valid or not.
*
* @param request the {@link GetSubClusterInfoRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
public static void validate(GetSubClusterInfoRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
if (request == null) {
String message = "Missing GetSubClusterInfo Request."
+ " Please try again by specifying a Get SubCluster information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// validate subcluster id
checkSubClusterId(request.getSubClusterId());
}
/**
* Validate if all the required fields on {@link SubClusterInfo} are present
* or not. {@code Capability} will be empty as the corresponding
* {@code ResourceManager} is in the process of initialization during
* registration.
*
* @param subClusterInfo the information of the subcluster to be verified
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
* are invalid
*/
public static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
throws FederationStateStoreInvalidInputException {
if (subClusterInfo == null) {
String message = "Missing SubCluster Information."
+ " Please try again by specifying SubCluster Information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// validate subcluster id
checkSubClusterId(subClusterInfo.getSubClusterId());
// validate AMRM Service address
checkAddress(subClusterInfo.getAMRMServiceAddress());
// validate ClientRM Service address
checkAddress(subClusterInfo.getClientRMServiceAddress());
// validate RMClient Service address
checkAddress(subClusterInfo.getRMAdminServiceAddress());
// validate RMWeb Service address
checkAddress(subClusterInfo.getRMWebServiceAddress());
// validate last heartbeat timestamp
checkTimestamp(subClusterInfo.getLastHeartBeat());
// validate last start timestamp
checkTimestamp(subClusterInfo.getLastStartTime());
// validate subcluster state
checkSubClusterState(subClusterInfo.getState());
}
/**
* Validate if the timestamp is positive or not.
*
* @param timestamp the timestamp to be verified
* @throws FederationStateStoreInvalidInputException if the timestamp is
* invalid
*/
private static void checkTimestamp(long timestamp)
throws FederationStateStoreInvalidInputException {
if (timestamp < 0) {
String message = "Invalid timestamp information."
+ " Please try again by specifying valid Timestamp Information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
}
/**
* Validate if the Capability is present or not.
*
* @param capability the capability of the subcluster to be verified
* @throws FederationStateStoreInvalidInputException if the capability is
* invalid
*/
private static void checkCapability(String capability)
throws FederationStateStoreInvalidInputException {
if (capability == null || capability.isEmpty()) {
String message = "Invalid capability information."
+ " Please try again by specifying valid Capability Information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
}
/**
* Validate if the SubCluster Id is present or not.
*
* @param subClusterId the identifier of the subcluster to be verified
* @throws FederationStateStoreInvalidInputException if the SubCluster Id is
* invalid
*/
protected static void checkSubClusterId(SubClusterId subClusterId)
throws FederationStateStoreInvalidInputException {
// check if cluster id is present
if (subClusterId == null) {
String message = "Missing SubCluster Id information."
+ " Please try again by specifying Subcluster Id information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// check if cluster id is valid
if (subClusterId.getId().isEmpty()) {
String message = "Invalid SubCluster Id information."
+ " Please try again by specifying valid Subcluster Id.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
}
/**
* Validate if the SubCluster Address is a valid URL or not.
*
* @param address the endpoint of the subcluster to be verified
* @throws FederationStateStoreInvalidInputException if the address is invalid
*/
private static void checkAddress(String address)
throws FederationStateStoreInvalidInputException {
// Ensure url is not null
if (address == null || address.isEmpty()) {
String message = "Missing SubCluster Endpoint information."
+ " Please try again by specifying SubCluster Endpoint information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// Validate url is well formed
boolean hasScheme = address.contains("://");
URI uri = null;
try {
uri = hasScheme ? URI.create(address)
: URI.create("dummyscheme://" + address);
} catch (IllegalArgumentException e) {
String message = "The provided SubCluster Endpoint does not contain a"
+ " valid host:port authority: " + address;
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
String host = uri.getHost();
int port = uri.getPort();
String path = uri.getPath();
if ((host == null) || (port < 0)
|| (!hasScheme && path != null && !path.isEmpty())) {
String message = "The provided SubCluster Endpoint does not contain a"
+ " valid host:port authority: " + address;
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
}
/**
* Validate if the SubCluster State is present or not.
*
* @param state the state of the subcluster to be verified
* @throws FederationStateStoreInvalidInputException if the SubCluster State
* is invalid
*/
private static void checkSubClusterState(SubClusterState state)
throws FederationStateStoreInvalidInputException {
// check sub-cluster state is not empty
if (state == null) {
String message = "Missing SubCluster State information."
+ " Please try again by specifying SubCluster State information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
}
}