IGNITE-15218 Add async methods to IgniteTables (#253)

diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
index cf1342e..8ee0e7e 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
@@ -15,12 +15,10 @@
  * limitations under the License.
  */
 
-/**
- * Interface for manage tables.
- */
 package org.apache.ignite.table.manager;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.table.Table;
@@ -40,6 +38,16 @@
     Table createTable(String name, Consumer<TableChange> tableInitChange);
 
     /**
+     * Creates a new table with the given {@code name}.
+     * If a table with the same name already exists, an exception will be thrown.
+     *
+     * @param name Table name.
+     * @param tableInitChange Table changer.
+     * @return Future representing pending completion of the operation.
+     */
+    CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange);
+
+    /**
      * Alter a cluster table.
      *
      * @param name Table name.
@@ -48,6 +56,15 @@
     void alterTable(String name, Consumer<TableChange> tableChange);
 
     /**
+     * Alter a cluster table.
+     *
+     * @param name Table name.
+     * @param tableChange Table changer.
+     * @return Future representing pending completion of the operation.
+     */
+    CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange);
+
+    /**
      * Creates a new table with the given {@code name} or returns an existing one with the same {@code name}.
      *
      * @param name Table name.
@@ -57,6 +74,15 @@
     Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange);
 
     /**
+     * Creates a new table with the given {@code name} or returns an existing one with the same {@code name}.
+     *
+     * @param name Table name.
+     * @param tableInitChange Table changer.
+     * @return Future representing pending completion of the operation.
+     */
+    CompletableFuture<Table> getOrCreateTableAsync(String name, Consumer<TableChange> tableInitChange);
+
+    /**
      * Drops a table with the name specified.
      * If a table with the specified name does not exist in the cluster, the operation has no effect.
      *
@@ -65,6 +91,15 @@
     void dropTable(String name);
 
     /**
+     * Drops a table with the name specified.
+     * If a table with the specified name does not exist in the cluster, the operation has no effect.
+     *
+     * @param name Table name.
+     * @return Future representing pending completion of the operation.
+     */
+    CompletableFuture<Void> dropTableAsync(String name);
+
+    /**
      * Gets a list of all started tables.
      *
      * @return List of tables.
@@ -72,10 +107,25 @@
     List<Table> tables();
 
     /**
+     * Gets a list of all started tables.
+     *
+     * @return Future representing pending completion of the operation.
+     */
+    CompletableFuture<List<Table>> tablesAsync();
+
+    /**
      * Gets a table by name, if it was created before.
      *
      * @param name Name of the table.
      * @return Tables with corresponding name or {@code null} if table isn't created.
      */
     Table table(String name);
+
+    /**
+     * Gets a table by name, if it was created before.
+     *
+     * @param name Name of the table.
+     * @return Future representing pending completion of the operation.
+     */
+    CompletableFuture<Table> tableAsync(String name);
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
index 32b580e..5304ae3 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
@@ -34,14 +34,12 @@
      * @return Future.
      * @throws IOException On serialization error.
      */
-    public static CompletableFuture<Object> process(
+    public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             IgniteTables tables
     ) throws IOException {
         var tableName = in.unpackString();
 
-        tables.dropTable(tableName);
-
-        return null;
+        return tables.dropTableAsync(tableName);
     }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
index 5e48025..da39738 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
@@ -19,10 +19,10 @@
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import org.apache.ignite.client.proto.ClientMessagePacker;
 import org.apache.ignite.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.table.Table;
 import org.apache.ignite.table.manager.IgniteTables;
 
 /**
@@ -38,19 +38,22 @@
      * @return Future.
      * @throws IOException On serialization error.
      */
-    public static CompletableFuture<Object> process(
+    public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
             IgniteTables tables
     ) throws IOException {
         String tableName = in.unpackString();
-        Table table = tables.table(tableName);
 
-        if (table == null)
-            out.packNil();
-        else
-            out.packUuid(((TableImpl) table).tableId());
-
-        return null;
+        return tables.tableAsync(tableName).thenAccept(table -> {
+            try {
+                if (table == null)
+                    out.packNil();
+                else
+                    out.packUuid(((TableImpl) table).tableId());
+            } catch (IOException e) {
+                throw new CompletionException(e);
+            }
+        });
     }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
index a05dcd4..a7000b0 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import org.apache.ignite.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.table.manager.IgniteTables;
@@ -33,23 +34,24 @@
      * @param out Packer.
      * @param igniteTables Ignite tables.
      * @return Future.
-     * @throws IOException On serialization error.
      */
-    public static CompletableFuture<Object> process(
+    public static CompletableFuture<Void> process(
             ClientMessagePacker out,
             IgniteTables igniteTables
-    ) throws IOException {
-        var tables = igniteTables.tables();
+    ) {
+        return igniteTables.tablesAsync().thenAccept(tables -> {
+            try {
+                out.packMapHeader(tables.size());
 
-        out.packMapHeader(tables.size());
+                for (var table : tables) {
+                    var tableImpl = (TableImpl) table;
 
-        for (var table : tables) {
-            var tableImpl = (TableImpl) table;
-
-            out.packUuid(tableImpl.tableId());
-            out.packString(table.tableName());
-        }
-
-        return null;
+                    out.packUuid(tableImpl.tableId());
+                    out.packString(table.tableName());
+                }
+            } catch (IOException e) {
+                throw new CompletionException(e);
+            }
+        });
     }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
index e001663..79965b0 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
-
 import org.apache.ignite.client.proto.ClientOp;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.internal.client.ReliableChannel;
@@ -49,17 +48,28 @@
         return createTableAsync(name, tableInitChange).join();
     }
 
