// blob: 9103c4f9c35cdbd8588fd12d3b3095735e13f407 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed.replication;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.AdditionalMatchers.gt;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.distributed.replicator.action.RequestTypes;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.marshaller.MarshallerException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.CatalogVersionAware;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommandImpl;
import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.StaleTransactionOperationException;
import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.UpdateCommandResult;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeResolver;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.SingleClusterNodeResolver;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.tx.TransactionException;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junitpioneer.jupiter.cartesian.ArgumentSets;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/** Tests for partition replica listener. */
@ExtendWith(MockitoExtension.class)
@ExtendWith(ConfigurationExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Partition id used for the test storages, the index lockers and the listener under test. */
private static final int PART_ID = 0;
/** Schema version of {@code schemaDescriptor}, as created in {@code beforeTest()}. */
private static final int CURRENT_SCHEMA_VERSION = 1;
/** Schema version of {@code schemaDescriptorVersion2}, as created in {@code beforeTest()}. */
private static final int NEXT_SCHEMA_VERSION = 2;
/** Alias of {@link #NEXT_SCHEMA_VERSION}. */
private static final int FUTURE_SCHEMA_VERSION = NEXT_SCHEMA_VERSION;
// NOTE(review): presumably the indexed value of rows written with the future schema version - confirm at the usage sites.
private static final int FUTURE_SCHEMA_ROW_INDEXED_VALUE = 0;
/** Id of the table the listener under test is created for. */
private static final int TABLE_ID = 1;
/** Commit partition id the tests pass to the {@code TxStateMeta} instances they create. */
private static final TablePartitionId commitPartitionId = new TablePartitionId(TABLE_ID, PART_ID);
// NOTE(review): presumably the id of a table other than the tested one - confirm at the usage sites.
private static final int ANOTHER_TABLE_ID = 2;
/** Enlistment consistency token the tests put into the requests they build. */
private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 1L;
/**
 * Row ids recorded per transaction by the default mock Raft closure: added on every {@code UpdateCommand} and removed
 * (and committed in the partition storage) on {@code WriteIntentSwitchCommand}.
 */
private final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>();
/** Storage that holds partition data. */
private final TestMvPartitionStorage testMvPartitionStorage = new TestMvPartitionStorage(PART_ID);
/**
 * Lock manager shared by the index lockers and the listener under test; the default mock Raft closure releases all locks
 * of a transaction when it handles a {@code WriteIntentSwitchCommand}.
 */
private final LockManager lockManager = new HeapLockManager();
/**
 * Default reaction of the mocked Raft client to a command.
 *
 * <ul>
 *     <li>{@link WriteIntentSwitchCommand}: commits the rows recorded for the transaction in the partition storage,
 *     releases the transaction locks and completes with {@code null};</li>
 *     <li>{@link UpdateCommand}: records the updated row id for the transaction and completes with a successful
 *     {@link UpdateCommandResult};</li>
 *     <li>{@link UpdateAllCommand}: completes with a successful {@link UpdateCommandResult};</li>
 *     <li>{@link FinishTxCommand}: completes with a {@link TransactionResult} matching the commit flag;</li>
 *     <li>anything else: completes with {@code null}.</li>
 * </ul>
 */
private final Function<Command, CompletableFuture<?>> defaultMockRaftFutureClosure = cmd -> {
    if (cmd instanceof WriteIntentSwitchCommand) {
        WriteIntentSwitchCommand switchCommand = (WriteIntentSwitchCommand) cmd;

        UUID txId = switchCommand.txId();
        Set<RowId> writtenRows = pendingRows.remove(txId);

        HybridTimestamp commitTimestamp = switchCommand.commitTimestamp();
        assertNotNull(commitTimestamp);

        if (writtenRows != null) {
            writtenRows.forEach(rowId -> testMvPartitionStorage.commitWrite(rowId, commitTimestamp));
        }

        lockManager.releaseAll(txId);

        return nullCompletedFuture();
    }

    if (cmd instanceof UpdateCommand) {
        UpdateCommand updateCommand = (UpdateCommand) cmd;

        // The set is mutated inside compute() so that the update happens under the map's per-key lock.
        pendingRows.compute(updateCommand.txId(), (ignoredTxId, knownRows) -> {
            Set<RowId> rowsOfTx = knownRows == null ? new HashSet<>() : knownRows;

            rowsOfTx.add(new RowId(PART_ID, updateCommand.rowUuid()));

            return rowsOfTx;
        });

        return completedFuture(new UpdateCommandResult(true));
    }

    if (cmd instanceof UpdateAllCommand) {
        return completedFuture(new UpdateCommandResult(true));
    }

    if (cmd instanceof FinishTxCommand) {
        FinishTxCommand finishCommand = (FinishTxCommand) cmd;
        TxState finalState = finishCommand.commit() ? COMMITTED : ABORTED;

        return completedFuture(new TransactionResult(finalState, finishCommand.commitTimestamp()));
    }

    return nullCompletedFuture();
};
/** Tx messages factory. */
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
/** Table messages factory. */
private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
/** Partition group id. */
private final TablePartitionId grpId = new TablePartitionId(TABLE_ID, PART_ID);
/** Hybrid clock. */
private final HybridClock clock = new HybridClockImpl();
/** Clock service backed by {@link #clock}. */
private final ClockService clockService = new TestClockService(clock);
/** Storage that holds transaction states. */
private final TestTxStateStorage txStateStorage = new TestTxStateStorage();
/** Local cluster node. */
private final ClusterNode localNode = new ClusterNodeImpl("node1", "node1", NetworkAddress.from("127.0.0.1:127"));
/** Another (not local) cluster node. */
private final ClusterNode anotherNode = new ClusterNodeImpl("node2", "node2", NetworkAddress.from("127.0.0.2:127"));
/** Transaction state resolver passed to the listener under test; created and started in {@code beforeTest()}. */
private TransactionStateResolver transactionStateResolver;
/** Partition data storage over {@link #testMvPartitionStorage}; handed to the {@code StorageUpdateHandler} of the listener. */
private final PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TABLE_ID, PART_ID, testMvPartitionStorage);
/** Mocked Raft client; {@code beforeTest()} makes {@code run()} delegate to {@link #raftClientFutureClosure}. */
@Mock
private RaftGroupService mockRaftClient;
/** Mocked transaction manager, set up through {@code configureTxManager} in {@code beforeTest()}. */
@Mock
private TxManager txManager;
/** Mocked topology service that resolves the two test nodes by consistent id and reports {@code localNode} as local. */
@Mock
private TopologyService topologySrv;
/** Mocked safe time tracker: {@code waitFor} completes immediately, {@code current} returns the minimal timestamp. */
@Mock
private PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeClock;
/** Mocked schemas source whose {@code waitForSchemaAvailability} completes immediately. */
@Mock
private ValidationSchemasSource validationSchemasSource;
/** Spy over a schema sync service implementation named {@code AlwaysSyncedSchemaSyncService}. */
@Spy
private final SchemaSyncService schemaSyncService = new AlwaysSyncedSchemaSyncService();
/** Mocked catalog service; listener registration is forwarded to {@link #catalogServiceEventProducer}. */
@Mock
private CatalogService catalogService;
/** Receives the listeners that are registered on or removed from the mocked {@link #catalogService}. */
private final TestCatalogServiceEventProducer catalogServiceEventProducer = new TestCatalogServiceEventProducer();
/** Mocked messaging service; {@code beforeTest()} makes it answer {@code TxStateCoordinatorRequest}s and fail anything else. */
@Mock
private MessagingService messagingService;
/** Injected storage update configuration, passed to the {@code StorageUpdateHandler}. */
@InjectConfiguration
private StorageUpdateConfiguration storageUpdateConfiguration;
/** Injected transaction configuration, passed to the {@code TxMessageSender}. */
@InjectConfiguration
private TransactionConfiguration transactionConfiguration;
/** Schema descriptor for tests. */
private SchemaDescriptor schemaDescriptor;
/** Schema descriptor, version 2. */
private SchemaDescriptor schemaDescriptorVersion2;
/** Key-value marshaller for tests. */
private KvMarshaller<TestKey, TestValue> kvMarshaller;
/** Key-value marshaller using schema version 2. */
private KvMarshaller<TestKey, TestValue> kvMarshallerVersion2;
/**
 * Table descriptor the mocked catalog service returns for every table lookup. It declares the columns
 * {@code intKey}, {@code strKey}, {@code intVal} and {@code strVal}.
 */
// NOTE(review): the second list ("intKey", "strKey") is presumably the primary key column list - confirm against the constructor.
private final CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
TABLE_ID, 1, 2, "table", 1,
List.of(
new CatalogTableColumnDescriptor("intKey", ColumnType.INT32, false, 0, 0, 0, null),
new CatalogTableColumnDescriptor("strKey", ColumnType.STRING, false, 0, 0, 0, null),
new CatalogTableColumnDescriptor("intVal", ColumnType.INT32, false, 0, 0, 0, null),
new CatalogTableColumnDescriptor("strVal", ColumnType.STRING, false, 0, 0, 0, null)
),
List.of("intKey", "strKey"),
null,
DEFAULT_STORAGE_PROFILE
);
/** Placement driver. */
private PlacementDriver placementDriver;
/** Partition replication listener to test. */
private PartitionReplicaListener partitionReplicaListener;
/** Primary index. */
private Lazy<TableSchemaAwareIndexStorage> pkStorageSupplier;
/** If true the local replica is considered leader, false otherwise. */
private boolean localLeader;
/** The state is used to resolve write intent. */
@Nullable
private TxState txState;
// NOTE(review): not touched by the setup code; presumably consumed by the transaction manager stubs - confirm.
private TxStateMeta txStateMeta;
/** Secondary sorted index. */
private TableSchemaAwareIndexStorage sortedIndexStorage;
/** Secondary hash index. */
private TableSchemaAwareIndexStorage hashIndexStorage;
/** Closure the mocked Raft client applies to every command it is asked to run; defaults to {@code defaultMockRaftFutureClosure}. */
private Function<Command, CompletableFuture<?>> raftClientFutureClosure = defaultMockRaftFutureClosure;
// NOTE(review): presumably a source of unique values for generated test keys - confirm at the usage sites.
private static final AtomicInteger nextMonotonicInt = new AtomicInteger(1);
/** Captor for commands. */
@Captor
private ArgumentCaptor<Command> commandCaptor;
/** A ready-made value object for tests that need some value. */
private final TestValue someValue = new TestValue(1, "v1");
/**
 * Builds a fresh fixture before every test: stubs the mocks, creates the primary key, sorted and hash index storages
 * with their lockers, creates and starts the transaction state resolver, instantiates the
 * {@link PartitionReplicaListener} under test and finally resets the mutable test state.
 */
@BeforeEach
public void beforeTest() {
    // Listener (un)registration on the mocked catalog service is forwarded to the test event producer.
    doAnswer(invocation -> {
        catalogServiceEventProducer.listen(invocation.getArgument(0), invocation.getArgument(1));
        return null;
    }).when(catalogService).listen(any(), any());

    doAnswer(invocation -> {
        catalogServiceEventProducer.removeListener(invocation.getArgument(0), invocation.getArgument(1));
        return null;
    }).when(catalogService).removeListener(any(), any());

    // The leader reported by the mocked Raft client follows the localLeader flag.
    when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock -> {
        if (!localLeader) {
            return completedFuture(new LeaderWithTerm(new Peer(anotherNode.name()), 1L));
        }

        return completedFuture(new LeaderWithTerm(new Peer(localNode.name()), 1L));
    });

    // Commands are handled by a replaceable closure so that individual tests can override the Raft behaviour.
    when(mockRaftClient.run(any()))
            .thenAnswer(invocationOnMock -> raftClientFutureClosure.apply(invocationOnMock.getArgument(0)));

    when(topologySrv.getByConsistentId(any())).thenAnswer(invocationOnMock -> {
        String consistentId = invocationOnMock.getArgument(0);

        if (consistentId.equals(anotherNode.name())) {
            return anotherNode;
        } else if (consistentId.equals(localNode.name())) {
            return localNode;
        } else {
            return null;
        }
    });

    when(topologySrv.localMember()).thenReturn(localNode);

    when(safeTimeClock.waitFor(any())).thenReturn(nullCompletedFuture());
    when(safeTimeClock.current()).thenReturn(HybridTimestamp.MIN_VALUE);

    when(validationSchemasSource.waitForSchemaAvailability(anyInt(), anyInt())).thenReturn(nullCompletedFuture());

    lenient().when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
    lenient().when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor);

    int pkIndexId = 1;
    int sortedIndexId = 2;
    int hashIndexId = 3;

    schemaDescriptor = schemaDescriptorWith(CURRENT_SCHEMA_VERSION);
    schemaDescriptorVersion2 = schemaDescriptorWith(NEXT_SCHEMA_VERSION);

    ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);

    pkStorageSupplier = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
            pkIndexId,
            new TestHashIndexStorage(PART_ID, mock(StorageHashIndexDescriptor.class)),
            row2Tuple
    ));

    SortedIndexStorage indexStorage = new TestSortedIndexStorage(PART_ID, new StorageSortedIndexDescriptor(sortedIndexId, List.of(
            new StorageSortedIndexColumnDescriptor("intVal", NativeTypes.INT32, false, true)
    )));

    // 2 is the index of "intVal" in the list of all columns.
    ColumnsExtractor columnsExtractor = BinaryRowConverter.columnsExtractor(schemaDescriptor, 2);

    sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId, indexStorage, columnsExtractor);

    hashIndexStorage = new TableSchemaAwareIndexStorage(
            hashIndexId,
            new TestHashIndexStorage(PART_ID, new StorageHashIndexDescriptor(hashIndexId, List.of(
                    new StorageHashIndexColumnDescriptor("intVal", NativeTypes.INT32, false)
            ))),
            columnsExtractor
    );

    IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true, lockManager, row2Tuple);
    IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId, PART_ID, lockManager, indexStorage, row2Tuple);
    IndexLocker hashIndexLocker = new HashIndexLocker(hashIndexId, false, lockManager, row2Tuple);

    IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
            DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage().id(), pkStorage()))
    );

    CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class);
    when(indexDescriptor.id()).thenReturn(pkIndexId);

    when(catalogService.indexes(anyInt(), anyInt())).thenReturn(List.of(indexDescriptor));

    configureTxManager(txManager);

    // Both invoke() overloads (addressed by node and by consistent id) answer in exactly the same way.
    doAnswer(invocation -> answerTxStateCoordinatorRequest(invocation.getArgument(1)))
            .when(messagingService).invoke(any(ClusterNode.class), any(), anyLong());

    doAnswer(invocation -> answerTxStateCoordinatorRequest(invocation.getArgument(1)))
            .when(messagingService).invoke(anyString(), any(), anyLong());

    // Any id other than the local node's one resolves to the other node.
    ClusterNodeResolver clusterNodeResolver = new ClusterNodeResolver() {
        @Override
        public ClusterNode getById(String id) {
            return id.equals(localNode.id()) ? localNode : anotherNode;
        }

        @Override
        public ClusterNode getByConsistentId(String consistentId) {
            return consistentId.equals(localNode.name()) ? localNode : anotherNode;
        }
    };

    transactionStateResolver = new TransactionStateResolver(
            txManager,
            clockService,
            clusterNodeResolver,
            messagingService,
            mock(PlacementDriver.class),
            new TxMessageSender(
                    messagingService,
                    mock(ReplicaService.class),
                    clockService,
                    transactionConfiguration
            )
    );

    transactionStateResolver.start();

    placementDriver = new TestPlacementDriver(localNode);

    partitionReplicaListener = new PartitionReplicaListener(
            testMvPartitionStorage,
            mockRaftClient,
            txManager,
            lockManager,
            Runnable::run,
            PART_ID,
            TABLE_ID,
            () -> Map.of(pkLocker.id(), pkLocker, sortedIndexId, sortedIndexLocker, hashIndexId, hashIndexLocker),
            pkStorageSupplier,
            () -> Map.of(sortedIndexId, sortedIndexStorage, hashIndexId, hashIndexStorage),
            clockService,
            safeTimeClock,
            txStateStorage,
            transactionStateResolver,
            new StorageUpdateHandler(
                    PART_ID,
                    partitionDataStorage,
                    indexUpdateHandler,
                    storageUpdateConfiguration
            ),
            validationSchemasSource,
            localNode,
            schemaSyncService,
            catalogService,
            placementDriver,
            new SingleClusterNodeResolver(localNode),
            new RemotelyTriggeredResourceRegistry(),
            new DummySchemaManagerImpl(schemaDescriptor, schemaDescriptorVersion2)
    );

    kvMarshaller = marshallerFor(schemaDescriptor);
    kvMarshallerVersion2 = marshallerFor(schemaDescriptorVersion2);

    reset();
}

