blob: 97a81caf0ffdd9caf3be20fc2832fb3a376bc6d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.rebalance;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterKey;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
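/**
 * Integration tests for partition rebalance: changes of stable assignments when a node leaves and rejoins the cluster,
 * and the per-zone tables counter.
 */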
@SuppressWarnings("resource")
public class ItRebalanceTest extends IgniteIntegrationTest {
@WorkDirectory
private Path workDir;
private Cluster cluster;
private final HybridClock clock = new HybridClockImpl();
/**
 * Instantiates the {@link Cluster} helper for the test that is about to run, passing it the test's work directory.
 * The tests themselves start the nodes they need via {@code startAndInit}.
 *
 * @param testInfo JUnit descriptor of the current test.
 */
@BeforeEach
void createCluster(TestInfo testInfo) {
    Cluster freshCluster = new Cluster(testInfo, workDir);

    this.cluster = freshCluster;
}
/** Shuts down the cluster that was created for the finished test. */
@AfterEach
void shutdownCluster() {
    this.cluster.shutdown();
}
/**
 * Checks that a row stays readable through the nodes listed in the stable assignments while a node hosting a replica
 * leaves the cluster and then joins it again.
 *
 * <p>Flow: start 4 nodes, create a zone with 1 partition and 3 replicas, insert a row, stop node 2 and wait until the
 * stable assignments become nodes 0, 1 and 3, then start node 2 again and wait until they return to nodes 0, 1 and 2.
 *
 * @throws Exception If failed.
 */
@Disabled("https://issues.apache.org/jira/browse/IGNITE-20996")
@Test
void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
    cluster.startAndInit(4);

    createZone("TEST_ZONE", 1, 3);

    // The table lives in TEST_ZONE, i.e. it has 1 partition and 3 replicas.
    createTestTable("TEST_TABLE", "TEST_ZONE");

    TableViewInternal table = (TableViewInternal) cluster.node(0).tables().table("TEST_TABLE");

    waitForStableAssignmentsInMetastore(Set.of(nodeName(0), nodeName(1), nodeName(2)), table.tableId());

    BinaryRowEx fullRow = marshalTuple(table, Tuple.create().set("id", 1).set("val", "value1"));
    BinaryRowEx keyRow = marshalTuple(table, Tuple.create().set("id", 1));

    // Nothing has been inserted yet: the key must be absent when read through each initially assigned node.
    for (int idx = 0; idx <= 2; idx++) {
        assertThat(table.internalTable().get(keyRow, clock.now(), cluster.node(idx).node()), willBe(nullValue()));
    }

    assertThat(table.internalTable().insert(fullRow, null), willCompleteSuccessfully());

    for (int idx = 0; idx <= 2; idx++) {
        assertThat(table.internalTable().get(keyRow, clock.now(), cluster.node(idx).node()), willBe(notNullValue()));
    }

    // Node 3 is not part of the stable assignments, and a read through it is expected to fail.
    assertThat(
            table.internalTable().get(keyRow, clock.now(), cluster.node(3).node()),
            willThrow(ReplicationException.class, 10, TimeUnit.SECONDS)
    );

    cluster.stopNode(2);

    // With node 2 gone, node 3 is expected to take over its place in the assignments.
    waitForStableAssignmentsInMetastore(Set.of(nodeName(0), nodeName(1), nodeName(3)), table.tableId());

    for (int idx : new int[] {0, 1, 3}) {
        assertThat(table.internalTable().get(keyRow, clock.now(), cluster.node(idx).node()), willBe(notNullValue()));
    }

    cluster.startNode(2);

    // After node 2 is back, the assignments are expected to return to the initial set.
    waitForStableAssignmentsInMetastore(Set.of(nodeName(0), nodeName(1), nodeName(2)), table.tableId());

    for (int idx = 0; idx <= 2; idx++) {
        assertThat(table.internalTable().get(keyRow, clock.now(), cluster.node(idx).node()), willBe(notNullValue()));
    }

    assertThat(
            table.internalTable().get(keyRow, clock.now(), cluster.node(3).node()),
            willThrow(ReplicationException.class, 10, TimeUnit.SECONDS)
    );
}
/**
 * Tests the tables counters used for per-zone partition rebalances. When pending assignments change, every partition of
 * the zone gets a tables counter; the counter is decremented each time the raft configuration of a table's partition has
 * been rebalanced, and once it reaches zero the stable assignments are rewritten.
 *
 * <p>The test creates three tables in one zone with 3 partitions, drops {@link ChangePeersAsyncRequest} messages on all
 * nodes, lowers the replica count from 3 to 2 and expects a counter of size 3 for each partition. After the messages are
 * let through again, it expects stable assignments of size 2 and a counter of size 0 for each partition.
 *
 * @throws Exception If failed
 */
@Test
void testRebalanceTablesCounterForZone() throws Exception {
    cluster.startAndInit(3);

    String zoneName = "ZONE";

    createZone(zoneName, 3, 3);

    Set<Integer> tableIds = new HashSet<>();

    for (String tableName : new String[] {"TEST1", "TEST2", "TEST3"}) {
        tableIds.add(createTestTable(tableName, zoneName));
    }

    Set<String> allNodes = cluster.runningNodes().map(IgniteImpl::name).collect(Collectors.toSet());

    for (Integer tableId : tableIds) {
        waitForStableAssignmentsInMetastore(allNodes, tableId);
    }

    // While this flag is set, every running node drops ChangePeersAsyncRequest messages
    // (presumably this keeps the rebalance from completing - confirm against dropMessages semantics).
    AtomicBoolean dropChangePeers = new AtomicBoolean(true);

    cluster.runningNodes().forEach(node ->
            node.dropMessages((ignoredName, message) -> message instanceof ChangePeersAsyncRequest && dropChangePeers.get())
    );

    alterZone(zoneName, 3, 2);

    CatalogManager catalogManager = cluster.aliveNode().catalogManager();

    int zoneId = catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();

    // All three tables are still counted for each of the three partitions.
    for (int partId = 0; partId < 3; partId++) {
        waitForTablesCounterInMetastore(3, zoneId, partId);
    }

    dropChangePeers.set(false);

    for (Integer tableId : tableIds) {
        waitForStableAssignmentsInMetastore(2, tableId);
    }

    // Once the messages are let through, the counters are expected to become empty.
    for (int partId = 0; partId < 3; partId++) {
        waitForTablesCounterInMetastore(0, zoneId, partId);
    }
}
/**
 * Marshals a tuple into a row using the last known schema of the given table.
 *
 * @param table Table whose schema registry supplies the schema.
 * @param tuple Tuple to marshal.
 * @return Marshalled row.
 * @throws TupleMarshallerException If marshalling fails.
 */
private static Row marshalTuple(TableViewInternal table, Tuple tuple) throws TupleMarshallerException {
    return new TupleMarshallerImpl(table.schemaView().lastKnownSchema()).marshal(tuple);
}
/**
 * Waits until the assignments of partition 0 of the given table, as read from the metastore, consist of exactly the
 * expected node names; fails the test if that does not happen within the 30000 timeout passed to {@code waitForCondition}.
 *
 * @param expectedNodes Expected consistent ids of the assigned nodes.
 * @param table Table id.
 * @throws InterruptedException If the wait is interrupted.
 */
private void waitForStableAssignmentsInMetastore(Set<String> expectedNodes, int table) throws InterruptedException {
    // One-element holder: lets the lambda publish the last observed assignments for the failure message.
    Set<String>[] observed = new Set[1];

    boolean matched = waitForCondition(() -> {
        Set<String> current = await(partitionAssignments(cluster.aliveNode().metaStorageManager(), table, 0))
                .stream()
                .map(Assignment::consistentId)
                .collect(Collectors.toSet());

        observed[0] = current;

        return current.equals(expectedNodes);
    }, 30000);

    assertTrue(matched, "Expected nodes: " + expectedNodes + ", actual nodes: " + observed[0]);
}
/**
 * Waits until the assignments of partition 0 of the given table, as read from the metastore, contain the expected number
 * of distinct nodes; fails the test if that does not happen within the 30000 timeout passed to {@code waitForCondition}.
 *
 * @param expectedNodesNumber Expected number of assigned nodes.
 * @param table Table id.
 * @throws InterruptedException If the wait is interrupted.
 */
private void waitForStableAssignmentsInMetastore(int expectedNodesNumber, int table) throws InterruptedException {
    // One-element holder: lets the lambda publish the last observed assignments for the failure message.
    Set<String>[] observed = new Set[1];

    boolean reached = waitForCondition(() -> {
        Set<String> current = await(partitionAssignments(cluster.aliveNode().metaStorageManager(), table, 0))
                .stream()
                .map(Assignment::consistentId)
                .collect(Collectors.toSet());

        observed[0] = current;

        return current.size() == expectedNodesNumber;
    }, 30000);

    assertTrue(reached, "Expected nodes: " + expectedNodesNumber + ", actual nodes size: " + observed[0].size());
}
/**
 * Waits until the tables counter stored in the metastore for the given zone partition holds exactly
 * {@code expectedTablesNumber} table ids; fails the test if that does not happen within 30 seconds.
 *
 * @param expectedTablesNumber Expected size of the tables counter.
 * @param zoneId Zone id.
 * @param partitionNumber Partition number within the zone.
 * @throws InterruptedException If the wait is interrupted.
 */
private void waitForTablesCounterInMetastore(int expectedTablesNumber, int zoneId, int partitionNumber) throws InterruptedException {
    // One-element holder: lets the lambda publish the last observed counter for the failure message.
    Set<Integer>[] lastTablesCounterHolderForLog = new Set[1];

    boolean reached = waitForCondition(() -> {
        Set<Integer> tablesCounter = await(tablesCounter(cluster.aliveNode().metaStorageManager(), zoneId, partitionNumber));

        lastTablesCounterHolderForLog[0] = tablesCounter;

        return tablesCounter != null && tablesCounter.size() == expectedTablesNumber;
    }, 30000);

    // The counter is null when its key has no value in the metastore. Build the message null-safely so that a timeout
    // in that state is reported as an assertion failure rather than a NullPointerException.
    Set<Integer> lastSeen = lastTablesCounterHolderForLog[0];
    String actual = lastSeen == null ? "none (no counter found in metastore)" : String.valueOf(lastSeen.size());

    assertTrue(reached, "Expected tables number: " + expectedTablesNumber + ", actual tables number: " + actual);
}
/**
 * Reads the tables counter of a zone partition from the metastore.
 *
 * @param metaStorageManager Metastore manager to read from.
 * @param zoneId Zone id.
 * @param partitionNumber Partition number within the zone.
 * @return Future with the deserialized counter, or with {@code null} if the entry has no value.
 */
private static CompletableFuture<Set<Integer>> tablesCounter(
        MetaStorageManager metaStorageManager,
        int zoneId,
        int partitionNumber
) {
    return metaStorageManager
            .get(tablesCounterKey(zoneId, partitionNumber))
            .thenApply(entry -> {
                if (entry.value() == null) {
                    return null;
                }

                return fromBytes(entry.value());
            });
}
/**
 * Returns the name of the cluster node with the given index.
 *
 * @param nodeIndex Index of the node in the cluster.
 * @return Node name.
 */
private String nodeName(int nodeIndex) {
    var node = cluster.node(nodeIndex);

    return node.name();
}
/**
 * Creates a distribution zone through SQL on node 0, with both data nodes auto-adjust timeouts set to 0
 * and the default storage profile.
 *
 * @param zoneName Zone name.
 * @param partitions Number of partitions.
 * @param replicas Number of replicas.
 */
private void createZone(String zoneName, int partitions, int replicas) {
    String template = "create zone %s with partitions=%d, replicas=%d, "
            + "data_nodes_auto_adjust_scale_up=0, data_nodes_auto_adjust_scale_down=0, "
            + "storage_profiles='%s'";

    String createZoneSql = String.format(template, zoneName, partitions, replicas, DEFAULT_STORAGE_PROFILE);

    cluster.doInSession(0, session -> {
        executeUpdate(createZoneSql, session);
    });
}
/**
 * Alters an existing distribution zone through SQL on node 0, setting the partition and replica counts
 * and both data nodes auto-adjust timeouts to 0.
 *
 * @param zoneName Zone name.
 * @param partitions Number of partitions.
 * @param replicas Number of replicas.
 */
private void alterZone(String zoneName, int partitions, int replicas) {
    String template = "alter zone %s set partitions=%d, replicas=%d, "
            + "data_nodes_auto_adjust_scale_up=0, data_nodes_auto_adjust_scale_down=0";

    String alterZoneSql = String.format(template, zoneName, partitions, replicas);

    cluster.doInSession(0, session -> {
        executeUpdate(alterZoneSql, session);
    });
}
/**
 * Creates a table with columns {@code id int primary key} and {@code val varchar(20)} in the given zone through SQL on
 * node 0, then looks the table up by name in the latest catalog version.
 *
 * @param tableName Table name; compared as-is with the names stored in the catalog.
 * @param zoneName Name of the zone to use as the table's primary zone.
 * @return Id of the created table.
 * @throws AssertionError If the latest catalog version contains no table with this name.
 */
private int createTestTable(String tableName, String zoneName) throws InterruptedException {
    String sql2 = "create table " + tableName + " (id int primary key, val varchar(20))"
            + " with primary_zone='" + zoneName + "'";

    cluster.doInSession(0, session -> {
        executeUpdate(sql2, session);
    });

    CatalogManager catalogManager = cluster.aliveNode().catalogManager();

    // Fail with a descriptive message instead of a bare NoSuchElementException from Optional.get().
    return catalogManager.catalog(catalogManager.latestCatalogVersion()).tables().stream()
            .filter(t -> t.name().equals(tableName))
            .findFirst()
            .orElseThrow(() -> new AssertionError("Table not found in the latest catalog version: " + tableName))
            .id();
}
}