blob: da07211defeb5ce7f6cae2b019c39d491b68f36c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.resourceestimator.solver.preprocess;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
import org.apache.hadoop.resourceestimator.solver.api.Solver;
import org.apache.hadoop.resourceestimator.solver.exceptions.InvalidInputException;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Common preprocessing functions for {@link Solver}.
*/
public class SolverPreprocessor {
  private static final Logger LOGGER =
      LoggerFactory.getLogger(SolverPreprocessor.class);

  /**
   * Check if Solver's input parameters are valid.
   *
   * @param jobHistory the history {@link ResourceSkyline}s of the recurring
   *     pipeline job.
   * @param timeInterval the time interval which is used to discretize the
   *     history {@link ResourceSkyline}s.
   * @throws InvalidInputException if: (1) jobHistory is <em>null</em>;
   *     (2) jobHistory is empty; (3) timeInterval is non-positive.
   */
  public final void validate(
      final Map<RecurrenceId, List<ResourceSkyline>> jobHistory,
      final int timeInterval) throws InvalidInputException {
    if ((jobHistory == null) || (jobHistory.size() == 0)) {
      LOGGER.error(
          "Job resource skyline history is invalid, please try again with"
              + " valid resource skyline history.");
      throw new InvalidInputException("Job ResourceSkyline history", "invalid");
    }

    if (timeInterval <= 0) {
      LOGGER.error(
          "Solver timeInterval {} is invalid, please specify a positive value.",
          timeInterval);
      throw new InvalidInputException("Solver timeInterval", "non-positive");
    }
  }

  /**
   * Return the number of containers the job holds at the specified time,
   * computed from the memory dimension of the skyline only (memory used at
   * {@code index} divided by the per-container memory allocation).
   *
   * @param skyList the list of {@link Resource}s used by the job.
   * @param index the discretized time index.
   * @param containerMemAlloc the amount of memory allocated to one container.
   * @return the (memory-derived) number of containers used by the job at the
   *     given time.
   */
  public final long getResourceVector(final RLESparseResourceAllocation skyList,
      final int index, final long containerMemAlloc) {
    // Integer division: partial containers are rounded down.
    return skyList.getCapacityAtTime(index).getMemorySize() / containerMemAlloc;
  }

  /**
   * Discretize job's lifespan into intervals, and return the number of
   * containers used by the job within each interval.
   * <p> Note that here we assume all containers allocated to the job have the
   * same {@link Resource}. This is due to the limit of
   * {@link RLESparseResourceAllocation}.
   *
   * @param skyList the list of {@link Resource}s used by the job.
   * @param timeInterval the time interval used to discretize the job's
   *     lifespan.
   * @param containerMemAlloc the amount of memory allocated to each container.
   * @param jobLen the duration of the job, in number of discretized intervals.
   * @return the number of containers allocated to the job within discretized
   *     time intervals (the per-interval maximum).
   */
  public final int[] getDiscreteSkyline(
      final RLESparseResourceAllocation skyList, final int timeInterval,
      final long containerMemAlloc, final int jobLen) {
    long jobLifeSpan =
        skyList.getLatestNonNullTime() - skyList.getEarliestStartTime();
    // Java arrays are zero-initialized, so no explicit Arrays.fill is needed.
    int[] result = new int[jobLen];

    // NOTE(review): i is passed to the skyline as an absolute time index
    // starting at 0, so this assumes the skyline has been normalized to start
    // at time 0 — confirm against callers.
    for (int i = 0; i < jobLifeSpan; i++) {
      // Map the raw time index onto its discretized bucket.
      int index = (int) Math.floor((double) i / timeInterval);
      long numContainerAt = getResourceVector(skyList, i, containerMemAlloc);
      // Keep the maximum container count observed within each bucket.
      if (result[index] < numContainerAt) {
        result[index] = (int) numContainerAt;
      }
    }

    return result;
  }