/**
 * Produces the reply of the mocked messaging service to an outgoing message.
 *
 * @param message Message passed to {@code MessagingService#invoke}.
 * @return Future completed with a tx state response carrying the state meta known to {@link #txManager} if the message
 *     is a {@link TxStateCoordinatorRequest}; a future failed with a test exception otherwise.
 */
private CompletableFuture<?> answerTxStateCoordinatorRequest(Object message) {
    if (message instanceof TxStateCoordinatorRequest) {
        TxStateCoordinatorRequest request = (TxStateCoordinatorRequest) message;

        return completedFuture(
                TX_MESSAGES_FACTORY.txStateResponse().txStateMeta(txManager.stateMeta(request.txId())).build()
        );
    }

    return CompletableFuture.failedFuture(new Exception("Test exception"));
}
/** Drops the inline mocks created during the test once it has finished. */
@AfterEach
public void clearMocks() {
    var mockitoFramework = Mockito.framework();

    mockitoFramework.clearInlineMocks();
}
/**
 * Creates the test schema descriptor of the given version: key columns {@code INTKEY} and {@code STRKEY}, value columns
 * {@code INTVAL} and {@code STRVAL}.
 *
 * @param ver Schema version.
 * @return Schema descriptor.
 */
private static SchemaDescriptor schemaDescriptorWith(int ver) {
    Column[] keyColumns = {
            new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
            new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
    };

    Column[] valueColumns = {
            new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
            new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
    };

    return new SchemaDescriptor(ver, keyColumns, valueColumns);
}
/**
 * Creates a reflection-based key-value marshaller of {@link TestKey}/{@link TestValue} for the given schema.
 *
 * @param descriptor Schema descriptor.
 * @return Key-value marshaller.
 */
private static KvMarshaller<TestKey, TestValue> marshallerFor(SchemaDescriptor descriptor) {
    return new ReflectionMarshallerFactory().create(descriptor, TestKey.class, TestValue.class);
}
/**
 * Returns the primary key index storage taken from the lazy supplier.
 *
 * @return Primary key index storage, never {@code null}.
 */
private TableSchemaAwareIndexStorage pkStorage() {
    TableSchemaAwareIndexStorage storage = pkStorageSupplier.get();

    return Objects.requireNonNull(storage);
}
/** Restores the mutable test state: leadership flag, resolved tx state, all storages and the pending rows. */
private void reset() {
    localLeader = true;
    txState = null;

    TestHashIndexStorage pkIndex = (TestHashIndexStorage) pkStorage().storage();
    pkIndex.clear();

    TestHashIndexStorage hashIndex = (TestHashIndexStorage) hashIndexStorage.storage();
    hashIndex.clear();

    TestSortedIndexStorage sortedIndex = (TestSortedIndexStorage) sortedIndexStorage.storage();
    sortedIndex.clear();

    testMvPartitionStorage.clear();
    pendingRows.clear();
}
/** A tx state request for a transaction without a stored state resolves to {@code ABORTED} once the transaction is finished. */
@Test
public void testTxStateReplicaRequestEmptyState() throws Exception {
    // Finishing the transaction marks it as aborted in the transaction manager.
    doAnswer(invocation -> {
        UUID finishedTxId = invocation.getArgument(4);

        txManager.updateTxMeta(finishedTxId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null));

        return nullCompletedFuture();
    }).when(txManager).finish(any(), any(), anyBoolean(), any(), any());

    var stateRequest = TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
            .groupId(grpId)
            .txId(newTxId())
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .build();

    ReplicaResult replicaResult = partitionReplicaListener.invoke(stateRequest, "senderId").get(1, TimeUnit.SECONDS);

    TransactionMeta resolvedMeta = (TransactionMeta) replicaResult.result();

    assertNotNull(resolvedMeta);
    assertEquals(ABORTED, resolvedMeta.txState());
}
/** A tx state request for a transaction stored as committed returns {@code COMMITTED} together with its commit timestamp. */
@Test
public void testTxStateReplicaRequestCommitState() throws Exception {
    UUID committedTxId = newTxId();

    txStateStorage.put(committedTxId, new TxMeta(COMMITTED, clock.now()));

    // Taken after the commit timestamp above; the last assertion expects it to be the greater one.
    HybridTimestamp timestampAfterCommit = clock.now();

    var stateRequest = TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
            .groupId(grpId)
            .txId(committedTxId)
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .build();

    ReplicaResult replicaResult = partitionReplicaListener.invoke(stateRequest, localNode.id()).get(1, TimeUnit.SECONDS);

    TransactionMeta resolvedMeta = (TransactionMeta) replicaResult.result();

    assertNotNull(resolvedMeta);
    assertEquals(COMMITTED, resolvedMeta.txState());
    assertNotNull(resolvedMeta.commitTimestamp());
    assertTrue(timestampAfterCommit.compareTo(resolvedMeta.commitTimestamp()) > 0);
}
/** A request carrying the enlistment consistency token {@code 10L} is rejected with {@link PrimaryReplicaMissException}. */
@Test
public void testEnsureReplicaIsPrimaryThrowsPrimaryReplicaMissIfEnlistmentConsistencyTokenDoesNotMatchTheOneInLease() {
    localLeader = false;

    var stateRequest = TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
            .groupId(grpId)
            .txId(newTxId())
            .enlistmentConsistencyToken(10L)
            .build();

    // The request is sent up front; only waiting for its outcome happens inside the assertion.
    CompletableFuture<ReplicaResult> invocationFuture = partitionReplicaListener.invoke(stateRequest, localNode.id());

    assertThrowsWithCause(
            () -> invocationFuture.get(1, TimeUnit.SECONDS).result(),
            PrimaryReplicaMissException.class
    );
}
/** When the placement driver reports {@code node3} as the primary replica, the request is rejected with {@link PrimaryReplicaMissException}. */
@Test
public void testEnsureReplicaIsPrimaryThrowsPrimaryReplicaMissIfNodeIdDoesNotMatchTheLeaseholder() {
    localLeader = false;

    TestPlacementDriver testPlacementDriver = (TestPlacementDriver) placementDriver;
    testPlacementDriver.setPrimaryReplicaSupplier(() -> new TestReplicaMetaImpl("node3", "node3"));

    var stateRequest = TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
            .groupId(grpId)
            .txId(newTxId())
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .build();

    // The request is sent up front; only waiting for its outcome happens inside the assertion.
    CompletableFuture<ReplicaResult> invocationFuture = partitionReplicaListener.invoke(stateRequest, localNode.id());

    assertThrowsWithCause(
            () -> invocationFuture.get(1, TimeUnit.SECONDS).result(),
            PrimaryReplicaMissException.class
    );
}
/** A read-only get of a key that was never written returns {@code null}. */
@Test
public void testReadOnlySingleRowReplicaRequestEmptyResult() throws Exception {
    BinaryRow missingKey = nextBinaryKey();

    ReplicaResult replicaResult = doReadOnlySingleGet(missingKey).get(1, TimeUnit.SECONDS);

    BinaryRow foundRow = (BinaryRow) replicaResult.result();

    assertNull(foundRow);
}
/**
 * Sends a read-only single row get for the given primary key, reading at the current clock time.
 *
 * @param pk Binary row holding the primary key.
 * @return Future of the replica result.
 */
private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk) {
    HybridTimestamp readTimestamp = clock.now();

    return doReadOnlySingleGet(pk, readTimestamp);
}
/**
 * Sends a read-only single row get ({@code RO_GET}) for the given primary key on behalf of the local node.
 *
 * @param pk Binary row holding the primary key; its schema version is put into the request.
 * @param readTimestamp Timestamp to read at.
 * @return Future of the replica result.
 */
private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk, HybridTimestamp readTimestamp) {
    return partitionReplicaListener.invoke(
            TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                    .groupId(grpId)
                    .readTimestampLong(readTimestamp.longValue())
                    .schemaVersion(pk.schemaVersion())
                    .primaryKey(pk.tupleSlice())
                    .requestType(RequestType.RO_GET)
                    .build(),
            localNode.id()
    );
}
/**
 * Sends a direct read-only single row get ({@code RO_GET}) for the given primary key on behalf of the local node.
 *
 * @param pk Binary row holding the primary key; its schema version is put into the request.
 * @return Future of the replica result.
 */
private CompletableFuture<ReplicaResult> doReadOnlyDirectSingleGet(BinaryRow pk) {
    return partitionReplicaListener.invoke(
            TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
                    .groupId(grpId)
                    .schemaVersion(pk.schemaVersion())
                    .primaryKey(pk.tupleSlice())
                    .requestType(RequestType.RO_GET)
                    .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                    .build(),
            localNode.id()
    );
}
/** A read-only get returns a row whose write has been committed before the read timestamp. */
@Test
public void testReadOnlySingleRowReplicaRequestCommittedResult() throws Exception {
    UUID writerTxId = newTxId();
    BinaryRow keyRow = nextBinaryKey();
    BinaryRow storedRow = binaryRow(key(keyRow), new TestValue(1, "v1"));
    RowId storedRowId = new RowId(PART_ID);

    // Index the row, then write and commit it directly in the partition storage.
    pkStorage().put(storedRow, storedRowId);
    testMvPartitionStorage.addWrite(storedRowId, storedRow, writerTxId, TABLE_ID, PART_ID);
    testMvPartitionStorage.commitWrite(storedRowId, clock.now());

    ReplicaResult replicaResult = doReadOnlySingleGet(keyRow).get(1, TimeUnit.SECONDS);

    BinaryRow foundRow = (BinaryRow) replicaResult.result();

    assertNotNull(foundRow);
}
/** A read-only get returns the row of a write intent whose transaction is known to be committed. */
@Test
public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() throws Exception {
    UUID writerTxId = newTxId();
    BinaryRow keyRow = nextBinaryKey();
    BinaryRow storedRow = binaryRow(key(keyRow), new TestValue(1, "v1"));
    RowId storedRowId = new RowId(PART_ID);

    txState = COMMITTED;

    // Leave an uncommitted write intent in the storage and register its transaction as committed.
    pkStorage().put(storedRow, storedRowId);
    testMvPartitionStorage.addWrite(storedRowId, storedRow, writerTxId, TABLE_ID, PART_ID);
    txManager.updateTxMeta(writerTxId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now()));

    ReplicaResult replicaResult = doReadOnlySingleGet(keyRow).get(1, TimeUnit.SECONDS);

    BinaryRow foundRow = (BinaryRow) replicaResult.result();

    assertNotNull(foundRow);
}
/**
 * Checks that an RO single-row get returns {@code null} for an uncommitted write whose transaction is recorded as
 * {@code PENDING} in the transaction manager.
 */
@Test
public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() throws Exception {
    UUID writerTxId = newTxId();
    BinaryRow pk = nextBinaryKey();
    BinaryRow intentRow = binaryRow(key(pk), new TestValue(1, "v1"));
    RowId rowId = new RowId(PART_ID);

    // The write is added but never committed in the MV storage, so it stays a write intent.
    pkStorage().put(intentRow, rowId);
    testMvPartitionStorage.addWrite(rowId, intentRow, writerTxId, TABLE_ID, PART_ID);
    txManager.updateTxMeta(
            writerTxId,
            previous -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null)
    );

    ReplicaResult replicaResult = doReadOnlySingleGet(pk).get(1, TimeUnit.SECONDS);

    assertNull((BinaryRow) replicaResult.result());
}
/**
 * Checks that an RO single-row get returns {@code null} for an uncommitted write whose transaction is recorded as
 * {@code ABORTED} in the transaction manager.
 */
@Test
public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() throws Exception {
    UUID writerTxId = newTxId();
    BinaryRow pk = nextBinaryKey();
    BinaryRow intentRow = binaryRow(key(pk), new TestValue(1, "v1"));
    RowId rowId = new RowId(PART_ID);

    txState = ABORTED;

    // The write is added but never committed in the MV storage, so it stays a write intent.
    pkStorage().put(intentRow, rowId);
    testMvPartitionStorage.addWrite(rowId, intentRow, writerTxId, TABLE_ID, PART_ID);
    txManager.updateTxMeta(
            writerTxId,
            previous -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null)
    );

    ReplicaResult replicaResult = doReadOnlySingleGet(pk).get(1, TimeUnit.SECONDS);

    assertNull((BinaryRow) replicaResult.result());
}
/**
 * Exercises read-write scan batch requests over the sorted index: a two-batch full scan, a bounded scan, a scan that
 * matches nothing and an exact-key lookup.
 *
 * <p>Six committed rows are stored with indexed values {@code i % 5}, i.e. 0, 1, 2, 3, 4, 0.
 */
