/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.resourceestimator.solver.impl;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
import org.apache.hadoop.resourceestimator.skylinestore.api.PredictionSkylineStore;
import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
import org.apache.hadoop.resourceestimator.solver.api.Solver;
import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException;
import org.apache.hadoop.resourceestimator.solver.preprocess.SolverPreprocessor;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.ojalgo.optimisation.Expression;
import org.ojalgo.optimisation.ExpressionsBasedModel;
import org.ojalgo.optimisation.Optimisation.Result;
import org.ojalgo.optimisation.Variable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An LP (Linear Programming) solution to predict a recurring pipeline's
* {@link Resource} requirements, and generate Hadoop {@code RDL} requests
* which will be used to make recurring resource reservations.
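* <p>
* At a high level, the LP chooses a predicted container allocation x[k] for
* each discretized time interval k, and minimizes
* alpha / numJobs * sum(over-allocation)
* + (1 - alpha) / numJobs * sum(final cumulative under-allocation)
* + beta * eps,
* where eps upper-bounds the average normalized absolute under-allocation
* over the history runs.
* <p>
* Typical usage (illustrative):
* <pre>{@code
* Solver solver = new LpSolver();
* solver.init(config, predictionSkylineStore);
* RLESparseResourceAllocation prediction = solver.solve(jobHistory);
* solver.close();
* }</pre>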
*/
public class LpSolver extends BaseSolver implements Solver {
private static final Logger LOGGER = LoggerFactory.getLogger(LpSolver.class);
private final SolverPreprocessor preprocessor = new SolverPreprocessor();
/**
* Controls the balance between over-allocation and under-allocation.
*/
private double alpha;
/**
* Controls the weight of the regularization term, i.e. the generalization of
* the solver.
*/
private double beta;
/**
* The minimum number of job runs required to run the solver.
*/
private int minJobRuns;
/**
* The time interval which is used to discretize job execution.
*/
private int timeInterval;
/**
* The PredictionSkylineStore used to store the predicted ResourceSkyline for
* new runs.
*/
private PredictionSkylineStore predictionSkylineStore;
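/**
* Reads the solver parameters from the given {@link Configuration}: alpha
* (default 0.1), beta (default 0.1), the minimum number of job runs
* (default 1) and the discretization time interval (default 5), and wires in
* the {@link PredictionSkylineStore} that will receive the predictions.
*
* @param config the {@link Configuration} holding the solver parameters.
* @param skylineStore the {@link PredictionSkylineStore} to store predicted
* ResourceSkylines.
*/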
@Override public final void init(final Configuration config,
PredictionSkylineStore skylineStore) {
this.alpha =
config.getDouble(ResourceEstimatorConfiguration.SOLVER_ALPHA_KEY, 0.1);
this.beta =
config.getDouble(ResourceEstimatorConfiguration.SOLVER_BETA_KEY, 0.1);
this.minJobRuns =
config.getInt(ResourceEstimatorConfiguration.SOLVER_MIN_JOB_RUN_KEY, 1);
this.timeInterval =
config.getInt(ResourceEstimatorConfiguration.TIME_INTERVAL_KEY, 5);
this.predictionSkylineStore = skylineStore;
}
/**
* Generate over-allocation constraints.
*
* @param lpModel the LP model.
* @param cJobITimeK actual container allocation for job i in time
* interval k.
* @param oa container over-allocation.
* @param x predicted container allocation.
* @param indexJobITimeK index for job i at time interval k.
* @param timeK index for time interval k.
*/
private void generateOverAllocationConstraints(
final ExpressionsBasedModel lpModel, final double cJobITimeK,
final Variable[] oa, final Variable[] x, final int indexJobITimeK,
final int timeK) {
// oa_job_i_timeK >= x_timeK - cJobITimeK
Expression overAllocExpression =
lpModel.addExpression("over_alloc_" + indexJobITimeK);
overAllocExpression.set(oa[indexJobITimeK], 1);
overAllocExpression.set(x[timeK], -1);
overAllocExpression.lower(-cJobITimeK); // >=
}
/**
* Generate under-allocation constraints.
*
* @param lpModel the LP model.
* @param cJobITimeK actual container allocation for job i in time
* interval k.
* @param uaPredict absolute container under-allocation.
* @param ua recursive container under-allocation.
* @param x predicted container allocation.
* @param indexJobITimeK index for job i at time interval k.
* @param timeK index for time interval k.
*/
private void generateUnderAllocationConstraints(
final ExpressionsBasedModel lpModel, final double cJobITimeK,
final Variable[] uaPredict, final Variable[] ua, final Variable[] x,
final int indexJobITimeK, final int timeK) {
// uaPredict_job_i_timeK + x_timeK >= cJobITimeK
Expression underAllocPredictExpression =
lpModel.addExpression("under_alloc_predict_" + indexJobITimeK);
underAllocPredictExpression.set(uaPredict[indexJobITimeK], 1);
underAllocPredictExpression.set(x[timeK], 1);
underAllocPredictExpression.lower(cJobITimeK); // >=
if (timeK >= 1) {
/** Recursively calculate container under-allocation. */
// ua_job_i_timeK >= ua_job_i_time_(k-1) + cJobITimeK - x_timeK
Expression underAllocExpression =
lpModel.addExpression("under_alloc_" + indexJobITimeK);
underAllocExpression.set(ua[indexJobITimeK], 1);
underAllocExpression.set(ua[indexJobITimeK - 1], -1);
underAllocExpression.set(x[timeK], 1);
underAllocExpression.lower(cJobITimeK); // >=
} else {
/** Initial value for container under-allocation. */
// ua_job_i_time_0 >= cJobI_time_0 - x_time_0
Expression underAllocExpression =
lpModel.addExpression("under_alloc_" + indexJobITimeK);
underAllocExpression.set(ua[indexJobITimeK], 1);
underAllocExpression.set(x[timeK], 1);
underAllocExpression.lower(cJobITimeK); // >=
}
}
/**
* Generate solver objective.
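* <p>The objective being minimized is alpha / numJobs * sum(oa) +
* (1 - alpha) / numJobs * sum(ua at each job's last time interval) +
* beta * eps.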
*
* @param objective LP solver objective.
* @param numJobs number of history runs of the recurring pipeline.
* @param jobLen (maximum) job length of the recurring pipeline.
* @param oa container over-allocation.
* @param ua recursive container under-allocation.
* @param eps regularization parameter.
*/
private void generateObjective(final Expression objective, final int numJobs,
final int jobLen, final Variable[] oa, final Variable[] ua,
final Variable eps) {
int indexJobITimeK;
// sum Over_Allocation
for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
for (int timeK = 0; timeK < jobLen; timeK++) {
indexJobITimeK = indexJobI * jobLen + timeK;
objective.set(oa[indexJobITimeK], alpha / numJobs);
}
}
// sum Under_Allocation
int indexJobITimeN;
for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
indexJobITimeN = indexJobI * jobLen + jobLen - 1;
objective.set(ua[indexJobITimeN], (1 - alpha) / numJobs);
}
objective.set(eps, beta);
objective.weight(BigDecimal.valueOf(1));
}
/**
* Get the job length of the recurring pipeline.
*
* @param resourceSkylines the history ResourceSkylines allocated to the
* recurring pipeline.
* @param numJobs number of history runs of the recurring pipeline.
* @return the job length of the recurring pipeline, in discretized time
* intervals.
*/
private int getJobLen(final List<ResourceSkyline> resourceSkylines,
final int numJobs) {
int curLen = 0;
int jobLen = 0;
for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
curLen = (int) (resourceSkylines.get(indexJobI).getSkylineList()
.getLatestNonNullTime() - resourceSkylines.get(indexJobI)
.getSkylineList().getEarliestStartTime() + timeInterval - 1)
/ timeInterval; // for round up
if (jobLen < curLen) {
jobLen = curLen;
}
}
return jobLen;
}
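/**
* Solves the LP over the history runs of the recurring pipeline, converts
* the per-interval predictions into an {@link RLESparseResourceAllocation},
* stores it in the {@link PredictionSkylineStore} and returns it.
*
* @param jobHistory the history {@link ResourceSkyline}s of the recurring
* pipeline, keyed by {@link RecurrenceId}.
* @return the predicted resource allocation, discretized by timeInterval.
* @throws SolverException if the job history fails validation or
* aggregation.
* @throws SkylineStoreException if the prediction cannot be stored.
*/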
@Override public final RLESparseResourceAllocation solve(
final Map<RecurrenceId, List<ResourceSkyline>> jobHistory)
throws SolverException, SkylineStoreException {
// TODO: add timeout support for this function, and ideally we should
// return the confidence level associated with the predicted resource.
preprocessor.validate(jobHistory, timeInterval);
final List<ResourceSkyline> resourceSkylines =
preprocessor.aggregateSkylines(jobHistory, minJobRuns);
final int numJobs = resourceSkylines.size();
final int jobLen = getJobLen(resourceSkylines, numJobs);
/** Create variables. */
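// x[k]: predicted container allocation in time interval k;
// oa[i * jobLen + k]: over-allocation of job i in interval k;
// ua[i * jobLen + k]: cumulative under-allocation of job i up to interval k;
// uaPredict[i * jobLen + k]: absolute under-allocation of job i in interval k;
// eps: regularization slack bounding the normalized under-allocation.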
final ExpressionsBasedModel lpModel = new ExpressionsBasedModel();
Variable[] oa = new Variable[jobLen * numJobs];
Variable[] ua = new Variable[jobLen * numJobs];
Variable[] uaPredict = new Variable[jobLen * numJobs];
Variable[] x = new Variable[jobLen];
for (int i = 0; i < jobLen * numJobs; i++) {
oa[i] = new Variable("oa" + i).lower(BigDecimal.valueOf(0));
ua[i] = new Variable("ua" + i).lower(BigDecimal.valueOf(0));
uaPredict[i] = new Variable("uaPredict" + i).lower(BigDecimal.valueOf(0));
}
for (int i = 0; i < jobLen; i++) {
x[i] = new Variable("x" + i).lower(BigDecimal.valueOf(0));
}
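// Add x to the model first: the solver Result is indexed by insertion
// order, so index k of the Result below corresponds to x[k].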
lpModel.addVariables(x);
lpModel.addVariables(oa);
lpModel.addVariables(ua);
lpModel.addVariables(uaPredict);
Variable eps = new Variable("epsilon").lower(BigDecimal.valueOf(0));
lpModel.addVariable(eps);
/** Set constraints. */
int indexJobITimeK = 0;
double cJobI = 0;
double cJobITimeK = 0;
ResourceSkyline resourceSkyline;
int[] containerNums;
// sum(job_i){ sum(timeK){ uaPredict_job_i_timeK / cJobI } } <= numJobs * eps
Expression regularizationConstraint =
lpModel.addExpression("regularization");
regularizationConstraint.set(eps, -numJobs);
regularizationConstraint.upper(BigDecimal.valueOf(0)); // <= 0
for (int indexJobI = 0;
indexJobI < resourceSkylines.size(); indexJobI++) {
resourceSkyline = resourceSkylines.get(indexJobI);
// the # of containers consumed by job i in discretized time intervals
containerNums = preprocessor
.getDiscreteSkyline(resourceSkyline.getSkylineList(), timeInterval,
resourceSkyline.getContainerSpec().getMemorySize(), jobLen);
// the aggregated # of containers consumed by job i during its lifespan
cJobI = 0;
for (int i = 0; i < containerNums.length; i++) {
cJobI = cJobI + containerNums[i];
}
for (int timeK = 0; timeK < jobLen; timeK++) {
indexJobITimeK = indexJobI * jobLen + timeK;
// the # of containers consumed by job i in the k-th time interval
cJobITimeK = containerNums[timeK];
regularizationConstraint
.set(uaPredict[indexJobITimeK], 1 / cJobI);
generateOverAllocationConstraints(lpModel, cJobITimeK, oa, x,
indexJobITimeK, timeK);
generateUnderAllocationConstraints(lpModel, cJobITimeK, uaPredict,
ua, x, indexJobITimeK, timeK);
}
}
/** Set objective. */
Expression objective = lpModel.addExpression("objective");
generateObjective(objective, numJobs, jobLen, oa, ua, eps);
/** Solve the model. */
final Result lpResult = lpModel.minimise();
final TreeMap<Long, Resource> treeMap = new TreeMap<>();
RLESparseResourceAllocation result =
new RLESparseResourceAllocation(treeMap,
new DefaultResourceCalculator());
ReservationInterval riAdd;
Resource containerSpec = resourceSkylines.get(0).getContainerSpec();
String pipelineId =
((RecurrenceId) jobHistory.keySet().toArray()[0]).getPipelineId();
Resource resource;
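// Convert the LP solution into the predicted skyline: each discretized
// interval [k * timeInterval, (k + 1) * timeInterval) gets the container
// spec scaled by the predicted container count x[k].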
for (int indexTimeK = 0; indexTimeK < jobLen; indexTimeK++) {
riAdd = new ReservationInterval(indexTimeK * timeInterval,
(indexTimeK + 1) * timeInterval);
resource = Resource.newInstance(
containerSpec.getMemorySize() * (int) lpResult
.doubleValue(indexTimeK),
containerSpec.getVirtualCores() * (int) lpResult
.doubleValue(indexTimeK));
result.addInterval(riAdd, resource);
LOGGER.debug("time interval: {}, container: {}.", indexTimeK,
lpResult.doubleValue(indexTimeK));
}
predictionSkylineStore.addEstimation(pipelineId, result);
/**
* TODO:
* 1. We can calculate the estimated error (over-allocation,
* under-allocation) of our prediction, which could be used to generate a
* confidence level for the prediction.
* 2. We can modify the model to take the job input data size (and maybe
* stage info) into consideration.
* 3. We can try to generate conclusions such as: our prediction
* under-allocates X amount of resources from time 0 to time 100 compared
* with 95% of history runs.
* 4. We can build framework-specific versions of the estimator (such as
* Scope/Spark/Hive, etc.) and provide more specific suggestions. For
* example, we may say: for Spark job i, its task size is X GB while the
* container memory allocation is Y GB; as a result, its shuffle stage is
* 20% slower than the ideal case due to disk spilling operations.
* 5. If we have more information about jobs (other than the
* ResourceSkyline), we may reach conclusions such as: job i is 20% slower
* than 90% of history runs because part of its tasks run together with job
* j's tasks. In this case, we not only predict the amount of resources
* needed for job i, but also how to place the resource requirements on
* clusters.
* 6. We may monitor job progress, and dynamically increase/decrease
* container allocations to satisfy the job deadline while minimizing cost.
* 7. We may allow users to specify a budget (say $100 per job run), and
* optimize the resource allocation under the budget constraint.
* 8. ...
*/
return result;
}
@Override public final void close() {
// TODO: currently a placeholder.
}
}