blob: 4134b4397e2a5233fc829410ddb97627e07aa95b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
/**
* Key-value view implementation for binary user-object representation.
*/
public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinaryView {
/** Marshaller. */
private final TupleMarshallerImpl marsh;
/** Table manager. */
private final TableManager tblMgr;
/**
* Constructor.
*
* @param tbl Table storage.
* @param schemaReg Schema registry.
*/
public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg, TableManager tblMgr, Transaction tx) {
super(tbl, schemaReg, tx);
this.tblMgr = tblMgr;
marsh = new TupleMarshallerImpl(tblMgr, tbl, schemaReg);
}
/** {@inheritDoc} */
@Override public Tuple get(@NotNull Tuple key) {
return sync(getAsync(key));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple key) {
Objects.requireNonNull(key);
Row kRow = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
return tbl.get(kRow, tx) // Load async.
.thenApply(this::wrap) // Binary -> schema-aware row
.thenApply(TableRow::valueTuple); // Narrow to value.
}
/** {@inheritDoc} */
@Override public Map<Tuple, Tuple> getAll(@NotNull Collection<Tuple> keys) {
return sync(getAllAsync(keys));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@NotNull Collection<Tuple> keys) {
Objects.requireNonNull(keys);
return tbl.getAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()), tx)
.thenApply(ts -> ts.stream().filter(Objects::nonNull).map(this::wrap).collect(Collectors.toMap(TableRow::keyTuple, TableRow::valueTuple)));
}
/** {@inheritDoc} */
@Override public boolean contains(@NotNull Tuple key) {
return get(key) != null;
}
/** {@inheritDoc} */
@Override public CompletableFuture<Boolean> containsAsync(@NotNull Tuple key) {
return getAsync(key).thenApply(Objects::nonNull);
}
/** {@inheritDoc} */
@Override public void put(@NotNull Tuple key, Tuple val) {
sync(putAsync(key, val));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> putAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
return tbl.upsert(row, tx);
}
/** {@inheritDoc} */
@Override public void putAll(@NotNull Map<Tuple, Tuple> pairs) {
sync(putAllAsync(pairs));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> putAllAsync(@NotNull Map<Tuple, Tuple> pairs) {
Objects.requireNonNull(pairs);
return tbl.upsertAll(pairs.entrySet()
.stream()
.map(this::marshalPair)
.collect(Collectors.toList()), tx);
}
/** {@inheritDoc} */
@Override public Tuple getAndPut(@NotNull Tuple key, Tuple val) {
return sync(getAndPutAsync(key, val));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndPutAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
return tbl.getAndUpsert(row, tx)
.thenApply(this::wrap) // Binary -> schema-aware row
.thenApply(TableRow::valueTuple); // Narrow to value.
}
/** {@inheritDoc} */
@Override public boolean putIfAbsent(@NotNull Tuple key, @NotNull Tuple val) {
return sync(putIfAbsentAsync(key, val));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
return tbl.insert(row, tx);
}
/** {@inheritDoc} */
@Override public boolean remove(@NotNull Tuple key) {
return sync(removeAsync(key));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull Tuple key) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
return tbl.delete(row, tx);
}
/** {@inheritDoc} */
@Override public boolean remove(@NotNull Tuple key, @NotNull Tuple val) {
return sync(removeAsync(key, val));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull Tuple key, @NotNull Tuple val) {
Objects.requireNonNull(key);
Objects.requireNonNull(val);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
return tbl.deleteExact(row, tx);
}
/** {@inheritDoc} */
@Override public Collection<Tuple> removeAll(@NotNull Collection<Tuple> keys) {
Objects.requireNonNull(keys);
return sync(removeAllAsync(keys));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(@NotNull Collection<Tuple> keys) {
Objects.requireNonNull(keys);
return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()), tx)
.thenApply(t -> t.stream().filter(Objects::nonNull).map(this::wrap).map(TableRow::valueTuple).collect(Collectors.toList()));
}
/** {@inheritDoc} */
@Override public Tuple getAndRemove(@NotNull Tuple key) {
Objects.requireNonNull(key);
return sync(getAndRemoveAsync(key));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(@NotNull Tuple key) {
Objects.requireNonNull(key);
return tbl.getAndDelete(marsh.marshal(key, null), tx)
.thenApply(this::wrap)
.thenApply(TableRow::valueTuple);
}
/** {@inheritDoc} */
@Override public boolean replace(@NotNull Tuple key, Tuple val) {
return sync(replaceAsync(key, val));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
return tbl.replace(row, tx);
}
/** {@inheritDoc} */
@Override public boolean replace(@NotNull Tuple key, Tuple oldVal, Tuple newVal) {
return sync(replaceAsync(key, oldVal, newVal));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple key, Tuple oldVal, Tuple newVal) {
Objects.requireNonNull(key);
Row oldRow = marshaller().marshal(key, oldVal); // Convert to portable format to pass TX/storage layer.
Row newRow = marshaller().marshal(key, newVal); // Convert to portable format to pass TX/storage layer.
return tbl.replace(oldRow, newRow, tx);
}
/** {@inheritDoc} */
@Override public Tuple getAndReplace(@NotNull Tuple key, Tuple val) {
return sync(getAndReplaceAsync(key, val));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
return tbl.getAndReplace(marsh.marshal(key, val), tx)
.thenApply(this::wrap)
.thenApply(TableRow::valueTuple);
}
/** {@inheritDoc} */
@Override public <R extends Serializable> R invoke(
@NotNull Tuple key,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
@NotNull Tuple key,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public <R extends Serializable> Map<Tuple, R> invokeAll(
@NotNull Collection<Tuple> keys,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public @NotNull <R extends Serializable> CompletableFuture<Map<Tuple, R>> invokeAllAsync(
@NotNull Collection<Tuple> keys,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public KVBinaryViewImpl withTransaction(Transaction tx) {
return new KVBinaryViewImpl(tbl, schemaReg, tblMgr, tx);
}
/**
* @return Marshaller.
*/
private TupleMarshaller marshaller() {
return marsh;
}
/**
* @param row Binary row.
* @return Row.
*/
protected Row wrap(BinaryRow row) {
if (row == null)
return null;
return schemaReg.resolve(row);
}
/**
* Marshals a key-value pair into the table row.
*
* @param pair A map entry represents the key-value pair.
* @return Row.
*/
private Row marshalPair(Map.Entry<Tuple, Tuple> pair) {
return marshaller().marshal(pair.getKey(), pair.getValue());
}
}