blob: e724e981281019be1de1f454ea374ef652f6de36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.distributionzones;
import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopologyInMetastorage;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKeyPrefix;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKeyPrefix;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLastHandledTopology;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesRecoverableStateRevision;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.stream.Stream;
import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import org.apache.ignite.internal.configuration.NodeConfigWriteException;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.recovery.VaultStaleIds;
import org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.internal.security.authentication.validator.AuthenticationProvidersValidatorImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.worker.fixtures.NoOpCriticalWorkerRegistry;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests for checking {@link DistributionZoneManager} behavior after node's restart.
*/
@ExtendWith(ConfigurationExtension.class)
public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRestartTest {
/** Logical node fixture "A" (cluster node id "1") carrying the attributes region=US, storage=SSD, dataRegionSize=10. */
private static final LogicalNode A = new LogicalNode(
new ClusterNodeImpl("1", "A", new NetworkAddress("localhost", 123)),
Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10"),
Map.of(),
List.of(DEFAULT_STORAGE_PROFILE)
);
/** Logical node fixture "B" (cluster node id "2") carrying the attributes region=EU, storage=HHD, dataRegionSize=30. */
private static final LogicalNode B = new LogicalNode(
new ClusterNodeImpl("2", "B", new NetworkAddress("localhost", 123)),
Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30"),
Map.of(),
List.of(DEFAULT_STORAGE_PROFILE)
);
/** Logical node fixture "C" (cluster node id "3") carrying the attributes region=CN, storage=SSD, dataRegionSize=20. */
private static final LogicalNode C = new LogicalNode(
new ClusterNodeImpl("3", "C", new NetworkAddress("localhost", 123)),
Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20"),
Map.of(),
List.of(DEFAULT_STORAGE_PROFILE)
);
/** Name of the non-default zone created by the tests. */
private static final String ZONE_NAME = "zone1";
/** Meta storage manager of the most recently started partial node; reassigned on every {@code startPartialNode} call. */
private MetaStorageManager metastore;
/** When {@code true}, meta storage invokes that update a scale up change trigger key are failed by the stub. */
private volatile boolean startScaleUpBlocking;
/** When {@code true}, meta storage invokes that update a scale down change trigger key are failed by the stub. */
private volatile boolean startScaleDownBlocking;
/** When {@code true}, meta storage invokes that update the zones recoverable state revision key are failed by the stub. */
private volatile boolean startGlobalStateUpdateBlocking;
/**
* Starts the subset of Ignite components needed for a node to run {@link DistributionZoneManager} in tests.
*
* <p>Side effects: the {@code metastore} field is replaced with a spy over the newly created meta storage manager (with the
* update-blocking stub installed), and the created node is added to {@code partialNodes}.
*
* @param idx Node index.
* @return Partial node.
*/
private PartialNode startPartialNode(int idx) {
String name = testNodeName(testInfo, idx);
Path dir = workDir.resolve(name);
List<IgniteComponent> components = new ArrayList<>();
VaultManager vault = createVault(dir);
ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());
// Note: the config file is resolved against the work directory itself, not against the node directory.
Path configFile = workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
String configString = configurationString(idx);
try {
Files.writeString(configFile, configString);
} catch (IOException e) {
throw new NodeConfigWriteException("Failed to write config content to file.", e);
}
// Node-local configuration.
var localConfigurationGenerator = new ConfigurationTreeGenerator(
modules.local().rootKeys(),
modules.local().schemaExtensions(),
modules.local().polymorphicSchemaExtensions()
);
var nodeCfgMgr = new ConfigurationManager(
modules.local().rootKeys(),
new LocalFileConfigurationStorage(configFile, localConfigurationGenerator, modules.local()),
localConfigurationGenerator,
ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators())
);
// Network layer.
NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
var nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, name);
var clusterSvc = new TestScaleCubeClusterServiceFactory().createClusterService(
name,
networkConfiguration,
nettyBootstrapFactory,
defaultSerializationRegistry(),
new VaultStaleIds(vault),
new NoOpCriticalWorkerRegistry(),
mock(FailureProcessor.class)
);
// Logical topology backed by a test storage; the CMG manager is a mock that only serves the topology and start/stop futures.
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var cmgManager = mock(ClusterManagementGroupManager.class);
when(cmgManager.logicalTopology()).thenAnswer(invocation -> completedFuture(logicalTopology.getLogicalTopology()));
when(cmgManager.startAsync()).thenReturn(nullCompletedFuture());
when(cmgManager.stopAsync()).thenReturn(nullCompletedFuture());
// The storage path does not depend on the node name, so every start of a partial node in a test uses the same directory.
// The manager is wrapped into a spy so that selected invokes can be failed on demand (see blockMetaStorageUpdates).
metastore = spy(StandaloneMetaStorageManager.create(
new TestRocksDbKeyValueStorage(name, workDir.resolve("metastorage"))
));
blockMetaStorageUpdates(metastore);
Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater = (LongFunction<CompletableFuture<?>> function) ->
metastore.registerRevisionUpdateListener(function::apply);
// Distributed (cluster-wide) configuration stored in the meta storage.
var cfgStorage = new DistributedConfigurationStorage("test", metastore);
ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator(
modules.distributed().rootKeys(),
modules.distributed().schemaExtensions(),
modules.distributed().polymorphicSchemaExtensions()
);
// The authentication providers validator is excluded from the distributed configuration validators.
Set<Validator<?, ?>> validators = new HashSet<>(modules.distributed().validators());
validators.remove(AuthenticationProvidersValidatorImpl.INSTANCE);
var clusterCfgMgr = new ConfigurationManager(
modules.distributed().rootKeys(),
cfgStorage,
distributedConfigurationGenerator,
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator, validators)
);
ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
var clock = new HybridClockImpl();
var clockWaiter = new ClockWaiter(name, clock);
var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metastore), new TestClockService(clock, clockWaiter));
// NOTE(review): this scheduler is not registered as a component here; confirm who is responsible for shutting it down.
ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
NamedThreadFactory.create(name, "test-rebalance-scheduler", logger()));
DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
name,
revisionUpdater,
metastore,
logicalTopologyService,
catalogManager,
rebalanceScheduler
);
// The vault and the node configuration manager are registered first; they are started synchronously below.
components.add(vault);
components.add(nodeCfgMgr);
// Start.
assertThat(vault.startAsync(), willCompleteSuccessfully());
vault.putName(name);
assertThat(nodeCfgMgr.startAsync(), willCompleteSuccessfully());
// Start the remaining components.
List<IgniteComponent> otherComponents = List.of(
nettyBootstrapFactory,
clusterSvc,
clusterStateStorage,
cmgManager,
metastore,
clusterCfgMgr,
clockWaiter,
catalogManager,
distributionZoneManager
);
for (IgniteComponent component : otherComponents) {
// The start future is intentionally not awaited here.
// TODO: IGNITE-22119 required to be able to wait on this future.
component.startAsync();
components.add(component);
}
PartialNode partialNode = partialNode(
name,
nodeCfgMgr,
clusterCfgMgr,
metastore,
components,
localConfigurationGenerator,
logicalTopology,
cfgStorage,
distributedConfigurationGenerator,
clusterConfigRegistry,
clock
);
// Remember the node in the collection of started partial nodes.
partialNodes.add(partialNode);
return partialNode;
}
/**
 * Switches off every meta storage blocking flag so that a test that failed while blocking was active
 * cannot leave any of them enabled.
 */
