blob: ada4b72b3bd6fe4fe64636d3f04a9e30bd44b92d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.TABLE_CREATE;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.TABLE_DROP;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.sql.ColumnType.INT64;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryDataStorageModule;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.Table;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/** Tests scenarios for table manager. */
@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class TableManagerTest extends IgniteAbstractTest {
/** The name of the table which is preconfigured. */
private static final String PRECONFIGURED_TABLE_NAME = "T1";
/** The name of the table which will be configured dynamically. */
private static final String DYNAMIC_TABLE_NAME = "T2";
/** The name of the table that is created in order to be dropped. */
private static final String DYNAMIC_TABLE_FOR_DROP_NAME = "T3";
/** Number of partitions of the test zone. */
private static final int PARTITIONS = 32;
/** Node name. */
private static final String NODE_NAME = "node1";
/** Count of replicas. */
private static final int REPLICAS = 1;
/** Zone name. */
private static final String ZONE_NAME = "zone1";
/** Topology service mock. */
@Mock
private TopologyService ts;
/** Raft manager mock. */
@Mock
private Loza rm;
/** Replica manager mock. */
@Mock
private ReplicaManager replicaMgr;
/** TX manager mock. */
@Mock
private TxManager tm;
/** Meta storage manager mock that is passed to the table manager; the catalog uses {@link #catalogMetastore} instead. */
@Mock
private MetaStorageManager msm;
/** Mock cluster service. */
@Mock
private ClusterService clusterService;
/** Spy of the table storage most recently created by the table manager under test; assigned from the manager's factory method. */
private volatile MvTableStorage mvTableStorage;
/** Spy of the TX state table storage most recently created by the table manager under test. */
private volatile TxStateTableStorage txStateTableStorage;
/** Revision updater; registers the given function as a revision update listener of {@link #catalogMetastore}. */
private Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater;
/** Garbage collector configuration. */
@InjectConfiguration
private GcConfiguration gcConfig;
/** Transaction configuration. */
@InjectConfiguration
private TransactionConfiguration txConfig;
/** Storage update configuration. */
@InjectConfiguration
private StorageUpdateConfiguration storageUpdateConfiguration;
/** Storage configuration whose default profile uses the "aipersist" engine. */
@InjectConfiguration("mock = {profiles.default = {engine = \"aipersist\"}}")
private StorageConfiguration storageConfiguration;
/** Configuration registry mock; stubbed to return {@link #storageConfiguration} when the data storage manager is created. */
@Mock
private ConfigurationRegistry configRegistry;
/** Data storage manager; created together with the table manager and stopped after each test. */
private DataStorageManager dsm;
/** Schema manager; created together with the table manager and stopped after each test. */
private SchemaManager sm;
/** Distribution zone manager mock; created before each test. */
private DistributionZoneManager distributionZoneManager;
/** Test node. */
private final ClusterNode node = new ClusterNodeImpl(
UUID.randomUUID().toString(),
NODE_NAME,
new NetworkAddress("127.0.0.1", 2245)
);
/** Future that is completed with the table manager once it has been created; the manager is stopped through it after each test. */
private CompletableFuture<TableManager> tblManagerFut;
/** Hybrid clock. */
private final HybridClock clock = new HybridClockImpl();
/** Catalog metastore. */
private MetaStorageManager catalogMetastore;
/** Catalog manager. */
private CatalogManager catalogManager;
/** Executor passed to the table manager for partition operations; shut down after each test. */
private ExecutorService partitionOperationsExecutor;
/** Test low watermark; updated by the tests to fire the table destruction event. */
private TestLowWatermark lowWatermark;
/**
 * Prepares the fixture shared by all tests: starts the catalog on top of a standalone in-memory metastore, wires the cluster service,
 * distribution zone and replica manager mocks, stubs the metastore mock and creates the partition operations executor.
 *
 * @throws NodeStoppingException Declared by component methods that are invoked or stubbed during the setup.
 */
@BeforeEach
void before() throws NodeStoppingException {
    lowWatermark = new TestLowWatermark();

    // The catalog works on a real (standalone, in-memory) metastore, not on the "msm" mock.
    catalogMetastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(NODE_NAME));
    catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, catalogMetastore);

    assertThat(allOf(catalogMetastore.start(), catalogManager.start()), willCompleteSuccessfully());

    revisionUpdater = function -> catalogMetastore.registerRevisionUpdateListener(function::apply);

    assertThat(catalogMetastore.deployWatches(), willCompleteSuccessfully());

    MessagingService messagingServiceMock = mock(MessagingService.class);
    when(clusterService.messagingService()).thenReturn(messagingServiceMock);

    TopologyService localTopologyMock = mock(TopologyService.class);
    when(clusterService.topologyService()).thenReturn(localTopologyMock);
    when(localTopologyMock.localMember()).thenReturn(node);

    distributionZoneManager = mock(DistributionZoneManager.class);
    when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt())).thenReturn(emptySetCompletedFuture());

    when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());

    tblManagerFut = new CompletableFuture<>();

    mockMetastore();

    partitionOperationsExecutor = new ThreadPoolExecutor(
            0,
            5,
            0,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            IgniteThreadFactory.create("test", "partition-operations", log, STORAGE_READ, STORAGE_WRITE)
    );
}
/**
 * Stops everything that was started for the test: the table manager (which must have been created by the test), the data storage and
 * schema managers, the catalog with its metastore, and the partition operations executor.
 *
 * @throws Exception If any of the components fails to stop.
 */
