blob: b5b8d653cfebe681c090bae5e7391366fca02aff [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ReservationInputValidator {
private final Clock clock;
/**
* Utility class to validate reservation requests.
*
* @param clock the {@link Clock} to use
*/
public ReservationInputValidator(Clock clock) {
this.clock = clock;
}
private Plan validateReservation(ReservationSystem reservationSystem,
ReservationId reservationId, String auditConstant) throws YarnException {
// check if the reservation id is valid
if (reservationId == null) {
String message = "Missing reservation id."
+ " Please try again by specifying a reservation id.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
String queue = reservationSystem.getQueueForReservation(reservationId);
String nullQueueErrorMessage =
"The specified reservation with ID: " + reservationId
+ " is unknown. Please try again with a valid reservation.";
String nullPlanErrorMessage = "The specified reservation: " + reservationId
+ " is not associated with any valid plan."
+ " Please try again with a valid reservation.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
nullQueueErrorMessage, nullPlanErrorMessage);
}
private void validateReservationDefinition(ReservationId reservationId,
ReservationDefinition contract, Plan plan, String auditConstant)
throws YarnException {
String message = "";
// check if deadline is in the past
if (contract == null) {
message = "Missing reservation definition."
+ " Please try again by specifying a reservation definition.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
if (contract.getDeadline() <= clock.getTime()) {
message = "The specified deadline: " + contract.getDeadline()
+ " is the past. Please try again with deadline in the future.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// Check if at least one RR has been specified
ReservationRequests resReqs = contract.getReservationRequests();
if (resReqs == null) {
message = "No resources have been specified to reserve."
+ "Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
List<ReservationRequest> resReq = resReqs.getReservationResources();
if (resReq == null || resReq.isEmpty()) {
message = "No resources have been specified to reserve."
+ " Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// compute minimum duration and max gang size
long minDuration = 0;
Resource maxGangSize = Resource.newInstance(0, 0);
ReservationRequestInterpreter type =
contract.getReservationRequests().getInterpreter();
for (ReservationRequest rr : resReq) {
if (type == ReservationRequestInterpreter.R_ALL
|| type == ReservationRequestInterpreter.R_ANY) {
minDuration = Math.max(minDuration, rr.getDuration());
} else {
minDuration += rr.getDuration();
}
maxGangSize = Resources.max(plan.getResourceCalculator(),
plan.getTotalCapacity(), maxGangSize,
Resources.multiply(rr.getCapability(), rr.getConcurrency()));
}
// verify the allocation is possible (skip for ANY)
long duration = contract.getDeadline() - contract.getArrival();
if (duration < minDuration && type != ReservationRequestInterpreter.R_ANY) {
message = "The time difference (" + (duration) + ") between arrival ("
+ contract.getArrival() + ") " + "and deadline ("
+ contract.getDeadline() + ") must "
+ " be greater or equal to the minimum resource duration ("
+ minDuration + ")";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// check that the largest gang does not exceed the inventory available
// capacity (skip for ANY)
if (Resources.greaterThan(plan.getResourceCalculator(),
plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
&& type != ReservationRequestInterpreter.R_ANY) {
message = "The size of the largest gang in the reservation definition ("
+ maxGangSize + ") exceed the capacity available ("
+ plan.getTotalCapacity() + " )";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// check that the recurrence is a positive long value.
String recurrenceExpression = contract.getRecurrenceExpression();
try {
long recurrence = Long.parseLong(recurrenceExpression);
if (recurrence < 0) {
message = "Negative Period : " + recurrenceExpression + ". Please try"
+ " again with a non-negative long value as period.";
throw RPCUtil.getRemoteException(message);
}
// verify duration is less than recurrence for periodic reservations
if (recurrence > 0 && duration > recurrence) {
message = "Duration of the requested reservation: " + duration
+ " is greater than the recurrence: " + recurrence
+ ". Please try again with a smaller duration.";
throw RPCUtil.getRemoteException(message);
}
// verify maximum period is divisible by recurrence expression.
if (recurrence > 0 && plan.getMaximumPeriodicity() % recurrence != 0) {
message = "The maximum periodicity: " + plan.getMaximumPeriodicity() +
" must be divisible by the recurrence expression provided: " +
recurrence + ". Please try again with a recurrence expression" +
" that satisfies this requirement.";
throw RPCUtil.getRemoteException(message);
}
} catch (NumberFormatException e) {
message = "Invalid period " + recurrenceExpression + ". Please try"
+ " again with a non-negative long value as period.";
throw RPCUtil.getRemoteException(message);
}
}
private Plan getPlanFromQueue(ReservationSystem reservationSystem,
String queue, String auditConstant) throws YarnException {
String nullQueueErrorMessage = "The queue is not specified."
+ " Please try again with a valid reservable queue.";
String nullPlanErrorMessage = "The specified queue: " + queue
+ " is not managed by reservation system."
+ " Please try again with a valid reservable queue.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
nullQueueErrorMessage, nullPlanErrorMessage);
}
private Plan getPlanFromQueue(ReservationSystem reservationSystem,
String queue, String auditConstant, String nullQueueErrorMessage,
String nullPlanErrorMessage) throws YarnException {
if (queue == null || queue.isEmpty()) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService",
nullQueueErrorMessage);
throw RPCUtil.getRemoteException(nullQueueErrorMessage);
}
// check if the associated plan is valid
Plan plan = reservationSystem.getPlan(queue);
if (plan == null) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService",
nullPlanErrorMessage);
throw RPCUtil.getRemoteException(nullPlanErrorMessage);
}
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationSubmissionRequest} defining the
* resources required over time for the request
* @param reservationId the {@link ReservationId} associated with the current
* request
* @return the {@link Plan} to submit the request to
* @throws YarnException if validation fails
*/
public Plan validateReservationSubmissionRequest(
ReservationSystem reservationSystem, ReservationSubmissionRequest request,
ReservationId reservationId) throws YarnException {
String message;
if (reservationId == null) {
message = "Reservation id cannot be null. Please try again specifying "
+ " a valid reservation id by creating a new reservation id.";
throw RPCUtil.getRemoteException(message);
}
// Check if it is a managed queue
String queue = request.getQueue();
Plan plan = getPlanFromQueue(reservationSystem, queue,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationUpdateRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
* @throws YarnException if validation fails
*/
public Plan validateReservationUpdateRequest(
ReservationSystem reservationSystem, ReservationUpdateRequest request)
throws YarnException {
ReservationId reservationId = request.getReservationId();
Plan plan = validateReservation(reservationSystem, reservationId,
AuditConstants.UPDATE_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.UPDATE_RESERVATION_REQUEST);
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures.
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationListRequest} defining search
* parameters for reservations in the {@link ReservationSystem} that
* is being validated against.
* @return the {@link Plan} to list reservations of.
* @throws YarnException if validation fails
*/
public Plan validateReservationListRequest(
ReservationSystem reservationSystem, ReservationListRequest request)
throws YarnException {
String queue = request.getQueue();
if (request.getEndTime() < request.getStartTime()) {
String errorMessage = "The specified end time must be greater than "
+ "the specified start time.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.LIST_RESERVATION_REQUEST,
"validate list reservation input", "ClientRMService", errorMessage);
throw RPCUtil.getRemoteException(errorMessage);
}
// Check if it is a managed queue
return getPlanFromQueue(reservationSystem, queue,
AuditConstants.LIST_RESERVATION_REQUEST);
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationDeleteRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
* @throws YarnException if validation fails
*/
public Plan validateReservationDeleteRequest(
ReservationSystem reservationSystem, ReservationDeleteRequest request)
throws YarnException {
return validateReservation(reservationSystem, request.getReservationId(),
AuditConstants.DELETE_RESERVATION_REQUEST);
}
}