@AfterEach
public void afterTest() {
    startScaleUpBlocking = false;
    startScaleDownBlocking = false;
    // This flag is raised by the "interrupted event" tests together with the scale up one, so it is reset here as well.
    startGlobalStateUpdateBlocking = false;
}
/**
 * Checks that the node attributes known to the distribution zone manager are the same before and after a node restart.
 */
@Test
public void testNodeAttributesRestoredAfterRestart() throws Exception {
    PartialNode firstStart = startPartialNode(0);

    for (LogicalNode logicalNode : List.of(A, B, C)) {
        firstStart.logicalTopology().putNode(logicalNode);
    }

    CatalogManager catalog = getCatalogManager(firstStart);

    createZone(catalog, ZONE_NAME, IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE, null, DEFAULT_STORAGE_PROFILE);

    int zoneId = getZoneId(firstStart, ZONE_NAME);

    DistributionZoneManager managerBeforeRestart = getDistributionZoneManager(firstStart);

    // Wait until all three nodes are reported as data nodes of the zone before taking the snapshot of attributes.
    assertDataNodesFromManager(
            managerBeforeRestart,
            metastore::appliedRevision,
            catalog::latestCatalogVersion,
            zoneId,
            Set.of(A, B, C),
            TIMEOUT_MILLIS
    );

    Map<String, NodeWithAttributes> expectedAttributes = managerBeforeRestart.nodesAttributes();

    firstStart.stop();

    PartialNode restarted = startPartialNode(0);

    Map<String, NodeWithAttributes> actualAttributes = getDistributionZoneManager(restarted).nodesAttributes();

    assertEquals(3, actualAttributes.size());
    assertEquals(expectedAttributes, actualAttributes);
}
/**
 * Checks that the topology augmentation map of a zone holds the same node sets before and after a node restart.
 *
 * @param defaultZone {@code true} to alter the default zone, {@code false} to create a dedicated zone.
 */
