blob: 2a3ccecec1b44e1163f86017d9808c94f4b68dc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.internal.utils.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber;
import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.utils.RebalanceUtil.recoverable;
import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.utils.RebalanceUtil.updatePendingAssignmentsKeys;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.causality.VersionedValue;
import org.apache.ignite.internal.configuration.schema.ExtendedTableChange;
import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
import org.apache.ignite.internal.configuration.schema.ExtendedTableView;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactory;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEventsListener;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteObjectName;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.manager.IgniteTables;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Table manager.
*/
public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
IgniteComponent {
// TODO get rid of this in future? IGNITE-17307
/** Timeout, in seconds, to complete the tablesByIdVv on revision update (see the registry callback in the constructor). */
private static final long TABLES_COMPLETE_TIMEOUT = 120;
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(TableManager.class);
/**
* If this property is set to {@code true} then an attempt to get the configuration property directly from the meta storage will be
* skipped, and the local property will be returned. The flag is read once, when the manager is constructed, via
* {@code IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY")}.
* TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metastorage,
* TODO: will be removed after fix of the issue.
*/
private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
/** Tables configuration. */
private final TablesConfiguration tablesCfg;
/** Raft manager. */
private final Loza raftMgr;
/** Baseline manager. */
private final BaselineManager baselineMgr;
/** Transaction manager. */
private final TxManager txManager;
/** Meta storage manager. */
private final MetaStorageManager metaStorageMgr;
/** Data storage manager. */
private final DataStorageManager dataStorageMgr;
/**
* Futures handed out by the create-table API, keyed by table id. A future is stored here while the table is being created and is
* completed (and removed) once the table can be provided to the client.
*/
private final Map<UUID, CompletableFuture<Table>> tableCreateFuts = new ConcurrentHashMap<>();
/** Versioned store for tables by id. */
private final VersionedValue<Map<UUID, TableImpl>> tablesByIdVv;
/** Set of futures that should complete before completion of {@link #tablesByIdVv}, after completion this set is cleared. */
private final Set<CompletableFuture<?>> beforeTablesVvComplete = new ConcurrentHashSet<>();
/**
* {@link TableImpl} instances are created during an update of tablesByIdVv. A reference to each of them is kept here so that, if
* updating tablesByIdVv fails, the resources associated with the table can still be stopped.
*/
private final Map<UUID, TableImpl> tablesToStopInCaseOfError = new ConcurrentHashMap<>();
/** Resolver that resolves a network address to node id. */
private final Function<NetworkAddress, String> netAddrResolver;
/** Resolver that resolves a network address to cluster node. */
private final Function<NetworkAddress, ClusterNode> clusterNodeResolver;
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
/** Schema manager. */
private final SchemaManager schemaManager;
/** Executor for scheduling retries of a rebalance. */
private final ScheduledExecutorService rebalanceScheduler;
/**
* Separate executor for IO operations like partition storage initialization
* or partition raft group meta data persisting.
*/
private final ExecutorService ioExecutor;
/** Rebalance scheduler pool size: {@code min(Utils.cpus() * 3, 20)}. */
private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
/**
 * Creates a new table manager.
 *
 * <p>Registers a callback in {@code registry} that, for every causality token, waits (at most {@link #TABLES_COMPLETE_TIMEOUT}
 * seconds) for the futures accumulated in {@link #beforeTablesVvComplete} and then completes {@link #tablesByIdVv} for that
 * token, either normally or exceptionally.
 *
 * @param registry Registry for versioned values.
 * @param tablesCfg Tables configuration.
 * @param raftMgr Raft manager.
 * @param baselineMgr Baseline manager.
 * @param topologyService Topology service, used to resolve network addresses to cluster nodes.
 * @param txManager Transaction manager.
 * @param dataStorageMgr Data storage manager.
 * @param metaStorageMgr Meta storage manager.
 * @param schemaManager Schema manager.
 */
public TableManager(
        Consumer<Function<Long, CompletableFuture<?>>> registry,
        TablesConfiguration tablesCfg,
        Loza raftMgr,
        BaselineManager baselineMgr,
        TopologyService topologyService,
        TxManager txManager,
        DataStorageManager dataStorageMgr,
        MetaStorageManager metaStorageMgr,
        SchemaManager schemaManager
) {
    this.tablesCfg = tablesCfg;
    this.raftMgr = raftMgr;
    this.baselineMgr = baselineMgr;
    this.txManager = txManager;
    this.dataStorageMgr = dataStorageMgr;
    this.metaStorageMgr = metaStorageMgr;
    this.schemaManager = schemaManager;

    netAddrResolver = addr -> {
        ClusterNode node = topologyService.getByAddress(addr);

        if (node == null) {
            throw new IllegalStateException("Can't resolve ClusterNode by its networkAddress=" + addr);
        }

        return node.id();
    };
    clusterNodeResolver = topologyService::getByAddress;

    tablesByIdVv = new VersionedValue<>(null, HashMap::new);

    registry.accept(token -> {
        List<CompletableFuture<?>> futures = new ArrayList<>(beforeTablesVvComplete);

        beforeTablesVvComplete.clear();

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
                .orTimeout(TABLES_COMPLETE_TIMEOUT, TimeUnit.SECONDS)
                .whenComplete((v, e) -> {
                    if (!busyLock.enterBusy()) {
                        if (e != null) {
                            LOG.warn("Error occurred while updating tables and stopping components.", e);
                            // Stop of the components has been started, so we do nothing and resources of tablesByIdVv will be
                            // freed in the logic of TableManager stop. We cannot complete tablesByIdVv exceptionally because
                            // we will lose a context of tables.
                        }

                        return;
                    }

                    try {
                        if (e == null) {
                            // Normal scenario: all related futures are completed, so tablesByIdVv can be completed too.
                            tablesByIdVv.complete(token);

                            tablesToStopInCaseOfError.clear();

                            return;
                        }

                        LOG.warn("Error occurred while updating tables.", e);

                        if (e instanceof CompletionException) {
                            Throwable th = e.getCause();

                            // Stopping of the previous component has been started and the related futures completed
                            // exceptionally: resources will be freed in the logic of TableManager stop, so do nothing.
                            // The null check guards against a CompletionException without a cause.
                            if (th instanceof NodeStoppingException
                                    || (th != null && th.getCause() instanceof NodeStoppingException)) {
                                return;
                            }
                        }

                        // TODO: https://issues.apache.org/jira/browse/IGNITE-17515
                        // Complete exceptionally and stop here: the token must not additionally be completed normally, and the
                        // tables created for it stay in tablesToStopInCaseOfError so that stop() can release their resources.
                        tablesByIdVv.completeExceptionally(token, e);
                    } finally {
                        busyLock.leaveBusy();
                    }
                });
    });

    rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
            new NamedThreadFactory("rebalance-scheduler", LOG));

    // NOTE: with an unbounded LinkedBlockingQueue a ThreadPoolExecutor never creates more than corePoolSize threads,
    // so the maximum pool size and the keep-alive time below have no practical effect.
    ioExecutor = new ThreadPoolExecutor(
            Math.min(Utils.cpus() * 3, 25),
            Integer.MAX_VALUE,
            100,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory("tableManager-io", LOG));
}
/** {@inheritDoc} */
@Override
public void start() {
    tablesCfg.tables().any().replicas().listen(this::onUpdateReplicas);

    registerRebalanceListeners();

    ((ExtendedTableConfiguration) tablesCfg.tables().any()).assignments().listen(this::onUpdateAssignments);

    tablesCfg.tables().listenElements(new ConfigurationNamedListListener<>() {
        @Override
        public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<TableView> ctx) {
            return onTableCreate(ctx);
        }

        @Override
        public CompletableFuture<?> onRename(String oldName, String newName, ConfigurationNotificationEvent<TableView> ctx) {
            // TODO: IGNITE-15485 Support table rename operation.

            return completedFuture(null);
        }

        @Override
        public CompletableFuture<?> onDelete(ConfigurationNotificationEvent<TableView> ctx) {
            return onTableDelete(ctx);
        }
    });

    schemaManager.listen(SchemaEvent.CREATE, new EventListener<>() {
        /** {@inheritDoc} */
        @Override
        public CompletableFuture<Boolean> notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable exception) {
            // Look the table up exactly once: latest() may return a different map on a second call (e.g. after a concurrent
            // drop), which would otherwise pass a null table into the ALTER event.
            TableImpl table = tablesByIdVv.latest().get(parameters.tableId());

            if (table != null) {
                fireEvent(TableEvent.ALTER, new TableEventParameters(parameters.causalityToken(), table), null);
            }

            return completedFuture(false);
        }
    });
}
/**
 * Handles the creation of a table configuration entry.
 *
 * <p>While the node is running, the table is created locally. If the node is already stopping, a {@link TableEvent#CREATE}
 * event carrying a {@link NodeStoppingException} is fired and a failed future is returned instead.
 *
 * @param ctx Table configuration context.
 * @return Future from {@code createTableLocally}, or a future failed with {@link NodeStoppingException}.
 */
