Fluo #1112 - Change CompletableFuture<Void> to CompletableFuture<Loader> (#1113)

diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
index 606292f..a1d6055 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
@@ -48,11 +48,15 @@
 
   /**
    * Same as {@link #execute(Loader)} except it returns a future that completes upon successful
-   * commit and if an exception is thrown in the loader, it will be relayed through the future.
+   * commit and if an exception is thrown in the loader, it will be relayed through the future. The
+   * result of the future is the Loader that was successfully executed. If storing any information
+   * in the loader object, keep in mind that loaders may execute multiple times in the case of
+   * commit collisions. If a loader executes multiple times, it may see different data on subsequent
+   * executions.
    *
    * @since 2.0.0
    */
-  CompletableFuture<Void> submit(Loader loader);
+  <T extends Loader> CompletableFuture<T> submit(T loader);
 
 
   /**
@@ -61,7 +65,7 @@
    * @param identity see {@link #execute(String, Loader)} for a description of this parameter
    * @since 2.0.0
    */
-  CompletableFuture<Void> submit(String identity, Loader loader);
+  <T extends Loader> CompletableFuture<T> submit(String identity, T loader);
 
   /**
    * Waits for all queued and running Loader task to complete, then cleans up resources.
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index 5c3a539..95c123b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -56,13 +56,13 @@
     }
   }
 
-  class LoaderCommitObserver implements AsyncCommitObserver, Runnable {
+  class LoaderCommitObserver<T extends Loader> implements AsyncCommitObserver, Runnable {
 
     AsyncTransaction txi;
-    Loader loader;
+    T loader;
     private AtomicBoolean done = new AtomicBoolean(false);
     private String identity;
-    private CompletableFuture<Void> future;
+    private CompletableFuture<T> future;
 
     private void close() {
       txi = null;
@@ -77,13 +77,13 @@
     }
 
 
-    public LoaderCommitObserver(String alias, Loader loader2) {
+    public LoaderCommitObserver(String alias, T loader2) {
       this.identity = alias;
       this.loader = loader2;
     }
 
 
-    public LoaderCommitObserver(String alias, Loader loader2, CompletableFuture<Void> future) {
+    public LoaderCommitObserver(String alias, T loader2, CompletableFuture<T> future) {
       this(alias, loader2);
       this.future = future;
     }
@@ -93,7 +93,7 @@
     public void committed() {
       close();
       if (future != null) {
-        future.complete(null);
+        future.complete(loader);
       }
     }
 
@@ -233,13 +233,13 @@
   }
 
   @Override
-  public CompletableFuture<Void> submit(Loader loader) {
+  public <T extends Loader> CompletableFuture<T> submit(T loader) {
     return submit(loader.getClass().getSimpleName(), loader);
   }
 
   @Override
-  public CompletableFuture<Void> submit(String alias, Loader loader) {
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
+  public <T extends Loader> CompletableFuture<T> submit(String alias, T loader) {
+    CompletableFuture<T> future = new CompletableFuture<T>();
 
     try {
       while (!semaphore.tryAcquire(50, TimeUnit.MILLISECONDS)) {
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
index 6f141aa..73d68f5 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
@@ -47,6 +47,32 @@
 
   }
 
+  public static class CounterLoader implements Loader {
+
+    private Long newValue;
+
+    @Override
+    public void load(TransactionBase tx, Context context) throws Exception {
+
+      // Increment a counter in Fluo
+      String row = "r";
+      Column column = new Column("f", "q");
+      String currentStringValue = tx.gets(row, column);
+      if (currentStringValue != null) {
+        Long currentValue = Long.parseLong(currentStringValue);
+        newValue = currentValue + 1;
+      } else {
+        newValue = 1L;
+      }
+
+      tx.set(row, column, newValue.toString());
+    }
+
+    public Long getNewValue() {
+      return newValue;
+    }
+  }
+
   @Test
   public void testLoaderFailure() {
     LoaderExecutor le = client.newLoaderExecutor();
@@ -65,7 +91,7 @@
 
     LoaderExecutor le = client.newLoaderExecutor();
 
-    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    List<CompletableFuture<Loader>> futures = new ArrayList<>();
 
     futures.add(le.submit("test", (tx, ctx) -> {
       tx.set("1234", new Column("last", "date"), "20060101");
@@ -113,4 +139,22 @@
       // expected from future
     }
   }
+
+  @Test
+  public void testSubmitResult() throws Exception {
+
+    LoaderExecutor le = client.newLoaderExecutor();
+    // Each time the CounterLoader is executed, it increments a counter and makes that value
+    // available in the returned Future
+    CompletableFuture<CounterLoader> future = le.submit(new CounterLoader());
+    Assert.assertEquals(future.get().getNewValue().longValue(), 1);
+
+    // test with an alias parameter
+    future = le.submit("alias", new CounterLoader());
+    Assert.assertEquals(future.get().getNewValue().longValue(), 2);
+
+    // verify the Future can take on the Loader interface type
+    CompletableFuture<Loader> future2 = le.submit(new CounterLoader());
+    Assert.assertEquals(((CounterLoader) future2.get()).getNewValue().longValue(), 3);
+  }
 }