@AfterEach
void after() throws Exception {
    AutoCloseable stopTableManager = () -> {
        assertTrue(tblManagerFut.isDone());

        TableManager startedManager = tblManagerFut.join();

        startedManager.beforeNodeStop();
        startedManager.stop();
    };

    // Components that were never created by the test are passed as null.
    IgniteUtils.closeAll(
            stopTableManager,
            dsm == null ? null : dsm::stop,
            sm == null ? null : sm::stop,
            catalogManager == null ? null : catalogManager::stop,
            catalogMetastore == null ? null : catalogMetastore::stop,
            partitionOperationsExecutor == null
                    ? null
                    : () -> IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10, TimeUnit.SECONDS)
    );
}
/**
 * Checks that a table created right after the table manager start is visible through the manager.
 */
@Test
public void testPreconfiguredTable() throws Exception {
    when(rm.startRaftGroupService(any(), any(), any(), any()))
            .thenAnswer(invocation -> completedFuture(mock(TopologyAwareRaftGroupService.class)));

    TableManager manager = createTableManager(tblManagerFut);

    tblManagerFut.complete(manager);

    createZone(PARTITIONS, REPLICAS);
    createTable(PRECONFIGURED_TABLE_NAME);

    assertEquals(1, manager.tables().size());
    assertNotNull(manager.table(PRECONFIGURED_TABLE_NAME));
}
/**
 * Checks that a table created through the catalog is returned by the table manager as the very same instance.
 *
 * @throws Exception If failed.
 */
@Test
public void testCreateTable() throws Exception {
    Table createdTable = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, tblManagerFut);

    assertNotNull(createdTable);

    TableManager manager = tblManagerFut.join();

    assertSame(createdTable, manager.table(DYNAMIC_TABLE_NAME));
}
/**
 * Tests {@code TableManager#writeTableAssignmentsToMetastore} for two exceptional scenarios.
 * <ol>
 *     <li>The outer assignments future has already failed before the metastore invocation is made.</li>
 *     <li>The assignments future completes normally, but the metastore invocation future fails.</li>
 * </ol>
 * In both cases the returned future must be completed exceptionally with the original cause.
 *
 * @throws Exception If something goes wrong on mocks creation.
 */