private CompletableFuture<?> onTableCreate(ConfigurationNotificationEvent<TableView> ctx) {
    if (busyLock.enterBusy()) {
        try {
            TableView createdView = ctx.newValue();

            return createTableLocally(
                    ctx.storageRevision(),
                    createdView.name(),
                    ((ExtendedTableView) createdView).id(),
                    createdView.partitions()
            );
        } finally {
            busyLock.leaveBusy();
        }
    }

    TableView stoppedView = ctx.newValue();
    String tblName = stoppedView.name();
    UUID tblId = ((ExtendedTableView) stoppedView).id();

    fireEvent(TableEvent.CREATE, new TableEventParameters(ctx.storageRevision(), tblId, tblName), new NodeStoppingException());

    return failedFuture(new NodeStoppingException());
}
/**
 * Handles the removal of a table configuration entry.
 *
 * <p>While the node is running, the table's local structures are dropped (using the assignments stored in the old value) and a
 * completed future is returned. If the node is already stopping, a {@link TableEvent#DROP} event carrying a
 * {@link NodeStoppingException} is fired and a failed future is returned instead.
 *
 * @param ctx Table configuration context.
 * @return Completed future, or a future failed with {@link NodeStoppingException}.
 */
private CompletableFuture<?> onTableDelete(ConfigurationNotificationEvent<TableView> ctx) {
    if (busyLock.enterBusy()) {
        try {
            TableView removedView = ctx.oldValue();
            ExtendedTableView extRemovedView = (ExtendedTableView) removedView;

            dropTableLocally(
                    ctx.storageRevision(),
                    removedView.name(),
                    extRemovedView.id(),
                    (List<List<ClusterNode>>) ByteUtils.fromBytes(extRemovedView.assignments())
            );
        } finally {
            busyLock.leaveBusy();
        }

        return completedFuture(null);
    }

    TableView stoppedView = ctx.oldValue();
    String tblName = stoppedView.name();
    UUID tblId = ((ExtendedTableView) stoppedView).id();

    fireEvent(TableEvent.DROP, new TableEventParameters(ctx.storageRevision(), tblId, tblName), new NodeStoppingException());

    return failedFuture(new NodeStoppingException());
}
/**
 * Listener of replicas configuration changes.
 *
 * <p>When the number of replicas of an existing table changes (the previous value is present and positive), the pending
 * assignments of every partition of the table are updated in the meta storage. Otherwise nothing is done.
 *
 * @param replicasCtx Replicas configuration event context.
 * @return A future, which will be completed, when event processed by listener; a future failed with
 *         {@link NodeStoppingException} if the node is stopping.
 */
private CompletableFuture<?> onUpdateReplicas(ConfigurationNotificationEvent<Integer> replicasCtx) {
    if (!busyLock.enterBusy()) {
        // Fail the future, like the other listeners do. Completing it normally with the exception as its value would make the
        // notification look successful.
        return failedFuture(new NodeStoppingException());
    }

    try {
        Integer oldReplicas = replicasCtx.oldValue();

        if (oldReplicas == null || oldReplicas <= 0) {
            return completedFuture(null);
        }

        TableConfiguration tblCfg = replicasCtx.config(TableConfiguration.class);

        // Loop-invariant configuration values, read once.
        String tblName = tblCfg.name().value();
        UUID tblId = ((ExtendedTableConfiguration) tblCfg).id().value();

        LOG.info("Received update for replicas number [table={}, oldNumber={}, newNumber={}]",
                tblName, oldReplicas, replicasCtx.newValue());

        int partCnt = tblCfg.partitions().value();
        int newReplicas = replicasCtx.newValue();

        CompletableFuture<?>[] futures = new CompletableFuture<?>[partCnt];

        for (int i = 0; i < partCnt; i++) {
            String partId = partitionRaftGroupName(tblId, i);

            futures[i] = updatePendingAssignmentsKeys(
                    tblName, partId, baselineMgr.nodes(),
                    partCnt, newReplicas,
                    replicasCtx.storageRevision(), metaStorageMgr, i);
        }

        return allOf(futures);
    } finally {
        busyLock.leaveBusy();
    }
}
/**
 * Reacts to a change of a table's assignments configuration by updating the partition raft groups.
 *
 * @param assignmentsCtx Assignment configuration context.
 * @return A completed future once the update has been processed, or a future failed with {@link NodeStoppingException}
 *         if the node is stopping.
 */
