blob: f5a6f16d7bfad04d29665c69184b1fae7beb333a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.resources.GPUResource;
import org.apache.flink.api.common.resources.Resource;
import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Describe the different resource factors of the operator with UDF.
*
* <p>The state backend provides the method to estimate memory usages based on state size in the resource.
*
* <p>Resource provides {@link #merge(ResourceSpec)} method for chained operators when generating job graph.
*
* <p>Resource provides {@link #lessThanOrEqual(ResourceSpec)} method to compare these fields in sequence:
* <ol>
* <li>CPU cores</li>
* <li>Heap Memory Size</li>
* <li>Direct Memory Size</li>
* <li>Native Memory Size</li>
* <li>State Size</li>
* <li>Extended resources</li>
* </ol>
*/
@Internal
public class ResourceSpec implements Serializable {
private static final long serialVersionUID = 1L;
public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0);
public static final String GPU_NAME = "GPU";
public static final String MANAGED_MEMORY_NAME = "MANAGED_MEMORY_MB";
public static final String FLOATING_MANAGED_MEMORY_NAME = "FLOATING_MANAGED_MEMORY_MB";
/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
private final double cpuCores;
/** How many java heap memory in mb are needed. */
private final int heapMemoryInMB;
/** How many nio direct memory in mb are needed. */
private final int directMemoryInMB;
/** How many native memory in mb are needed. */
private final int nativeMemoryInMB;
/** How many state size in mb are used. */
private final int stateSizeInMB;
private final Map<String, Resource> extendedResources = new HashMap<>(1);
/**
* Creates a new ResourceSpec with full resources.
*
* @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
* @param heapMemoryInMB The size of the java heap memory, in megabytes.
* @param directMemoryInMB The size of the java nio direct memory, in megabytes.
* @param nativeMemoryInMB The size of the native memory, in megabytes.
* @param stateSizeInMB The state size for storing in checkpoint.
* @param extendedResources The extended resources, associated with the resource manager used
*/
protected ResourceSpec(
double cpuCores,
int heapMemoryInMB,
int directMemoryInMB,
int nativeMemoryInMB,
int stateSizeInMB,
Resource... extendedResources) {
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = directMemoryInMB;
this.nativeMemoryInMB = nativeMemoryInMB;
this.stateSizeInMB = stateSizeInMB;
for (Resource resource : extendedResources) {
if (resource != null) {
this.extendedResources.put(resource.getName(), resource);
}
}
}
/**
* Used by system internally to merge the other resources of chained operators
* when generating the job graph or merge the resource consumed by state backend.
*
* @param other Reference to resource to merge in.
* @return The new resource with merged values.
*/
public ResourceSpec merge(ResourceSpec other) {
ResourceSpec target = new ResourceSpec(
Math.max(this.cpuCores, other.cpuCores),
this.heapMemoryInMB + other.heapMemoryInMB,
this.directMemoryInMB + other.directMemoryInMB,
this.nativeMemoryInMB + other.nativeMemoryInMB,
this.stateSizeInMB + other.stateSizeInMB);
target.extendedResources.putAll(extendedResources);
for (Resource resource : other.extendedResources.values()) {
target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
}
return target;
}
public double getCpuCores() {
return this.cpuCores;
}
public int getHeapMemory() {
return this.heapMemoryInMB;
}
public int getDirectMemory() {
return this.directMemoryInMB;
}
public int getNativeMemory() {
return this.nativeMemoryInMB;
}
public int getStateSize() {
return this.stateSizeInMB;
}
public double getGPUResource() {
Resource gpuResource = extendedResources.get(GPU_NAME);
if (gpuResource != null) {
return gpuResource.getValue();
}
return 0.0;
}
public Map<String, Resource> getExtendedResources() {
return extendedResources;
}
/**
* Check whether all the field values are valid.
*
* @return True if all the values are equal or greater than 0, otherwise false.
*/
public boolean isValid() {
if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
for (Resource resource : extendedResources.values()) {
if (resource.getValue() < 0) {
return false;
}
}
return true;
} else {
return false;
}
}
/**
* Checks the current resource less than or equal with the other resource by comparing
* all the fields in the resource.
*
* @param other The resource to compare
* @return True if current resource is less than or equal with the other resource, otherwise return false.
*/
public boolean lessThanOrEqual(@Nonnull ResourceSpec other) {
int cmp1 = Double.compare(this.cpuCores, other.cpuCores);
int cmp2 = Integer.compare(this.heapMemoryInMB, other.heapMemoryInMB);
int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB);
int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB);
int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
for (Resource resource : extendedResources.values()) {
if (!other.extendedResources.containsKey(resource.getName()) ||
other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
other.extendedResources.get(resource.getName()).getValue() < resource.getValue()) {
return false;
}
}
return true;
}
return false;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj != null && obj.getClass() == ResourceSpec.class) {
ResourceSpec that = (ResourceSpec) obj;
return this.cpuCores == that.cpuCores &&
this.heapMemoryInMB == that.heapMemoryInMB &&
this.directMemoryInMB == that.directMemoryInMB &&
this.nativeMemoryInMB == that.nativeMemoryInMB &&
this.stateSizeInMB == that.stateSizeInMB &&
Objects.equals(this.extendedResources, that.extendedResources);
} else {
return false;
}
}
@Override
public int hashCode() {
final long cpuBits = Double.doubleToLongBits(cpuCores);
int result = (int) (cpuBits ^ (cpuBits >>> 32));
result = 31 * result + heapMemoryInMB;
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
result = 31 * result + extendedResources.hashCode();
return result;
}
@Override
public String toString() {
String extend = "";
for (Resource resource : extendedResources.values()) {
extend += ", " + resource.getName() + "=" + resource.getValue();
}
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
public static Builder newBuilder() {
return new Builder();
}
/**
* Builder for the {@link ResourceSpec}.
*/
public static class Builder {
private double cpuCores;
private int heapMemoryInMB;
private int directMemoryInMB;
private int nativeMemoryInMB;
private int stateSizeInMB;
private final Map<String, Resource> extendedResources = new HashMap<>();
public Builder setCpuCores(double cpuCores) {
this.cpuCores = cpuCores;
return this;
}
public Builder setHeapMemoryInMB(int heapMemory) {
this.heapMemoryInMB = heapMemory;
return this;
}
public Builder setDirectMemoryInMB(int directMemory) {
this.directMemoryInMB = directMemory;
return this;
}
public Builder setNativeMemoryInMB(int nativeMemory) {
this.nativeMemoryInMB = nativeMemory;
return this;
}
public Builder setStateSizeInMB(int stateSize) {
this.stateSizeInMB = stateSize;
return this;
}
public Builder setGPUResource(double gpus) {
extendedResources.put(GPU_NAME, new GPUResource(gpus));
return this;
}
public Builder addExtendedResource(Resource... resources) {
for (Resource resource : resources) {
this.extendedResources.put(resource.getName(), resource);
}
return this;
}
public ResourceSpec build() {
return new ResourceSpec(
cpuCores,
heapMemoryInMB,
directMemoryInMB,
nativeMemoryInMB,
stateSizeInMB,
extendedResources.values().toArray(new Resource[0]));
}
}
}