blob: c15a04cbec5aca7d224a07ef72c29d7b81c25dab [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import org.apache.aurora.common.stats.SlidingStats;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffStrategy;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static com.google.common.base.Preconditions.checkState;
/**
* Generic helper that allows bundling multiple work items into a single {@link Storage}
* transaction aiming to reduce the write lock contention.
*
* @param <T> Expected result type.
*/
public class BatchWorker<T> extends AbstractExecutionThreadService {
/**
* Empty result placeholder.
* <p>
* Marker type that declares no members.
*/
public interface NoResult { }
/**
* Convenience wrapper for a non-repeatable no value work {@link Result}.
* <p>
* A single shared anonymous {@link NoResult} instance.
*/
public static final NoResult NO_RESULT = new NoResult() { };
private static final Logger LOG = LoggerFactory.getLogger(BatchWorker.class);
// Storage whose write transaction wraps every processed batch.
private final Storage storage;
// Used to cap how many queued items are drained into a single batch.
private final int maxBatchSize;
// Nanos measured from just before storage.write() is called until just after it returns.
private final SlidingStats batchUnlocked;
// Nanos measured inside the storage.write() callback only.
private final SlidingStats batchLocked;
// Work items waiting to be picked up by the run loop; created without an explicit capacity.
private final BlockingQueue<WorkItem<T>> workQueue = new LinkedBlockingQueue<>();
// Puts unfinished repeatable work back on the queue once its backoff delay has elapsed.
private final ScheduledExecutorService scheduledExecutor;
// Size of the most recently processed batch; exported as a gauge.
private final AtomicInteger lastBatchSize = new AtomicInteger(0);
// Running total of work item executions; incremented by the batch size after each batch.
private final AtomicLong itemsProcessed;
// Running total of processed batches.
private final AtomicLong batchesProcessed;
/**
* Wraps result returned by the {@link RepeatableWork} item.
* <p>
* Pairs a completion flag with the produced value. Both fields are private and are read
* directly by the enclosing class while a batch is processed.
*
* @param <T> Expected result type.
*/
public static class Result<T> {
// When false, the worker schedules the same work to run again after a backoff delay.
private final boolean isCompleted;
// Handed to the caller's future once the work reports completion.
private final T value;
/**
* Initializes a {@link Result} instance with {@code isCompleted} and {@code value}.
* <p>
* The {@code isCompleted} may be set to {@code false} for a {@link RepeatableWork} that has
* not finished yet. Otherwise, it must be set to {@code true}.
*
* @param isCompleted Flag indicating if the {@link RepeatableWork} has completed.
* @param value Result value; it is only read when {@code isCompleted} is {@code true}.
*/
public Result(boolean isCompleted, T value) {
this.isCompleted = isCompleted;
this.value = value;
}
}
/**
* Encapsulates a potentially repeatable operation.
*
* @param <T> Type of the value carried by the returned {@link Result}.
*/
public interface RepeatableWork<T> {
/**
* Abstracts a unit of repeatable (i.e.: "repeat until completed") work.
* <p>
* The work unit may be repeated as instructed by the {@link Result}: a result that is not
* completed causes the same work to be queued again after a backoff delay.
*
* @param storeProvider {@link MutableStoreProvider} instance.
* @return {@link Result} holding the completion flag and the value produced by this run.
*/
Result<T> apply(MutableStoreProvider storeProvider);
}
/**
 * A one-shot operation: it is run a single time and always reports completion.
 *
 * @param <T> Type of the value produced by the work.
 */
public interface Work<T> extends RepeatableWork<T> {

  /**
   * Adapts this one-shot work to the {@link RepeatableWork} contract by running
   * {@link #execute(MutableStoreProvider)} and wrapping its value in a {@link Result}
   * that is flagged as completed.
   *
   * @param storeProvider {@link MutableStoreProvider} instance.
   * @return A completed {@link Result} carrying the value returned by {@code execute}.
   */
  @Override
  default Result<T> apply(MutableStoreProvider storeProvider) {
    return new Result<>(true, execute(storeProvider));
  }

  /**
   * Performs the actual non-repeatable (i.e.: "run exactly once") unit of work.
   *
   * @param storeProvider {@link MutableStoreProvider} instance.
   * @return result value.
   */
  T execute(MutableStoreProvider storeProvider);
}
@Inject
protected BatchWorker(
Storage storage,
StatsProvider statsProvider,
int maxBatchSize) {
this.storage = requireNonNull(storage);
this.maxBatchSize = maxBatchSize;
scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor(serviceName() + "-%d", LOG);
statsProvider.makeGauge(serviceName() + "_queue_size", () -> workQueue.size());
statsProvider.makeGauge(
serviceName() + "_last_processed_batch_size",
() -> lastBatchSize.intValue());
batchUnlocked = new SlidingStats(serviceName() + "_batch_unlocked", "nanos");
batchLocked = new SlidingStats(serviceName() + "_batch_locked", "nanos");
itemsProcessed = statsProvider.makeCounter(serviceName() + "_items_processed");
batchesProcessed = statsProvider.makeCounter(serviceName() + "_batches_processed");
}
/**
 * Queues a one-shot {@link Work} item for the next batch.
 * <p>
 * The item carries neither a backoff strategy nor a previous backoff value.
 *
 * @param work A non-repeatable {@link Work} to execute.
 * @return {@link CompletableFuture} that is completed with the work's value once it has run.
 */
public CompletableFuture<T> execute(Work<T> work) {
  CompletableFuture<T> future = new CompletableFuture<>();
  WorkItem<T> item = new WorkItem<>(work, future, Optional.empty(), Optional.empty());
  workQueue.add(item);
  return future;
}
/**
 * Queues a {@link RepeatableWork} item that is re-run, with backoff between runs, until it
 * reports completion.
 * <p>
 * The item starts with a previous backoff of {@code 0} milliseconds.
 *
 * @param backoffStrategy A {@link BackoffStrategy} instance to backoff subsequent runs.
 * @param work A {@link RepeatableWork} to execute.
 * @return {@link CompletableFuture} that is completed with the value of the first completed
 *     {@link Result}.
 */
public CompletableFuture<T> executeWithReplay(
    BackoffStrategy backoffStrategy,
    RepeatableWork<T> work) {

  CompletableFuture<T> future = new CompletableFuture<>();
  Optional<BackoffStrategy> strategy = Optional.of(backoffStrategy);
  Optional<Long> initialBackoffMsec = Optional.of(0L);
  workQueue.add(new WorkItem<>(work, future, strategy, initialBackoffMsec));
  return future;
}
/**
 * Main service loop: repeatedly takes the next queued item, tops the batch up with whatever
 * else is already queued (bounded by {@code maxBatchSize}) and processes the batch.
 * <p>
 * Items are batched in the order they were queued.
 *
 * @throws Exception if the thread is interrupted while waiting for work, or if batch
 *     processing fails.
 */
@Override
protected void run() throws Exception {
  while (isRunning()) {
    // Make the loop responsive to shutdown under light load by using
    // a short non-configurable timeout in poll().
    WorkItem<T> head = workQueue.poll(3, TimeUnit.SECONDS);
    if (head == null) {
      // Nothing arrived within the timeout; re-check isRunning().
      continue;
    }

    List<WorkItem<T>> batch = new LinkedList<>();
    // The polled head must go into the batch itself. Putting it back on the queue would move
    // it behind every item queued after it and leave the size arithmetic below meaningless.
    batch.add(head);
    workQueue.drainTo(batch, maxBatchSize - batch.size());
    processBatch(batch);
  }
}
/**
 * Runs every item of the batch inside a single {@code storage.write()} call and updates the
 * timing and throughput stats. Does nothing for an empty batch.
 * <p>
 * Items that report completion have their future completed; the others are scheduled to be
 * queued again after their backoff delay.
 *
 * @param batch Work items to run, in order.
 */
private void processBatch(List<WorkItem<T>> batch) {
  if (batch.isEmpty()) {
    return;
  }

  long beforeWrite = System.nanoTime();
  storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
    long insideWrite = System.nanoTime();
    for (WorkItem<T> item : batch) {
      Result<T> outcome = item.work.apply(storeProvider);
      if (!outcome.isCompleted) {
        // Work not finished yet - re-queue for a followup later.
        requeueAfterBackoff(item);
        continue;
      }
      item.result.complete(outcome.value);
    }
    batchLocked.accumulate(System.nanoTime() - insideWrite);
  });
  batchUnlocked.accumulate(System.nanoTime() - beforeWrite);

  int processed = batch.size();
  batchesProcessed.incrementAndGet();
  lastBatchSize.set(processed);
  itemsProcessed.addAndGet(processed);
}