private CompletableFuture<?> onUpdateAssignments(ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
    if (busyLock.enterBusy()) {
        try {
            updateAssignmentInternal(assignmentsCtx);
        } finally {
            busyLock.leaveBusy();
        }

        return completedFuture(null);
    }

    return failedFuture(new NodeStoppingException());
}
/**
* Updates or creates partition raft groups according to a change of the table's assignments.
*
* <p>For every partition an update of {@link #tablesByIdVv} is registered for the event's storage revision. Inside it, a raft
* group node is started only when the partition had no previous assignment and {@code raftMgr.shouldHaveRaftGroupLocally}
* accepts the new assignment; the partition storage is obtained on {@link #ioExecutor} first. Other changes are left to the
* rebalance logic. In every case a raft group service for the new assignment is started and installed into the internal table.
*
* <p>The calling thread is blocked until all per-partition futures complete. Failures of an individual partition are logged
* and swallowed.
*
* @param assignmentsCtx Change assignment event.
*/
private void updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
ExtendedTableConfiguration tblCfg = assignmentsCtx.config(ExtendedTableConfiguration.class);
UUID tblId = tblCfg.id().value();
// The configuration storage revision is used as the causality token for tablesByIdVv.
long causalityToken = assignmentsCtx.storageRevision();
// Assignments are stored serialized; the old value is null when there were no previous assignments.
List<List<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() == null ? null :
(List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.oldValue());
List<List<ClusterNode>> newAssignments = (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.newValue());
// Empty assignments might be a valid case if tables are created from within cluster init HOCON
// configuration, which is not supported now.
assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tblId);
int partitions = newAssignments.size();
// NOTE(review): futures[partId] is assigned inside the tablesByIdVv.update() closure below. If VersionedValue defers that
// closure instead of running it synchronously, some entries may still be null when allOf(futures) is called at the end of
// this method, which would throw a NullPointerException — confirm against VersionedValue.update().
CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
// TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes
// TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will
// TODO: be exact same amount of partitions and replicas for both old and new assignments
for (int i = 0; i < partitions; i++) {
int partId = i;
List<ClusterNode> oldPartAssignment = oldAssignments == null ? Collections.emptyList() :
oldAssignments.get(partId);
List<ClusterNode> newPartAssignment = newAssignments.get(partId);
// Create new raft nodes according to new assignments.
tablesByIdVv.update(causalityToken, (tablesById, e) -> {
if (e != null) {
return failedFuture(e);
}
// NOTE(review): assumes the table was already registered in tablesByIdVv for this token; a missing entry would
// cause a NullPointerException here.
InternalTable internalTbl = tablesById.get(tblId).internalTable();
// TODO: IGNITE-17197 Remove assert after the ticket is resolved.
assert internalTbl.storage() instanceof MvTableStorage :
"Only multi version storages are supported. Current storage is a " + internalTbl.storage().getClass().getName();
// start new nodes, only if it is table creation
// other cases will be covered by rebalance logic
List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptyList();
String grpId = partitionRaftGroupName(tblId, partId);
CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null);
if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
// Obtain (or create) the partition storage on the IO executor, then start the local raft node on top of it.
startGroupFut = CompletableFuture
.supplyAsync(
() -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
.thenComposeAsync((partitionStorage) -> {
RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, partitionStorage,
newPartAssignment);
try {
raftMgr.startRaftGroupNode(
grpId,
newPartAssignment,
new PartitionListener(tblId, new VersionedRowStore(partitionStorage, txManager)),
new RebalanceRaftGroupEventsListener(
metaStorageMgr,
tablesCfg.tables().get(tablesById.get(tblId).name()),
grpId,
partId,
busyLock,
movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
rebalanceScheduler
),
groupOptions
);
return CompletableFuture.completedFuture(null);
} catch (NodeStoppingException ex) {
return CompletableFuture.failedFuture(ex);
}
}, ioExecutor);
}
// In all cases start a raft group service for the new assignment and install it into the internal table.
// Failures are only logged, so the per-partition future always completes normally.
futures[partId] = startGroupFut
.thenComposeAsync((v) -> {
try {
return raftMgr.startRaftGroupService(grpId, newPartAssignment);
} catch (NodeStoppingException ex) {
return CompletableFuture.failedFuture(ex);
}
}, ioExecutor)
.thenAccept(
updatedRaftGroupService -> ((InternalTableImpl) internalTbl)
.updateInternalTableRaftGroupService(partId, updatedRaftGroupService)
).exceptionally(th -> {
LOG.warn("Unable to update raft groups on the node", th);
return null;
});
// The tables map itself is not modified by this update.
return completedFuture(tablesById);
});
}
// Blocks the calling thread until every partition has been processed.
CompletableFuture.allOf(futures).join();
}
/**
 * Builds the raft group options for a table partition.
 *
 * <p>Volatile table storages get volatile log and raft meta storages; other storages get the persistent defaults. In both
 * cases a {@link PartitionSnapshotStorageFactory} over the partition storage and the given peers is installed.
 *
 * @param internalTbl Internal table the partition belongs to.
 * @param partitionStorage Storage of the partition.
 * @param peers Nodes of the partition assignment.
 * @return Raft group options for the partition.
 */
private RaftGroupOptions groupOptionsForPartition(
        InternalTable internalTbl,
        MvPartitionStorage partitionStorage,
        List<ClusterNode> peers
) {
    RaftGroupOptions options = internalTbl.storage().isVolatile()
            ? RaftGroupOptions.forVolatileStores()
                    .setLogStorageFactory(new VolatileLogStorageFactory())
                    .raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage())
            : RaftGroupOptions.forPersistentStores();

    //TODO Revisit peers String representation: https://issues.apache.org/jira/browse/IGNITE-17420
    List<String> peerIds = peers.stream()
            .map(node -> PeerId.fromPeer(new Peer(node.address())).toString())
            .collect(Collectors.toList());

    options.snapshotStorageFactory(new PartitionSnapshotStorageFactory(partitionStorage, peerIds, List.of()));

    return options;
}
/** {@inheritDoc} */
@Override
public void stop() {
    // Only the first invocation performs the shutdown; subsequent calls are no-ops.
    if (stopGuard.compareAndSet(false, true)) {
        busyLock.block();

        Map<UUID, TableImpl> latestTables = tablesByIdVv.latest();
        cleanUpTablesResources(latestTables);

        cleanUpTablesResources(tablesToStopInCaseOfError);
        tablesToStopInCaseOfError.clear();

        shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS);
        shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
    }
}
/**
 * Stops resources that are related to provided tables.
 *
 * <p>For every table the raft groups of all its partitions are stopped, then the table storage is stopped and the internal
 * table is closed. The three steps are attempted independently, so a failure in one of them (for example while stopping a raft
 * group) does not leave the storage running or the internal table open. Failures are logged and not rethrown.
 *
 * @param tables Tables to stop.
 */
private void cleanUpTablesResources(Map<UUID, TableImpl> tables) {
    for (TableImpl table : tables.values()) {
        InternalTable internalTable = table.internalTable();

        try {
            for (int p = 0; p < internalTable.partitions(); p++) {
                raftMgr.stopRaftGroup(partitionRaftGroupName(table.tableId(), p));
            }
        } catch (Exception e) {
            LOG.warn("Unable to stop raft groups of table [name={}]", e, table.name());
        }

        try {
            internalTable.storage().stop();
        } catch (Exception e) {
            LOG.warn("Unable to stop storage of table [name={}]", e, table.name());
        }

        try {
            internalTable.close();
        } catch (Exception e) {
            LOG.warn("Unable to stop table [name={}]", e, table.name());
        }
    }
}
/**
 * Gets a list of the current table assignments.
 *
 * <p>Returns a list whose i-th element is the id of the node considered the leader of the i-th partition at the moment of
 * invocation, as reported by the table's internal table.
 *
 * @param tableId Unique id of a table.
 * @return List of the current assignments.
 * @throws NodeStoppingException If the node is stopping.
 */
