blob: f80f623d57521863f37260b2584f154c483bed47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.retry;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.AsyncReadWriteTable;
import org.apache.samza.table.remote.AsyncRemoteTable;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
import org.apache.samza.table.remote.TestRemoteTable;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestAsyncRetriableTable {
private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
@Test(expected = NullPointerException.class)
public void testNotNullTableId() {
new AsyncRetriableTable(null, mock(AsyncReadWriteTable.class),
mock(TableRetryPolicy.class), mock(TableRetryPolicy.class),
mock(ScheduledExecutorService.class),
mock(TableReadFunction.class), mock(TableWriteFunction.class));
}
@Test(expected = NullPointerException.class)
public void testNotNullTable() {
new AsyncRetriableTable("t1", null,
mock(TableRetryPolicy.class), mock(TableRetryPolicy.class),
mock(ScheduledExecutorService.class),
mock(TableReadFunction.class), mock(TableWriteFunction.class));
}
@Test(expected = NullPointerException.class)
public void testNotNullExecutorService() {
new AsyncRetriableTable("t1", mock(AsyncReadWriteTable.class),
mock(TableRetryPolicy.class), mock(TableRetryPolicy.class), null,
mock(TableReadFunction.class), mock(TableWriteFunction.class));
}
@Test(expected = IllegalArgumentException.class)
public void testNotAllRetryPolicyAreNull() {
new AsyncRetriableTable("t1", mock(AsyncReadWriteTable.class),
null, null,
mock(ScheduledExecutorService.class),
mock(TableReadFunction.class), mock(TableWriteFunction.class));
}
@Test
public void testGetDelegation() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(100));
TableReadFunction readFn = mock(TableReadFunction.class);
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any());
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any(), any());
Map<String, String> result = new HashMap<>();
result.put("foo", "bar");
doReturn(CompletableFuture.completedFuture(result)).when(readFn).getAllAsync(any());
doReturn(CompletableFuture.completedFuture(result)).when(readFn).getAllAsync(any(), any());
doReturn(CompletableFuture.completedFuture(5)).when(readFn).readAsync(anyInt(), any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, null);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, policy, null, schedExec, readFn, null);
table.init(TestRemoteTable.getMockContext());
verify(readFn, times(0)).init(any(), any());
// GetAsync
verify(readFn, times(0)).getAsync(any());
verify(readFn, times(0)).getAsync(any(), any());
assertEquals("bar", table.getAsync("foo").join());
verify(readFn, times(1)).getAsync(any());
verify(readFn, times(0)).getAsync(any(), any());
assertEquals("bar", table.getAsync("foo", 1).join());
verify(readFn, times(1)).getAsync(any());
verify(readFn, times(1)).getAsync(any(), any());
// GetAllAsync
verify(readFn, times(0)).getAllAsync(any());
verify(readFn, times(0)).getAllAsync(any(), any());
assertEquals(result, table.getAllAsync(Arrays.asList("foo")).join());
verify(readFn, times(1)).getAllAsync(any());
verify(readFn, times(0)).getAllAsync(any(), any());
assertEquals(result, table.getAllAsync(Arrays.asList("foo"), Arrays.asList(1)).join());
verify(readFn, times(1)).getAllAsync(any());
verify(readFn, times(1)).getAllAsync(any(), any());
// ReadAsync
verify(readFn, times(0)).readAsync(anyInt(), any());
assertEquals(5, table.readAsync(1, 2).join());
verify(readFn, times(1)).readAsync(anyInt(), any());
table.close();
}
@Test
public void testGetWithoutRetry() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(100));
TableReadFunction readFn = mock(TableReadFunction.class);
doReturn(true).when(readFn).isRetriable(any());
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any());
Map<String, String> result = new HashMap<>();
result.put("foo", "bar");
doReturn(CompletableFuture.completedFuture(result)).when(readFn).getAllAsync(any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, null);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, policy, null, schedExec, readFn, null);
int times = 0;
table.init(TestRemoteTable.getMockContext());
verify(readFn, times(0)).init(any(), any());
assertEquals("bar", table.getAsync("foo").join());
verify(readFn, times(1)).getAsync(any());
assertEquals(++times, table.readRetryMetrics.successCount.getCount());
assertEquals(result, table.getAllAsync(Arrays.asList("foo")).join());
verify(readFn, times(1)).getAllAsync(any());
assertEquals(++times, table.readRetryMetrics.successCount.getCount());
assertEquals(0, table.readRetryMetrics.retryCount.getCount());
assertEquals(0, table.readRetryMetrics.retryTimer.getSnapshot().getMax());
assertEquals(0, table.readRetryMetrics.permFailureCount.getCount());
assertNull(table.writeRetryMetrics);
table.close();
verify(readFn, times(1)).close();
}
@Test
public void testGetWithRetryDisabled() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(10));
policy.withStopAfterDelay(Duration.ofMillis(100));
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
doReturn(false).when(readFn).isRetriable(any());
CompletableFuture<String> future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("test exception"));
doReturn(future).when(readFn).getAsync(anyString());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, null);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, policy, null, schedExec, readFn, null);
table.init(TestRemoteTable.getMockContext());
try {
table.getAsync("foo").join();
fail();
} catch (Throwable t) {
}
verify(readFn, times(1)).getAsync(any());
assertEquals(0, table.readRetryMetrics.retryCount.getCount());
assertEquals(0, table.readRetryMetrics.successCount.getCount());
assertEquals(0, table.readRetryMetrics.permFailureCount.getCount());
assertEquals(0, table.readRetryMetrics.retryTimer.getSnapshot().getMax());
}
@Test
public void testGetAllWithOneRetry() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(10));
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
doReturn(true).when(readFn).isRetriable(any());
AtomicInteger times = new AtomicInteger();
Map<String, String> map = new HashMap<>();
map.put("foo1", "bar1");
map.put("foo2", "bar2");
doAnswer(invocation -> {
CompletableFuture<Map<String, String>> future = new CompletableFuture();
if (times.get() > 0) {
future.complete(map);
} else {
times.incrementAndGet();
future.completeExceptionally(new RuntimeException("test exception"));
}
return future;
}).when(readFn).getAllAsync(anyCollection());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, null);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, policy, null, schedExec, readFn, null);
table.init(TestRemoteTable.getMockContext());
assertEquals(map, table.getAllAsync(Arrays.asList("foo1", "foo2")).join());
verify(readFn, times(2)).getAllAsync(any());
assertEquals(1, table.readRetryMetrics.retryCount.getCount());
assertEquals(0, table.readRetryMetrics.successCount.getCount());
assertEquals(0, table.readRetryMetrics.permFailureCount.getCount());
assertTrue(table.readRetryMetrics.retryTimer.getSnapshot().getMax() > 0);
}
@Test
public void testGetWithPermFailureOnTimeout() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(5));
policy.withStopAfterDelay(Duration.ofMillis(100));
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
doReturn(true).when(readFn).isRetriable(any());
CompletableFuture<String> future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("test exception"));
doReturn(future).when(readFn).getAsync(anyString());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, null);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, policy, null, schedExec, readFn, null);
table.init(TestRemoteTable.getMockContext());
try {
table.getAsync("foo").join();
fail();
} catch (Throwable t) {
}
verify(readFn, atLeast(3)).getAsync(any());
assertTrue(table.readRetryMetrics.retryCount.getCount() >= 3);
assertEquals(0, table.readRetryMetrics.successCount.getCount());
assertEquals(1, table.readRetryMetrics.permFailureCount.getCount());
assertTrue(table.readRetryMetrics.retryTimer.getSnapshot().getMax() > 0);
}
@Test
public void testGetWithPermFailureOnMaxCount() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(5));
policy.withStopAfterAttempts(10);
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
doReturn(true).when(readFn).isRetriable(any());
CompletableFuture<String> future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("test exception"));
doReturn(future).when(readFn).getAllAsync(any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, null);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, policy, null, schedExec, readFn, null);
table.init(TestRemoteTable.getMockContext());
try {
table.getAsync("foo").join();
fail();
} catch (Throwable t) {
}
verify(readFn, atLeast(11)).getAsync(any());
assertEquals(10, table.readRetryMetrics.retryCount.getCount());
assertEquals(0, table.readRetryMetrics.successCount.getCount());
assertEquals(1, table.readRetryMetrics.permFailureCount.getCount());
assertTrue(table.readRetryMetrics.retryTimer.getSnapshot().getMax() > 0);
}
@Test
public void testPutAndDeleteDelegation() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(100));
TableReadFunction readFn = mock(TableReadFunction.class);
TableWriteFunction writeFn = mock(TableWriteFunction.class);
doReturn(true).when(writeFn).isRetriable(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAllAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAllAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).writeAsync(anyInt(), any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, writeFn);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, null, policy, schedExec, readFn, writeFn);
// PutAsync
verify(writeFn, times(0)).putAsync(any(), any());
verify(writeFn, times(0)).putAsync(any(), any(), any());
table.putAsync(1, 2).join();
verify(writeFn, times(1)).putAsync(any(), any());
verify(writeFn, times(0)).putAsync(any(), any(), any());
table.putAsync(1, 2, 3).join();
verify(writeFn, times(1)).putAsync(any(), any());
verify(writeFn, times(1)).putAsync(any(), any(), any());
// PutAllAsync
verify(writeFn, times(0)).putAllAsync(anyCollection());
verify(writeFn, times(0)).putAllAsync(anyCollection(), any());
table.putAllAsync(Arrays.asList(1)).join();
verify(writeFn, times(1)).putAllAsync(anyCollection());
verify(writeFn, times(0)).putAllAsync(anyCollection(), any());
table.putAllAsync(Arrays.asList(1), Arrays.asList(1)).join();
verify(writeFn, times(1)).putAllAsync(anyCollection());
verify(writeFn, times(1)).putAllAsync(anyCollection(), any());
// DeleteAsync
verify(writeFn, times(0)).deleteAsync(any());
verify(writeFn, times(0)).deleteAsync(any(), any());
table.deleteAsync(1).join();
verify(writeFn, times(1)).deleteAsync(any());
verify(writeFn, times(0)).deleteAsync(any(), any());
table.deleteAsync(1, 2).join();
verify(writeFn, times(1)).deleteAsync(any());
verify(writeFn, times(1)).deleteAsync(any(), any());
// DeleteAllAsync
verify(writeFn, times(0)).deleteAllAsync(anyCollection());
verify(writeFn, times(0)).deleteAllAsync(anyCollection(), any());
table.deleteAllAsync(Arrays.asList(1)).join();
verify(writeFn, times(1)).deleteAllAsync(anyCollection());
verify(writeFn, times(0)).deleteAllAsync(anyCollection(), any());
table.deleteAllAsync(Arrays.asList(1), Arrays.asList(2)).join();
verify(writeFn, times(1)).deleteAllAsync(anyCollection());
verify(writeFn, times(1)).deleteAllAsync(anyCollection(), any());
// WriteAsync
verify(writeFn, times(0)).writeAsync(anyInt(), any());
table.writeAsync(1, 2).join();
verify(writeFn, times(1)).writeAsync(anyInt(), any());
}
@Test
public void testPutWithoutRetry() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(100));
TableReadFunction readFn = mock(TableReadFunction.class);
TableWriteFunction writeFn = mock(TableWriteFunction.class);
doReturn(true).when(writeFn).isRetriable(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAllAsync(any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, writeFn);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, null, policy, schedExec, readFn, writeFn);
int times = 0;
table.init(TestRemoteTable.getMockContext());
verify(readFn, times(0)).init(any(), any());
verify(writeFn, times(0)).init(any(), any());
table.putAsync("foo", "bar").join();
verify(writeFn, times(1)).putAsync(any(), any());
assertEquals(++times, table.writeRetryMetrics.successCount.getCount());
table.putAllAsync(Arrays.asList(new Entry("1", "2"))).join();
verify(writeFn, times(1)).putAllAsync(any());
assertEquals(++times, table.writeRetryMetrics.successCount.getCount());
table.deleteAsync("1").join();
verify(writeFn, times(1)).deleteAsync(any());
assertEquals(++times, table.writeRetryMetrics.successCount.getCount());
table.deleteAllAsync(Arrays.asList("1", "2")).join();
verify(writeFn, times(1)).deleteAllAsync(any());
assertEquals(++times, table.writeRetryMetrics.successCount.getCount());
assertEquals(0, table.writeRetryMetrics.retryCount.getCount());
assertEquals(0, table.writeRetryMetrics.retryTimer.getSnapshot().getMax());
assertEquals(0, table.writeRetryMetrics.permFailureCount.getCount());
assertNull(table.readRetryMetrics);
}
@Test
public void testPutWithRetryDisabled() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(10));
policy.withStopAfterDelay(Duration.ofMillis(100));
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
doReturn(false).when(writeFn).isRetriable(any());
CompletableFuture<String> future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("test exception"));
doReturn(future).when(writeFn).putAsync(any(), any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, writeFn);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, null, policy, schedExec, readFn, writeFn);
table.init(TestRemoteTable.getMockContext());
try {
table.putAsync("foo", "bar").join();
fail();
} catch (Throwable t) {
}
verify(writeFn, times(1)).putAsync(any(), any());
assertEquals(0, table.writeRetryMetrics.retryCount.getCount());
assertEquals(0, table.writeRetryMetrics.successCount.getCount());
assertEquals(0, table.writeRetryMetrics.permFailureCount.getCount());
assertEquals(0, table.writeRetryMetrics.retryTimer.getSnapshot().getMax());
}
@Test
public void testPutAllWithOneRetry() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(10));
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
doReturn(true).when(writeFn).isRetriable(any());
AtomicInteger times = new AtomicInteger();
doAnswer(invocation -> {
CompletableFuture<Map<String, String>> future = new CompletableFuture();
if (times.get() > 0) {
future.complete(null);
} else {
times.incrementAndGet();
future.completeExceptionally(new RuntimeException("test exception"));
}
return future;
}).when(writeFn).putAllAsync(any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, writeFn);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, null, policy, schedExec, readFn, writeFn);
table.init(TestRemoteTable.getMockContext());
table.putAllAsync(Arrays.asList(new Entry(1, 2))).join();
verify(writeFn, times(2)).putAllAsync(any());
assertEquals(1, table.writeRetryMetrics.retryCount.getCount());
assertEquals(0, table.writeRetryMetrics.successCount.getCount());
assertEquals(0, table.writeRetryMetrics.permFailureCount.getCount());
assertTrue(table.writeRetryMetrics.retryTimer.getSnapshot().getMax() > 0);
}
@Test
public void testPutWithPermFailureOnTimeout() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(5));
policy.withStopAfterDelay(Duration.ofMillis(100));
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
doReturn(true).when(writeFn).isRetriable(any());
CompletableFuture<String> future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("test exception"));
doReturn(future).when(readFn).getAsync(anyString());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, writeFn);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, null, policy, schedExec, readFn, writeFn);
table.init(TestRemoteTable.getMockContext());
try {
table.putAsync("foo", "bar").join();
fail();
} catch (Throwable t) {
}
verify(writeFn, atLeast(3)).putAsync(any(), any());
assertTrue(table.writeRetryMetrics.retryCount.getCount() >= 3);
assertEquals(0, table.writeRetryMetrics.successCount.getCount());
assertEquals(1, table.writeRetryMetrics.permFailureCount.getCount());
assertTrue(table.writeRetryMetrics.retryTimer.getSnapshot().getMax() > 0);
}
@Test
public void testPutWithPermFailureOnMaxCount() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(5));
policy.withStopAfterAttempts(10);
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
doReturn(true).when(writeFn).isRetriable(any());
CompletableFuture<String> future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("test exception"));
doReturn(future).when(writeFn).putAllAsync(any());
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, writeFn);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, null, policy, schedExec, readFn, writeFn);
table.init(TestRemoteTable.getMockContext());
try {
table.putAllAsync(Arrays.asList(new Entry(1, 2))).join();
fail();
} catch (Throwable t) {
}
verify(writeFn, atLeast(11)).putAllAsync(any());
assertEquals(10, table.writeRetryMetrics.retryCount.getCount());
assertEquals(0, table.writeRetryMetrics.successCount.getCount());
assertEquals(1, table.writeRetryMetrics.permFailureCount.getCount());
assertTrue(table.writeRetryMetrics.retryTimer.getSnapshot().getMax() > 0);
}
@Test
public void testFlushAndClose() {
TableRetryPolicy policy = new TableRetryPolicy();
policy.withFixedBackoff(Duration.ofMillis(100));
TableReadFunction readFn = mock(TableReadFunction.class);
TableWriteFunction writeFn = mock(TableWriteFunction.class);
AsyncReadWriteTable delegate = new AsyncRemoteTable(readFn, writeFn);
AsyncRetriableTable table = new AsyncRetriableTable("t1", delegate, null, policy, schedExec, readFn, writeFn);
table.flush();
verify(writeFn, times(1)).flush();
table.close();
verify(readFn, times(1)).close();
verify(writeFn, times(1)).close();
}
}