@Test
public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exception {
UUID txId = newTxId();
int sortedIndexId = sortedIndexStorage.id();
// Populate the MV storage and the sorted index with six committed rows.
IntStream.range(0, 6).forEach(i -> {
RowId rowId = new RowId(PART_ID);
int indexedVal = i % 5; // Non-unique index: value 0 is stored twice (i = 0 and i = 5).
TestValue testValue = new TestValue(indexedVal, "val" + i);
BinaryTuple indexedValue = new BinaryTuple(1,
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID, PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue, rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
UUID scanTxId = newTxId();
// Request the first batch: 4 of the 6 rows are expected.
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
.timestampLong(clock.nowLong())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build(), localNode.id());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(4, rows.size());
// Request the second batch with the same transaction and scan ids: the remaining 2 rows are expected.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
.timestampLong(clock.nowLong())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(2, rows.size());
// Bounded request in a new transaction: bounds 1 and 3 with only LESS_OR_EQUAL set; 2 rows are expected
// (presumably values 2 and 3, i.e. the lower bound is exclusive - TODO confirm against SortedIndexStorage flags).
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.timestampLong(clock.nowLong())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(2L)
.indexToUse(sortedIndexId)
.lowerBoundPrefix(toIndexBound(1))
.upperBoundPrefix(toIndexBound(3))
.flags(SortedIndexStorage.LESS_OR_EQUAL)
.batchSize(5)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(2, rows.size());
// Empty result: the lower bound 5 is above every stored indexed value (maximum is 4).
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.timestampLong(clock.nowLong())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(2L)
.indexToUse(sortedIndexId)
.lowerBoundPrefix(toIndexBound(5))
.batchSize(5)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(0, rows.size());
// Lookup by exact key 0: both rows stored with indexed value 0 are expected.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.timestampLong(clock.nowLong())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(2L)
.indexToUse(sortedIndexId)
.exactKey(toIndexKey(0))
.batchSize(5)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(2, rows.size());
}
/**
 * Exercises read-only scan batch requests over the sorted index: a two-batch full scan, a bounded scan, a scan that
 * matches nothing and an exact-key lookup.
 *
 * <p>Six committed rows are stored with indexed values {@code i % 5}, i.e. 0, 1, 2, 3, 4, 0.
 */
@Test
public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exception {
UUID txId = newTxId();
int sortedIndexId = sortedIndexStorage.id();
// Populate the MV storage and the sorted index with six committed rows.
IntStream.range(0, 6).forEach(i -> {
RowId rowId = new RowId(PART_ID);
int indexedVal = i % 5; // Non-unique index: value 0 is stored twice (i = 0 and i = 5).
TestValue testValue = new TestValue(indexedVal, "val" + i);
BinaryTuple indexedValue = new BinaryTuple(1,
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID, PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue, rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
UUID scanTxId = newTxId();
// Request the first batch: 4 of the 6 rows are expected.
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
.readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
.coordinatorId(localNode.id())
.build(), localNode.id());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(4, rows.size());
// Request the second batch with the same transaction and scan ids: the remaining 2 rows are expected.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
.readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(2, rows.size());
// Bounded request in a new transaction: bounds 1 and 3 with only LESS_OR_EQUAL set; 2 rows are expected
// (presumably values 2 and 3, i.e. the lower bound is exclusive - TODO confirm against SortedIndexStorage flags).
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
.lowerBoundPrefix(toIndexBound(1))
.upperBoundPrefix(toIndexBound(3))
.flags(SortedIndexStorage.LESS_OR_EQUAL)
.batchSize(5)
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(2, rows.size());
// Empty result: the lower bound 5 is above every stored indexed value (maximum is 4).
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
.lowerBoundPrefix(toIndexBound(5))
.batchSize(5)
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(0, rows.size());
// Lookup by exact key 0: both rows stored with indexed value 0 are expected.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
.exactKey(toIndexKey(0))
.batchSize(5)
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(2, rows.size());
}
/**
 * Exercises read-only scan batch requests over the hash index: a two-batch exact-key scan, a lookup that matches
 * nothing and a lookup of another key.
 *
 * <p>Seven committed rows are stored with indexed values {@code i % 2}: four rows with value 0 and three with value 1.
 */
@Test
public void testReadOnlyScanRetrieveBatchReplicaRequstHashIndex() throws Exception {
UUID txId = newTxId();
int hashIndexId = hashIndexStorage.id();
// Populate the MV storage and the hash index with seven committed rows.
IntStream.range(0, 7).forEach(i -> {
RowId rowId = new RowId(PART_ID);
int indexedVal = i % 2; // Non-unique index: only the values 0 and 1 are used.
TestValue testValue = new TestValue(indexedVal, "val" + i);
BinaryTuple indexedValue = new BinaryTuple(1,
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID, PART_ID);
hashIndexStorage.storage().put(new IndexRowImpl(indexedValue, rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
UUID scanTxId = newTxId();
// Request the first batch for key 0: 3 of the 4 matching rows are expected.
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
.readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(0))
.batchSize(3)
.coordinatorId(localNode.id())
.build(), localNode.id());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(3, rows.size());
// Request the second batch with the same transaction and scan ids: the one remaining row for key 0 is expected.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(scanTxId)
.readTimestampLong(clock.nowLong())
.scanId(1L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(0))
.batchSize(1)
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(1, rows.size());
// Empty result: no row was stored with indexed value 5.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(5))
.batchSize(5)
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(0, rows.size());
// Lookup by exact key 1: all three rows stored with indexed value 1 are expected.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(hashIndexId)
.exactKey(toIndexKey(1))
.batchSize(5)
.coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
assertNotNull(rows);
assertEquals(3, rows.size());
}
/**
 * Runs a sequence of single-row RW operations (insert, upsert, delete, get-and-replace, get-and-upsert,
 * get-and-delete, delete-exact) on the same key within one transaction, checking the storage or the PK index after
 * each step, and finally cleans the transaction up.
 */
@Test
public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() {
    UUID txId = newTxId();

    BinaryRow initialRow = binaryRow(0);
    BinaryRow pkOnly = marshalQuietly(new TestKey(0, "k0"), kvMarshaller);

    // Insert -> upsert -> delete.
    assertThat(doSingleRowRequest(txId, initialRow, RequestType.RW_INSERT), willCompleteSuccessfully());
    checkRowInMvStorage(initialRow, true);

    BinaryRow upserted = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v1"));
    assertThat(doSingleRowRequest(txId, upserted, RequestType.RW_UPSERT), willCompleteSuccessfully());
    checkRowInMvStorage(upserted, true);

    assertThat(doSingleRowPkRequest(txId, pkOnly, RequestType.RW_DELETE), willCompleteSuccessfully());
    checkNoRowInIndex(initialRow);

    // Insert -> get-and-replace -> get-and-upsert -> get-and-delete.
    assertThat(doSingleRowRequest(txId, initialRow, RequestType.RW_INSERT), willCompleteSuccessfully());
    checkRowInMvStorage(initialRow, true);

    BinaryRow replaced = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v2"));
    assertThat(doSingleRowRequest(txId, replaced, RequestType.RW_GET_AND_REPLACE), willCompleteSuccessfully());
    checkRowInMvStorage(replaced, true);

    BinaryRow upsertedAgain = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v3"));
    assertThat(doSingleRowRequest(txId, upsertedAgain, RequestType.RW_GET_AND_UPSERT), willCompleteSuccessfully());
    checkRowInMvStorage(upsertedAgain, true);

    assertThat(doSingleRowPkRequest(txId, pkOnly, RequestType.RW_GET_AND_DELETE), willCompleteSuccessfully());
    checkNoRowInIndex(upsertedAgain);

    // Insert -> delete-exact.
    assertThat(doSingleRowRequest(txId, initialRow, RequestType.RW_INSERT), willCompleteSuccessfully());
    checkRowInMvStorage(initialRow, true);

    assertThat(doSingleRowRequest(txId, initialRow, RequestType.RW_DELETE_EXACT), willCompleteSuccessfully());
    checkNoRowInIndex(initialRow);

    cleanup(txId);
}
/**
 * Marshals a key-only row, converting the checked {@code MarshallerException} into an unchecked exception.
 *
 * @param key Key object to marshal.
 * @param marshaller Marshaller to use.
 * @return Marshalled row.
 * @throws RuntimeException Wrapping the {@code MarshallerException} if marshalling fails.
 */
private static <K, V> Row marshalQuietly(K key, KvMarshaller<K, V> marshaller) {
    Row marshalledKey;

    try {
        marshalledKey = marshaller.marshal(key);
    } catch (MarshallerException cause) {
        throw new RuntimeException(cause);
    }

    return marshalledKey;
}
/**
 * Runs a sequence of multi-row RW operations (insert-all, upsert-all, delete-all, insert-all, delete-exact-all) on two
 * keys within one transaction, checking the storage or the PK index after each step, and finally cleans the
 * transaction up.
 */
@Test
public void testWriteIntentOnPrimaryReplicaMultiRowOps() {
    UUID txId = newTxId();

    BinaryRow firstRow = binaryRow(0);
    BinaryRow secondRow = binaryRow(1);
    Collection<BinaryRow> initialRows = asList(firstRow, secondRow);

    // Insert both rows.
    assertThat(doMultiRowRequest(txId, initialRows, RequestType.RW_INSERT_ALL), willCompleteSuccessfully());
    checkRowInMvStorage(firstRow, true);
    checkRowInMvStorage(secondRow, true);

    // Overwrite both rows with new values under the same keys.
    BinaryRow firstRowUpdated = binaryRow(new TestKey(0, "k0"), new TestValue(2, "v2"));
    BinaryRow secondRowUpdated = binaryRow(new TestKey(1, "k1"), new TestValue(3, "v3"));
    Collection<BinaryRow> updatedRows = asList(firstRowUpdated, secondRowUpdated);

    assertThat(doMultiRowRequest(txId, updatedRows, RequestType.RW_UPSERT_ALL), willCompleteSuccessfully());
    checkRowInMvStorage(firstRow, false);
    checkRowInMvStorage(secondRow, false);
    checkRowInMvStorage(firstRowUpdated, true);
    checkRowInMvStorage(secondRowUpdated, true);

    // Delete both rows by primary key.
    Collection<BinaryRow> primaryKeys = List.of(
            marshalQuietly(new TestKey(0, "k0"), kvMarshaller),
            marshalQuietly(new TestKey(1, "k1"), kvMarshaller)
    );

    assertThat(doMultiRowPkRequest(txId, primaryKeys, RequestType.RW_DELETE_ALL), willCompleteSuccessfully());
    checkNoRowInIndex(firstRow);
    checkNoRowInIndex(secondRow);
    checkNoRowInIndex(firstRowUpdated);
    checkNoRowInIndex(secondRowUpdated);

    // Re-insert the initial rows and remove them with an exact-match delete.
    assertThat(doMultiRowRequest(txId, initialRows, RequestType.RW_INSERT_ALL), willCompleteSuccessfully());
    checkRowInMvStorage(firstRow, true);
    checkRowInMvStorage(secondRow, true);

    assertThat(doMultiRowRequest(txId, initialRows, RequestType.RW_DELETE_EXACT_ALL), willCompleteSuccessfully());
    checkNoRowInIndex(firstRow);
    checkNoRowInIndex(secondRow);

    cleanup(txId);
}
/** Sends a single-row RW request with the {@code full} flag set to {@code false}. */
private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
    boolean full = false;

    return doSingleRowRequest(txId, binaryRow, requestType, full);
}
/**
 * Builds a single-row RW request carrying the whole row and passes it to the replica listener on behalf of the local
 * node.
 *
 * @param txId Transaction id.
 * @param binaryRow Row whose schema version and tuple slice are copied into the request.
 * @param requestType Operation to perform.
 * @param full Value for the request's {@code full} flag.
 * @return Future returned by the replica listener.
 */
private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) {
    var request = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
            .groupId(grpId)
            .transactionId(txId)
            .requestType(requestType)
            .schemaVersion(binaryRow.schemaVersion())
            .binaryTuple(binaryRow.tupleSlice())
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .commitPartitionId(commitPartitionId())
            .coordinatorId(localNode.id())
            .full(full)
            .build();

    return partitionReplicaListener.invoke(request, localNode.id());
}
/** Sends a single-row primary-key RW request with the {@code full} flag set to {@code false}. */
private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
    boolean full = false;

    return doSingleRowPkRequest(txId, binaryRow, requestType, full);
}
/**
 * Builds a single-row RW request carrying only the primary key and passes it to the replica listener on behalf of the
 * local node.
 *
 * @param txId Transaction id.
 * @param binaryRow Key row whose schema version and tuple slice are copied into the request.
 * @param requestType Operation to perform.
 * @param full Value for the request's {@code full} flag.
 * @return Future returned by the replica listener.
 */
private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) {
    var request = TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
            .groupId(grpId)
            .transactionId(txId)
            .requestType(requestType)
            .schemaVersion(binaryRow.schemaVersion())
            .primaryKey(binaryRow.tupleSlice())
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .commitPartitionId(commitPartitionId())
            .coordinatorId(localNode.id())
            .full(full)
            .build();

    return partitionReplicaListener.invoke(request, localNode.id());
}
/** Creates a table-partition id message from {@code TABLE_ID} and {@code PART_ID}, used as the commit partition in requests. */
private TablePartitionIdMessage commitPartitionId() {
    return TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
            .tableId(TABLE_ID)
            .partitionId(PART_ID)
            .build();
}
/** Sends a multi-row RW request with the {@code full} flag set to {@code false}. */
private CompletableFuture<?> doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
    boolean full = false;

    return doMultiRowRequest(txId, binaryRows, requestType, full);
}
/**
 * Builds a multi-row RW request carrying whole rows and passes it to the replica listener on behalf of the local node.
 *
 * @param txId Transaction id.
 * @param binaryRows Rows to send; must be non-empty because the schema version is taken from the first one.
 * @param requestType Operation to perform.
 * @param full Value for the request's {@code full} flag.
 * @return Future returned by the replica listener.
 */
private CompletableFuture<?> doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
    var request = TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
            .groupId(grpId)
            .transactionId(txId)
            .requestType(requestType)
            .schemaVersion(binaryRows.iterator().next().schemaVersion())
            .binaryTuples(binaryRowsToBuffers(binaryRows))
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .commitPartitionId(commitPartitionId())
            .coordinatorId(localNode.id())
            .full(full)
            .build();

    return partitionReplicaListener.invoke(request, localNode.id());
}
/**
 * Converts rows to their tuple slices, preserving the iteration order of the collection.
 *
 * @param binaryRows Rows to convert.
 * @return List with one tuple slice per row.
 */
static List<ByteBuffer> binaryRowsToBuffers(Collection<BinaryRow> binaryRows) {
    return binaryRows.stream()
            .map(row -> row.tupleSlice())
            .collect(toList());
}
/** Sends a multi-row primary-key RW request with the {@code full} flag set to {@code false}. */
private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
    boolean full = false;

    return doMultiRowPkRequest(txId, binaryRows, requestType, full);
}
/**
 * Builds a multi-row RW request carrying only primary keys and passes it to the replica listener on behalf of the
 * local node.
 *
 * @param txId Transaction id.
 * @param binaryRows Key rows to send; must be non-empty because the schema version is taken from the first one.
 * @param requestType Operation to perform.
 * @param full Value for the request's {@code full} flag.
 * @return Future returned by the replica listener.
 */
private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
    var request = TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
            .groupId(grpId)
            .transactionId(txId)
            .requestType(requestType)
            .schemaVersion(binaryRows.iterator().next().schemaVersion())
            .primaryKeys(binaryRowsToBuffers(binaryRows))
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .commitPartitionId(commitPartitionId())
            .coordinatorId(localNode.id())
            .full(full)
            .build();

    return partitionReplicaListener.invoke(request, localNode.id());
}
/**
 * Runs the shared write-intent scenario with single-row {@code RW_INSERT} requests; each supplied request inserts the
 * row for the next counter value.
 */
@Test
public void testWriteIntentOnPrimaryReplicaSingleUpdate() {
    UUID txId = newTxId();
    AtomicInteger rowCounter = new AtomicInteger();

    Supplier<ReadWriteReplicaRequest> insertRequests = () -> {
        BinaryRow rowToInsert = binaryRow(rowCounter.getAndIncrement());

        return TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                .groupId(grpId)
                .transactionId(txId)
                .requestType(RequestType.RW_INSERT)
                .schemaVersion(rowToInsert.schemaVersion())
                .binaryTuple(rowToInsert.tupleSlice())
                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                .commitPartitionId(commitPartitionId())
                .coordinatorId(localNode.id())
                .build();
    };

    // After the first request, the row built from counter value 0 must be readable from the storage.
    Runnable firstRowStored = () -> checkRowInMvStorage(binaryRow(0), true);

    testWriteIntentOnPrimaryReplica(txId, insertRequests, firstRowStored);

    cleanup(txId);
}
/**
 * Runs the shared write-intent scenario with two-row {@code RW_UPSERT_ALL} requests; the request number {@code n}
 * (counting from 0) upserts the rows built from {@code 2n} and {@code 2n + 1}.
 */
@Test
public void testWriteIntentOnPrimaryReplicaUpdateAll() {
    UUID txId = newTxId();
    AtomicInteger requestCounter = new AtomicInteger();

    Supplier<ReadWriteReplicaRequest> upsertAllRequests = () -> {
        int requestNo = requestCounter.getAndIncrement();

        BinaryRow evenRow = binaryRow(requestNo * 2);
        BinaryRow oddRow = binaryRow(requestNo * 2 + 1);

        return TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                .groupId(grpId)
                .transactionId(txId)
                .requestType(RequestType.RW_UPSERT_ALL)
                .schemaVersion(evenRow.schemaVersion())
                .binaryTuples(asList(evenRow.tupleSlice(), oddRow.tupleSlice()))
                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                .commitPartitionId(commitPartitionId())
                .coordinatorId(localNode.id())
                .build();
    };

    // After the first request, the row built from 0 must be readable from the storage.
    Runnable firstRowStored = () -> checkRowInMvStorage(binaryRow(0), true);

    testWriteIntentOnPrimaryReplica(txId, upsertAllRequests, firstRowStored);

    cleanup(txId);
}
/**
 * Asserts presence or absence of the given row in the MV storage, looking it up through the primary key index and
 * reading at {@link HybridTimestamp#MAX_VALUE}.
 *
 * <p>The index cursor is closed when the check finishes, including when an assertion fails.
 *
 * @param binaryRow Row to look for; the index is probed with this row.
 * @param shouldBePresent If {@code true}, at least one indexed row id must resolve to a row equal to {@code binaryRow};
 *     if {@code false}, the first indexed row id (if there is one) must resolve to {@code null} or to a different row.
 */