public List<String> assignments(UUID tableId) throws NodeStoppingException {
    if (busyLock.enterBusy()) {
        try {
            var table = table(tableId);

            return table.internalTable().assignments();
        } finally {
            busyLock.leaveBusy();
        }
    }

    throw new NodeStoppingException();
}
/**
* Creates local structures for a table.
*
* <p>Creates and starts the table's MV storage, wraps it into an {@link InternalTableImpl} and a {@link TableImpl}, and registers
* the table in {@link #tablesByIdVv} for {@code causalityToken}. Schema registration and the {@link TableEvent#CREATE} event are
* added to {@link #beforeTablesVvComplete}, so {@link #tablesByIdVv} is not completed for this token before they finish.
*
* @param causalityToken Causality token.
* @param name Table name.
* @param tblId Table id.
* @param partitions Count of partitions.
* @return Future that will be completed when local changes related to the table creation are applied.
*/
private CompletableFuture<?> createTableLocally(long causalityToken, String name, UUID tblId, int partitions) {
// NOTE(review): assumes the configuration entry for this table exists (a null here would throw NPE) — confirm.
TableConfiguration tableCfg = tablesCfg.tables().get(name);
MvTableStorage tableStorage = dataStorageMgr.engine(tableCfg.dataStorage()).createMvTable(tableCfg);
tableStorage.start();
// "partitions" sizes the map handed to InternalTableImpl and is also passed on as the partition count.
InternalTableImpl internalTable = new InternalTableImpl(name, tblId, new Int2ObjectOpenHashMap<>(partitions),
partitions, netAddrResolver, clusterNodeResolver, txManager, tableStorage);
var table = new TableImpl(internalTable);
// Add the table to a copy of the previous tables map for this causality token.
tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(e);
}
var val = new HashMap<>(previous);
val.put(tblId, table);
return completedFuture(val);
}));
// When the schema registry for this token is available, attach it to the table and fire CREATE.
CompletableFuture<?> schemaFut = schemaManager.schemaRegistry(causalityToken, tblId)
.thenAccept(schema -> inBusyLock(busyLock, () -> table.schemaView(schema)))
.thenCompose(
v -> inBusyLock(busyLock, () -> fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, table)))
);
// tablesByIdVv is completed for this token only after schemaFut completes (see the registry callback in the constructor).
beforeTablesVvComplete.add(schemaFut);
// Remember the table so that stop() can release its resources if tablesByIdVv cannot be completed.
tablesToStopInCaseOfError.put(tblId, table);
// TODO should be reworked in IGNITE-16763
// Once tablesByIdVv is available for this token, complete the future returned by the create-table API (if any).
return tablesByIdVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
}
/**
 * Completes the future handed out by the create-table API ({@link TableManager#createTable(String, Consumer)}) for the given table.
 *
 * <p>Does nothing if no such future is registered. Otherwise the future is completed with {@code table}, and every entry of
 * {@link #tableCreateFuts} that refers to that same future instance is removed.
 *
 * @param table Table.
 */
private void completeApiCreateFuture(TableImpl table) {
    CompletableFuture<Table> pending = tableCreateFuts.get(table.tableId());

    if (pending == null) {
        return;
    }

    pending.complete(table);

    // Remove by identity rather than by key, so any other entry pointing at this future is dropped as well.
    tableCreateFuts.values().removeIf(candidate -> candidate == pending);
}
/**
* Drops local structures for a table.
*
* <p>Stops the raft group of every partition listed in {@code assignment}, removes the table from {@link #tablesByIdVv} for
* {@code causalityToken}, destroys the table storage, and adds the schema registry removal followed by the
* {@link TableEvent#DROP} event to {@link #beforeTablesVvComplete}. Any {@link Exception} is reported by firing
* {@link TableEvent#DROP} with that exception instead of being rethrown.
*
* @param causalityToken Causality token.
* @param name Table name.
* @param tblId Table id.
* @param assignment Affinity assignment; only its size (the number of partitions) is used here.
*/
private void dropTableLocally(long causalityToken, String name, UUID tblId, List<List<ClusterNode>> assignment) {
try {
int partitions = assignment.size();
for (int p = 0; p < partitions; p++) {
raftMgr.stopRaftGroup(partitionRaftGroupName(tblId, p));
}
// Remove the table from a copy of the previous tables map for this causality token.
tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(e);
}
var map = new HashMap<>(previousVal);
map.remove(tblId);
return completedFuture(map);
}));
// NOTE(review): the table is read from latest(), presumably the last completed map, which should still contain it — confirm
// against VersionedValue semantics. With assertions disabled a missing table leads to a NullPointerException, which the
// catch below reports as a failed DROP event.
TableImpl table = tablesByIdVv.latest().get(tblId);
assert table != null : IgniteStringFormatter.format("There is no table with the name specified [name={}, id={}]",
name, tblId);
// NOTE(review): only the storage is destroyed; unlike cleanUpTablesResources, InternalTable#close() is not called for a
// dropped table — confirm this is intentional.
table.internalTable().storage().destroy();
CompletableFuture<?> fut = schemaManager.dropRegistry(causalityToken, table.tableId())
.thenCompose(
v -> inBusyLock(busyLock, () -> fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, table)))
);
// tablesByIdVv is completed for this token only after this future completes.
beforeTablesVvComplete.add(fut);
} catch (Exception e) {
// AssertionError (an Error, not an Exception) is not caught here and propagates to the caller.
fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tblId, name), e);
}
}
/**
 * Builds the unique name of the RAFT group that serves one partition of a table.
 *
 * <p>The name has the form {@code <tblId>_part_<partition>}.
 *
 * @param tblId Table identifier.
 * @param partition Partition number (index), not the partition count.
 * @return A RAFT group name.
 */
@NotNull
private String partitionRaftGroupName(UUID tblId, int partition) {
    return new StringBuilder()
            .append(tblId)
            .append("_part_")
            .append(partition)
            .toString();
}
/** {@inheritDoc} */
@Override
public Table createTable(String name, Consumer<TableChange> tableInitChange) {
    CompletableFuture<Table> creation = createTableAsync(name, tableInitChange);

    return join(creation);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) {
    if (busyLock.enterBusy()) {
        try {
            String canonicalName = IgniteObjectName.parseCanonicalName(name);

            return createTableAsyncInternal(canonicalName, tableInitChange);
        } finally {
            busyLock.leaveBusy();
        }
    }

    throw new IgniteException(new NodeStoppingException());
}
/**
 * Internal method that creates a new table with the given {@code name} asynchronously. If a table with the same name already exists,
 * a future will be completed with {@link TableAlreadyExistsException}.
 *
 * <p>The returned future is registered in {@link #tableCreateFuts} under the new table id while the configuration change is
 * built, and is completed with the table once it has been created locally. Any failure completes it exceptionally, including
 * a failed lookup of an existing table.
 *
 * @param name Table name.
 * @param tableInitChange Table changer.
 * @return Future representing pending completion of the operation.
 * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when:
 *         <ul>
 *             <li>the node is stopping.</li>
 *         </ul>
 * @see TableAlreadyExistsException
 */
