blob: 6fbcbe5f98c557e3fcaa58ff33e2c27d246f0d99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.fakes;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.booleanCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableRaftService;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
* Fake internal table.
*/
public class FakeInternalTable implements InternalTable {
public static final int PARTITIONS = 4;
/** Table name. */
private final String tableName;
/** Table ID. */
private final int tableId;
private final ColumnsExtractor keyExtractor;
/** Table data. */
private final ConcurrentHashMap<ByteBuffer, BinaryRow> data = new ConcurrentHashMap<>();
/** Data access listener. */
private BiConsumer<String, Object> dataAccessListener;
/**
* The constructor.
*
* @param tableName Name.
* @param tableId Id.
* @param keyExtractor Function which converts given binary row to an index key.
*/
public FakeInternalTable(String tableName, int tableId, ColumnsExtractor keyExtractor) {
this.tableName = tableName;
this.tableId = tableId;
this.keyExtractor = keyExtractor;
}
@Override
public MvTableStorage storage() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int partitions() {
return PARTITIONS;
}
@Override
public int tableId() {
return tableId;
}
@Override
public String name() {
return tableName;
}
@Override
public void name(String newName) {
throw new UnsupportedOperationException("Should not be called");
}
@Override
public int partitionId(BinaryRowEx row) {
return 0;
}
@Override
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
return completedFuture(getImpl(keyRow.tupleSlice(), keyRow));
}
@Override
public CompletableFuture<BinaryRow> get(
BinaryRowEx keyRow,
HybridTimestamp readTimestamp,
ClusterNode recipientNode) {
return null;
}
private BinaryRow getImpl(ByteBuffer key, BinaryRow keyRow) {
onDataAccess("get", keyRow);
return data.get(key);
}
@Override
public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, @Nullable InternalTransaction tx) {
var res = new ArrayList<BinaryRow>();
for (var key : keyRows) {
var val = get(key, null);
if (val != null) {
res.add(val.getNow(null));
} else {
res.add(null);
}
}
onDataAccess("getAll", keyRows);
return completedFuture(res);
}
@Override
public CompletableFuture<List<BinaryRow>> getAll(
Collection<BinaryRowEx> keyRows,
HybridTimestamp readTimestamp,
ClusterNode recipientNode
) {
return null;
}
@Override
public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx) {
upsertImpl(keyExtractor.extractColumns(row), row);
return nullCompletedFuture();
}
private void upsertImpl(BinaryTuple key, BinaryRow row) {
onDataAccess("upsert", row);
data.put(key.byteBuffer(), row);
}
@Override
public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
for (var row : rows) {
upsert(row, tx);
}
onDataAccess("upsertAll", rows);
return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, @Nullable BitSet deleted, int partition) {
int i = 0;
for (var row : rows) {
if (deleted != null && deleted.get(i)) {
delete(row, null);
} else {
upsert(row, null);
}
i++;
}
onDataAccess("updateAll", rows);
return nullCompletedFuture();
}
@Override
public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row,
@Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(row);
BinaryRow res = getImpl(key.byteBuffer(), row);
upsertImpl(key, row);
onDataAccess("getAndUpsert", row);
return completedFuture(res);
}
@Override
public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(row);
BinaryRow old = getImpl(key.byteBuffer(), row);
if (old == null) {
upsertImpl(key, row);
}
onDataAccess("insert", row);
return booleanCompletedFuture(old == null);
}
@Override
public CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
if (!insert(row, tx).getNow(null)) {
skipped.add(row);
}
}
onDataAccess("insertAll", rows);
return completedFuture(skipped);
}
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(row);
return booleanCompletedFuture(replaceImpl(key, row, tx) != null);
}
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx newRow, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(oldRow);
BinaryRow old = getImpl(key.byteBuffer(), oldRow);
if (old == null || !old.tupleSlice().equals(oldRow.tupleSlice())) {
onDataAccess("replace", oldRow);
return falseCompletedFuture();
}
upsertImpl(key, newRow);
onDataAccess("replace", oldRow);
return trueCompletedFuture();
}
private @Nullable BinaryRow replaceImpl(BinaryTuple key, BinaryRow row, @Nullable InternalTransaction tx) {
BinaryRow old = getImpl(key.byteBuffer(), row);
if (old == null) {
onDataAccess("replace", row);
return null;
}
upsertImpl(key, row);
onDataAccess("replace", row);
return old;
}
@Override
public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryTuple key = keyExtractor.extractColumns(row);
BinaryRow replace = replaceImpl(key, row, tx);
onDataAccess("getAndReplace", row);
return completedFuture(replace);
}
@Override
public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
BinaryRow old = getImpl(keyRow.tupleSlice(), keyRow);
if (old != null) {
data.remove(keyRow.tupleSlice());
}
onDataAccess("delete", keyRow);
return booleanCompletedFuture(old != null);
}
@Override
public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, @Nullable InternalTransaction tx) {
var res = false;
BinaryTuple key = keyExtractor.extractColumns(oldRow);
BinaryRow old = getImpl(key.byteBuffer(), oldRow);
if (old != null && old.tupleSlice().equals(oldRow.tupleSlice())) {
data.remove(key.byteBuffer());
res = true;
}
onDataAccess("deleteExact", oldRow);
return booleanCompletedFuture(res);
}
@Override
public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row, @Nullable InternalTransaction tx) {
BinaryRow old = getImpl(row.tupleSlice(), row);
if (old != null) {
data.remove(row.tupleSlice());
}
onDataAccess("getAndDelete", row);
return completedFuture(old);
}
@Override
public CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
if (!delete(row, tx).getNow(false)) {
skipped.add(row);
}
}
onDataAccess("deleteAll", rows);
return completedFuture(skipped);
}
@Override
public CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
if (!deleteExact(row, tx).getNow(false)) {
skipped.add(row);
}
}
onDataAccess("deleteAllExact", rows);
return completedFuture(skipped);
}
@Override
public Publisher<BinaryRow> scan(
int partId,
@Nullable InternalTransaction tx,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
BitSet columnsToInclude
) {
throw new IgniteInternalException(new OperationNotSupportedException());
}
@Override
public Publisher<BinaryRow> scan(
int partId,
UUID txId,
TablePartitionId commitPartition,
String txCoordinatorId,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
@Nullable BitSet columnsToInclude
) {
throw new IgniteInternalException(new OperationNotSupportedException());
}
@Override
public Publisher<BinaryRow> scan(
int partId,
UUID txId,
HybridTimestamp readTimestamp,
ClusterNode recipientNode,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
@Nullable BitSet columnsToInclude,
String txCoordinatorId) {
throw new IgniteInternalException(new OperationNotSupportedException());
}
@Override
public Publisher<BinaryRow> scan(
int partId,
UUID txId,
HybridTimestamp readTimestamp,
ClusterNode recipientNode,
String txCoordinatorId
) {
return null;
}
@Override
public Publisher<BinaryRow> lookup(
int partId,
UUID txId,
TablePartitionId commitPartition,
String txCoordinatorId,
PrimaryReplica recipient,
int indexId,
BinaryTuple key,
@Nullable BitSet columnsToInclude
) {
throw new IgniteInternalException(new OperationNotSupportedException());
}
@Override
public Publisher<BinaryRow> lookup(
int partId,
UUID txId,
HybridTimestamp readTimestamp,
ClusterNode recipientNode,
int indexId,
BinaryTuple key,
@Nullable BitSet columnsToInclude,
String txCoordinatorId
) {
throw new IgniteInternalException(new OperationNotSupportedException());
}
@Override
public TableRaftService tableRaftService() {
throw new IgniteInternalException(new OperationNotSupportedException());
}
@Override public TxStateTableStorage txStateStorage() {
return null;
}
@Override
public int partition(BinaryRowEx keyRow) {
return 0;
}
@Override
public void close() {
// No-op.
}
/**
* Sets the data access operation listener.
*
* @param dataAccessListener Data access operation listener.
*/
public void setDataAccessListener(BiConsumer<String, Object> dataAccessListener) {
this.dataAccessListener = dataAccessListener;
}
private void onDataAccess(String operation, Object arg) {
if (dataAccessListener != null) {
dataAccessListener.accept(operation, arg);
}
}
@Override
public @Nullable PendingComparableValuesTracker<HybridTimestamp, Void> getPartitionSafeTimeTracker(int partitionId) {
return null;
}
@Override
public @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId) {
return null;
}
@Override
public ScheduledExecutorService streamerFlushExecutor() {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId partition) {
throw new UnsupportedOperationException("Not implemented yet");
}
}