Merge pull request #1099 from keith-turner/fluo-969 fixes #967

Add asynchronous get methods.
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 6f4d1ff..7c4a226 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -18,6 +18,7 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
@@ -42,7 +43,7 @@
   /**
    * Retrieves the value (in {@link Bytes}) stored at a given row and {@link Column}. Returns the
    * passed in defaultValue if does not exist.
-   * 
+   *
    * @param defaultValue this will be returned if row+columns does not exists
    */
   Bytes get(Bytes row, Column column, Bytes defaultValue);
@@ -82,14 +83,14 @@
   /**
    * This method is the starting point for constructing a scanner. Scanners can be constructed over
    * a {@link Span} and/or with a subset of columns. Below is simple example of building a scanner.
-   * 
+   *
    * <pre>
    * <code>
    *   Transaction tx = ...;
    *   Span span = Span.exact("row4");
    *   Column col1 = new Column("fam1","qual1");
    *   Column col2 = new Column("fam1","qual2");
-   * 
+   *
    *   //create a scanner over row4 fetching columns fam1:qual1 and fam1:qual2
    *   CellScanner cs = tx.scanner().over(span).fetch(col1,col2).build();
    *   for(RowColumnValue rcv : cs) {
@@ -166,4 +167,40 @@
    * @return transactions start timestamp allocated from Oracle.
    */
   long getStartTimestamp();