private CompletableFuture<Table> createTableAsyncInternal(String name, Consumer<TableChange> tableInitChange) {
    CompletableFuture<Table> tblFut = new CompletableFuture<>();

    tableAsyncInternal(name).thenAccept(tbl -> {
        if (tbl != null) {
            tblFut.completeExceptionally(new TableAlreadyExistsException(name));
        } else {
            tablesCfg.change(tablesChange -> tablesChange.changeTables(tablesListChange -> {
                if (tablesListChange.get(name) != null) {
                    throw new TableAlreadyExistsException(name);
                }

                tablesListChange.create(name, (tableChange) -> {
                    tableChange.changeDataStorage(
                            dataStorageMgr.defaultTableDataStorageConsumer(tablesChange.defaultDataStorage())
                    );

                    tableInitChange.accept(tableChange);

                    var extConfCh = ((ExtendedTableChange) tableChange);

                    int intTableId = tablesChange.globalIdCounter() + 1;
                    tablesChange.changeGlobalIdCounter(intTableId);

                    extConfCh.changeTableId(intTableId);

                    tableCreateFuts.put(extConfCh.id(), tblFut);

                    // Affinity assignments calculation.
                    extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
                                    baselineMgr.nodes(),
                                    tableChange.partitions(),
                                    tableChange.replicas())))
                            // Table schema preparation.
                            .changeSchemas(schemasCh -> schemasCh.create(
                                    String.valueOf(INITIAL_SCHEMA_VERSION),
                                    schemaCh -> {
                                        SchemaDescriptor schemaDesc;

                                        //TODO IGNITE-15747 Remove try-catch and force configuration
                                        // validation here to ensure a valid configuration passed to
                                        // prepareSchemaDescriptor() method.
                                        try {
                                            schemaDesc = SchemaUtils.prepareSchemaDescriptor(
                                                    ((ExtendedTableView) tableChange).schemas().size(),
                                                    tableChange);
                                        } catch (IllegalArgumentException ex) {
                                            // Keep the original exception attached, as alterTableAsyncInternal does.
                                            ConfigurationValidationException e =
                                                    new ConfigurationValidationException(ex.getMessage());

                                            e.addSuppressed(ex);

                                            throw e;
                                        }

                                        schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
                                    }
                            ));
                });
            })).exceptionally(t -> {
                Throwable ex = getRootCause(t);

                if (ex instanceof TableAlreadyExistsException) {
                    tblFut.completeExceptionally(ex);
                } else {
                    LOG.debug("Unable to create table [name={}]", ex, name);

                    tblFut.completeExceptionally(ex);

                    tableCreateFuts.values().removeIf(fut -> fut == tblFut);
                }

                return null;
            });
        }
    }).exceptionally(t -> {
        // Without this handler, a failure of the existence check (or an exception thrown synchronously while submitting the
        // configuration change) would be lost and the returned future would never complete.
        Throwable ex = getRootCause(t);

        LOG.debug("Unable to create table [name={}]", ex, name);

        tblFut.completeExceptionally(ex);

        tableCreateFuts.values().removeIf(fut -> fut == tblFut);

        return null;
    });

    return tblFut;
}
/** {@inheritDoc} */
@Override
public void alterTable(String name, Consumer<TableChange> tableChange) {
    CompletableFuture<Void> alteration = alterTableAsync(name, tableChange);

    join(alteration);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
    if (busyLock.enterBusy()) {
        try {
            String canonicalName = IgniteObjectName.parseCanonicalName(name);

            return alterTableAsyncInternal(canonicalName, tableChange);
        } finally {
            busyLock.leaveBusy();
        }
    }

    throw new IgniteException(new NodeStoppingException());
}
/**
 * Internal method that alters a cluster table. If an appropriate table does not exist, a future will be
 * completed with {@link TableNotFoundException}.
 *
 * <p>The change goes through the tables configuration: {@code tableChange} is applied to the table entry, and a new schema
 * version is added whose column mapping is computed from the table's current schema. Any failure completes the returned
 * future exceptionally, including a failed lookup of the table.
 *
 * @param name Table name.
 * @param tableChange Table changer.
 * @return Future representing pending completion of the operation.
 * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when:
 *         <ul>
 *             <li>the node is stopping.</li>
 *         </ul>
 * @see TableNotFoundException
 */
@NotNull
private CompletableFuture<Void> alterTableAsyncInternal(String name, Consumer<TableChange> tableChange) {
    CompletableFuture<Void> tblFut = new CompletableFuture<>();

    // The name passed here has already been canonicalized (see alterTableAsync), so the table is looked up the same way
    // createTableAsyncInternal does it, instead of going through the public tableAsync entry point again.
    tableAsyncInternal(name).thenAccept(tbl -> {
        if (tbl == null) {
            tblFut.completeExceptionally(new TableNotFoundException(name));
        } else {
            TableImpl tblImpl = (TableImpl) tbl;

            tablesCfg.tables().change(ch -> {
                if (ch.get(name) == null) {
                    throw new TableNotFoundException(name);
                }

                ch.update(name, tblCh -> {
                            tableChange.accept(tblCh);

                            ((ExtendedTableChange) tblCh).changeSchemas(schemasCh ->
                                    schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
                                        ExtendedTableView currTableView = (ExtendedTableView) tablesCfg.tables().get(name).value();

                                        SchemaDescriptor descriptor;

                                        //TODO IGNITE-15747 Remove try-catch and force configuration validation
                                        // here to ensure a valid configuration passed to prepareSchemaDescriptor() method.
                                        try {
                                            descriptor = SchemaUtils.prepareSchemaDescriptor(
                                                    ((ExtendedTableView) tblCh).schemas().size(),
                                                    tblCh);

                                            descriptor.columnMapping(SchemaUtils.columnMapper(
                                                    tblImpl.schemaView().schema(currTableView.schemas().size()),
                                                    currTableView,
                                                    descriptor,
                                                    tblCh));
                                        } catch (IllegalArgumentException ex) {
                                            // Convert unexpected exceptions here,
                                            // because validation actually happens later,
                                            // when bulk configuration update is applied.
                                            ConfigurationValidationException e =
                                                    new ConfigurationValidationException(ex.getMessage());

                                            e.addSuppressed(ex);

                                            throw e;
                                        }

                                        schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
                                    }));
                        }
                );
            }).whenComplete((res, t) -> {
                if (t != null) {
                    Throwable ex = getRootCause(t);

                    if (ex instanceof TableNotFoundException) {
                        tblFut.completeExceptionally(ex);
                    } else {
                        LOG.debug("Unable to modify table [name={}]", ex, name);

                        tblFut.completeExceptionally(ex);
                    }
                } else {
                    tblFut.complete(res);
                }
            });
        }
    }).exceptionally(t -> {
        // Without this handler, a failure of the table lookup (or an exception thrown synchronously while submitting the
        // configuration change) would be lost and the returned future would never complete.
        Throwable ex = getRootCause(t);

        LOG.debug("Unable to modify table [name={}]", ex, name);

        tblFut.completeExceptionally(ex);

        return null;
    });

    return tblFut;
}
/**
 * Gets a cause exception for a client.
 *
 * <p>A {@link CompletionException} is unwrapped to its cause, and a {@link ConfigurationChangeException} found there
 * is unwrapped once more. A wrapper whose cause is {@code null} is kept as is, so the failure is never replaced by
 * {@code null}. The result is returned unchanged when it already is an {@link IgniteException}; otherwise it is
 * wrapped into one.
 *
 * @param t Exception wrapper.
 * @return A root exception which will be acceptable to throw for public API.
 */
