| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.table.distributed; |
| |
| import static java.util.Collections.unmodifiableMap; |
| import static java.util.concurrent.CompletableFuture.allOf; |
| import static java.util.concurrent.CompletableFuture.completedFuture; |
| import static java.util.concurrent.CompletableFuture.failedFuture; |
| import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId; |
| import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION; |
| import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; |
| import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.recoverable; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey; |
| import static org.apache.ignite.internal.utils.RebalanceUtil.updatePendingAssignmentsKeys; |
| |
| import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.function.BiFunction; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.configuration.ConfigurationChangeException; |
| import org.apache.ignite.configuration.ConfigurationProperty; |
| import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener; |
| import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; |
| import org.apache.ignite.configuration.schemas.table.TableChange; |
| import org.apache.ignite.configuration.schemas.table.TableConfiguration; |
| import org.apache.ignite.configuration.schemas.table.TableView; |
| import org.apache.ignite.configuration.schemas.table.TablesConfiguration; |
| import org.apache.ignite.configuration.validation.ConfigurationValidationException; |
| import org.apache.ignite.internal.affinity.AffinityUtils; |
| import org.apache.ignite.internal.baseline.BaselineManager; |
| import org.apache.ignite.internal.causality.VersionedValue; |
| import org.apache.ignite.internal.configuration.schema.ExtendedTableChange; |
| import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration; |
| import org.apache.ignite.internal.configuration.schema.ExtendedTableView; |
| import org.apache.ignite.internal.configuration.util.ConfigurationUtil; |
| import org.apache.ignite.internal.logger.IgniteLogger; |
| import org.apache.ignite.internal.logger.Loggers; |
| import org.apache.ignite.internal.manager.EventListener; |
| import org.apache.ignite.internal.manager.IgniteComponent; |
| import org.apache.ignite.internal.manager.Producer; |
| import org.apache.ignite.internal.metastorage.MetaStorageManager; |
| import org.apache.ignite.internal.metastorage.client.Entry; |
| import org.apache.ignite.internal.metastorage.client.WatchEvent; |
| import org.apache.ignite.internal.metastorage.client.WatchListener; |
| import org.apache.ignite.internal.raft.Loza; |
| import org.apache.ignite.internal.raft.server.RaftGroupEventsListener; |
| import org.apache.ignite.internal.raft.server.RaftGroupOptions; |
| import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactory; |
| import org.apache.ignite.internal.schema.SchemaDescriptor; |
| import org.apache.ignite.internal.schema.SchemaManager; |
| import org.apache.ignite.internal.schema.SchemaUtils; |
| import org.apache.ignite.internal.schema.event.SchemaEvent; |
| import org.apache.ignite.internal.schema.event.SchemaEventParameters; |
| import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl; |
| import org.apache.ignite.internal.storage.DataStorageManager; |
| import org.apache.ignite.internal.storage.MvPartitionStorage; |
| import org.apache.ignite.internal.storage.engine.MvTableStorage; |
| import org.apache.ignite.internal.table.IgniteTablesInternal; |
| import org.apache.ignite.internal.table.InternalTable; |
| import org.apache.ignite.internal.table.TableImpl; |
| import org.apache.ignite.internal.table.distributed.raft.PartitionListener; |
| import org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEventsListener; |
| import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory; |
| import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; |
| import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore; |
| import org.apache.ignite.internal.table.event.TableEvent; |
| import org.apache.ignite.internal.table.event.TableEventParameters; |
| import org.apache.ignite.internal.thread.NamedThreadFactory; |
| import org.apache.ignite.internal.tx.TxManager; |
| import org.apache.ignite.internal.util.ByteUtils; |
| import org.apache.ignite.internal.util.IgniteObjectName; |
| import org.apache.ignite.internal.util.IgniteSpinBusyLock; |
| import org.apache.ignite.lang.ByteArray; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteException; |
| import org.apache.ignite.lang.IgniteInternalException; |
| import org.apache.ignite.lang.IgniteStringFormatter; |
| import org.apache.ignite.lang.IgniteSystemProperties; |
| import org.apache.ignite.lang.NodeStoppingException; |
| import org.apache.ignite.lang.TableAlreadyExistsException; |
| import org.apache.ignite.lang.TableNotFoundException; |
| import org.apache.ignite.network.ClusterNode; |
| import org.apache.ignite.network.NetworkAddress; |
| import org.apache.ignite.network.TopologyService; |
| import org.apache.ignite.raft.client.Peer; |
| import org.apache.ignite.raft.client.service.RaftGroupListener; |
| import org.apache.ignite.raft.client.service.RaftGroupService; |
| import org.apache.ignite.raft.jraft.entity.PeerId; |
| import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage; |
| import org.apache.ignite.raft.jraft.util.Utils; |
| import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet; |
| import org.apache.ignite.table.Table; |
| import org.apache.ignite.table.manager.IgniteTables; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| import org.jetbrains.annotations.TestOnly; |
| |
| /** |
| * Table manager. |
| */ |
| public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal, |
| IgniteComponent { |
| // TODO get rid of this in future? IGNITE-17307 |
| /** Timeout, in seconds, to complete the tablesByIdVv on revision update. */ |
| private static final long TABLES_COMPLETE_TIMEOUT = 120; |
| |
| /** The logger. */ |
| private static final IgniteLogger LOG = Loggers.forClass(TableManager.class); |
| |
| /** |
| * If this property is set to {@code true} then an attempt to get the configuration property directly from the meta storage will be |
| * skipped, and the local property will be returned. |
| * TODO: IGNITE-16774 This property and the overall approach of accessing configuration directly through the meta storage |
| * TODO: will be removed after the issue is fixed. |
| */ |
| private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY"); |
| |
| /** Tables configuration. */ |
| private final TablesConfiguration tablesCfg; |
| |
| /** Raft manager. */ |
| private final Loza raftMgr; |
| |
| /** Baseline manager. */ |
| private final BaselineManager baselineMgr; |
| |
| /** Transaction manager. */ |
| private final TxManager txManager; |
| |
| /** Meta storage manager. */ |
| private final MetaStorageManager metaStorageMgr; |
| |
| /** Data storage manager. */ |
| private final DataStorageManager dataStorageMgr; |
| |
| /** Futures of tables that are being created; a future is stored here until the table can be provided to the client. */ |
| private final Map<UUID, CompletableFuture<Table>> tableCreateFuts = new ConcurrentHashMap<>(); |
| |
| /** Versioned store for tables by id. */ |
| private final VersionedValue<Map<UUID, TableImpl>> tablesByIdVv; |
| |
| /** Set of futures that should complete before completion of {@link #tablesByIdVv}, after completion this set is cleared. */ |
| private final Set<CompletableFuture<?>> beforeTablesVvComplete = new ConcurrentHashSet<>(); |
| |
| /** |
| * A {@link TableImpl} is created during an update of tablesByIdVv; a reference to it is stored here so that, if the update of |
| * tablesByIdVv fails, the resources associated with the table can still be stopped. |
| */ |
| private final Map<UUID, TableImpl> tablesToStopInCaseOfError = new ConcurrentHashMap<>(); |
| |
| /** Resolver that resolves a network address to node id. */ |
| private final Function<NetworkAddress, String> netAddrResolver; |
| |
| /** Resolver that resolves a network address to cluster node. */ |
| private final Function<NetworkAddress, ClusterNode> clusterNodeResolver; |
| |
| /** Busy lock to stop synchronously. */ |
| private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); |
| |
| /** Prevents double stopping the component. */ |
| private final AtomicBoolean stopGuard = new AtomicBoolean(); |
| |
| /** Schema manager. */ |
| private final SchemaManager schemaManager; |
| |
| /** Executor for scheduling retries of a rebalance. */ |
| private final ScheduledExecutorService rebalanceScheduler; |
| |
| /** |
| * Separate executor for IO operations like partition storage initialization |
| * or partition raft group meta data persisting. |
| */ |
| private final ExecutorService ioExecutor; |
| |
| /** Rebalance scheduler pool size: three threads per CPU, capped at 20. */ |
| private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20); |
| |
| /** |
| * Creates a new table manager. |
| * |
| * <p>Besides storing its collaborators, the constructor registers a revision-completion callback in {@code registry} and |
| * creates the rebalance scheduler and the IO executor. |
| * |
| * @param registry Registry for versioned values. |
| * @param tablesCfg Tables configuration. |
| * @param raftMgr Raft manager. |
| * @param baselineMgr Baseline manager. |
| * @param topologyService Topology service, used to resolve network addresses to cluster nodes. |
| * @param txManager Transaction manager. |
| * @param dataStorageMgr Data storage manager. |
| * @param metaStorageMgr Meta storage manager. |
| * @param schemaManager Schema manager. |
| */ |
| public TableManager( |
| Consumer<Function<Long, CompletableFuture<?>>> registry, |
| TablesConfiguration tablesCfg, |
| Loza raftMgr, |
| BaselineManager baselineMgr, |
| TopologyService topologyService, |
| TxManager txManager, |
| DataStorageManager dataStorageMgr, |
| MetaStorageManager metaStorageMgr, |
| SchemaManager schemaManager |
| ) { |
| this.tablesCfg = tablesCfg; |
| this.raftMgr = raftMgr; |
| this.baselineMgr = baselineMgr; |
| this.txManager = txManager; |
| this.dataStorageMgr = dataStorageMgr; |
| this.metaStorageMgr = metaStorageMgr; |
| this.schemaManager = schemaManager; |
| |
| // Unlike clusterNodeResolver below, this resolver fails fast when the address is unknown to the topology. |
| netAddrResolver = addr -> { |
| ClusterNode node = topologyService.getByAddress(addr); |
| |
| if (node == null) { |
| throw new IllegalStateException("Can't resolve ClusterNode by its networkAddress=" + addr); |
| } |
| |
| return node.id(); |
| }; |
| |
| clusterNodeResolver = topologyService::getByAddress; |
| |
| tablesByIdVv = new VersionedValue<>(null, HashMap::new); |
| |
| registry.accept(token -> { |
| // Take a snapshot of the futures registered so far and reset the set for the next revision. |
| List<CompletableFuture<?>> futures = new ArrayList<>(beforeTablesVvComplete); |
| |
| beforeTablesVvComplete.clear(); |
| |
| return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})) |
| .orTimeout(TABLES_COMPLETE_TIMEOUT, TimeUnit.SECONDS) |
| .whenComplete((v, e) -> { |
| if (!busyLock.enterBusy()) { |
| if (e != null) { |
| LOG.warn("Error occurred while updating tables and stopping components.", e); |
| // Stop of the components has been started, so we do nothing and resources of tablesByIdVv will be |
| // freed in the logic of TableManager stop. We cannot complete tablesByIdVv exceptionally because |
| // we will lose a context of tables. |
| } |
| return; |
| } |
| try { |
| if (e != null) { |
| LOG.warn("Error occurred while updating tables.", e); |
| if (e instanceof CompletionException) { |
| Throwable th = e.getCause(); |
| // Case when stopping of the previous component has been started and related futures completed |
| // exceptionally. A CompletionException may have no cause, so 'th' is null-checked before |
| // it is dereferenced ('instanceof' itself is null-safe). |
| if (th instanceof NodeStoppingException |
| || (th != null && th.getCause() instanceof NodeStoppingException)) { |
| // Stop of the components has been started so we do nothing and resources will be freed in the |
| // logic of TableManager stop |
| return; |
| } |
| } |
| // TODO: https://issues.apache.org/jira/browse/IGNITE-17515 |
| // NOTE(review): execution falls through to complete(token) and to clearing tablesToStopInCaseOfError |
| // after the exceptional completion; kept as is, to be revisited under the ticket above. |
| tablesByIdVv.completeExceptionally(token, e); |
| } |
| |
| //Normal scenario, when all related futures for tablesByIdVv are completed and we can complete tablesByIdVv |
| tablesByIdVv.complete(token); |
| |
| tablesToStopInCaseOfError.clear(); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| }); |
| }); |
| |
| rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, |
| new NamedThreadFactory("rebalance-scheduler", LOG)); |
| |
| ioExecutor = new ThreadPoolExecutor( |
| Math.min(Utils.cpus() * 3, 25), |
| Integer.MAX_VALUE, |
| 100, |
| TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<>(), |
| new NamedThreadFactory("tableManager-io", LOG)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void start() { |
| tablesCfg.tables().any().replicas().listen(this::onUpdateReplicas); |
| |
| registerRebalanceListeners(); |
| |
| ((ExtendedTableConfiguration) tablesCfg.tables().any()).assignments().listen(this::onUpdateAssignments); |
| |
| tablesCfg.tables().listenElements(new ConfigurationNamedListListener<>() { |
| @Override |
| public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<TableView> ctx) { |
| return onTableCreate(ctx); |
| } |
| |
| @Override |
| public CompletableFuture<?> onRename(String oldName, String newName, ConfigurationNotificationEvent<TableView> ctx) { |
| // TODO: IGNITE-15485 Support table rename operation. |
| |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| @Override |
| public CompletableFuture<?> onDelete(ConfigurationNotificationEvent<TableView> ctx) { |
| return onTableDelete(ctx); |
| } |
| }); |
| |
| schemaManager.listen(SchemaEvent.CREATE, new EventListener<>() { |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<Boolean> notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable exception) { |
| if (tablesByIdVv.latest().get(parameters.tableId()) != null) { |
| fireEvent( |
| TableEvent.ALTER, |
| new TableEventParameters(parameters.causalityToken(), tablesByIdVv.latest().get(parameters.tableId())), null |
| ); |
| } |
| |
| return completedFuture(false); |
| } |
| }); |
| } |
| |
| /** |
| * Handles creation of a table configuration element. |
| * |
| * <p>If the node is stopping, a {@link TableEvent#CREATE} event is fired with a {@link NodeStoppingException} and a failed |
| * future is returned; otherwise the local table structures are created. |
| * |
| * @param ctx Table configuration context. |
| * @return A future. |
| */ |
| private CompletableFuture<?> onTableCreate(ConfigurationNotificationEvent<TableView> ctx) { |
| if (busyLock.enterBusy()) { |
| try { |
| TableView created = ctx.newValue(); |
| |
| return createTableLocally( |
| ctx.storageRevision(), |
| created.name(), |
| ((ExtendedTableView) created).id(), |
| created.partitions() |
| ); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| // The node is stopping: notify the listeners about the failure and fail the configuration notification. |
| TableView rejected = ctx.newValue(); |
| String rejectedName = rejected.name(); |
| UUID rejectedId = ((ExtendedTableView) rejected).id(); |
| |
| TableEventParameters evtParams = new TableEventParameters(ctx.storageRevision(), rejectedId, rejectedName); |
| |
| fireEvent(TableEvent.CREATE, evtParams, new NodeStoppingException()); |
| |
| return failedFuture(new NodeStoppingException()); |
| } |
| |
| /** |
| * Handles removal of a table configuration element. |
| * |
| * <p>If the node is stopping, a {@link TableEvent#DROP} event is fired with a {@link NodeStoppingException} and a failed |
| * future is returned; otherwise the local table structures are dropped and a completed future is returned. |
| * |
| * @param ctx Table configuration context. |
| * @return A future. |
| */ |
| private CompletableFuture<?> onTableDelete(ConfigurationNotificationEvent<TableView> ctx) { |
| if (busyLock.enterBusy()) { |
| try { |
| TableView removed = ctx.oldValue(); |
| ExtendedTableView removedExt = (ExtendedTableView) removed; |
| |
| dropTableLocally( |
| ctx.storageRevision(), |
| removed.name(), |
| removedExt.id(), |
| (List<List<ClusterNode>>) ByteUtils.fromBytes(removedExt.assignments()) |
| ); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| // The node is stopping: notify the listeners about the failure and fail the configuration notification. |
| TableView rejected = ctx.oldValue(); |
| String rejectedName = rejected.name(); |
| UUID rejectedId = ((ExtendedTableView) rejected).id(); |
| |
| TableEventParameters evtParams = new TableEventParameters(ctx.storageRevision(), rejectedId, rejectedName); |
| |
| fireEvent(TableEvent.DROP, evtParams, new NodeStoppingException()); |
| |
| return failedFuture(new NodeStoppingException()); |
| } |
| |
| /** |
| * Listener of replicas configuration changes. |
| * |
| * <p>When the previous replicas value is present and positive, the pending assignments keys of every partition of the table |
| * are updated according to the new replicas number. Otherwise nothing is done. |
| * |
| * @param replicasCtx Replicas configuration event context. |
| * @return A future, which will be completed, when event processed by listener. It is completed exceptionally with |
| * {@link NodeStoppingException} if the node is stopping. |
| */ |
| private CompletableFuture<?> onUpdateReplicas(ConfigurationNotificationEvent<Integer> replicasCtx) { |
| if (!busyLock.enterBusy()) { |
| // Fail the future instead of completing it normally with an exception object as its value, so that the caller |
| // actually observes the error; this matches the other configuration listeners of this class. |
| return failedFuture(new NodeStoppingException()); |
| } |
| |
| try { |
| if (replicasCtx.oldValue() != null && replicasCtx.oldValue() > 0) { |
| TableConfiguration tblCfg = replicasCtx.config(TableConfiguration.class); |
| |
| LOG.info("Received update for replicas number [table={}, oldNumber={}, newNumber={}]", |
| tblCfg.name().value(), replicasCtx.oldValue(), replicasCtx.newValue()); |
| |
| int partCnt = tblCfg.partitions().value(); |
| |
| int newReplicas = replicasCtx.newValue(); |
| |
| // One future per partition; the listener completes when all pending assignments keys are updated. |
| CompletableFuture<?>[] futures = new CompletableFuture<?>[partCnt]; |
| |
| for (int i = 0; i < partCnt; i++) { |
| String partId = partitionRaftGroupName(((ExtendedTableConfiguration) tblCfg).id().value(), i); |
| |
| futures[i] = updatePendingAssignmentsKeys( |
| tblCfg.name().value(), partId, baselineMgr.nodes(), |
| partCnt, newReplicas, |
| replicasCtx.storageRevision(), metaStorageMgr, i); |
| } |
| |
| return CompletableFuture.allOf(futures); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Handles a change of the assignments configuration value of a table. |
| * |
| * <p>The update is processed synchronously under the busy lock; if the node is stopping, a future failed with |
| * {@link NodeStoppingException} is returned instead. |
| * |
| * @param assignmentsCtx Assignment configuration context. |
| * @return A future. |
| */ |
| private CompletableFuture<?> onUpdateAssignments(ConfigurationNotificationEvent<byte[]> assignmentsCtx) { |
| if (busyLock.enterBusy()) { |
| try { |
| updateAssignmentInternal(assignmentsCtx); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| |
| return completedFuture(null); |
| } |
| |
| return failedFuture(new NodeStoppingException()); |
| } |
| |
| /** |
| * Updates or creates partition raft groups. |
| * |
| * <p>For every partition of the new assignments an update of {@code tablesByIdVv} is registered for the event's storage |
| * revision. Inside that update a raft group node is started locally only when the old assignment of the partition is empty |
| * (table creation) and the raft manager reports that the group should exist on this node; a raft group service is then |
| * started for the partition and stored into the internal table. The method blocks until the per-partition futures complete. |
| * |
| * @param assignmentsCtx Change assignment event. |
| */ |
| private void updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> assignmentsCtx) { |
| ExtendedTableConfiguration tblCfg = assignmentsCtx.config(ExtendedTableConfiguration.class); |
| |
| UUID tblId = tblCfg.id().value(); |
| |
| long causalityToken = assignmentsCtx.storageRevision(); |
| |
| // A missing old value means that there were no assignments before this change. |
| List<List<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() == null ? null : |
| (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.oldValue()); |
| |
| List<List<ClusterNode>> newAssignments = (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.newValue()); |
| |
| // Empty assignments might be a valid case if tables are created from within cluster init HOCON |
| // configuration, which is not supported now. |
| assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tblId); |
| |
| int partitions = newAssignments.size(); |
| |
| // One slot per partition; each slot is filled inside the corresponding tablesByIdVv update closure below. |
| CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions]; |
| |
| // TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes |
| // TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will |
| // TODO: be exact same amount of partitions and replicas for both old and new assignments |
| for (int i = 0; i < partitions; i++) { |
| // Effectively final copy of the loop index for use inside the lambdas. |
| int partId = i; |
| |
| List<ClusterNode> oldPartAssignment = oldAssignments == null ? Collections.emptyList() : |
| oldAssignments.get(partId); |
| |
| List<ClusterNode> newPartAssignment = newAssignments.get(partId); |
| |
| // Create new raft nodes according to new assignments. |
| tablesByIdVv.update(causalityToken, (tablesById, e) -> { |
| if (e != null) { |
| // NOTE(review): on this path futures[partId] is left unset; confirm how allOf(futures) below is expected |
| // to behave in that case. |
| return failedFuture(e); |
| } |
| |
| InternalTable internalTbl = tablesById.get(tblId).internalTable(); |
| |
| // TODO: IGNITE-17197 Remove assert after the ticket is resolved. |
| assert internalTbl.storage() instanceof MvTableStorage : |
| "Only multi version storages are supported. Current storage is a " + internalTbl.storage().getClass().getName(); |
| |
| // start new nodes, only if it is table creation |
| // other cases will be covered by rebalance logic |
| List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptyList(); |
| |
| String grpId = partitionRaftGroupName(tblId, partId); |
| |
| // Stays an already completed future when no raft group node has to be started locally. |
| CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null); |
| |
| if (raftMgr.shouldHaveRaftGroupLocally(nodes)) { |
| // Both the partition storage creation and the raft node start are executed on the IO executor. |
| startGroupFut = CompletableFuture |
| .supplyAsync( |
| () -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor) |
| .thenComposeAsync((partitionStorage) -> { |
| RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, partitionStorage, |
| newPartAssignment); |
| |
| try { |
| raftMgr.startRaftGroupNode( |
| grpId, |
| newPartAssignment, |
| new PartitionListener(tblId, new VersionedRowStore(partitionStorage, txManager)), |
| new RebalanceRaftGroupEventsListener( |
| metaStorageMgr, |
| tablesCfg.tables().get(tablesById.get(tblId).name()), |
| grpId, |
| partId, |
| busyLock, |
| movePartition(() -> internalTbl.partitionRaftGroupService(partId)), |
| rebalanceScheduler |
| ), |
| groupOptions |
| ); |
| |
| return CompletableFuture.completedFuture(null); |
| } catch (NodeStoppingException ex) { |
| // The checked exception is converted into a failed future. |
| return CompletableFuture.failedFuture(ex); |
| } |
| }, ioExecutor); |
| } |
| |
| // The raft group service (client) is started regardless of whether a raft group node was started locally. |
| futures[partId] = startGroupFut |
| .thenComposeAsync((v) -> { |
| try { |
| return raftMgr.startRaftGroupService(grpId, newPartAssignment); |
| } catch (NodeStoppingException ex) { |
| return CompletableFuture.failedFuture(ex); |
| } |
| }, ioExecutor) |
| .thenAccept( |
| updatedRaftGroupService -> ((InternalTableImpl) internalTbl) |
| .updateInternalTableRaftGroupService(partId, updatedRaftGroupService) |
| ).exceptionally(th -> { |
| // Failures are only logged here, so the resulting future completes normally. |
| LOG.warn("Unable to update raft groups on the node", th); |
| |
| return null; |
| }); |
| |
| // The map of tables itself is not changed by this update. |
| return completedFuture(tablesById); |
| }); |
| } |
| |
| // Blocks the calling thread until the per-partition futures are done. |
| CompletableFuture.allOf(futures).join(); |
| } |
| |
| /** |
| * Builds raft group options for a partition of the given table. |
| * |
| * <p>Volatile storages get a volatile log storage factory and a volatile raft meta storage; other storages get the |
| * persistent defaults. In both cases a partition snapshot storage factory is configured. |
| * |
| * @param internalTbl Internal table the partition belongs to. |
| * @param partitionStorage Storage of the partition. |
| * @param peers Nodes of the partition raft group. |
| * @return Raft group options. |
| */ |
| private RaftGroupOptions groupOptionsForPartition( |
| InternalTable internalTbl, |
| MvPartitionStorage partitionStorage, |
| List<ClusterNode> peers |
| ) { |
| RaftGroupOptions opts = internalTbl.storage().isVolatile() |
| ? RaftGroupOptions.forVolatileStores() |
| .setLogStorageFactory(new VolatileLogStorageFactory()) |
| .raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage()) |
| : RaftGroupOptions.forPersistentStores(); |
| |
| //TODO Revisit peers String representation: https://issues.apache.org/jira/browse/IGNITE-17420 |
| List<String> peerIds = new ArrayList<>(peers.size()); |
| |
| for (ClusterNode node : peers) { |
| PeerId peerId = PeerId.fromPeer(new Peer(node.address())); |
| |
| peerIds.add(peerId.toString()); |
| } |
| |
| opts.snapshotStorageFactory(new PartitionSnapshotStorageFactory(partitionStorage, peerIds, List.of())); |
| |
| return opts; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void stop() { |
| if (!stopGuard.compareAndSet(false, true)) { |
| return; |
| } |
| |
| busyLock.block(); |
| |
| Map<UUID, TableImpl> tables = tablesByIdVv.latest(); |
| |
| cleanUpTablesResources(tables); |
| |
| cleanUpTablesResources(tablesToStopInCaseOfError); |
| |
| tablesToStopInCaseOfError.clear(); |
| |
| shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS); |
| shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS); |
| } |
| |
| /** |
| * Releases the resources of the given tables: stops the raft group of every partition, then stops the table storage and |
| * closes the internal table. A failure for one table is logged and does not prevent processing of the remaining tables. |
| * |
| * @param tables Tables to stop. |
| */ |
| private void cleanUpTablesResources(Map<UUID, TableImpl> tables) { |
| tables.values().forEach(tbl -> { |
| try { |
| int partIdx = 0; |
| |
| while (partIdx < tbl.internalTable().partitions()) { |
| raftMgr.stopRaftGroup(partitionRaftGroupName(tbl.tableId(), partIdx)); |
| |
| partIdx++; |
| } |
| |
| tbl.internalTable().storage().stop(); |
| tbl.internalTable().close(); |
| } catch (Exception ex) { |
| LOG.info("Unable to stop table [name={}]", ex, tbl.name()); |
| } |
| }); |
| } |
| |
| /** |
| * Returns the current assignments of a table. |
| * |
| * <p>The i-th element of the returned list is the id of the node that is considered the leader of the i-th partition |
| * at the moment of invocation. |
| * |
| * @param tableId Unique id of a table. |
| * @return List of the current assignments. |
| * @throws NodeStoppingException If the node is stopping. |
| */ |
| public List<String> assignments(UUID tableId) throws NodeStoppingException { |
| if (busyLock.enterBusy()) { |
| try { |
| return table(tableId).internalTable().assignments(); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| throw new NodeStoppingException(); |
| } |
| |
| /** |
| * Creates the local structures of a table: starts its storage, builds the table objects, registers the table in |
| * {@code tablesByIdVv} and schedules the schema assignment followed by the {@link TableEvent#CREATE} event. |
| * |
| * @param causalityToken Causality token. |
| * @param name Table name. |
| * @param tblId Table id. |
| * @param partitions Count of partitions. |
| * @return Future that will be completed when local changes related to the table creation are applied. |
| */ |
| private CompletableFuture<?> createTableLocally(long causalityToken, String name, UUID tblId, int partitions) { |
| TableConfiguration cfg = tablesCfg.tables().get(name); |
| |
| MvTableStorage storage = dataStorageMgr.engine(cfg.dataStorage()).createMvTable(cfg); |
| |
| storage.start(); |
| |
| TableImpl table = new TableImpl(new InternalTableImpl( |
| name, |
| tblId, |
| new Int2ObjectOpenHashMap<>(partitions), |
| partitions, |
| netAddrResolver, |
| clusterNodeResolver, |
| txManager, |
| storage |
| )); |
| |
| // Publish a copy of the previous map extended with the new table. |
| tablesByIdVv.update(causalityToken, (current, err) -> inBusyLock(busyLock, () -> { |
| if (err == null) { |
| Map<UUID, TableImpl> withNewTable = new HashMap<>(current); |
| |
| withNewTable.put(tblId, table); |
| |
| return completedFuture(withNewTable); |
| } |
| |
| return failedFuture(err); |
| })); |
| |
| // The schema view is set first, the CREATE event is fired afterwards. |
| CompletableFuture<Void> schemaApplied = schemaManager.schemaRegistry(causalityToken, tblId) |
| .thenAccept(schema -> inBusyLock(busyLock, () -> table.schemaView(schema))); |
| |
| CompletableFuture<?> createEventFired = schemaApplied.thenCompose( |
| ignored -> inBusyLock(busyLock, () -> fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, table))) |
| ); |
| |
| beforeTablesVvComplete.add(createEventFired); |
| |
| tablesToStopInCaseOfError.put(tblId, table); |
| |
| // TODO should be reworked in IGNITE-16763 |
| return tablesByIdVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table))); |
| } |
| |
| /** |
| * Completes the future registered for the given table, if any, so that the result can be returned from API |
| * {@link TableManager#createTable(String, Consumer)}, and removes every registration of that future. |
| * |
| * @param table Table. |
| */ |
| private void completeApiCreateFuture(TableImpl table) { |
| CompletableFuture<Table> apiFut = tableCreateFuts.get(table.tableId()); |
| |
| if (apiFut == null) { |
| return; |
| } |
| |
| apiFut.complete(table); |
| |
| // Removal is by future identity, not by key. |
| tableCreateFuts.values().removeIf(registered -> registered == apiFut); |
| } |
| |
| /** |
| * Drops the local structures of a table: stops the raft groups of its partitions, removes the table from |
| * {@code tablesByIdVv}, destroys its storage and schedules the schema registry drop followed by the |
| * {@link TableEvent#DROP} event. Any exception is reported through a {@link TableEvent#DROP} event carrying the error. |
| * |
| * @param causalityToken Causality token. |
| * @param name Table name. |
| * @param tblId Table id. |
| * @param assignment Affinity assignment. |
| */ |
| private void dropTableLocally(long causalityToken, String name, UUID tblId, List<List<ClusterNode>> assignment) { |
| try { |
| // The assignment list holds one entry per partition. |
| int partCnt = assignment.size(); |
| int partIdx = 0; |
| |
| while (partIdx < partCnt) { |
| raftMgr.stopRaftGroup(partitionRaftGroupName(tblId, partIdx)); |
| |
| partIdx++; |
| } |
| |
| // Publish a copy of the previous map without the dropped table. |
| tablesByIdVv.update(causalityToken, (current, err) -> inBusyLock(busyLock, () -> { |
| if (err == null) { |
| Map<UUID, TableImpl> withoutDropped = new HashMap<>(current); |
| |
| withoutDropped.remove(tblId); |
| |
| return completedFuture(withoutDropped); |
| } |
| |
| return failedFuture(err); |
| })); |
| |
| TableImpl dropped = tablesByIdVv.latest().get(tblId); |
| |
| assert dropped != null : IgniteStringFormatter.format("There is no table with the name specified [name={}, id={}]", |
| name, tblId); |
| |
| dropped.internalTable().storage().destroy(); |
| |
| CompletableFuture<?> dropEventFired = schemaManager.dropRegistry(causalityToken, dropped.tableId()) |
| .thenCompose( |
| ignored -> inBusyLock(busyLock, () -> fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, dropped))) |
| ); |
| |
| beforeTablesVvComplete.add(dropEventFired); |
| } catch (Exception ex) { |
| fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tblId, name), ex); |
| } |
| } |
| |
| /** |
| * Builds the unique name of the RAFT group of a table partition in the form {@code <tblId>_part_<partition>}. |
| * |
| * @param tblId Table identifier. |
| * @param partition Partition number within the table. |
| * @return A RAFT group name. |
| */ |
| @NotNull |
| private String partitionRaftGroupName(UUID tblId, int partition) { |
| StringBuilder grpName = new StringBuilder(); |
| |
| grpName.append(tblId).append("_part_").append(partition); |
| |
| return grpName.toString(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Synchronous variant: starts the asynchronous creation and waits for its result. |
| */ |
| @Override |
| public Table createTable(String name, Consumer<TableChange> tableInitChange) { |
| CompletableFuture<Table> creation = createTableAsync(name, tableInitChange); |
| |
| return join(creation); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) { |
| if (!busyLock.enterBusy()) { |
| throw new IgniteException(new NodeStoppingException()); |
| } |
| try { |
| return createTableAsyncInternal(IgniteObjectName.parseCanonicalName(name), tableInitChange); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
|  * Internal method that creates a new table with the given {@code name} asynchronously. If a table with the same name already exists, |
|  * a future will be completed with {@link TableAlreadyExistsException}. |
|  * |
|  * <p>This method only ever completes the returned future exceptionally. On the successful path the future is registered in |
|  * {@code tableCreateFuts} under the id of the new table configuration; its normal completion happens outside of this method. |
|  * |
|  * @param name Table name. |
|  * @param tableInitChange Table changer. |
|  * @return Future representing pending completion of the operation. |
|  * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: |
|  *                         <ul> |
|  *                             <li>the node is stopping.</li> |
|  *                         </ul> |
|  * @see TableAlreadyExistsException |
|  */ |
| private CompletableFuture<Table> createTableAsyncInternal(String name, Consumer<TableChange> tableInitChange) { |
|     CompletableFuture<Table> tblFut = new CompletableFuture<>(); |
| |
|     tableAsyncInternal(name).thenAccept(tbl -> { |
|         if (tbl != null) { |
|             tblFut.completeExceptionally(new TableAlreadyExistsException(name)); |
|         } else { |
|             tablesCfg.change(tablesChange -> tablesChange.changeTables(tablesListChange -> { |
|                 if (tablesListChange.get(name) != null) { |
|                     throw new TableAlreadyExistsException(name); |
|                 } |
| |
|                 tablesListChange.create(name, (tableChange) -> { |
|                     tableChange.changeDataStorage( |
|                             dataStorageMgr.defaultTableDataStorageConsumer(tablesChange.defaultDataStorage()) |
|                     ); |
| |
|                     tableInitChange.accept(tableChange); |
| |
|                     var extConfCh = ((ExtendedTableChange) tableChange); |
| |
|                     int intTableId = tablesChange.globalIdCounter() + 1; |
|                     tablesChange.changeGlobalIdCounter(intTableId); |
| |
|                     extConfCh.changeTableId(intTableId); |
| |
|                     tableCreateFuts.put(extConfCh.id(), tblFut); |
| |
|                     // Affinity assignments calculation. |
|                     extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments( |
|                                     baselineMgr.nodes(), |
|                                     tableChange.partitions(), |
|                                     tableChange.replicas()))) |
|                             // Table schema preparation. |
|                             .changeSchemas(schemasCh -> schemasCh.create( |
|                                     String.valueOf(INITIAL_SCHEMA_VERSION), |
|                                     schemaCh -> { |
|                                         SchemaDescriptor schemaDesc; |
| |
|                                         //TODO IGNITE-15747 Remove try-catch and force configuration |
|                                         // validation here to ensure a valid configuration passed to |
|                                         // prepareSchemaDescriptor() method. |
|                                         try { |
|                                             schemaDesc = SchemaUtils.prepareSchemaDescriptor( |
|                                                     ((ExtendedTableView) tableChange).schemas().size(), |
|                                                     tableChange); |
|                                         } catch (IllegalArgumentException ex) { |
|                                             // Keep the original exception attached so that its stack trace is not lost. |
|                                             ConfigurationValidationException e = |
|                                                     new ConfigurationValidationException(ex.getMessage()); |
| |
|                                             e.addSuppressed(ex); |
| |
|                                             throw e; |
|                                         } |
| |
|                                         schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc)); |
|                                     } |
|                             )); |
|                 }); |
|             })).exceptionally(t -> { |
|                 Throwable ex = getRootCause(t); |
| |
|                 if (ex instanceof TableAlreadyExistsException) { |
|                     tblFut.completeExceptionally(ex); |
|                 } else { |
|                     LOG.debug("Unable to create table [name={}]", ex, name); |
| |
|                     tblFut.completeExceptionally(ex); |
| |
|                     tableCreateFuts.values().removeIf(fut -> fut == tblFut); |
|                 } |
| |
|                 return null; |
|             }); |
|         } |
|     }).exceptionally(t -> { |
|         // The table lookup failed or the stage above threw synchronously: without this handler |
|         // the returned future would never be completed. |
|         tblFut.completeExceptionally(getRootCause(t)); |
| |
|         tableCreateFuts.values().removeIf(fut -> fut == tblFut); |
| |
|         return null; |
|     }); |
| |
|     return tblFut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void alterTable(String name, Consumer<TableChange> tableChange) { |
|     CompletableFuture<Void> alteration = alterTableAsync(name, tableChange); |
| |
|     // Synchronous facade: wait for the asynchronous alteration to finish. |
|     join(alteration); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) { |
| if (!busyLock.enterBusy()) { |
| throw new IgniteException(new NodeStoppingException()); |
| } |
| try { |
| return alterTableAsyncInternal(IgniteObjectName.parseCanonicalName(name), tableChange); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
|  * Internal method that alters a cluster table. If an appropriate table does not exist, a future will be |
|  * completed with {@link TableNotFoundException}. |
|  * |
|  * <p>The {@code name} is used as is: the table is looked up through {@code tableAsyncInternal(String)} and the same string is |
|  * used as the key in the tables configuration, so the name is not parsed a second time here. |
|  * |
|  * @param name Table name. |
|  * @param tableChange Table changer. |
|  * @return Future representing pending completion of the operation. |
|  * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: |
|  *                         <ul> |
|  *                             <li>the node is stopping.</li> |
|  *                         </ul> |
|  * @see TableNotFoundException |
|  */ |
| @NotNull |
| private CompletableFuture<Void> alterTableAsyncInternal(String name, Consumer<TableChange> tableChange) { |
|     CompletableFuture<Void> tblFut = new CompletableFuture<>(); |
| |
|     tableAsyncInternal(name).thenAccept(tbl -> { |
|         if (tbl == null) { |
|             tblFut.completeExceptionally(new TableNotFoundException(name)); |
|         } else { |
|             tablesCfg.tables().change(ch -> { |
|                 if (ch.get(name) == null) { |
|                     throw new TableNotFoundException(name); |
|                 } |
| |
|                 ch.update(name, tblCh -> { |
|                             tableChange.accept(tblCh); |
| |
|                             ((ExtendedTableChange) tblCh).changeSchemas(schemasCh -> |
|                                     schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> { |
|                                         ExtendedTableView currTableView = (ExtendedTableView) tablesCfg.tables().get(name).value(); |
| |
|                                         SchemaDescriptor descriptor; |
| |
|                                         //TODO IGNITE-15747 Remove try-catch and force configuration validation |
|                                         // here to ensure a valid configuration passed to prepareSchemaDescriptor() method. |
|                                         try { |
|                                             descriptor = SchemaUtils.prepareSchemaDescriptor( |
|                                                     ((ExtendedTableView) tblCh).schemas().size(), |
|                                                     tblCh); |
| |
|                                             descriptor.columnMapping(SchemaUtils.columnMapper( |
|                                                     tbl.schemaView().schema(currTableView.schemas().size()), |
|                                                     currTableView, |
|                                                     descriptor, |
|                                                     tblCh)); |
|                                         } catch (IllegalArgumentException ex) { |
|                                             // Convert unexpected exceptions here, |
|                                             // because validation actually happens later, |
|                                             // when bulk configuration update is applied. |
|                                             ConfigurationValidationException e = |
|                                                     new ConfigurationValidationException(ex.getMessage()); |
| |
|                                             e.addSuppressed(ex); |
| |
|                                             throw e; |
|                                         } |
| |
|                                         schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor)); |
|                                     })); |
|                         } |
|                 ); |
|             }).whenComplete((res, t) -> { |
|                 if (t != null) { |
|                     Throwable ex = getRootCause(t); |
| |
|                     if (ex instanceof TableNotFoundException) { |
|                         tblFut.completeExceptionally(ex); |
|                     } else { |
|                         LOG.debug("Unable to modify table [name={}]", ex, name); |
| |
|                         tblFut.completeExceptionally(ex); |
|                     } |
|                 } else { |
|                     tblFut.complete(res); |
|                 } |
|             }); |
|         } |
|     }).exceptionally(t -> { |
|         // The table lookup failed or the stage above threw synchronously: without this handler |
|         // the returned future would never be completed. |
|         tblFut.completeExceptionally(getRootCause(t)); |
| |
|         return null; |
|     }); |
| |
|     return tblFut; |
| } |
| |
| /** |
|  * Gets a cause exception for a client. |
|  * |
|  * <p>A {@link CompletionException} is replaced by its cause, and if that cause is a {@link ConfigurationChangeException}, by the |
|  * cause of the latter. When a wrapper carries no cause, the wrapper itself is kept so that the original error is not lost. |
|  * The result is returned as is if it is an {@link IgniteException}, otherwise it is wrapped into a new one. |
|  * |
|  * @param t Exception wrapper. |
|  * @return A root exception which will be acceptable to throw for public API. |
|  */ |
| //TODO: IGNITE-16051 Implement exception converter for public API. |
| private @NotNull IgniteException getRootCause(Throwable t) { |
|     Throwable ex = t; |
| |
|     if (t instanceof CompletionException && t.getCause() != null) { |
|         ex = t.getCause(); |
| |
|         // Only look through the configuration wrapper when it actually has something inside. |
|         if (ex instanceof ConfigurationChangeException && ex.getCause() != null) { |
|             ex = ex.getCause(); |
|         } |
|     } |
| |
|     return ex instanceof IgniteException ? (IgniteException) ex : new IgniteException(ex); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void dropTable(String name) { |
|     CompletableFuture<Void> removal = dropTableAsync(name); |
| |
|     // Synchronous facade: wait for the asynchronous drop to finish. |
|     join(removal); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<Void> dropTableAsync(String name) { |
| if (!busyLock.enterBusy()) { |
| throw new IgniteException(new NodeStoppingException()); |
| } |
| try { |
| return dropTableAsyncInternal(IgniteObjectName.parseCanonicalName(name)); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
|  * Internal method that drops a table with the name specified. If an appropriate table is not found, a future will be |
|  * completed with {@link TableNotFoundException}. |
|  * |
|  * @param name Table name. |
|  * @return Future representing pending completion of the operation. |
|  * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: |
|  *                         <ul> |
|  *                             <li>the node is stopping.</li> |
|  *                         </ul> |
|  * @see TableNotFoundException |
|  */ |
| @NotNull |
| private CompletableFuture<Void> dropTableAsyncInternal(String name) { |
|     CompletableFuture<Void> dropTblFut = new CompletableFuture<>(); |
| |
|     tableAsyncInternal(name).thenAccept(tbl -> { |
|         // In case of drop it's an optimization that allows not to fire drop-change-closure if there's no such |
|         // distributed table and the local config has lagged behind. |
|         if (tbl == null) { |
|             dropTblFut.completeExceptionally(new TableNotFoundException(name)); |
|         } else { |
|             tablesCfg.tables() |
|                     .change(change -> { |
|                         if (change.get(name) == null) { |
|                             throw new TableNotFoundException(name); |
|                         } |
| |
|                         change.delete(name); |
|                     }) |
|                     .whenComplete((res, t) -> { |
|                         if (t != null) { |
|                             Throwable ex = getRootCause(t); |
| |
|                             if (ex instanceof TableNotFoundException) { |
|                                 dropTblFut.completeExceptionally(ex); |
|                             } else { |
|                                 LOG.debug("Unable to drop table [name={}]", ex, name); |
| |
|                                 dropTblFut.completeExceptionally(ex); |
|                             } |
|                         } else { |
|                             dropTblFut.complete(res); |
|                         } |
|                     }); |
|         } |
|     }).exceptionally(t -> { |
|         // The table lookup failed or the stage above threw synchronously: without this handler |
|         // the returned future would never be completed. |
|         dropTblFut.completeExceptionally(getRootCause(t)); |
| |
|         return null; |
|     }); |
| |
|     return dropTblFut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public List<Table> tables() { |
|     CompletableFuture<List<Table>> allTables = tablesAsync(); |
| |
|     // Synchronous facade: wait for the asynchronous listing to finish. |
|     return join(allTables); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<List<Table>> tablesAsync() { |
|     if (!busyLock.enterBusy()) { |
|         throw new IgniteException(new NodeStoppingException()); |
|     } |
| |
|     CompletableFuture<List<Table>> allTables; |
| |
|     try { |
|         allTables = tablesAsyncInternal(); |
|     } finally { |
|         busyLock.leaveBusy(); |
|     } |
| |
|     return allTables; |
| } |
| |
| /** |
|  * Internal method for getting all tables. |
|  * |
|  * <p>The direct table ids are collected asynchronously, a table future is requested for every id, and once all of them are |
|  * done the non-null tables are gathered into the resulting list. Every stage runs under the busy lock. |
|  * |
|  * @return Future representing pending completion of the operation. |
|  */ |
| private CompletableFuture<List<Table>> tablesAsyncInternal() { |
|     // TODO: IGNITE-16288 directTableIds should use async configuration API |
|     return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, this::directTableIds)) |
|             .thenCompose(tableIds -> inBusyLock(busyLock, () -> { |
|                 // One lookup per id, requested in the order of the ids. |
|                 CompletableFuture[] tableFuts = tableIds.stream() |
|                         .map(tblId -> tableAsyncInternal(tblId, false)) |
|                         .toArray(CompletableFuture[]::new); |
| |
|                 return allOf(tableFuts).thenApply(ignored -> inBusyLock(busyLock, () -> { |
|                     List<Table> result = new ArrayList<>(tableIds.size()); |
| |
|                     try { |
|                         for (CompletableFuture<?> tableFut : tableFuts) { |
|                             Object resolved = tableFut.get(); |
| |
|                             // A null result is skipped rather than added to the list. |
|                             if (resolved != null) { |
|                                 result.add((Table) resolved); |
|                             } |
|                         } |
|                     } catch (Throwable t) { |
|                         throw new CompletionException(t); |
|                     } |
| |
|                     return result; |
|                 })); |
|             })); |
| } |
| |
| /** |
|  * Collects the internal ids of the tables found through the direct proxy of the tables configuration. |
|  * |
|  * @return A list of direct table ids. |
|  */ |
| private List<UUID> directTableIds() { |
|     var tablesProxy = directProxy(tablesCfg.tables()); |
| |
|     return ConfigurationUtil.internalIds(tablesProxy); |
| } |
| |
| /** |
|  * Gets direct id of table with {@code tblName}. |
|  * |
|  * <p>A missing configuration entry and a {@link NoSuchElementException} raised while reading it are both reported as |
|  * {@code null}. |
|  * |
|  * @param tblName Name of the table. |
|  * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found. |
|  */ |
| @Nullable |
| private UUID directTableId(String tblName) { |
|     try { |
|         var tableCfg = (ExtendedTableConfiguration) directProxy(tablesCfg.tables()).get(tblName); |
| |
|         return tableCfg == null ? null : tableCfg.id().value(); |
|     } catch (NoSuchElementException e) { |
|         return null; |
|     } |
| } |
| |
| /** |
| * Actual tables map. |
| * |
| * @return Actual tables map. |
| */ |
| @TestOnly |
| public Map<UUID, TableImpl> latestTables() { |
| return unmodifiableMap(tablesByIdVv.latest()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public Table table(String name) { |
|     CompletableFuture<Table> lookup = tableAsync(name); |
| |
|     // Synchronous facade: wait for the asynchronous lookup to finish. |
|     return join(lookup); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public TableImpl table(UUID id) throws NodeStoppingException { |
|     CompletableFuture<TableImpl> lookup = tableAsync(id); |
| |
|     // Synchronous facade: wait for the asynchronous lookup to finish. |
|     return join(lookup); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<Table> tableAsync(String name) { |
| return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name)) |
| .thenApply(Function.identity()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<TableImpl> tableAsync(UUID id) { |
|     if (busyLock.enterBusy()) { |
|         try { |
|             // The configuration is checked before the table is looked up. |
|             return tableAsyncInternal(id, true); |
|         } finally { |
|             busyLock.leaveBusy(); |
|         } |
|     } |
| |
|     // The busy lock could not be entered. |
|     throw new IgniteException(new NodeStoppingException()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public TableImpl tableImpl(String name) { |
|     CompletableFuture<TableImpl> lookup = tableImplAsync(name); |
| |
|     // Synchronous facade: wait for the asynchronous lookup to finish. |
|     return join(lookup); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<TableImpl> tableImplAsync(String name) { |
| return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name)); |
| } |
| |
| /** |
|  * Gets a table by name, if it was created before. Doesn't parse canonical name. |
|  * |
|  * <p>The name is first resolved to a direct table id; when no id is found the returned future is completed with {@code null}. |
|  * |
|  * @param name Table name. |
|  * @return Future representing pending completion of the table lookup. |
|  */ |
| public CompletableFuture<TableImpl> tableAsyncInternal(String name) { |
|     if (busyLock.enterBusy()) { |
|         try { |
|             UUID tableId = directTableId(name); |
| |
|             return tableId != null |
|                     ? tableAsyncInternal(tableId, false) |
|                     : CompletableFuture.completedFuture(null); |
|         } finally { |
|             busyLock.leaveBusy(); |
|         } |
|     } |
| |
|     // The busy lock could not be entered. |
|     throw new IgniteException(new NodeStoppingException()); |
| } |
| |
| /** |
|  * Internal method for getting table by id. |
|  * |
|  * <p>If the table is not in the latest tables map yet, a {@code TableEvent.CREATE} listener is registered that completes the |
|  * returned future when a table with this id is created. The map and the configuration are then checked once more, and the |
|  * listener is removed again if that second check completed the future. |
|  * |
|  * @param id Table id. |
|  * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false} otherwise. |
|  * @return Future representing pending completion of the operation. |
|  */ |
| public CompletableFuture<TableImpl> tableAsyncInternal(UUID id, boolean checkConfiguration) { |
|     if (checkConfiguration && !isTableConfigured(id)) { |
|         return CompletableFuture.completedFuture(null); |
|     } |
| |
|     var known = tablesByIdVv.latest().get(id); |
| |
|     if (known != null) { |
|         return CompletableFuture.completedFuture(known); |
|     } |
| |
|     CompletableFuture<TableImpl> getTblFut = new CompletableFuture<>(); |
| |
|     EventListener<TableEventParameters> createListener = new EventListener<>() { |
|         @Override |
|         public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) { |
|             // Events of other tables are not interesting. |
|             if (!id.equals(parameters.tableId())) { |
|                 return completedFuture(false); |
|             } |
| |
|             if (e != null) { |
|                 getTblFut.completeExceptionally(e); |
|             } else { |
|                 tablesByIdVv.get(parameters.causalityToken()).thenRun(() -> getTblFut.complete(parameters.table())); |
|             } |
| |
|             return completedFuture(true); |
|         } |
| |
|         @Override |
|         public void remove(@NotNull Throwable e) { |
|             getTblFut.completeExceptionally(e); |
|         } |
|     }; |
| |
|     listen(TableEvent.CREATE, createListener); |
| |
|     // Second look after the listener is registered. |
|     var appeared = tablesByIdVv.latest().get(id); |
| |
|     boolean completedHere = appeared != null && getTblFut.complete(appeared); |
| |
|     if (!completedHere) { |
|         // Reached both when the table is still absent and when the future was already completed by someone else. |
|         completedHere = !isTableConfigured(id) && getTblFut.complete(null); |
|     } |
| |
|     if (completedHere) { |
|         removeListener(TableEvent.CREATE, createListener, null); |
|     } |
| |
|     return getTblFut; |
| } |
| |
| /** |
|  * Checks that the table is configured with specific id. |
|  * |
|  * <p>The check reads the id of the configuration entry found by the internal id through the direct proxy; a |
|  * {@link NoSuchElementException} raised on the way means that the table is not configured. |
|  * |
|  * @param id Table id. |
|  * @return True when the table is configured into cluster, false otherwise. |
|  */ |
| private boolean isTableConfigured(UUID id) { |
|     try { |
|         var tableCfg = (ExtendedTableConfiguration) getByInternalId(directProxy(tablesCfg.tables()), id); |
| |
|         // The value itself is not needed, only whether reading it succeeds. |
|         tableCfg.id().value(); |
|     } catch (NoSuchElementException e) { |
|         return false; |
|     } |
| |
|     return true; |
| } |
| |
| /** |
|  * Waits for the future under the busy lock and returns its result. A {@link CompletionException} is unwrapped and its cause |
|  * is rethrown, converted to a runtime exception if necessary. |
|  * |
|  * @param future Completable future. |
|  * @return Future result. |
|  */ |
| private <T> T join(CompletableFuture<T> future) { |
|     if (busyLock.enterBusy()) { |
|         try { |
|             return future.join(); |
|         } catch (CompletionException ex) { |
|             Throwable cause = ex.getCause(); |
| |
|             throw convertThrowable(cause); |
|         } finally { |
|             busyLock.leaveBusy(); |
|         } |
|     } |
| |
|     // The busy lock could not be entered. |
|     throw new IgniteException(new NodeStoppingException()); |
| } |
| |
| /** |
|  * Convert to public throwable: a {@link RuntimeException} is returned unchanged, anything else is wrapped into an |
|  * {@link IgniteException}. |
|  * |
|  * @param th Throwable. |
|  * @return Public throwable. |
|  */ |
| private RuntimeException convertThrowable(Throwable th) { |
|     return th instanceof RuntimeException ? (RuntimeException) th : new IgniteException(th); |
| } |
| |
| /** |
| * Register the new meta storage listener for changes in the rebalance-specific keys. |
| * |
| * <p>Two prefix watches are registered: one for {@code PENDING_ASSIGNMENTS_PREFIX} and one for {@code STABLE_ASSIGNMENTS_PREFIX}. |
| * Both listeners run under {@code busyLock} (an {@link IgniteInternalException} wrapping a {@link NodeStoppingException} is thrown |
| * when the lock cannot be entered), expect a single-entry event, return {@code true} right away when the new entry has no value, |
| * and only log a warning from {@code onError}. |
| * |
| * <p>Pending assignments listener: |
| * <ol> |
| * <li>derives the partition number, the table id and the RAFT group name from the entry key and deserializes the new peers |
| * from the entry value;</li> |
| * <li>reads the current pending assignments entry (blocking on the meta storage future) and asserts that the event revision |
| * is not greater than the revision of that entry;</li> |
| * <li>takes the stable assignments bounded by the event revision, falling back to the assignments of this partition stored in |
| * the table configuration when there are none;</li> |
| * <li>starts a RAFT group node for the partition when {@code raftMgr.shouldHaveRaftGroupLocally} accepts the peers that are |
| * new compared to those assignments; a {@link NodeStoppingException} raised there is ignored;</li> |
| * <li>returns if the event revision is lower than the current pending revision (stale event); otherwise refreshes the group |
| * leader and, if the local member is the leader, calls {@code changePeersAsync} with the new peers and waits for it.</li> |
| * </ol> |
| * |
| * <p>Stable assignments listener: stops the partition RAFT group when the local member is contained neither in the new stable |
| * assignments nor in the pending assignments read at the event revision; a {@link NodeStoppingException} is ignored. |
| * |
| * <p>NOTE(review): the pending listener dereferences the result of {@code tablesByIdVv.latest().get(tblId)} without a null |
| * check; confirm that the table is always present locally when such an event arrives. |
| */ |
| private void registerRebalanceListeners() { |
| metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), new WatchListener() { |
| @Override |
| public boolean onUpdate(@NotNull WatchEvent evt) { |
| if (!busyLock.enterBusy()) { |
| throw new IgniteInternalException(new NodeStoppingException()); |
| } |
| |
| try { |
| assert evt.single(); |
| |
| Entry pendingAssignmentsWatchEvent = evt.entryEvent().newEntry(); |
| |
| if (pendingAssignmentsWatchEvent.value() == null) { |
| return true; |
| } |
| |
| int part = extractPartitionNumber(pendingAssignmentsWatchEvent.key()); |
| UUID tblId = extractTableId(pendingAssignmentsWatchEvent.key(), PENDING_ASSIGNMENTS_PREFIX); |
| |
| String partId = partitionRaftGroupName(tblId, part); |
| |
| // Assignments of the pending rebalance that we received through the meta storage watch mechanism. |
| List<ClusterNode> newPeers = ((List<ClusterNode>) ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value())); |
| |
| var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).join(); |
| |
| assert pendingAssignmentsWatchEvent.revision() <= pendingAssignments.revision() |
| : "Meta Storage watch cannot notify about an event with the revision that is more than the actual revision."; |
| |
| TableImpl tbl = tablesByIdVv.latest().get(tblId); |
| |
| ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name()); |
| |
| // TODO: IGNITE-17197 Remove assert after the ticket is resolved. |
| assert tbl.internalTable().storage() instanceof MvTableStorage : |
| "Only multi version storages are supported. Current storage is a " |
| + tbl.internalTable().storage().getClass().getName(); |
| |
| // Stable assignments from the meta store, which revision is bounded by the current pending event. |
| byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(partId), |
| pendingAssignmentsWatchEvent.revision()).join().value(); |
| |
| List<ClusterNode> assignments = stableAssignments == null |
| // This is for the case when the first rebalance occurs: the assignments of the partition are taken from the table configuration. |
| ? ((List<List<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part) |
| : (List<ClusterNode>) ByteUtils.fromBytes(stableAssignments); |
| |
| ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember(); |
| |
| var deltaPeers = newPeers.stream() |
| .filter(p -> !assignments.contains(p)) |
| .collect(Collectors.toList()); |
| |
| try { |
| LOG.info("Received update on pending assignments. Check if new raft group should be started" |
| + " [key={}, partition={}, table={}, localMemberAddress={}]", |
| pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address()); |
| |
| if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) { |
| MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(part); |
| |
| RaftGroupOptions groupOptions = groupOptionsForPartition(tbl.internalTable(), partitionStorage, assignments); |
| |
| RaftGroupListener raftGrpLsnr = new PartitionListener( |
| tblId, |
| new VersionedRowStore(partitionStorage, txManager) |
| ); |
| |
| RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener( |
| metaStorageMgr, |
| tblCfg, |
| partId, |
| part, |
| busyLock, |
| movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)), |
| rebalanceScheduler |
| ); |
| |
| raftMgr.startRaftGroupNode( |
| partId, |
| assignments, |
| raftGrpLsnr, |
| raftGrpEvtsLsnr, |
| groupOptions |
| ); |
| } |
| } catch (NodeStoppingException e) { |
| // No-op: the exception is deliberately ignored and the processing of the event continues. |
| } |
| |
| // Do not change peers of the raft group if this is a stale event. |
| // Note that we start raft node before for the sake of the consistency in a starting and stopping raft nodes. |
| if (pendingAssignmentsWatchEvent.revision() < pendingAssignments.revision()) { |
| return true; |
| } |
| |
| var newNodes = newPeers.stream().map(n -> new Peer(n.address())).collect(Collectors.toList()); |
| |
| RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(part); |
| |
| IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join(); |
| |
| // Run the update of the raft configuration only if this node is the leader (addresses are compared). |
| if (localMember.address().equals(leaderWithTerm.get1().address())) { |
| LOG.info("Current node={} is the leader of partition raft group={}. " |
| + "Initiate rebalance process for partition={}, table={}", |
| localMember.address(), partId, part, tbl.name()); |
| |
| partGrpSvc.changePeersAsync(newNodes, leaderWithTerm.get2()).join(); |
| } |
| |
| return true; |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| @Override |
| public void onError(@NotNull Throwable e) { |
| LOG.warn("Unable to process pending assignments event", e); |
| } |
| }); |
| |
| metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), new WatchListener() { |
| @Override |
| public boolean onUpdate(@NotNull WatchEvent evt) { |
| if (!busyLock.enterBusy()) { |
| throw new IgniteInternalException(new NodeStoppingException()); |
| } |
| |
| try { |
| assert evt.single(); |
| |
| Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry(); |
| |
| if (stableAssignmentsWatchEvent.value() == null) { |
| return true; |
| } |
| |
| int part = extractPartitionNumber(stableAssignmentsWatchEvent.key()); |
| UUID tblId = extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX); |
| |
| String partId = partitionRaftGroupName(tblId, part); |
| |
| var stableAssignments = (List<ClusterNode>) ByteUtils.fromBytes(stableAssignmentsWatchEvent.value()); |
| |
| byte[] pendingFromMetastorage = metaStorageMgr.get(pendingPartAssignmentsKey(partId), |
| stableAssignmentsWatchEvent.revision()).join().value(); |
| |
| List<ClusterNode> pendingAssignments = pendingFromMetastorage == null |
| ? Collections.emptyList() |
| : (List<ClusterNode>) ByteUtils.fromBytes(pendingFromMetastorage); |
| |
| try { |
| ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember(); |
| |
| if (!stableAssignments.contains(localMember) && !pendingAssignments.contains(localMember)) { |
| raftMgr.stopRaftGroup(partId); |
| } |
| } catch (NodeStoppingException e) { |
| // No-op: the exception is deliberately ignored. |
| } |
| |
| return true; |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| @Override |
| public void onError(@NotNull Throwable e) { |
| LOG.warn("Unable to process stable assignments event", e); |
| } |
| }); |
| } |
| |
| /** |
|  * Creates a function that performs {@link RaftGroupService#changePeersAsync(java.util.List, long)} on the raft group service |
|  * of a partition, so nodes of the corresponding raft group can be reconfigured. |
|  * If the invocation fails with any exception, it is repeated with the same peers and term; the outcome is handled on |
|  * {@code rebalanceScheduler}. Both the invocation and the handling of its outcome are done under the busy lock. |
|  * |
|  * @param raftGroupServiceSupplier Raft groups service of a partition. |
|  * @return Function which performs {@link RaftGroupService#changePeersAsync(java.util.List, long)}. |
|  */ |
| BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartition(Supplier<RaftGroupService> raftGroupServiceSupplier) { |
|     return (List<Peer> peers, Long term) -> { |
|         if (!busyLock.enterBusy()) { |
|             throw new IgniteInternalException(new NodeStoppingException()); |
|         } |
| |
|         try { |
|             RaftGroupService raftSvc = raftGroupServiceSupplier.get(); |
| |
|             return raftSvc.changePeersAsync(peers, term).handleAsync((resp, err) -> { |
|                 if (!busyLock.enterBusy()) { |
|                     throw new IgniteInternalException(new NodeStoppingException()); |
|                 } |
| |
|                 try { |
|                     // Nothing to repeat when the peers were changed successfully. |
|                     if (err == null) { |
|                         return CompletableFuture.<Void>completedFuture(null); |
|                     } |
| |
|                     if (recoverable(err)) { |
|                         LOG.debug("Recoverable error received during changePeersAsync invocation, retrying", err); |
|                     } else { |
|                         // TODO: Ideally, rebalance, which has initiated this invocation should be canceled, |
|                         // TODO: https://issues.apache.org/jira/browse/IGNITE-17056 |
|                         // TODO: Also it might be reasonable to delegate such exceptional case to a general failure handler. |
|                         // TODO: At the moment, we repeat such intents as well. |
|                         LOG.debug("Unrecoverable error received during changePeersAsync invocation, retrying", err); |
|                     } |
| |
|                     // Both kinds of errors end up in a new attempt. |
|                     return movePartition(raftGroupServiceSupplier).apply(peers, term); |
|                 } finally { |
|                     busyLock.leaveBusy(); |
|                 } |
|             }, rebalanceScheduler).thenCompose(Function.identity()); |
|         } finally { |
|             busyLock.leaveBusy(); |
|         } |
|     }; |
| } |
| |
| /** |
|  * Gets a direct accessor for the distributed configuration property. |
|  * If the metadata is configured to be accessed only locally, the given property is returned as is. |
|  * |
|  * @param property Distributed configuration property to receive direct access. |
|  * @param <T> Type of the property accessor. |
|  * @return An accessor for the distributed property. |
|  * @see #getMetadataLocallyOnly |
|  */ |
| private <T extends ConfigurationProperty<?>> T directProxy(T property) { |
|     if (getMetadataLocallyOnly) { |
|         return property; |
|     } |
| |
|     return ConfigurationUtil.directProxy(property); |
| } |
| } |