blob: 025a565a25890d17771756ba4a9cd90db2639b49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.batching;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.samza.util.HighResolutionClock;
/**
* Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more
* operations, it will handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
* created and a timer will be set for it.
*
* @param <K> The type of the key associated with the {@link Operation}
* @param <V> The type of the value associated with the {@link Operation}
*/
public class BatchProcessor<K, V> {
private final ScheduledExecutorService scheduledExecutorService;
private final ReentrantLock lock = new ReentrantLock();
private final BatchHandler<K, V> batchHandler;
private final BatchProvider<K, V> batchProvider;
private final BatchMetrics batchMetrics;
private final HighResolutionClock clock;
private Batch<K, V> batch;
private ScheduledFuture<?> scheduledFuture;
private long batchOpenTimestamp;
/**
* @param batchMetrics Batch metrics.
* @param batchHandler Defines how each batch will be processed.
* @param batchProvider The batch provider to create a batch instance.
* @param clock A clock used to get the timestamp.
* @param scheduledExecutorService A scheduled executor service to set timers for the managed batches.
*/
public BatchProcessor(BatchMetrics batchMetrics, BatchHandler<K, V> batchHandler, BatchProvider batchProvider,
HighResolutionClock clock, ScheduledExecutorService scheduledExecutorService) {
Preconditions.checkNotNull(batchHandler);
Preconditions.checkNotNull(batchProvider);
Preconditions.checkNotNull(clock);
Preconditions.checkNotNull(scheduledExecutorService);
this.batchHandler = batchHandler;
this.batchProvider = batchProvider;
this.scheduledExecutorService = scheduledExecutorService;
this.batchMetrics = batchMetrics;
this.clock = clock;
}
private CompletableFuture<Void> addOperation(Operation<K, V> operation) {
if (batch == null) {
startNewBatch();
}
final CompletableFuture<Void> res = batch.addOperation(operation);
if (batch.isClosed()) {
processBatch(true);
}
return res;
}
/**
* @param operation The query operation to be added to the batch.
* @return A {@link CompletableFuture} to indicate whether the operation is finished.
*/
CompletableFuture<V> processQueryOperation(Operation<K, V> operation) {
Preconditions.checkNotNull(operation);
Preconditions.checkArgument(operation instanceof GetOperation);
lock.lock();
try {
GetOperation<K, V> getOperation = (GetOperation) operation;
addOperation(getOperation);
return getOperation.getCompletableFuture();
} finally {
lock.unlock();
}
}
/**
* @param operation The update operation to be added to the batch.
* @return A {@link CompletableFuture} to indicate whether the operation is finished.
*/
CompletableFuture<Void> processUpdateOperation(Operation<K, V> operation) {
Preconditions.checkNotNull(operation);
Preconditions.checkArgument(operation instanceof PutOperation || operation instanceof DeleteOperation);
lock.lock();
try {
return addOperation(operation);
} finally {
lock.unlock();
}
}
private void processBatch(boolean cancelTimer) {
mayCancelTimer(cancelTimer);
closeBatch();
batchHandler.handle(batch);
startNewBatch();
}
private void startNewBatch() {
batch = batchProvider.getBatch();
batchOpenTimestamp = clock.nanoTime();
batchMetrics.incBatchCount();
setBatchTimer(batch);
}
private void closeBatch() {
batch.close();
batchMetrics.updateBatchDuration(clock.nanoTime() - batchOpenTimestamp);
}
private void mayCancelTimer(boolean cancelTimer) {
if (cancelTimer && scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
/**
* Set a timer to close the batch when the batch is older than the max delay.
*/
private void setBatchTimer(Batch<K, V> batch) {
final long maxDelay = batch.getMaxBatchDelay().toMillis();
if (maxDelay != Integer.MAX_VALUE) {
scheduledFuture = scheduledExecutorService.schedule(() -> {
lock.lock();
try {
processBatch(false);
} finally {
lock.unlock();
}
}, maxDelay, TimeUnit.MILLISECONDS);
}
}
/**
* Stop the processor and cancel all the pending futures.
*/
void stop() {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
/**
* Get the current number of operations received.
*/
@VisibleForTesting
int size() {
return batch == null ? 0 : batch.size();
}
/**
* Get the latest update operation for the specified key.
*/
@VisibleForTesting
Operation<K, V> getLastUpdate(K key) {
final Collection<Operation<K, V>> operations = batch.getOperations();
final Iterator<Operation<K, V>> iterator = operations.iterator();
Operation<K, V> lastUpdate = null;
while (iterator.hasNext()) {
final Operation<K, V> operation = iterator.next();
if ((operation instanceof PutOperation || operation instanceof DeleteOperation)
&& operation.getKey().equals(key)) {
lastUpdate = operation;
}
}
return lastUpdate;
}
}