blob: ce9009e8c420fe15033701beeea291f910e37960 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_ROCKSDB_PROFILE_NAME;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
 * Integration test for data nodes' filters functionality.
 *
 * <p>Verifies that a zone's {@code DATA_NODES_FILTER} (a JSONPath expression over node attributes) together with the
 * zone's storage profiles controls which nodes end up in the zone's data nodes and, transitively, in the partitions'
 * stable assignments.
 */
public class ItDistributionZonesFiltersTest extends ClusterPerTestIntegrationTest {
    private static final String ZONE_NAME = "TEST_ZONE";

    private static final String TABLE_NAME = "table1";

    /** Node attributes matching the "US"/"SSD" filter used by most tests. */
    @Language("HOCON")
    private static final String NODE_ATTRIBUTES = "{region.attribute = US, storage.attribute = SSD}";

    private static final String STORAGE_PROFILES = String.format("'%s, %s'", DEFAULT_ROCKSDB_PROFILE_NAME, DEFAULT_AIPERSIST_PROFILE_NAME);

    @Language("HOCON")
    private static final String STORAGE_PROFILES_CONFIGS = String.format(
            "{%s.engine = rocksdb, %s.engine = aipersist}",
            DEFAULT_ROCKSDB_PROFILE_NAME,
            DEFAULT_AIPERSIST_PROFILE_NAME
    );

    private static final String COLUMN_KEY = "key";

    private static final String COLUMN_VAL = "val";

    /** Default timeout, in milliseconds, for polling values in the meta storage. */
    private static final int TIMEOUT_MILLIS = 10_000;

    /**
     * Builds a node bootstrap configuration template with the given node attributes and storage profiles.
     *
     * @param nodeAttributes HOCON snippet with the node's attributes (used by data nodes filters).
     * @param storageProfiles HOCON snippet with the node's storage profiles.
     * @return Bootstrap configuration template in HOCON format.
     */
    @Language("HOCON")
    private static String createStartConfig(@Language("HOCON") String nodeAttributes, @Language("HOCON") String storageProfiles) {
        return "{\n"
                + " network: {\n"
                + " port: {},\n"
                + " nodeFinder.netClusterNodes: [ {} ]\n"
                + " },"
                + " nodeAttributes.nodeAttributes: " + nodeAttributes + ",\n"
                + " storage.profiles: " + storageProfiles + ",\n"
                + " clientConnector.port: {},\n"
                + " rest.port: {}\n"
                + "}";
    }

    @Override
    protected String getNodeBootstrapConfigTemplate() {
        return createStartConfig(NODE_ATTRIBUTES, STORAGE_PROFILES_CONFIGS);
    }

    @Override
    protected int initialNodes() {
        return 1;
    }

    /**
     * Tests the scenario when filter was applied to data nodes and stable key for rebalance was changed to that filtered value.
     *
     * @throws Exception If failed.
     */
    @Test
    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21387")
    void testFilteredDataNodesPropagatedToStable() throws Exception {
        String filter = "$[?(@.region == \"US\" && @.storage == \"SSD\")]";

        // This node does not pass the filter.
        @Language("HOCON") String firstNodeAttributes = "{region:{attribute:\"EU\"},storage:{attribute:\"SSD\"}}";

        IgniteImpl node = startNode(1, createStartConfig(firstNodeAttributes, STORAGE_PROFILES_CONFIGS));

        node.sql().execute(null, createZoneSql(2, 3, IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE, filter, STORAGE_PROFILES));

        node.sql().execute(null, createTableSql());

        MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
                .getFieldValue(node, IgniteImpl.class, "metaStorageMgr");

        TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "distributedTblMgr");

