/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.rebalance;

import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
* Tests for recovery of the rebalance procedure.
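*
* <p>Each test creates a zone whose data nodes are restricted by an attribute filter and alters the zone so that a
* rebalance gets scheduled; it then verifies that on node restart the rebalance trigger is either recovered, or
* skipped if it was already processed by other nodes.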
*/
public class ItRebalanceTriggersRecoveryTest extends ClusterPerTestIntegrationTest {
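/** Bootstrap configuration for a node carrying the attributes region=US and zone=global. */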
private static final String US_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ " network: {\n"
+ " port: {},\n"
+ " nodeFinder: {\n"
+ " netClusterNodes: [ {} ]\n"
+ " }\n"
+ " },\n"
+ " clientConnector: { port:{} },\n"
+ " nodeAttributes: {\n"
+ " nodeAttributes: {region: {attribute: \"US\"}, zone: {attribute: \"global\"}}\n"
+ " },\n"
+ " rest.port: {}\n"
+ "}";
private static final String GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ " network: {\n"
+ " port: {},\n"
+ " nodeFinder: {\n"
+ " netClusterNodes: [ {} ]\n"
+ " }\n"
+ " },\n"
+ " clientConnector: { port:{} },\n"
+ " nodeAttributes: {\n"
+ " nodeAttributes: {zone: {attribute: \"global\"}}\n"
+ " },\n"
+ " rest.port: {}\n"
+ "}";
@Override
protected int initialNodes() {
return 1;
}

@Test
void testRebalanceTriggersRecoveryAfterFilterUpdate() throws InterruptedException {
// Nodes from different regions/zones are needed to make the choice of data nodes predictable.
startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
cluster.doInSession(0, session -> {
session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, REPLICAS=2, DATA_NODES_FILTER='$[?(@.region == \"US\")]'");
session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name INT) WITH PRIMARY_ZONE='TEST_ZONE'");
session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
});
assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 10_000));
assertFalse(containsPartition(cluster.node(2)));
// By inhibiting the metastorage events we guarantee that no partition data node will be able to perform the
// rebalance: to run the actual changePeersAsync we need the partition leader, which catches the metastore event
// about the new pending keys.
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
cluster.doInSession(0, session -> {
session.execute(null, "ALTER ZONE TEST_ZONE SET DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
});
// Check that the metastore node schedules the rebalance procedure.
assertTrue(waitForCondition(
() -> getPartitionPendingClusterNodes(node(0), 0).equals(Set.of(
Assignment.forPeer(node(2).name()),
Assignment.forPeer(node(1).name()))),
10_000));
// Crudely remove the pending keys, so from now on the rebalance can be triggered only by the recovery logic.
Integer tableId = getTableId(node(0).catalogManager(), "TEST", new HybridClockImpl().nowLong());
node(0)
.metaStorageManager()
.remove(pendingPartAssignmentsKey(new TablePartitionId(tableId, 0))).join();
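// Restart the data nodes: on startup the recovery logic must detect the missed trigger and re-run the rebalance.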
restartNode(1);
restartNode(2);
// Check that the new replica from the 'global' zone received the data, i.e. that the rebalance really happened.
assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 10_000));
}

@Test
void testRebalanceTriggersRecoveryAfterReplicasUpdate() throws InterruptedException {
// Nodes from different regions/zones are needed to make the choice of data nodes predictable.
startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
cluster.doInSession(0, session -> {
session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, REPLICAS=1, DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name INT) WITH PRIMARY_ZONE='TEST_ZONE'");
session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
});
assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 10_000));
assertFalse(containsPartition(cluster.node(2)));
// By inhibiting the metastorage events we guarantee that no partition data node will be able to perform the
// rebalance: to run the actual changePeersAsync we need the partition leader, which catches the metastore event
// about the new pending keys.
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
cluster.doInSession(0, session -> {
session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2");
});
// Check that the metastore node schedules the rebalance procedure.
assertTrue(waitForCondition(
() -> getPartitionPendingClusterNodes(node(0), 0).equals(Set.of(
Assignment.forPeer(node(2).name()),
Assignment.forPeer(node(1).name()))),
10_000));
// Crudely remove the pending keys, so from now on the rebalance can be triggered only by the recovery logic.
Integer tableId = getTableId(node(0).catalogManager(), "TEST", new HybridClockImpl().nowLong());
node(0)
.metaStorageManager()
.remove(pendingPartAssignmentsKey(new TablePartitionId(tableId, 0))).join();
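// Restart the data nodes: on startup the recovery logic must detect the missed trigger and re-run the rebalance.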
restartNode(1);
restartNode(2);
// Check that the new replica from the 'global' zone received the data, i.e. that the rebalance really happened.
assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 10_000));
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-21596")
void testRebalanceTriggersRecoveryWhenUpdatesWereProcessedByAnotherNodesAlready() throws Exception {
// Nodes from different regions/zones are needed to make the choice of data nodes predictable.
startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
startNode(3);
cluster.doInSession(0, session -> {
session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, REPLICAS=1, DATA_NODES_FILTER='$[?(@.region == \"US\")]'");
session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name INT) WITH PRIMARY_ZONE='TEST_ZONE'");
session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
});
assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 10_000));
assertFalse(containsPartition(cluster.node(2)));
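// Stop the spare node, so it misses the upcoming zone update while the remaining nodes process the trigger.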
stopNode(3);
cluster.doInSession(0, session -> {
session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2, DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
});
// Check that the new replica from the 'global' zone received the data, i.e. that the rebalance really happened.
assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 10_000));
assertTrue(waitForCondition(
() -> getPartitionPendingClusterNodes(node(0), 0).equals(Set.of()),
10_000));
TablePartitionId tablePartitionId = new TablePartitionId(
getTableId(node(0).catalogManager(), "TEST", new HybridClockImpl().nowLong()),
0
);
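// Capture the revision of the pending assignments key before the stopped node rejoins the cluster.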
long pendingKeysRevisionBeforeRecovery = node(0).metaStorageManager()
.get(pendingPartAssignmentsKey(tablePartitionId))
.get(10, TimeUnit.SECONDS).revision();
startNode(3, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
long pendingKeysRevisionAfterRecovery = node(0).metaStorageManager()
.get(pendingPartAssignmentsKey(tablePartitionId))
.get(10, TimeUnit.SECONDS).revision();
// Check that the recovered node doesn't produce new rebalances for triggers that were already processed.
assertEquals(pendingKeysRevisionBeforeRecovery, pendingKeysRevisionAfterRecovery);
}
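
/** Returns the pending assignments of the given partition of the TEST table, or an empty set if there are none. */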
private static Set<Assignment> getPartitionPendingClusterNodes(IgniteImpl node, int partNum) {
return Optional.ofNullable(getTableId(node.catalogManager(), "TEST", new HybridClockImpl().nowLong()))
.map(tableId -> partitionPendingAssignments(node.metaStorageManager(), tableId, partNum).join())
.orElse(Set.of());
}
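
/**
* Reads the pending assignments key of the given partition from the metastorage; completes with {@code null} if the
* key is absent.
*/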
private static CompletableFuture<Set<Assignment>> partitionPendingAssignments(
MetaStorageManager metaStorageManager,
int tableId,
int partitionNumber
) {
return metaStorageManager
.get(pendingPartAssignmentsKey(new TablePartitionId(tableId, partitionNumber)))
.thenApply(e -> (e.value() == null) ? null : Assignments.fromBytes(e.value()).nodes());
}
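
/** Returns {@code true} if the node hosts a non-empty storage for partition 0 of the TEST table. */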
private static boolean containsPartition(Ignite node) {
TableManager tableManager = unwrapTableManager(node.tables());
MvPartitionStorage storage = tableManager.tableView("TEST")
.internalTable()
.storage()
.getMvPartition(0);
return storage != null && bypassingThreadAssertions(storage::rowsCount) != 0;
}
}