blob: c766206b933b1bde24c11019fb2cd990e12b68bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.table;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.client.IgniteClientException;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.client.proto.ClientOp;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.TupleBuilder;
import org.apache.ignite.table.mapper.KeyMapper;
import org.apache.ignite.table.mapper.RecordMapper;
import org.apache.ignite.table.mapper.ValueMapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.msgpack.core.MessageFormat;
/**
* Client table API implementation.
*/
public class ClientTable implements Table {
/** */
private final UUID id;
/** */
private final String name;
/** */
private final ReliableChannel ch;
/** */
private final ConcurrentHashMap<Integer, ClientSchema> schemas = new ConcurrentHashMap<>();
/** */
private volatile int latestSchemaVer = -1;
/** */
private final Object latestSchemaLock = new Object();
/**
* Constructor.
*
* @param ch Channel.
* @param id Table id.
* @param name Table name.
*/
public ClientTable(ReliableChannel ch, UUID id, String name) {
assert ch != null;
assert id != null;
assert name != null && !name.isEmpty();
this.ch = ch;
this.id = id;
this.name = name;
}
/**
* Gets the table id.
*
* @return Table id.
*/
public UUID tableId() {
return id;
}
/** {@inheritDoc} */
@Override public @NotNull String tableName() {
return name;
}
/** {@inheritDoc} */
@Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public <K, V> KeyValueView<K, V> kvView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public KeyValueBinaryView kvView() {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public Table withTransaction(Transaction tx) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public TupleBuilder tupleBuilder() {
return new ClientTupleBuilder();
}
/** {@inheritDoc} */
@Override public Tuple get(@NotNull Tuple keyRec) {
return getAsync(keyRec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple keyRec) {
return getLatestSchema().thenCompose(schema ->
ch.serviceAsync(ClientOp.TUPLE_GET, w -> writeTuple(keyRec, schema, w, true), r -> {
if (r.in().getNextFormat() == MessageFormat.NIL)
return null;
var schemaVer = r.in().unpackInt();
return new IgniteBiTuple<>(r, schemaVer);
})).thenCompose(biTuple -> {
if (biTuple == null)
return CompletableFuture.completedFuture(null);
assert biTuple.getKey() != null;
assert biTuple.getValue() != null;
return getSchema(biTuple.getValue()).thenApply(schema -> readTuple(schema, biTuple.getKey()));
});
}
/** {@inheritDoc} */
@Override public Collection<Tuple> getAll(@NotNull Collection<Tuple> keyRecs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@NotNull Collection<Tuple> keyRecs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public void upsert(@NotNull Tuple rec) {
upsertAsync(rec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull Tuple rec) {
return getLatestSchema().thenCompose(schema -> ch.serviceAsync(ClientOp.TUPLE_UPSERT,
w -> writeTuple(rec, schema, w, false), r -> null));
}
/** {@inheritDoc} */
@Override public void upsertAll(@NotNull Collection<Tuple> recs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<Tuple> recs) {
return null;
}
/** {@inheritDoc} */
@Override public Tuple getAndUpsert(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean insert(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public Collection<Tuple> insertAll(@NotNull Collection<Tuple> recs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(@NotNull Collection<Tuple> recs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean replace(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean replace(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public Tuple getAndReplace(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean delete(@NotNull Tuple keyRec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull Tuple keyRec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean deleteExact(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public Tuple getAndDelete(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(@NotNull Tuple rec) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public Collection<Tuple> deleteAll(@NotNull Collection<Tuple> recs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(@NotNull Collection<Tuple> recs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public Collection<Tuple> deleteAllExact(@NotNull Collection<Tuple> recs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@NotNull Collection<Tuple> recs) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public <T extends Serializable> T invoke(@NotNull Tuple keyRec, InvokeProcessor<Tuple, Tuple, T> proc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(@NotNull Tuple keyRec, InvokeProcessor<Tuple, Tuple, T> proc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public <T extends Serializable> Map<Tuple, T> invokeAll(@NotNull Collection<Tuple> keyRecs, InvokeProcessor<Tuple, Tuple, T> proc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @NotNull <T extends Serializable> CompletableFuture<Map<Tuple, T>> invokeAllAsync(@NotNull Collection<Tuple> keyRecs, InvokeProcessor<Tuple, Tuple, T> proc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public @Nullable Transaction transaction() {
throw new UnsupportedOperationException();
}
private CompletableFuture<ClientSchema> getLatestSchema() {
if (latestSchemaVer >= 0)
return CompletableFuture.completedFuture(schemas.get(latestSchemaVer));
return loadSchema(null);
}
private CompletableFuture<ClientSchema> getSchema(int ver) {
var schema = schemas.get(ver);
if (schema != null)
return CompletableFuture.completedFuture(schema);
return loadSchema(ver);
}
private CompletableFuture<ClientSchema> loadSchema(Integer ver) {
return ch.serviceAsync(ClientOp.SCHEMAS_GET, w -> {
w.out().packUuid(id);
if (ver == null)
w.out().packNil();
else {
w.out().packArrayHeader(1);
w.out().packInt(ver);
}
}, r -> {
int schemaCnt = r.in().unpackMapHeader();
if (schemaCnt == 0)
throw new IgniteClientException("Schema not found: " + ver);
ClientSchema last = null;
for (var i = 0; i < schemaCnt; i++)
last = readSchema(r.in());
return last;
});
}
private ClientSchema readSchema(ClientMessageUnpacker in) throws IOException {
var schemaVer = in.unpackInt();
var colCnt = in.unpackArrayHeader();
var columns = new ClientColumn[colCnt];
for (int i = 0; i < colCnt; i++) {
var propCnt = in.unpackArrayHeader();
assert propCnt >= 4;
var name = in.unpackString();
var type = in.unpackInt();
var isKey = in.unpackBoolean();
var isNullable = in.unpackBoolean();
// Skip unknown extra properties, if any.
in.skipValue(propCnt - 4);
var column = new ClientColumn(name, type, isNullable, isKey, i);
columns[i] = column;
}
var schema = new ClientSchema(schemaVer, columns);
schemas.put(schemaVer, schema);
synchronized (latestSchemaLock) {
if (schemaVer > latestSchemaVer) {
latestSchemaVer = schemaVer;
}
}
return schema;
}
/** {@inheritDoc} */
@Override public String toString() {
return IgniteToStringBuilder.toString(ClientTable.class, this);
}
private void writeTuple(@NotNull Tuple tuple, ClientSchema schema, PayloadOutputChannel w, boolean keyOnly) throws IOException {
// TODO: We should accept any Tuple implementation, but this requires extending the Tuple interface
// with methods to retrieve column list.
var rec = (ClientTupleBuilder) tuple;
var vals = new Object[keyOnly ? schema.keyColumnCount() : schema.columns().length];
for (var entry : rec.map().entrySet()) {
var col = schema.column(entry.getKey());
if (keyOnly && !col.key())
continue;
vals[col.schemaIndex()] = entry.getValue();
}
w.out().packUuid(id);
w.out().packInt(schema.version());
for (var val : vals)
w.out().packObject(val);
}
private Tuple readTuple(ClientSchema schema, PayloadInputChannel r) {
var builder = new ClientTupleBuilder();
try {
for (var col : schema.columns())
builder.set(col.name(), r.in().unpackObject(col.type()));
} catch (IOException e) {
throw new CompletionException(e);
}
return builder;
}
}