blob: fb0831a177bcda62593912fa97059290db1556a8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ReservationInputValidator {
private final Clock clock;
/**
* Utility class to validate reservation requests.
*/
public ReservationInputValidator(Clock clock) {
this.clock = clock;
}
private Plan validateReservation(ReservationSystem reservationSystem,
ReservationId reservationId, String auditConstant) throws YarnException {
String message = "";
// check if the reservation id is valid
if (reservationId == null) {
message =
"Missing reservation id."
+ " Please try again by specifying a reservation id.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
String queueName = reservationSystem.getQueueForReservation(reservationId);
if (queueName == null) {
message =
"The specified reservation with ID: " + reservationId
+ " is unknown. Please try again with a valid reservation.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// check if the associated plan is valid
Plan plan = reservationSystem.getPlan(queueName);
if (plan == null) {
message =
"The specified reservation: " + reservationId
+ " is not associated with any valid plan."
+ " Please try again with a valid reservation.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
return plan;
}
private void validateReservationDefinition(ReservationId reservationId,
ReservationDefinition contract, Plan plan, String auditConstant)
throws YarnException {
String message = "";
// check if deadline is in the past
if (contract == null) {
message =
"Missing reservation definition."
+ " Please try again by specifying a reservation definition.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
if (contract.getDeadline() <= clock.getTime()) {
message =
"The specified deadline: " + contract.getDeadline()
+ " is the past. Please try again with deadline in the future.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// Check if at least one RR has been specified
ReservationRequests resReqs = contract.getReservationRequests();
if (resReqs == null) {
message =
"No resources have been specified to reserve."
+ "Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
List<ReservationRequest> resReq = resReqs.getReservationResources();
if (resReq == null || resReq.isEmpty()) {
message =
"No resources have been specified to reserve."
+ " Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// compute minimum duration and max gang size
long minDuration = 0;
Resource maxGangSize = Resource.newInstance(0, 0);
ReservationRequestInterpreter type =
contract.getReservationRequests().getInterpreter();
for (ReservationRequest rr : resReq) {
if (type == ReservationRequestInterpreter.R_ALL
|| type == ReservationRequestInterpreter.R_ANY) {
minDuration = Math.max(minDuration, rr.getDuration());
} else {
minDuration += rr.getDuration();
}
maxGangSize =
Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
maxGangSize,
Resources.multiply(rr.getCapability(), rr.getConcurrency()));
}
// verify the allocation is possible (skip for ANY)
if (contract.getDeadline() - contract.getArrival() < minDuration
&& type != ReservationRequestInterpreter.R_ANY) {
message =
"The time difference ("
+ (contract.getDeadline() - contract.getArrival())
+ ") between arrival (" + contract.getArrival() + ") "
+ "and deadline (" + contract.getDeadline() + ") must "
+ " be greater or equal to the minimum resource duration ("
+ minDuration + ")";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// check that the largest gang does not exceed the inventory available
// capacity (skip for ANY)
if (Resources.greaterThan(plan.getResourceCalculator(),
plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
&& type != ReservationRequestInterpreter.R_ANY) {
message =
"The size of the largest gang in the reservation refinition ("
+ maxGangSize + ") exceed the capacity available ("
+ plan.getTotalCapacity() + " )";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationSubmissionRequest} defining the
* resources required over time for the request
* @param reservationId the {@link ReservationId} associated with the current
* request
* @return the {@link Plan} to submit the request to
* @throws YarnException
*/
public Plan validateReservationSubmissionRequest(
ReservationSystem reservationSystem,
ReservationSubmissionRequest request, ReservationId reservationId)
throws YarnException {
// Check if it is a managed queue
String queueName = request.getQueue();
if (queueName == null || queueName.isEmpty()) {
String errMsg =
"The queue to submit is not specified."
+ " Please try again with a valid reservable queue.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.SUBMIT_RESERVATION_REQUEST,
"validate reservation input", "ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
Plan plan = reservationSystem.getPlan(queueName);
if (plan == null) {
String errMsg =
"The specified queue: " + queueName
+ " is not managed by reservation system."
+ " Please try again with a valid reservable queue.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.SUBMIT_RESERVATION_REQUEST,
"validate reservation input", "ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationUpdateRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
* @throws YarnException
*/
public Plan validateReservationUpdateRequest(
ReservationSystem reservationSystem, ReservationUpdateRequest request)
throws YarnException {
ReservationId reservationId = request.getReservationId();
Plan plan =
validateReservation(reservationSystem, reservationId,
AuditConstants.UPDATE_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.UPDATE_RESERVATION_REQUEST);
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationDeleteRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
* @throws YarnException
*/
public Plan validateReservationDeleteRequest(
ReservationSystem reservationSystem, ReservationDeleteRequest request)
throws YarnException {
return validateReservation(reservationSystem, request.getReservationId(),
AuditConstants.DELETE_RESERVATION_REQUEST);
}
}