@Test
public void testWriteTableAssignmentsToMetastoreExceptionally() throws Exception {
    TableViewInternal table = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, tblManagerFut);
    TableManager manager = tblManagerFut.join();
    int tableId = table.tableId();

    // Scenario 1: the assignments future itself is failed.
    String outerExceptionMsg = "Outer future is interrupted";
    CompletableFuture<List<Assignments>> failedAssignmentsFuture =
            CompletableFuture.failedFuture(new TimeoutException(outerExceptionMsg));

    CompletableFuture<List<Assignments>> outerFailureResult =
            manager.writeTableAssignmentsToMetastore(tableId, failedAssignmentsFuture);

    assertTrue(outerFailureResult.isCompletedExceptionally());
    assertThrowsWithCause(outerFailureResult::get, TimeoutException.class, outerExceptionMsg);

    // Scenario 2: the assignments are available, but the metastore invocation fails.
    String innerExceptionMsg = "Inner future is interrupted";
    CompletableFuture<Boolean> failedInvokeFuture = CompletableFuture.failedFuture(new TimeoutException(innerExceptionMsg));

    when(msm.invoke(any(), any(List.class), any(List.class))).thenReturn(failedInvokeFuture);

    List<Assignments> assignments = List.of(Assignments.of(Assignment.forPeer(node.id())));
    CompletableFuture<List<Assignments>> readyAssignmentsFuture = completedFuture(assignments);

    CompletableFuture<List<Assignments>> innerFailureResult =
            manager.writeTableAssignmentsToMetastore(tableId, readyAssignmentsFuture);

    assertTrue(innerFailureResult.isCompletedExceptionally());
    assertThrowsWithCause(innerFailureResult::get, TimeoutException.class, innerExceptionMsg);
}
/**
 * Checks that dropping a table removes it from the table manager immediately, while its storages are destroyed and its replicas are
 * stopped only after the destroy event has been fired.
 *
 * @throws Exception If failed.
 */
@Test
public void testDropTable() throws Exception {
    mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut);

    TableManager manager = tblManagerFut.join();

    dropTable(DYNAMIC_TABLE_FOR_DROP_NAME);

    assertNull(manager.table(DYNAMIC_TABLE_FOR_DROP_NAME));
    assertEquals(0, manager.tables().size());

    // Until the destroy event is fired, nothing must have been destroyed or stopped.
    verify(mvTableStorage, atMost(0)).destroy();
    verify(txStateTableStorage, atMost(0)).destroy();
    verify(replicaMgr, atMost(0)).stopReplica(any());

    assertThat(fireDestroyEvent(), willCompleteSuccessfully());

    long waitMillis = TimeUnit.SECONDS.toMillis(10);

    verify(mvTableStorage, timeout(waitMillis)).destroy();
    verify(txStateTableStorage, timeout(waitMillis)).destroy();
    verify(replicaMgr, timeout(waitMillis).times(PARTITIONS)).stopReplica(any());
}
/**
 * Checks that a table created right after another table with the same name was dropped gets a new ID, and that both the old and the
 * new tables stay available in the table manager cache as distinct instances.
 *
 * @throws Exception If failed.
 */
@Test
public void testReCreateTableWithSameName() throws Exception {
    mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, tblManagerFut);

    TableManager manager = tblManagerFut.join();

    TableViewInternal originalTable = (TableViewInternal) manager.table(DYNAMIC_TABLE_NAME);

    assertNotNull(originalTable);

    int originalTableId = originalTable.tableId();

    dropTable(DYNAMIC_TABLE_NAME);
    createTable(DYNAMIC_TABLE_NAME);

    TableViewInternal recreatedTable = manager.tableView(DYNAMIC_TABLE_NAME);

    assertNotNull(recreatedTable);

    int recreatedTableId = recreatedTable.tableId();

    assertNotEquals(originalTableId, recreatedTableId);

    assertNotNull(manager.cachedTable(originalTableId));
    assertNotNull(manager.cachedTable(recreatedTableId));
    assertNotSame(manager.cachedTable(originalTableId), manager.cachedTable(recreatedTableId));
}
/**
 * Checks that the name-based table API of the table manager ({@code tables}, {@code table}, {@code tablesAsync} and
 * {@code tableAsync}) fails with {@link NodeStoppingException} after the manager has been stopped.
 */