//TODO: IGNITE-16051 Implement exception converter for public API.
private @NotNull IgniteException getRootCause(Throwable t) {
    Throwable ex = t;

    if (t instanceof CompletionException && t.getCause() != null) {
        ex = t.getCause();

        // Failures thrown from configuration change closures arrive wrapped into ConfigurationChangeException.
        if (ex instanceof ConfigurationChangeException && ex.getCause() != null) {
            ex = ex.getCause();
        }
    }

    return ex instanceof IgniteException ? (IgniteException) ex : new IgniteException(ex);
}
/** {@inheritDoc} */
@Override
public void dropTable(String name) {
    // Synchronous facade: run the asynchronous variant and block until it is done.
    CompletableFuture<Void> dropFut = dropTableAsync(name);

    join(dropFut);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> dropTableAsync(String name) {
    boolean entered = busyLock.enterBusy();

    // New operations are rejected once the node has begun stopping.
    if (!entered) {
        throw new IgniteException(new NodeStoppingException());
    }

    try {
        String canonicalName = IgniteObjectName.parseCanonicalName(name);

        return dropTableAsyncInternal(canonicalName);
    } finally {
        busyLock.leaveBusy();
    }
}
/**
 * Internal method that drops a table with the name specified. If an appropriate table is not found, a future will be
 * completed with {@link TableNotFoundException}.
 *
 * <p>The returned future is also completed exceptionally when the table lookup itself fails, or when submitting the
 * configuration change fails synchronously; such failures are unwrapped with {@link #getRootCause(Throwable)}.
 *
 * @param name Canonical table name; it is not parsed again.
 * @return Future representing pending completion of the operation.
 * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when:
 *         <ul>
 *             <li>the node is stopping.</li>
 *         </ul>
 * @see TableNotFoundException
 */
@NotNull
private CompletableFuture<Void> dropTableAsyncInternal(String name) {
    CompletableFuture<Void> dropTblFut = new CompletableFuture<>();

    tableAsyncInternal(name).thenAccept(tbl -> {
        // In case of drop it's an optimization that allows not to fire drop-change-closure if there's no such
        // distributed table and the local config has lagged behind.
        if (tbl == null) {
            dropTblFut.completeExceptionally(new TableNotFoundException(name));
        } else {
            tablesCfg.tables()
                    .change(change -> {
                        if (change.get(name) == null) {
                            throw new TableNotFoundException(name);
                        }

                        change.delete(name);
                    })
                    .whenComplete((res, t) -> {
                        if (t != null) {
                            Throwable ex = getRootCause(t);

                            // A missing table is an expected outcome; only other failures are worth logging.
                            if (!(ex instanceof TableNotFoundException)) {
                                LOG.debug("Unable to drop table [name={}]", ex, name);
                            }

                            dropTblFut.completeExceptionally(ex);
                        } else {
                            dropTblFut.complete(res);
                        }
                    });
        }
    }).whenComplete((unused, t) -> {
        // Without this, a failed lookup (or a synchronous failure while submitting the change)
        // would leave dropTblFut pending forever. No-op if dropTblFut has already been completed.
        if (t != null) {
            dropTblFut.completeExceptionally(getRootCause(t));
        }
    });

    return dropTblFut;
}
/** {@inheritDoc} */
@Override
public List<Table> tables() {
    // Synchronous facade over tablesAsync().
    CompletableFuture<List<Table>> tablesFut = tablesAsync();

    return join(tablesFut);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<List<Table>> tablesAsync() {
    boolean entered = busyLock.enterBusy();

    // New operations are rejected once the node has begun stopping.
    if (!entered) {
        throw new IgniteException(new NodeStoppingException());
    }

    try {
        return tablesAsyncInternal();
    } finally {
        busyLock.leaveBusy();
    }
}
/**
 * Internal method for getting all tables.
 *
 * <p>Reads the direct table ids off the calling thread, resolves each of them through
 * {@code tableAsyncInternal(id, false)} and collects the non-null results. Each stage runs under the busy lock.
 *
 * @return Future representing pending completion of the operation.
 */
private CompletableFuture<List<Table>> tablesAsyncInternal() {
    // TODO: IGNITE-16288 directTableIds should use async configuration API
    return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, this::directTableIds))
            .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
                List<CompletableFuture<TableImpl>> tableFuts = new ArrayList<>(tableIds.size());

                for (UUID tblId : tableIds) {
                    tableFuts.add(tableAsyncInternal(tblId, false));
                }

                CompletableFuture<?>[] allFuts = tableFuts.toArray(new CompletableFuture<?>[0]);

                return allOf(allFuts).thenApply(unused -> inBusyLock(busyLock, () -> {
                    List<Table> tables = new ArrayList<>(tableFuts.size());

                    for (CompletableFuture<TableImpl> fut : tableFuts) {
                        // allOf() completed normally, so every future is already done: join() neither blocks nor throws.
                        TableImpl table = fut.join();

                        // A null result means the table is no longer configured.
                        if (table != null) {
                            tables.add(table);
                        }
                    }

                    return tables;
                }));
            }));
}
/**
 * Returns the internal ids of all tables, read through the direct (non-cached) configuration accessor
 * unless metadata is configured to be read locally only.
 *
 * @return A list of direct table ids.
 */
private List<UUID> directTableIds() {
    var tablesTree = directProxy(tablesCfg.tables());

    return ConfigurationUtil.internalIds(tablesTree);
}
/**
 * Resolves the direct (non-cached) id of the table named {@code tblName}.
 *
 * @param tblName Name of the table.
 * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found.
 */
@Nullable
private UUID directTableId(String tblName) {
    try {
        var tblCfg = (ExtendedTableConfiguration) directProxy(tablesCfg.tables()).get(tblName);

        return tblCfg == null ? null : tblCfg.id().value();
    } catch (NoSuchElementException e) {
        // The lookup reports an absent table by throwing; treat it as "not found".
        return null;
    }
}
/**
 * Returns a read-only view of the latest locally known tables, keyed by table id.
 *
 * @return Actual tables map.
 */
