blob: d6c9a40d681beae0d645598673045cc7b9227fda [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.index;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.index.message.IndexMessagesFactory;
import org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeResponse;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.schema.TestProfileConfigurationSchema;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/** For testing {@link IndexNodeFinishedRwTransactionsChecker}. */
@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED, value = "false")
public class ItIndexNodeFinishedRwTransactionsCheckerTest extends ClusterPerClassIntegrationTest {
private static final IndexMessagesFactory FACTORY = new IndexMessagesFactory();
private static final String TABLE_NAME = "TEST_TABLE";
private static String zoneNameForUpdateCatalogVersionOnly = "FAKE_TEST_ZONE";
@Override
protected int initialNodes() {
return 1;
}
@BeforeEach
void setUp() {
if (node() != null) {
createZoneOnlyIfNotExists(zoneName(TABLE_NAME), 1, 2, TestProfileConfigurationSchema.TEST_PROFILE_NAME);
createZoneOnlyIfNotExists(zoneNameForUpdateCatalogVersionOnly, 1, 1, TestProfileConfigurationSchema.TEST_PROFILE_NAME);
createTableOnly(TABLE_NAME, zoneName(TABLE_NAME));
}
}
@AfterEach
void tearDown() {
if (node() != null) {
sql("DROP TABLE IF EXISTS " + TABLE_NAME);
sql("DROP ZONE IF EXISTS " + zoneName(TABLE_NAME));
sql("DROP ZONE IF EXISTS " + zoneNameForUpdateCatalogVersionOnly);
}
}
@Test
void testNoTransactions() {
int latestCatalogVersion = latestCatalogVersion();
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion - 1));
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion));
assertFalse(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion + 1));
fakeUpdateCatalog();
int newLatestCatalogVersion = latestCatalogVersion();
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion));
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(newLatestCatalogVersion));
}
@ParameterizedTest(name = "commit = {0}")
@ValueSource(booleans = {true, false})
void testEmptyTransaction(boolean commit) {
int oldLatestCatalogVersion = latestCatalogVersion();
runInTx(commit, tx -> {
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertFalse(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion + 1));
fakeUpdateCatalog();
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertFalse(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
});
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
}
@ParameterizedTest(name = "commit = {0}")
@ValueSource(booleans = {true, false})
void testNotEmptyTransaction(boolean commit) {
insertPeople(TABLE_NAME, new Person(0, "0", 0.0));
int oldLatestCatalogVersion = latestCatalogVersion();
runInTx(commit, tx -> {
insertPeople(tx, TABLE_NAME, new Person(1, "1", 1.0));
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertFalse(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion + 1));
fakeUpdateCatalog();
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertFalse(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
});
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
}
@ParameterizedTest(name = "commit = {0}")
@ValueSource(booleans = {true, false})
void testTransactionInsertIntoMultiplePartitions(boolean commit) {
assertThat(tableImpl().internalTable().partitions(), greaterThan(1));
int oldLatestCatalogVersion = latestCatalogVersion();
runInTx(commit, tx -> {
long[] beforeInserts = partitionSizes();
int id = 0;
do {
insertPeople(tx, TABLE_NAME, new Person(id, String.valueOf(id), id + 0.0));
id++;
} while (differences(beforeInserts, partitionSizes()) < 2);
fakeUpdateCatalog();
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertFalse(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
});
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
}
@Test
void testOnePhaseCommitViaKeyValue() {
int oldLatestCatalogVersion = latestCatalogVersion();
TableImpl tableImpl = tableImpl();
var continueUpdateMvPartitionStorageFuture = new CompletableFuture<Void>();
CompletableFuture<Void> awaitStartUpdateAnyMvPartitionStorageFuture = awaitStartUpdateAnyMvPartitionStorage(
tableImpl.internalTable().storage(),
continueUpdateMvPartitionStorageFuture
);
CompletableFuture<Void> putAsync = tableImpl.keyValueView().putAsync(
null,
Tuple.create().set("ID", 0), Tuple.create().set("NAME", "0").set("SALARY", 0.0)
);
assertThat(awaitStartUpdateAnyMvPartitionStorageFuture, willCompleteSuccessfully());
fakeUpdateCatalog();
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertFalse(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
continueUpdateMvPartitionStorageFuture.complete(null);
assertThat(putAsync, willCompleteSuccessfully());
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(oldLatestCatalogVersion));
assertTrue(isNodeFinishedRwTransactionsStartedBeforeFromNetwork(latestCatalogVersion()));
}
private static void runInTx(boolean commit, Consumer<Transaction> consumer) {
Transaction tx = node().transactions().begin(new TransactionOptions().readOnly(false));
try {
consumer.accept(tx);
} finally {
assertThat(commit ? tx.commitAsync() : tx.rollbackAsync(), willCompleteSuccessfully());
}
}
private static TableImpl tableImpl() {
CompletableFuture<Table> tableFuture = node().tables().tableAsync(TABLE_NAME);
assertThat(tableFuture, willBe(notNullValue()));
return unwrapTableImpl(tableFuture.join());
}
private static long[] partitionSizes() {
return IgniteTestUtils.bypassingThreadAssertions(() -> {
InternalTable table = tableImpl().internalTable();
return IntStream.range(0, table.partitions())
.mapToLong(partitionId -> table.storage().getMvPartition(partitionId).rowsCount())
.toArray();
});
}
private static int differences(long[] partitionSizes0, long[] partitionsSizes1) {
assertEquals(partitionSizes0.length, partitionsSizes1.length);
return (int) IntStream.range(0, partitionSizes0.length)
.filter(i -> partitionSizes0[i] != partitionsSizes1[i])
.count();
}
private static void fakeUpdateCatalog() {
int oldLatestCatalogVersion = latestCatalogVersion();
String oldZoneName = zoneNameForUpdateCatalogVersionOnly;
String newZoneName = zoneNameForUpdateCatalogVersionOnly + 0;
sql(String.format("ALTER ZONE %s RENAME TO %s", oldZoneName, newZoneName));
zoneNameForUpdateCatalogVersionOnly = newZoneName;
assertThat(latestCatalogVersion(), greaterThan(oldLatestCatalogVersion));
}
private static int latestCatalogVersion() {
return node().catalogManager().latestCatalogVersion();
}
private static IgniteImpl node() {
return CLUSTER.node(0);
}
private static boolean isNodeFinishedRwTransactionsStartedBeforeFromNetwork(int catalogVersion) {
CompletableFuture<NetworkMessage> invokeFuture = node().clusterService().messagingService().invoke(
node().node(),
FACTORY.isNodeFinishedRwTransactionsStartedBeforeRequest().targetCatalogVersion(catalogVersion).build(),
1_000
);
assertThat(invokeFuture, willCompleteSuccessfully());
return ((IsNodeFinishedRwTransactionsStartedBeforeResponse) invokeFuture.join()).finished();
}
private static CompletableFuture<Void> awaitStartUpdateAnyMvPartitionStorage(
MvTableStorage mvTableStorage,
CompletableFuture<Void> continueUpdateFuture
) {
var awaitStartUpdateAnyMvPartitionStorageFuture = new CompletableFuture<Void>();
for (int partitionId = 0; partitionId < mvTableStorage.getTableDescriptor().getPartitions(); partitionId++) {
MvPartitionStorage mvPartitionStorage = mvTableStorage.getMvPartition(partitionId);
doAnswer(invocation -> {
awaitStartUpdateAnyMvPartitionStorageFuture.complete(null);
assertThat(continueUpdateFuture, willCompleteSuccessfully());
return invocation.callRealMethod();
}).when(mvPartitionStorage).runConsistently(any());
}
return awaitStartUpdateAnyMvPartitionStorageFuture;
}
}