@Test
public void testApiTableManagerOnStop() throws Exception {
    TableManager manager = createTableManager(tblManagerFut);

    manager.beforeNodeStop();
    manager.stop();

    // Synchronous API.
    assertThrowsWithCause(manager::tables, NodeStoppingException.class);
    assertThrowsWithCause(() -> manager.table(DYNAMIC_TABLE_FOR_DROP_NAME), NodeStoppingException.class);

    // Asynchronous API.
    assertThat(manager.tablesAsync(), willThrow(NodeStoppingException.class));
    assertThat(manager.tableAsync(DYNAMIC_TABLE_FOR_DROP_NAME), willThrow(NodeStoppingException.class));
}
/**
 * Checks that the ID-based table API of the table manager ({@code table(int)} and {@code tableAsync(int)}) fails with
 * {@link NodeStoppingException} after the manager has been stopped.
 */
@Test
public void testInternalApiTableManagerOnStop() throws Exception {
    TableManager manager = createTableManager(tblManagerFut);

    manager.beforeNodeStop();
    manager.stop();

    // No table with this ID is created by the test; the call must fail regardless.
    int fakeTblId = 1;

    assertThrowsWithCause(() -> manager.table(fakeTblId), NodeStoppingException.class);
    assertThat(manager.tableAsync(fakeTblId), willThrow(NodeStoppingException.class));
}
/**
 * Checks that all components are still stopped on table manager stop when the raft manager throws {@link NodeStoppingException}
 * while stopping raft nodes.
 *
 * @throws Exception If failed.
 */