@ParameterizedTest(name = "defaultZone={0}")
@ValueSource(booleans = {true, false})
public void testTopologyAugmentationMapRestoredAfterRestart(boolean defaultZone) throws Exception {
    PartialNode firstStart = startPartialNode(0);

    firstStart.logicalTopology().putNode(A);

    String zoneName = createZoneOrAlterDefaultZone(firstStart, defaultZone, IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);

    // B and C join only after the zone has been configured.
    for (LogicalNode joined : List.of(B, C)) {
        firstStart.logicalTopology().putNode(joined);
    }

    int zoneId = getZoneId(firstStart, zoneName);

    DistributionZoneManager managerBeforeRestart = getDistributionZoneManager(firstStart);
    CatalogManager catalog = getCatalogManager(firstStart);

    assertDataNodesFromManager(
            managerBeforeRestart,
            metastore::appliedRevision,
            catalog::latestCatalogVersion,
            zoneId,
            Set.of(A, B, C),
            TIMEOUT_MILLIS
    );

    ConcurrentSkipListMap<Long, Augmentation> augmentationsBeforeRestart =
            managerBeforeRestart.zonesState().get(zoneId).topologyAugmentationMap();

    firstStart.stop();

    PartialNode restarted = startPartialNode(0);

    ConcurrentSkipListMap<Long, Augmentation> augmentationsAfterRestart =
            getDistributionZoneManager(restarted).zonesState().get(zoneId).topologyAugmentationMap();

    assertEquals(2, augmentationsAfterRestart.size());

    // Only the node sets are compared; the keys of the two maps are not.
    var nodesBeforeRestart = augmentationsBeforeRestart.values().stream().map(Augmentation::nodes).collect(toSet());
    var nodesAfterRestart = augmentationsAfterRestart.values().stream().map(Augmentation::nodes).collect(toSet());

    assertEquals(nodesBeforeRestart, nodesAfterRestart);
}
/**
 * Checks that the logical topology seen by the distribution zone manager is restored after a node restart.
 */
@Test
public void testLogicalTopologyRestoredAfterRestart() throws Exception {
    PartialNode firstStart = startPartialNode(0);

    firstStart.logicalTopology().putNode(A);
    firstStart.logicalTopology().putNode(B);
    firstStart.logicalTopology().putNode(C);

    Set<NodeWithAttributes> expectedTopology = Stream.of(A, B, C)
            .map(logicalNode -> new NodeWithAttributes(
                    logicalNode.name(),
                    logicalNode.id(),
                    logicalNode.userAttributes(),
                    logicalNode.storageProfiles()
            ))
            .collect(toSet());

    DistributionZoneManager managerBeforeRestart = getDistributionZoneManager(firstStart);

    boolean topologyApplied = waitForCondition(
            () -> expectedTopology.equals(managerBeforeRestart.logicalTopology()),
            TIMEOUT_MILLIS
    );

    assertTrue(topologyApplied);

    firstStart.stop();

    PartialNode restarted = startPartialNode(0);

    DistributionZoneManager managerAfterRestart = getDistributionZoneManager(restarted);

    assertEquals(expectedTopology, managerAfterRestart.logicalTopology());
}
/**
 * Checks that a logical topology update whose meta storage writes were blocked is applied after a node restart.
 */
@Test
public void testLogicalTopologyInterruptedEventRestoredAfterRestart() throws Exception {
    PartialNode firstStart = startPartialNode(0);

    assertValueInStorage(metastore, zonesLastHandledTopology(), bytes -> bytes, null, TIMEOUT_MILLIS);

    firstStart.logicalTopology().putNode(A);
    firstStart.logicalTopology().putNode(B);

    Set<NodeWithAttributes> topologyBeforeBlocking = Stream.of(A, B)
            .map(logicalNode -> new NodeWithAttributes(
                    logicalNode.name(),
                    logicalNode.id(),
                    logicalNode.userAttributes(),
                    logicalNode.storageProfiles()
            ))
            .collect(toSet());

    DistributionZoneManager managerBeforeRestart = getDistributionZoneManager(firstStart);
    CatalogManager catalogBeforeRestart = getCatalogManager(firstStart);

    assertTrue(waitForCondition(() -> topologyBeforeBlocking.equals(managerBeforeRestart.logicalTopology()), TIMEOUT_MILLIS));

    assertValueInStorage(metastore, zonesLastHandledTopology(), ByteUtils::fromBytes, topologyBeforeBlocking, TIMEOUT_MILLIS);

    int zoneId = getDefaultZoneId(firstStart);

    assertDataNodesFromManager(
            managerBeforeRestart,
            metastore::appliedRevision,
            catalogBeforeRestart::latestCatalogVersion,
            zoneId,
            Set.of(A, B),
            TIMEOUT_MILLIS
    );

    metastore = findComponent(firstStart.startedComponents(), MetaStorageManager.class);

    // From now on the stubbed meta storage fails the global state and scale up updates.
    startGlobalStateUpdateBlocking = true;
    startScaleUpBlocking = true;

    firstStart.logicalTopology().putNode(C);

    Set<NodeWithAttributes> topologyAfterJoin = Stream.of(A, B, C)
            .map(logicalNode -> new NodeWithAttributes(
                    logicalNode.name(),
                    logicalNode.id(),
                    logicalNode.userAttributes(),
                    logicalNode.storageProfiles()
            ))
            .collect(toSet());

    assertTrue(waitForCondition(() -> topologyAfterJoin.equals(managerBeforeRestart.logicalTopology()), TIMEOUT_MILLIS));

    // The last handled topology in the storage is still expected to be the one without C.
    assertValueInStorage(metastore, zonesLastHandledTopology(), ByteUtils::fromBytes, topologyBeforeBlocking, TIMEOUT_MILLIS);

    firstStart.stop();

    startGlobalStateUpdateBlocking = false;
    startScaleUpBlocking = false;

    PartialNode restarted = startPartialNode(0);

    DistributionZoneManager managerAfterRestart = getDistributionZoneManager(restarted);
    CatalogManager catalogAfterRestart = getCatalogManager(restarted);

    assertEquals(topologyAfterJoin, managerAfterRestart.logicalTopology());

    assertValueInStorage(metastore, zonesLastHandledTopology(), ByteUtils::fromBytes, topologyAfterJoin, TIMEOUT_MILLIS);

    assertDataNodesFromManager(
            managerAfterRestart,
            metastore::appliedRevision,
            catalogAfterRestart::latestCatalogVersion,
            zoneId,
            Set.of(A, B, C),
            TIMEOUT_MILLIS
    );
}
/**
 * Checks that the very first logical topology update, whose meta storage writes were blocked, is applied after a node restart.
 */