+
+  /**
+   * Asynchronous get, may run in the background; the default calls {@code gets} synchronously.
+   *
+   * @since 2.0.0
+   */
+  default CompletableFuture<String> getsAsync(String row, Column column) {
+    return CompletableFuture.completedFuture(gets(row, column));
+  }
+
+  /**
+   * Asynchronous get, may run in the background; the default calls {@code gets} synchronously.
+   *
+   * @since 2.0.0
+   */
+  default CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
+    return CompletableFuture.completedFuture(gets(row, column, defaultValue));
+  }
+
+  /**
+   * Asynchronous get, may run in the background; the default calls {@code get} synchronously.
+   *
+   * @since 2.0.0
+   */
+  default CompletableFuture<Bytes> getAsync(Bytes row, Column column) {
+    return CompletableFuture.completedFuture(get(row, column));
+  }
+
+  /**
+   * Asynchronous get, may run in the background; the default calls {@code get} synchronously.
+   *
+   * @since 2.0.0
+   */
+  default CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
+    return CompletableFuture.completedFuture(get(row, column, defaultValue));
+  }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java b/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java
new file mode 100644
index 0000000..c549c5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.Collections2;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.core.impl.TransactionImpl;
+
+public class AsyncReader {
+  private final BlockingQueue<AsyncGet> asyncGetsQueue; // gets waiting to be batched
+  private final ExecutorService executorService; // single worker thread that runs the batches
+  private final TransactionImpl tx; // transaction every batched read goes through
+
+  public AsyncReader(TransactionImpl tx) {
+    this.tx = tx;
+    asyncGetsQueue = new LinkedBlockingQueue<>();
+    executorService = Executors.newSingleThreadExecutor();
+  }
+
+  public CompletableFuture<Bytes> get(Bytes row, Column column) {
+    return get(row, column, null); // null default: an absent value completes the future with null
+  }
+
+  public CompletableFuture<Bytes> get(Bytes row, Column column, Bytes defaultValue) {
+    AsyncGet curAsyncGet = new AsyncGet(row, column, defaultValue);
+    asyncGetsQueue.add(curAsyncGet);
+    // Every call submits a task, but one task drains all queued gets into a single tx.get().
+    executorService.submit(() -> {
+      List<AsyncGet> getsList = new ArrayList<>();
+      asyncGetsQueue.drainTo(getsList);
+      if (getsList.isEmpty()) {
+        return; // an earlier task already served the get that scheduled this one
+      }
+      try {
+        Collection<RowColumn> rowColumns = Collections2.transform(getsList, ag -> ag.rc);
+        Map<RowColumn, Bytes> getsMap = tx.get(rowColumns);
+        for (AsyncGet asyncGet : getsList) {
+          Bytes result = getsMap.get(asyncGet.rc);
+          asyncGet.res.complete(result == null ? asyncGet.defaultValue : result);
+        }
+      } catch (Exception e) {
+        for (AsyncGet asyncGet : getsList) {
+          asyncGet.res.completeExceptionally(e); // fail the whole batch with the same cause
+        }
+      }
+    });
+
+    return curAsyncGet.res;
+  }
+
+  public CompletableFuture<String> gets(String row, Column column) {
+    return gets(row, column, null);
+  }
+
+  public CompletableFuture<String> gets(String row, Column column, String defaultValue) {
+    Bytes defBytes = defaultValue == null ? null : Bytes.of(defaultValue); // null stays null
+    return get(Bytes.of(row), column, defBytes).thenApply(b -> b == null ? null : b.toString());
+  }
+
+  public void close() {
+    executorService.shutdown(); // stops new submissions; already submitted tasks still run
+  }
+
+  static class AsyncGet {
+    final RowColumn rc; // key looked up in the batched result
+    final CompletableFuture<Bytes> res; // completed by the worker thread
+    final Bytes defaultValue; // used when the batch has no value for rc; may be null
+
+    public AsyncGet(Bytes row, Column column, Bytes defaultValue) {
+      rc = new RowColumn(row, column);
+      res = new CompletableFuture<>();
+      this.defaultValue = defaultValue;
+    }
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 8201a12..9597662 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -75,6 +75,7 @@
 import org.apache.fluo.core.async.SyncCommitObserver;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.exceptions.StaleScanException;
+import org.apache.fluo.core.impl.AsyncReader;
 import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
 import org.apache.fluo.core.oracle.Stamp;
 import org.apache.fluo.core.util.ByteUtil;
@@ -138,6 +139,8 @@
   private TransactorNode tnode = null;
   private TxStatus status = TxStatus.OPEN;
   private boolean commitAttempted = false;
+  private AsyncReader asyncReader = null;
+
 
   public TransactionImpl(Environment env, Notification trigger, long startTs) {
     Objects.requireNonNull(env, "environment cannot be null");
@@ -298,6 +301,33 @@
   }
 
   @Override
+  public CompletableFuture<Bytes> getAsync(Bytes row, Column column) {
+    return getAsyncReader().get(row, column); // reader is created lazily on first async get
+  }
+
+  @Override
+  public CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
+    return getAsyncReader().get(row, column, defaultValue); // default used when no value read
+  }
+
+  @Override
+  public CompletableFuture<String> getsAsync(String row, Column column) {
+    return getAsyncReader().gets(row, column); // String wrapper over the Bytes based get
+  }
+
+  @Override
+  public CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
+    return getAsyncReader().gets(row, column, defaultValue); // String wrapper with a default
+  }
+
+  private AsyncReader getAsyncReader() {
+    if (asyncReader == null) { // lazy init; not synchronized, although close() is
+      asyncReader = new AsyncReader(this); // NOTE(review): no checkIfOpen() here - confirm
+    }
+    return asyncReader;
+  }
+
+  @Override
   public ScannerBuilder scanner() {
     checkIfOpen();
     return new ScannerBuilderImpl(this);
@@ -763,6 +793,10 @@
   }
 
   private synchronized void close(boolean checkForStaleScan) {
+    if (asyncReader != null) {
+      asyncReader.close();
+    }
+
     if (status != TxStatus.CLOSED) {
       status = TxStatus.CLOSED;
 
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java
new file mode 100644
index 0000000..ff1f387
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Integration tests for the asynchronous get methods of a transaction.
+ */
+public class TransactionImplIT extends ITBaseImpl {
+
+  @Test
+  public void testgetsAsync() throws Exception {
+    try (Transaction tx = client.newTransaction()) {
+      tx.set("row1", new Column("col1"), "val1");
+      tx.set("row2", new Column("col2"), "val2");
+      tx.set("row3", new Column("col3"), "val3");
+      // rows 1-3 are committed here; row4 is deliberately never written
+      tx.commit();
+    }
+
+    try (Transaction tx = client.newTransaction()) {
+      CompletableFuture<String> res1 = tx.getsAsync("row1", new Column("col1"));
+      CompletableFuture<String> res2 = tx.getsAsync("row2", new Column("col2"), "foo");
+      CompletableFuture<String> res3 = tx.getsAsync("row3", new Column("col3"));
+      CompletableFuture<String> res4 = tx.getsAsync("row4", new Column("col4"), "val4");
+      // row2 exists, so "foo" must be ignored; row4 is missing, so its default is expected
+      Assert.assertEquals("val1", res1.get());
+      Assert.assertEquals("val2", res2.get());
+      Assert.assertEquals("val3", res3.get());
+      Assert.assertEquals("val4", res4.get());
+    }
+  }
+
+  @Test
+  public void testgetAsync() throws Exception {
+    Bytes row1 = Bytes.of("row1");
+    Bytes row2 = Bytes.of("row2");
+    Bytes row3 = Bytes.of("row3");
+    Bytes row4 = Bytes.of("row4");
+
+    Bytes val1 = Bytes.of("val1");
+    Bytes val2 = Bytes.of("val2");
+    Bytes val3 = Bytes.of("val3");
+    Bytes val4 = Bytes.of("val4");
+
+    try (Transaction tx = client.newTransaction()) {
+      tx.set(row1, new Column("col1"), val1);
+      tx.set(row2, new Column("col2"), val2);
+      tx.set(row3, new Column("col3"), val3);
+      // rows 1-3 are committed here; row4 is deliberately never written
+      tx.commit();
+    }
+
+    try (Transaction tx = client.newTransaction()) {
+      CompletableFuture<Bytes> res1 = tx.getAsync(row1, new Column("col1"));
+      CompletableFuture<Bytes> res2 = tx.getAsync(row2, new Column("col2"), Bytes.of("foo"));
+      CompletableFuture<Bytes> res3 = tx.getAsync(row3, new Column("col3"));
+      CompletableFuture<Bytes> res4 = tx.getAsync(row4, new Column("col4"), Bytes.of("val4"));
+      // row2 exists, so "foo" must be ignored; row4 is missing, so its default is expected
+      Assert.assertEquals(val1, res1.get());
+      Assert.assertEquals(val2, res2.get());
+      Assert.assertEquals(val3, res3.get());
+      Assert.assertEquals(val4, res4.get());
+    }
+  }
+
+
+}