private void checkRowInMvStorage(BinaryRow binaryRow, boolean shouldBePresent) {
    try (Cursor<RowId> cursor = pkStorage().get(binaryRow)) {
        if (shouldBePresent) {
            boolean found = false;

            // There can be write intents for deletion, so every indexed row id has to be inspected.
            while (cursor.hasNext()) {
                RowId rowId = cursor.next();

                BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();

                if (equalToRow(binaryRow).matches(row)) {
                    found = true;
                }
            }

            assertTrue(found);
        } else if (cursor.hasNext()) {
            // An empty cursor means the key is not indexed at all, which also counts as "not present".
            RowId rowId = cursor.next();

            BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();

            assertTrue(row == null || !row.equals(binaryRow));
        }
    }
}
/** Asserts that the primary key index has no entry for the given row; the lookup cursor is closed afterwards. */
private void checkNoRowInIndex(BinaryRow binaryRow) {
    try (Cursor<RowId> pkCursor = pkStorage().get(binaryRow)) {
        boolean hasIndexEntry = pkCursor.hasNext();

        assertFalse(hasIndexEntry);
    }
}
/**
 * Shared scenario for write-intent tests. It sends one update and runs the supplied check, then sends a second update
 * whose raft command future is held open and verifies that a write intent switch request does not complete until that
 * future completes. Finally it verifies that a further update in the same transaction fails with
 * {@code TransactionException}.
 *
 * @param txId Id of the transaction the updating requests belong to.
 * @param updatingRequestSupplier Supplier of updating requests; it is called three times.
 * @param checkAfterFirstOperation Check to run right after the first request has been passed to the listener.
 */
private void testWriteIntentOnPrimaryReplica(
UUID txId,
Supplier<ReadWriteReplicaRequest> updatingRequestSupplier,
Runnable checkAfterFirstOperation
) {
partitionReplicaListener.invoke(updatingRequestSupplier.get(), localNode.id());
checkAfterFirstOperation.run();
// Check that cleanup request processing awaits all write requests.
CompletableFuture<UpdateCommandResult> writeFut = new CompletableFuture<>();
// From now on every command passed to the closure gets the still-incomplete future.
raftClientFutureClosure = cmd -> writeFut;
try {
CompletableFuture<ReplicaResult> replicaWriteFut = partitionReplicaListener.invoke(updatingRequestSupplier.get(),
localNode.id());
// The replica-level future is expected to be done even though writeFut is not completed yet.
assertTrue(replicaWriteFut.isDone());
// Restore the default closure before the write intent switch request is issued.
raftClientFutureClosure = defaultMockRaftFutureClosure;
HybridTimestamp now = clock.now();
// Imitation of tx commit.
txStateStorage.put(txId, new TxMeta(COMMITTED, now));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID().toString(), commitPartitionId, now));
CompletableFuture<?> replicaCleanupFut = partitionReplicaListener.invoke(
TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
.groupId(grpId)
.txId(txId)
.commit(true)
.commitTimestampLong(now.longValue())
.build(),
localNode.id()
);
// The switch must stay pending while the held write is incomplete, and finish once it completes.
assertFalse(replicaCleanupFut.isDone());
writeFut.complete(new UpdateCommandResult(true));
assertThat(replicaCleanupFut, willSucceedFast());
} finally {
// Make sure the default closure is back in place even if an assertion above failed.
raftClientFutureClosure = defaultMockRaftFutureClosure;
}
// Check that one more write after cleanup is discarded.
CompletableFuture<?> writeAfterCleanupFuture = partitionReplicaListener.invoke(updatingRequestSupplier.get(), localNode.id());
assertThat(writeAfterCleanupFuture, willThrowFast(TransactionException.class));
}
/**
 * Checks that, when a row written and cleaned up by an earlier transaction is upserted again, the command passed to
 * the raft closure is an {@code UpdateCommandImpl} whose row message, binary row and timestamp are all non-null.
 */
@Test
void testWriteIntentBearsLastCommitTimestamp() {
    BinaryRow firstRow = binaryRow(1);
    BinaryRow secondRow = binaryRow(2);

    // Preload both rows in a transaction that is then cleaned up.
    UUID preloadTx = newTxId();
    upsert(preloadTx, firstRow);
    upsert(preloadTx, secondRow);
    cleanup(preloadTx);

    // Inspect every command issued from here on, then delegate to the default closure.
    raftClientFutureClosure = command -> {
        assertTrue(command instanceof UpdateCommandImpl);

        UpdateCommandImpl updateCommand = (UpdateCommandImpl) command;

        assertNotNull(updateCommand.messageRowToUpdate());
        assertNotNull(updateCommand.messageRowToUpdate().binaryRow());
        assertNotNull(updateCommand.messageRowToUpdate().timestamp());

        return defaultMockRaftFutureClosure.apply(command);
    };

    UUID rewriteTx = newTxId();
    upsert(rewriteTx, firstRow);
}
/**
 * Puts several records into the storage, optionally leaving them as write intents, alternately deletes and upserts the
 * same row within the same RW transaction, and then checks read correctness via a read-only request.
 *
 * @param insertFirst Whether to insert some values before the RW transaction.
 * @param upsertAfterDelete Whether to upsert the value after the delete in the RW transaction, so that it is present as
 *     a non-null write intent.
 * @param committed Whether to commit the RW transaction before doing the RO request.
 * @param multiple Whether to check multiple rows via a getAll request.
 */
@CartesianTest
void testReadOnlyGetAfterRowRewrite(
        @Values(booleans = {false, true}) boolean insertFirst,
        @Values(booleans = {false, true}) boolean upsertAfterDelete,
        @Values(booleans = {false, true}) boolean committed,
        @Values(booleans = {false, true}) boolean multiple
) {
    BinaryRow br1 = binaryRow(1);

    BinaryRow br1Pk = marshalQuietly(new TestKey(1, "k" + 1), kvMarshaller);

    BinaryRow br2 = binaryRow(2);

    BinaryRow br2Pk = marshalQuietly(new TestKey(2, "k" + 2), kvMarshaller);

    // Preloading the data if needed.
    if (insertFirst) {
        UUID tx0 = newTxId();
        upsert(tx0, br1);
        upsert(tx0, br2);
        cleanup(tx0);
    }

    txState = null;

    // Delete the same row 2 times within the same transaction to generate garbage rows in storage.
    // If the data was not preloaded, there will be one deletion actually.
    UUID tx1 = newTxId();
    delete(tx1, br1Pk);
    upsert(tx1, br1);
    delete(tx1, br1Pk);

    if (upsertAfterDelete) {
        upsert(tx1, br1);
    }

    if (!insertFirst && !upsertAfterDelete) {
        // Data was not preloaded or inserted after deletion. The helper closes the index cursor it opens.
        checkNoRowInIndex(br1);
    } else {
        // We create a null row with a row id having minimum possible value to ensure this row would be the first in cursor.
        // This is needed to check that this row will be skipped by RO tx and it will see the data anyway.
        // TODO https://issues.apache.org/jira/browse/IGNITE-18767 after this, the following check may be not needed.
        RowId emptyRowId = new RowId(PART_ID, new UUID(Long.MIN_VALUE, Long.MIN_VALUE));
        testMvPartitionStorage.addWrite(emptyRowId, null, tx1, TABLE_ID, PART_ID);

        if (committed) {
            testMvPartitionStorage.commitWrite(emptyRowId, clock.now());
        }

        pkStorage().put(br1, emptyRowId);
    }

    // If committed, there will be actual values in storage, otherwise write intents.
    if (committed) {
        cleanup(tx1);
    }

    if (multiple) {
        List<BinaryRow> allRowsPks = insertFirst ? List.of(br1Pk, br2Pk) : List.of(br1Pk);
        List<BinaryRow> allRows = insertFirst ? List.of(br1, br2) : List.of(br1);
        List<BinaryRow> allRowsButModified = insertFirst ? Arrays.asList(null, br2) : singletonList((BinaryRow) null);
        List<BinaryRow> expected = committed
                ? (upsertAfterDelete ? allRows : allRowsButModified)
                : (insertFirst ? allRows : singletonList((BinaryRow) null));
        List<BinaryRow> res = roGetAll(allRowsPks, clock.now());

        assertEquals(allRows.size(), res.size());

        List<Matcher<? super BinaryRow>> matchers = expected.stream()
                .map(row -> row == null ? nullValue(BinaryRow.class) : equalToRow(row))
                .collect(toList());

        assertThat(res, contains(matchers));
    } else {
        BinaryRow res = roGet(br1Pk, clock.nowLong());
        BinaryRow expected = committed
                ? (upsertAfterDelete ? br1 : null)
                : (insertFirst ? br1 : null);

        assertThat(res, is(expected == null ? nullValue(BinaryRow.class) : equalToRow(expected)));
    }

    cleanup(tx1);
}
/** Checks that an abort request succeeds and that the intercepted {@code FinishTxCommand} has {@code commit == false}. */
@Test
public void abortsSuccessfully() {
    AtomicReference<Boolean> commitFlag = interceptFinishTxCommand();

    CompletableFuture<?> abortFuture = beginAndAbortTx();

    assertThat(abortFuture, willSucceedFast());
    assertThat(commitFlag.get(), is(false));
}
/**
 * Stubs {@code txManager.cleanup} to complete immediately, creates a transaction id from the current clock reading and
 * sends a tx finish request with {@code commit == false} for it.
 *
 * @return Future returned by the replica listener.
 */
private CompletableFuture<?> beginAndAbortTx() {
    when(txManager.cleanup(any(), anyBoolean(), any(), any())).thenReturn(nullCompletedFuture());

    UUID txId = transactionIdFor(clock.now());

    TxFinishReplicaRequest abortRequest = TX_MESSAGES_FACTORY.txFinishReplicaRequest()
            .groupId(grpId)
            .txId(txId)
            .groups(Map.of(grpId, localNode.name()))
            .commit(false)
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .build();

    return partitionReplicaListener.invoke(abortRequest, localNode.id());
}
/** Produces a transaction id for the given begin timestamp using the shared test transaction id generator. */
private static UUID transactionIdFor(HybridTimestamp beginTimestamp) {
    UUID txId = TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTimestamp);

    return txId;
}
/** Checks that a commit goes through when the schema source reports only the current schema version. */
@Test
public void commitsOnSameSchemaSuccessfully() {
    List<FullTableSchema> onlyCurrentSchema = List.of(
            tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col")))
    );

    when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class)))
            .thenReturn(onlyCurrentSchema);

    AtomicReference<Boolean> commitFlag = interceptFinishTxCommand();

    CompletableFuture<?> commitFuture = beginAndCommitTx();

    assertThat(commitFuture, willSucceedFast());
    assertThat(commitFlag.get(), is(true));
}
/** Creates a nullable {@code INT32} column descriptor with the given name and a constant {@code null} default value. */
private static CatalogTableColumnDescriptor nullableColumn(String colName) {
    boolean nullable = true;

    return new CatalogTableColumnDescriptor(
            colName,
            ColumnType.INT32,
            nullable,
            0,
            0,
            0,
            DefaultValue.constant(null)
    );
}
/** Creates a non-nullable {@code INT32} column descriptor with the given name and the given constant default value. */
private static CatalogTableColumnDescriptor defaultedColumn(String colName, int defaultValue) {
    boolean nullable = false;

    return new CatalogTableColumnDescriptor(
            colName,
            ColumnType.INT32,
            nullable,
            0,
            0,
            0,
            DefaultValue.constant(defaultValue)
    );
}
/** Creates a {@code FullTableSchema} of the given version for {@code TABLE_ID} with the given columns. */
private static FullTableSchema tableSchema(int schemaVersion, List<CatalogTableColumnDescriptor> columns) {
    FullTableSchema schema = new FullTableSchema(schemaVersion, TABLE_ID, "test", columns);

    return schema;
}
/**
 * Replaces the raft closure with one that records the {@code commit} flag of every {@code FinishTxCommand} it sees and
 * then delegates to the default closure.
 *
 * @return Holder of the recorded flag; it contains {@code null} until a {@code FinishTxCommand} is intercepted.
 */
private AtomicReference<Boolean> interceptFinishTxCommand() {
    AtomicReference<Boolean> commitFlag = new AtomicReference<>();

    raftClientFutureClosure = cmd -> {
        if (cmd instanceof FinishTxCommand) {
            FinishTxCommand finishCommand = (FinishTxCommand) cmd;

            commitFlag.set(finishCommand.commit());
        }

        return defaultMockRaftFutureClosure.apply(cmd);
    };

    return commitFlag;
}
/**
 * Stubs {@code txManager.cleanup} to complete immediately, creates a transaction id from the current clock reading,
 * takes a later clock reading as the commit timestamp and sends a tx finish request with {@code commit == true}.
 *
 * @return Future returned by the replica listener.
 */
private CompletableFuture<?> beginAndCommitTx() {
    when(txManager.cleanup(any(), anyBoolean(), any(), any())).thenReturn(nullCompletedFuture());

    // The begin timestamp is read before the commit timestamp.
    UUID txId = transactionIdFor(clock.now());
    HybridTimestamp commitTimestamp = clock.now();

    TxFinishReplicaRequest finishRequest = TX_MESSAGES_FACTORY.txFinishReplicaRequest()
            .groupId(grpId)
            .txId(txId)
            .groups(Map.of(grpId, localNode.name()))
            .commit(true)
            .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .build();

    return partitionReplicaListener.invoke(finishRequest, localNode.id());
}
/** Checks that a commit goes through when the only schema change reported is the addition of a nullable column. */
@Test
public void commitsOnCompatibleSchemaChangeSuccessfully() {
    FullTableSchema currentSchema = tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col1")));
    // Addition of a nullable column is forward-compatible.
    FullTableSchema futureSchema = tableSchema(FUTURE_SCHEMA_VERSION, List.of(nullableColumn("col1"), nullableColumn("col2")));

    when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class)))
            .thenReturn(List.of(currentSchema, futureSchema));

    AtomicReference<Boolean> commitFlag = interceptFinishTxCommand();

    CompletableFuture<?> commitFuture = beginAndCommitTx();

    assertThat(commitFuture, willSucceedFast());
    assertThat(commitFlag.get(), is(true));
}
/**
 * Checks that a commit attempted across a forward-incompatible schema change fails with
 * {@code MismatchingTransactionOutcomeException} and that the intercepted {@code FinishTxCommand} has
 * {@code commit == false}.
 */
@Test
public void abortsCommitOnIncompatibleSchema() {
    simulateForwardIncompatibleSchemaChange(CURRENT_SCHEMA_VERSION, FUTURE_SCHEMA_VERSION);

    AtomicReference<Boolean> commitFlag = interceptFinishTxCommand();

    CompletableFuture<?> commitFuture = beginAndCommitTx();

    MismatchingTransactionOutcomeException failure =
            assertWillThrowFast(commitFuture, MismatchingTransactionOutcomeException.class);

    assertThat(failure.getMessage(), containsString("Commit failed because schema 1 is not forward-compatible with 2"));
    assertThat(commitFlag.get(), is(false));
}
/**
 * Stubs the timestamp-bounded overload of {@code tableSchemaVersionsBetween} to report two schema versions whose
 * single column differs in its default value.
 */
private void simulateForwardIncompatibleSchemaChange(int fromSchemaVersion, int toSchemaVersion) {
    List<FullTableSchema> schemaVersions = incompatibleSchemaVersions(fromSchemaVersion, toSchemaVersion);

    when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class)))
            .thenReturn(schemaVersions);
}
/**
 * Stubs the version-bounded ({@code int}) overload of {@code tableSchemaVersionsBetween} to report two schema versions
 * whose single column differs in its default value.
 */
private void simulateBackwardIncompatibleSchemaChange(int fromSchemaVersion, int toSchemaVersion) {
    List<FullTableSchema> schemaVersions = incompatibleSchemaVersions(fromSchemaVersion, toSchemaVersion);

    when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(), anyInt()))
            .thenReturn(schemaVersions);
}
/**
 * Creates two schemas, one per given version, each with a single non-nullable column {@code "col"}; the default value
 * is 4 in the first schema and 5 in the second.
 */