@Test
public void testFirstLogicalTopologyUpdateInterruptedEventRestoredAfterRestart() throws Exception {
    PartialNode firstStart = startPartialNode(0);

    assertValueInStorage(metastore, zonesLastHandledTopology(), bytes -> bytes, null, TIMEOUT_MILLIS);

    DistributionZoneManager managerBeforeRestart = getDistributionZoneManager(firstStart);

    assertValueInStorage(metastore, zonesLastHandledTopology(), bytes -> bytes, null, TIMEOUT_MILLIS);

    metastore = findComponent(firstStart.startedComponents(), MetaStorageManager.class);

    // From now on the stubbed meta storage fails the global state and scale up updates.
    startGlobalStateUpdateBlocking = true;
    startScaleUpBlocking = true;

    firstStart.logicalTopology().putNode(C);

    Set<NodeWithAttributes> topologyAfterJoin = Stream.of(C)
            .map(logicalNode -> new NodeWithAttributes(
                    logicalNode.name(),
                    logicalNode.id(),
                    logicalNode.userAttributes(),
                    logicalNode.storageProfiles()
            ))
            .collect(toSet());

    assertTrue(waitForCondition(() -> topologyAfterJoin.equals(managerBeforeRestart.logicalTopology()), TIMEOUT_MILLIS));

    // The last handled topology key is still expected to hold no value.
    assertValueInStorage(metastore, zonesLastHandledTopology(), bytes -> bytes, null, TIMEOUT_MILLIS);

    firstStart.stop();

    startGlobalStateUpdateBlocking = false;
    startScaleUpBlocking = false;

    PartialNode restarted = startPartialNode(0);

    DistributionZoneManager managerAfterRestart = getDistributionZoneManager(restarted);
    CatalogManager catalogAfterRestart = getCatalogManager(restarted);

    assertEquals(topologyAfterJoin, managerAfterRestart.logicalTopology());

    assertValueInStorage(metastore, zonesLastHandledTopology(), ByteUtils::fromBytes, topologyAfterJoin, TIMEOUT_MILLIS);

    int zoneId = getDefaultZoneId(restarted);

    assertDataNodesFromManager(
            managerAfterRestart,
            metastore::appliedRevision,
            catalogAfterRestart::latestCatalogVersion,
            zoneId,
            Set.of(C),
            TIMEOUT_MILLIS
    );
}
/**
* Checks that when the data nodes key of a zone being created is removed concurrently (left as a tombstone),
* zone creation does not write data nodes for that zone.
*/
@Test
public void testCreationZoneWhenDataNodesAreDeletedIsNotSuccessful() throws Exception {
PartialNode node = startPartialNode(0);
node.logicalTopology().putNode(A);
node.logicalTopology().putNode(B);
node.logicalTopology().putNode(C);
Set<NodeWithAttributes> logicalTopology = Stream.of(A, B, C)
.map(n -> new NodeWithAttributes(n.name(), n.id(), n.userAttributes(), n.storageProfiles()))
.collect(toSet());
DistributionZoneManager distributionZoneManager = getDistributionZoneManager(node);
DistributionZoneManager finalDistributionZoneManager = distributionZoneManager;
assertTrue(waitForCondition(() -> logicalTopology.equals(finalDistributionZoneManager.logicalTopology()), TIMEOUT_MILLIS));
int zoneId = getDefaultZoneId(node);
// Wait until the data nodes of the default zone contain all three nodes.
assertValueInStorage(
metastore,
zoneDataNodesKey(zoneId),
(v) -> dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
Set.of(A.name(), B.name(), C.name()),
TIMEOUT_MILLIS
);
metastore = findComponent(node.startedComponents(), MetaStorageManager.class);
// Single-element holder: the matcher below stores the intercepted data nodes key here, the answer and the final assertion read it.
byte[][] dataNodeKey = new byte[1][1];
// In this mock we catch the invocation of DistributionZoneManager.initDataNodesAndTriggerKeysInMetaStorage, whose condition is based
// on the presence of the data nodes key in the meta storage. After that we turn this key into a tombstone, so when the zone creation
// logic runs, the data nodes key is not initialised. This imitates a concurrent removal of the zone.
doAnswer(invocation -> {
ByteArray dataNodeKeyForZone = new ByteArray(dataNodeKey[0]);
// Write and then remove the data nodes value of the newly created zone, so that the key becomes a tombstone.
metastore.put(dataNodeKeyForZone, toBytes(toDataNodesMap(emptySet()))).get();
metastore.remove(dataNodeKeyForZone).get();
return invocation.callRealMethod();
}).when(metastore).invoke(argThat(iif -> {
If iif1 = MetaStorageWriteHandler.toIf(iif);
byte[][] keysFromIf = iif1.cond().keys();
// Match only invokes whose condition refers to a key starting with the data nodes key prefix.
Optional<byte[]> dataNodeKeyOptional = Arrays.stream(keysFromIf)
.filter(op -> startsWith(op, zoneDataNodesKey().bytes()))
.findFirst();
// Side effect of the matcher: remember the matched key for the answer above.
dataNodeKeyOptional.ifPresent(bytes -> dataNodeKey[0] = bytes);
return dataNodeKeyOptional.isPresent();
}));
createZone(getCatalogManager(node), "zone1", INFINITE_TIMER_VALUE, INFINITE_TIMER_VALUE, null, DEFAULT_STORAGE_PROFILE);
// Assert that after the zone is created its data nodes key is still a tombstone, i.e. it was not initialised with the logical
// topology the way it is for the default zone.
assertThat(metastore.get(new ByteArray(dataNodeKey[0])).thenApply(Entry::tombstone), willBe(true));
}
/**
 * Checks that data nodes of a zone can be obtained from the distribution zone manager after a node restart, both for the
 * current applied revision and for the revision that was applied before the restart.
 *
 * @param defaultZone {@code true} to alter the default zone, {@code false} to create a dedicated zone.
 */