        TableViewInternal table = (TableViewInternal) tableManager.table(TABLE_NAME);

        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);

        // Only the initial node (which passes the filter) is expected in the stable assignments.
        assertValueInStorage(
                metaStorageManager,
                stablePartAssignmentsKey(partId),
                (v) -> Assignments.fromBytes(v).nodes().size(),
                1,
                TIMEOUT_MILLIS
        );

        @Language("HOCON") String secondNodeAttributes = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";

        // This node passes the filter but storage profiles of the node do not match the zone's storage profiles.
        // TODO: https://issues.apache.org/jira/browse/IGNITE-21387 recovery of this node is failing,
        // TODO: because there are no appropriate storage profile on the node
        @Language("HOCON") String notMatchingProfiles = "{dummy:{engine:\"dummy\"},another_dummy:{engine:\"dummy\"}}";
        startNode(2, createStartConfig(secondNodeAttributes, notMatchingProfiles));

        // This node passes the filter and storage profiles of the node match the zone's storage profiles.
        startNode(3, createStartConfig(secondNodeAttributes, STORAGE_PROFILES_CONFIGS));

        int zoneId = getZoneId(node);

        // All four nodes (regardless of filter) are expected in the zone's data nodes key.
        assertValueInStorage(
                metaStorageManager,
                zoneDataNodesKey(zoneId),
                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
                4,
                TIMEOUT_MILLIS
        );

        Entry dataNodesEntry1 = metaStorageManager.get(zoneDataNodesKey(zoneId)).get(5_000, MILLISECONDS);

        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= dataNodesEntry1.revision(), TIMEOUT_MILLIS));

        // We check that two nodes that pass the filter and storage profiles are presented in the stable key.
        assertValueInStorage(
                metaStorageManager,
                stablePartAssignmentsKey(partId),
                (v) -> Assignments.fromBytes(v).nodes()
                        .stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                Set.of(node(0).name(), node(3).name()),
                TIMEOUT_MILLIS * 2
        );
    }

    /**
     * Tests the scenario when altering filter triggers immediate scale up so data nodes
     * and stable key for rebalance is changed to the new value even if scale up timer is big enough.
     *
     * @throws Exception If failed.
     */
    @Test
    void testAlteringFiltersPropagatedDataNodesToStableImmediately() throws Exception {
        String filter = "$[?(@.region == \"US\" && @.storage == \"SSD\")]";

        IgniteImpl node0 = node(0);

        node0.sql().execute(null, createZoneSql(2, 3, 10_000, 10_000, filter, STORAGE_PROFILES));

        node0.sql().execute(null, createTableSql());

        MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
                .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");

        TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");

        TableViewInternal table = (TableViewInternal) tableManager.table(TABLE_NAME);

        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);

        assertValueInStorage(
                metaStorageManager,
                stablePartAssignmentsKey(partId),
                (v) -> Assignments.fromBytes(v).nodes()
                        .stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                Set.of(node(0).name()),
                TIMEOUT_MILLIS
        );

        @Language("HOCON") String firstNodeAttributes = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";

        // This node passes the filter.
        startNode(1, createStartConfig(firstNodeAttributes, STORAGE_PROFILES_CONFIGS));

        // Expected size is 1 because we have timers equals to 10000, so no scale up will be propagated.
        waitDataNodeAndListenersAreHandled(metaStorageManager, 1, getZoneId(node0));

        node0.sql().execute(null, alterZoneSql(DEFAULT_FILTER));

        // We check that all nodes that pass the filter are presented in the stable key because altering filter triggers immediate scale up.
        assertValueInStorage(
                metaStorageManager,
                stablePartAssignmentsKey(partId),
                (v) -> Assignments.fromBytes(v).nodes()
                        .stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                Set.of(node(0).name(), node(1).name()),
                TIMEOUT_MILLIS * 2
        );
    }

    /**
     * Tests the scenario when empty data nodes are not propagated to stable after filter is altered, because there are no node that
     * matches the new filter.
     *
     * @throws Exception If failed.
     */
    @Test
    void testEmptyDataNodesDoNotPropagatedToStableAfterAlteringFilter() throws Exception {
        String filter = "$[?(@.region == \"US\" && @.storage == \"SSD\")]";

        IgniteImpl node0 = node(0);

        node0.sql().execute(null, createZoneSql(2, 3, 10_000, 10_000, filter, STORAGE_PROFILES));

        node0.sql().execute(null, createTableSql());

        MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
                .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");

        TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");

        TableViewInternal table = (TableViewInternal) tableManager.table(TABLE_NAME);

        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);

        assertValueInStorage(
                metaStorageManager,
                stablePartAssignmentsKey(partId),
                (v) -> Assignments.fromBytes(v).nodes()
                        .stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                Set.of(node(0).name()),
                TIMEOUT_MILLIS
        );

        @Language("HOCON") String firstNodeAttributes = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";

        // This node passes the filter.
        startNode(1, createStartConfig(firstNodeAttributes, STORAGE_PROFILES_CONFIGS));

        int zoneId = getZoneId(node0);

        // Expected size is 1 because we have timers equals to 10000, so no scale up will be propagated.
        waitDataNodeAndListenersAreHandled(metaStorageManager, 1, zoneId);

        // There is no node that matches the new filter.
        String newFilter = "$[?(@.region == \"FOO\" && @.storage == \"BAR\")]";

        node0.sql().execute(null, alterZoneSql(newFilter));

        // Altering the filter triggers immediate scale up, so both physical nodes are now in the data nodes key,
        // but after filtering the data nodes are empty and must not be propagated to the stable assignments.
        waitDataNodeAndListenersAreHandled(metaStorageManager, 2, zoneId);

        assertValueInStorage(
                metaStorageManager,
                stablePartAssignmentsKey(partId),
                (v) -> Assignments.fromBytes(v).nodes()
                        .stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                Set.of(node(0).name()),
                TIMEOUT_MILLIS
        );
    }

    /**
     * Tests the scenario when removal node from the logical topology leads to empty data nodes, but this empty data nodes do not trigger
     * rebalance. Data nodes become empty after applying corresponding filter.
     *
     * @throws Exception If failed.
     */
    @Test
    void testFilteredEmptyDataNodesDoNotTriggerRebalance() throws Exception {
        String filter = "$[?(@.region == \"EU\" && @.storage == \"HDD\")]";

        // This node does not pass the filter.
        IgniteImpl node0 = node(0);

        // This node passes the filter.
        @Language("HOCON") String firstNodeAttributes = "{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";

        IgniteImpl node1 = startNode(1, createStartConfig(firstNodeAttributes, STORAGE_PROFILES_CONFIGS));

        node1.sql().execute(null, createZoneSql(1, 1, IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE, filter, STORAGE_PROFILES));

        MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils.getFieldValue(
                node0,
                IgniteImpl.class,
                "metaStorageMgr"
        );

        int zoneId = getZoneId(node0);

        waitDataNodeAndListenersAreHandled(metaStorageManager, 2, zoneId);

        node1.sql().execute(null, createTableSql());

        TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");

        TableViewInternal table = (TableViewInternal) tableManager.table(TABLE_NAME);

        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);

        // Table was created after both nodes were up, so there wasn't any rebalance.
        assertPendingAssignmentsWereNeverExist(metaStorageManager, partId);

        // Stop the only node that passed the filter, so data nodes after filtering will be empty.
        stopNode(1);

        waitDataNodeAndListenersAreHandled(metaStorageManager, 1, zoneId);

        // Check that pending are null, so there wasn't any rebalance.
        assertPendingAssignmentsWereNeverExist(metaStorageManager, partId);
    }

    /**
     * Tests that empty filtered data nodes do not trigger a rebalance even when the zone's replica count is updated.
     *
     * @throws Exception If failed.
     */
    @Test
    void testFilteredEmptyDataNodesDoNotTriggerRebalanceOnReplicaUpdate() throws Exception {
        String filter = "$[?(@.region == \"EU\" && @.storage == \"HDD\")]";

        // This node does not pass the filter.
        IgniteImpl node0 = node(0);

        // This node passes the filter.
        @Language("HOCON") String firstNodeAttributes = "{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";

        startNode(1, createStartConfig(firstNodeAttributes, STORAGE_PROFILES_CONFIGS));

        node0.sql().execute(null, createZoneSql(1, 1, IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE, filter, STORAGE_PROFILES));

        MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils.getFieldValue(
                node0,
                IgniteImpl.class,
                "metaStorageMgr"
        );

        int zoneId = getZoneId(node0);

        waitDataNodeAndListenersAreHandled(metaStorageManager, 2, zoneId);

        node0.sql().execute(null, createTableSql());

        TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");

        TableViewInternal table = (TableViewInternal) tableManager.table(TABLE_NAME);

        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);

        // Table was created after both nodes were up, so there wasn't any rebalance.
        assertPendingAssignmentsWereNeverExist(metaStorageManager, partId);

        // Stop the only node that passed the filter, so data nodes after filtering will be empty.
        stopNode(1);

        waitDataNodeAndListenersAreHandled(metaStorageManager, 1, zoneId);

        // Check that stable and pending are null, so there wasn't any rebalance.
        assertPendingAssignmentsWereNeverExist(metaStorageManager, partId);

        node0.sql().execute(null, alterZoneSql(2));

        CountDownLatch latch = new CountDownLatch(1);

        // We need to be sure, that the first asynchronous catalog change of replica was handled,
        // so we create a listener with a latch, and change replica again and wait for latch, so we can be sure that the first
        // replica was handled.
        node0.catalogManager().listen(CatalogEvent.ZONE_ALTER, parameters -> {
            latch.countDown();

            return falseCompletedFuture();
        });

        node0.sql().execute(null, alterZoneSql(3));

        assertTrue(latch.await(10_000, MILLISECONDS));

        // Check that stable and pending are null, so there wasn't any rebalance.
        assertPendingAssignmentsWereNeverExist(metaStorageManager, partId);
    }

    /**
     * Waits until the zone's data nodes key has the expected size and all data nodes listeners have finished
     * their meta storage activity.
     *
     * <p>A fake key is written after the first check; once the applied revision reaches the fake entry's revision,
     * every listener triggered by earlier revisions is guaranteed to be done, and the size is re-checked to make sure
     * the listeners did not change it.
     *
     * @param metaStorageManager Meta storage manager of the node under inspection.
     * @param expectedDataNodesSize Expected number of entries in the zone's data nodes map.
     * @param zoneId Zone id.
     * @throws Exception If failed.
     */
    private static void waitDataNodeAndListenersAreHandled(
            MetaStorageManager metaStorageManager,
            int expectedDataNodesSize,
            int zoneId
    ) throws Exception {
        assertValueInStorage(
                metaStorageManager,
                zoneDataNodesKey(zoneId),
                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
                expectedDataNodesSize,
                TIMEOUT_MILLIS
        );

        ByteArray fakeKey = new ByteArray("Foo");

        metaStorageManager.put(fakeKey, toBytes("Bar")).get(5_000, MILLISECONDS);

        Entry fakeEntry = metaStorageManager.get(fakeKey).get(5_000, MILLISECONDS);

        // We wait for all data nodes listeners are triggered and all their meta storage activity is done.
        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= fakeEntry.revision(), 5_000));

        assertValueInStorage(
                metaStorageManager,
                zoneDataNodesKey(zoneId),
                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
                expectedDataNodesSize,
                TIMEOUT_MILLIS
        );
    }

    /**
     * Asserts that the pending assignments key for the given partition has never been written, i.e. no rebalance was
     * ever triggered for it.
     *
     * @param metaStorageManager Meta storage manager.
     * @param partId Partition id to check.
     */
    private static void assertPendingAssignmentsWereNeverExist(
            MetaStorageManager metaStorageManager,
            TablePartitionId partId
    ) throws InterruptedException, ExecutionException {
        assertTrue(metaStorageManager.get(pendingPartAssignmentsKey(partId)).get().empty());
    }

    /**
     * Builds a {@code CREATE ZONE} statement for {@link #ZONE_NAME} with the given parameters.
     */
    private static String createZoneSql(int partitions, int replicas, int scaleUp, int scaleDown, String filter, String storageProfiles) {
        String sqlFormat = "CREATE ZONE \"%s\" WITH "
                + "\"REPLICAS\" = %s, "
                + "\"PARTITIONS\" = %s, "
                + "\"DATA_NODES_FILTER\" = '%s', "
                + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = %s, "
                + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = %s, "
                + "\"STORAGE_PROFILES\" = %s";

        return String.format(sqlFormat, ZONE_NAME, replicas, partitions, filter, scaleUp, scaleDown, storageProfiles);
    }

    /** Builds an {@code ALTER ZONE} statement changing the zone's data nodes filter. */
    private static String alterZoneSql(String filter) {
        return String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" = '%s'", ZONE_NAME, filter);
    }

    /** Builds an {@code ALTER ZONE} statement changing the zone's replica count. */
    private static String alterZoneSql(int replicas) {
        return String.format("ALTER ZONE \"%s\" SET \"REPLICAS\" = %s", ZONE_NAME, replicas);
    }

    /** Builds a {@code CREATE TABLE} statement for {@link #TABLE_NAME} bound to {@link #ZONE_NAME}. */
    private static String createTableSql() {
        return String.format(
                "CREATE TABLE %s(%s INT PRIMARY KEY, %s VARCHAR) WITH STORAGE_PROFILE='%s',PRIMARY_ZONE='%s'",
                TABLE_NAME, COLUMN_KEY, COLUMN_VAL, DEFAULT_AIPERSIST_PROFILE_NAME, ZONE_NAME
        );
    }

    /** Returns the id of {@link #ZONE_NAME} from the node's catalog, failing if the zone does not exist. */
    private static int getZoneId(IgniteImpl node) {
        return DistributionZonesTestUtil.getZoneIdStrict(node.catalogManager(), ZONE_NAME, node.clock().nowLong());
    }
}