| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.core.optimizer.costs; |
| |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.api.exception.WayangException; |
| import org.apache.wayang.core.function.FunctionDescriptor; |
| import org.apache.wayang.core.optimizer.OptimizationContext; |
| import org.apache.wayang.core.optimizer.OptimizationUtils; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; |
| import org.apache.wayang.core.optimizer.costs.LoadEstimator.SinglePointEstimationFunction; |
| import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; |
| import org.apache.wayang.core.util.JuelUtils; |
| import org.apache.wayang.core.util.json.WayangJsonObj; |
| import org.apache.wayang.core.util.mathex.Context; |
| import org.apache.wayang.core.util.mathex.DefaultContext; |
| import org.apache.wayang.core.util.mathex.Expression; |
| import org.apache.wayang.core.util.mathex.ExpressionBuilder; |
| import org.apache.wayang.core.util.mathex.exceptions.EvaluationException; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.function.ToDoubleBiFunction; |
| import java.util.function.ToDoubleFunction; |
| import java.util.function.ToLongBiFunction; |
| import java.util.stream.Collectors; |
| import java.util.stream.StreamSupport; |
| |
| /** |
| * Utilities to deal with {@link LoadProfileEstimator}s. |
| */ |
| public class LoadProfileEstimators { |
| |
private static final Logger logger = LogManager.getLogger(LoadProfileEstimators.class);

/**
 * Base {@link Context} for estimation expressions. It is layered on top of {@link Context#baseContext} and
 * additionally registers the function {@code logGrowth}, which forwards its first four arguments to
 * {@code OptimizationUtils.logisticGrowth(...)}.
 */
public static final Context baseContext = createBaseContext();

/**
 * This class only hosts static utilities and is therefore not instantiable.
 */
private LoadProfileEstimators() {
}

/**
 * Assembles the value of {@link #baseContext}.
 *
 * @return a {@link DefaultContext} on top of {@link Context#baseContext} with {@code logGrowth} registered
 */
private static Context createBaseContext() {
    final DefaultContext estimationFunctions = new DefaultContext(Context.baseContext);
    estimationFunctions.setFunction(
            "logGrowth",
            args -> OptimizationUtils.logisticGrowth(args[0], args[1], args[2], args[3])
    );
    return estimationFunctions;
}
| |
| /** |
| * Creates an {@link LoadProfileEstimator} via a specification that is retrieved from a {@link Configuration}. |
| * |
| * @param configKey to look up the specification |
| * @param configuration provides the specification and caches the {@link LoadProfileEstimator} |
| * @return the {@link LoadProfileEstimator} or {@code null} if no specification was found |
| */ |
| public static LoadProfileEstimator createFromSpecification(String configKey, Configuration configuration) { |
| final LoadProfileEstimator cachedEstimator = |
| configuration.getLoadProfileEstimatorCache().optionallyProvideFor(configKey).orElse(null); |
| if (cachedEstimator != null) return cachedEstimator.copy(); |
| |
| final Optional<String> optSpecification = configuration.getOptionalStringProperty(configKey); |
| if (optSpecification.isPresent()) { |
| final NestableLoadProfileEstimator estimator = |
| LoadProfileEstimators.createFromSpecification(configKey, optSpecification.get()); |
| configuration.getLoadProfileEstimatorCache().set(configKey, estimator.copy()); |
| return estimator; |
| } else { |
| logger.warn("Could not find an estimator specification associated with '{}'.", configuration); |
| return null; |
| } |
| } |
| |
| /** |
| * Creates a new instance from a specification {@link String}. Valid specifications are as follows: |
| * <pre> |
| * {"type":<*juel*, org.apache.wayang.core.util.mathex.mathex>, |
| * "cpu":<mathematical expression>, |
| * "ram":<mathematical expression>, |
| * "disk":<mathematical expression>, |
| * "network":<mathematical expression>, |
| * "import":<["optional", "operator", "properties"]>, |
| * "in":<#inputs>, |
| * "out":<#outputs>, |
| * "p":<correctness probability>, |
| * "overhead":<overhead in milliseconds>, |
| * "ru":<resource utilization mathematical expression> |
| * } |
| * </pre> |
| * The JUEL expressions accept as parameters {@code in0}, {@code in1} a.s.o. for the input cardinalities and |
| * {@code out0}, {@code out1} a.s.o. for the output cardinalities. |
| * |
| * @param configKey the {@link Configuration} from that the {@code spec} was retrieved or else {@code null} |
| * @param specification a specification that adheres to above format |
| * @return the new instance |
| */ |
| public static NestableLoadProfileEstimator createFromSpecification(String configKey, String specification) { |
| try { |
| final WayangJsonObj spec = new WayangJsonObj(specification); |
| if (!spec.has("type") || "juel".equalsIgnoreCase(spec.getString("type"))) { |
| return createFromJuelSpecification(configKey, spec); |
| } else if ("mathex".equalsIgnoreCase(spec.getString("type"))) { |
| return createFromMathExSpecification(configKey, spec); |
| } else { |
| throw new WayangException(String.format("Unknown specification type: %s", spec.get("type"))); |
| } |
| } catch (Exception e) { |
| throw new WayangException(String.format("Could not initialize from specification \"%s\".", specification), e); |
| } |
| } |
| |
| /** |
| * Creates a new instance from a specification {@link String}. Valid specifications are as follows: |
| * <pre> |
| * {"cpu":<JUEL expression>, |
| * "ram":<JUEL expression>, |
| * "disk":<JUEL expression>, |
| * "network":<JUEL expression>, |
| * "import":<["optional", "operator", "properties"]>, |
| * "in":<#inputs>, |
| * "out":<#outputs>, |
| * "p":<correctness probability>, |
| * "overhead":<overhead in milliseconds>, |
| * "ru":<resource utilization JUEL expression> |
| * } |
| * </pre> |
| * The JUEL expressions accept as parameters {@code in0}, {@code in1} a.s.o. for the input cardinalities and |
| * {@code out0}, {@code out1} a.s.o. for the output cardinalities. |
| * |
| * @param configKey the {@link Configuration} from that the {@code spec} was retrieved or else {@code null} |
| * @param spec a specification that adheres to above format |
| * @return the new instance |
| */ |
| public static NestableLoadProfileEstimator createFromJuelSpecification(String configKey, WayangJsonObj spec) { |
| int numInputs = spec.getInt("in"); |
| int numOutputs = spec.getInt("out"); |
| double correctnessProb = spec.getDouble("p"); |
| List<String> operatorProperties = spec.has("import") ? |
| StreamSupport.stream(spec.optionalWayangJsonArray("import").spliterator(), false).map(Objects::toString).collect(Collectors.toList()) : |
| Collections.emptyList(); |
| |
| |
| LoadEstimator cpuEstimator = new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| parseLoadJuel(spec.getString("cpu"), numInputs, numOutputs, operatorProperties) |
| ); |
| LoadEstimator ramEstimator = new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| parseLoadJuel(spec.getString("ram"), numInputs, numOutputs, operatorProperties) |
| ); |
| LoadEstimator diskEstimator = !spec.has("disk") ? null : new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| parseLoadJuel(spec.getString("disk"), numInputs, numOutputs, operatorProperties) |
| ); |
| LoadEstimator networkEstimator = !spec.has("network") ? null : new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| parseLoadJuel(spec.getString("network"), numInputs, numOutputs, operatorProperties) |
| ); |
| |
| long overhead = spec.has("overhead") ? spec.getLong("overhead") : 0L; |
| FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ? |
| parseResourceUsageJuel(spec.getString("ru"), numInputs, numOutputs) : |
| DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR; |
| return new NestableLoadProfileEstimator( |
| cpuEstimator, |
| ramEstimator, |
| diskEstimator, |
| networkEstimator, |
| resourceUtilizationEstimator, |
| overhead, |
| configKey |
| ); |
| } |
| |
| /** |
| * Creates a new instance from a specification {@link String}. Valid specifications are as follows: |
| * <pre> |
| * {"cpu":<mathematical expression>, |
| * "ram":<mathematical expression>, |
| * "disk":<mathematical expression>, |
| * "network":<mathematical expression>, |
| * "import":<["optional", "operator", "properties"]>, |
| * "in":<#inputs>, |
| * "out":<#outputs>, |
| * "p":<correctness probability>, |
| * "overhead":<overhead in milliseconds>, |
| * "ru":<resource utilization mathematical expression> |
| * } |
| * </pre> |
| * The JUEL expressions accept as parameters {@code in0}, {@code in1} a.s.o. for the input cardinalities and |
| * {@code out0}, {@code out1} a.s.o. for the output cardinalities. |
| * |
| * @param configKey the {@link Configuration} from that the {@code spec} was retrieved or else {@code null} |
| * @param spec a specification that adheres to above format |
| * @return the new instance |
| */ |
| public static NestableLoadProfileEstimator createFromMathExSpecification(String configKey, WayangJsonObj spec) { |
| int numInputs = spec.getInt("in"); |
| int numOutputs = spec.getInt("out"); |
| double correctnessProb = spec.getDouble("p"); |
| List<String> operatorProperties = spec.has("import") ? |
| StreamSupport.stream(spec.optionalWayangJsonArray("import").spliterator(), false).map(Objects::toString).collect(Collectors.toList()) : |
| Collections.emptyList(); |
| |
| |
| LoadEstimator cpuEstimator = new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| compile(spec.getString("cpu")) |
| ); |
| LoadEstimator ramEstimator = new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| compile(spec.getString("ram")) |
| ); |
| LoadEstimator diskEstimator = !spec.has("disk") ? null : new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| compile(spec.getString("disk")) |
| ); |
| LoadEstimator networkEstimator = !spec.has("network") ? null : new DefaultLoadEstimator( |
| numInputs, |
| numOutputs, |
| correctnessProb, |
| CardinalityEstimate.EMPTY_ESTIMATE, |
| compile(spec.getString("network")) |
| ); |
| |
| long overhead = spec.has("overhead") ? spec.getLong("overhead") : 0L; |
| FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator = spec.has("ru") ? |
| compileResourceUsage(spec.getString("ru")) : |
| DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR; |
| return new NestableLoadProfileEstimator( |
| cpuEstimator, |
| ramEstimator, |
| diskEstimator, |
| networkEstimator, |
| resourceUtilizationEstimator, |
| overhead, |
| configKey |
| ); |
| |
| } |
| |
| /** |
| * Parses a JUEL expression and provides it as a {@link SinglePointEstimationFunction}. |
| * |
| * @param juel a JUEL expression |
| * @param numInputs the number of inputs of the estimated operator, reflected as JUEL variables {@code in0}, {@code in1}, ... |
| * @param numOutputs the number of outputs of the estimated operator, reflected as JUEL variables {@code out0}, {@code out1}, ... |
| * @param additionalProperties additional properties to consider |
| * @return a {@link ToLongBiFunction} wrapping the JUEL expression |
| */ |
| private static SinglePointEstimationFunction parseLoadJuel(String juel, |
| int numInputs, |
| int numOutputs, |
| List<String> additionalProperties) { |
| final Map<String, Class<?>> parameterClasses = createJuelParameterClasses( |
| numInputs, |
| numOutputs, |
| additionalProperties.toArray(new String[additionalProperties.size()]) |
| ); |
| final JuelUtils.JuelFunction<Long> juelFunction = new JuelUtils.JuelFunction<>(juel, Long.class, parameterClasses); |
| return (estimationContext, inCards, outCards) -> applyJuelFunction(juelFunction, estimationContext, inCards, outCards, additionalProperties); |
| } |
| |
| /** |
| * Parses a JUEL expression and provides it as a {@link ToLongBiFunction}. |
| * |
| * @param juel a JUEL expression |
| * @param numInputs the number of inputs of the estimated operator, reflected as JUEL variables {@code in0}, {@code in1}, ... |
| * @param numOutputs the number of outputs of the estimated operator, reflected as JUEL variables {@code out0}, {@code out1}, ... |
| * @return a {@link ToLongBiFunction} wrapping the JUEL expression |
| */ |
| private static FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> parseResourceUsageJuel(String juel, int numInputs, int numOutputs) { |
| final Map<String, Class<?>> parameterClasses = createJuelParameterClasses(numInputs, numOutputs); |
| final JuelUtils.JuelFunction<Double> juelFunction = new JuelUtils.JuelFunction<>(juel, Double.class, parameterClasses); |
| return (inCards, outCards) -> applyJuelFunction(juelFunction, null, inCards, outCards, Collections.emptyList()); |
| } |
| |
| /** |
| * Creates parameters classes for JUEL expressions based on input and output cardinalities. |
| * |
| * @param numInputs the number of inputs |
| * @param numOutputs the number of ouputs |
| * @return the parameter names mapped to their parameter classes |
| */ |
| private static Map<String, Class<?>> createJuelParameterClasses(int numInputs, int numOutputs, String... additionalProperties) { |
| final Map<String, Class<?>> parameterClasses = new HashMap<>(numOutputs + numOutputs); |
| for (int i = 0; i < numInputs; i++) { |
| parameterClasses.put("in" + i, Long.class); |
| } |
| for (int i = 0; i < numOutputs; i++) { |
| parameterClasses.put("out" + i, Long.class); |
| } |
| for (String property : additionalProperties) { |
| parameterClasses.put(property, Double.class); |
| } |
| return parameterClasses; |
| } |
| |
| /** |
| * Evaluates a {@link JuelUtils.JuelFunction} with the given {@code inputCardinalities} and {@code outputCardinalities} as parameters. |
| * |
| * @param juelFunction the JUEL function to be executed |
| * @param inputCardinalities the input cardinalities |
| * @param outputCardinalities the output cardinalities |
| * @return the JUEL function result |
| */ |
| private static <T> T applyJuelFunction(JuelUtils.JuelFunction<T> juelFunction, |
| EstimationContext estimationContext, |
| long[] inputCardinalities, |
| long[] outputCardinalities, |
| List<String> artifactProperties) { |
| final Map<String, Object> parameters = new HashMap<>(inputCardinalities.length + outputCardinalities.length); |
| for (int i = 0; i < inputCardinalities.length; i++) { |
| parameters.put("in" + i, inputCardinalities[i]); |
| } |
| for (int i = 0; i < outputCardinalities.length; i++) { |
| parameters.put("out" + i, outputCardinalities[i]); |
| } |
| for (String property : artifactProperties) { |
| parameters.put(property, estimationContext.getDoubleProperty(property, 0d)); |
| } |
| return juelFunction.apply(parameters, true); |
| } |
| |
| /** |
| * Parses a mathematical expression and provides it as a {@link SinglePointEstimationFunction}. |
| * |
| * @param expression a mathematical expression |
| * @return the {@link SinglePointEstimationFunction} |
| */ |
| private static SinglePointEstimationFunction compile(String expression) { |
| final Expression expr = ExpressionBuilder.parse(expression).specify(baseContext); |
| return (context, inCards, outCards) -> { |
| Context mathContext = createMathContext(context, inCards, outCards); |
| return Math.round(expr.evaluate(mathContext)); |
| }; |
| } |
| |
| /** |
| * Parses a mathematical expression and provides it as a {@link ToDoubleFunction}. |
| * |
| * @param expression a mathematical expression |
| * @return a {@link ToLongBiFunction} wrapping the expression |
| */ |
| private static FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> compileResourceUsage(String expression) { |
| final Expression expr = ExpressionBuilder.parse(expression).specify(baseContext); |
| return (inCards, outCards) -> { |
| Context mathContext = createMathContext(null, inCards, outCards); |
| return expr.evaluate(mathContext); |
| }; |
| } |
| |
| |
| /** |
| * Create a mathematical {@link Context} from the parameters. |
| * |
| * @param context provides miscellaneous variables |
| * @param inputCardinalities provides input {@link CardinalityEstimate}s (`in***`) |
| * @param outputCardinalities provides output {@link CardinalityEstimate}s (`out***`) |
| * @return the {@link Context} |
| */ |
| private static Context createMathContext(final EstimationContext context, |
| final long[] inputCardinalities, |
| final long[] outputCardinalities) { |
| return new Context() { |
| @Override |
| public double getVariable(String variableName) throws EvaluationException { |
| // Serve "in999" and "out999" variables directly from the cardinality arrays. |
| if (variableName.startsWith("in") && variableName.length() > 2) { |
| int accu = 0; |
| int i; |
| for (i = 2; i < variableName.length(); i++) { |
| char c = variableName.charAt(i); |
| if (!Character.isDigit(c)) break; |
| accu = 10 * accu + (c - '0'); |
| } |
| if (i == variableName.length()) return inputCardinalities[accu]; |
| } else if (variableName.startsWith("out") && variableName.length() > 3) { |
| int accu = 0; |
| int i; |
| for (i = 3; i < variableName.length(); i++) { |
| char c = variableName.charAt(i); |
| if (!Character.isDigit(c)) break; |
| accu = 10 * accu + (c - '0'); |
| } |
| if (i == variableName.length()) return outputCardinalities[accu]; |
| } |
| |
| // Otherwise, ask the context for the property. |
| return context.getDoubleProperty(variableName, Double.NaN); |
| } |
| |
| @Override |
| public ToDoubleFunction<double[]> getFunction(String functionName) throws EvaluationException { |
| throw new EvaluationException("This context does not provide any functions."); |
| } |
| }; |
| } |
| |
| /** |
| * Estimate the {@link LoadProfile} for an {@link OptimizationContext.OperatorContext} using a |
| * {@link LoadProfileEstimator} for the corresponding {@link ExecutionOperator}. |
| * |
| * @param operatorContext the {@link OptimizationContext.OperatorContext} |
| * @param estimator the {@link LoadProfileEstimator} for the {@link ExecutionOperator} |
| * @param <T> the type of the {@link ExecutionOperator} that is in the {@link OptimizationContext.OperatorContext} |
| * @return the {@link LoadProfile} |
| */ |
| public static <T extends ExecutionOperator> LoadProfile estimateLoadProfile( |
| OptimizationContext.OperatorContext operatorContext, |
| LoadProfileEstimator estimator) { |
| |
| // Estimate the LoadProfile for that single execution. |
| final LoadProfile baseProfile = estimator.estimate(operatorContext.getNormalizedEstimationContext()); |
| return baseProfile.timesSequential(operatorContext.getNumExecutions()); |
| } |
| |
| /** |
| * Utility to nest the {@link LoadProfileEstimator} of a {@link FunctionDescriptor}. |
| * |
| * @param mainEstimatorOpt an optional {@link LoadProfileEstimator}; should be a {@link NestableLoadProfileEstimator} |
| * @param functionDescriptor whose {@link LoadProfileEstimator} should be nested |
| * @param configuration provides the UDF {@link LoadProfileEstimator} |
| */ |
| public static void nestUdfEstimator(Optional<LoadProfileEstimator> mainEstimatorOpt, |
| FunctionDescriptor functionDescriptor, |
| Configuration configuration) { |
| final LoadProfileEstimator mainEstimator = mainEstimatorOpt.orElse(null); |
| if (mainEstimator == null || !(mainEstimator instanceof NestableLoadProfileEstimator)) return; |
| final LoadProfileEstimator subestimator = configuration |
| .getFunctionLoadProfileEstimatorProvider() |
| .provideFor(functionDescriptor); |
| mainEstimator.nest(subestimator); |
| |
| } |
| |
| |
| private static final FunctionDescriptor.SerializableToDoubleBiFunction<long[], long[]> DEFAULT_RESOURCE_UTILIZATION_ESTIMATOR = (in, out) -> 1d; |
| |
| } |