@Test
public void tableManagerStopTest1() throws Exception {
    IgniteBiTuple<TableViewInternal, TableManager> tableAndManager = startTableManagerStopTest();

    Runnable failRaftNodesStop = () -> {
        try {
            when(rm.stopRaftNodes(any())).thenThrow(NodeStoppingException.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    };

    endTableManagerStopTest(tableAndManager.get1(), tableAndManager.get2(), failRaftNodesStop);
}
/**
 * Checks that all components are still stopped on table manager stop when the replica manager throws {@link NodeStoppingException}
 * while stopping a replica.
 *
 * @throws Exception If failed.
 */
@Test
public void tableManagerStopTest2() throws Exception {
    IgniteBiTuple<TableViewInternal, TableManager> tableAndManager = startTableManagerStopTest();

    Runnable failReplicaStop = () -> {
        try {
            when(replicaMgr.stopReplica(any())).thenThrow(NodeStoppingException.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    };

    endTableManagerStopTest(tableAndManager.get1(), tableAndManager.get2(), failReplicaStop);
}
/**
 * Checks that all components are still stopped on table manager stop when closing the table storage throws an exception.
 *
 * @throws Exception If failed.
 */
@Test
public void tableManagerStopTest3() throws Exception {
    IgniteBiTuple<TableViewInternal, TableManager> tableAndManager = startTableManagerStopTest();

    TableViewInternal table = tableAndManager.get1();

    Runnable failTableStorageClose = () -> {
        try {
            doThrow(new RuntimeException("Test exception")).when(table.internalTable().storage()).close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    };

    endTableManagerStopTest(table, tableAndManager.get2(), failTableStorageClose);
}
/**
 * Checks that all components are still stopped on table manager stop when closing the TX state storage throws an exception.
 *
 * @throws Exception If failed.
 */
@Test
public void tableManagerStopTest4() throws Exception {
    IgniteBiTuple<TableViewInternal, TableManager> tableAndManager = startTableManagerStopTest();

    TableViewInternal table = tableAndManager.get1();

    Runnable failTxStateStorageClose = () -> {
        doThrow(new RuntimeException()).when(table.internalTable().txStateStorage()).close();
    };

    endTableManagerStopTest(table, tableAndManager.get2(), failTxStateStorageClose);
}
/**
 * Common first half of the table manager stop tests: creates a table and checks that a raft group service was started for every
 * partition.
 *
 * @return The created table paired with the table manager.
 * @throws Exception If something went wrong.
 */
private IgniteBiTuple<TableViewInternal, TableManager> startTableManagerStopTest() throws Exception {
    TableViewInternal createdTable = mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut);

    verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), any(), any());

    return new IgniteBiTuple<>(createdTable, tblManagerFut.join());
}
/**
 * Common second half of the table manager stop tests: installs the failing stub, stops the table manager and checks that raft nodes
 * and replicas were stopped for every partition and that both table storages were closed.
 *
 * @param table Table created for the test.
 * @param tableManager Table manager to stop.
 * @param mockDoThrow Action that makes one of the components throw on stop.
 * @throws Exception If something went wrong.
 */
private void endTableManagerStopTest(TableViewInternal table, TableManager tableManager, Runnable mockDoThrow) throws Exception {
    mockDoThrow.run();

    tableManager.beforeNodeStop();
    tableManager.stop();

    // Per-partition components.
    verify(rm, times(PARTITIONS)).stopRaftNodes(any());
    verify(replicaMgr, times(PARTITIONS)).stopReplica(any());

    // Per-table storages.
    verify(table.internalTable().storage()).close();
    verify(table.internalTable().txStateStorage()).close();
}
/**
 * Checks that {@code table(name)} and {@code tables()} requested while a table is still being created observe the created table:
 * the getters are released only after the table creation listener has reached the phaser, and they must return the same table
 * instance as the creation itself.
 */
@Test
public void testGetTableDuringCreation() {
    Phaser phaser = new Phaser(2);
    CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() -> {
        try {
            return mockManagersAndCreateTableWithDelay(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut, phaser);
        } catch (Exception e) {
            // Keep the original exception as the cause, otherwise its type and stack trace are lost in the test report.
            fail(e.getMessage(), e);
        }
        return null;
    });
    CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
        // Wait until the creation listener and the test thread have both arrived at the phaser.
        phaser.awaitAdvance(0);
        return tblManagerFut.join().table(DYNAMIC_TABLE_FOR_DROP_NAME);
    });
    CompletableFuture<Collection<Table>> getAllTablesFut = CompletableFuture.supplyAsync(() -> {
        phaser.awaitAdvance(0);
        return tblManagerFut.join().tables();
    });
    // None of the operations can finish before the test thread arrives at the phaser.
    assertFalse(createFut.isDone());
    assertFalse(getFut.isDone());
    assertFalse(getAllTablesFut.isDone());
    phaser.arrive();
    assertSame(createFut.join(), getFut.join());
    assertEquals(1, getAllTablesFut.join().size());
}
/** Checks that the storages get cleared on startup when the TX state storage was stopped in the middle of a rebalance. */
@Test
void testStoragesGetClearedInMiddleOfFailedTxStorageRebalance() throws Exception {
    boolean txStorageUnderRebalance = true;

    testStoragesGetClearedInMiddleOfFailedRebalance(txStorageUnderRebalance);
}
/** Checks that the storages get cleared on startup when the partition storage was stopped in the middle of a rebalance. */
@Test
void testStoragesGetClearedInMiddleOfFailedPartitionStorageRebalance() throws Exception {
    boolean txStorageUnderRebalance = false;

    testStoragesGetClearedInMiddleOfFailedRebalance(txStorageUnderRebalance);
}
/**
 * Emulates a situation when either the TX state storage or the partition storage was stopped in the middle of a rebalance, and
 * verifies that both the TX state storage and the partition get cleared when the table is started.
 *
 * @param isTxStorageUnderRebalance When {@code true} - TX state storage is emulated as being under rebalance, when {@code false} -
 *         partition storage is emulated instead.
 */