private static List<FullTableSchema> incompatibleSchemaVersions(int fromSchemaVersion, int toSchemaVersion) {
    FullTableSchema fromSchema = tableSchema(fromSchemaVersion, List.of(defaultedColumn("col", 4)));
    FullTableSchema toSchema = tableSchema(toSchemaVersion, List.of(defaultedColumn("col", 5)));

    return List.of(fromSchema, toSchema);
}
/**
 * Checks that every single-row RW request type fails when it touches a row written with a future,
 * backward-incompatible schema version. {@code RW_GET}, {@code RW_DELETE} and {@code RW_GET_AND_DELETE} are sent as
 * primary-key requests; all other types are sent as whole-row requests.
 */
@ParameterizedTest
@MethodSource("singleRowRequestTypes")
public void failsWhenReadingSingleRowFromFutureIncompatibleSchema(RequestType requestType) {
    RwListenerInvocation invocation;

    switch (requestType) {
        case RW_GET:
        case RW_DELETE:
        case RW_GET_AND_DELETE:
            invocation = (targetTxId, key) ->
                    doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType);

            break;

        default:
            invocation = (targetTxId, key) ->
                    doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType);
    }

    testFailsWhenReadingFromFutureIncompatibleSchema(invocation);
}
/**
 * Marshals either the key alone or the key together with {@code someValue}, depending on whether
 * {@code RequestTypes.isKeyOnly} holds for the request type.
 *
 * @throws AssertionError Wrapping the {@code MarshallerException} if marshalling the key-value pair fails.
 */
private BinaryRow marshalKeyOrKeyValue(RequestType requestType, TestKey key) {
    try {
        if (RequestTypes.isKeyOnly(requestType)) {
            return marshalQuietly(key, kvMarshaller);
        }

        return kvMarshaller.marshal(key, someValue);
    } catch (MarshallerException cause) {
        throw new AssertionError(cause);
    }
}
/**
 * Shared scenario: stores a committed row written with the future schema version, reports a backward-incompatible
 * schema change, runs the given listener invocation in a transaction whose id was created before the write, and
 * asserts that the invocation fails without the transaction being finished.
 *
 * @param listenerInvocation Invocation under test; it receives the transaction id and the key of the stored row.
 */
private void testFailsWhenReadingFromFutureIncompatibleSchema(RwListenerInvocation listenerInvocation) {
    // The reading transaction's id is taken before the future-schema row is written.
    UUID readerTxId = transactionIdFor(clock.now());

    TestKey writtenKey = simulateWriteWithSchemaVersionFromFuture();

    simulateBackwardIncompatibleSchemaChange(CURRENT_SCHEMA_VERSION, FUTURE_SCHEMA_VERSION);

    AtomicReference<Boolean> commitFlag = interceptFinishTxCommand();

    CompletableFuture<?> invocationFuture = listenerInvocation.invoke(readerTxId, writtenKey);

    assertFailureDueToBackwardIncompatibleSchemaChange(invocationFuture, commitFlag);
}
/** Supplies every {@link RequestType} accepted by {@code RequestTypes.isSingleRowRw()} as test arguments. */
private static Stream<Arguments> singleRowRequestTypes() {
    Stream<RequestType> singleRowRwTypes = Stream.of(RequestType.values()).filter(RequestTypes::isSingleRowRw);

    return singleRowRwTypes.map(Arguments::of);
}
/**
 * Puts a committed row marshalled with {@code kvMarshallerVersion2} into the PK index, the MV partition storage and the sorted
 * index, as if it had been written by another transaction.
 *
 * @return Key of the written row.
 */
private TestKey simulateWriteWithSchemaVersionFromFuture() {
    UUID writerTxId = transactionIdFor(clock.now());

    TestKey rowKey = nextKey();
    BinaryRow rowWithFutureSchema = binaryRow(rowKey, new TestValue(2, "v2"), kvMarshallerVersion2);

    var rowId = new RowId(PART_ID);

    var indexedColumns = new BinaryTupleBuilder(1).appendInt(FUTURE_SCHEMA_ROW_INDEXED_VALUE).build();
    BinaryTuple indexedTuple = new BinaryTuple(1, indexedColumns);

    pkStorage().put(rowWithFutureSchema, rowId);
    testMvPartitionStorage.addWrite(rowId, rowWithFutureSchema, writerTxId, TABLE_ID, PART_ID);
    sortedIndexStorage.storage().put(new IndexRowImpl(indexedTuple, rowId));

    // Commit the write intent so the row is visible as committed data.
    testMvPartitionStorage.commitWrite(rowId, clock.now());

    return rowKey;
}
/**
 * Asserts that the future fails with {@link IncompatibleSchemaException} carrying the expected code and message, and that
 * no transaction finish was recorded.
 *
 * @param future Future returned by the operation under test.
 * @param committed Holder that is expected to still contain {@code null}.
 */
private static void assertFailureDueToBackwardIncompatibleSchemaChange(
        CompletableFuture<?> future,
        AtomicReference<Boolean> committed
) {
    IncompatibleSchemaException failure = assertWillThrowFast(future, IncompatibleSchemaException.class);

    assertThat(failure.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));

    String failureMessage = failure.getMessage();
    assertThat(failureMessage, containsString("Operation failed because schema 1 is not backward-compatible with 2"));

    // The transaction must remain unfinished: nothing was recorded in the holder.
    Boolean finishOutcome = committed.get();
    assertThat(finishOutcome, is(nullValue()));
}
/**
 * Runs the "future incompatible schema" scenario for a multi-row RW request type; {@code RW_GET_ALL} and
 * {@code RW_DELETE_ALL} go through the PK request helper, everything else through the full-row request helper.
 *
 * @param requestType Request type under test.
 */
@ParameterizedTest
@MethodSource("multiRowsRequestTypes")
public void failsWhenReadingMultiRowsFromFutureIncompatibleSchema(RequestType requestType) {
    boolean sendAsPkRequest = requestType == RequestType.RW_GET_ALL || requestType == RequestType.RW_DELETE_ALL;

    RwListenerInvocation invocation = sendAsPkRequest
            ? (txId, rowKey) -> doMultiRowPkRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType)
            : (txId, rowKey) -> doMultiRowRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType);

    testFailsWhenReadingFromFutureIncompatibleSchema(invocation);
}
/** Supplies every {@link RequestType} accepted by {@code RequestTypes.isMultipleRowsRw()} as test arguments. */
private static Stream<Arguments> multiRowsRequestTypes() {
    Stream<RequestType> multiRowRwTypes = Stream.of(RequestType.values()).filter(RequestTypes::isMultipleRowsRw);

    return multiRowRwTypes.map(Arguments::of);
}
/** Checks that a replace request fails in the "future incompatible schema" scenario. */
@Test
public void failsWhenReplacingOnTupleWithIncompatibleSchemaFromFuture() {
    RwListenerInvocation replaceInvocation = (txId, rowKey) -> {
        BinaryRow expectedRow = binaryRow(rowKey, new TestValue(1, "v1"));
        BinaryRow replacementRow = binaryRow(rowKey, new TestValue(3, "v3"));

        return doReplaceRequest(txId, expectedRow, replacementRow);
    };

    testFailsWhenReadingFromFutureIncompatibleSchema(replaceInvocation);
}
/** Sends a non-full {@code RW_REPLACE} request; see the four-argument overload. */
private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow oldRow, BinaryRow newRow) {
    boolean full = false;

    return doReplaceRequest(targetTxId, oldRow, newRow, full);
}
/**
 * Builds an {@code RW_REPLACE} swap-row request and passes it to the replica listener on behalf of the local node.
 *
 * @param targetTxId Transaction ID to put into the request.
 * @param oldRow Row supplying the request's schema version and old tuple.
 * @param newRow Row supplying the request's new tuple.
 * @param full Value of the request's {@code full} flag.
 * @return Future returned by the listener.
 */
private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow oldRow, BinaryRow newRow, boolean full) {
    var replaceRequest = TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
            .groupId(grpId)
            .transactionId(targetTxId)
            .requestType(RequestType.RW_REPLACE)
            .schemaVersion(oldRow.schemaVersion())
            .oldBinaryTuple(oldRow.tupleSlice())
            .newBinaryTuple(newRow.tupleSlice())
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .commitPartitionId(commitPartitionId())
            .coordinatorId(localNode.id())
            .full(full)
            .build();

    return partitionReplicaListener.invoke(replaceRequest, localNode.id());
}
/** Checks that an exact-key scan over the sorted index fails in the "future incompatible schema" scenario. */
@Test
public void failsWhenScanByExactMatchReadsTupleWithIncompatibleSchemaFromFuture() {
    RwListenerInvocation exactMatchScan = (txId, ignoredKey) -> {
        var scanRequest = TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                .groupId(grpId)
                .transactionId(txId)
                .indexToUse(sortedIndexStorage.id())
                .exactKey(toIndexKey(FUTURE_SCHEMA_ROW_INDEXED_VALUE))
                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                .scanId(1)
                .batchSize(100)
                .commitPartitionId(commitPartitionId())
                .coordinatorId(localNode.id())
                .build();

        return partitionReplicaListener.invoke(scanRequest, localNode.id());
    };

    testFailsWhenReadingFromFutureIncompatibleSchema(exactMatchScan);
}
/** Checks that a scan over the sorted index (no exact key) fails in the "future incompatible schema" scenario. */
@Test
public void failsWhenScanByIndexReadsTupleWithIncompatibleSchemaFromFuture() {
    RwListenerInvocation indexScan = (txId, ignoredKey) -> {
        var scanRequest = TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                .groupId(grpId)
                .transactionId(txId)
                .indexToUse(sortedIndexStorage.id())
                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                .scanId(1)
                .batchSize(100)
                .commitPartitionId(commitPartitionId())
                .coordinatorId(localNode.id())
                .build();

        return partitionReplicaListener.invoke(scanRequest, localNode.id());
    };

    testFailsWhenReadingFromFutureIncompatibleSchema(indexScan);
}
/** Checks that an RW scan without an index fails in the "future incompatible schema" scenario. */
@Test
public void failsWhenScanReadsTupleWithIncompatibleSchemaFromFuture() {
    // The scan does not depend on the key, so the second lambda argument is ignored.
    RwListenerInvocation plainScan = (txId, ignoredKey) -> doRwScanRetrieveBatchRequest(txId);

    testFailsWhenReadingFromFutureIncompatibleSchema(plainScan);
}
/**
 * Sends a non-full RW scan batch request (scan ID 1, batch size 100) to the replica listener.
 *
 * @param targetTxId Transaction ID to put into the request.
 * @return Future returned by the listener.
 */
private CompletableFuture<?> doRwScanRetrieveBatchRequest(UUID targetTxId) {
    var scanRequest = TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
            .groupId(grpId)
            .transactionId(targetTxId)
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .scanId(1)
            .batchSize(100)
            .full(false)
            .commitPartitionId(commitPartitionId())
            .coordinatorId(localNode.id())
            .build();

    return partitionReplicaListener.invoke(scanRequest, localNode.id());
}
/**
 * Sends a scan-close request for scan ID 1 to the replica listener.
 *
 * @param targetTxId Transaction ID to put into the request.
 * @return Future returned by the listener.
 */
private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) {
    var closeRequest = TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest()
            .groupId(grpId)
            .transactionId(targetTxId)
            .scanId(1)
            .build();

    return partitionReplicaListener.invoke(closeRequest, localNode.id());
}
/**
 * Sends a read-only scan batch request (scan ID 1, batch size 100) to the replica listener.
 *
 * @param targetTxId Transaction ID to put into the request.
 * @param readTimestamp Timestamp whose long value is used as the request's read timestamp.
 * @return Future returned by the listener.
 */
private CompletableFuture<?> doRoScanRetrieveBatchRequest(UUID targetTxId, HybridTimestamp readTimestamp) {
    var scanRequest = TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
            .groupId(grpId)
            .transactionId(targetTxId)
            .scanId(1)
            .batchSize(100)
            .readTimestampLong(readTimestamp.longValue())
            .coordinatorId(localNode.id())
            .build();

    return partitionReplicaListener.invoke(scanRequest, localNode.id());
}
/**
 * Checks that single-row writes produce a command carrying the required catalog version; {@code RW_DELETE} and
 * {@code RW_GET_AND_DELETE} are sent as PK requests, the rest as full-row requests.
 *
 * @param requestType Request type under test.
 */
@ParameterizedTest
@MethodSource("singleRowWriteRequestTypes")
public void singleRowWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType) {
    boolean sendAsPkRequest = requestType == RequestType.RW_DELETE || requestType == RequestType.RW_GET_AND_DELETE;

    RwListenerInvocation invocation = sendAsPkRequest
            ? (txId, rowKey) -> doSingleRowPkRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType)
            : (txId, rowKey) -> doSingleRowRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType);

    testWritesAreSuppliedWithRequiredCatalogVersion(requestType, invocation);
}
/** Supplies every {@link RequestType} accepted by {@code RequestTypes.isSingleRowWrite()} as test arguments. */
private static Stream<Arguments> singleRowWriteRequestTypes() {
    Stream<RequestType> singleRowWriteTypes = Stream.of(RequestType.values()).filter(RequestTypes::isSingleRowWrite);

    return singleRowWriteTypes.map(Arguments::of);
}
/**
 * Configures the given {@link TxManager} mock: transaction state metadata is kept in an in-memory map that backs
 * {@code stateMeta()} and {@code updateTxMeta()}, while write intent switch, finish and cleanup complete immediately with
 * {@code null}.
 *
 * @param txManager Mock to configure.
 */
private static void configureTxManager(TxManager txManager) {
    ConcurrentHashMap<UUID, TxStateMeta> metaByTxId = new ConcurrentHashMap<>();

    doAnswer(call -> metaByTxId.get(call.getArgument(0))).when(txManager).stateMeta(any());

    doAnswer(call -> {
        UUID txId = call.getArgument(0);
        Function<TxStateMeta, TxStateMeta> metaUpdater = call.getArgument(1);

        metaByTxId.compute(txId, (ignoredKey, currentMeta) -> {
            TxStateMeta updatedMeta = metaUpdater.apply(currentMeta);

            // A null result from the updater removes the mapping.
            if (updatedMeta == null) {
                return null;
            }

            TxState currentState = null;
            if (currentMeta != null) {
                currentState = currentMeta.txState();
            }

            // Keep the previous metadata when the state transition is rejected.
            if (checkTransitionCorrectness(currentState, updatedMeta.txState())) {
                return updatedMeta;
            }

            return currentMeta;
        });

        return null;
    }).when(txManager).updateTxMeta(any(), any());

    doAnswer(call -> nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
    doAnswer(call -> nullCompletedFuture()).when(txManager).finish(any(), any(), anyBoolean(), any(), any());
    doAnswer(call -> nullCompletedFuture()).when(txManager).cleanup(anyString(), any());
}
/**
 * Runs a write through the listener and verifies that the last command passed to the Raft client is
 * {@link CatalogVersionAware} and carries the stubbed catalog version (42).
 *
 * @param requestType Request type; decides whether a row is upserted beforehand.
 * @param listenerInvocation Write operation to run.
 */
private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, RwListenerInvocation listenerInvocation) {
    TestKey rowKey = nextKey();

    if (RequestTypes.looksUpFirst(requestType)) {
        upsertInNewTxFor(rowKey);

        // While handling the upsert, our mocks were touched, let's reset them to prevent false-positives during verification.
        Mockito.reset(schemaSyncService);
    }

    when(catalogService.activeCatalogVersion(anyLong())).thenReturn(42);

    UUID txId = newTxId();

    assertThat(listenerInvocation.invoke(txId, rowKey), willCompleteSuccessfully());

    // Make sure catalog required version is filled in the executed update command.
    verify(mockRaftClient, atLeast(1)).run(commandCaptor.capture());

    List<Command> capturedCommands = commandCaptor.getAllValues();
    int lastIndex = capturedCommands.size() - 1;
    Command lastCommand = capturedCommands.get(lastIndex);

    assertThat(lastCommand, is(instanceOf(CatalogVersionAware.class)));
    assertThat(((CatalogVersionAware) lastCommand).requiredCatalogVersion(), is(42));
}
/**
 * Upserts a row made of the given key and {@code someValue} in a fresh transaction, then runs {@code cleanup()} for it.
 *
 * @param key Key of the row to upsert.
 */
