blob: 120fa0187f14303e6a5120efe7c1d4a181abc06a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.distributionzones;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseStorageProfiles;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.DropZoneCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
* Utils to manage distribution zones inside tests.
*/
public class DistributionZonesTestUtil {
/**
 * Creates a distribution zone in the catalog using the given storage profile.
 *
 * <p>When {@code storageProfile} is {@code null}, {@link CatalogService#DEFAULT_STORAGE_PROFILE} is used instead, which is the
 * value the other {@code createZone} helpers of this class pass when the caller does not supply a profile. Without this fallback
 * a {@code null} argument would always fail the non-null assertion performed before the command is built.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param partitions Zone number of partitions.
 * @param replicas Zone number of replicas.
 * @param storageProfile Storage profile, {@code null} to use the default storage profile.
 */
public static void createZoneWithStorageProfile(
        CatalogManager catalogManager,
        String zoneName,
        int partitions,
        int replicas,
        @Nullable String storageProfile
) {
    String effectiveProfile = storageProfile == null ? DEFAULT_STORAGE_PROFILE : storageProfile;

    createZone(catalogManager, zoneName, partitions, replicas, null, null, null, effectiveProfile);
}
/**
 * Creates a distribution zone with an explicit number of partitions and replicas.
 *
 * <p>Scale up/down timeouts and the nodes filter are left unset; the storage profile is {@code DEFAULT_STORAGE_PROFILE}.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param partitions Zone number of partitions.
 * @param replicas Zone number of replicas.
 */
public static void createZone(CatalogManager catalogManager, String zoneName, int partitions, int replicas) {
    createZone(
            catalogManager,
            zoneName,
            partitions,
            replicas,
            // Scale up timeout, scale down timeout and filter are not set.
            null,
            null,
            null,
            DEFAULT_STORAGE_PROFILE
    );
}
/**
 * Creates a distribution zone with the given auto adjust timeouts and nodes filter.
 *
 * <p>Partitions and replicas are left unset; the storage profile is {@code DEFAULT_STORAGE_PROFILE}.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param dataNodesAutoAdjustScaleUp Timeout in seconds between node added topology event itself and data nodes switch,
 *         {@code null} if not set.
 * @param dataNodesAutoAdjustScaleDown Timeout in seconds between node left topology event itself and data nodes switch,
 *         {@code null} if not set.
 * @param filter Nodes filter, {@code null} if not set.
 */
public static void createZone(
        CatalogManager catalogManager,
        String zoneName,
        @Nullable Integer dataNodesAutoAdjustScaleUp,
        @Nullable Integer dataNodesAutoAdjustScaleDown,
        @Nullable String filter
) {
    // The two nulls stand for "partitions not set" and "replicas not set".
    createZone(catalogManager, zoneName, null, null, dataNodesAutoAdjustScaleUp, dataNodesAutoAdjustScaleDown, filter,
            DEFAULT_STORAGE_PROFILE);
}
/**
 * Creates a distribution zone with the given auto adjust timeouts, nodes filter and storage profiles.
 *
 * <p>Partitions and replicas are left unset.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param dataNodesAutoAdjustScaleUp Timeout in seconds between node added topology event itself and data nodes switch,
 *         {@code null} if not set.
 * @param dataNodesAutoAdjustScaleDown Timeout in seconds between node left topology event itself and data nodes switch,
 *         {@code null} if not set.
 * @param filter Nodes filter, {@code null} if not set.
 * @param storageProfiles Storage profiles.
 */
public static void createZone(
        CatalogManager catalogManager,
        String zoneName,
        @Nullable Integer dataNodesAutoAdjustScaleUp,
        @Nullable Integer dataNodesAutoAdjustScaleDown,
        @Nullable String filter,
        String storageProfiles
) {
    // The two nulls stand for "partitions not set" and "replicas not set".
    createZone(catalogManager, zoneName, null, null, dataNodesAutoAdjustScaleUp, dataNodesAutoAdjustScaleDown, filter,
            storageProfiles);
}
/**
 * Builds a create zone command from the supplied parameters and asserts that the catalog manager executes it successfully.
 *
 * <p>Every {@code null} optional parameter is simply not passed to the command builder.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param partitions Zone number of partitions, {@code null} if not set.
 * @param replicas Zone number of replicas, {@code null} if not set.
 * @param dataNodesAutoAdjustScaleUp Scale up timeout, {@code null} if not set.
 * @param dataNodesAutoAdjustScaleDown Scale down timeout, {@code null} if not set.
 * @param filter Nodes filter, {@code null} if not set.
 * @param storageProfiles Storage profiles, asserted to be non-null.
 */
private static void createZone(
        CatalogManager catalogManager,
        String zoneName,
        @Nullable Integer partitions,
        @Nullable Integer replicas,
        @Nullable Integer dataNodesAutoAdjustScaleUp,
        @Nullable Integer dataNodesAutoAdjustScaleDown,
        @Nullable String filter,
        String storageProfiles
) {
    CreateZoneCommandBuilder zoneBuilder = CreateZoneCommand.builder().zoneName(zoneName);

    // Optional settings: apply only those the caller provided.
    if (partitions != null) {
        zoneBuilder.partitions(partitions);
    }

    if (replicas != null) {
        zoneBuilder.replicas(replicas);
    }

    if (dataNodesAutoAdjustScaleUp != null) {
        zoneBuilder.dataNodesAutoAdjustScaleUp(dataNodesAutoAdjustScaleUp);
    }

    if (dataNodesAutoAdjustScaleDown != null) {
        zoneBuilder.dataNodesAutoAdjustScaleDown(dataNodesAutoAdjustScaleDown);
    }

    if (filter != null) {
        zoneBuilder.filter(filter);
    }

    // Storage profiles are mandatory.
    assertNotNull(storageProfiles);

    zoneBuilder.storageProfilesParams(parseStorageProfiles(storageProfiles));

    CatalogCommand createCommand = zoneBuilder.build();

    assertThat(catalogManager.execute(createCommand), willCompleteSuccessfully());
}
/**
 * Asserts that the data nodes stored under {@link DistributionZonesUtil#zoneDataNodesKey(int)} match the given logical nodes.
 *
 * <p>Each logical node is converted to a {@link Node} built from its name and id before the comparison. The value is awaited for
 * up to 2000 milliseconds.
 *
 * @param zoneId Zone id.
 * @param clusterNodes Expected data nodes, {@code null} if no value is expected in the storage.
 * @param keyValueStorage Key-value storage.
 * @throws InterruptedException If thread was interrupted.
 */
public static void assertDataNodesFromLogicalNodesInStorage(
        int zoneId,
        @Nullable Set<LogicalNode> clusterNodes,
        KeyValueStorage keyValueStorage
) throws InterruptedException {
    Set<Node> expectedNodes = null;

    if (clusterNodes != null) {
        expectedNodes = clusterNodes.stream()
                .map(logicalNode -> new Node(logicalNode.name(), logicalNode.id()))
                .collect(toSet());
    }

    byte[] dataNodesKey = zoneDataNodesKey(zoneId).bytes();

    assertValueInStorage(
            keyValueStorage,
            dataNodesKey,
            bytes -> DistributionZonesUtil.dataNodes(fromBytes(bytes)),
            expectedNodes,
            2000
    );
}
/**
 * Asserts that the data nodes stored under {@link DistributionZonesUtil#zoneDataNodesKey(int)} match the given nodes.
 *
 * <p>The value is awaited for up to 2000 milliseconds.
 *
 * @param zoneId Zone id.
 * @param nodes Expected data nodes, {@code null} if no value is expected in the storage.
 * @param keyValueStorage Key-value storage.
 * @throws InterruptedException If thread was interrupted.
 */
public static void assertDataNodesInStorage(
        int zoneId,
        @Nullable Set<Node> nodes,
        KeyValueStorage keyValueStorage
) throws InterruptedException {
    byte[] dataNodesKey = zoneDataNodesKey(zoneId).bytes();

    assertValueInStorage(
            keyValueStorage,
            dataNodesKey,
            bytes -> DistributionZonesUtil.dataNodes(fromBytes(bytes)),
            nodes,
            2000
    );
}
/**
 * Asserts the revision stored under {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}.
 *
 * <p>The value is awaited for up to 2000 milliseconds.
 *
 * @param revision Expected revision, {@code null} if no value is expected in the storage.
 * @param zoneId Zone id.
 * @param keyValueStorage Key-value storage.
 * @throws InterruptedException If thread was interrupted.
 */
public static void assertZoneScaleUpChangeTriggerKey(
        @Nullable Long revision,
        int zoneId,
        KeyValueStorage keyValueStorage
) throws InterruptedException {
    byte[] triggerKey = zoneScaleUpChangeTriggerKey(zoneId).bytes();

    Function<byte[], Long> toRevision = ByteUtils::bytesToLong;

    assertValueInStorage(keyValueStorage, triggerKey, toRevision, revision, 2000);
}
/**
 * Asserts the revision stored under {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)}.
 *
 * <p>The value is awaited for up to 2000 milliseconds.
 *
 * @param revision Expected revision, {@code null} if no value is expected in the storage.
 * @param zoneId Zone id.
 * @param keyValueStorage Key-value storage.
 * @throws InterruptedException If thread was interrupted.
 */
public static void assertZoneScaleDownChangeTriggerKey(
        @Nullable Long revision,
        int zoneId,
        KeyValueStorage keyValueStorage
) throws InterruptedException {
    byte[] triggerKey = zoneScaleDownChangeTriggerKey(zoneId).bytes();

    Function<byte[], Long> toRevision = ByteUtils::bytesToLong;

    assertValueInStorage(keyValueStorage, triggerKey, toRevision, revision, 2000);
}
/**
 * Asserts the logical topology stored under {@link DistributionZonesUtil#zonesLogicalTopologyKey()}.
 *
 * <p>Each expected logical node is converted to a {@link NodeWithAttributes} built from its name, id and user attributes.
 * The value is awaited for up to 1000 milliseconds.
 *
 * @param clusterNodes Expected cluster nodes, {@code null} if no value is expected in the storage.
 * @param keyValueStorage Key-value storage.
 * @throws InterruptedException If thread was interrupted.
 */
public static void assertLogicalTopology(
        @Nullable Set<LogicalNode> clusterNodes,
        KeyValueStorage keyValueStorage
) throws InterruptedException {
    Set<NodeWithAttributes> expectedTopology = null;

    if (clusterNodes != null) {
        expectedTopology = clusterNodes.stream()
                .map(logicalNode -> new NodeWithAttributes(logicalNode.name(), logicalNode.id(), logicalNode.userAttributes()))
                .collect(toSet());
    }

    byte[] topologyKey = zonesLogicalTopologyKey().bytes();

    Function<byte[], Set<NodeWithAttributes>> deserializer = ByteUtils::fromBytes;

    assertValueInStorage(keyValueStorage, topologyKey, deserializer, expectedTopology, 1000);
}
/**
 * Asserts the logical topology stored under {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in the Meta Storage.
 *
 * <p>Each expected logical node is converted to a {@link NodeWithAttributes} built from its name, id, user attributes and
 * storage profiles. The value is awaited for up to 1000 milliseconds.
 *
 * @param clusterNodes Expected cluster nodes, {@code null} if no value is expected in the Meta Storage.
 * @param metaStorageManager Meta Storage manager.
 * @throws InterruptedException If thread was interrupted.
 */
public static void assertLogicalTopologyInMetastorage(
        @Nullable Set<LogicalNode> clusterNodes,
        MetaStorageManager metaStorageManager
) throws InterruptedException {
    Set<NodeWithAttributes> expectedTopology = null;

    if (clusterNodes != null) {
        expectedTopology = clusterNodes.stream()
                .map(logicalNode -> new NodeWithAttributes(
                        logicalNode.name(),
                        logicalNode.id(),
                        logicalNode.userAttributes(),
                        logicalNode.storageProfiles()
                ))
                .collect(toSet());
    }

    Function<byte[], Set<NodeWithAttributes>> deserializer = ByteUtils::fromBytes;

    assertValueInStorage(metaStorageManager, zonesLogicalTopologyKey(), deserializer, expectedTopology, 1000);
}
/**
 * Asserts the topology version stored under {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()}.
 *
 * <p>The value is awaited for up to 1000 milliseconds.
 *
 * @param topVer Expected topology version, {@code null} if no value is expected in the storage.
 * @param keyValueStorage Key-value storage.
 * @throws InterruptedException If thread was interrupted.
 */
public static void assertLogicalTopologyVersion(@Nullable Long topVer, KeyValueStorage keyValueStorage) throws InterruptedException {
    byte[] versionKey = zonesLogicalTopologyVersionKey().bytes();

    Function<byte[], Long> toVersion = ByteUtils::bytesToLong;

    assertValueInStorage(keyValueStorage, versionKey, toVersion, topVer, 1000);
}
/**
 * Waits until the value stored under the given key, after transformation, equals the expected value, and fails otherwise.
 *
 * <p>An absent value (the entry's value is {@code null}) is compared as {@code null} without invoking the transformer.
 *
 * @param keyValueStorage Key-value storage.
 * @param key Key of value to check.
 * @param valueTransformer Function that is applied to value from the storage.
 * @param expectedValue Expected value.
 * @param timeoutMillis Timeout in milliseconds.
 * @param <T> A type of value from storage.
 * @throws InterruptedException If interrupted.
 */
public static <T> void assertValueInStorage(
        KeyValueStorage keyValueStorage,
        byte[] key,
        Function<byte[], T> valueTransformer,
        @Nullable T expectedValue,
        long timeoutMillis
) throws InterruptedException {
    // Reads the current raw value and transforms it, mapping an absent value to null.
    Supplier<T> currentValue = () -> {
        byte[] raw = keyValueStorage.get(key).value();

        return raw == null ? null : valueTransformer.apply(raw);
    };

    if (waitForCondition(() -> Objects.equals(currentValue.get(), expectedValue), timeoutMillis)) {
        return;
    }

    // The condition was not met in time: read once more through a matcher to get a descriptive failure message.
    assertThat(currentValue.get(), is(expectedValue));
}
/**
 * Waits until the value stored under the given key in the meta storage, after transformation, equals the expected value,
 * and fails otherwise.
 *
 * <p>An absent value (the entry's value is {@code null}) is compared as {@code null} without invoking the transformer.
 * A failed read fails the assertion with the original exception attached as the cause.
 *
 * @param metaStorageManager Meta Storage manager.
 * @param key Key of value to check.
 * @param valueTransformer Function that is applied to value from the meta storage.
 * @param expectedValue Expected value.
 * @param timeoutMillis Timeout in milliseconds.
 * @param <T> A type of value from the meta storage.
 * @throws InterruptedException If interrupted.
 */
public static <T> void assertValueInStorage(
        MetaStorageManager metaStorageManager,
        ByteArray key,
        Function<byte[], T> valueTransformer,
        @Nullable T expectedValue,
        long timeoutMillis
) throws InterruptedException {
    boolean success = waitForCondition(() -> {
        try {
            return Objects.equals(readMetaStorageValue(metaStorageManager, key, valueTransformer), expectedValue);
        } catch (InterruptedException e) {
            // The condition lambda cannot throw checked exceptions: keep the interrupt visible to the caller and fail with the cause.
            Thread.currentThread().interrupt();

            return fail("Interrupted while reading key from the meta storage: " + key, e);
        }
    }, timeoutMillis);

    // We do a second check simply to print a nice error message in case the condition above is not achieved.
    if (!success) {
        assertThat(readMetaStorageValue(metaStorageManager, key, valueTransformer), is(expectedValue));
    }
}

/** Reads the value under the key from the meta storage and transforms it; an absent value is returned as {@code null}. */
private static <T> @Nullable T readMetaStorageValue(
        MetaStorageManager metaStorageManager,
        ByteArray key,
        Function<byte[], T> valueTransformer
) throws InterruptedException {
    try {
        byte[] storageValue = metaStorageManager.get(key).get().value();

        return storageValue == null ? null : valueTransformer.apply(storageValue);
    } catch (ExecutionException e) {
        // Preserve the cause so that the test report shows why the read failed.
        return fail("Failed to read key from the meta storage: " + key, e);
    }
}
/**
 * Asserts data nodes from the distribution zone manager.
 *
 * <p>The expected logical nodes are compared by name. While waiting, the manager is queried repeatedly; a failed or timed out
 * query is treated as "no data nodes" and retried until {@code timeoutMillis} elapses.
 *
 * @param distributionZoneManager Distribution zone manager.
 * @param causalityToken Supplier of the causality token passed to the manager on every query.
 * @param catalogVersion Supplier of the catalog version passed to the manager on every query.
 * @param zoneId Zone id.
 * @param expectedValue Expected value.
 * @param timeoutMillis Timeout in milliseconds.
 * @throws InterruptedException If interrupted.
 * @throws ExecutionException If the final query, made to build the failure message, completes exceptionally.
 * @throws TimeoutException If the final query, made to build the failure message, does not complete within 5 seconds.
 */
public static void assertDataNodesFromManager(
        DistributionZoneManager distributionZoneManager,
        Supplier<Long> causalityToken,
        Supplier<Integer> catalogVersion,
        int zoneId,
        @Nullable Set<LogicalNode> expectedValue,
        long timeoutMillis
) throws InterruptedException, ExecutionException, TimeoutException {
    Set<String> expectedValueNames =
            expectedValue == null ? null : expectedValue.stream().map(ClusterNode::name).collect(toSet());

    boolean success = waitForCondition(() -> {
        Set<String> dataNodes = null;

        try {
            dataNodes = distributionZoneManager.dataNodes(causalityToken.get(), catalogVersion.get(), zoneId)
                    .get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Do not lose the interruption: restore the flag so that the caller can observe it.
            Thread.currentThread().interrupt();
        } catch (Exception ignored) {
            // Best effort: the data nodes may not be available yet, the condition is simply evaluated again later.
        }

        return Objects.equals(dataNodes, expectedValueNames);
    }, timeoutMillis);

    // We do a second check simply to print a nice error message in case the condition above is not achieved.
    if (!success) {
        Set<String> dataNodes = distributionZoneManager.dataNodes(causalityToken.get(), catalogVersion.get(), zoneId)
                .get(5, TimeUnit.SECONDS);

        assertThat(dataNodes, is(expectedValueNames));
    }
}
/**
 * Alters auto adjust timeouts and/or the nodes filter of a distribution zone in the catalog.
 *
 * <p>The number of replicas is left unchanged.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param dataNodesAutoAdjustScaleUp Timeout in seconds between node added topology event itself and data nodes switch,
 *         {@code null} if not set.
 * @param dataNodesAutoAdjustScaleDown Timeout in seconds between node left topology event itself and data nodes switch,
 *         {@code null} if not set.
 * @param filter Nodes filter, {@code null} if not set.
 */
public static void alterZone(
        CatalogManager catalogManager,
        String zoneName,
        @Nullable Integer dataNodesAutoAdjustScaleUp,
        @Nullable Integer dataNodesAutoAdjustScaleDown,
        @Nullable String filter
) {
    alterZone(
            catalogManager,
            zoneName,
            // Replicas are not altered.
            null,
            dataNodesAutoAdjustScaleUp,
            dataNodesAutoAdjustScaleDown,
            filter
    );
}
/**
 * Alters the number of replicas of a distribution zone in the catalog.
 *
 * <p>Auto adjust timeouts and the nodes filter are left unchanged.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param replicas New number of zone replicas.
 */
public static void alterZone(CatalogManager catalogManager, String zoneName, int replicas) {
    alterZone(
            catalogManager,
            zoneName,
            replicas,
            // Scale up timeout, scale down timeout and filter are not altered.
            null,
            null,
            null
    );
}
/**
 * Builds an alter zone command from the supplied parameters and asserts that the catalog manager executes it successfully.
 *
 * <p>Every {@code null} optional parameter is simply not passed to the command builder.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 * @param replicas New number of zone replicas, {@code null} if not set.
 * @param dataNodesAutoAdjustScaleUp Scale up timeout, {@code null} if not set.
 * @param dataNodesAutoAdjustScaleDown Scale down timeout, {@code null} if not set.
 * @param filter Nodes filter, {@code null} if not set.
 */
private static void alterZone(
        CatalogManager catalogManager,
        String zoneName,
        @Nullable Integer replicas,
        @Nullable Integer dataNodesAutoAdjustScaleUp,
        @Nullable Integer dataNodesAutoAdjustScaleDown,
        @Nullable String filter
) {
    AlterZoneCommandBuilder zoneBuilder = AlterZoneCommand.builder().zoneName(zoneName);

    // Optional settings: apply only those the caller provided.
    if (replicas != null) {
        zoneBuilder.replicas(replicas);
    }

    if (dataNodesAutoAdjustScaleUp != null) {
        zoneBuilder.dataNodesAutoAdjustScaleUp(dataNodesAutoAdjustScaleUp);
    }

    if (dataNodesAutoAdjustScaleDown != null) {
        zoneBuilder.dataNodesAutoAdjustScaleDown(dataNodesAutoAdjustScaleDown);
    }

    if (filter != null) {
        zoneBuilder.filter(filter);
    }

    CatalogCommand alterCommand = zoneBuilder.build();

    assertThat(catalogManager.execute(alterCommand), willCompleteSuccessfully());
}
/**
 * Drops a distribution zone from the catalog and asserts that the drop command completes successfully.
 *
 * @param catalogManager Catalog manager.
 * @param zoneName Zone name.
 */
public static void dropZone(CatalogManager catalogManager, String zoneName) {
    assertThat(
            catalogManager.execute(DropZoneCommand.builder().zoneName(zoneName).build()),
            willCompleteSuccessfully()
    );
}
/**
 * Looks up a distribution zone in the catalog by its ID.
 *
 * @param catalogService Catalog service.
 * @param zoneId Zone ID.
 * @param timestamp Timestamp.
 * @return Zone descriptor, {@code null} if zone is absent.
 */
public static @Nullable CatalogZoneDescriptor getZoneById(CatalogService catalogService, int zoneId, long timestamp) {
    CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneId, timestamp);