@ParameterizedTest(name = "defaultZone={0}")
@ValueSource(booleans = {true, false})
public void testLocalDataNodesAreRestoredAfterRestart(boolean defaultZone) throws Exception {
    PartialNode node = startPartialNode(0);

    String zoneName = createZoneOrAlterDefaultZone(node, defaultZone, IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);

    node.logicalTopology().putNode(A);
    node.logicalTopology().putNode(B);

    int zoneId = getZoneId(node, zoneName);

    DistributionZoneManager distributionZoneManager = getDistributionZoneManager(node);
    CatalogManager catalogManager = getCatalogManager(node);

    assertDataNodesFromManager(distributionZoneManager, metastore::appliedRevision, catalogManager::latestCatalogVersion, zoneId,
            Set.of(A, B), TIMEOUT_MILLIS);

    node.logicalTopology().removeNodes(Set.of(B));

    assertDataNodesFromManager(distributionZoneManager, metastore::appliedRevision, catalogManager::latestCatalogVersion, zoneId,
            Set.of(A), TIMEOUT_MILLIS);

    long revisionBeforeRestart = metastore.appliedRevision();

    node.stop();

    node = startPartialNode(0);

    distributionZoneManager = getDistributionZoneManager(node);
    // Take the catalog manager of the restarted node: the previous instance belongs to the stopped node and must not be
    // used as the source of the catalog version any more.
    catalogManager = getCatalogManager(node);

    assertDataNodesFromManager(distributionZoneManager, metastore::appliedRevision, catalogManager::latestCatalogVersion, zoneId,
            Set.of(A), TIMEOUT_MILLIS);

    // Data nodes must also be available for the revision that was applied before the restart.
    assertDataNodesFromManager(distributionZoneManager, () -> revisionBeforeRestart, catalogManager::latestCatalogVersion, zoneId,
            Set.of(A), TIMEOUT_MILLIS);
}
/**
 * Checks that a scale up which could not run before the node was stopped is applied after the restart.
 *
 * @param defaultZone {@code true} to alter the default zone, {@code false} to create a dedicated zone.
 */
