| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.reservation; |
| |
| import java.util.List; |
| |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; |
| import org.apache.hadoop.yarn.api.records.ReservationDefinition; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.ReservationRequest; |
| import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; |
| import org.apache.hadoop.yarn.api.records.ReservationRequests; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.ipc.RPCUtil; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| public class ReservationInputValidator { |
| |
| private final Clock clock; |
| |
| /** |
| * Utility class to validate reservation requests. |
| * |
| * @param clock the {@link Clock} to use |
| */ |
| public ReservationInputValidator(Clock clock) { |
| this.clock = clock; |
| } |
| |
| private Plan validateReservation(ReservationSystem reservationSystem, |
| ReservationId reservationId, String auditConstant) throws YarnException { |
| // check if the reservation id is valid |
| if (reservationId == null) { |
| String message = "Missing reservation id." |
| + " Please try again by specifying a reservation id."; |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input", "ClientRMService", message); |
| throw RPCUtil.getRemoteException(message); |
| } |
| String queue = reservationSystem.getQueueForReservation(reservationId); |
| String nullQueueErrorMessage = |
| "The specified reservation with ID: " + reservationId |
| + " is unknown. Please try again with a valid reservation."; |
| String nullPlanErrorMessage = "The specified reservation: " + reservationId |
| + " is not associated with any valid plan." |
| + " Please try again with a valid reservation."; |
| return getPlanFromQueue(reservationSystem, queue, auditConstant, |
| nullQueueErrorMessage, nullPlanErrorMessage); |
| } |
| |
| private void validateReservationDefinition(ReservationId reservationId, |
| ReservationDefinition contract, Plan plan, String auditConstant) |
| throws YarnException { |
| String message = ""; |
| // check if deadline is in the past |
| if (contract == null) { |
| message = "Missing reservation definition." |
| + " Please try again by specifying a reservation definition."; |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input definition", "ClientRMService", message); |
| throw RPCUtil.getRemoteException(message); |
| } |
| if (contract.getDeadline() <= clock.getTime()) { |
| message = "The specified deadline: " + contract.getDeadline() |
| + " is the past. Please try again with deadline in the future."; |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input definition", "ClientRMService", message); |
| throw RPCUtil.getRemoteException(message); |
| } |
| // Check if at least one RR has been specified |
| ReservationRequests resReqs = contract.getReservationRequests(); |
| if (resReqs == null) { |
| message = "No resources have been specified to reserve." |
| + "Please try again by specifying the resources to reserve."; |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input definition", "ClientRMService", message); |
| throw RPCUtil.getRemoteException(message); |
| } |
| List<ReservationRequest> resReq = resReqs.getReservationResources(); |
| if (resReq == null || resReq.isEmpty()) { |
| message = "No resources have been specified to reserve." |
| + " Please try again by specifying the resources to reserve."; |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input definition", "ClientRMService", message); |
| throw RPCUtil.getRemoteException(message); |
| } |
| // compute minimum duration and max gang size |
| long minDuration = 0; |
| Resource maxGangSize = Resource.newInstance(0, 0); |
| ReservationRequestInterpreter type = |
| contract.getReservationRequests().getInterpreter(); |
| for (ReservationRequest rr : resReq) { |
| if (type == ReservationRequestInterpreter.R_ALL |
| || type == ReservationRequestInterpreter.R_ANY) { |
| minDuration = Math.max(minDuration, rr.getDuration()); |
| } else { |
| minDuration += rr.getDuration(); |
| } |
| maxGangSize = Resources.max(plan.getResourceCalculator(), |
| plan.getTotalCapacity(), maxGangSize, |
| Resources.multiply(rr.getCapability(), rr.getConcurrency())); |
| } |
| // verify the allocation is possible (skip for ANY) |
| long duration = contract.getDeadline() - contract.getArrival(); |
| if (duration < minDuration && type != ReservationRequestInterpreter.R_ANY) { |
| message = "The time difference (" + (duration) + ") between arrival (" |
| + contract.getArrival() + ") " + "and deadline (" |
| + contract.getDeadline() + ") must " |
| + " be greater or equal to the minimum resource duration (" |
| + minDuration + ")"; |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input definition", "ClientRMService", message); |
| throw RPCUtil.getRemoteException(message); |
| } |
| // check that the largest gang does not exceed the inventory available |
| // capacity (skip for ANY) |
| if (Resources.greaterThan(plan.getResourceCalculator(), |
| plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity()) |
| && type != ReservationRequestInterpreter.R_ANY) { |
| message = "The size of the largest gang in the reservation definition (" |
| + maxGangSize + ") exceed the capacity available (" |
| + plan.getTotalCapacity() + " )"; |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input definition", "ClientRMService", message); |
| throw RPCUtil.getRemoteException(message); |
| } |
| // check that the recurrence is a positive long value. |
| String recurrenceExpression = contract.getRecurrenceExpression(); |
| try { |
| long recurrence = Long.parseLong(recurrenceExpression); |
| if (recurrence < 0) { |
| message = "Negative Period : " + recurrenceExpression + ". Please try" |
| + " again with a non-negative long value as period."; |
| throw RPCUtil.getRemoteException(message); |
| } |
| // verify duration is less than recurrence for periodic reservations |
| if (recurrence > 0 && duration > recurrence) { |
| message = "Duration of the requested reservation: " + duration |
| + " is greater than the recurrence: " + recurrence |
| + ". Please try again with a smaller duration."; |
| throw RPCUtil.getRemoteException(message); |
| } |
| // verify maximum period is divisible by recurrence expression. |
| if (recurrence > 0 && plan.getMaximumPeriodicity() % recurrence != 0) { |
| message = "The maximum periodicity: " + plan.getMaximumPeriodicity() + |
| " must be divisible by the recurrence expression provided: " + |
| recurrence + ". Please try again with a recurrence expression" + |
| " that satisfies this requirement."; |
| throw RPCUtil.getRemoteException(message); |
| } |
| } catch (NumberFormatException e) { |
| message = "Invalid period " + recurrenceExpression + ". Please try" |
| + " again with a non-negative long value as period."; |
| throw RPCUtil.getRemoteException(message); |
| } |
| } |
| |
| private Plan getPlanFromQueue(ReservationSystem reservationSystem, |
| String queue, String auditConstant) throws YarnException { |
| String nullQueueErrorMessage = "The queue is not specified." |
| + " Please try again with a valid reservable queue."; |
| String nullPlanErrorMessage = "The specified queue: " + queue |
| + " is not managed by reservation system." |
| + " Please try again with a valid reservable queue."; |
| return getPlanFromQueue(reservationSystem, queue, auditConstant, |
| nullQueueErrorMessage, nullPlanErrorMessage); |
| } |
| |
| private Plan getPlanFromQueue(ReservationSystem reservationSystem, |
| String queue, String auditConstant, String nullQueueErrorMessage, |
| String nullPlanErrorMessage) throws YarnException { |
| if (queue == null || queue.isEmpty()) { |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input", "ClientRMService", |
| nullQueueErrorMessage); |
| throw RPCUtil.getRemoteException(nullQueueErrorMessage); |
| } |
| // check if the associated plan is valid |
| Plan plan = reservationSystem.getPlan(queue); |
| if (plan == null) { |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, |
| "validate reservation input", "ClientRMService", |
| nullPlanErrorMessage); |
| throw RPCUtil.getRemoteException(nullPlanErrorMessage); |
| } |
| return plan; |
| } |
| |
| /** |
| * Quick validation on the input to check some obvious fail conditions (fail |
| * fast) the input and returns the appropriate {@link Plan} associated with |
| * the specified {@link Queue} or throws an exception message illustrating the |
| * details of any validation check failures |
| * |
| * @param reservationSystem the {@link ReservationSystem} to validate against |
| * @param request the {@link ReservationSubmissionRequest} defining the |
| * resources required over time for the request |
| * @param reservationId the {@link ReservationId} associated with the current |
| * request |
| * @return the {@link Plan} to submit the request to |
| * @throws YarnException if validation fails |
| */ |
| public Plan validateReservationSubmissionRequest( |
| ReservationSystem reservationSystem, ReservationSubmissionRequest request, |
| ReservationId reservationId) throws YarnException { |
| String message; |
| if (reservationId == null) { |
| message = "Reservation id cannot be null. Please try again specifying " |
| + " a valid reservation id by creating a new reservation id."; |
| throw RPCUtil.getRemoteException(message); |
| } |
| // Check if it is a managed queue |
| String queue = request.getQueue(); |
| Plan plan = getPlanFromQueue(reservationSystem, queue, |
| AuditConstants.SUBMIT_RESERVATION_REQUEST); |
| |
| validateReservationDefinition(reservationId, |
| request.getReservationDefinition(), plan, |
| AuditConstants.SUBMIT_RESERVATION_REQUEST); |
| return plan; |
| } |
| |
| /** |
| * Quick validation on the input to check some obvious fail conditions (fail |
| * fast) the input and returns the appropriate {@link Plan} associated with |
| * the specified {@link Queue} or throws an exception message illustrating the |
| * details of any validation check failures |
| * |
| * @param reservationSystem the {@link ReservationSystem} to validate against |
| * @param request the {@link ReservationUpdateRequest} defining the resources |
| * required over time for the request |
| * @return the {@link Plan} to submit the request to |
| * @throws YarnException if validation fails |
| */ |
| public Plan validateReservationUpdateRequest( |
| ReservationSystem reservationSystem, ReservationUpdateRequest request) |
| throws YarnException { |
| ReservationId reservationId = request.getReservationId(); |
| Plan plan = validateReservation(reservationSystem, reservationId, |
| AuditConstants.UPDATE_RESERVATION_REQUEST); |
| validateReservationDefinition(reservationId, |
| request.getReservationDefinition(), plan, |
| AuditConstants.UPDATE_RESERVATION_REQUEST); |
| return plan; |
| } |
| |
| /** |
| * Quick validation on the input to check some obvious fail conditions (fail |
| * fast) the input and returns the appropriate {@link Plan} associated with |
| * the specified {@link Queue} or throws an exception message illustrating the |
| * details of any validation check failures. |
| * |
| * @param reservationSystem the {@link ReservationSystem} to validate against |
| * @param request the {@link ReservationListRequest} defining search |
| * parameters for reservations in the {@link ReservationSystem} that |
| * is being validated against. |
| * @return the {@link Plan} to list reservations of. |
| * @throws YarnException if validation fails |
| */ |
| public Plan validateReservationListRequest( |
| ReservationSystem reservationSystem, ReservationListRequest request) |
| throws YarnException { |
| String queue = request.getQueue(); |
| if (request.getEndTime() < request.getStartTime()) { |
| String errorMessage = "The specified end time must be greater than " |
| + "the specified start time."; |
| RMAuditLogger.logFailure("UNKNOWN", |
| AuditConstants.LIST_RESERVATION_REQUEST, |
| "validate list reservation input", "ClientRMService", errorMessage); |
| throw RPCUtil.getRemoteException(errorMessage); |
| } |
| // Check if it is a managed queue |
| return getPlanFromQueue(reservationSystem, queue, |
| AuditConstants.LIST_RESERVATION_REQUEST); |
| } |
| |
| /** |
| * Quick validation on the input to check some obvious fail conditions (fail |
| * fast) the input and returns the appropriate {@link Plan} associated with |
| * the specified {@link Queue} or throws an exception message illustrating the |
| * details of any validation check failures |
| * |
| * @param reservationSystem the {@link ReservationSystem} to validate against |
| * @param request the {@link ReservationDeleteRequest} defining the resources |
| * required over time for the request |
| * @return the {@link Plan} to submit the request to |
| * @throws YarnException if validation fails |
| */ |
| public Plan validateReservationDeleteRequest( |
| ReservationSystem reservationSystem, ReservationDeleteRequest request) |
| throws YarnException { |
| return validateReservation(reservationSystem, request.getReservationId(), |
| AuditConstants.DELETE_RESERVATION_REQUEST); |
| } |
| } |