    return zoneDescriptor;
}
/**
 * Returns distribution zone ID from the catalog.
 *
 * @param catalogService Catalog service.
 * @param zoneName Distribution zone name.
 * @param timestamp Timestamp.
 * @return Zone ID, {@code null} if zone is absent.
 */
public static @Nullable Integer getZoneId(CatalogService catalogService, String zoneName, long timestamp) {
    CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneName, timestamp);

    if (zoneDescriptor == null) {
        return null;
    }

    return zoneDescriptor.id();
}
/**
 * Returns the default distribution zone of the catalog version that is active at the given timestamp.
 *
 * @param catalogService Catalog service.
 * @param timestamp Timestamp.
 * @return Default zone descriptor.
 * @throws NullPointerException If there is no catalog for the active version or the catalog has no default zone.
 */
public static CatalogZoneDescriptor getDefaultZone(CatalogService catalogService, long timestamp) {
    int activeVersion = catalogService.activeCatalogVersion(timestamp);

    Catalog activeCatalog = Objects.requireNonNull(catalogService.catalog(activeVersion));

    return Objects.requireNonNull(activeCatalog.defaultZone());
}
/**
 * Returns distribution zone ID from the catalog, failing the assertion when the zone does not exist.
 *
 * @param catalogService Catalog service.
 * @param zoneName Distribution zone name.
 * @param timestamp Timestamp.
 * @return Zone ID.
 * @throws AssertionError If zone is absent.
 */
public static int getZoneIdStrict(CatalogService catalogService, String zoneName, long timestamp) {
    Integer resolvedId = getZoneId(catalogService, zoneName, timestamp);

    String failureContext = "zoneName=" + zoneName + ", timestamp=" + timestamp;

    assertNotNull(resolvedId, failureContext);

    return resolvedId;
}
}