private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean isTxStorageUnderRebalance) throws NodeStoppingException {
    when(rm.startRaftGroupService(any(), any(), any(), any()))
            .thenAnswer(invocation -> completedFuture(mock(TopologyAwareRaftGroupService.class)));

    createZone(1, 1);

    TxStateStorage txStateStorage = mock(TxStateStorage.class);
    MvPartitionStorage mvPartitionStorage = mock(MvPartitionStorage.class);

    if (!isTxStorageUnderRebalance) {
        // Emulate a situation when partition storage was stopped in a middle of rebalance.
        when(mvPartitionStorage.lastAppliedIndex()).thenReturn(MvPartitionStorage.REBALANCE_IN_PROGRESS);
    } else {
        // Emulate a situation when TX state storage was stopped in a middle of rebalance.
        when(txStateStorage.lastAppliedIndex()).thenReturn(TxStateStorage.REBALANCE_IN_PROGRESS);
    }

    PartitionTimestampCursor cursorMock = mock(PartitionTimestampCursor.class);

    doReturn(cursorMock).when(mvPartitionStorage).scan(any());

    when(txStateStorage.clear()).thenReturn(nullCompletedFuture());

    when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(2L));

    // For some reason, "when(something).thenReturn" does not work on spies, but the "doReturn" notation works.
    Consumer<MvTableStorage> partitionStorageStubber = tableStorageSpy -> {
        doReturn(completedFuture(mvPartitionStorage)).when(tableStorageSpy).createMvPartition(anyInt());
        doReturn(mvPartitionStorage).when(tableStorageSpy).getMvPartition(anyInt());
        doReturn(nullCompletedFuture()).when(tableStorageSpy).clearPartition(anyInt());
    };

    Consumer<TxStateTableStorage> txStateStorageStubber = txStateTableStorageSpy -> {
        doReturn(txStateStorage).when(txStateTableStorageSpy).getOrCreateTxStateStorage(anyInt());
        doReturn(txStateStorage).when(txStateTableStorageSpy).getTxStateStorage(anyInt());
    };

    createTableManager(tblManagerFut, partitionStorageStubber, txStateStorageStubber);

    createTable(PRECONFIGURED_TABLE_NAME);

    verify(txStateStorage, timeout(1000)).clear();
    verify(mvTableStorage, timeout(1000)).clearPartition(anyInt());
}
/**
 * Creates the table manager and a table in it, without delaying the table creation.
 *
 * @param tableName Table name.
 * @param tblManagerFut Future for table manager.
 * @return Created table.
 * @throws Exception If something went wrong.
 */
private TableViewInternal mockManagersAndCreateTable(String tableName, CompletableFuture<TableManager> tblManagerFut) throws Exception {
    // No phaser means that the creation is not paused.
    Phaser noDelay = null;

    return mockManagersAndCreateTableWithDelay(tableName, tblManagerFut, noDelay);
}
/**
 * Stubs the metastore mock with dummy behaviour: empty prefix scans, always successful invocations, absent values and an already
 * finished recovery at revision 1.
 */
private void mockMetastore() {
    // Recovery and reads.
    when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
    when(msm.get(any())).thenReturn(nullCompletedFuture());
    when(msm.prefixLocally(any(), anyLong())).thenReturn(CursorUtils.emptyCursor());

    // A publisher that completes right after the subscription without emitting any entry.
    when(msm.prefix(any())).thenReturn(subscriber -> {
        subscriber.onSubscribe(mock(Subscription.class));

        subscriber.onComplete();
    });

    // Conditional updates always succeed.
    when(msm.invoke(any(), any(Operation.class), any(Operation.class))).thenReturn(trueCompletedFuture());
    when(msm.invoke(any(), any(List.class), any(List.class))).thenReturn(trueCompletedFuture());
}
/**
 * Creates the table manager and a table in it. When a phaser is given, the table create/drop catalog listeners registered here wait
 * on the phaser, so the method completes only after the other phaser party has arrived.
 *
 * @param tableName Table name.
 * @param tblManagerFut Future for table manager.
 * @param phaser Phaser for the wait, or {@code null} if the creation must not be delayed.
 * @return Created table.
 * @throws Exception If something went wrong.
 */