  /**
   * Merge different jobs' resource skylines into one within the same pipeline.
   * <p> Each job's skyline is shifted by its submission-time offset (in
   * seconds) relative to the earliest submission in the pipeline, then all
   * shifted intervals are accumulated into a single aggregate skyline.
   *
   * @param resourceSkylines different jobs' resource skylines within the same
   *     pipeline.
   * @return an aggregated resource skyline for the pipeline.
   */
  public final ResourceSkyline mergeSkyline(
      final List<ResourceSkyline> resourceSkylines) {
    // TODO:
    // rewrite this function with shift and merge once YARN-5328 is committed
    /** First, getHistory the pipeline submission time. */
    long pipelineSubmission = Long.MAX_VALUE;
    for (int i = 0; i < resourceSkylines.size(); i++) {
      long jobSubmission = resourceSkylines.get(i).getJobSubmissionTime();
      if (pipelineSubmission > jobSubmission) {
        pipelineSubmission = jobSubmission;
      }
    }
    final TreeMap<Long, Resource> resourceOverTime = new TreeMap<>();
    final RLESparseResourceAllocation skylineListAgg =
        new RLESparseResourceAllocation(resourceOverTime,
            new DefaultResourceCalculator());
    /**
     * Second, adjust different jobs' ResourceSkyline starting time based on
     * pipeline submission time, and merge them into one ResourceSkyline.
     */
    for (int i = 0; i < resourceSkylines.size(); i++) {
      long jobSubmission = resourceSkylines.get(i).getJobSubmissionTime();
      // Submission times are in milliseconds; skyline time is in seconds.
      long diff = (jobSubmission - pipelineSubmission) / 1000;
      RLESparseResourceAllocation tmp =
          resourceSkylines.get(i).getSkylineList();
      Object[] timePoints = tmp.getCumulative().keySet().toArray();
      // NOTE(review): the bound length - 2 skips the final
      // [timePoints[n-2], timePoints[n-1]] interval — looks like an
      // off-by-one; confirm against RLESparseResourceAllocation's
      // end-of-skyline sentinel semantics before changing.
      for (int j = 0; j < timePoints.length - 2; j++) {
        // Range-check each stored time point, then add the long offset;
        // the offset sum itself need not fit in an int since
        // ReservationInterval takes long bounds.
        ReservationInterval riAdd =
            new ReservationInterval(toIntExact((long) timePoints[j]) + diff,
                toIntExact((long) timePoints[j + 1]) + diff);
        skylineListAgg.addInterval(riAdd,
            tmp.getCapacityAtTime(toIntExact((long) timePoints[j])));
      }
    }
    // The aggregate inherits the first job's metadata (id, input size,
    // submission/finish times, container spec).
    ResourceSkyline skylineAgg =
        new ResourceSkyline(resourceSkylines.get(0).getJobId(),
            resourceSkylines.get(0).getJobInputDataSize(),
            resourceSkylines.get(0).getJobSubmissionTime(),
            resourceSkylines.get(0).getJobFinishTime(),
            resourceSkylines.get(0).getContainerSpec(), skylineListAgg);

    return skylineAgg;
  }

  /**
   * Narrow a {@code long} to an {@code int}, throwing
   * {@link ArithmeticException} on overflow — same contract as the previous
   * BigDecimal-based implementation, without the object churn.
   *
   * @param value the value to narrow.
   * @return {@code value} as an {@code int}.
   */
  private int toIntExact(long value) {
    return Math.toIntExact(value);
  }

  /**
   * Aggregate all job's {@link ResourceSkyline}s in the one run of recurring
   * pipeline, and return the aggregated {@link ResourceSkyline}s in different
   * runs.
   *
   * @param jobHistory the history {@link ResourceSkyline} of the recurring
   *     pipeline job.
   * @param minJobRuns the minimum number of job runs required to run the
   *     solver.
   * @return the aggregated {@link ResourceSkyline}s in different runs.
   * @throws InvalidInputException if jobHistory has less job runs than the
   *     minimum requirement.
   */
  public final List<ResourceSkyline> aggregateSkylines(
      final Map<RecurrenceId, List<ResourceSkyline>> jobHistory,
      final int minJobRuns) throws InvalidInputException {
    List<ResourceSkyline> resourceSkylines = new ArrayList<ResourceSkyline>();
    for (Map.Entry<RecurrenceId, List<ResourceSkyline>> entry : jobHistory
        .entrySet()) {
      // TODO: identify different jobs within the same pipeline
      // right now, we do prediction at the granularity of pipeline, i.e., we
      // will merge the
      // resource skylines of jobs within the same pipeline into one aggregated
      // resource skyline
      resourceSkylines.add(mergeSkyline(entry.getValue()));
    }
    int numJobs = resourceSkylines.size();
    if (numJobs < minJobRuns) {
      LOGGER.error(
          "Solver requires job resource skyline history for at least {} runs,"
              + " but it only receives history info for {} runs.",
          minJobRuns, numJobs);
      throw new InvalidInputException("Job ResourceSkyline history",
          "containing less job runs" + " than " + minJobRuns);
    }

    return resourceSkylines;
  }
}