blob: 7040004ae48d3a9d0985cb9b231f914ebf6ff5a4 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.resources;
import java.util.Set;
import java.util.function.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.aurora.scheduler.base.Numbers;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.Protos.Resource;
import static org.apache.aurora.scheduler.base.Numbers.RANGE_TRANSFORM;
/**
* Converts Mesos resource values to be consumed in Aurora.
*/
public interface MesosResourceConverter {
/**
* Gets Mesos resource quantity.
*
* @param resource Mesos resource to quantify.
* @return Mesos resource quantity.
*/
Double quantify(Resource resource);
/**
* Allocates offer resources to resource request from {@code resourceRequest}.
*
* @param offerResources Offer resources to allocate.
* @param resourceRequest Resource request.
* @param isRevocable Flag indicating if allocated resources must be marked as Mesos-revocable.
* @return Allocated Mesos resources.
*/
Iterable<Resource> toMesosResource(
Iterable<Resource.Builder> offerResources,
Supplier<?> resourceRequest,
boolean isRevocable);
ScalarConverter SCALAR = new ScalarConverter();
RangeConverter RANGES = new RangeConverter();
class ScalarConverter implements MesosResourceConverter {
/**
* Helper function to check if a resource value is small enough to be considered zero.
*/
private static boolean nearZero(double value) {
return Math.abs(value) < 1e-6;
}
@Override
public Double quantify(Resource resource) {
return resource.getScalar().getValue();
}
@Override
public Iterable<Resource> toMesosResource(
Iterable<Resource.Builder> offerResources,
Supplier<?> resourceRequest,
boolean isRevocable) {
double remaining = (Double) resourceRequest.get();
ImmutableList.Builder<Resource> result = ImmutableList.builder();
for (Resource.Builder offerResource : offerResources) {
if (nearZero(remaining)) {
break;
}
final double available = offerResource.getScalar().getValue();
if (nearZero(available)) {
// Skip resource slot that is already used up.
continue;
}
final double used = Math.min(remaining, available);
remaining -= used;
Resource.Builder newResource =
Resource.newBuilder(offerResource.build())
.setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build());
if (isRevocable) {
newResource.setRevocable(Resource.RevocableInfo.newBuilder());
}
result.add(newResource.build());
offerResource.getScalarBuilder().setValue(available - used);
}
if (!nearZero(remaining)) {
// NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
// consistent.
// Maybe we should consider implementing resource veto with this class to ensure that.
throw new ResourceManager.InsufficientResourcesException(
"Insufficient resource when allocating from offer");
}
return result.build();
}
}
class RangeConverter implements MesosResourceConverter {
@Override
public Double quantify(Resource resource) {
return resource.getRanges().getRangeList().stream()
.map(range -> 1 + range.getEnd() - range.getBegin())
.reduce((l, r) -> l + r)
.map(Long::doubleValue)
.orElse(0.0);
}
@Override
public Iterable<Resource> toMesosResource(
Iterable<Resource.Builder> offerResources,
Supplier<?> resourceRequest,
boolean isRevocable) {
@SuppressWarnings("unchecked")
Set<Integer> leftOver = Sets.newHashSet((Set<Integer>) resourceRequest.get());
ImmutableList.Builder<Resource> result = ImmutableList.builder();
for (Resource.Builder r : offerResources) {
Set<Integer> fromResource = Sets.newHashSet(Iterables.concat(
Iterables.transform(r.getRanges().getRangeList(), Numbers.RANGE_TO_MEMBERS)));
Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource));
if (available.isEmpty()) {
continue;
}
Resource.Builder newResource = Protos.Resource.newBuilder(r.build())
.setRanges(Protos.Value.Ranges.newBuilder()
.addAllRange(Iterables.transform(Numbers.toRanges(available), RANGE_TRANSFORM)));
if (isRevocable) {
newResource.setRevocable(Resource.RevocableInfo.newBuilder());
}
result.add(newResource.build());
leftOver.removeAll(available);
if (leftOver.isEmpty()) {
break;
}
}
if (!leftOver.isEmpty()) {
// NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
// consistent.
// Maybe we should consider implementing resource veto with this class to ensure that.
throw new ResourceManager.InsufficientResourcesException(
"Insufficient resource for range type when allocating from offer");
}
return result.build();
}
}
}