private void upsertInNewTxFor(TestKey key) {
    UUID setupTxId = newTxId();
    BinaryRow rowToInsert = binaryRow(key, someValue);

    upsert(setupTxId, rowToInsert);
    cleanup(setupTxId);
}
/** Checks that a replace request produces a command carrying the required catalog version. */
@Test
public void replaceRequestIsSuppliedWithRequiredCatalogVersion() {
    RwListenerInvocation replaceInvocation = (txId, rowKey) -> doReplaceRequest(
            txId,
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey),
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey)
    );

    testWritesAreSuppliedWithRequiredCatalogVersion(RequestType.RW_REPLACE, replaceInvocation);
}
/**
 * Checks that multi-row writes produce a command carrying the required catalog version; {@code RW_DELETE_ALL} is sent as a
 * PK request, the rest as full-row requests.
 *
 * @param requestType Request type under test.
 */
@ParameterizedTest
@MethodSource("multiRowsWriteRequestTypes")
public void multiRowWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType) {
    RwListenerInvocation invocation = requestType == RequestType.RW_DELETE_ALL
            ? (txId, rowKey) -> doMultiRowPkRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType)
            : (txId, rowKey) -> doMultiRowRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType);

    testWritesAreSuppliedWithRequiredCatalogVersion(requestType, invocation);
}
/** Supplies every {@link RequestType} accepted by {@code RequestTypes.isMultipleRowsWrite()} as test arguments. */
private static Stream<Arguments> multiRowsWriteRequestTypes() {
    Stream<RequestType> multiRowWriteTypes = Stream.of(RequestType.values()).filter(RequestTypes::isMultipleRowsWrite);

    return multiRowWriteTypes.map(Arguments::of);
}
/**
 * Runs the "table altered after tx start" scenario for every single-row RW request type, both on existing and missing rows,
 * and for both values of the {@code full} flag.
 */
@CartesianTest
@CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
void singleRowRwOperationsFailIfTableAlteredAfterTxStart(
        RequestType requestType,
        boolean onExistingRow,
        boolean full
) {
    boolean pkOnly = RequestTypes.isSingleRowRwPkOnly(requestType);

    if (!pkOnly && !RequestTypes.isSingleRowRwFullRow(requestType)) {
        fail("Uncovered type: " + requestType);
    }

    RwListenerInvocation invocation = pkOnly
            ? (txId, rowKey) -> doSingleRowPkRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType, full)
            : (txId, rowKey) -> doSingleRowRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType, full);

    testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation);
}
/** Argument sets: every single-row RW request type combined with both values of two boolean parameters. */
@SuppressWarnings("unused")
private static ArgumentSets singleRowRwOperationTypesFactory() {
    Stream<RequestType> requestTypes = singleRowRwOperationTypes();

    return ArgumentSets.argumentsForFirstParameter(requestTypes)
            .argumentsForNextParameter(false, true)
            .argumentsForNextParameter(false, true);
}
/** Streams every {@link RequestType} accepted by {@code RequestTypes.isSingleRowRw()}. */
private static Stream<RequestType> singleRowRwOperationTypes() {
    RequestType[] allTypes = RequestType.values();

    return Stream.of(allTypes).filter(RequestTypes::isSingleRowRw);
}
/**
 * Starts a transaction, simulates a schema change after its begin timestamp and runs the operation. The operation must fail
 * with {@link IncompatibleSchemaException} when the request type never misses, or when {@code onExistingRow} equals
 * {@code RequestTypes.writesIfKeyDoesNotExist(requestType)}; otherwise it must succeed.
 *
 * @param requestType Request type under test.
 * @param onExistingRow Whether a row with the key is upserted before the operation.
 * @param listenerInvocation Operation to run.
 */
private void testRwOperationFailsIfTableWasAlteredAfterTxStart(
        RequestType requestType,
        boolean onExistingRow,
        RwListenerInvocation listenerInvocation
) {
    TestKey rowKey = nextKey();

    if (onExistingRow) {
        upsertInNewTxFor(rowKey);
    }

    UUID txId = newTxId();

    makeSchemaChangeAfter(beginTimestamp(txId));

    CompletableFuture<?> operationFuture = listenerInvocation.invoke(txId, rowKey);

    boolean expectValidationFailure = RequestTypes.neverMisses(requestType)
            || onExistingRow == RequestTypes.writesIfKeyDoesNotExist(requestType);

    if (!expectValidationFailure) {
        assertThat(operationFuture, willCompleteSuccessfully());

        return;
    }

    IncompatibleSchemaException failure = assertWillThrowFast(operationFuture, IncompatibleSchemaException.class);

    assertThat(failure.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
    assertThat(
            failure.getMessage(),
            is("Table schema was updated after the transaction was started [table=1, startSchema=1, operationSchema=2]")
    );
}
/**
 * Stubs {@code catalogService.table()} for {@code TABLE_ID}: exactly at the given timestamp it returns a descriptor with
 * {@code CURRENT_SCHEMA_VERSION}, and at any greater timestamp a descriptor with {@code NEXT_SCHEMA_VERSION}.
 *
 * @param txBeginTs Transaction begin timestamp.
 */
private void makeSchemaChangeAfter(HybridTimestamp txBeginTs) {
    long beginTsLong = txBeginTs.longValue();

    CatalogTableDescriptor tableAtTxStart = mock(CatalogTableDescriptor.class);
    CatalogTableDescriptor tableAfterChange = mock(CatalogTableDescriptor.class);

    when(tableAtTxStart.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
    when(tableAfterChange.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);

    when(catalogService.table(TABLE_ID, beginTsLong)).thenReturn(tableAtTxStart);
    when(catalogService.table(eq(TABLE_ID), gt(beginTsLong))).thenReturn(tableAfterChange);
}
/**
 * Runs the "table altered after tx start" scenario for every multi-row RW request type, both on existing and missing rows,
 * and for both values of the {@code full} flag.
 */
@CartesianTest
@CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
void multiRowRwOperationsFailIfTableAlteredAfterTxStart(
        RequestType requestType, boolean onExistingRow, boolean full
) {
    boolean pkOnly = RequestTypes.isMultipleRowsRwPkOnly(requestType);

    if (!pkOnly && !RequestTypes.isMultipleRowsRwFullRows(requestType)) {
        fail("Uncovered type: " + requestType);
    }

    RwListenerInvocation invocation = pkOnly
            ? (txId, rowKey) -> doMultiRowPkRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType, full)
            : (txId, rowKey) -> doMultiRowRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType, full);

    testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation);
}
/** Argument sets: every multi-row RW request type combined with both values of two boolean parameters. */
@SuppressWarnings("unused")
private static ArgumentSets multiRowRwOperationTypesFactory() {
    Stream<RequestType> requestTypes = multiRowRwOperationTypes();

    return ArgumentSets.argumentsForFirstParameter(requestTypes)
            .argumentsForNextParameter(false, true)
            .argumentsForNextParameter(false, true);
}
/** Streams every {@link RequestType} accepted by {@code RequestTypes.isMultipleRowsRw()}. */
private static Stream<RequestType> multiRowRwOperationTypes() {
    RequestType[] allTypes = RequestType.values();

    return Stream.of(allTypes).filter(RequestTypes::isMultipleRowsRw);
}
/** Runs the "table altered after tx start" scenario for a replace request. */
@CartesianTest
void replaceRequestFailsIfTableAlteredAfterTxStart(
        @Values(booleans = {false, true}) boolean onExistingRow,
        @Values(booleans = {false, true}) boolean full
) {
    RwListenerInvocation replaceInvocation = (txId, rowKey) -> doReplaceRequest(
            txId,
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey),
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey),
            full
    );

    testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, replaceInvocation);
}
/** Runs the "table altered after tx start" scenario for an RW scan request. */
@CartesianTest
void rwScanRequestFailsIfTableAlteredAfterTxStart(@Values(booleans = {false, true}) boolean onExistingRow) {
    RwListenerInvocation scanInvocation = (txId, ignoredKey) -> doRwScanRetrieveBatchRequest(txId);

    testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, scanInvocation);
}
/** Checks that a scan-close request still succeeds when the schema changes after the transaction begin timestamp. */
@Test
void rwScanCloseRequestSucceedsIfTableAlteredAfterTxStart() {
    UUID scanTxId = newTxId();

    makeSchemaChangeAfter(beginTimestamp(scanTxId));

    assertThat(doRwScanCloseRequest(scanTxId), willCompleteSuccessfully());
}
/** Runs the "table was dropped" scenario for every single-row RW request type. */
@CartesianTest
@CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
void singleRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) {
    boolean pkOnly = RequestTypes.isSingleRowRwPkOnly(requestType);

    if (!pkOnly && !RequestTypes.isSingleRowRwFullRow(requestType)) {
        fail("Uncovered type: " + requestType);
    }

    RwListenerInvocation invocation = pkOnly
            ? (txId, rowKey) -> doSingleRowPkRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType, full)
            : (txId, rowKey) -> doSingleRowRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType, full);

    testRwOperationFailsIfTableWasDropped(onExistingRow, invocation);
}
/**
 * Starts a transaction, makes the table look dropped after its begin timestamp and expects the operation to fail with
 * {@link IncompatibleSchemaException}.
 *
 * @param onExistingRow Whether a row with the key is upserted before the operation.
 * @param listenerInvocation Operation to run.
 */
private void testRwOperationFailsIfTableWasDropped(boolean onExistingRow, RwListenerInvocation listenerInvocation) {
    TestKey rowKey = nextKey();

    if (onExistingRow) {
        upsertInNewTxFor(rowKey);
    }

    UUID txId = newTxId();

    makeTableBeDroppedAfter(beginTimestamp(txId));

    CompletableFuture<?> operationFuture = listenerInvocation.invoke(txId, rowKey);

    IncompatibleSchemaException failure = assertWillThrowFast(operationFuture, IncompatibleSchemaException.class);

    assertThat(failure.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
    assertThat(failure.getMessage(), is("Table was dropped [table=1]"));
}
/** Same as the two-argument overload, applied to {@code TABLE_ID}. */
private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs) {
    int droppedTableId = TABLE_ID;

    makeTableBeDroppedAfter(txBeginTs, droppedTableId);
}
/**
 * Stubs {@code catalogService.table()} for the given table: exactly at the given timestamp it returns a descriptor with
 * {@code CURRENT_SCHEMA_VERSION}, and at any greater timestamp it returns {@code null}.
 *
 * @param txBeginTs Transaction begin timestamp.
 * @param tableId ID of the table to stub.
 */
private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs, int tableId) {
    long beginTsLong = txBeginTs.longValue();

    CatalogTableDescriptor tableAtTxStart = mock(CatalogTableDescriptor.class);
    when(tableAtTxStart.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);

    when(catalogService.table(tableId, beginTsLong)).thenReturn(tableAtTxStart);
    when(catalogService.table(eq(tableId), gt(beginTsLong))).thenReturn(null);
}
/** Runs the "table was dropped" scenario for every multi-row RW request type. */
@CartesianTest
@CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
void multiRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) {
    boolean pkOnly = RequestTypes.isMultipleRowsRwPkOnly(requestType);

    if (!pkOnly && !RequestTypes.isMultipleRowsRwFullRows(requestType)) {
        fail("Uncovered type: " + requestType);
    }

    RwListenerInvocation invocation = pkOnly
            ? (txId, rowKey) -> doMultiRowPkRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType, full)
            : (txId, rowKey) -> doMultiRowRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType, full);

    testRwOperationFailsIfTableWasDropped(onExistingRow, invocation);
}
/** Runs the "table was dropped" scenario for a replace request. */
@CartesianTest
void replaceRequestFailsIfTableWasDropped(
        @Values(booleans = {false, true}) boolean onExistingRow,
        @Values(booleans = {false, true}) boolean full
) {
    RwListenerInvocation replaceInvocation = (txId, rowKey) -> doReplaceRequest(
            txId,
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey),
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey),
            full
    );

    testRwOperationFailsIfTableWasDropped(onExistingRow, replaceInvocation);
}
/** Runs the "table was dropped" scenario for an RW scan request. */
@CartesianTest
void rwScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) {
    RwListenerInvocation scanInvocation = (txId, ignoredKey) -> doRwScanRetrieveBatchRequest(txId);

    testRwOperationFailsIfTableWasDropped(onExistingRow, scanInvocation);
}
/** Checks that a scan-close request still succeeds when the table looks dropped after the transaction begin timestamp. */
@Test
void rwScanCloseRequestSucceedsIfTableWasDropped() {
    UUID scanTxId = newTxId();

    makeTableBeDroppedAfter(beginTimestamp(scanTxId));

    assertThat(doRwScanCloseRequest(scanTxId), willCompleteSuccessfully());
}
/** Runs the read-only "table was dropped" scenario for a single-row get, either direct or with a read timestamp. */
@CartesianTest
void singleRowRoGetFailsIfTableWasDropped(
        @Values(booleans = {false, true}) boolean direct,
        @Values(booleans = {false, true}) boolean onExistingRow
) {
    RoListenerInvocation getInvocation = (ignoredTxId, readTs, rowKey) -> {
        var marshalledKey = marshalQuietly(rowKey, kvMarshaller);

        return direct
                ? doReadOnlyDirectSingleGet(marshalledKey)
                : doReadOnlySingleGet(marshalledKey, readTs);
    };

    testRoOperationFailsIfTableWasDropped(onExistingRow, getInvocation);
}
/**
 * Makes {@code catalogService.table()} return {@code null} for {@code TABLE_ID} at any timestamp and expects the read-only
 * operation to fail with {@link IncompatibleSchemaException}.
 *
 * @param onExistingRow Whether a row with the key is upserted before the operation.
 * @param listenerInvocation Operation to run.
 */
private void testRoOperationFailsIfTableWasDropped(boolean onExistingRow, RoListenerInvocation listenerInvocation) {
    TestKey rowKey = nextKey();

    if (onExistingRow) {
        upsertInNewTxFor(rowKey);
    }

    UUID txId = newTxId();
    HybridTimestamp readTimestamp = clock.now();

    when(catalogService.table(eq(TABLE_ID), anyLong())).thenReturn(null);

    CompletableFuture<?> operationFuture = listenerInvocation.invoke(txId, readTimestamp, rowKey);

    IncompatibleSchemaException failure = assertWillThrowFast(operationFuture, IncompatibleSchemaException.class);

    assertThat(failure.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
    assertThat(failure.getMessage(), is("Table was dropped [table=1]"));
}
/** Runs the read-only "table was dropped" scenario for a multi-row get, either direct or with a read timestamp. */
@CartesianTest
void multiRowRoGetFailsIfTableWasDropped(
        @Values(booleans = {false, true}) boolean direct,
        @Values(booleans = {false, true}) boolean onExistingRow
) {
    RoListenerInvocation getAllInvocation = (ignoredTxId, readTs, rowKey) -> {
        var marshalledKeys = List.of(marshalQuietly(rowKey, kvMarshaller));

        return direct
                ? doReadOnlyDirectMultiGet(marshalledKeys)
                : doReadOnlyMultiGet(marshalledKeys, readTs);
    };

    testRoOperationFailsIfTableWasDropped(onExistingRow, getAllInvocation);
}
/** Runs the read-only "table was dropped" scenario for an RO scan request. */
@CartesianTest
void roScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) {
    RoListenerInvocation scanInvocation = (txId, readTs, ignoredKey) -> doRoScanRetrieveBatchRequest(txId, readTs);

    testRoOperationFailsIfTableWasDropped(onExistingRow, scanInvocation);
}
/** Commit scenario in which the dropped table is the one of the commit partition itself. */
@Test
void commitRequestFailsIfCommitPartitionTableWasDropped() {
    Map<ReplicationGroupId, String> enlistedGroups = Map.of(grpId, localNode.name());

    testCommitRequestIfTableWasDropped(grpId, enlistedGroups, grpId.tableId());
}
/** Commit scenario in which the dropped table belongs to an enlisted partition other than the commit partition. */
@Test
void commitRequestFailsIfNonCommitPartitionTableWasDropped() {
    TablePartitionId otherPartitionId = new TablePartitionId(ANOTHER_TABLE_ID, 0);

    Map<ReplicationGroupId, String> enlistedGroups = Map.of(grpId, localNode.name(), otherPartitionId, localNode.name());

    testCommitRequestIfTableWasDropped(grpId, enlistedGroups, otherPartitionId.tableId());
}
/**
 * Sends a commit (tx finish) request after making one table look dropped and expects
 * {@link MismatchingTransactionOutcomeException} plus a recorded abort.
 *
 * @param commitPartitionId Group ID the finish request is addressed to.
 * @param groups Enlisted groups passed in the finish request.
 * @param tableToBeDroppedId ID of the table that is stubbed as dropped after the transaction begin timestamp.
 */
