blob: 72213184df167a3b533451497f5a32228d06e6ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container.disk;
import org.apache.samza.util.Throttleable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
* An object that calculates the current work rate using a configurable set of
* {@link Entry} instances.
* <p>
* The supplied {@link Entry} instances may overlap as long as the range
* between low and high water mark of one policy entry does not contain the range of low and high
* water mark of another policy entry. For example, the following entries would be OK:
*
* <ul>
* <li>Low: 0.5, High: 1.0</li>
* <li>Low:0.2, High: 0.8</li>
* </ul>
*
* But the following entries would not:
*
* <ul>
* <li>Low: 0.1, High: 1.0</li>
* <li>Low: 0.2, High: 0.5</li>
* </ul>
*
* Policy entries do not stack. In other words, there is always a clear entry to apply and its work
* factor is not added, multiplied, or in any other way joined with another entry's work rate.
*/
public class WatermarkDiskQuotaPolicy implements DiskQuotaPolicy {
/**
* A comparator that orders {@link Entry} instances in descending order first by
* high water mark and then by low water mark.
*/
private static final Comparator<Entry> POLICY_COMPARATOR = new Comparator<Entry>() {
@Override
public int compare(Entry lhs, Entry rhs) {
if (lhs.getHighWaterMarkPercent() > rhs.getHighWaterMarkPercent()) {
return -1;
} else if (lhs.getHighWaterMarkPercent() < rhs.getHighWaterMarkPercent()) {
return 1;
} else if (lhs.getLowWaterMarkPercent() > rhs.getLowWaterMarkPercent()) {
return -1;
} else if (lhs.getLowWaterMarkPercent() < rhs.getLowWaterMarkPercent()) {
return 1;
}
return 0;
}
};
private static final Logger log = LoggerFactory.getLogger(WatermarkDiskQuotaPolicy.class);
// Lock guards entryIndex
private final List<Entry> entries;
private int entryIndex = -1;
private static String dumpPolicyEntries(List<Entry> entries) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < entries.size(); ++i) {
final Entry entry = entries.get(i);
sb.append(String.format("\n Policy entry %d. Low: %f. High: %f. Work Factor: %f",
i,
entry.getLowWaterMarkPercent(),
entry.getHighWaterMarkPercent(),
entry.getWorkFactor()));
}
return sb.toString();
}
/**
* Constructs a new watermark disk quota policy using the supplied policy entries.
*
* @param entries a list of {@link Entry} objects that describe how to adjust the work rate. May
* be empty, but cannot be {@code null}.
*/
public WatermarkDiskQuotaPolicy(List<Entry> entries) {
// Copy entries, sort, make immutable
entries = new ArrayList<>(entries);
Collections.sort(entries, POLICY_COMPARATOR);
this.entries = Collections.unmodifiableList(entries);
// Validate entries
double lastHighWaterMark = 1.0;
double lastWorkFactor = Throttleable.MAX_WORK_FACTOR;
for (int i = 0; i < entries.size(); ++i) {
final Entry entry = entries.get(i);
if (lastHighWaterMark < entry.getHighWaterMarkPercent()) {
throw new IllegalArgumentException("Policy entry " + i +
" has high water mark (" + entry.getHighWaterMarkPercent() +
") > previous high water mark (" + lastHighWaterMark + "):" +
dumpPolicyEntries(entries));
}
if (lastWorkFactor < entry.getWorkFactor()) {
throw new IllegalArgumentException("Policy entry " + i +
" has work factor (" + entry.getWorkFactor() +
") < previous work factor (" + lastWorkFactor + "):" +
dumpPolicyEntries(entries));
}
if (entry.getWorkFactor() < Throttleable.MIN_WORK_FACTOR) {
throw new IllegalArgumentException("Policy entry " + i +
" has work factor (" + entry.getWorkFactor() +
") < minimum work factor (" + Throttleable.MIN_WORK_FACTOR + "):" +
dumpPolicyEntries(entries));
}
lastHighWaterMark = entry.getHighWaterMarkPercent();
lastWorkFactor = entry.getWorkFactor();
}
log.info("Using the following disk quota enforcement entries: {}",
entries.isEmpty() ? "NONE" : dumpPolicyEntries(entries));
}
@Override
public double apply(double availableDiskQuotaPercentage) {
double workFactor = entryIndex == -1 ? 1.0 : this.entries.get(entryIndex).getWorkFactor();
int entryIndex = this.entryIndex;
while (entryIndex >= 0 &&
entries.get(entryIndex).getHighWaterMarkPercent() <= availableDiskQuotaPercentage) {
--entryIndex;
}
while (entryIndex < entries.size() - 1 &&
entries.get(entryIndex + 1).getLowWaterMarkPercent() > availableDiskQuotaPercentage) {
++entryIndex;
}
if (entryIndex != this.entryIndex) {
workFactor = entryIndex == -1 ? 1.0 : entries.get(entryIndex).getWorkFactor();
this.entryIndex = entryIndex;
log.info("Work factor has been updated: {}.", workFactor);
}
return workFactor;
}
/**
* A thread-safe value object that represents a policy entry for disk quotas. When the percentage
* of available disk space assigned via the disk quota drops below the low water mark the
* configured work rate is applied to reduce throughput of the task. When the available disk space
* rises above the high water mark the work rate throttling is removed (but other entries may
* still apply).
*/
public static class Entry {
private final double lowWaterMarkPercent;
private final double highWaterMarkPercent;
private final double workFactor;
public Entry(double lowWaterMarkPercent, double highWaterMarkPercent, double workFactor) {
if (lowWaterMarkPercent < 0.0) {
throw new IllegalArgumentException("low water mark percent (" + lowWaterMarkPercent + ") < 0");
}
if (highWaterMarkPercent > 1.0) {
throw new IllegalArgumentException("high water mark percent (" + highWaterMarkPercent + ") > 1");
}
if (lowWaterMarkPercent > highWaterMarkPercent) {
throw new IllegalArgumentException("low water mark percent (" + lowWaterMarkPercent + ") > " +
"high water mark percent (" + highWaterMarkPercent + ")");
}
if (workFactor <= 0.0) {
throw new IllegalArgumentException("work factor (" + workFactor + ") <= 0");
}
if (workFactor > 1.0) {
throw new IllegalArgumentException("work factor (" + workFactor + ") > 1");
}
this.lowWaterMarkPercent = lowWaterMarkPercent;
this.highWaterMarkPercent = highWaterMarkPercent;
this.workFactor = workFactor;
}
public double getLowWaterMarkPercent() {
return lowWaterMarkPercent;
}
public double getHighWaterMarkPercent() {
return highWaterMarkPercent;
}
public double getWorkFactor() {
return workFactor;
}
}
}