@ParameterizedTest(name = "defaultZone={0}")
@ValueSource(booleans = {true, false})
public void testScaleUpTimerIsRestoredAfterRestart(boolean defaultZone) throws Exception {
    PartialNode firstStart = startPartialNode(0);

    firstStart.logicalTopology().putNode(A);
    firstStart.logicalTopology().putNode(B);

    assertLogicalTopologyInMetastorage(Set.of(A, B), metastore);

    String zoneName = createZoneOrAlterDefaultZone(firstStart, defaultZone, 1, 1);

    int zoneId = getZoneId(firstStart, zoneName);

    DistributionZoneManager managerBeforeRestart = getDistributionZoneManager(firstStart);
    CatalogManager catalogBeforeRestart = getCatalogManager(firstStart);

    assertDataNodesFromManager(
            managerBeforeRestart,
            metastore::appliedRevision,
            catalogBeforeRestart::latestCatalogVersion,
            zoneId,
            Set.of(A, B),
            TIMEOUT_MILLIS
    );

    // Occupy the scale up executor with a dummy task and wait until that task has actually started.
    CountDownLatch scaleUpBlocker = blockScaleUpTaskExecution(zoneId, managerBeforeRestart);

    assertTrue(waitForCondition(() -> scaleUpBlocker.getCount() == 1, TIMEOUT_MILLIS));

    firstStart.logicalTopology().putNode(C);
    firstStart.logicalTopology().removeNodes(Set.of(B));
    firstStart.logicalTopology().putNode(B);

    assertLogicalTopologyInMetastorage(Set.of(A, B, C), metastore);

    // The topology has changed, but the data nodes are still the old ones.
    assertDataNodesFromManager(
            managerBeforeRestart,
            metastore::appliedRevision,
            catalogBeforeRestart::latestCatalogVersion,
            zoneId,
            Set.of(A, B),
            TIMEOUT_MILLIS
    );

    firstStart.stop();

    scaleUpBlocker.countDown();

    PartialNode restarted = startPartialNode(0);

    DistributionZoneManager managerAfterRestart = getDistributionZoneManager(restarted);
    CatalogManager catalogAfterRestart = getCatalogManager(restarted);

    assertDataNodesFromManager(
            managerAfterRestart,
            metastore::appliedRevision,
            catalogAfterRestart::latestCatalogVersion,
            zoneId,
            Set.of(A, B, C),
            TIMEOUT_MILLIS
    );

    metastore = findComponent(restarted.startedComponents(), MetaStorageManager.class);

    Set<String> expectedNodeNames = Set.of(A.name(), B.name(), C.name());

    assertValueInStorage(
            metastore,
            zoneDataNodesKey(zoneId),
            bytes -> dataNodes(fromBytes(bytes)).stream().map(Node::nodeName).collect(toSet()),
            expectedNodeNames,
            TIMEOUT_MILLIS
    );
}
/**
 * Checks that a scale down which could not run before the node was stopped is applied after the restart.
 *
 * @param defaultZone {@code true} to alter the default zone, {@code false} to create a dedicated zone.
 */
@ParameterizedTest(name = "defaultZone={0}")
@ValueSource(booleans = {true, false})
public void testScaleDownTimerIsRestoredAfterRestart(boolean defaultZone) throws Exception {
    PartialNode firstStart = startPartialNode(0);

    firstStart.logicalTopology().putNode(A);
    firstStart.logicalTopology().putNode(B);

    assertLogicalTopologyInMetastorage(Set.of(A, B), metastore);

    DistributionZoneManager managerBeforeRestart = getDistributionZoneManager(firstStart);
    CatalogManager catalogBeforeRestart = getCatalogManager(firstStart);

    String zoneName = createZoneOrAlterDefaultZone(firstStart, defaultZone, 1, 1);

    int zoneId = getZoneId(firstStart, zoneName);

    // Occupy the scale down executor with a dummy task and wait until that task has actually started.
    CountDownLatch scaleDownBlocker = blockScaleDownTaskExecution(zoneId, managerBeforeRestart);

    assertTrue(waitForCondition(() -> scaleDownBlocker.getCount() == 1, TIMEOUT_MILLIS));

    firstStart.logicalTopology().removeNodes(Set.of(B));
    firstStart.logicalTopology().putNode(C);

    // The topology has changed, but the data nodes are still the old ones.
    assertDataNodesFromManager(
            managerBeforeRestart,
            metastore::appliedRevision,
            catalogBeforeRestart::latestCatalogVersion,
            zoneId,
            Set.of(A, B),
            TIMEOUT_MILLIS
    );

    firstStart.stop();

    scaleDownBlocker.countDown();

    PartialNode restarted = startPartialNode(0);

    CatalogManager catalogAfterRestart = getCatalogManager(restarted);
    DistributionZoneManager managerAfterRestart = getDistributionZoneManager(restarted);

    assertDataNodesFromManager(
            managerAfterRestart,
            metastore::appliedRevision,
            catalogAfterRestart::latestCatalogVersion,
            zoneId,
            Set.of(A, C),
            TIMEOUT_MILLIS
    );

    metastore = findComponent(restarted.startedComponents(), MetaStorageManager.class);

    Set<String> expectedNodeNames = Set.of(A.name(), C.name());

    assertValueInStorage(
            metastore,
            zoneDataNodesKey(zoneId),
            bytes -> dataNodes(fromBytes(bytes)).stream().map(Node::nodeName).collect(toSet()),
            expectedNodeNames,
            TIMEOUT_MILLIS
    );
}
/**
 * Either creates the {@link #ZONE_NAME} zone with the given timers or applies the timers to the default zone.
 *
 * @param node Node to operate on.
 * @param useDefaultZone {@code true} to alter the default zone, {@code false} to create a new zone.
 * @param scaleUp Scale up timer value.
 * @param scaleDown Scale down timer value.
 * @return Name of the zone that was created or altered.
 */
