Fluo #1112 - Change CompletableFuture<Void> to CompletableFuture<Loader> (#1113)
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
index 606292f..a1d6055 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
@@ -48,11 +48,15 @@
/**
* Same as {@link #execute(Loader)} except it returns a future that completes upon successful
- * commit and if an exception is thrown in the loader, it will be relayed through the future.
+ * commit and if an exception is thrown in the loader, it will be relayed through the future. The
+ * result of the future is the Loader that was successfully executed. If storing any information
+ * in the loader object, keep in mind that loaders may execute multiple times in the case of
+ * commit collisions. If a loader executes multiple times, it may see different data on subsequent
+ * executions.
*
* @since 2.0.0
*/
- CompletableFuture<Void> submit(Loader loader);
+ <T extends Loader> CompletableFuture<T> submit(T loader);
/**
@@ -61,7 +65,7 @@
* @param identity see {@link #execute(String, Loader)} for a description of this parameter
* @since 2.0.0
*/
- CompletableFuture<Void> submit(String identity, Loader loader);
+ <T extends Loader> CompletableFuture<T> submit(String identity, T loader);
/**
* Waits for all queued and running Loader task to complete, then cleans up resources.
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index 5c3a539..95c123b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -56,13 +56,13 @@
}
}
- class LoaderCommitObserver implements AsyncCommitObserver, Runnable {
+ class LoaderCommitObserver<T extends Loader> implements AsyncCommitObserver, Runnable {
AsyncTransaction txi;
- Loader loader;
+ T loader;
private AtomicBoolean done = new AtomicBoolean(false);
private String identity;
- private CompletableFuture<Void> future;
+ private CompletableFuture<T> future;
private void close() {
txi = null;
@@ -77,13 +77,13 @@
}
- public LoaderCommitObserver(String alias, Loader loader2) {
+ public LoaderCommitObserver(String alias, T loader2) {
this.identity = alias;
this.loader = loader2;
}
- public LoaderCommitObserver(String alias, Loader loader2, CompletableFuture<Void> future) {
+ public LoaderCommitObserver(String alias, T loader2, CompletableFuture<T> future) {
this(alias, loader2);
this.future = future;
}
@@ -93,7 +93,7 @@
public void committed() {
close();
if (future != null) {
- future.complete(null);
+ future.complete(loader);
}
}
@@ -233,13 +233,13 @@
}
@Override
- public CompletableFuture<Void> submit(Loader loader) {
+ public <T extends Loader> CompletableFuture<T> submit(T loader) {
return submit(loader.getClass().getSimpleName(), loader);
}
@Override
- public CompletableFuture<Void> submit(String alias, Loader loader) {
- CompletableFuture<Void> future = new CompletableFuture<Void>();
+ public <T extends Loader> CompletableFuture<T> submit(String alias, T loader) {
+ CompletableFuture<T> future = new CompletableFuture<T>();
try {
while (!semaphore.tryAcquire(50, TimeUnit.MILLISECONDS)) {
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
index 6f141aa..73d68f5 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
@@ -47,6 +47,32 @@
}
+ public static class CounterLoader implements Loader {
+
+ private Long newValue;
+
+ @Override
+ public void load(TransactionBase tx, Context context) throws Exception {
+
+ // Increment a counter in Fluo
+ String row = "r";
+ Column column = new Column("f", "q");
+ String currentStringValue = tx.gets(row, column);
+ if (currentStringValue != null) {
+ Long currentValue = Long.parseLong(currentStringValue);
+ newValue = currentValue + 1;
+ } else {
+ newValue = 1L;
+ }
+
+ tx.set(row, column, newValue.toString());
+ }
+
+ public Long getNewValue() {
+ return newValue;
+ }
+ }
+
@Test
public void testLoaderFailure() {
LoaderExecutor le = client.newLoaderExecutor();
@@ -65,7 +91,7 @@
LoaderExecutor le = client.newLoaderExecutor();
- List<CompletableFuture<Void>> futures = new ArrayList<>();
+ List<CompletableFuture<Loader>> futures = new ArrayList<>();
futures.add(le.submit("test", (tx, ctx) -> {
tx.set("1234", new Column("last", "date"), "20060101");
@@ -113,4 +139,22 @@
// expected from future
}
}
+
+ @Test
+ public void testSubmitResult() throws Exception {
+
+ LoaderExecutor le = client.newLoaderExecutor();
+ // Each time the CounterLoader is executed, it increments a counter and makes that value
+ // available in the returned Future
+ CompletableFuture<CounterLoader> future = le.submit(new CounterLoader());
+ Assert.assertEquals(future.get().getNewValue().longValue(), 1);
+
+ // test with an alias parameter
+ future = le.submit("alias", new CounterLoader());
+ Assert.assertEquals(future.get().getNewValue().longValue(), 2);
+
+ // verify the Future can take on the Loader interface type
+ CompletableFuture<Loader> future2 = le.submit(new CounterLoader());
+ Assert.assertEquals(((CounterLoader) future2.get()).getNewValue().longValue(), 3);
+ }
}