blob: 3e844c32b869a4b999332b60a6be44c02c12d8b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.remote;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.task.TaskContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import junit.framework.Assert;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestRemoteTable {
private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
public static TaskContext getMockTaskContext() {
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
TaskContext taskContext = mock(TaskContext.class);
doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
return taskContext;
}
private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
return getTable(tableId, readFn, writeFn, null);
}
private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
RemoteReadableTable<K, V> table;
TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
doReturn(true).when(readRateLimiter).isRateLimited();
doReturn(true).when(writeRateLimiter).isRateLimited();
ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
if (writeFn == null) {
table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
} else {
table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
}
TaskContext taskContext = getMockTaskContext();
SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
table.init(containerContext, taskContext);
return (T) table;
}
private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
String tableId = "testGet-" + sync + error + retry;
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
// Sync is backed by async so needs to mock the async method
CompletableFuture<String> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
if (!retry) {
doReturn(future).when(readFn).getAsync(anyString());
} else {
final int [] times = new int[] {0};
doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
.when(readFn).getAsync(anyString());
}
} else {
future = CompletableFuture.completedFuture("bar");
doReturn(future).when(readFn).getAsync(anyString());
}
if (retry) {
doReturn(true).when(readFn).isRetriable(any());
TableRetryPolicy policy = new TableRetryPolicy();
readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
}
RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
verify(table.readRateLimiter, times(1)).throttle(anyString());
}
@Test
public void testGet() throws Exception {
doTestGet(true, false, false);
}
@Test
public void testGetAsync() throws Exception {
doTestGet(false, false, false);
}
@Test(expected = ExecutionException.class)
public void testGetAsyncError() throws Exception {
doTestGet(false, true, false);
}
@Test
public void testGetAsyncErrorRetried() throws Exception {
doTestGet(false, true, true);
}
@Test
public void testGetMultipleTables() {
TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
// Sync is backed by async so needs to mock the async method
doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
CompletableFuture<String> future1 = table1.getAsync("foo1");
CompletableFuture<String> future2 = table2.getAsync("foo2");
CompletableFuture.allOf(future1, future2)
.thenAccept(u -> {
Assert.assertEquals(future1.join(), "bar1");
Assert.assertEquals(future2.join(), "bar1");
});
}
private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
String tableId = "testPut-" + sync + error + isDelete + retry;
TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
TableWriteFunction<String, String> writeFn = mockWriteFn;
CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
CompletableFuture<Void> failureFuture = new CompletableFuture();
failureFuture.completeExceptionally(new RuntimeException("Test exception"));
if (!error) {
if (isDelete) {
doReturn(successFuture).when(writeFn).deleteAsync(any());
} else {
doReturn(successFuture).when(writeFn).putAsync(any(), any());
}
} else if (!retry) {
if (isDelete) {
doReturn(failureFuture).when(writeFn).deleteAsync(any());
} else {
doReturn(failureFuture).when(writeFn).putAsync(any(), any());
}
} else {
doReturn(true).when(writeFn).isRetriable(any());
final int [] times = new int[] {0};
if (isDelete) {
doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
} else {
doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
}
writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
}
RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
if (sync) {
table.put("foo", isDelete ? null : "bar");
} else {
table.putAsync("foo", isDelete ? null : "bar").get();
}
ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
if (isDelete) {
verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
} else {
verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
Assert.assertEquals("bar", valCaptor.getValue());
}
Assert.assertEquals("foo", keyCaptor.getValue());
if (isDelete) {
verify(table.writeRateLimiter, times(1)).throttle(anyString());
} else {
verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
}
}
@Test
public void testPut() throws Exception {
doTestPut(true, false, false, false);
}
@Test
public void testPutDelete() throws Exception {
doTestPut(true, false, true, false);
}
@Test
public void testPutAsync() throws Exception {
doTestPut(false, false, false, false);
}
@Test
public void testPutAsyncDelete() throws Exception {
doTestPut(false, false, true, false);
}
@Test(expected = ExecutionException.class)
public void testPutAsyncError() throws Exception {
doTestPut(false, true, false, false);
}
@Test
public void testPutAsyncErrorRetried() throws Exception {
doTestPut(false, true, false, true);
}
private void doTestDelete(boolean sync, boolean error) throws Exception {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
mock(TableReadFunction.class), writeFn);
CompletableFuture<Void> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(null);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(writeFn).deleteAsync(any());
ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
if (sync) {
table.delete("foo");
} else {
table.deleteAsync("foo").get();
}
verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
Assert.assertEquals("foo", argCaptor.getValue());
verify(table.writeRateLimiter, times(1)).throttle(anyString());
}
@Test
public void testDelete() throws Exception {
doTestDelete(true, false);
}
@Test
public void testDeleteAsync() throws Exception {
doTestDelete(false, false);
}
@Test(expected = ExecutionException.class)
public void testDeleteAsyncError() throws Exception {
doTestDelete(false, true);
}
private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
Map<String, String> res = new HashMap<>();
res.put("foo1", "bar1");
if (!partial) {
res.put("foo2", "bar2");
}
CompletableFuture<Map<String, String>> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(res);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(readFn).getAllAsync(any());
RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
: table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
verify(table.readRateLimiter, times(1)).throttle(anyCollection());
}
@Test
public void testGetAll() throws Exception {
doTestGetAll(true, false, false);
}
@Test
public void testGetAllAsync() throws Exception {
doTestGetAll(false, false, false);
}
@Test(expected = ExecutionException.class)
public void testGetAllAsyncError() throws Exception {
doTestGetAll(false, true, false);
}
// Partial result is an acceptable scenario
@Test
public void testGetAllPartialResult() throws Exception {
doTestGetAll(false, false, true);
}
public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
mock(TableReadFunction.class), writeFn);
CompletableFuture<Void> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(null);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(writeFn).putAllAsync(any());
if (hasDelete) {
doReturn(future).when(writeFn).deleteAllAsync(any());
}
List<Entry<String, String>> entries = Arrays.asList(
new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
if (sync) {
table.putAll(entries);
} else {
table.putAllAsync(entries).get();
}
verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
if (hasDelete) {
ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
Assert.assertEquals(1, argCaptor.getValue().size());
Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
} else {
Assert.assertEquals(entries, argCaptor.getValue());
}
verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
}
@Test
public void testPutAll() throws Exception {
doTestPutAll(true, false, false);
}
@Test
public void testPutAllHasDelete() throws Exception {
doTestPutAll(true, false, true);
}
@Test
public void testPutAllAsync() throws Exception {
doTestPutAll(false, false, false);
}
@Test
public void testPutAllAsyncHasDelete() throws Exception {
doTestPutAll(false, false, true);
}
@Test(expected = ExecutionException.class)
public void testPutAllAsyncError() throws Exception {
doTestPutAll(false, true, false);
}
public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
mock(TableReadFunction.class), writeFn);
CompletableFuture<Void> future;
if (error) {
future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Test exception"));
} else {
future = CompletableFuture.completedFuture(null);
}
// Sync is backed by async so needs to mock the async method
doReturn(future).when(writeFn).deleteAllAsync(any());
List<String> keys = Arrays.asList("foo1", "foo2");
ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
if (sync) {
table.deleteAll(keys);
} else {
table.deleteAllAsync(keys).get();
}
verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
Assert.assertEquals(keys, argCaptor.getValue());
verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
}
@Test
public void testDeleteAll() throws Exception {
doTestDeleteAll(true, false);
}
@Test
public void testDeleteAllAsync() throws Exception {
doTestDeleteAll(false, false);
}
@Test(expected = ExecutionException.class)
public void testDeleteAllAsyncError() throws Exception {
doTestDeleteAll(false, true);
}
@Test
public void testFlush() {
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
table.flush();
verify(writeFn, times(1)).flush();
}
@Test
public void testGetWithCallbackExecutor() throws Exception {
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
// Sync is backed by async so needs to mock the async method
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
Executors.newSingleThreadExecutor());
Thread testThread = Thread.currentThread();
table.getAsync("foo").thenAccept(result -> {
Assert.assertEquals("bar", result);
// Must be executed on the executor thread
Assert.assertNotSame(testThread, Thread.currentThread());
});
}
}