blob: 07397330ee42fa33e2c126a19dfe6d78c7bf05a8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;
import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.client.AsyncKuduClient.LookupType;
import org.apache.kudu.util.AsyncUtil;
import org.apache.kudu.util.LogThrottler;
import org.apache.kudu.util.Slice;
* An {@code AsyncKuduSession} belongs to a specific {@link AsyncKuduClient}, and represents a
* context in which all write data access should take place. Within a session,
* multiple operations may be accumulated and batched together for better
* efficiency. Settings like timeouts, priorities, and trace IDs are also set
* per session.
* <p>{@code AsyncKuduSession} is separate from {@link AsyncKuduClient} because, in a multi-threaded
* application, different threads may need to concurrently execute
* transactions. Similar to a JDBC "session", transaction boundaries will be
* delineated on a per-session basis -- in between a "BeginTransaction" and
* "Commit" call on a given session, all operations will be part of the same
* transaction. Meanwhile another concurrent session object can safely run
* non-transactional work or other transactions without interfering.
* <p>Therefore, this class is <b>not</b> thread-safe.
* <p>Additionally, there is a guarantee that writes from different sessions do not
* get batched together into the same RPCs -- this means that latency-sensitive
* clients can run through the same {@link AsyncKuduClient} object as throughput-oriented
* clients, perhaps by setting the latency-sensitive session's timeouts low and
* priorities high. Without the separation of batches, a latency-sensitive
* single-row insert might get batched along with 10MB worth of inserts from the
* batch writer, thus delaying the response significantly.
* <p>Timeouts are handled differently depending on the flush mode.
* With {@link SessionConfiguration.FlushMode#AUTO_FLUSH_SYNC AUTO_FLUSH_SYNC}, the timeout is set
* on each {@linkplain #apply apply}()'d operation.
* With {@link SessionConfiguration.FlushMode#AUTO_FLUSH_BACKGROUND AUTO_FLUSH_BACKGROUND} and
* {@link SessionConfiguration.FlushMode#MANUAL_FLUSH MANUAL_FLUSH}, the timeout is assigned to a
* whole batch of operations upon {@linkplain #flush flush}()'ing. It means that in a situation
* with a timeout of 500ms and a flush interval of 1000ms, an operation can be outstanding for up to
* 1500ms before being timed out.
* <p><strong>Warning: a note on out-of-order operations</strong>
* <p>When using {@code AsyncKuduSession}, it is not difficult to trigger concurrent flushes on
* the same session. The result is that operations applied in a particular order within a single
* session may be applied in a different order on the server side, even for a single tablet. To
* prevent this behavior, ensure that only one flush is outstanding at a given time (the maximum
* concurrent flushes per {@code AsyncKuduSession} is hard-coded to 2).
* <p>If operation interleaving would be unacceptable for your application, consider using one of
* the following strategies to avoid it:
* <ol>
* <li>When using {@link SessionConfiguration.FlushMode#MANUAL_FLUSH MANUAL_FLUSH} mode,
* wait for one {@link #flush flush()} to {@code join()} before triggering another flush.
* <li>When using {@link SessionConfiguration.FlushMode#AUTO_FLUSH_SYNC AUTO_FLUSH_SYNC}
* mode, wait for each {@link #apply apply()} to {@code join()} before applying another operation.
* <li>Consider not using
* {@link SessionConfiguration.FlushMode#AUTO_FLUSH_BACKGROUND AUTO_FLUSH_BACKGROUND} mode.
* <li>Make your application resilient to out-of-order application of writes.
* <li>Avoid applying an {@link Operation} on a particular row until any previous write to that
* row has been successfully flushed.
* </ol>
* <p>For more information on per-session operation interleaving, see
* <a href="">KUDU-1767</a>.
public class AsyncKuduSession implements SessionConfiguration {
public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduSession.class);
* Instance of LogThrottler isn't static so we can throttle messages per session
private final LogThrottler throttleClosedLog = new LogThrottler(LOG);
private final AsyncKuduClient client;
private final Random randomizer = new Random();
private final ErrorCollector errorCollector;
private int flushIntervalMillis = 1000;
private int mutationBufferMaxOps = 1000; // TODO express this in terms of data size.
private FlushMode flushMode;
private ExternalConsistencyMode consistencyMode;
private long timeoutMillis;
private final long txnId;
* Protects internal state from concurrent access. {@code AsyncKuduSession} is not threadsafe
* from the application's perspective, but because internally async timers and async flushing
* tasks may access the session concurrently with the application, synchronization is still
* needed.
private final Object monitor = new Object();
* Tracks the currently active buffer.
* When in mode {@link FlushMode#AUTO_FLUSH_BACKGROUND} or {@link FlushMode#AUTO_FLUSH_SYNC},
* {@code AsyncKuduSession} uses double buffering to improve write throughput. While the
* application is {@link #apply}ing operations to one buffer (the {@code activeBuffer}), the
* second buffer is either being flushed, or if it has already been flushed, it waits in the
* {@link #inactiveBuffers} queue. When the currently active buffer is flushed,
* {@code activeBuffer} is set to {@code null}. On the next call to {@code apply}, an inactive
* buffer is taken from {@code inactiveBuffers} and made the new active buffer. If both
* buffers are still flushing, then the {@code apply} call throws {@link PleaseThrottleException}.
private Buffer activeBuffer;
* The buffers. May either be active (pointed to by {@link #activeBuffer},
* inactive (in the {@link #inactiveBuffers}) queue, or flushing.
private final Buffer bufferA = new Buffer();
private final Buffer bufferB = new Buffer();
* Queue containing flushed, inactive buffers. May be accessed from callbacks (I/O threads).
* We restrict the session to only two buffers, so {@link BlockingQueue#add} can
* be used without chance of failure.
private final BlockingQueue<Buffer> inactiveBuffers = new ArrayBlockingQueue<>(2, false);
* Deferred used to notify on flush events. Atomically swapped and completed every time a buffer
* is flushed. This can be used to notify handlers of {@link PleaseThrottleException} that more
* capacity may be available in the active buffer.
private final AtomicReference<Deferred<Void>> flushNotification =
new AtomicReference<>(new Deferred<>());
* Tracks whether the session has been closed.
private volatile boolean closed = false;
private boolean ignoreAllDuplicateRows = false;
private boolean ignoreAllNotFoundRows = false;
* Package-private constructor meant to be used via AsyncKuduClient
* @param client client that creates this session
AsyncKuduSession(AsyncKuduClient client) {
this.client = client;
this.txnId = AsyncKuduClient.INVALID_TXN_ID;
flushMode = FlushMode.AUTO_FLUSH_SYNC;
consistencyMode = CLIENT_PROPAGATED;
timeoutMillis = client.getDefaultOperationTimeoutMs();
errorCollector = new ErrorCollector(mutationBufferMaxOps);
* Constructor for a transactional session.
* @param client client that creates this session
* @param txnId transaction identifier for all operations within the session
AsyncKuduSession(AsyncKuduClient client, long txnId) {
assert txnId > AsyncKuduClient.INVALID_TXN_ID;
this.client = client;
this.txnId = txnId;
flushMode = FlushMode.AUTO_FLUSH_SYNC;
consistencyMode = CLIENT_PROPAGATED;
timeoutMillis = client.getDefaultOperationTimeoutMs();
errorCollector = new ErrorCollector(mutationBufferMaxOps);
public FlushMode getFlushMode() {
return this.flushMode;
// TODO(wdberkeley): KUDU-1944. Don't let applications change the flush mode. Use a new session.
public void setFlushMode(FlushMode flushMode) {
if (hasPendingOperations()) {
throw new IllegalArgumentException("Cannot change flush mode when writes are buffered");
this.flushMode = flushMode;
public void setExternalConsistencyMode(ExternalConsistencyMode consistencyMode) {
if (hasPendingOperations()) {
throw new IllegalArgumentException("Cannot change consistency mode " +
"when writes are buffered");
this.consistencyMode = consistencyMode;
public void setMutationBufferSpace(int numOps) {
if (hasPendingOperations()) {
throw new IllegalArgumentException("Cannot change the buffer" +
" size when operations are buffered");
this.mutationBufferMaxOps = numOps;
public void setErrorCollectorSpace(int size) {
public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage) {
LOG.warn("setMutationBufferLowWatermark is deprecated");
* Lets us set a specific seed for tests
* @param seed the seed to use
void setRandomSeed(long seed) {
public void setFlushInterval(int flushIntervalMillis) {
this.flushIntervalMillis = flushIntervalMillis;
public void setTimeoutMillis(long timeout) {
this.timeoutMillis = timeout;
public long getTimeoutMillis() {
return this.timeoutMillis;
public boolean isClosed() {
return closed;
public boolean isIgnoreAllDuplicateRows() {
return ignoreAllDuplicateRows;
public void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
public boolean isIgnoreAllNotFoundRows() {
return ignoreAllNotFoundRows;
public void setIgnoreAllNotFoundRows(boolean ignoreAllNotFoundRows) {
this.ignoreAllNotFoundRows = ignoreAllNotFoundRows;
public int countPendingErrors() {
return errorCollector.countErrors();
public RowErrorsAndOverflowStatus getPendingErrors() {
return errorCollector.getErrors();
* Flushes the buffered operations and marks this session as closed.
* See the javadoc on {@link #flush()} on how to deal with exceptions coming out of this method.
* @return a Deferred whose callback chain will be invoked when.
* everything that was buffered at the time of the call has been flushed.
public Deferred<List<OperationResponse>> close() {
if (!closed) {
closed = true;
return flush();
* Callback which waits for all tablet location lookups to complete, groups all operations into
* batches by tablet, and dispatches them. When all of the batches are complete, a deferred is
* fired and the buffer is added to the inactive queue.
private final class TabletLookupCB implements Callback<Void, Object> {
private final AtomicInteger lookupsOutstanding;
private final Buffer buffer;
private final Deferred<List<BatchResponse>> deferred;
public TabletLookupCB(Buffer buffer, Deferred<List<BatchResponse>> deferred) {
this.lookupsOutstanding = new AtomicInteger(buffer.getOperations().size());
this.buffer = buffer;
this.deferred = deferred;
public Void call(Object unused) throws Exception {
if (lookupsOutstanding.decrementAndGet() != 0) {
return null;
// The final tablet lookup is complete. Batch all of the buffered
// operations into their respective tablet, and then send the batches.
// Group the operations by tablet.
Map<Slice, Batch> batches = new HashMap<>();
List<OperationResponse> opsFailedInLookup = new ArrayList<>();
List<Integer> opsFailedIndexesList = new ArrayList<>();
int currentIndex = 0;
for (BufferedOperation bufferedOp : buffer.getOperations()) {
Operation operation = bufferedOp.getOperation();
if (bufferedOp.tabletLookupFailed()) {
Exception failure = bufferedOp.getTabletLookupFailure();
RowError error;
if (failure instanceof NonCoveredRangeException) {
// TODO: this should be something different than NotFound so that
// applications can distinguish from updates on missing rows.
error = new RowError(Status.NotFound(String.format(
"%s: %s", failure.getMessage(), operation.getTable().getName())), operation);
} else {
LOG.warn("unexpected tablet lookup failure for operation {}", operation, failure);
error = new RowError(Status.RuntimeError(failure.getMessage()), operation);
OperationResponse response = new OperationResponse(0, null, 0, operation, error);
// Add the row error to the error collector if the session is in background flush mode,
// and complete the operation's deferred with the error response. The ordering between
// adding to the error collector and completing the deferred should not matter since
// applications should be using one or the other method for error handling, not both.
if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND) {
LocatedTablet tablet = bufferedOp.getTablet();
Slice tabletId = new Slice(tablet.getTabletId());
Batch batch = batches.get(tabletId);
if (batch == null) {
batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows,
ignoreAllNotFoundRows, txnId);
batches.put(tabletId, batch);
batch.add(operation, currentIndex++);
List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batches.size() + 1);
if (!opsFailedInLookup.isEmpty()) {
Deferred.fromResult(new BatchResponse(opsFailedInLookup, opsFailedIndexesList)));
for (Batch batch : batches.values()) {
if (timeoutMillis != 0) {
batch.resetTimeoutMillis(client.getTimer(), timeoutMillis);
// On completion of all batches, fire the completion deferred, and add the buffer
// back to the inactive buffers queue. This frees it up for new inserts.
new Callback<Void, Object>() {
public Void call(Object responses) {
return null;
return null;
* Creates callbacks to handle a multi-put and adds them to the request.
* @param request the request for which we must handle the response
private void addBatchCallbacks(final Batch request) {
final class BatchCallback implements Callback<BatchResponse, BatchResponse> {
public BatchResponse call(final BatchResponse response) {
LOG.trace("Got a Batch response for {} rows", request.operations.size());
// Send individualized responses to all the operations in this batch.
for (OperationResponse operationResponse : response.getIndividualResponses()) {
if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND && operationResponse.hasRowError()) {
// Fire the callback after collecting the errors so that the errors
// are visible should the callback interrogate the error collector.
return response;
public String toString() {
return "apply batch response";
final class BatchErrCallback implements Callback<Object, Exception> {
public Object call(Exception e) {
// If the exception we receive is a KuduException we're going to build OperationResponses.
Status status = null;
List<OperationResponse> responses = null;
boolean handleKuduException = e instanceof KuduException;
if (handleKuduException) {
status = ((KuduException) e).getStatus();
responses = new ArrayList<>(request.operations.size());
for (Operation operation : request.operations) {
// Same comment as in BatchCallback regarding the ordering of when to callback.
if (handleKuduException) {
RowError rowError = new RowError(status, operation);
OperationResponse response = new OperationResponse(0, null, 0, operation, rowError);
} else {
// We have no idea what the exception is so we'll just send it up.
// Note that returning an object that's not an exception will make us leave the
// errback chain. Effectively, the BatchResponse below will end up as part of the list
// passed to ConvertBatchToListOfResponsesCB.
return handleKuduException ? new BatchResponse(responses, request.operationIndexes) : e;
public String toString() {
return "apply batch error response";
request.getDeferred().addCallbacks(new BatchCallback(), new BatchErrCallback());
* Returns a buffer to the inactive queue after flushing.
* @param buffer the buffer to return to the inactive queue.
private void queueBuffer(Buffer buffer) {
if (buffer.callbackFlushNotification()) {
Deferred<Void> localFlushNotification = flushNotification.getAndSet(new Deferred<>());
* Flush buffered writes.
* @return a {@link Deferred} whose callback chain will be invoked when all applied operations at
* the time of the call have been flushed.
public Deferred<List<OperationResponse>> flush() {
Buffer buffer;
Deferred<Void> nonActiveBufferFlush;
synchronized (monitor) {
nonActiveBufferFlush = getNonActiveFlushNotificationUnlocked();
buffer = retireActiveBufferUnlocked();
// TODO(wdb): If there is a buffer flushing already, this code will wait for it to finish before
// flushing 'buffer'. This is less performant but has less surprising semantics than
// simultaneously flushing two buffers. Even though we don't promise those semantics,
// I'm going to leave it this way for now because it's never caused any trouble.
return AsyncUtil.addBothDeferring(nonActiveBufferFlush, unused -> doFlush(buffer));
* Flushes a write buffer. This method takes ownership of 'buffer', no other concurrent access
* is allowed. 'buffer' is allowed to be null.
* @param buffer the buffer to flush, must not be modified once passed to this method
* @return the operation responses
private Deferred<List<OperationResponse>> doFlush(Buffer buffer) {
if (buffer == null || buffer.getOperations().isEmpty()) {
return Deferred.fromResult(ImmutableList.of());
LOG.debug("flushing buffer: {}", buffer);
Deferred<List<BatchResponse>> batchResponses = new Deferred<>();
Callback<Void, Object> tabletLookupCB = new TabletLookupCB(buffer, batchResponses);
for (BufferedOperation bufferedOperation : buffer.getOperations()) {
AsyncUtil.addBoth(bufferedOperation.getTabletLookup(), tabletLookupCB);
return batchResponses.addCallback(ConvertBatchToListOfResponsesCB.getInstance());
* Callback used to send a list of OperationResponse instead of BatchResponse since the
* latter is an implementation detail.
private static class ConvertBatchToListOfResponsesCB implements Callback<List<OperationResponse>,
List<BatchResponse>> {
private static final ConvertBatchToListOfResponsesCB INSTANCE =
new ConvertBatchToListOfResponsesCB();
public List<OperationResponse> call(List<BatchResponse> batchResponses) throws Exception {
// First compute the size of the union of all the lists so that we don't trigger expensive
// list growths while adding responses to it.
int size = 0;
for (BatchResponse batchResponse : batchResponses) {
size += batchResponse.getIndividualResponses().size();
OperationResponse[] responses = new OperationResponse[size];
for (BatchResponse batchResponse : batchResponses) {
List<OperationResponse> responseList = batchResponse.getIndividualResponses();
List<Integer> indexList = batchResponse.getResponseIndexes();
for (int i = 0; i < indexList.size(); i++) {
int index = indexList.get(i);
assert responses[index] == null;
responses[index] = responseList.get(i);
return Arrays.asList(responses);
public String toString() {
return "ConvertBatchToListOfResponsesCB";
public static ConvertBatchToListOfResponsesCB getInstance() {
return INSTANCE;
public boolean hasPendingOperations() {
synchronized (monitor) {
return activeBuffer == null ? inactiveBuffers.size() < 2 :
!activeBuffer.getOperations().isEmpty() || !inactiveBufferAvailable();
// TODO(wdberkeley): Get rid of the idea of an Operation as a distinct way to do a write. Replace
// it with a single-operation Batch.
private Deferred<OperationResponse> doAutoFlushSync(final Operation operation) {
if (timeoutMillis != 0) {
operation.resetTimeoutMillis(client.getTimer(), timeoutMillis);
return client.sendRpcToTablet(operation)
.addCallbackDeferring(resp -> {
return Deferred.fromResult(resp);
.addErrback(new SingleOperationErrCallback(operation));
* Apply the given operation.
* <p>
* The behavior of this method depends on the configured
* {@link SessionConfiguration.FlushMode FlushMode}. Regardless
* of flush mode, however, {@code apply()} may begin to perform processing in the background
* for the call (e.g looking up the tablet location, etc).
* @param operation operation to apply
* @return a Deferred to track this operation
* @throws KuduException if an error happens or {@link PleaseThrottleException} is triggered
* @see SessionConfiguration.FlushMode FlushMode
public Deferred<OperationResponse> apply(final Operation operation) throws KuduException {
Preconditions.checkNotNull(operation, "Cannot apply a null operation");
Preconditions.checkArgument(operation.getTable().getAsyncClient() == client,
"Applied operations must be created from a KuduTable instance opened " +
"from the same client that opened this KuduSession");
if (closed) {
// Ideally this would be a precondition, but that may break existing
// clients who have grown to rely on this unsafe behavior.
throttleClosedLog.warn(60L, "Applying an operation in a closed session; this is unsafe");
// Freeze the row so that the client cannot concurrently modify it while it is in flight.
// If immediate flush mode, send the operation directly.
if (flushMode == FlushMode.AUTO_FLUSH_SYNC) {
return doAutoFlushSync(operation);
// Kick off a location lookup.
Deferred<LocatedTablet> tablet = client.getTabletLocation(operation.getTable(),
// Holds a buffer that should be flushed outside the synchronized block, if necessary.
Buffer fullBuffer = null;
try {
synchronized (monitor) {
Deferred<Void> notification = flushNotification.get();
if (activeBuffer == null) {
// If the active buffer is null then we recently flushed. Check if there
// is an inactive buffer available to replace as the active.
if (inactiveBufferAvailable()) {
} else {
Status statusServiceUnavailable =
Status.ServiceUnavailable("all buffers are currently flushing");
// This can happen if the user writes into a buffer, flushes it, writes
// into the second, flushes it, and immediately tries to write again.
throw new PleaseThrottleException(statusServiceUnavailable,
null, operation, notification);
int activeBufferSize = activeBuffer.getOperations().size();
switch (flushMode) {
// This case is handled above and is impossible here.
// TODO(wdberkeley): Handle AUTO_FLUSH_SYNC just like other flush modes.
assert false;
if (activeBufferSize >= mutationBufferMaxOps) {
Status statusIllegalState =
Status.IllegalState("MANUAL_FLUSH is enabled but the buffer is too big");
throw new NonRecoverableException(statusIllegalState);
activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
if (activeBufferSize >= mutationBufferMaxOps) {
// If the active buffer is full or overflowing, be sure to kick off a flush.
fullBuffer = retireActiveBufferUnlocked();
activeBufferSize = 0;
if (!inactiveBufferAvailable()) {
Status statusServiceUnavailable =
Status.ServiceUnavailable("All buffers are currently flushing");
throw new PleaseThrottleException(statusServiceUnavailable,
null, operation, notification);
// Add the operation to the active buffer, and:
// 1. If it's the first operation in the buffer, start a background flush timer.
// 2. If it filled or overflowed the buffer, kick off a flush.
activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
if (activeBufferSize == 0) {
AsyncKuduClient.newTimeout(client.getTimer(), activeBuffer.getFlusherTask(),
if (activeBufferSize + 1 >= mutationBufferMaxOps && inactiveBufferAvailable()) {
fullBuffer = retireActiveBufferUnlocked();
throw new IllegalArgumentException("Unexpected flushMode: " + flushMode);
} finally {
// Flush the buffer outside of the synchronized block, if required.
return operation.getDeferred();
* Returns {@code true} if there is an inactive buffer available.
* @return true if there is currently an inactive buffer available
private boolean inactiveBufferAvailable() {
return inactiveBuffers.peek() != null;
* Refreshes the active buffer. This should only be called after a
* {@link #flush()} when the active buffer is {@code null}, there is an
* inactive buffer available (see {@link #inactiveBufferAvailable()}, and
* {@link #monitor} is locked.
private void refreshActiveBufferUnlocked() {
Preconditions.checkState(activeBuffer == null);
activeBuffer = inactiveBuffers.remove();
* Retires the active buffer and returns it. Returns null if there is no active buffer.
* This should only be called if {@link #monitor} is locked.
private Buffer retireActiveBufferUnlocked() {
Buffer buffer = activeBuffer;
activeBuffer = null;
return buffer;
* Returns a flush notification for the currently non-active buffers.
* This is used during manual {@link #flush} calls to ensure that all buffers (not just the active
* buffer) are fully flushed before completing.
private Deferred<Void> getNonActiveFlushNotificationUnlocked() {
final Deferred<Void> notificationA = bufferA.getFlushNotification();
final Deferred<Void> notificationB = bufferB.getFlushNotification();
if (activeBuffer == null) {
// Both buffers are either flushing or inactive.
return AsyncUtil.addBothDeferring(notificationA, unused -> notificationB);
} else if (activeBuffer == bufferA) {
return notificationB;
} else {
return notificationA;
* Analogous to BatchErrCallback above but for AUTO_FLUSH_SYNC which doesn't handle lists of
* operations and responses.
private static final class SingleOperationErrCallback implements Callback<Object, Exception> {
private final Operation operation;
private SingleOperationErrCallback(Operation operation) {
this.operation = operation;
public Object call(Exception e) throws Exception {
if (e instanceof KuduException) {
Status status;
if (e instanceof NonCoveredRangeException) {
status = Status.NotFound(String.format(
"%s: %s", e.getMessage(), operation.getTable().getName()));
} else {
status = ((KuduException) e).getStatus();
return new OperationResponse(0, null, 0, operation, new RowError(status, operation));
return e;
* A FlusherTask is created for each active buffer in mode
* {@link FlushMode#AUTO_FLUSH_BACKGROUND}.
private final class FlusherTask implements TimerTask {
public void run(final Timeout timeout) {
Buffer buffer = null;
synchronized (monitor) {
if (activeBuffer == null) {
if (activeBuffer.getFlusherTask() == this) {
buffer = retireActiveBufferUnlocked();
* The {@code Buffer} consists of a list of operations, an optional pointer to a flush task,
* and a flush notification.
* The {@link #flusherTask} is used in mode {@link FlushMode#AUTO_FLUSH_BACKGROUND} to point to
* the background flusher task assigned to the buffer when it becomes active and the first
* operation is applied to it. When the flusher task executes after the timeout, it checks
* that the currently active buffer's flusher task points to itself before executing the flush.
* This protects against the background task waking up after one or more manual flushes and
* attempting to flush the active buffer.
* The {@link #flushNotification} deferred is used when executing manual {@link #flush}es to
* ensure that non-active buffers are fully flushed. {@code flushNotification} is completed
* when this buffer is successfully flushed. When the buffer is promoted from inactive to active,
* the deferred is replaced with a new one to indicate that the buffer is not yet flushed.
* Buffer is externally synchronized. When the active buffer, {@link #monitor}
* synchronizes access to it.
private final class Buffer {
private final List<BufferedOperation> operations = new ArrayList<>();
private FlusherTask flusherTask = null;
private Deferred<Void> flushNotification = Deferred.fromResult(null);
private boolean flushNotificationFired = false;
public List<BufferedOperation> getOperations() {
return operations;
FlusherTask getFlusherTask() {
if (flusherTask == null) {
flusherTask = new FlusherTask();
return flusherTask;
* Returns a {@link Deferred} which will be completed when this buffer is flushed. If the buffer
* is inactive (its flush is complete and it has been enqueued into {@link #inactiveBuffers}),
* then the deferred will already be complete.
Deferred<Void> getFlushNotification() {
return flushNotification;
* Completes the buffer's flush notification. Should be called when
* the buffer has been successfully flushed.
boolean callbackFlushNotification() {
LOG.trace("buffer flush notification fired: {}", this);
if (injectLatencyBufferFlushCb) {
try {
} catch (InterruptedException e) {
synchronized (monitor) {
if (flushNotificationFired) {
// Do nothing: the callback has been called already.
return false;
flushNotificationFired = true;
return true;
* Resets the buffer's internal state. Should be called when the buffer is promoted from
* inactive to active.
void resetUnlocked() {
LOG.trace("buffer resetUnlocked: {}", this);
flushNotification = new Deferred<>();
flushNotificationFired = false;
flusherTask = null;
public String toString() {
return MoreObjects.toStringHelper(this)
.add("operations", operations.size())
.add("flusherTask", flusherTask)
.add("flushNotification", flushNotification)
* Container class holding all the state associated with a buffered operation.
private static final class BufferedOperation {
/** Holds either a {@link LocatedTablet} or the failure exception if the lookup failed. */
private Object tablet = null;
private final Deferred<Void> tabletLookup;
private final Operation operation;
public BufferedOperation(Deferred<LocatedTablet> tablet,
Operation operation) {
tabletLookup = AsyncUtil.addBoth(tablet, new Callback<Void, Object>() {
public Void call(final Object tablet) {
BufferedOperation.this.tablet = tablet;
return null;
this.operation = Preconditions.checkNotNull(operation);
* @return {@code true} if the tablet lookup failed.
public boolean tabletLookupFailed() {
return !(tablet instanceof LocatedTablet);
* @return the located tablet
* @throws ClassCastException if the tablet lookup failed,
* check with {@link #tabletLookupFailed} before calling
public LocatedTablet getTablet() {
return (LocatedTablet) tablet;
* @return the cause of the failed lookup
* @throws ClassCastException if the tablet lookup succeeded,
* check with {@link #tabletLookupFailed} before calling
public Exception getTabletLookupFailure() {
return (Exception) tablet;
public Deferred<Void> getTabletLookup() {
return tabletLookup;
public Operation getOperation() {
return operation;
public String toString() {
return MoreObjects.toStringHelper(this)
.add("tablet", tablet)
.add("operation", operation)
private static boolean injectLatencyBufferFlushCb = false;
* Inject latency into {@link Buffer#callbackFlushNotification}.
static void injectLatencyBufferFlushCb(boolean injectLatency) {
injectLatencyBufferFlushCb = injectLatency;
LOG.warn("latency injection for Buffer flush notification is {}",
injectLatency ? "enabled" : "disabled");