private TableViewInternal mockManagersAndCreateTableWithDelay(
        String tableName,
        CompletableFuture<TableManager> tblManagerFut,
        @Nullable Phaser phaser
) throws Exception {
    String leaderConsistentId = "node0";

    when(rm.startRaftGroupService(any(), any(), any(), any())).thenAnswer(invocation -> {
        RaftGroupService raftServiceMock = mock(TopologyAwareRaftGroupService.class);

        when(raftServiceMock.leader()).thenReturn(new Peer(leaderConsistentId));

        return completedFuture(raftServiceMock);
    });

    ClusterNode leaderNode = new ClusterNodeImpl(
            UUID.randomUUID().toString(),
            leaderConsistentId,
            new NetworkAddress("localhost", 47500)
    );

    when(ts.getByConsistentId(any())).thenReturn(leaderNode);

    // NOTE(review): both static mocks below are closed by their try-with-resources blocks before the table manager is created,
    // so presumably they do not affect the code under test - confirm whether they are still needed.
    try (MockedStatic<SchemaUtils> schemaUtilsMock = mockStatic(SchemaUtils.class)) {
        schemaUtilsMock.when(() -> SchemaUtils.prepareSchemaDescriptor(any()))
                .thenReturn(mock(SchemaDescriptor.class));
    }

    try (MockedStatic<AffinityUtils> affinityUtilsMock = mockStatic(AffinityUtils.class)) {
        List<List<ClusterNode>> assignments = new ArrayList<>(PARTITIONS);

        for (int partId = 0; partId < PARTITIONS; partId++) {
            assignments.add(new ArrayList<>(Collections.singleton(node)));
        }

        affinityUtilsMock.when(() -> AffinityUtils.calculateAssignments(any(), anyInt(), anyInt()))
                .thenReturn(assignments);
    }

    TableManager manager = createTableManager(tblManagerFut);

    int tablesBeforeCreation = manager.tables().size();

    if (phaser != null) {
        // Pause the catalog event handling until the other phaser party arrives.
        catalogManager.listen(TABLE_CREATE, parameters -> {
            phaser.arriveAndAwaitAdvance();

            return falseCompletedFuture();
        });

        catalogManager.listen(TABLE_DROP, parameters -> {
            phaser.arriveAndAwaitAdvance();

            return falseCompletedFuture();
        });
    }

    createZone(PARTITIONS, REPLICAS);
    createTable(tableName);

    TableViewInternal createdTable = manager.tableView(tableName);

    assertNotNull(createdTable);
    assertEquals(tablesBeforeCreation + 1, manager.tables().size());

    return createdTable;
}
/**
 * Creates Table manager whose storage spies are not customized.
 *
 * @param tblManagerFut Future to complete with the Table manager.
 * @return Table manager.
 */
private TableManager createTableManager(CompletableFuture<TableManager> tblManagerFut) {
    Consumer<MvTableStorage> keepTableStorageAsIs = unused -> {};
    Consumer<TxStateTableStorage> keepTxStateTableStorageAsIs = unused -> {};

    return createTableManager(tblManagerFut, keepTableStorageAsIs, keepTxStateTableStorageAsIs);
}
/**
* Creates and starts Table manager together with the data storage and schema managers it depends on. The table storages created by
* the manager are wrapped into Mockito spies, stored in the {@code mvTableStorage} and {@code txStateTableStorage} fields and passed
* to the given decorators, so a test can stub and verify them.
*
* @param tblManagerFut Future that gets completed with the started Table manager.
* @param tableStorageDecorator Table storage spy decorator.
* @param txStateTableStorageDecorator Tx state table storage spy decorator.
*
* @return Started Table manager.
*/
private TableManager createTableManager(CompletableFuture<TableManager> tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator,
Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
TableManager tableManager = new TableManager(
NODE_NAME,
revisionUpdater,
gcConfig,
txConfig,
storageUpdateConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
clusterService.serializationRegistry(),
rm,
replicaMgr,
// NOTE(review): the next two collaborators are passed as null; their roles are not visible from this file.
null,
null,
tm,
// The data storage manager is also kept in a field so that it can be stopped after the test.
dsm = createDataStorageManager(configRegistry, workDir),
workDir,
msm,
// The schema manager is also kept in a field: it is started below and stopped after the test.
sm = new SchemaManager(revisionUpdater, catalogManager),
budgetView -> new LocalLogStorageFactory(),
// The same executor is used for both consecutive executor parameters.
partitionOperationsExecutor,
partitionOperationsExecutor,
clock,
new TestClockService(clock),
new OutgoingSnapshotsManager(clusterService.messagingService()),
mock(TopologyAwareRaftGroupServiceFactory.class),
distributionZoneManager,
new AlwaysSyncedSchemaSyncService(),
catalogManager,
new HybridTimestampTracker(),
new TestPlacementDriver(node),
() -> mock(IgniteSql.class),
new RemotelyTriggeredResourceRegistry(),
mock(ScheduledExecutorService.class),
lowWatermark,
mock(TransactionInflights.class)
) {
// Wraps the real table storage into a spy, remembers it for later verification and lets the test customize it.
@Override
protected MvTableStorage createTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
mvTableStorage = spy(super.createTableStorage(tableDescriptor, zoneDescriptor));
tableStorageDecorator.accept(mvTableStorage);
return mvTableStorage;
}
// Wraps the real TX state table storage into a spy, remembers it for later verification and lets the test customize it.
@Override
protected TxStateTableStorage createTxStateTableStorage(
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor
) {
txStateTableStorage = spy(super.createTxStateTableStorage(tableDescriptor, zoneDescriptor));
txStateTableStorageDecorator.accept(txStateTableStorage);
return txStateTableStorage;
}
};
// Both managers must start successfully before the table manager is published through the future.
assertThat(allOf(sm.start(), tableManager.start()), willCompleteSuccessfully());
tblManagerFut.complete(tableManager);
return tableManager;
}
/**
 * Creates and starts a data storage manager that contains only the persistent page memory storage engine.
 *
 * @param mockedRegistry Configuration registry mock; stubbed here to return the injected storage configuration.
 * @param storagePath Path that is passed to the storage engines.
 * @return Started data storage manager.
 */
