blob: 4a048231db02f8f9bfb753ad5960d9a28bc31471 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.schemasync;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
/**
* Tests about interaction between Schema Synchronization and Replication.
*/
@SuppressWarnings("resource")
class ItSchemaSyncAndReplicationTest extends ClusterPerTestIntegrationTest {
private static final int NODES_TO_START = 3;
private static final String TABLE_NAME = "TEST";
private final LogInspector appendEntriesInterceptorInspector = LogInspector.create(CheckCatalogVersionOnAppendEntries.class, true);
@Override
protected int initialNodes() {
return NODES_TO_START;
}
@AfterEach
void stopLogInspector() {
appendEntriesInterceptorInspector.stop();
}
/**
* The replication mechanism must not replicate commands for which schemas are not yet available on the node
* to which replication happens (in Raft, it means that followers/learners cannot receive commands that they
* cannot execute without waiting for schemas). This method tests this scenario.
*/
@Test
void laggingSchemasPreventPartitionDataReplication() throws Exception {
createTestTableWith3Replicas();
final int notInhibitedNodeIndex = 0;
transferLeadershipsTo(notInhibitedNodeIndex);
IgniteImpl nodeToInhibitMetaStorage = cluster.node(1);
WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
listenerInhibitor.startInhibit();
try {
CompletableFuture<?> rejectionTriggered = rejectionDueToMetadataLagTriggered();
updateTableSchemaAt(notInhibitedNodeIndex);
putToTableAt(notInhibitedNodeIndex);
assertThat("Did not see rejections due to lagging metadata", rejectionTriggered, willSucceedIn(10, TimeUnit.SECONDS));
assertTrue(solePartitionIsEmpty(nodeToInhibitMetaStorage), "Something was written to the partition");
listenerInhibitor.stopInhibit();
assertTrue(
waitForCondition(() -> !solePartitionIsEmpty(nodeToInhibitMetaStorage), 10_000),
"Nothing was written to partition even after inhibiting was cancelled"
);
} finally {
listenerInhibitor.stopInhibit();
}
}
private void createTestTableWith3Replicas() {
String zoneSql = "create zone test_zone with partitions=1, replicas=3";
String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20))"
+ " with primary_zone='TEST_ZONE'";
cluster.doInSession(0, session -> {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
});
}
private void transferLeadershipsTo(int nodeIndex) throws InterruptedException {
cluster.transferLeadershipTo(nodeIndex, MetastorageGroupId.INSTANCE);
cluster.transferLeadershipTo(nodeIndex, cluster.solePartitionId());
}
private CompletableFuture<?> rejectionDueToMetadataLagTriggered() {
CompletableFuture<?> rejectionTriggered = new CompletableFuture<>();
appendEntriesInterceptorInspector.addHandler(
event -> event.getMessage().getFormattedMessage().startsWith("Metadata not yet available, rejecting AppendEntriesRequest"),
() -> rejectionTriggered.complete(null)
);
return rejectionTriggered;
}
private void putToTableAt(int nodeIndex) {
cluster.node(nodeIndex)
.tables()
.table(TABLE_NAME)
.keyValueView()
.put(null, Tuple.create().set("key", 1), Tuple.create().set("val", "one"));
}
private void updateTableSchemaAt(int nodeIndex) {
cluster.doInSession(nodeIndex, session -> {
session.execute(null, "alter table " + TABLE_NAME + " add column added int");
});
}
private static boolean solePartitionIsEmpty(IgniteImpl node) {
MvPartitionStorage mvPartitionStorage = solePartitionStorage(node);
RowId rowId = IgniteTestUtils.bypassingThreadAssertions(() -> mvPartitionStorage.closestRowId(RowId.lowestRowId(0)));
return rowId == null;
}
private static MvPartitionStorage solePartitionStorage(IgniteImpl node) {
// We use this api because there is no waiting for schemas to sync.
TableViewInternal table = unwrapTableManager(node.tables()).cachedTable(TABLE_NAME);
assertNotNull(table);
MvPartitionStorage mvPartitionStorage = table.internalTable().storage().getMvPartition(0);
assertThat(mvPartitionStorage, is(notNullValue()));
return mvPartitionStorage;
}
}