private static String createZoneOrAlterDefaultZone(
        PartialNode node,
        boolean useDefaultZone,
        int scaleUp,
        int scaleDown
) throws Exception {
    if (!useDefaultZone) {
        createZone(getCatalogManager(node), ZONE_NAME, scaleUp, scaleDown, null, DEFAULT_STORAGE_PROFILE);

        return ZONE_NAME;
    }

    CatalogZoneDescriptor defaultZoneDescriptor = getDefaultZone(getCatalogManager(node), node.clock().nowLong());

    String defaultZoneName = defaultZoneDescriptor.name();

    alterZone(node, defaultZoneName, scaleUp, scaleDown, null);

    ZoneState state = getDistributionZoneManager(node).zonesState().get(defaultZoneDescriptor.id());

    // Altering the timers may schedule scale up/down tasks; wait for those that exist to finish before returning.
    if (state.scaleUpTask() != null) {
        boolean scaleUpFinished = waitForCondition(() -> state.scaleUpTask().isDone(), TIMEOUT_MILLIS);

        assertTrue(scaleUpFinished);
    }

    if (state.scaleDownTask() != null) {
        boolean scaleDownFinished = waitForCondition(() -> state.scaleDownTask().isDone(), TIMEOUT_MILLIS);

        assertTrue(scaleDownFinished);
    }

    return defaultZoneName;
}
/**
 * Stubs the given (spied) meta storage manager so that {@code invoke} throws a {@code RuntimeException("Expected")} when the
 * invoke updates a scale up trigger key, a scale down trigger key or the zones recoverable state revision key and the
 * corresponding blocking flag of this test is set.
 *
 * @param metaStorageManager Meta storage manager spy to stub.
 */
private void blockMetaStorageUpdates(MetaStorageManager metaStorageManager) {
    doThrow(new RuntimeException("Expected")).when(metaStorageManager).invoke(argThat(iif -> {
        If condition = MetaStorageWriteHandler.toIf(iif);

        byte[] scaleUpPrefix = zoneScaleUpChangeTriggerKeyPrefix().bytes();
        byte[] scaleDownPrefix = zoneScaleDownChangeTriggerKeyPrefix().bytes();
        byte[] globalStatePrefix = zonesRecoverableStateRevision().bytes();

        var operations = condition.andThen().update().operations();

        boolean touchesScaleUp = operations.stream().anyMatch(op -> startsWith(op.key(), scaleUpPrefix));
        boolean touchesScaleDown = operations.stream().anyMatch(op -> startsWith(op.key(), scaleDownPrefix));
        boolean touchesGlobalState = operations.stream().anyMatch(op -> startsWith(op.key(), globalStatePrefix));

        // An invoke is failed only when it touches a key of a kind whose blocking flag is currently raised.
        if (touchesScaleUp && startScaleUpBlocking) {
            return true;
        }

        if (touchesScaleDown && startScaleDownBlocking) {
            return true;
        }

        return touchesGlobalState && startGlobalStateUpdateBlocking;
    }));
}
/**
 * Looks up a started component of the given class in the node and asserts that it is present.
 *
 * @param node Node whose started components are searched.
 * @param componentClass Class of the component to find.
 * @return Found component.
 */
private static <T extends IgniteComponent> T getStartedComponent(PartialNode node, Class<T> componentClass) {
    var startedComponents = node.startedComponents();

    T found = findComponent(startedComponents, componentClass);

    assertNotNull(found);

    return found;
}
/**
 * Returns the started {@link DistributionZoneManager} of the node.
 *
 * @param node Node to take the component from.
 * @return Distribution zone manager.
 */
private static DistributionZoneManager getDistributionZoneManager(PartialNode node) {
    Class<DistributionZoneManager> managerClass = DistributionZoneManager.class;

    return getStartedComponent(node, managerClass);
}
/**
 * Returns the started {@link CatalogManager} of the node.
 *
 * @param node Node to take the component from.
 * @return Catalog manager.
 */