@TestOnly
public Map<UUID, TableImpl> latestTables() {
    var current = tablesByIdVv.latest();

    return unmodifiableMap(current);
}
/** {@inheritDoc} */
@Override
public Table table(String name) {
    // Synchronous facade over tableAsync(String).
    CompletableFuture<Table> tblFut = tableAsync(name);

    return join(tblFut);
}
/** {@inheritDoc} */
@Override
public TableImpl table(UUID id) throws NodeStoppingException {
    // Synchronous facade over tableAsync(UUID).
    CompletableFuture<TableImpl> tblFut = tableAsync(id);

    return join(tblFut);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Table> tableAsync(String name) {
    String canonicalName = IgniteObjectName.parseCanonicalName(name);

    // Widen CompletableFuture<TableImpl> to the public CompletableFuture<Table>.
    return tableAsyncInternal(canonicalName).thenApply(tbl -> tbl);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<TableImpl> tableAsync(UUID id) {
    boolean entered = busyLock.enterBusy();

    // New operations are rejected once the node has begun stopping.
    if (!entered) {
        throw new IgniteException(new NodeStoppingException());
    }

    try {
        // Check the configuration first so that an unknown id resolves to null instead of waiting for a creation event.
        return tableAsyncInternal(id, true);
    } finally {
        busyLock.leaveBusy();
    }
}
/** {@inheritDoc} */
@Override
public TableImpl tableImpl(String name) {
    // Synchronous facade over tableImplAsync(String).
    CompletableFuture<TableImpl> tblFut = tableImplAsync(name);

    return join(tblFut);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<TableImpl> tableImplAsync(String name) {
    String canonicalName = IgniteObjectName.parseCanonicalName(name);

    return tableAsyncInternal(canonicalName);
}
/**
 * Gets a table by name, if it was created before. The name is used as is; it is not parsed as a canonical name.
 *
 * @param name Table name.
 * @return Future completed with the table, or with {@code null} when no table with this name is configured.
 * @throws IgniteException If the node is stopping.
 */
public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
    boolean entered = busyLock.enterBusy();

    if (!entered) {
        throw new IgniteException(new NodeStoppingException());
    }

    try {
        UUID tblId = directTableId(name);

        if (tblId != null) {
            return tableAsyncInternal(tblId, false);
        }

        // No such table in the configuration.
        return completedFuture(null);
    } finally {
        busyLock.leaveBusy();
    }
}
/**
* Internal method for getting table by id.
*
* <p>If the table is already present in the latest local tables map, the returned future is completed immediately.
* Otherwise a {@link TableEvent#CREATE} listener is registered, and the future is completed once the creation event
* for this id arrives (after {@code tablesByIdVv} reaches that event's causality token), or completed exceptionally
* with the event's error. After registering, the local map and the configuration are checked again, which covers a
* table that appeared (or turned out not to be configured) between the first check and the listener registration.
*
* <p>This method does not enter {@code busyLock} itself.
*
* @param id Table id.
* @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false} otherwise.
* @return Future representing pending completion of the operation; completed with {@code null} when the table is
*         not configured.
*/
public CompletableFuture<TableImpl> tableAsyncInternal(UUID id, boolean checkConfiguration) {
if (checkConfiguration && !isTableConfigured(id)) {
return CompletableFuture.completedFuture(null);
}
// Fast path: the table is already known locally.
var tbl = tablesByIdVv.latest().get(id);
if (tbl != null) {
return CompletableFuture.completedFuture(tbl);
}
CompletableFuture<TableImpl> getTblFut = new CompletableFuture<>();
EventListener<TableEventParameters> clo = new EventListener<>() {
@Override
public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
// Events for other tables are ignored (false presumably keeps the listener registered — confirm).
if (!id.equals(parameters.tableId())) {
return completedFuture(false);
}
if (e == null) {
// Complete only after tablesByIdVv has caught up with this event's causality token.
// NOTE(review): thenRun() runs only on normal completion; if that future fails, getTblFut is not completed here.
tablesByIdVv.get(parameters.causalityToken()).thenRun(() -> getTblFut.complete(parameters.table()));
} else {
getTblFut.completeExceptionally(e);
}
return completedFuture(true);
}
@Override
public void remove(@NotNull Throwable e) {
// The listener is being removed with an error: fail the pending lookup.
getTblFut.completeExceptionally(e);
}
};
listen(TableEvent.CREATE, clo);
// Re-check after registering to close the race with a table created (or found to be absent) meanwhile;
// if this re-check completes the future, the listener is no longer needed.
tbl = tablesByIdVv.latest().get(id);
if (tbl != null && getTblFut.complete(tbl) || !isTableConfigured(id) && getTblFut.complete(null)) {
removeListener(TableEvent.CREATE, clo, null);
}
return getTblFut;
}
/**
 * Tells whether a table with the given internal id exists in the (direct) tables configuration.
 *
 * @param id Table id.
 * @return {@code true} when the table is configured in the cluster, {@code false} otherwise.
 */
private boolean isTableConfigured(UUID id) {
    try {
        var tablesTree = directProxy(tablesCfg.tables());

        // Reading the id forces the lookup; an absent table surfaces as NoSuchElementException.
        ((ExtendedTableConfiguration) getByInternalId(tablesTree, id)).id().value();
    } catch (NoSuchElementException e) {
        return false;
    }

    return true;
}
/**
 * Blocks until the future completes and returns its result. A failure is unwrapped from the
 * {@link CompletionException} and rethrown as a public runtime exception.
 *
 * @param future Completable future.
 * @param <T> Result type.
 * @return Future result.
 * @throws IgniteException If the node is stopping, or if the future failed with a checked exception.
 */
private <T> T join(CompletableFuture<T> future) {
    boolean entered = busyLock.enterBusy();

    if (!entered) {
        throw new IgniteException(new NodeStoppingException());
    }

    try {
        return future.join();
    } catch (CompletionException wrapper) {
        // Surface the actual failure rather than the CompletionException wrapper.
        Throwable cause = wrapper.getCause();

        throw convertThrowable(cause);
    } finally {
        busyLock.leaveBusy();
    }
}
/**
 * Converts a throwable into one that may be thrown from the public API: runtime exceptions pass through unchanged,
 * anything else is wrapped into an {@link IgniteException}.
 *
 * @param th Throwable.
 * @return Public throwable.
 */
private RuntimeException convertThrowable(Throwable th) {
    return th instanceof RuntimeException ? (RuntimeException) th : new IgniteException(th);
}
/**
* Registers meta storage watches on the rebalance-specific keys.
*
* <p>The watch on {@code PENDING_ASSIGNMENTS_PREFIX} starts a raft node for the partition locally when required by the
* peers newly added in the pending assignments (those absent from the stable assignments). Then, if the event is not
* stale and this node is the raft group leader, it asks the group to change its peers to the pending assignments.
*
* <p>The watch on {@code STABLE_ASSIGNMENTS_PREFIX} stops the local raft node of a partition when this node is in
* neither the stable nor the pending assignments of that partition.
*
* <p>Both handlers run under {@code busyLock} and block on several futures via {@code join()}.
*/
private void registerRebalanceListeners() {
metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), new WatchListener() {
@Override
public boolean onUpdate(@NotNull WatchEvent evt) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(new NodeStoppingException());
}
try {
assert evt.single();
Entry pendingAssignmentsWatchEvent = evt.entryEvent().newEntry();
// An entry without a value (presumably a removed key) carries no assignments: nothing to do.
if (pendingAssignmentsWatchEvent.value() == null) {
return true;
}
int part = extractPartitionNumber(pendingAssignmentsWatchEvent.key());
UUID tblId = extractTableId(pendingAssignmentsWatchEvent.key(), PENDING_ASSIGNMENTS_PREFIX);
String partId = partitionRaftGroupName(tblId, part);
// Assignments of the pending rebalance that we received through the meta storage watch mechanism.
List<ClusterNode> newPeers = ((List<ClusterNode>) ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value()));
// Latest pending assignments entry; its revision is used below to detect a stale event.
var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).join();
assert pendingAssignmentsWatchEvent.revision() <= pendingAssignments.revision()
: "Meta Storage watch cannot notify about an event with the revision that is more than the actual revision.";
// NOTE(review): tbl is null if the table is absent from the latest local map (e.g. already dropped — TODO confirm);
// the dereferences below would then throw NullPointerException.
TableImpl tbl = tablesByIdVv.latest().get(tblId);
ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name());
// TODO: IGNITE-17197 Remove assert after the ticket is resolved.
assert tbl.internalTable().storage() instanceof MvTableStorage :
"Only multi version storages are supported. Current storage is a "
+ tbl.internalTable().storage().getClass().getName();
// Stable assignments from the meta store, which revision is bounded by the current pending event.
byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(partId),
pendingAssignmentsWatchEvent.revision()).join().value();
List<ClusterNode> assignments = stableAssignments == null
// This is for the case when the first rebalance occurs.
? ((List<List<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
: (List<ClusterNode>) ByteUtils.fromBytes(stableAssignments);
ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
// Peers present in the pending assignments but not in the stable ones, i.e. nodes added by this rebalance.
var deltaPeers = newPeers.stream()
.filter(p -> !assignments.contains(p))
.collect(Collectors.toList());
try {
LOG.info("Received update on pending assignments. Check if new raft group should be started"
+ " [key={}, partition={}, table={}, localMemberAddress={}]",
pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address());
// Presumably true when the local node is one of deltaPeers — confirm against the raft manager.
if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(part);
RaftGroupOptions groupOptions = groupOptionsForPartition(tbl.internalTable(), partitionStorage, assignments);
RaftGroupListener raftGrpLsnr = new PartitionListener(
tblId,
new VersionedRowStore(partitionStorage, txManager)
);
RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener(
metaStorageMgr,
tblCfg,
partId,
part,
busyLock,
movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)),
rebalanceScheduler
);
// The raft node is started with the stable assignments (not newPeers) as its peer list.
raftMgr.startRaftGroupNode(
partId,
assignments,
raftGrpLsnr,
raftGrpEvtsLsnr,
groupOptions
);
}
} catch (NodeStoppingException e) {
// no-op: the node is stopping, so no raft node is started.
}
// Do not change peers of the raft group if this is a stale event.
// Note that we start raft node before for the sake of the consistency in a starting and stopping raft nodes.
if (pendingAssignmentsWatchEvent.revision() < pendingAssignments.revision()) {
return true;
}
var newNodes = newPeers.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(part);
// NOTE(review): assumes refreshAndGetLeaderWithTerm() always yields a non-null leader peer — confirm.
IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join();
// run update of raft configuration if this node is a leader
if (localMember.address().equals(leaderWithTerm.get1().address())) {
LOG.info("Current node={} is the leader of partition raft group={}. "
+ "Initiate rebalance process for partition={}, table={}",
localMember.address(), partId, part, tbl.name());
partGrpSvc.changePeersAsync(newNodes, leaderWithTerm.get2()).join();
}
return true;
} finally {
busyLock.leaveBusy();
}
}
@Override
public void onError(@NotNull Throwable e) {
LOG.warn("Unable to process pending assignments event", e);
}
});
metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), new WatchListener() {
@Override
public boolean onUpdate(@NotNull WatchEvent evt) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(new NodeStoppingException());
}
try {
assert evt.single();
Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
// An entry without a value (presumably a removed key) carries no assignments: nothing to do.
if (stableAssignmentsWatchEvent.value() == null) {
return true;
}
int part = extractPartitionNumber(stableAssignmentsWatchEvent.key());
UUID tblId = extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX);
String partId = partitionRaftGroupName(tblId, part);
var stableAssignments = (List<ClusterNode>) ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
// Pending assignments as of this stable event's revision; absent means no rebalance is pending.
byte[] pendingFromMetastorage = metaStorageMgr.get(pendingPartAssignmentsKey(partId),
stableAssignmentsWatchEvent.revision()).join().value();
List<ClusterNode> pendingAssignments = pendingFromMetastorage == null
? Collections.emptyList()
: (List<ClusterNode>) ByteUtils.fromBytes(pendingFromMetastorage);
try {
ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
// Stop the local raft node only if this node belongs to neither the stable nor the pending assignments.
if (!stableAssignments.contains(localMember) && !pendingAssignments.contains(localMember)) {
raftMgr.stopRaftGroup(partId);
}
} catch (NodeStoppingException e) {
// no-op: the node is stopping anyway.
}
return true;
} finally {
busyLock.leaveBusy();
}
}
@Override
public void onError(@NotNull Throwable e) {
LOG.warn("Unable to process stable assignments event", e);
}
});
}
/**
* Creates a function that performs {@link RaftGroupService#changePeersAsync(java.util.List, long)} on the raft group
* service of a partition, so nodes of the corresponding raft group can be reconfigured.
*
* <p>The supplier is called on every attempt, so each attempt uses the service instance current at that moment.
* Any error, recoverable or not, triggers another attempt; no retry limit or delay is applied here. The completion
* handler runs on {@code rebalanceScheduler}. The resulting future completes normally once an attempt succeeds.
* If the node is stopping, the function throws (or, inside a retry, fails its future with)
* {@link IgniteInternalException} wrapping {@link NodeStoppingException}.
*
* @param raftGroupServiceSupplier Supplier of the raft group service of a partition.
* @return Function of (peers, term) which performs {@link RaftGroupService#changePeersAsync(java.util.List, long)}.
*/
BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartition(Supplier<RaftGroupService> raftGroupServiceSupplier) {
return (List<Peer> peers, Long term) -> {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(new NodeStoppingException());
}
try {
return raftGroupServiceSupplier.get().changePeersAsync(peers, term).handleAsync((resp, err) -> {
// The handler runs asynchronously after the outer busy section has been left, so it takes the lock again.
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(new NodeStoppingException());
}
try {
if (err != null) {
if (recoverable(err)) {
LOG.debug("Recoverable error received during changePeersAsync invocation, retrying", err);
} else {
// TODO: Ideally, rebalance, which has initiated this invocation should be canceled,
// TODO: https://issues.apache.org/jira/browse/IGNITE-17056
// TODO: Also it might be reasonable to delegate such exceptional case to a general failure handler.
// TODO: At the moment, we repeat such intents as well.
LOG.debug("Unrecoverable error received during changePeersAsync invocation, retrying", err);
}
// Retry with the same peers and term; the returned future is flattened by thenCompose below.
return movePartition(raftGroupServiceSupplier).apply(peers, term);
}
return CompletableFuture.<Void>completedFuture(null);
} finally {
busyLock.leaveBusy();
}
}, rebalanceScheduler).thenCompose(Function.identity());
} finally {
busyLock.leaveBusy();
}
};
}
/**
 * Returns an accessor that reads the distributed configuration property directly, bypassing the local cache.
 * When metadata is configured to be read locally only, the property itself is returned unchanged.
 *
 * @param property Distributed configuration property to receive direct access.
 * @param <T> Type of the property accessor.
 * @return An accessor for distributive property.
 * @see #getMetadataLocallyOnly
 */
private <T extends ConfigurationProperty<?>> T directProxy(T property) {
    if (getMetadataLocallyOnly) {
        return property;
    }

    return ConfigurationUtil.directProxy(property);
}
}