Merge pull request #1099 from keith-turner/fluo-969 fixes #967
Add asynchronous get methods.
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 6f4d1ff..7c4a226 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
@@ -42,7 +43,7 @@
/**
* Retrieves the value (in {@link Bytes}) stored at a given row and {@link Column}. Returns the
* passed in defaultValue if does not exist.
- *
+ *
* @param defaultValue this will be returned if row+columns does not exists
*/
Bytes get(Bytes row, Column column, Bytes defaultValue);
@@ -82,14 +83,14 @@
/**
* This method is the starting point for constructing a scanner. Scanners can be constructed over
* a {@link Span} and/or with a subset of columns. Below is simple example of building a scanner.
- *
+ *
* <pre>
* <code>
* Transaction tx = ...;
* Span span = Span.exact("row4");
* Column col1 = new Column("fam1","qual1");
* Column col2 = new Column("fam1","qual2");
- *
+ *
* //create a scanner over row4 fetching columns fam1:qual1 and fam1:qual2
* CellScanner cs = tx.scanner().over(span).fetch(col1,col2).build();
* for(RowColumnValue rcv : cs) {
@@ -166,4 +167,40 @@
* @return transactions start timestamp allocated from Oracle.
*/
long getStartTimestamp();
+
+ /**
+  * Asynchronous counterpart of {@code gets(row, column)}. This default implementation performs
+  * the read on the calling thread and hands back a future that is already completed;
+  * implementations may override it to fetch the value in the background instead.
+  *
+  * @return a future holding whatever {@code gets(row, column)} returned
+  * @since 2.0.0
+  */
+ default CompletableFuture<String> getsAsync(String row, Column column) {
+   final String value = gets(row, column);
+   return CompletableFuture.completedFuture(value);
+ }
+
+ /**
+  * Asynchronous counterpart of {@code gets(row, column, defaultValue)}. This default
+  * implementation performs the read on the calling thread and hands back a future that is
+  * already completed; implementations may override it to fetch the value in the background.
+  *
+  * @return a future holding whatever {@code gets(row, column, defaultValue)} returned
+  * @since 2.0.0
+  */
+ default CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
+   final String value = gets(row, column, defaultValue);
+   return CompletableFuture.completedFuture(value);
+ }
+
+ /**
+  * Asynchronous counterpart of {@code get(row, column)}. This default implementation performs
+  * the read on the calling thread and hands back a future that is already completed;
+  * implementations may override it to fetch the value in the background instead.
+  *
+  * @return a future holding whatever {@code get(row, column)} returned
+  * @since 2.0.0
+  */
+ default CompletableFuture<Bytes> getAsync(Bytes row, Column column) {
+   final Bytes value = get(row, column);
+   return CompletableFuture.completedFuture(value);
+ }
+
+ /**
+  * Asynchronous counterpart of {@code get(row, column, defaultValue)}. This default
+  * implementation performs the read on the calling thread and hands back a future that is
+  * already completed; implementations may override it to fetch the value in the background.
+  *
+  * @return a future holding whatever {@code get(row, column, defaultValue)} returned
+  * @since 2.0.0
+  */
+ default CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
+   final Bytes value = get(row, column, defaultValue);
+   return CompletableFuture.completedFuture(value);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java b/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java
new file mode 100644
index 0000000..c549c5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.Collections2;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.core.impl.TransactionImpl;
+
+/**
+ * Batches asynchronous reads issued against a single {@link TransactionImpl}.
+ *
+ * <p>
+ * Every get call queues its request and submits a task to a single-threaded executor. A task
+ * drains all requests queued so far and resolves them with one {@code tx.get(Collection)} call,
+ * so requests issued in quick succession can share a lookup.
+ */
+public class AsyncReader {
+  private final BlockingQueue<AsyncGet> asyncGetsQueue;
+  private final ExecutorService executorService;
+  private final TransactionImpl tx;
+
+  /**
+   * @param tx transaction whose {@code get(Collection)} method serves the queued reads
+   */
+  public AsyncReader(TransactionImpl tx) {
+    this.tx = tx;
+    asyncGetsQueue = new LinkedBlockingQueue<>();
+    executorService = Executors.newSingleThreadExecutor();
+  }
+
+  /**
+   * Queues a read of the given cell.
+   *
+   * @return future completed with the cell's value, or with {@code null} when the lookup returns
+   *         no value for it
+   */
+  public CompletableFuture<Bytes> get(Bytes row, Column column) {
+    return get(row, column, null);
+  }
+
+  /**
+   * Queues a read of the given cell.
+   *
+   * @param defaultValue value the future completes with when the lookup returns no value for the
+   *        cell; may be {@code null}
+   * @return future completed with the cell's value or {@code defaultValue}, or completed
+   *         exceptionally when the lookup fails
+   */
+  public CompletableFuture<Bytes> get(Bytes row, Column column, Bytes defaultValue) {
+    AsyncGet curAsyncGet = new AsyncGet(row, column, defaultValue);
+    asyncGetsQueue.add(curAsyncGet);
+    executorService.submit(this::processQueuedGets);
+    return curAsyncGet.res;
+  }
+
+  /**
+   * Drains every queued request and completes each one from a single batched lookup.
+   */
+  private void processQueuedGets() {
+    List<AsyncGet> getsList = new ArrayList<>();
+    asyncGetsQueue.drainTo(getsList);
+
+    // One task is submitted per request, but an earlier task may already have drained and served
+    // this task's request. Skip the lookup instead of issuing it for an empty batch.
+    if (getsList.isEmpty()) {
+      return;
+    }
+
+    try {
+      Collection<RowColumn> rowColumns = Collections2.transform(getsList, ag -> ag.rc);
+
+      Map<RowColumn, Bytes> getsMap = tx.get(rowColumns);
+
+      for (AsyncGet asyncGet : getsList) {
+        Bytes result = getsMap.get(asyncGet.rc);
+        asyncGet.res.complete(result == null ? asyncGet.defaultValue : result);
+      }
+    } catch (Throwable t) {
+      // Catch Throwable, not just Exception: the Future returned by submit() is discarded, so
+      // anything that escapes here would be lost and the callers' futures would never complete.
+      for (AsyncGet asyncGet : getsList) {
+        asyncGet.res.completeExceptionally(t);
+      }
+    }
+  }
+
+  /**
+   * String variant of {@link #get(Bytes, Column)}.
+   *
+   * @return future completed with the cell's value as a String, or with {@code null} when the
+   *         lookup returns no value for it
+   */
+  public CompletableFuture<String> gets(String row, Column column) {
+    return gets(row, column, null);
+  }
+
+  /**
+   * String variant of {@link #get(Bytes, Column, Bytes)}.
+   *
+   * @param defaultValue value the future completes with when the lookup returns no value for the
+   *        cell; may be {@code null}
+   * @return future completed with the cell's value as a String or {@code defaultValue}
+   */
+  public CompletableFuture<String> gets(String row, Column column, String defaultValue) {
+    // Keep a null default as null so a missing cell yields null, exactly as get(...) does. An
+    // empty-Bytes stand-in would make a missing cell look like a stored empty value.
+    Bytes defaultValueBytes = defaultValue == null ? null : Bytes.of(defaultValue);
+    return get(Bytes.of(row), column, defaultValueBytes)
+        .thenApply(b -> b == null ? null : b.toString());
+  }
+
+  /**
+   * Shuts down the executor. Tasks that were already submitted are still allowed to run.
+   */
+  public void close() {
+    executorService.shutdown();
+  }
+
+  /**
+   * A single queued read: the cell to fetch, the future to complete and the fallback value.
+   */
+  static class AsyncGet {
+    final RowColumn rc;
+    final CompletableFuture<Bytes> res;
+    final Bytes defaultValue;
+
+    public AsyncGet(Bytes row, Column column, Bytes defaultValue) {
+      rc = new RowColumn(row, column);
+      res = new CompletableFuture<>();
+      this.defaultValue = defaultValue;
+    }
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 8201a12..9597662 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -75,6 +75,7 @@
import org.apache.fluo.core.async.SyncCommitObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
+import org.apache.fluo.core.impl.AsyncReader;
import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.core.util.ByteUtil;
@@ -138,6 +139,8 @@
private TransactorNode tnode = null;
private TxStatus status = TxStatus.OPEN;
private boolean commitAttempted = false;
+ private AsyncReader asyncReader = null;
+
public TransactionImpl(Environment env, Notification trigger, long startTs) {
Objects.requireNonNull(env, "environment cannot be null");
@@ -298,6 +301,33 @@
}
@Override
+ public CompletableFuture<Bytes> getAsync(Bytes row, Column column) {
+   // Hand the read to this transaction's lazily created AsyncReader.
+   final AsyncReader reader = getAsyncReader();
+   return reader.get(row, column);
+ }
+
+ @Override
+ public CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
+   // Hand the read, along with its fallback value, to this transaction's AsyncReader.
+   final AsyncReader reader = getAsyncReader();
+   return reader.get(row, column, defaultValue);
+ }
+
+ @Override
+ public CompletableFuture<String> getsAsync(String row, Column column) {
+   // Hand the String-based read to this transaction's AsyncReader.
+   final AsyncReader reader = getAsyncReader();
+   return reader.gets(row, column);
+ }
+
+ @Override
+ public CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
+   // Hand the String-based read, along with its fallback value, to this transaction's AsyncReader.
+   final AsyncReader reader = getAsyncReader();
+   return reader.gets(row, column, defaultValue);
+ }
+
+ /**
+  * Returns this transaction's AsyncReader, creating it the first time it is requested.
+  */
+ private AsyncReader getAsyncReader() {
+   AsyncReader reader = asyncReader;
+   if (reader == null) {
+     reader = new AsyncReader(this);
+     asyncReader = reader;
+   }
+   return reader;
+ }
+
+ @Override
public ScannerBuilder scanner() {
checkIfOpen();
return new ScannerBuilderImpl(this);
@@ -763,6 +793,10 @@
}
private synchronized void close(boolean checkForStaleScan) {
+ if (asyncReader != null) {
+ asyncReader.close();
+ }
+
if (status != TxStatus.CLOSED) {
status = TxStatus.CLOSED;
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java
new file mode 100644
index 0000000..ff1f387
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Integration tests for the asynchronous get methods exposed by {@link Transaction}.
+ */
+public class TransactionImplIT extends ITBaseImpl {
+
+  /**
+   * Commits three cells, then checks that the String-based asynchronous gets return the stored
+   * values and fall back to the supplied default for a cell this test never writes.
+   */
+  @Test
+  public void testgetsAsync() throws Exception {
+    try (Transaction writeTx = client.newTransaction()) {
+      writeTx.set("row1", new Column("col1"), "val1");
+      writeTx.set("row2", new Column("col2"), "val2");
+      writeTx.set("row3", new Column("col3"), "val3");
+
+      writeTx.commit();
+    }
+
+    try (Transaction readTx = client.newTransaction()) {
+      // Issue every request before waiting on any of the futures.
+      CompletableFuture<String> first = readTx.getsAsync("row1", new Column("col1"));
+      CompletableFuture<String> second = readTx.getsAsync("row2", new Column("col2"), "foo");
+      CompletableFuture<String> third = readTx.getsAsync("row3", new Column("col3"));
+      CompletableFuture<String> missing = readTx.getsAsync("row4", new Column("col4"), "val4");
+
+      Assert.assertEquals("val1", first.get());
+      Assert.assertEquals("val2", second.get());
+      Assert.assertEquals("val3", third.get());
+      Assert.assertEquals("val4", missing.get());
+    }
+  }
+
+  /**
+   * Commits three cells, then checks that the Bytes-based asynchronous gets return the stored
+   * values and fall back to the supplied default for a cell this test never writes.
+   */
+  @Test
+  public void testgetAsync() throws Exception {
+    Bytes rowOne = Bytes.of("row1");
+    Bytes rowTwo = Bytes.of("row2");
+    Bytes rowThree = Bytes.of("row3");
+    Bytes rowFour = Bytes.of("row4");
+
+    Bytes valueOne = Bytes.of("val1");
+    Bytes valueTwo = Bytes.of("val2");
+    Bytes valueThree = Bytes.of("val3");
+    Bytes valueFour = Bytes.of("val4");
+
+    try (Transaction writeTx = client.newTransaction()) {
+      writeTx.set(rowOne, new Column("col1"), valueOne);
+      writeTx.set(rowTwo, new Column("col2"), valueTwo);
+      writeTx.set(rowThree, new Column("col3"), valueThree);
+
+      writeTx.commit();
+    }
+
+    try (Transaction readTx = client.newTransaction()) {
+      // Issue every request before waiting on any of the futures.
+      CompletableFuture<Bytes> first = readTx.getAsync(rowOne, new Column("col1"));
+      CompletableFuture<Bytes> second =
+          readTx.getAsync(rowTwo, new Column("col2"), Bytes.of("foo"));
+      CompletableFuture<Bytes> third = readTx.getAsync(rowThree, new Column("col3"));
+      CompletableFuture<Bytes> missing =
+          readTx.getAsync(rowFour, new Column("col4"), Bytes.of("val4"));
+
+      Assert.assertEquals(valueOne, first.get());
+      Assert.assertEquals(valueTwo, second.get());
+      Assert.assertEquals(valueThree, third.get());
+      Assert.assertEquals(valueFour, missing.get());
+    }
+  }
+}