private void testCommitRequestIfTableWasDropped(
        TablePartitionId commitPartitionId,
        Map<ReplicationGroupId, String> groups,
        int tableToBeDroppedId
) {
    List<FullTableSchema> onlyCurrentSchema = List.of(
            tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col")))
    );

    when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class)))
            .thenReturn(onlyCurrentSchema);

    when(txManager.cleanup(any(), anyBoolean(), any(), any())).thenReturn(nullCompletedFuture());

    AtomicReference<Boolean> commitFlag = interceptFinishTxCommand();

    UUID txId = newTxId();

    makeTableBeDroppedAfter(beginTimestamp(txId), tableToBeDroppedId);

    var finishRequest = TX_MESSAGES_FACTORY.txFinishReplicaRequest()
            .groupId(commitPartitionId)
            .groups(groups)
            .txId(txId)
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .commit(true)
            .commitTimestampLong(clock.nowLong())
            .build();

    CompletableFuture<?> finishFuture = partitionReplicaListener.invoke(finishRequest, localNode.id());

    MismatchingTransactionOutcomeException failure = assertWillThrowFast(finishFuture, MismatchingTransactionOutcomeException.class);

    String expectedMessage = "Commit failed because a table was already dropped [tableId=" + tableToBeDroppedId + "]";
    assertThat(failure.getMessage(), is(expectedMessage));

    assertThat("The transaction must have been aborted", commitFlag.get(), is(false));
}
/** Runs the "schema version mismatches tx" scenario for every single-row RW request type. */
@CartesianTest
@CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
void singleRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) {
    boolean pkOnly = RequestTypes.isSingleRowRwPkOnly(requestType);

    if (!pkOnly && !RequestTypes.isSingleRowRwFullRow(requestType)) {
        fail("Uncovered type: " + requestType);
    }

    RwListenerInvocation invocation = pkOnly
            ? (txId, rowKey) -> doSingleRowPkRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType, full)
            : (txId, rowKey) -> doSingleRowRequest(txId, marshalKeyOrKeyValue(requestType, rowKey), requestType, full);

    testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, invocation);
}
/**
 * Makes the catalog report {@code NEXT_SCHEMA_VERSION} for the table and expects the RW operation to fail with
 * {@link InternalSchemaVersionMismatchException}.
 *
 * @param onExistingRow Whether a row with the key is upserted before the operation.
 * @param listenerInvocation Operation to run.
 */
private void testRwOperationFailsIfSchemaVersionMismatchesTx(boolean onExistingRow, RwListenerInvocation listenerInvocation) {
    TestKey rowKey = nextKey();

    if (onExistingRow) {
        upsertInNewTxFor(rowKey);
    }

    UUID txId = newTxId();

    makeSchemaBeNextVersion();

    CompletableFuture<?> operationFuture = listenerInvocation.invoke(txId, rowKey);

    assertThat(operationFuture, willThrow(InternalSchemaVersionMismatchException.class));
}
/** Stubs {@code catalogService.table()} to return, for {@code TABLE_ID} at any timestamp, a descriptor with {@code NEXT_SCHEMA_VERSION}. */
private void makeSchemaBeNextVersion() {
    CatalogTableDescriptor nextVersionTable = mock(CatalogTableDescriptor.class);

    when(nextVersionTable.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);

    when(catalogService.table(eq(TABLE_ID), anyLong())).thenReturn(nextVersionTable);
}
/** Runs the "schema version mismatches tx" scenario for every multi-row RW request type. */
@CartesianTest
@CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
void multiRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) {
    boolean pkOnly = RequestTypes.isMultipleRowsRwPkOnly(requestType);

    if (!pkOnly && !RequestTypes.isMultipleRowsRwFullRows(requestType)) {
        fail("Uncovered type: " + requestType);
    }

    RwListenerInvocation invocation = pkOnly
            ? (txId, rowKey) -> doMultiRowPkRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType, full)
            : (txId, rowKey) -> doMultiRowRequest(txId, List.of(marshalKeyOrKeyValue(requestType, rowKey)), requestType, full);

    testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, invocation);
}
/** Runs the "schema version mismatches tx" scenario for a replace request. */
@CartesianTest
void replaceRequestFailsIfSchemaVersionMismatchesTx(
        @Values(booleans = {false, true}) boolean onExistingRow,
        @Values(booleans = {false, true}) boolean full
) {
    RwListenerInvocation replaceInvocation = (txId, rowKey) -> doReplaceRequest(
            txId,
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey),
            marshalKeyOrKeyValue(RequestType.RW_REPLACE, rowKey),
            full
    );

    testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, replaceInvocation);
}
/** Runs the read-only "schema version mismatches tx" scenario for a single-row get, direct or with a read timestamp. */
@CartesianTest
void singleRowRoGetFailsIfSchemaVersionMismatchesTx(
        @Values(booleans = {false, true}) boolean direct,
        @Values(booleans = {false, true}) boolean onExistingRow
) {
    RoListenerInvocation getInvocation = (ignoredTxId, readTs, rowKey) -> {
        var marshalledKey = marshalQuietly(rowKey, kvMarshaller);

        return direct
                ? doReadOnlyDirectSingleGet(marshalledKey)
                : doReadOnlySingleGet(marshalledKey, readTs);
    };

    testRoOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, getInvocation);
}
/**
 * Makes the catalog report {@code NEXT_SCHEMA_VERSION} for the table and expects the read-only operation to fail with
 * {@link InternalSchemaVersionMismatchException}.
 *
 * @param onExistingRow Whether a row with the key is upserted before the operation.
 * @param listenerInvocation Operation to run.
 */
private void testRoOperationFailsIfSchemaVersionMismatchesTx(boolean onExistingRow, RoListenerInvocation listenerInvocation) {
    TestKey rowKey = nextKey();

    if (onExistingRow) {
        upsertInNewTxFor(rowKey);
    }

    UUID txId = newTxId();
    HybridTimestamp readTimestamp = clock.now();

    makeSchemaBeNextVersion();

    CompletableFuture<?> operationFuture = listenerInvocation.invoke(txId, readTimestamp, rowKey);

    assertThat(operationFuture, willThrow(InternalSchemaVersionMismatchException.class));
}
/** Runs the read-only "schema version mismatches tx" scenario for a multi-row get, direct or with a read timestamp. */
@CartesianTest
void multiRowRoGetFailsIfSchemaVersionMismatchesTx(
        @Values(booleans = {false, true}) boolean direct,
        @Values(booleans = {false, true}) boolean onExistingRow
) {
    RoListenerInvocation getAllInvocation = (ignoredTxId, readTs, rowKey) -> {
        var marshalledKeys = List.of(marshalQuietly(rowKey, kvMarshaller));

        return direct
                ? doReadOnlyDirectMultiGet(marshalledKeys)
                : doReadOnlyMultiGet(marshalledKeys, readTs);
    };

    testRoOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, getAllInvocation);
}
/** Creates a transaction ID from the current clock timestamp. */
private UUID newTxId() {
    HybridTimestamp beginTimestamp = clock.now();

    return transactionIdFor(beginTimestamp);
}
/** Upserts the row in the given transaction and asserts that the request completes successfully. */
private void upsert(UUID txId, BinaryRow row) {
    CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row);

    assertThat(upsertFuture, willCompleteSuccessfully());
}
/** Sends a non-full {@code RW_UPSERT} request; see the three-argument overload. */
private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row) {
    boolean full = false;

    return upsertAsync(txId, row, full);
}
/**
 * Builds an {@code RW_UPSERT} single-row request from the given row and passes it to the replica listener.
 *
 * @param txId Transaction ID to put into the request.
 * @param row Row supplying the schema version and the tuple.
 * @param full Value of the request's {@code full} flag.
 * @return Future returned by the listener.
 */
private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row, boolean full) {
    return partitionReplicaListener.invoke(
            TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                    .groupId(grpId)
                    .requestType(RequestType.RW_UPSERT)
                    .transactionId(txId)
                    .schemaVersion(row.schemaVersion())
                    .binaryTuple(row.tupleSlice())
                    .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                    .commitPartitionId(commitPartitionId())
                    .coordinatorId(localNode.id())
                    .full(full)
                    .build(),
            localNode.id()
    );
}
/**
 * Sends an {@code RW_DELETE} PK request built from the given row and asserts that it completes successfully.
 *
 * @param txId Transaction ID to put into the request.
 * @param row Row supplying the schema version and the primary key tuple.
 */
private void delete(UUID txId, BinaryRow row) {
    var deleteRequest = TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
            .groupId(grpId)
            .requestType(RequestType.RW_DELETE)
            .transactionId(txId)
            .schemaVersion(row.schemaVersion())
            .primaryKey(row.tupleSlice())
            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
            .commitPartitionId(commitPartitionId())
            .coordinatorId(localNode.id())
            .build();

    var deleteFuture = partitionReplicaListener.invoke(deleteRequest, localNode.id());

    assertThat(deleteFuture, willCompleteSuccessfully());
}
/**
 * Performs a read-only get, asserting that it completes successfully.
 *
 * @param row Row whose tuple is used as the primary key.
 * @param readTimestamp Read timestamp as a long.
 * @return Result of the get.
 */
private BinaryRow roGet(BinaryRow row, long readTimestamp) {
    CompletableFuture<BinaryRow> getFuture = roGetAsync(row, readTimestamp);

    assertThat(getFuture, willCompleteSuccessfully());

    BinaryRow found = getFuture.join();
    return found;
}
/**
 * Sends an {@code RO_GET} PK request to the replica listener and casts the reply's result to {@link BinaryRow}.
 *
 * @param row Row supplying the schema version and the primary key tuple.
 * @param readTimestamp Read timestamp as a long.
 * @return Future with the row returned by the listener.
 */
private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row, long readTimestamp) {
    CompletableFuture<ReplicaResult> replyFuture = partitionReplicaListener.invoke(
            TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                    .groupId(grpId)
                    .requestType(RequestType.RO_GET)
                    .readTimestampLong(readTimestamp)
                    .schemaVersion(row.schemaVersion())
                    .primaryKey(row.tupleSlice())
                    .build(),
            localNode.id()
    );

    return replyFuture.thenApply(reply -> (BinaryRow) reply.result());
}
/**
 * Performs a read-only multi-get, asserting that it completes successfully.
 *
 * @param rows Rows whose tuples are used as primary keys.
 * @param readTimestamp Read timestamp.
 * @return Result of the reply, cast to a list of rows.
 */
private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
    CompletableFuture<ReplicaResult> replyFuture = doReadOnlyMultiGet(rows, readTimestamp);

    assertThat(replyFuture, willCompleteSuccessfully());

    ReplicaResult reply = replyFuture.join();

    return (List<BinaryRow>) reply.result();
}
/**
 * Sends an {@code RO_GET_ALL} PK request to the replica listener; the schema version is taken from the first row.
 *
 * @param rows Rows whose tuples are used as primary keys (must not be empty, as the first element is read).
 * @param readTimestamp Read timestamp.
 * @return Future returned by the listener.
 */
private CompletableFuture<ReplicaResult> doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
    return partitionReplicaListener.invoke(
            TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
                    .groupId(grpId)
                    .requestType(RequestType.RO_GET_ALL)
                    .readTimestampLong(readTimestamp.longValue())
                    .schemaVersion(rows.iterator().next().schemaVersion())
                    .primaryKeys(binaryRowsToBuffers(rows))
                    .build(),
            localNode.id()
    );
}
/**
 * Sends a direct read-only {@code RO_GET_ALL} request to the replica listener; the schema version is taken from the first row.
 *
 * @param rows Rows whose tuples are used as primary keys (must not be empty, as the first element is read).
 * @return Future returned by the listener.
 */
private CompletableFuture<ReplicaResult> doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) {
    return partitionReplicaListener.invoke(
            TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                    .groupId(grpId)
                    .requestType(RequestType.RO_GET_ALL)
                    .schemaVersion(rows.iterator().next().schemaVersion())
                    .primaryKeys(binaryRowsToBuffers(rows))
                    .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                    .build(),
            localNode.id()
    );
}
/**
 * Marks the transaction as committed in the tx manager, sends a committing write intent switch request to the replica
 * listener, and sets the {@code txState} field to {@code COMMITTED}.
 *
 * @param txId ID of the transaction to clean up.
 */
private void cleanup(UUID txId) {
    HybridTimestamp commitTimestamp = clock.now();

    txManager.updateTxMeta(
            txId,
            ignoredOldMeta -> new TxStateMeta(COMMITTED, UUID.randomUUID().toString(), commitPartitionId, commitTimestamp)
    );

    WriteIntentSwitchReplicaRequest switchRequest = TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
            .groupId(grpId)
            .txId(txId)
            .commit(true)
            .commitTimestampLong(commitTimestamp.longValue())
            .build();

    var switchFuture = partitionReplicaListener.invoke(switchRequest, localNode.id());

    assertThat(switchFuture, willCompleteSuccessfully());

    txState = COMMITTED;
}
/**
 * Wraps a single int into a binary tuple message built with {@link BinaryTuplePrefixBuilder}.
 *
 * @param val Value to put into the tuple.
 * @return Message with element count 1.
 */
private BinaryTupleMessage toIndexBound(int val) {
    BinaryTuplePrefixBuilder prefixBuilder = new BinaryTuplePrefixBuilder(1, 1);
    ByteBuffer prefixBytes = prefixBuilder.appendInt(val).build();

    return TABLE_MESSAGES_FACTORY.binaryTupleMessage().tuple(prefixBytes).elementCount(1).build();
}
/**
 * Wraps a single int into a binary tuple message built with {@link BinaryTupleBuilder}.
 *
 * @param val Value to put into the tuple.
 * @return Message with element count 1.
 */
private BinaryTupleMessage toIndexKey(int val) {
    BinaryTupleBuilder keyBuilder = new BinaryTupleBuilder(1);
    ByteBuffer keyBytes = keyBuilder.appendInt(val).build();

    return TABLE_MESSAGES_FACTORY.binaryTupleMessage().tuple(keyBytes).elementCount(1).build();
}
/** Generates the next key and marshals it with {@code kvMarshaller}. */
private BinaryRow nextBinaryKey() {
    TestKey freshKey = nextKey();

    return marshalQuietly(freshKey, kvMarshaller);
}
/** Creates a key from two consecutive draws of {@code monotonicInt()}: the first for the int part, the second for the string part. */
private static TestKey nextKey() {
    int intPart = monotonicInt();
    String strPart = "key " + monotonicInt();

    return new TestKey(intPart, strPart);
}
/** Returns the current value of the {@code nextMonotonicInt} counter and increments it. */
private static int monotonicInt() {
    int current = nextMonotonicInt.getAndIncrement();

    return current;
}
/**
 * Builds a row whose key is {@code (i, "k" + i)} and whose value is {@code (i, "v" + i)}.
 *
 * @param i Number all key and value parts are derived from.
 * @return Marshalled row.
 */