-    public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) {
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public void alterTable(String name, Consumer<TableChange> tableChange) {
+        alterTableAsync(name, tableChange).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange) {
+        return getOrCreateTableAsync(name, tableInitChange).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> getOrCreateTableAsync(String name, Consumer<TableChange> tableInitChange) {
         throw new UnsupportedOperationException();
     }
 
@@ -68,7 +78,8 @@
         dropTableAsync(name).join();
     }
 
-    public CompletableFuture<Void> dropTableAsync(String name) {
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> dropTableAsync(String name) {
         return ch.requestAsync(ClientOp.TABLE_DROP, w -> w.out().packString(name));
     }
 
@@ -77,7 +88,8 @@
         return tablesAsync().join();
     }
 
-    public CompletableFuture<List<Table>> tablesAsync() {
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<List<Table>> tablesAsync() {
         return ch.serviceAsync(ClientOp.TABLES_GET, r -> {
             var in = r.in();
             var cnt = in.unpackMapHeader();
@@ -95,7 +107,8 @@
         return tableAsync(name).join();
     }
 
-    public CompletableFuture<Table> tableAsync(String name) {
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> tableAsync(String name) {
         return ch.serviceAsync(ClientOp.TABLE_GET, w -> w.out().packString(name),
                 r -> new ClientTable(ch, r.in().unpackUuid(), name));
     }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index b506eda..68ce1cb 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -20,9 +20,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
-
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -63,8 +63,17 @@
     }
 
     /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) {
+        return CompletableFuture.completedFuture(createTable(name, tableInitChange));
+    }
+
+    /** {@inheritDoc} */
     @Override public void alterTable(String name, Consumer<TableChange> tableChange) {
-        throw new IgniteException("Not supported");
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
@@ -82,6 +91,11 @@
     }
 
     /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> getOrCreateTableAsync(String name, Consumer<TableChange> tableInitChange) {
+        return CompletableFuture.completedFuture(getOrCreateTable(name, tableInitChange));
+    }
+
+    /** {@inheritDoc} */
     @Override public void dropTable(String name) {
         var table = tables.remove(name);
 
@@ -90,15 +104,32 @@
     }
 
     /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> dropTableAsync(String name) {
+        dropTable(name);
+
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** {@inheritDoc} */
     @Override public List<Table> tables() {
         return new ArrayList<>(tables.values());
     }
 
     /** {@inheritDoc} */
+    @Override public CompletableFuture<List<Table>> tablesAsync() {
+        return CompletableFuture.completedFuture(tables());
+    }
+
+    /** {@inheritDoc} */
     @Override public Table table(String name) {
         return tables.get(name);
     }
 
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> tableAsync(String name) {
+        return CompletableFuture.completedFuture(table(name));
+    }
+
     @NotNull private TableImpl getNewTable(String name) {
         UUID tableId = UUID.randomUUID();
         return new TableImpl(new FakeInternalTable(name, tableId), getSchemaReg(tableId), null, null);
@@ -129,5 +160,4 @@
     @Override public TableImpl table(UUID id) {
         return tablesById.get(id);
     }
-
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 91d3004..46c7584 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
@@ -488,12 +489,22 @@
 
     /** {@inheritDoc} */
     @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
-        return createTable(name, tableInitChange, true);
+        return createTableAsync(name, tableInitChange).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) {
+        return createTableAsync(name, tableInitChange, true);
     }
 
     /** {@inheritDoc} */
     @Override public Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange) {
-        return createTable(name, tableInitChange, false);
+        return getOrCreateTableAsync(name, tableInitChange).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> getOrCreateTableAsync(String name, Consumer<TableChange> tableInitChange) {
+        return createTableAsync(name, tableInitChange, false);
     }
 
     /**
@@ -505,7 +516,7 @@
      * {@code false} means the existing table will be returned.
      * @return A table instance.
      */
-    public Table createTable(String name, Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
+    private CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
         CompletableFuture<Table> tblFut = new CompletableFuture<>();
 
         EventListener<TableEventParameters> clo = new EventListener<>() {
@@ -530,37 +541,39 @@
 
         listen(TableEvent.CREATE, clo);
 
-        Table tbl = table(name, true);
+        tableAsync(name, true).thenAccept(tbl -> {
+            if (tbl != null) {
+                if (exceptionWhenExist) {
+                    removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(
+                            LoggerMessageHelper.format("Table already exists [name={}]", name)));
+                } else if (tblFut.complete(tbl))
+                    removeListener(TableEvent.CREATE, clo);
+            } else {
+                try {
+                    configurationMgr
+                            .configurationRegistry()
+                            .getConfiguration(TablesConfiguration.KEY)
+                            .tables()
+                            .change(change -> change.create(name, tableInitChange))
+                            .get();
+                } catch (InterruptedException | ExecutionException e) {
+                    LOG.error("Table wasn't created [name=" + name + ']', e);
 
-        if (tbl != null) {
-            if (exceptionWhenExist) {
-                removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(
-                    LoggerMessageHelper.format("Table already exists [name={}]", name)));
+                    removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(e));
+                }
             }
-            else if (tblFut.complete(tbl))
-                removeListener(TableEvent.CREATE, clo);
-        }
-        else {
-            try {
-                configurationMgr
-                    .configurationRegistry()
-                    .getConfiguration(TablesConfiguration.KEY)
-                    .tables()
-                    .change(change -> change.create(name, tableInitChange))
-                    .get();
-            }
-            catch (InterruptedException | ExecutionException e) {
-                LOG.error("Table wasn't created [name=" + name + ']', e);
+        });
 
-                removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(e));
-            }
-        }
-
-        return tblFut.join();
+        return tblFut;
     }
 
     /** {@inheritDoc} */
     @Override public void alterTable(String name, Consumer<TableChange> tableChange) {
+        alterTableAsync(name, tableChange).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
         CompletableFuture<Void> tblFut = new CompletableFuture<>();
 
         listen(TableEvent.ALTER, new EventListener<>() {
@@ -585,8 +598,8 @@
 
         try {
             configurationMgr.configurationRegistry()
-                .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
-                change.createOrUpdate(name, tableChange)).get();
+                    .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
+                    change.createOrUpdate(name, tableChange)).get();
         }
         catch (InterruptedException | ExecutionException e) {
             LOG.error("Table wasn't created [name=" + name + ']', e);
@@ -594,11 +607,16 @@
             tblFut.completeExceptionally(e);
         }
 
-        tblFut.join();
+        return tblFut;
     }
 
     /** {@inheritDoc} */
     @Override public void dropTable(String name) {
+        dropTableAsync(name).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> dropTableAsync(String name) {
         CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
 
         EventListener<TableEventParameters> clo = new EventListener<>() {
@@ -637,11 +655,11 @@
         else {
             try {
                 configurationMgr
-                    .configurationRegistry()
-                    .getConfiguration(TablesConfiguration.KEY)
-                    .tables()
-                    .change(change -> change.delete(name))
-                    .get();
+                        .configurationRegistry()
+                        .getConfiguration(TablesConfiguration.KEY)
+                        .tables()
+                        .change(change -> change.delete(name))
+                        .get();
             }
             catch (InterruptedException | ExecutionException e) {
                 LOG.error("Table wasn't dropped [name=" + name + ']', e);
@@ -650,21 +668,39 @@
             }
         }
 
-        dropTblFut.join();
+        return dropTblFut;
     }
 
     /** {@inheritDoc} */
     @Override public List<Table> tables() {
-        ArrayList<Table> tables = new ArrayList<>();
+        return tablesAsync().join();
+    }
 
-        for (String tblName : tableNamesConfigured()) {
-            Table tbl = table(tblName, false);
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<List<Table>> tablesAsync() {
+        var tableNames = tableNamesConfigured();
+        var tableFuts = new CompletableFuture[tableNames.size()];
+        var i = 0;
 
-            if (tbl != null)
-                tables.add(tbl);
-        }
+        for (String tblName : tableNames)
+            tableFuts[i++] = tableAsync(tblName, false);
 
-        return tables;
+        return CompletableFuture.allOf(tableFuts).thenApply(unused -> {
+            var tables = new ArrayList<Table>(tableNames.size());
+
+            try {
+                for (var fut : tableFuts) {
+                    var table = fut.get();
+
+                    if (table != null)
+                        tables.add((Table) table);
+                }
+            } catch (Throwable t) {
+                throw new CompletionException(t);
+            }
+
+            return tables;
+        });
     }
 
     /**
@@ -703,7 +739,12 @@
 
     /** {@inheritDoc} */
     @Override public Table table(String name) {
-        return table(name, true);
+        return tableAsync(name).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Table> tableAsync(String name) {
+        return tableAsync(name, true);
     }
 
     /**
@@ -756,14 +797,14 @@
      * false otherwise.
      * @return A table or {@code null} if table does not exist.
      */
-    private Table table(String name, boolean checkConfiguration) {
+    private CompletableFuture<Table> tableAsync(String name, boolean checkConfiguration) {
         if (checkConfiguration && !isTableConfigured(name))
-            return null;
+            return CompletableFuture.completedFuture(null);
 
         Table tbl = tables.get(name);
 
         if (tbl != null)
-            return tbl;
+            return CompletableFuture.completedFuture(tbl);
 
         CompletableFuture<Table> getTblFut = new CompletableFuture<>();
 
@@ -795,7 +836,7 @@
             !isTableConfigured(name) && getTblFut.complete(null))
             removeListener(TableEvent.CREATE, clo, null);
 
-        return getTblFut.join();
+        return getTblFut;
     }
 
     /**