/**
 * Schedules an unfinished item to be put back on the work queue once its next backoff delay
 * has elapsed. The re-queued item records that delay as its last backoff.
 */
private void requeueAfterBackoff(WorkItem<T> item) {
  long delayMsec = backoffFor(item);
  scheduledExecutor.schedule(
      () -> workQueue.add(
          new WorkItem<>(item.work, item.result, item.backoffStrategy, Optional.of(delayMsec))),
      delayMsec,
      TimeUnit.MILLISECONDS);
}
/**
 * Computes the next backoff delay for an item from its strategy and its previous delay.
 *
 * @param item Work item that must carry both a backoff strategy and a last backoff value.
 * @return Next backoff delay in milliseconds.
 * @throws IllegalStateException if the item has no backoff strategy or no last backoff value.
 */
private long backoffFor(WorkItem<T> item) {
  Optional<BackoffStrategy> strategy = item.backoffStrategy;
  checkState(strategy.isPresent());

  Optional<Long> previousBackoffMsec = item.lastBackoffMsec;
  checkState(previousBackoffMsec.isPresent());

  long previous = previousBackoffMsec.get();
  return strategy.get().calculateBackoffMs(previous);
}
private class WorkItem<V> {
private final RepeatableWork<V> work;
private final CompletableFuture<T> result;
private final Optional<BackoffStrategy> backoffStrategy;
private final Optional<Long> lastBackoffMsec;
WorkItem(
RepeatableWork<V> work,
CompletableFuture<T> result,
Optional<BackoffStrategy> backoffStrategy,
Optional<Long> lastBackoffMsec) {
this.work = work;
this.result = result;
this.backoffStrategy = backoffStrategy;
this.lastBackoffMsec = lastBackoffMsec;
}
}
}