| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.client.table; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionStage; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.function.BiConsumer; |
| import java.util.function.BiFunction; |
| import java.util.function.Function; |
| |
| import org.apache.ignite.client.IgniteClientException; |
| import org.apache.ignite.internal.client.ReliableChannel; |
| import org.apache.ignite.internal.client.proto.ClientMessagePacker; |
| import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; |
| import org.apache.ignite.internal.client.proto.ClientOp; |
| import org.apache.ignite.internal.tostring.IgniteToStringBuilder; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.table.InvokeProcessor; |
| import org.apache.ignite.table.KeyValueBinaryView; |
| import org.apache.ignite.table.KeyValueView; |
| import org.apache.ignite.table.RecordView; |
| import org.apache.ignite.table.Table; |
| import org.apache.ignite.table.Tuple; |
| import org.apache.ignite.table.mapper.KeyMapper; |
| import org.apache.ignite.table.mapper.RecordMapper; |
| import org.apache.ignite.table.mapper.ValueMapper; |
| import org.apache.ignite.tx.Transaction; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| import org.msgpack.core.MessageFormat; |
| |
| /** |
| * Client table API implementation. |
| */ |
| public class ClientTable implements Table { |
| /** */ |
| private final IgniteUuid id; |
| |
| /** */ |
| private final String name; |
| |
| /** */ |
| private final ReliableChannel ch; |
| |
| /** */ |
| private final ConcurrentHashMap<Integer, ClientSchema> schemas = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private volatile int latestSchemaVer = -1; |
| |
| /** */ |
| private final Object latestSchemaLock = new Object(); |
| |
| /** */ |
| private final KeyValueBinaryView kvView; |
| |
| /** |
| * Constructor. |
| * |
| * @param ch Channel. |
| * @param id Table id. |
| * @param name Table name. |
| */ |
| public ClientTable(ReliableChannel ch, IgniteUuid id, String name) { |
| assert ch != null; |
| assert id != null; |
| assert name != null && !name.isEmpty(); |
| |
| this.ch = ch; |
| this.id = id; |
| this.name = name; |
| this.kvView = new ClientKeyValueBinaryView(this); |
| } |
| |
| /** |
| * Gets the table id. |
| * |
| * @return Table id. |
| */ |
| public IgniteUuid tableId() { |
| return id; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull String tableName() { |
| return name; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <K, V> KeyValueView<K, V> kvView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public KeyValueBinaryView kvView() { |
| return kvView; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Table withTransaction(Transaction tx) { |
| // TODO: Transactions IGNITE-15240 |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Tuple get(@NotNull Tuple keyRec) { |
| return getAsync(keyRec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple keyRec) { |
| Objects.requireNonNull(keyRec); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_GET, |
| (schema, out) -> writeTuple(keyRec, schema, out, true), |
| (inSchema, in) -> readValueTuple(inSchema, in, keyRec)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Tuple> getAll(@NotNull Collection<Tuple> keyRecs) { |
| return getAllAsync(keyRecs).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@NotNull Collection<Tuple> keyRecs) { |
| Objects.requireNonNull(keyRecs); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_GET_ALL, |
| (s, w) -> writeTuples(keyRecs, s, w, true), |
| this::readTuples, |
| Collections.emptyList()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void upsert(@NotNull Tuple rec) { |
| upsertAsync(rec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull Tuple rec) { |
| Objects.requireNonNull(rec); |
| |
| // TODO IGNITE-15194: Convert Tuple to a schema-order Array as a first step. |
| // If it does not match the latest schema, then request latest and convert again. |
| return doSchemaOutOpAsync( |
| ClientOp.TUPLE_UPSERT, |
| (s, w) -> writeTuple(rec, s, w), |
| r -> null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void upsertAll(@NotNull Collection<Tuple> recs) { |
| upsertAllAsync(recs).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<Tuple> recs) { |
| Objects.requireNonNull(recs); |
| |
| return doSchemaOutOpAsync( |
| ClientOp.TUPLE_UPSERT_ALL, |
| (s, w) -> writeTuples(recs, s, w, false), |
| r -> null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Tuple getAndUpsert(@NotNull Tuple rec) { |
| return getAndUpsertAsync(rec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(@NotNull Tuple rec) { |
| Objects.requireNonNull(rec); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_GET_AND_UPSERT, |
| (s, w) -> writeTuple(rec, s, w, false), |
| (schema, in) -> readValueTuple(schema, in, rec)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean insert(@NotNull Tuple rec) { |
| return insertAsync(rec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull Tuple rec) { |
| Objects.requireNonNull(rec); |
| |
| return doSchemaOutOpAsync( |
| ClientOp.TUPLE_INSERT, |
| (s, w) -> writeTuple(rec, s, w, false), |
| ClientMessageUnpacker::unpackBoolean); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Tuple> insertAll(@NotNull Collection<Tuple> recs) { |
| return insertAllAsync(recs).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(@NotNull Collection<Tuple> recs) { |
| Objects.requireNonNull(recs); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_INSERT_ALL, |
| (s, w) -> writeTuples(recs, s, w, false), |
| this::readTuples, |
| Collections.emptyList()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean replace(@NotNull Tuple rec) { |
| return replaceAsync(rec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple rec) { |
| Objects.requireNonNull(rec); |
| |
| return doSchemaOutOpAsync( |
| ClientOp.TUPLE_REPLACE, |
| (s, w) -> writeTuple(rec, s, w, false), |
| ClientMessageUnpacker::unpackBoolean); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean replace(@NotNull Tuple oldRec, @NotNull Tuple newRec) { |
| return replaceAsync(oldRec, newRec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple oldRec, @NotNull Tuple newRec) { |
| Objects.requireNonNull(oldRec); |
| Objects.requireNonNull(newRec); |
| |
| return doSchemaOutOpAsync( |
| ClientOp.TUPLE_REPLACE_EXACT, |
| (s, w) -> { |
| writeTuple(oldRec, s, w, false, false); |
| writeTuple(newRec, s, w, false, true); |
| }, |
| ClientMessageUnpacker::unpackBoolean); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Tuple getAndReplace(@NotNull Tuple rec) { |
| return getAndReplaceAsync(rec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple rec) { |
| Objects.requireNonNull(rec); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_GET_AND_REPLACE, |
| (s, w) -> writeTuple(rec, s, w, false), |
| (schema, in) -> readValueTuple(schema, in, rec)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean delete(@NotNull Tuple keyRec) { |
| return deleteAsync(keyRec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull Tuple keyRec) { |
| Objects.requireNonNull(keyRec); |
| |
| return doSchemaOutOpAsync( |
| ClientOp.TUPLE_DELETE, |
| (s, w) -> writeTuple(keyRec, s, w, true), |
| ClientMessageUnpacker::unpackBoolean); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean deleteExact(@NotNull Tuple rec) { |
| return deleteExactAsync(rec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull Tuple rec) { |
| Objects.requireNonNull(rec); |
| |
| return doSchemaOutOpAsync( |
| ClientOp.TUPLE_DELETE_EXACT, |
| (s, w) -> writeTuple(rec, s, w, false), |
| ClientMessageUnpacker::unpackBoolean); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Tuple getAndDelete(@NotNull Tuple rec) { |
| return getAndDeleteAsync(rec).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(@NotNull Tuple rec) { |
| Objects.requireNonNull(rec); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_GET_AND_DELETE, |
| (s, w) -> writeTuple(rec, s, w, false), |
| (schema, in) -> readValueTuple(schema, in, rec)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Tuple> deleteAll(@NotNull Collection<Tuple> recs) { |
| return deleteAllAsync(recs).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(@NotNull Collection<Tuple> recs) { |
| Objects.requireNonNull(recs); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_DELETE_ALL, |
| (s, w) -> writeTuples(recs, s, w, true), |
| (schema, in) -> readTuples(schema, in, true), |
| Collections.emptyList()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Tuple> deleteAllExact(@NotNull Collection<Tuple> recs) { |
| return deleteAllExactAsync(recs).join(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@NotNull Collection<Tuple> recs) { |
| Objects.requireNonNull(recs); |
| |
| return doSchemaOutInOpAsync( |
| ClientOp.TUPLE_DELETE_ALL_EXACT, |
| (s, w) -> writeTuples(recs, s, w, false), |
| this::readTuples, |
| Collections.emptyList()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T extends Serializable> T invoke(@NotNull Tuple keyRec, InvokeProcessor<Tuple, Tuple, T> proc) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(@NotNull Tuple keyRec, InvokeProcessor<Tuple, Tuple, T> proc) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T extends Serializable> Map<Tuple, T> invokeAll(@NotNull Collection<Tuple> keyRecs, InvokeProcessor<Tuple, Tuple, T> proc) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull <T extends Serializable> CompletableFuture<Map<Tuple, T>> invokeAllAsync(@NotNull Collection<Tuple> keyRecs, InvokeProcessor<Tuple, Tuple, T> proc) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @Nullable Transaction transaction() { |
| // TODO: Transactions IGNITE-15240 |
| throw new UnsupportedOperationException(); |
| } |
| |
| private CompletableFuture<ClientSchema> getLatestSchema() { |
| if (latestSchemaVer >= 0) |
| return CompletableFuture.completedFuture(schemas.get(latestSchemaVer)); |
| |
| return loadSchema(null); |
| } |
| |
| private CompletableFuture<ClientSchema> getSchema(int ver) { |
| var schema = schemas.get(ver); |
| |
| if (schema != null) |
| return CompletableFuture.completedFuture(schema); |
| |
| return loadSchema(ver); |
| } |
| |
| private CompletableFuture<ClientSchema> loadSchema(Integer ver) { |
| return ch.serviceAsync(ClientOp.SCHEMAS_GET, w -> { |
| w.out().packIgniteUuid(id); |
| |
| if (ver == null) |
| w.out().packNil(); |
| else { |
| w.out().packArrayHeader(1); |
| w.out().packInt(ver); |
| } |
| }, r -> { |
| int schemaCnt = r.in().unpackMapHeader(); |
| |
| if (schemaCnt == 0) |
| throw new IgniteClientException("Schema not found: " + ver); |
| |
| ClientSchema last = null; |
| |
| for (var i = 0; i < schemaCnt; i++) |
| last = readSchema(r.in()); |
| |
| return last; |
| }); |
| } |
| |
| private ClientSchema readSchema(ClientMessageUnpacker in) { |
| var schemaVer = in.unpackInt(); |
| var colCnt = in.unpackArrayHeader(); |
| |
| var columns = new ClientColumn[colCnt]; |
| |
| for (int i = 0; i < colCnt; i++) { |
| var propCnt = in.unpackArrayHeader(); |
| |
| assert propCnt >= 4; |
| |
| var name = in.unpackString(); |
| var type = in.unpackInt(); |
| var isKey = in.unpackBoolean(); |
| var isNullable = in.unpackBoolean(); |
| |
| // Skip unknown extra properties, if any. |
| in.skipValue(propCnt - 4); |
| |
| var column = new ClientColumn(name, type, isNullable, isKey, i); |
| columns[i] = column; |
| } |
| |
| var schema = new ClientSchema(schemaVer, columns); |
| |
| schemas.put(schemaVer, schema); |
| |
| synchronized (latestSchemaLock) { |
| if (schemaVer > latestSchemaVer) { |
| latestSchemaVer = schemaVer; |
| } |
| } |
| |
| return schema; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return IgniteToStringBuilder.toString(ClientTable.class, this); |
| } |
| |
| public void writeTuple( |
| @NotNull Tuple tuple, |
| ClientSchema schema, |
| ClientMessagePacker out |
| ) { |
| writeTuple(tuple, schema, out, false, false); |
| } |
| |
| public void writeTuple( |
| @NotNull Tuple tuple, |
| ClientSchema schema, |
| ClientMessagePacker out, |
| boolean keyOnly |
| ) { |
| writeTuple(tuple, schema, out, keyOnly, false); |
| } |
| |
| public void writeTuple( |
| @NotNull Tuple tuple, |
| ClientSchema schema, |
| ClientMessagePacker out, |
| boolean keyOnly, |
| boolean skipHeader |
| ) { |
| // TODO: Special case for ClientTupleBuilder - it has columns in order |
| var vals = new Object[keyOnly ? schema.keyColumnCount() : schema.columns().length]; |
| var tupleSize = tuple.columnCount(); |
| |
| for (var i = 0; i < tupleSize; i++) { |
| var colName = tuple.columnName(i); |
| var col = schema.column(colName); |
| |
| if (keyOnly && !col.key()) |
| continue; |
| |
| vals[col.schemaIndex()] = tuple.value(i); |
| } |
| |
| if (!skipHeader) { |
| out.packIgniteUuid(id); |
| out.packInt(schema.version()); |
| } |
| |
| for (var val : vals) |
| out.packObject(val); |
| } |
| |
| public void writeKvTuple( |
| @NotNull Tuple key, |
| @Nullable Tuple val, |
| ClientSchema schema, |
| ClientMessagePacker out, |
| boolean skipHeader |
| ) { |
| var vals = new Object[schema.columns().length]; |
| |
| for (var i = 0; i < key.columnCount(); i++) { |
| var colName = key.columnName(i); |
| var col = schema.column(colName); |
| |
| if (!col.key()) |
| continue; |
| |
| vals[col.schemaIndex()] = key.value(i); |
| } |
| |
| if (val != null) { |
| for (var i = 0; i < val.columnCount(); i++) { |
| var colName = val.columnName(i); |
| var col = schema.column(colName); |
| |
| if (col.key()) |
| continue; |
| |
| vals[col.schemaIndex()] = val.value(i); |
| } |
| } |
| |
| if (!skipHeader) { |
| out.packIgniteUuid(id); |
| out.packInt(schema.version()); |
| } |
| |
| for (var v : vals) |
| out.packObject(v); |
| } |
| |
| public void writeKvTuples(Map<Tuple, Tuple> pairs, ClientSchema schema, ClientMessagePacker out) { |
| out.packIgniteUuid(id); |
| out.packInt(schema.version()); |
| out.packInt(pairs.size()); |
| |
| for (Map.Entry<Tuple, Tuple> pair : pairs.entrySet()) |
| writeKvTuple(pair.getKey(), pair.getValue(), schema, out, true); |
| } |
| |
| public void writeTuples( |
| @NotNull Collection<Tuple> tuples, |
| ClientSchema schema, |
| ClientMessagePacker out, |
| boolean keyOnly |
| ) { |
| out.packIgniteUuid(id); |
| out.packInt(schema.version()); |
| out.packInt(tuples.size()); |
| |
| for (var tuple : tuples) |
| writeTuple(tuple, schema, out, keyOnly, true); |
| } |
| |
| public static Tuple readTuple(ClientSchema schema, ClientMessageUnpacker in) { |
| return readTuple(schema, in, false); |
| } |
| |
| private static Tuple readTuple(ClientSchema schema, ClientMessageUnpacker in, boolean keyOnly) { |
| var tuple = new ClientTuple(schema); |
| |
| var colCnt = keyOnly ? schema.keyColumnCount() : schema.columns().length; |
| |
| for (var i = 0; i < colCnt; i++) |
| tuple.setInternal(i, in.unpackObject(schema.columns()[i].type())); |
| |
| return tuple; |
| } |
| |
| public static Tuple readValueTuple(ClientSchema schema, ClientMessageUnpacker in, Tuple keyTuple) { |
| var tuple = new ClientTuple(schema); |
| |
| for (var i = 0; i < schema.columns().length; i++) { |
| ClientColumn col = schema.columns()[i]; |
| |
| Object value = i < schema.keyColumnCount() |
| ? keyTuple.value(col.name()) |
| : in.unpackObject(schema.columns()[i].type()); |
| |
| tuple.setInternal(i, value); |
| } |
| |
| return tuple; |
| } |
| |
| public static Tuple readValueTuple(ClientSchema schema, ClientMessageUnpacker in) { |
| var keyColCnt = schema.keyColumnCount(); |
| var colCnt = schema.columns().length; |
| |
| var valTuple = new ClientTuple(schema, keyColCnt, schema.columns().length - 1); |
| |
| for (var i = keyColCnt; i < colCnt; i++) { |
| ClientColumn col = schema.columns()[i]; |
| Object val = in.unpackObject(col.type()); |
| |
| valTuple.setInternal(i - keyColCnt, val); |
| } |
| |
| return valTuple; |
| } |
| |
| public static IgniteBiTuple<Tuple, Tuple> readKvTuple(ClientSchema schema, ClientMessageUnpacker in) { |
| var keyColCnt = schema.keyColumnCount(); |
| var colCnt = schema.columns().length; |
| |
| var keyTuple = new ClientTuple(schema, 0, keyColCnt - 1); |
| var valTuple = new ClientTuple(schema, keyColCnt, schema.columns().length - 1); |
| |
| for (var i = 0; i < colCnt; i++) { |
| ClientColumn col = schema.columns()[i]; |
| Object val = in.unpackObject(col.type()); |
| |
| if (i < keyColCnt) |
| keyTuple.setInternal(i, val); |
| else |
| valTuple.setInternal(i - keyColCnt, val); |
| } |
| |
| return new IgniteBiTuple<>(keyTuple, valTuple); |
| } |
| |
| public Map<Tuple, Tuple> readKvTuples(ClientSchema schema, ClientMessageUnpacker in) { |
| var cnt = in.unpackInt(); |
| Map<Tuple, Tuple> res = new HashMap<>(cnt); |
| |
| for (int i = 0; i < cnt; i++) { |
| var pair = readKvTuple(schema, in); |
| res.put(pair.get1(), pair.get2()); |
| } |
| |
| return res; |
| } |
| |
| public Collection<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in) { |
| return readTuples(schema, in, false); |
| } |
| |
| private Collection<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in, boolean keyOnly) { |
| var cnt = in.unpackInt(); |
| var res = new ArrayList<Tuple>(cnt); |
| |
| for (int i = 0; i < cnt; i++) |
| res.add(readTuple(schema, in, keyOnly)); |
| |
| return res; |
| } |
| |
| public <T> CompletableFuture<T> doSchemaOutInOpAsync( |
| int opCode, |
| BiConsumer<ClientSchema, ClientMessagePacker> writer, |
| BiFunction<ClientSchema, ClientMessageUnpacker, T> reader |
| ) { |
| return doSchemaOutInOpAsync(opCode, writer, reader, null); |
| } |
| |
| public <T> CompletableFuture<T> doSchemaOutInOpAsync( |
| int opCode, |
| BiConsumer<ClientSchema, ClientMessagePacker> writer, |
| BiFunction<ClientSchema, ClientMessageUnpacker, T> reader, |
| T defaultValue |
| ) { |
| return getLatestSchema() |
| .thenCompose(schema -> |
| ch.serviceAsync(opCode, |
| w -> writer.accept(schema, w.out()), |
| r -> readSchemaAndReadData(schema, r.in(), reader, defaultValue))) |
| .thenCompose(t -> loadSchemaAndReadData(t, reader)); |
| } |
| |
| protected <T> CompletableFuture<T> doSchemaOutOpAsync( |
| int opCode, |
| BiConsumer<ClientSchema, ClientMessagePacker> writer, |
| Function<ClientMessageUnpacker, T> reader) { |
| return getLatestSchema() |
| .thenCompose(schema -> |
| ch.serviceAsync(opCode, |
| w -> writer.accept(schema, w.out()), |
| r -> reader.apply(r.in()))); |
| } |
| |
| private <T> Object readSchemaAndReadData( |
| ClientSchema knownSchema, |
| ClientMessageUnpacker in, |
| BiFunction<ClientSchema, ClientMessageUnpacker, T> fn, |
| T defaultValue |
| ) { |
| if (in.getNextFormat() == MessageFormat.NIL) |
| return defaultValue; |
| |
| var schemaVer = in.unpackInt(); |
| |
| var resSchema = schemaVer == knownSchema.version() ? knownSchema : schemas.get(schemaVer); |
| |
| if (resSchema != null) |
| return fn.apply(knownSchema, in); |
| |
| // Schema is not yet known - request. |
| // Retain unpacker - normally it is closed when this method exits. |
| return new IgniteBiTuple<>(in.retain(), schemaVer); |
| } |
| |
| private <T> CompletionStage<T> loadSchemaAndReadData( |
| Object data, |
| BiFunction<ClientSchema, ClientMessageUnpacker, T> fn |
| ) { |
| if (!(data instanceof IgniteBiTuple)) |
| return CompletableFuture.completedFuture((T) data); |
| |
| var biTuple = (IgniteBiTuple<ClientMessageUnpacker, Integer>) data; |
| |
| var in = biTuple.getKey(); |
| var schemaId = biTuple.getValue(); |
| |
| assert in != null; |
| assert schemaId != null; |
| |
| var resFut = getSchema(schemaId).thenApply(schema -> fn.apply(schema, in)); |
| |
| // Close unpacker. |
| resFut.handle((tuple, err) -> { |
| in.close(); |
| return null; |
| }); |
| |
| return resFut; |
| } |
| } |