protected BinaryRow binaryRow(int i) {
    TestKey rowKey = new TestKey(i, "k" + i);
    TestValue rowValue = new TestValue(i, "v" + i);

    return binaryRow(rowKey, rowValue);
}
/** Marshals the key/value pair with the {@code kvMarshaller} field. */
private BinaryRow binaryRow(TestKey key, TestValue value) {
    KvMarshaller<TestKey, TestValue> defaultMarshaller = kvMarshaller;

    return binaryRow(key, value, defaultMarshaller);
}
/**
 * Marshals the key/value pair with the given marshaller.
 *
 * @param key Key to marshal.
 * @param value Value to marshal.
 * @param marshaller Marshaller to use.
 * @return Marshalled row.
 * @throws AssertionError If marshalling fails with a {@link MarshallerException}.
 */
private static BinaryRow binaryRow(TestKey key, TestValue value, KvMarshaller<TestKey, TestValue> marshaller) {
    BinaryRow marshalled;

    try {
        marshalled = marshaller.marshal(key, value);
    } catch (MarshallerException marshallingFailure) {
        throw new AssertionError(marshallingFailure);
    }

    return marshalled;
}
/**
 * Unmarshals the key from a key-only binary row using {@code schemaDescriptor} and {@code kvMarshaller}.
 *
 * @param binaryRow Key-only row.
 * @return Unmarshalled key.
 * @throws AssertionError If unmarshalling fails with a {@link MarshallerException}.
 */
private TestKey key(BinaryRow binaryRow) {
    TestKey unmarshalled;

    try {
        unmarshalled = kvMarshaller.unmarshalKeyOnly(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, binaryRow));
    } catch (MarshallerException unmarshallingFailure) {
        throw new AssertionError(unmarshallingFailure);
    }

    return unmarshalled;
}
/** Builds a {@link BinaryRowMessage} carrying the tuple slice and schema version of the given row. */
private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) {
    var messageBuilder = TABLE_MESSAGES_FACTORY.binaryRowMessage();

    messageBuilder = messageBuilder.binaryTuple(binaryRow.tupleSlice());
    messageBuilder = messageBuilder.schemaVersion(binaryRow.schemaVersion());

    return messageBuilder.build();
}
/**
 * Test pojo key consisting of an int part and a string part.
 */
private static class TestKey {
    @IgniteToStringInclude
    public int intKey;

    @IgniteToStringInclude
    public String strKey;

    /** Creates a key with default field values. */
    public TestKey() {
    }

    /** Creates a key from the given parts. */
    public TestKey(int intKey, String strKey) {
        this.intKey = intKey;
        this.strKey = strKey;
    }

    /** Keys are equal when they are of exactly the same class and both parts match. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }

        if (o == null) {
            return false;
        }

        if (o.getClass() != getClass()) {
            return false;
        }

        TestKey that = (TestKey) o;

        if (intKey != that.intKey) {
            return false;
        }

        return Objects.equals(strKey, that.strKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(intKey, strKey);
    }

    @Override
    public String toString() {
        return S.toString(TestKey.class, this);
    }
}
/**
 * Test pojo value consisting of a nullable int part and a nullable string part.
 *
 * <p>Ordering is by {@code intVal} first and {@code strVal} second; a {@code null} part sorts before any non-null part, which
 * keeps {@link #compareTo(TestValue)} usable for the same instances that {@link #equals(Object)} accepts.
 */
private static class TestValue implements Comparable<TestValue> {
    @IgniteToStringInclude
    public Integer intVal;

    @IgniteToStringInclude
    public String strVal;

    /** Creates a value with both parts set to {@code null}. */
    public TestValue() {
    }

    /** Creates a value from the given parts. */
    public TestValue(Integer intVal, String strVal) {
        this.intVal = intVal;
        this.strVal = strVal;
    }

    /**
     * Compares by {@code intVal}, then by {@code strVal}, treating {@code null} parts as smaller than non-null ones.
     *
     * @param o Value to compare with.
     * @return Negative, zero or positive number as this value is less than, equal to or greater than {@code o}.
     */
    @Override
    public int compareTo(TestValue o) {
        int cmp = compareNullsFirst(intVal, o.intVal);

        return cmp != 0 ? cmp : compareNullsFirst(strVal, o.strVal);
    }

    /** Null-safe comparison: {@code null} is equal to {@code null} and less than anything else. */
    private static <T extends Comparable<T>> int compareNullsFirst(T left, T right) {
        if (left == null) {
            return right == null ? 0 : -1;
        }

        if (right == null) {
            return 1;
        }

        return left.compareTo(right);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }

        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        TestValue testValue = (TestValue) o;

        return Objects.equals(intVal, testValue.intVal) && Objects.equals(strVal, testValue.strVal);
    }

    @Override
    public int hashCode() {
        return Objects.hash(intVal, strVal);
    }

    @Override
    public String toString() {
        return S.toString(TestValue.class, this);
    }
}
/**
 * Abstraction over an operation that is invoked with a transaction ID and a key and yields a future.
 * NOTE(review): the "Rw" prefix presumably stands for read-write operations; confirm against the callers.
 */
@FunctionalInterface
private interface RwListenerInvocation {
/**
 * Invokes the operation.
 *
 * @param targetTxId ID of the transaction handed to the operation.
 * @param key Key handed to the operation.
 * @return Future of the invoked operation.
 */
CompletableFuture<?> invoke(UUID targetTxId, TestKey key);
}
/**
 * Abstraction over an operation that is invoked with a transaction ID, a read timestamp and a key and yields a future.
 * NOTE(review): the "Ro" prefix presumably stands for read-only operations; confirm against the callers.
 */
@FunctionalInterface
private interface RoListenerInvocation {
/**
 * Invokes the operation.
 *
 * @param targetTxId ID of the transaction handed to the operation.
 * @param readTimestamp Read timestamp handed to the operation.
 * @param key Key handed to the operation.
 * @return Future of the invoked operation.
 */
CompletableFuture<?> invoke(UUID targetTxId, HybridTimestamp readTimestamp, TestKey key);
}
/**
 * Checks operations issued after the hash index has started building (created in catalog version 1, building since
 * version 2) when the active catalog version at the transaction's begin timestamp is stubbed to 0: an upsert must fail
 * with {@link StaleTransactionOperationException}, while a read-only get must complete successfully.
 *
 * @param readOnly Whether to check the read-only get ({@code true}) or the upsert ({@code false}).
 */
@ParameterizedTest(name = "readOnly = {0}")
@ValueSource(booleans = {true, false})
void testStaleTxOperationAfterIndexStartBuilding(boolean readOnly) {
    fireHashIndexStartBuildingEventForStaleTxOperation(hashIndexStorage.id(), 1, 2);

    UUID staleTxId = newTxId();
    long staleTxBeginTs = beginTimestamp(staleTxId).longValue();

    when(catalogService.activeCatalogVersion(eq(staleTxBeginTs))).thenReturn(0);

    BinaryRow targetRow = binaryRow(0);

    if (!readOnly) {
        assertThat(upsertAsync(staleTxId, targetRow), willThrow(StaleTransactionOperationException.class));

        return;
    }

    assertThat(roGetAsync(targetRow, clock.nowLong()), willCompleteSuccessfully());
}
/**
 * Checks that a build index request issued before the index starts building stays pending, completes successfully
 * once the start-building event has been fired, and that a repeated request issued afterwards also completes
 * successfully.
 */
@Test
void testBuildIndexReplicaRequestWithoutRwTxOperations() {
    int hashIndexId = hashIndexStorage.id();
    int creationVersion = 1;
    int startBuildingVersion = creationVersion + 1;

    CompletableFuture<?> earlyRequestFuture = invokeBuildIndexReplicaRequestAsync(hashIndexId, creationVersion);

    // The index has not switched to building yet, so the request must not be done.
    assertFalse(earlyRequestFuture.isDone());

    fireHashIndexStartBuildingEventForStaleTxOperation(hashIndexId, creationVersion, startBuildingVersion);

    assertThat(earlyRequestFuture, willCompleteSuccessfully());

    CompletableFuture<?> repeatedRequestFuture = invokeBuildIndexReplicaRequestAsync(hashIndexId, creationVersion);

    assertThat(repeatedRequestFuture, willCompleteSuccessfully());
}
/**
 * Checks how a {@link BuildIndexReplicaRequest} interacts with an upsert that was started before the index switched
 * to building.
 *
 * <p>The raft client is stubbed so that every command except {@link BuildIndexCommand} is held back until
 * {@code continueNotBuildIndexCmdFuture} is completed. The test asserts that neither the upsert nor the build index
 * request is done while that future is incomplete, and that the build index request completes successfully after the
 * future is completed, whether normally or exceptionally. Finally it verifies the safe time wait and the content of the
 * captured {@link BuildIndexCommand}.
 *
 * @param failCmd If {@code true}, the held-back commands fail with a {@link RuntimeException} and the upsert is
 *      expected to throw; otherwise they proceed and the upsert is expected to complete successfully.
 */
@ParameterizedTest(name = "failCmd = {0}")
@ValueSource(booleans = {false, true})
void testBuildIndexReplicaRequest(boolean failCmd) {
// Gate for all raft commands other than BuildIndexCommand.
var continueNotBuildIndexCmdFuture = new CompletableFuture<Void>();
// Completed with the BuildIndexCommand once it reaches the raft client.
var buildIndexCommandFuture = new CompletableFuture<BuildIndexCommand>();
// Capture the BuildIndexCommand and let it through immediately; hold back any other command on the gate future.
when(mockRaftClient.run(any())).thenAnswer(invocation -> {
Command cmd = invocation.getArgument(0);
if (cmd instanceof BuildIndexCommand) {
buildIndexCommandFuture.complete((BuildIndexCommand) cmd);
return raftClientFutureClosure.apply(cmd);
}
return continueNotBuildIndexCmdFuture.thenCompose(unused -> raftClientFutureClosure.apply(cmd));
});
UUID txId = newTxId();
long beginTs = beginTimestamp(txId).longValue();
// The active catalog version at the transaction's begin timestamp is stubbed to 0.
when(catalogService.activeCatalogVersion(eq(beginTs))).thenReturn(0);
BinaryRow row = binaryRow(0);
// NOTE(review): the meaning of the third argument is defined by upsertAsync, which is outside this block.
CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row, true);
int indexId = hashIndexStorage.id();
int indexCreationCatalogVersion = 1;
int indexStartBuildingCatalogVersion = 2;
// The request is issued before the start-building event is fired.
CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(
indexId,
indexCreationCatalogVersion
);
fireHashIndexStartBuildingEventForStaleTxOperation(indexId, indexCreationCatalogVersion, indexStartBuildingCatalogVersion);
// Both operations must still be pending while the gate future is incomplete.
assertFalse(upsertFuture.isDone());
assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
if (failCmd) {
continueNotBuildIndexCmdFuture.completeExceptionally(new RuntimeException("error from test"));
assertThat(upsertFuture, willThrow(RuntimeException.class));
} else {
continueNotBuildIndexCmdFuture.complete(null);
assertThat(upsertFuture, willCompleteSuccessfully());
}
// The build index request is expected to succeed regardless of the upsert outcome.
assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully());
// The safe time clock must have been asked to wait for the activation time of the start-building catalog version.
HybridTimestamp startBuildingIndexActivationTs = hybridTimestamp(catalogService.catalog(indexStartBuildingCatalogVersion).time());
verify(safeTimeClock).waitFor(eq(startBuildingIndexActivationTs));
assertThat(buildIndexCommandFuture, willCompleteSuccessfully());
// The captured command must carry the index ID, its creation version and the start-building version as required version.
BuildIndexCommand buildIndexCommand = buildIndexCommandFuture.join();
assertThat(buildIndexCommand.indexId(), equalTo(indexId));
assertThat(buildIndexCommand.creationCatalogVersion(), equalTo(indexCreationCatalogVersion));
assertThat(buildIndexCommand.requiredCatalogVersion(), equalTo(indexStartBuildingCatalogVersion));
}
/**
 * Stubs the catalog service so that the hash index is {@code REGISTERED} in the creation catalog version and
 * {@code BUILDING} in the start-building catalog version, then fires the {@code INDEX_BUILDING} event and asserts
 * that the event handling completes successfully.
 *
 * <p>The start-building version is also stubbed as the latest catalog version. The creation catalog is given an
 * activation time 100 physical time units before the clock's current time; the start-building catalog gets the
 * clock's current time.
 *
 * @param indexId ID of the index.
 * @param creationIndexCatalogVersion Catalog version in which the index is reported as registered.
 * @param startBuildingIndexCatalogVersion Catalog version in which the index is reported as building.
 */
private void fireHashIndexStartBuildingEventForStaleTxOperation(
        int indexId,
        int creationIndexCatalogVersion,
        int startBuildingIndexCatalogVersion
) {
    // Index as seen in the creation catalog version.
    CatalogHashIndexDescriptor registeredDescriptor = mock(CatalogHashIndexDescriptor.class);

    when(registeredDescriptor.id()).thenReturn(indexId);
    when(registeredDescriptor.tableId()).thenReturn(TABLE_ID);
    when(registeredDescriptor.status()).thenReturn(REGISTERED);

    // Index as seen in the start-building catalog version.
    CatalogHashIndexDescriptor buildingDescriptor = mock(CatalogHashIndexDescriptor.class);

    when(buildingDescriptor.id()).thenReturn(indexId);
    when(buildingDescriptor.tableId()).thenReturn(TABLE_ID);
    when(buildingDescriptor.status()).thenReturn(BUILDING);
    when(buildingDescriptor.txWaitCatalogVersion()).thenReturn(creationIndexCatalogVersion);

    when(catalogService.index(eq(indexId), eq(creationIndexCatalogVersion))).thenReturn(registeredDescriptor);
    when(catalogService.index(eq(indexId), eq(startBuildingIndexCatalogVersion))).thenReturn(buildingDescriptor);
    when(catalogService.latestCatalogVersion()).thenReturn(startBuildingIndexCatalogVersion);

    // The clock is read in the same order as before: first for the creation catalog, then for the building one.
    long registeredActivationTs = clock.now().addPhysicalTime(-100).longValue();
    long buildingActivationTs = clock.nowLong();

    Catalog registeredCatalog = mock(Catalog.class);

    when(registeredCatalog.version()).thenReturn(creationIndexCatalogVersion);
    when(registeredCatalog.time()).thenReturn(registeredActivationTs);

    Catalog buildingCatalog = mock(Catalog.class);

    when(buildingCatalog.version()).thenReturn(startBuildingIndexCatalogVersion);
    when(buildingCatalog.time()).thenReturn(buildingActivationTs);

    when(catalogService.catalog(eq(creationIndexCatalogVersion))).thenReturn(registeredCatalog);
    when(catalogService.catalog(eq(startBuildingIndexCatalogVersion))).thenReturn(buildingCatalog);

    CompletableFuture<?> eventHandledFuture = catalogServiceEventProducer.fireEvent(
            INDEX_BUILDING,
            new StartBuildingIndexEventParameters(0L, startBuildingIndexCatalogVersion, indexId)
    );

    assertThat(eventHandledFuture, willCompleteSuccessfully());
}
/**
 * Sends a {@link BuildIndexReplicaRequest} with an empty list of row IDs to the partition replica listener on behalf
 * of the local node.
 *
 * @param indexId ID of the index to build.
 * @param indexCreationCatalogVersion Catalog version in which the index was created.
 * @return Future returned by the replica listener for this request.
 */
private CompletableFuture<?> invokeBuildIndexReplicaRequestAsync(int indexId, int indexCreationCatalogVersion) {
    return partitionReplicaListener.invoke(
            TABLE_MESSAGES_FACTORY.buildIndexReplicaRequest()
                    .groupId(grpId)
                    .indexId(indexId)
                    .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                    .creationCatalogVersion(indexCreationCatalogVersion)
                    .rowIds(List.of())
                    .build(),
            localNode.id()
    );
}
}