private DataStorageManager createDataStorageManager(ConfigurationRegistry mockedRegistry, Path storagePath) {
    when(mockedRegistry.getConfiguration(StorageConfiguration.KEY)).thenReturn(storageConfiguration);

    var storageModules = new DataStorageModules(List.of(new PersistentPageMemoryDataStorageModule()));

    var storageEngines = storageModules.createStorageEngines(
            NODE_NAME,
            mockedRegistry,
            storagePath,
            null,
            mock(FailureProcessor.class),
            mock(LogSyncer.class)
    );

    var storageManager = new DataStorageManager(storageEngines, storageConfiguration);

    assertThat(storageManager.start(), willCompleteSuccessfully());

    return storageManager;
}
/**
* Creates the test distribution zone named {@code ZONE_NAME} in the catalog.
*
* @param partitions Number of partitions of the zone.
* @param replicas Number of replicas of the zone.
*/
private void createZone(int partitions, int replicas) {
DistributionZonesTestUtil.createZone(catalogManager, ZONE_NAME, partitions, replicas);
}
/**
 * Creates a table in the default schema and the test zone with two {@code INT64} columns: the primary key column "key" and the
 * nullable column "val".
 *
 * @param tableName Table name.
 */
private void createTable(String tableName) {
    ColumnParams keyColumn = ColumnParams.builder().name("key").type(INT64).build();
    ColumnParams valColumn = ColumnParams.builder().name("val").type(INT64).nullable(true).build();

    List<ColumnParams> columns = List.of(keyColumn, valColumn);
    List<String> primaryKeyColumns = List.of("key");

    TableTestUtils.createTable(catalogManager, DEFAULT_SCHEMA_NAME, ZONE_NAME, tableName, columns, primaryKeyColumns);
}
/**
* Drops the table with the given name from the default schema of the catalog.
*
* @param tableName Table name.
*/
private void dropTable(String tableName) {
TableTestUtils.dropTable(catalogManager, DEFAULT_SCHEMA_NAME, tableName);
}
/**
 * Returns descriptors of all tables of the latest catalog version. Currently not referenced by any test of this class.
 *
 * @return Table descriptors.
 */
private Collection<CatalogTableDescriptor> allTableDescriptors() {
    int latestVersion = catalogManager.latestCatalogVersion();

    return catalogManager.tables(latestVersion);
}
/**
 * Moves the test low watermark to the current clock time and notifies its listeners; the tests use it to trigger destruction of
 * dropped tables.
 *
 * @return Future of the low watermark update.
 */
private CompletableFuture<Void> fireDestroyEvent() {
    var newLowWatermark = clock.now();

    return lowWatermark.updateAndNotify(newLowWatermark);
}
}