| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.core.optimizer.costs; |
| |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.api.exception.WayangException; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; |
| |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.function.Function; |
| import java.util.function.ToDoubleBiFunction; |
| |
| /** |
| * {@link LoadProfileEstimator} that can host further {@link LoadProfileEstimator}s. |
| */ |
| public class NestableLoadProfileEstimator implements LoadProfileEstimator { |
| |
| /** |
| * {@link LoadEstimator} to estimate a certain aspect of the {@link LoadProfile}s for the {@code Artifact}. |
| */ |
| private final LoadEstimator cpuLoadEstimator, ramLoadEstimator, diskLoadEstimator, networkLoadEstimator; |
| |
| /** |
| * The degree to which the load profile can utilize available resources. |
| */ |
| private final ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator; |
| |
| /** |
| * Milliseconds overhead that this load profile incurs. |
| */ |
| private final long overheadMillis; |
| |
| /** |
| * Nested {@link LoadProfileEstimator}s together with a {@link Function} to extract the estimated |
| * {@code Artifact} from the {@code Artifact}s subject to this instance. |
| */ |
| private Collection<LoadProfileEstimator> nestedEstimators = new LinkedList<>(); |
| |
| /** |
| * If this instance was created from a specification in the {@link Configuration}, |
| * then the according {@link Configuration} key should be stored. |
| */ |
| private final String configurationKey; |
| |
| /** |
| * Creates an new instance. |
| * |
| * @param cpuLoadEstimator estimates CPU load in terms of cycles |
| * @param ramLoadEstimator estimates RAM load in terms of MB |
| */ |
| public NestableLoadProfileEstimator(LoadEstimator cpuLoadEstimator, LoadEstimator ramLoadEstimator) { |
| this(cpuLoadEstimator, ramLoadEstimator, null, null); |
| } |
| |
| /** |
| * Creates an new instance. |
| * |
| * @param cpuLoadEstimator estimates CPU load in terms of cycles |
| * @param ramLoadEstimator estimates RAM load in terms of bytes |
| * @param diskLoadEstimator estimates disk accesses in terms of bytes |
| * @param networkLoadEstimator estimates network in terms of bytes |
| */ |
| public NestableLoadProfileEstimator(LoadEstimator cpuLoadEstimator, |
| LoadEstimator ramLoadEstimator, |
| LoadEstimator diskLoadEstimator, |
| LoadEstimator networkLoadEstimator) { |
| this(cpuLoadEstimator, ramLoadEstimator, diskLoadEstimator, networkLoadEstimator, (in, out) -> 1d, 0L, null); |
| } |
| |
| /** |
| * Creates an new instance. |
| * |
| * @param cpuLoadEstimator estimates CPU load in terms of cycles |
| * @param ramLoadEstimator estimates RAM load in terms of bytes |
| * @param diskLoadEstimator estimates disk accesses in terms of bytes |
| * @param networkLoadEstimator estimates network in terms of bytes |
| * @param resourceUtilizationEstimator degree to which the load profile can utilize available resources |
| * @param overheadMillis overhead that this load profile incurs |
| */ |
| public NestableLoadProfileEstimator(LoadEstimator cpuLoadEstimator, |
| LoadEstimator ramLoadEstimator, |
| LoadEstimator diskLoadEstimator, |
| LoadEstimator networkLoadEstimator, |
| ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator, |
| long overheadMillis) { |
| this( |
| cpuLoadEstimator, ramLoadEstimator, diskLoadEstimator, networkLoadEstimator, |
| resourceUtilizationEstimator, overheadMillis, null |
| ); |
| } |
| |
| /** |
| * Creates an new instance. |
| * |
| * @param cpuLoadEstimator estimates CPU load in terms of cycles |
| * @param ramLoadEstimator estimates RAM load in terms of bytes |
| * @param diskLoadEstimator estimates disk accesses in terms of bytes |
| * @param networkLoadEstimator estimates network in terms of bytes |
| * @param resourceUtilizationEstimator degree to which the load profile can utilize available resources |
| * @param overheadMillis overhead that this load profile incurs |
| * @param configurationKey from that this instances was perceived |
| */ |
| public NestableLoadProfileEstimator(LoadEstimator cpuLoadEstimator, |
| LoadEstimator ramLoadEstimator, |
| LoadEstimator diskLoadEstimator, |
| LoadEstimator networkLoadEstimator, |
| ToDoubleBiFunction<long[], long[]> resourceUtilizationEstimator, |
| long overheadMillis, |
| String configurationKey) { |
| this.cpuLoadEstimator = cpuLoadEstimator; |
| this.ramLoadEstimator = ramLoadEstimator; |
| this.diskLoadEstimator = diskLoadEstimator; |
| this.networkLoadEstimator = networkLoadEstimator; |
| this.resourceUtilizationEstimator = resourceUtilizationEstimator; |
| this.overheadMillis = overheadMillis; |
| this.configurationKey = configurationKey; |
| } |
| |
| /** |
| * Nest a {@link LoadProfileEstimator} in this instance. No artifact will be available to it, though. |
| * |
| * @param nestedEstimator the {@link LoadProfileEstimator} that should be nested |
| */ |
| public void nest(LoadProfileEstimator nestedEstimator) { |
| this.nestedEstimators.add(nestedEstimator); |
| } |
| |
| @Override |
| public LoadProfile estimate(EstimationContext context) { |
| // Estimate the load for the very artifact. |
| final LoadProfile mainLoadProfile = this.performLocalEstimation(context); |
| |
| // Estiamte the load for any nested artifacts. |
| for (LoadProfileEstimator nestedEstimator : this.nestedEstimators) { |
| LoadProfile nestedProfile = nestedEstimator.estimate(context); |
| mainLoadProfile.nest(nestedProfile); |
| } |
| |
| // Return the complete LoadProfile. |
| return mainLoadProfile; |
| } |
| |
| /** |
| * Perform the estimation for the very {@code artifact}, i.e., without any nested artifacts. |
| * |
| * @param context provides parameters for the estimation |
| * @return the {@link LoadProfile} for the {@code artifact} |
| */ |
| private LoadProfile performLocalEstimation(EstimationContext context) { |
| try { |
| final LoadEstimate cpuLoadEstimate = this.cpuLoadEstimator.calculate(context); |
| final LoadEstimate ramLoadEstimate = this.ramLoadEstimator.calculate(context); |
| final LoadEstimate diskLoadEstimate = this.diskLoadEstimator == null ? |
| null : |
| this.diskLoadEstimator.calculate(context); |
| final LoadEstimate networkLoadEstimate = this.networkLoadEstimator == null ? |
| null : |
| this.networkLoadEstimator.calculate(context); |
| final double resourceUtilization = this.estimateResourceUtilization(context); |
| return new LoadProfile( |
| cpuLoadEstimate, |
| ramLoadEstimate, |
| networkLoadEstimate, |
| diskLoadEstimate, |
| resourceUtilization, |
| this.getOverheadMillis() |
| ); |
| } catch (Exception e) { |
| throw new WayangException(String.format("Failed estimating on %s.", this, context), e); |
| } |
| } |
| |
| /** |
| * Estimates the resource utilization. |
| * |
| * @param context provides parameters for the estimation |
| * @return the estimated resource utilization |
| */ |
| private double estimateResourceUtilization(EstimationContext context) { |
| long[] avgInputEstimates = extractMeanValues(context.getInputCardinalities()); |
| long[] avgOutputEstimates = extractMeanValues(context.getOutputCardinalities()); |
| return this.resourceUtilizationEstimator.applyAsDouble(avgInputEstimates, avgOutputEstimates); |
| } |
| |
| /** |
| * Extracts the geometric mean values of the given {@link CardinalityEstimate}s. |
| * |
| * @param estimates the input {@link CardinalityEstimate}s |
| * @return an array containing the average estimates |
| * @see CardinalityEstimate#getGeometricMeanEstimate() |
| */ |
| private static long[] extractMeanValues(CardinalityEstimate[] estimates) { |
| long[] averages = new long[estimates.length]; |
| for (int i = 0; i < estimates.length; i++) { |
| CardinalityEstimate inputEstimate = estimates[i]; |
| if (inputEstimate == null) inputEstimate = CardinalityEstimate.EMPTY_ESTIMATE; |
| averages[i] = inputEstimate.getGeometricMeanEstimate(); |
| } |
| return averages; |
| } |
| |
| private long getOverheadMillis() { |
| return this.overheadMillis; |
| } |
| |
| @Override |
| public Collection<LoadProfileEstimator> getNestedEstimators() { |
| return this.nestedEstimators; |
| } |
| |
| @Override |
| public String getConfigurationKey() { |
| return this.configurationKey; |
| } |
| |
| @Override |
| public LoadProfileEstimator copy() { |
| LoadProfileEstimator copy = new NestableLoadProfileEstimator( |
| this.cpuLoadEstimator, this.ramLoadEstimator, this.diskLoadEstimator, this.networkLoadEstimator, |
| this.resourceUtilizationEstimator, this.overheadMillis, this.configurationKey |
| ); |
| for (LoadProfileEstimator nestedEstimator : this.nestedEstimators) { |
| copy.nest(nestedEstimator.copy()); |
| } |
| return copy; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s[%s]", |
| this.getClass().getSimpleName(), |
| this.configurationKey == null ? "(no key)" : this.configurationKey |
| ); |
| } |
| } |