blob: ed49cf85ab07d68ed2a63e90da31d68fa2cb90c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.remote;
import org.apache.samza.context.Context;
import org.apache.samza.context.MockContext;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.AsyncReadWriteTable;
import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
import org.apache.samza.table.retry.AsyncRetriableTable;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.testUtils.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestRemoteTable {
public static Context getMockContext() {
Context context = new MockContext();
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
return context;
}
private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId, TableReadFunction<K, V> readFn,
TableWriteFunction<K, V> writeFn, boolean retry) {
return getTable(tableId, readFn, writeFn, null, retry);
}
private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId, TableReadFunction<K, V> readFn,
TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor, boolean retry) {
TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
TableRetryPolicy readPolicy = retry ? new TableRetryPolicy() : null;
TableRetryPolicy writePolicy = retry ? new TableRetryPolicy() : null;
ExecutorService rateLimitingExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
RemoteTable<K, V> table = new RemoteTable(tableId, readFn, writeFn,
readRateLimiter, writeRateLimiter, rateLimitingExecutor,
readPolicy, writePolicy, retryExecutor, null, null, cbExecutor);
table.init(getMockContext());
if (readFn != null) {
verify(readFn, times(1)).init(any(), any());
}
if (writeFn != null) {
verify(writeFn, times(1)).init(any(), any());
}
return (T) table;
}
private void doTestGet(boolean sync, boolean error, boolean retry) {
String tableId = "testGet-" + sync + error + retry;
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
// Sync is backed by async so needs to mock the async method
CompletableFuture<String> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
if (!retry) {
doReturn(future).when(readFn).getAsync(anyString());
} else {
final int[] times = new int[] {0};
doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
.when(readFn).getAsync(anyString());
}
} else {
future = CompletableFuture.completedFuture("bar");
doReturn(future).when(readFn).getAsync(anyString());
}
if (retry) {
doReturn(true).when(readFn).isRetriable(any());
}
RemoteTable<String, String> table = getTable(tableId, readFn, null, retry);
Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").join());
verify(table.readRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString());
}
@Test(expected = IllegalArgumentException.class)
public void testFailOnNullReadFnAndWriteFn() {
getTable("id", null, null, false);
}
@Test
public void testSucceedValidationOnNullReadFn() {
RemoteTable<String, String> table = getTable("tableId", null, mock(TableWriteFunction.class), false);
Assert.assertNotNull(table);
}
@Test
public void testInit() {
String tableId = "testInit";
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteTable<String, String> table = getTable(tableId, readFn, writeFn, true);
// AsyncRetriableTable
AsyncReadWriteTable innerTable = TestUtils.getFieldValue(table, "asyncTable");
Assert.assertTrue(innerTable instanceof AsyncRetriableTable);
Assert.assertNotNull(TestUtils.getFieldValue(innerTable, "readRetryMetrics"));
Assert.assertNotNull(TestUtils.getFieldValue(innerTable, "writeRetryMetrics"));
// AsyncRateLimitedTable
innerTable = TestUtils.getFieldValue(innerTable, "table");
Assert.assertTrue(innerTable instanceof AsyncRateLimitedTable);
// AsyncRemoteTable
innerTable = TestUtils.getFieldValue(innerTable, "table");
Assert.assertTrue(innerTable instanceof AsyncRemoteTable);
// Verify table functions are initialized
verify(readFn, times(1)).init(any(), any());
verify(writeFn, times(1)).init(any(), any());
}
@Test
public void testGet() {
doTestGet(true, false, false);
}
@Test
public void testGetAsync() {
doTestGet(false, false, false);
}
@Test(expected = RuntimeException.class)
public void testGetAsyncError() {
doTestGet(false, true, false);
}
@Test
public void testGetAsyncErrorRetried() {
doTestGet(false, true, true);
}
@Test
public void testGetMultipleTables() {
TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
// Sync is backed by async so needs to mock the async method
doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null, false);
RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null, false);
CompletableFuture<String> future1 = table1.getAsync("foo1");
CompletableFuture<String> future2 = table2.getAsync("foo2");
CompletableFuture.allOf(future1, future2)
.thenAccept(u -> {
Assert.assertEquals(future1.join(), "bar1");
Assert.assertEquals(future2.join(), "bar1");
});
}
public void doTestRead(boolean sync, boolean error) {
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
RemoteTable<String, String> table = getTable("testRead-" + sync + error,
readFn, mock(TableWriteFunction.class), false);
CompletableFuture<?> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(5);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(readFn).readAsync(anyInt(), any());
int readResult = sync
? table.read(1, 2)
: (Integer) table.readAsync(1, 2).join();
verify(readFn, times(1)).readAsync(anyInt(), any());
Assert.assertEquals(5, readResult);
verify(table.readRateLimiter, times(1)).throttle(anyInt(), any());
}
@Test
public void testRead() {
doTestRead(true, false);
}
@Test
public void testReadAsync() {
doTestRead(false, false);
}
@Test(expected = RuntimeException.class)
public void testReadAsyncError() {
doTestRead(false, true);
}
private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) {
String tableId = "testPut-" + sync + error + isDelete + retry;
TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
TableWriteFunction<String, String> writeFn = mockWriteFn;
CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
CompletableFuture<Void> failureFuture = new CompletableFuture();
failureFuture.completeExceptionally(new RuntimeException("Test exception"));
if (!error) {
if (isDelete) {
doReturn(successFuture).when(writeFn).deleteAsync(any());
} else {
doReturn(successFuture).when(writeFn).putAsync(any(), any());
}
} else if (!retry) {
if (isDelete) {
doReturn(failureFuture).when(writeFn).deleteAsync(any());
} else {
doReturn(failureFuture).when(writeFn).putAsync(any(), any());
}
} else {
doReturn(true).when(writeFn).isRetriable(any());
final int[] times = new int[] {0};
if (isDelete) {
doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
} else {
doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
}
}
RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn, retry);
if (sync) {
table.put("foo", isDelete ? null : "bar");
} else {
table.putAsync("foo", isDelete ? null : "bar").join();
}
ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
if (isDelete) {
verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
} else {
verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
Assert.assertEquals("bar", valCaptor.getValue());
}
Assert.assertEquals("foo", keyCaptor.getValue());
if (isDelete) {
verify(table.writeRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString());
} else {
verify(table.writeRateLimiter, times(error && retry ? 2 : 1)).throttle(anyString(), anyString());
}
}
@Test
public void testPut() {
doTestPut(true, false, false, false);
}
@Test
public void testPutDelete() {
doTestPut(true, false, true, false);
}
@Test
public void testPutAsync() {
doTestPut(false, false, false, false);
}
@Test
public void testPutAsyncDelete() {
doTestPut(false, false, true, false);
}
@Test(expected = RuntimeException.class)
public void testPutAsyncError() {
doTestPut(false, true, false, false);
}
@Test
public void testPutAsyncErrorRetried() {
doTestPut(false, true, false, true);
}
private void doTestDelete(boolean sync, boolean error) {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteTable<String, String> table = getTable("testDelete-" + sync + error,
mock(TableReadFunction.class), writeFn, false);
CompletableFuture<Void> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(null);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(writeFn).deleteAsync(any());
ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
if (sync) {
table.delete("foo");
} else {
table.deleteAsync("foo").join();
}
verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
Assert.assertEquals("foo", argCaptor.getValue());
verify(table.writeRateLimiter, times(1)).throttle(anyString());
}
@Test
public void testDelete() {
doTestDelete(true, false);
}
@Test
public void testDeleteAsync() {
doTestDelete(false, false);
}
@Test(expected = RuntimeException.class)
public void testDeleteAsyncError() {
doTestDelete(false, true);
}
private void doTestGetAll(boolean sync, boolean error, boolean partial) {
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
Map<String, String> res = new HashMap<>();
res.put("foo1", "bar1");
if (!partial) {
res.put("foo2", "bar2");
}
CompletableFuture<Map<String, String>> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(res);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(readFn).getAllAsync(any());
RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null, false);
Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
: table.getAllAsync(Arrays.asList("foo1", "foo2")).join());
verify(table.readRateLimiter, times(1)).throttle(anyCollection());
}
@Test
public void testGetAll() {
doTestGetAll(true, false, false);
}
@Test
public void testGetAllAsync() {
doTestGetAll(false, false, false);
}
@Test(expected = RuntimeException.class)
public void testGetAllAsyncError() {
doTestGetAll(false, true, false);
}
// Partial result is an acceptable scenario
@Test
public void testGetAllPartialResult() {
doTestGetAll(false, false, true);
}
public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
mock(TableReadFunction.class), writeFn, false);
CompletableFuture<Void> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(null);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(writeFn).putAllAsync(any());
if (hasDelete) {
doReturn(future).when(writeFn).deleteAllAsync(any());
}
List<Entry<String, String>> entries = Arrays.asList(
new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
if (sync) {
table.putAll(entries);
} else {
table.putAllAsync(entries).join();
}
verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
if (hasDelete) {
ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
Assert.assertEquals(1, argCaptor.getValue().size());
Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
} else {
Assert.assertEquals(entries, argCaptor.getValue());
}
verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
}
@Test
public void testPutAll() {
doTestPutAll(true, false, false);
}
@Test
public void testPutAllHasDelete() {
doTestPutAll(true, false, true);
}
@Test
public void testPutAllAsync() {
doTestPutAll(false, false, false);
}
@Test
public void testPutAllAsyncHasDelete() {
doTestPutAll(false, false, true);
}
@Test(expected = RuntimeException.class)
public void testPutAllAsyncError() {
doTestPutAll(false, true, false);
}
public void doTestDeleteAll(boolean sync, boolean error) {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
mock(TableReadFunction.class), writeFn, false);
CompletableFuture<Void> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(null);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(writeFn).deleteAllAsync(any());
List<String> keys = Arrays.asList("foo1", "foo2");
ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
if (sync) {
table.deleteAll(keys);
} else {
table.deleteAllAsync(keys).join();
}
verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
Assert.assertEquals(keys, argCaptor.getValue());
verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
}
@Test
public void testDeleteAll() {
doTestDeleteAll(true, false);
}
@Test
public void testDeleteAllAsync() {
doTestDeleteAll(false, false);
}
@Test(expected = RuntimeException.class)
public void testDeleteAllAsyncError() {
doTestDeleteAll(false, true);
}
public void doTestWrite(boolean sync, boolean error) {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteTable<String, String> table = getTable("testWrite-" + sync + error,
mock(TableReadFunction.class), writeFn, false);
CompletableFuture<?> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(5);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(writeFn).writeAsync(anyInt(), any());
int writeResult = sync
? table.write(1, 2)
: (Integer) table.writeAsync(1, 2).join();
verify(writeFn, times(1)).writeAsync(anyInt(), any());
Assert.assertEquals(5, writeResult);
verify(table.writeRateLimiter, times(1)).throttle(anyInt(), any());
}
@Test
public void testWrite() {
doTestWrite(true, false);
}
@Test
public void testWriteAsync() {
doTestWrite(false, false);
}
@Test(expected = RuntimeException.class)
public void testWriteAsyncError() {
doTestWrite(false, true);
}
@Test
public void testFlush() {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn, false);
table.flush();
verify(writeFn, times(1)).flush();
}
@Test
public void testGetWithCallbackExecutor() {
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
// Sync is backed by async so needs to mock the async method
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
Executors.newSingleThreadExecutor(), false);
Thread testThread = Thread.currentThread();
table.getAsync("foo").thenAccept(result -> {
Assert.assertEquals("bar", result);
// Must be executed on the executor thread
Assert.assertNotSame(testThread, Thread.currentThread());
});
}
@Test
public void testGetDelegation() {
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any());
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any(), any());
Map<String, String> result = new HashMap<>();
result.put("foo", "bar");
doReturn(CompletableFuture.completedFuture(result)).when(readFn).getAllAsync(any());
doReturn(CompletableFuture.completedFuture(result)).when(readFn).getAllAsync(any(), any());
doReturn(CompletableFuture.completedFuture(5)).when(readFn).readAsync(anyInt(), any());
RemoteTable<String, String> table = getTable("testGetDelegation", readFn, null,
Executors.newSingleThreadExecutor(), true);
verify(readFn, times(1)).init(any(), any());
// GetAsync
verify(readFn, times(0)).getAsync(any());
verify(readFn, times(0)).getAsync(any(), any());
assertEquals("bar", table.getAsync("foo").join());
verify(readFn, times(1)).getAsync(any());
verify(readFn, times(0)).getAsync(any(), any());
assertEquals("bar", table.getAsync("foo", 1).join());
verify(readFn, times(1)).getAsync(any());
verify(readFn, times(1)).getAsync(any(), any());
// GetAllAsync
verify(readFn, times(0)).getAllAsync(any());
verify(readFn, times(0)).getAllAsync(any(), any());
assertEquals(result, table.getAllAsync(Arrays.asList("foo")).join());
verify(readFn, times(1)).getAllAsync(any());
verify(readFn, times(0)).getAllAsync(any(), any());
assertEquals(result, table.getAllAsync(Arrays.asList("foo"), Arrays.asList(1)).join());
verify(readFn, times(1)).getAllAsync(any());
verify(readFn, times(1)).getAllAsync(any(), any());
// ReadAsync
verify(readFn, times(0)).readAsync(anyInt(), any());
assertEquals(5, table.readAsync(1, 2).join());
verify(readFn, times(1)).readAsync(anyInt(), any());
table.close();
}
@Test
public void testPutAndDeleteDelegation() {
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction writeFn = mock(TableWriteFunction.class);
doReturn(true).when(writeFn).isRetriable(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(anyCollection());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(anyCollection(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAllAsync(anyCollection());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAllAsync(anyCollection(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).writeAsync(anyInt(), any());
RemoteTable<String, String> table = getTable("testGetDelegation", readFn, writeFn,
Executors.newSingleThreadExecutor(), true);
verify(readFn, times(1)).init(any(), any());
// PutAsync
verify(writeFn, times(0)).putAsync(any(), any());
verify(writeFn, times(0)).putAsync(any(), any(), any());
table.putAsync("roo", "bar").join();
verify(writeFn, times(1)).putAsync(any(), any());
verify(writeFn, times(0)).putAsync(any(), any(), any());
table.putAsync("foo", "bar", 3).join();
verify(writeFn, times(1)).putAsync(any(), any());
verify(writeFn, times(1)).putAsync(any(), any(), any());
// PutAllAsync
verify(writeFn, times(0)).putAllAsync(anyCollection());
verify(writeFn, times(0)).putAllAsync(anyCollection(), any());
table.putAllAsync(Arrays.asList(new Entry("foo", "bar"))).join();
verify(writeFn, times(1)).putAllAsync(anyCollection());
verify(writeFn, times(0)).putAllAsync(anyCollection(), any());
table.putAllAsync(Arrays.asList(new Entry("foo", "bar")), 2).join();
verify(writeFn, times(1)).putAllAsync(anyCollection());
verify(writeFn, times(1)).putAllAsync(anyCollection(), any());
// DeleteAsync
verify(writeFn, times(0)).deleteAsync(any());
verify(writeFn, times(0)).deleteAsync(any(), any());
table.deleteAsync("foo").join();
verify(writeFn, times(1)).deleteAsync(any());
verify(writeFn, times(0)).deleteAsync(any(), any());
table.deleteAsync("foo", 2).join();
verify(writeFn, times(1)).deleteAsync(any());
verify(writeFn, times(1)).deleteAsync(any(), any());
// DeleteAllAsync
verify(writeFn, times(0)).deleteAllAsync(anyCollection());
verify(writeFn, times(0)).deleteAllAsync(anyCollection(), any());
table.deleteAllAsync(Arrays.asList("foo")).join();
verify(writeFn, times(1)).deleteAllAsync(anyCollection());
verify(writeFn, times(0)).deleteAllAsync(anyCollection(), any());
table.deleteAllAsync(Arrays.asList("foo"), Arrays.asList(2)).join();
verify(writeFn, times(1)).deleteAllAsync(anyCollection());
verify(writeFn, times(1)).deleteAllAsync(anyCollection(), any());
// WriteAsync
verify(writeFn, times(0)).writeAsync(anyInt(), any());
table.writeAsync(1, 2).join();
verify(writeFn, times(1)).writeAsync(anyInt(), any());
}
}