private static CatalogManager getCatalogManager(PartialNode node) {
    Class<CatalogManager> managerClass = CatalogManager.class;

    return getStartedComponent(node, managerClass);
}
/**
 * Resolves the identifier of the zone with the given name using the node's catalog manager and its current clock value.
 *
 * @param node Node to query.
 * @param zoneName Zone name.
 * @return Zone identifier.
 */
private static int getZoneId(PartialNode node, String zoneName) {
    CatalogManager catalogManager = getCatalogManager(node);
    long timestamp = node.clock().nowLong();

    return DistributionZonesTestUtil.getZoneId(catalogManager, zoneName, timestamp);
}
/**
 * Resolves the identifier of the default zone using the node's catalog manager and its current clock value.
 *
 * @param node Node to query.
 * @return Default zone identifier.
 */
private static int getDefaultZoneId(PartialNode node) {
    CatalogManager catalogManager = getCatalogManager(node);
    long timestamp = node.clock().nowLong();

    CatalogZoneDescriptor defaultZoneDescriptor = DistributionZonesTestUtil.getDefaultZone(catalogManager, timestamp);

    return defaultZoneDescriptor.id();
}
/**
 * Alters the given zone through the node's catalog manager.
 *
 * @param node Node whose catalog manager is used.
 * @param zoneName Name of the zone to alter.
 * @param scaleUp New scale up timer value, may be {@code null}.
 * @param scaleDown New scale down timer value, may be {@code null}.
 * @param filter New filter, may be {@code null}.
 */
private static void alterZone(
        PartialNode node,
        String zoneName,
        @Nullable Integer scaleUp,
        @Nullable Integer scaleDown,
        @Nullable String filter
) {
    CatalogManager catalogManager = getCatalogManager(node);

    DistributionZonesTestUtil.alterZone(catalogManager, zoneName, scaleUp, scaleDown, filter);
}
/**
 * Schedules a dummy scale up task that blocks the execution of other scale up tasks.
 * Note that because the executor of scale up/scale down tasks is single-threaded, this
 * method also blocks the execution of scale down tasks.
 *
 * <p>The returned latch starts at 2. The dummy task counts it down to 1 as soon as it starts running and then waits
 * (for at most {@code 2 * TIMEOUT_MILLIS} milliseconds) until the caller counts it down to 0.
 *
 * @param zoneId Zone identifier.
 * @param distributionZoneManager Distribution zone manager that holds the state of the zone.
 * @return Latch to unblock execution of scale up tasks.
 */
private static CountDownLatch blockScaleUpTaskExecution(int zoneId, DistributionZoneManager distributionZoneManager) {
    CountDownLatch scaleUpLatch = new CountDownLatch(2);

    Runnable dummyScaleUpTask = () -> {
        // Signal that the dummy task is running.
        scaleUpLatch.countDown();

        try {
            scaleUpLatch.await(2 * TIMEOUT_MILLIS, MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt status visible to the executor thread instead of silently dropping it.
            Thread.currentThread().interrupt();

            throw new RuntimeException(e);
        }
    };

    distributionZoneManager.zonesState().get(zoneId)
            .rescheduleScaleUp(IMMEDIATE_TIMER_VALUE, dummyScaleUpTask, zoneId);

    return scaleUpLatch;
}
/**
 * Schedules a dummy scale down task that blocks the execution of other scale down tasks.
 * Note that because the executor of scale up/scale down tasks is single-threaded, this
 * method also blocks the execution of scale up tasks.
 *
 * <p>The returned latch starts at 2. The dummy task counts it down to 1 as soon as it starts running and then waits
 * (for at most {@code TIMEOUT_MILLIS} milliseconds) until the caller counts it down to 0.
 *
 * @param zoneId Zone identifier.
 * @param distributionZoneManager Distribution zone manager that holds the state of the zone.
 * @return Latch to unblock execution of scale down tasks.
 */
private static CountDownLatch blockScaleDownTaskExecution(int zoneId, DistributionZoneManager distributionZoneManager) {
    CountDownLatch scaleDownLatch = new CountDownLatch(2);

    Runnable dummyScaleDownTask = () -> {
        // Signal that the dummy task is running.
        scaleDownLatch.countDown();

        try {
            // NOTE(review): the scale up counterpart waits for 2 * TIMEOUT_MILLIS; confirm the shorter wait here is intended.
            scaleDownLatch.await(TIMEOUT_MILLIS, MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt status visible to the executor thread instead of silently dropping it.
            Thread.currentThread().interrupt();

            throw new RuntimeException(e);
        }
    };

    distributionZoneManager.zonesState().get(zoneId)
            .rescheduleScaleDown(IMMEDIATE_TIMER_VALUE, dummyScaleDownTask, zoneId);

    return scaleDownLatch;
}
}