blob: 6fdec030e47a2b34f4f0ff20f8c530576580a0da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.distributionzones.rebalance;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PLANNED_KEY_REMOVED_EMPTY_PENDING;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PLANNED_KEY_REMOVED_EQUALS_PENDING;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PLANNED_KEY_UPDATED;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
/**
* Util class for methods needed for the rebalance process.
*/
public class RebalanceUtil {
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(RebalanceUtil.class);
/**
* Status values for methods like
* {@link #updatePendingAssignmentsKeys(CatalogTableDescriptor, TablePartitionId, Collection, int, long, MetaStorageManager, int, Set)}
* or {@link #manualPartitionUpdate(TablePartitionId, Collection, Set, int, long, MetaStorageManager, Set)}.
*/
public enum UpdateStatus {
/**
* Return code of metastore multi-invoke which identifies,
* that pending key was updated to new value (i.e. there is no active rebalance at the moment of call).
*/
PENDING_KEY_UPDATED,
/**
* Return code of metastore multi-invoke which identifies,
* that planned key was updated to new value (i.e. there is an active rebalance at the moment of call).
*/
PLANNED_KEY_UPDATED,
/**
* Return code of metastore multi-invoke which identifies,
* that planned key was removed, because current rebalance is already have the same target.
*/
PLANNED_KEY_REMOVED_EQUALS_PENDING,
/**
* Return code of metastore multi-invoke which identifies,
* that planned key was removed, because current assignment is empty.
*/
PLANNED_KEY_REMOVED_EMPTY_PENDING,
/**
* Return code of metastore multi-invoke which identifies,
* that assignments do not need to be updated.
*/
ASSIGNMENT_NOT_UPDATED,
/**
* Return code of metastore multi-invoke which identifies,
* that this trigger event was already processed by another node and must be skipped.
*/
OUTDATED_UPDATE_RECEIVED;
private static final UpdateStatus[] VALUES = values();
public static UpdateStatus valueOf(int ordinal) {
return VALUES[ordinal];
}
}
/** Rebalance scheduler pool size. */
public static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Runtime.getRuntime().availableProcessors() * 3, 20);
/**
* Update keys that related to rebalance algorithm in Meta Storage. Keys are specific for partition.
*
* @param tableDescriptor Table descriptor.
* @param partId Unique identifier of a partition.
* @param dataNodes Data nodes.
* @param replicas Number of replicas for a table.
* @param revision Revision of Meta Storage that is specific for the assignment update.
* @param metaStorageMgr Meta Storage manager.
* @param partNum Partition id.
* @param tableCfgPartAssignments Table configuration assignments.
* @return Future representing result of updating keys in {@code metaStorageMgr}
*/
public static CompletableFuture<Void> updatePendingAssignmentsKeys(
CatalogTableDescriptor tableDescriptor,
TablePartitionId partId,
Collection<String> dataNodes,
int replicas,
long revision,
MetaStorageManager metaStorageMgr,
int partNum,
Set<Assignment> tableCfgPartAssignments
) {
ByteArray partChangeTriggerKey = pendingChangeTriggerKey(partId);
ByteArray partAssignmentsPendingKey = pendingPartAssignmentsKey(partId);
ByteArray partAssignmentsPlannedKey = plannedPartAssignmentsKey(partId);
ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
Set<Assignment> partAssignments = AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
boolean isNewAssignments = !tableCfgPartAssignments.equals(partAssignments);
byte[] partAssignmentsBytes = Assignments.toBytes(partAssignments);
// if empty(partition.change.trigger.revision) || partition.change.trigger.revision < event.revision:
// if empty(partition.assignments.pending)
// && ((isNewAssignments && empty(partition.assignments.stable))
// || (partition.assignments.stable != calcPartAssignments() && !empty(partition.assignments.stable))):
// partition.assignments.pending = calcPartAssignments()
// partition.change.trigger.revision = event.revision
// else:
// if partition.assignments.pending != calcPartAssignments && !empty(partition.assignments.pending)
// partition.assignments.planned = calcPartAssignments()
// partition.change.trigger.revision = event.revision
// else if partition.assignments.pending == calcPartAssignments
// remove(partition.assignments.planned)
// message after the metastorage invoke:
// "Remove planned key because current pending key has the same value."
// else if empty(partition.assignments.pending)
// remove(partition.assignments.planned)
// message after the metastorage invoke:
// "Remove planned key because pending is empty and calculated assignments are equal to current assignments."
// else:
// skip
Condition newAssignmentsCondition = exists(partAssignmentsStableKey).and(value(partAssignmentsStableKey).ne(partAssignmentsBytes));
if (isNewAssignments) {
newAssignmentsCondition = notExists(partAssignmentsStableKey).or(newAssignmentsCondition);
}
Iif iif = iif(or(notExists(partChangeTriggerKey), value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))),
iif(and(notExists(partAssignmentsPendingKey), newAssignmentsCondition),
ops(
put(partAssignmentsPendingKey, partAssignmentsBytes),
put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
).yield(PENDING_KEY_UPDATED.ordinal()),
iif(and(value(partAssignmentsPendingKey).ne(partAssignmentsBytes), exists(partAssignmentsPendingKey)),
ops(
put(partAssignmentsPlannedKey, partAssignmentsBytes),
put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
).yield(PLANNED_KEY_UPDATED.ordinal()),
iif(value(partAssignmentsPendingKey).eq(partAssignmentsBytes),
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EQUALS_PENDING.ordinal()),
iif(notExists(partAssignmentsPendingKey),
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EMPTY_PENDING.ordinal()),
ops().yield(ASSIGNMENT_NOT_UPDATED.ordinal()))
))),
ops().yield(OUTDATED_UPDATE_RECEIVED.ordinal()));
return metaStorageMgr.invoke(iif).thenAccept(sr -> {
switch (UpdateStatus.valueOf(sr.getAsInt())) {
case PENDING_KEY_UPDATED:
LOG.info(
"Update metastore pending partitions key [key={}, partition={}, table={}/{}, newVal={}]",
partAssignmentsPendingKey.toString(), partNum, tableDescriptor.id(), tableDescriptor.name(),
partAssignments);
break;
case PLANNED_KEY_UPDATED:
LOG.info(
"Update metastore planned partitions key [key={}, partition={}, table={}/{}, newVal={}]",
partAssignmentsPlannedKey, partNum, tableDescriptor.id(), tableDescriptor.name(),
partAssignments
);
break;
case PLANNED_KEY_REMOVED_EQUALS_PENDING:
LOG.info(
"Remove planned key because current pending key has the same value [key={}, partition={}, table={}/{}, val={}]",
partAssignmentsPlannedKey.toString(), partNum, tableDescriptor.id(), tableDescriptor.name(),
partAssignments
);
break;
case PLANNED_KEY_REMOVED_EMPTY_PENDING:
LOG.info(
"Remove planned key because pending is empty and calculated assignments are equal to current assignments "
+ "[key={}, partition={}, table={}/{}, val={}]",
partAssignmentsPlannedKey.toString(), partNum, tableDescriptor.id(), tableDescriptor.name(),
partAssignments
);
break;
case ASSIGNMENT_NOT_UPDATED:
LOG.debug(
"Assignments are not updated [key={}, partition={}, table={}/{}, val={}]",
partAssignmentsPlannedKey.toString(), partNum, tableDescriptor.id(), tableDescriptor.name(),
partAssignments
);
break;
case OUTDATED_UPDATE_RECEIVED:
LOG.debug(
"Received outdated rebalance trigger event [revision={}, partition={}, table={}/{}]",
revision, partNum, tableDescriptor.id(), tableDescriptor.name());
break;
default:
throw new IllegalStateException("Unknown return code for rebalance metastore multi-invoke");
}
});
}
/**
* Triggers rebalance on all partitions of the provided table: that is, reads table assignments from
* the MetaStorage, computes new ones based on the current properties of the table, its zone and the
* provided data nodes, and, if the calculated assignments are different from the ones loaded from the
* MetaStorages, writes them as pending assignments.
*
* @param tableDescriptor Table descriptor.
* @param zoneDescriptor Zone descriptor.
* @param dataNodes Data nodes to use.
* @param storageRevision MetaStorage revision corresponding to this request.
* @param metaStorageManager MetaStorage manager used to read/write assignments.
* @return Array of futures, one per partition of the table; the futures complete when the described
* rebalance triggering completes.
*/
public static CompletableFuture<?>[] triggerAllTablePartitionsRebalance(
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor,
Set<String> dataNodes,
long storageRevision,
MetaStorageManager metaStorageManager
) {
CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = tableAssignments(
metaStorageManager,
tableDescriptor.id(),
Set.of(),
zoneDescriptor.partitions()
);
CompletableFuture<?>[] futures = new CompletableFuture[zoneDescriptor.partitions()];
for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
TablePartitionId replicaGrpId = new TablePartitionId(tableDescriptor.id(), partId);
int finalPartId = partId;
futures[partId] = tableAssignmentsFut.thenCompose(tableAssignments ->
// TODO https://issues.apache.org/jira/browse/IGNITE-19763 We should distinguish empty stable assignments on
// TODO node recovery in case of interrupted table creation, and moving from empty assignments to non-empty.
tableAssignments.isEmpty() ? nullCompletedFuture() : updatePendingAssignmentsKeys(
tableDescriptor,
replicaGrpId,
dataNodes,
zoneDescriptor.replicas(),
storageRevision,
metaStorageManager,
finalPartId,
tableAssignments.get(finalPartId).nodes()
));
}
return futures;
}
/**
* Sets force assignments for the zone/table if it's required. The condition for force reassignment is the absence of stable
* assignments' majority within the set of currently alive nodes. In this case we calculate new assignments that include all alive
* stable nodes, and try to save ot with a {@link Assignments#force()} flag enabled.
*
* @param tableDescriptor Table descriptor.
* @param zoneDescriptor Zone descriptor.
* @param partitionIds Partitions IDs to force assignments for. If empty, reassigns all zone's partitions.
* @param dataNodes Current DZ data nodes.
* @param aliveNodesConsistentIds Set of alive nodes according to logical topology.
* @param revision Meta-storage revision to be associated with reassignment.
* @param metaStorageManager Meta-storage manager.
* @return A future that will be completed when reassignments data is written into a meta-storage, if that's required.
*/
public static CompletableFuture<?>[] forceAssignmentsUpdate(
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor,
Set<Integer> partitionIds,
Set<String> dataNodes,
Set<String> aliveNodesConsistentIds,
long revision,
MetaStorageManager metaStorageManager
) {
CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = tableAssignments(
metaStorageManager,
tableDescriptor.id(),
partitionIds,
zoneDescriptor.partitions()
);
Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, aliveNodesConsistentIds);
int[] ids = partitionIds.isEmpty()
? IntStream.range(0, zoneDescriptor.partitions()).toArray()
: partitionIds.stream().mapToInt(Integer::intValue).toArray();
CompletableFuture<?>[] futures = new CompletableFuture[ids.length];
for (int i = 0; i < ids.length; i++) {
TablePartitionId replicaGrpId = new TablePartitionId(tableDescriptor.id(), ids[i]);
futures[i] = tableAssignmentsFut.thenCompose(tableAssignments ->
tableAssignments.isEmpty() ? nullCompletedFuture() : manualPartitionUpdate(
replicaGrpId,
aliveDataNodes,
aliveNodesConsistentIds,
zoneDescriptor.replicas(),
revision,
metaStorageManager,
tableAssignments.get(replicaGrpId.partitionId()).nodes()
)).thenAccept(res -> {
LOG.info("Partition {} returned {} status on reset attempt", replicaGrpId, UpdateStatus.valueOf(res));
}
);
}
return futures;
}
private static CompletableFuture<Integer> manualPartitionUpdate(
TablePartitionId partId,
Collection<String> aliveDataNodes,
Set<String> aliveNodesConsistentIds,
int replicas,
long revision,
MetaStorageManager metaStorageMgr,
Set<Assignment> currentAssignments
) {
// TODO https://issues.apache.org/jira/browse/IGNITE-21303
// This is a naive approach that doesn't exclude nodes in error state, if they exist.
Set<Assignment> partAssignments = new HashSet<>();
for (Assignment currentAssignment : currentAssignments) {
if (aliveNodesConsistentIds.contains(currentAssignment.consistentId())) {
partAssignments.add(currentAssignment);
}
}
if (partAssignments.size() >= (replicas / 2 + 1)) {
return CompletableFuture.completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
}
Set<Assignment> calcAssignments = AffinityUtils.calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(), replicas);
for (Assignment calcAssignment : calcAssignments) {
if (partAssignments.size() == replicas) {
break;
}
partAssignments.add(calcAssignment);
}
if (currentAssignments.equals(calcAssignments)) {
return CompletableFuture.completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
}
byte[] partAssignmentsBytes = Assignments.forced(partAssignments).toBytes();
byte[] revisionBytes = ByteUtils.longToBytes(revision);
ByteArray partChangeTriggerKey = pendingChangeTriggerKey(partId);
ByteArray partAssignmentsPendingKey = pendingPartAssignmentsKey(partId);
ByteArray partAssignmentsPlannedKey = plannedPartAssignmentsKey(partId);
Iif iif = iif(
notExists(partChangeTriggerKey).or(value(partChangeTriggerKey).lt(revisionBytes)),
iif(
value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
ops(
put(partAssignmentsPendingKey, partAssignmentsBytes),
put(partChangeTriggerKey, revisionBytes),
remove(partAssignmentsPlannedKey)
).yield(PENDING_KEY_UPDATED.ordinal()),
ops().yield(ASSIGNMENT_NOT_UPDATED.ordinal())
),
ops().yield(OUTDATED_UPDATE_RECEIVED.ordinal())
);
return metaStorageMgr.invoke(iif).thenApply(StatementResult::getAsInt);
}
/** Key prefix for pending assignments. */
public static final String PENDING_ASSIGNMENTS_PREFIX = "assignments.pending.";
/** Key prefix for stable assignments. */
public static final String STABLE_ASSIGNMENTS_PREFIX = "assignments.stable.";
/** Key prefix for switch reduce assignments. */
public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX = "assignments.switch.reduce.";
/** Key prefix for switch append assignments. */
public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = "assignments.switch.append.";
/** Key prefix for counter of rebalances of tables from a zone that are associated with the specified partition. */
private static final String TABLES_COUNTER_PREFIX = "tables.counter.";
/** Key prefix for a raft configuration that was applied during rebalance of the specified partition form a table. */
private static final String RAFT_CONF_APPLIED_PREFIX = "assignments.raft.conf.applied.";
/**
* Key that is needed for skipping stale events of pending key change.
*
* @param partId Unique identifier of a partition.
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
public static ByteArray pendingChangeTriggerKey(TablePartitionId partId) {
return new ByteArray(partId + "pending.change.trigger");
}
/**
* Key that is needed for skipping stale events of stable key change.
*
* @param partId Unique identifier of a partition.
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
public static ByteArray stableChangeTriggerKey(TablePartitionId partId) {
return new ByteArray(partId + "stable.change.trigger");
}
/**
* Key that is needed for the rebalance algorithm.
*
* @param partId Unique identifier of a partition.
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
public static ByteArray pendingPartAssignmentsKey(TablePartitionId partId) {
return new ByteArray(PENDING_ASSIGNMENTS_PREFIX + partId);
}
/**
* Key that is needed for the rebalance algorithm.
*
* @param partId Unique identifier of a partition.
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
public static ByteArray plannedPartAssignmentsKey(TablePartitionId partId) {
return new ByteArray("assignments.planned." + partId);
}
/**
* Key that is needed for the rebalance algorithm.
*
* @param partId Unique identifier of a partition.
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
public static ByteArray stablePartAssignmentsKey(TablePartitionId partId) {
return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
}
/**
* Key that is needed for the rebalance algorithm.
*
* @param partId Unique identifier of a partition.
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
public static ByteArray switchReduceKey(TablePartitionId partId) {
return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX + partId);
}
/**
* Key that is needed for the rebalance algorithm.
*
* @param partId Unique identifier of a partition.
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
public static ByteArray switchAppendKey(TablePartitionId partId) {
return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX + partId);
}
/**
* ByteArray key for a counter of rebalances of tables from a zone that are associated with the specified partition.
*
* @param zoneId Identifier of a zone.
* @param partId Unique identifier of a partition.
* @return Key for a partition.
*/
public static ByteArray tablesCounterKey(int zoneId, int partId) {
return new ByteArray(TABLES_COUNTER_PREFIX + zoneId + "_part_" + partId);
}
/**
* ByteArray prefix for counter of rebalances of tables from a zone that are associated with the specified partition.
*
* @return Prefix for a counter of rebalances of tables partition.
*/
public static ByteArray tablesCounterPrefixKey() {
return new ByteArray(TABLES_COUNTER_PREFIX);
}
/**
* ByteArray key for a raft configuration that was applied during rebalance of the specified partition form a table.
*
* @param partId Unique identifier of a partition.
* @return Key for a applied raft configuration.
*/
public static ByteArray raftConfigurationAppliedKey(TablePartitionId partId) {
return new ByteArray(RAFT_CONF_APPLIED_PREFIX + partId);
}
/**
* Extract table id from a metastorage key of partition.
*
* @param key Key.
* @param prefix Key prefix.
* @return Table id.
*/
public static int extractTableId(byte[] key, String prefix) {
String strKey = new String(key, StandardCharsets.UTF_8);
return Integer.parseInt(strKey.substring(prefix.length(), strKey.indexOf("_part_")));
}
/**
* Extract table id from a metastorage key of partition.
*
* @param key Key.
* @param prefix Key prefix.
* @return Table id.
*/
public static int extractZoneId(byte[] key, String prefix) {
String strKey = new String(key, StandardCharsets.UTF_8);
return Integer.parseInt(strKey.substring(prefix.length()));
}
/**
* Extract zone id from a metastorage key {@link RebalanceUtil#tablesCounterKey}.
*
* @param key Key.
* @return Table id.
*/
static int extractZoneIdFromTablesCounter(byte[] key) {
String strKey = new String(key, StandardCharsets.UTF_8);
return Integer.parseInt(strKey.substring(TABLES_COUNTER_PREFIX.length(), strKey.indexOf("_part_")));
}
/**
* Extract partition number from the rebalance key of partition.
*
* @param key Key.
* @return Partition number.
*/
public static int extractPartitionNumber(byte[] key) {
var strKey = new String(key, StandardCharsets.UTF_8);
return Integer.parseInt(strKey.substring(strKey.indexOf("_part_") + "_part_".length()));
}
/**
* Checks if an error is recoverable, so we can retry a rebalance intent.
*
* @param t The throwable.
* @return {@code True} if this is a recoverable exception.
*/
public static boolean recoverable(Throwable t) {
// As long as we don't have a general failure handler, we assume that all errors are recoverable.
return true;
}
/**
* Removes nodes from set of nodes.
*
* @param minuend Set to remove nodes from.
* @param subtrahend Set of nodes to be removed.
* @return Result of the subtraction.
*/
public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {
return minuend.stream().filter(v -> !subtrahend.contains(v)).collect(toSet());
}
/**
* Adds nodes to the set of nodes.
*
* @param op1 First operand.
* @param op2 Second operand.
* @return Result of the addition.
*/
public static <T> Set<T> union(Set<T> op1, Set<T> op2) {
var res = new HashSet<>(op1);
res.addAll(op2);
return res;
}
/**
* Returns an intersection of two set of nodes.
*
* @param op1 First operand.
* @param op2 Second operand.
* @return Result of the intersection.
*/
public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
return op1.stream().filter(op2::contains).collect(toSet());
}
/**
* Returns partition assignments from meta storage.
*
* @param metaStorageManager Meta storage manager.
* @param tableId Table ID.
* @param partitionId Partition ID.
* @return Future with partition assignments as a value.
*/
public static CompletableFuture<Set<Assignment>> partitionAssignments(
MetaStorageManager metaStorageManager,
int tableId,
int partitionId
) {
return metaStorageManager
.get(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionId)))
.thenApply(e -> (e.value() == null) ? null : Assignments.fromBytes(e.value()).nodes());
}
/**
* Returns partition assignments from meta storage locally.
*
* @param metaStorageManager Meta storage manager.
* @param tableId Table id.
* @param partitionNumber Partition number.
* @param revision Revision.
* @return Returns partition assignments from meta storage locally or {@code null} if assignments is absent.
*/
@Nullable
public static Set<Assignment> partitionAssignmentsGetLocally(
MetaStorageManager metaStorageManager,
int tableId,
int partitionNumber,
long revision
) {
Entry entry = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber)), revision);
return (entry == null || entry.empty() || entry.tombstone()) ? null : Assignments.fromBytes(entry.value()).nodes();
}
/**
* Returns table assignments for table partitions from meta storage.
*
* @param metaStorageManager Meta storage manager.
* @param tableId Table id.
* @param partitionIds IDs of partitions to get assignments for. If empty, get all partition assignments.
* @param numberOfPartitions Number of partitions. Ignored if partition IDs are specified.
* @return Future with table assignments as a value.
*/
static CompletableFuture<Map<Integer, Assignments>> tableAssignments(
MetaStorageManager metaStorageManager,
int tableId,
Set<Integer> partitionIds,
int numberOfPartitions
) {
Map<ByteArray, Integer> partitionKeysToPartitionNumber = new HashMap<>();
Collection<Integer> ids = partitionIds.isEmpty()
? IntStream.range(0, numberOfPartitions).boxed().collect(toList())
: partitionIds;
for (Integer partId : ids) {
partitionKeysToPartitionNumber.put(stablePartAssignmentsKey(new TablePartitionId(tableId, partId)), partId);
}
return metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
.thenApply(entries -> {
if (entries.isEmpty()) {
return Map.of();
}
Map<Integer, Assignments> result = new HashMap<>();
int numberOfMsPartitions = 0;
for (var mapEntry : entries.entrySet()) {
Entry entry = mapEntry.getValue();
if (!entry.empty() && !entry.tombstone()) {
result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()), Assignments.fromBytes(entry.value()));
numberOfMsPartitions++;
}
}
assert numberOfMsPartitions == 0 || numberOfMsPartitions == entries.size()
: "Invalid number of stable partition entries received from meta storage [received="
+ numberOfMsPartitions + ", numberOfPartitions=" + entries.size() + ", tableId=" + tableId + "].";
return numberOfMsPartitions == 0 ? Map.of() : result;
});
}
/**
* Returns table assignments for all table partitions from meta storage locally. Assignments must be present.
*
* @param metaStorageManager Meta storage manager.
* @param tableId Table id.
* @param numberOfPartitions Number of partitions.
* @param revision Revision.
* @return Future with table assignments as a value.
*/
public static List<Assignments> tableAssignmentsGetLocally(
MetaStorageManager metaStorageManager,
int tableId,
int numberOfPartitions,
long revision
) {
return IntStream.range(0, numberOfPartitions)
.mapToObj(p -> {
Entry e = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, p)), revision);
assert e != null && !e.empty() && !e.tombstone() : e;
return Assignments.fromBytes(e.value());
})
.collect(toList());
}
}