| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress; |
| import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; |
| import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; |
| import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; |
| import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.apache.ignite.thread.OomExceptionHandler; |
| import org.jetbrains.annotations.Nullable; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_WAL; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; |
| import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED; |
| import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED; |
| |
| /** |
| * Write-ahead log state manager. Manages WAL enable and disable. |
| */ |
| public class WalStateManager extends GridCacheSharedManagerAdapter { |
    /** Prefix for the reason string used when re-enabling durability after rebalancing finishes. */
    public static final String ENABLE_DURABILITY_AFTER_REBALANCING = "enable-durability-rebalance-finished-";

    /** History size used to track stale messages. */
    private static final int HIST_SIZE = 1000;

    /** @see IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING */
    public static final boolean DFLT_DISABLE_WAL_DURING_REBALANCING = true;

    /** ID history for discovery messages, used to drop duplicate propose/finish events. */
    private final GridBoundedConcurrentLinkedHashSet<T2<UUID, Boolean>> discoMsgIdHist =
        new GridBoundedConcurrentLinkedHashSet<>(HIST_SIZE);

    /** History of already completed operations, used to skip stale ack messages. */
    private final GridBoundedConcurrentLinkedHashSet<UUID> completedOpIds =
        new GridBoundedConcurrentLinkedHashSet<>(HIST_SIZE);

    /** Client futures returned to operation initiators, keyed by operation ID. Guarded by {@code mux}. */
    private final Map<UUID, GridFutureAdapter<Boolean>> userFuts = new HashMap<>();

    /** Finished results awaiting discovery finish message, keyed by operation ID. */
    private final Map<UUID, WalStateResult> ress = new HashMap<>();

    /** Active distributed processes (populated on the coordinator node). Guarded by {@code mux}. */
    private final Map<UUID, WalStateDistributedProcess> procs = new HashMap<>();

    /** Pending results created on cache processor start based on available discovery data. */
    private final Collection<WalStateResult> initialRess = new LinkedList<>();

    /** Pending acknowledge messages (i.e. received before node completed it's local part). Guarded by {@code mux}. */
    private final Collection<WalStateAckMessage> pendingAcks = new HashSet<>();

    /** Whether this is a server node. */
    private final boolean srv;

    /** IO message listener ({@code null} on non-server nodes). */
    private final GridMessageListener ioLsnr;

    /** Operation mutex. */
    private final Object mux = new Object();

    /** Logger ({@code null} when no kernal context was provided to the constructor). */
    private final IgniteLogger log;

    /** Current coordinator node, resolved lazily. Guarded by {@code mux}. */
    private ClusterNode crdNode;

    /** Disconnected flag. Guarded by {@code mux}. */
    private boolean disconnected;

    /** Context used to run closures with WAL temporarily disabled; initialized in {@code start0()}. */
    private volatile WALDisableContext walDisableContext;

    /** Denies or allows WAL disabling. */
    private volatile boolean prohibitDisabling;
| |
| /** |
| * Constructor. |
| * |
| * @param kernalCtx Kernal context. |
| */ |
| public WalStateManager(GridKernalContext kernalCtx) { |
| if (kernalCtx != null) { |
| IgniteConfiguration cfg = kernalCtx.config(); |
| |
| boolean client = cfg.isClientMode() != null && cfg.isClientMode(); |
| |
| srv = !client && !cfg.isDaemon(); |
| |
| log = kernalCtx.log(WalStateManager.class); |
| } |
| else { |
| srv = false; |
| |
| log = null; |
| } |
| |
| if (srv) { |
| ioLsnr = new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| if (msg instanceof WalStateAckMessage) { |
| WalStateAckMessage msg0 = (WalStateAckMessage)msg; |
| |
| msg0.senderNodeId(nodeId); |
| |
| onAck(msg0); |
| } |
| else |
| U.warn(log, "Unexpected IO message (will ignore): " + msg); |
| } |
| }; |
| } |
| else |
| ioLsnr = null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void start0() throws IgniteCheckedException { |
| if (srv) |
| cctx.kernalContext().io().addMessageListener(TOPIC_WAL, ioLsnr); |
| |
| walDisableContext = new WALDisableContext( |
| cctx.cache().context().database(), |
| cctx.pageStore(), |
| log |
| ); |
| |
| cctx.kernalContext().internalSubscriptionProcessor().registerMetastorageListener(walDisableContext); |
| |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void stop0(boolean cancel) { |
| if (srv) |
| cctx.kernalContext().io().removeMessageListener(TOPIC_WAL, ioLsnr); |
| } |
| |
| /** |
| * Callback invoked when caches info is collected inside cache processor start routine. Discovery is not |
| * active at this point. |
| */ |
| public void onCachesInfoCollected() { |
| if (!srv) |
| return; |
| |
| synchronized (mux) { |
| // Process top pending requests. |
| for (CacheGroupDescriptor grpDesc : cacheProcessor().cacheGroupDescriptors().values()) { |
| CacheGroupContext cctx = cacheProcessor().cacheGroup(grpDesc.groupId()); |
| |
| if (cctx != null) |
| cctx.globalWalEnabled(grpDesc.walEnabled()); |
| |
| for (WalStateProposeMessage msg : grpDesc.walChangeRequests()) { |
| if (msg != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Processing WAL state message on start: " + msg); |
| |
| boolean enabled = grpDesc.walEnabled(); |
| |
| WalStateResult res; |
| |
| if (F.eq(enabled, msg.enable())) |
| res = new WalStateResult(msg, false); |
| else |
| res = new WalStateResult(msg, true); |
| |
| initialRess.add(res); |
| |
| addResult(res); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Handle cache processor kernal start. At this point we already collected discovery data from other nodes |
| * (discovery already active), but exchange worker is not active yet. We need to iterate over available group |
| * descriptors and perform top operations, taking in count that no cache operations are possible at this point, |
| * so checkpoint is not needed. |
| */ |
| public void onKernalStart() { |
| if (!srv) |
| return; |
| |
| synchronized (mux) { |
| for (WalStateResult res : initialRess) { |
| onCompletedLocally(res); |
| |
| if (res.changed()) { |
| WalStateProposeMessage propMsg = res.message(); |
| |
| CacheGroupContext grpCtx = cctx.cache().cacheGroup(propMsg.groupId()); |
| |
| if (grpCtx != null) |
| grpCtx.globalWalEnabled(propMsg.enable()); |
| } |
| } |
| |
| initialRess.clear(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(IgniteFuture reconnectFut) { |
| Collection<GridFutureAdapter<Boolean>> userFuts0; |
| |
| synchronized (mux) { |
| assert !disconnected; |
| |
| disconnected = true; |
| |
| userFuts0 = new ArrayList<>(userFuts.values()); |
| |
| userFuts.clear(); |
| } |
| |
| for (GridFutureAdapter<Boolean> userFut : userFuts0) |
| completeWithError(userFut, "Client node was disconnected from topology (operation result is unknown)."); |
| } |
| |
    /** {@inheritDoc} */
    @Override public void onReconnected(boolean active) {
        synchronized (mux) {
            // Reconnect must always be preceded by a disconnect callback.
            assert disconnected;

            disconnected = false;
        }
    }
| |
    /**
     * Denies or allows WAL disabling with subsequent {@link #init(Collection, boolean)} call.
     *
     * @param val Denial status: {@code true} prohibits disabling WAL, {@code false} allows it.
     */
    public void prohibitWALDisabling(boolean val) {
        prohibitDisabling = val;
    }
| |
    /**
     * Reports whether WAL disabling with subsequent {@link #init(Collection, boolean)} is denied.
     *
     * @return Denial status: {@code true} if WAL disabling is prohibited.
     */
    public boolean prohibitWALDisabling() {
        return prohibitDisabling;
    }
| |
| /** |
| * Change WAL mode. |
| * |
| * @param cacheNames Cache names. |
| * @param enabled Enabled flag. |
| * @return Future completed when operation finished. |
| */ |
| public IgniteInternalFuture<Boolean> changeWalMode(Collection<String> cacheNames, boolean enabled) { |
| cctx.tm().checkEmptyTransactions(() -> |
| String.format("Cache WAL mode cannot be changed within lock or transaction " + |
| "[cacheNames=%s, walEnabled=%s]", cacheNames, enabled)); |
| |
| LT.warn(log, "Cache WAL mode may only be changed on stable topology: see https://issues.apache.org/jira/browse/IGNITE-13976"); |
| LT.warn(log, " ^-- No nodes may leave or join cluster while changing WAL mode."); |
| LT.warn(log, " ^-- All baseline nodes should be present."); |
| LT.warn(log, " ^-- Failure to observe these conditions may cause cache to be stuck in inconsistent state."); |
| LT.warn(log, " ^-- You may need to destroy affected cache if that happens."); |
| |
| return init(cacheNames, enabled); |
| } |
| |
    /**
     * Initiate WAL mode change operation.
     *
     * @param cacheNames Cache names; must be non-empty and all belong to the same cache group.
     * @param enabled Enabled flag.
     * @return Future completed when operation finished.
     */
    private IgniteInternalFuture<Boolean> init(Collection<String> cacheNames, boolean enabled) {
        // Disabling may be administratively denied, see prohibitWALDisabling(boolean).
        if (!enabled && prohibitDisabling)
            return errorFuture("WAL disabling is prohibited.");

        if (F.isEmpty(cacheNames))
            return errorFuture("Cache names cannot be empty.");

        synchronized (mux) {
            if (disconnected)
                return errorFuture("Failed to initiate WAL mode change because client node is disconnected.");

            // Prepare cache and group infos.
            Map<String, IgniteUuid> caches = new HashMap<>(cacheNames.size());

            CacheGroupDescriptor grpDesc = null;

            // All caches must exist and share a single cache group: WAL mode is a per-group property.
            for (String cacheName : cacheNames) {
                DynamicCacheDescriptor cacheDesc = cacheProcessor().cacheDescriptor(cacheName);

                if (cacheDesc == null)
                    return errorFuture("Cache doesn't exist: " + cacheName);

                caches.put(cacheName, cacheDesc.deploymentId());

                CacheGroupDescriptor curGrpDesc = cacheDesc.groupDescriptor();

                if (grpDesc == null)
                    grpDesc = curGrpDesc;
                else if (!F.eq(grpDesc.deploymentId(), curGrpDesc.deploymentId())) {
                    return errorFuture("Cannot change WAL mode for caches from different cache groups [" +
                        "cache1=" + cacheNames.iterator().next() + ", grp1=" + grpDesc.groupName() +
                        ", cache2=" + cacheName + ", grp2=" + curGrpDesc.groupName() + ']');
                }
            }

            assert grpDesc != null;

            // The request must name every cache of the group, otherwise the change is ambiguous.
            HashSet<String> grpCaches = new HashSet<>(grpDesc.caches().keySet());

            grpCaches.removeAll(cacheNames);

            if (!grpCaches.isEmpty()) {
                return errorFuture("Cannot change WAL mode because not all cache names belonging to the group are " +
                    "provided [group=" + grpDesc.groupName() + ", missingCaches=" + grpCaches + ']');
            }

            // WAL mode change makes sense only for persistent groups.
            if (!grpDesc.persistenceEnabled())
                return errorFuture("Cannot change WAL mode because persistence is not enabled for cache(s) [" +
                    "caches=" + cacheNames + ", dataRegion=" + grpDesc.config().getDataRegionName() + ']');

            // Send request.
            final UUID opId = UUID.randomUUID();

            GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

            // Deregister the user future once it completes, whatever the outcome.
            fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                @Override public void apply(IgniteInternalFuture<Boolean> fut) {
                    synchronized (mux) {
                        userFuts.remove(opId);
                    }
                }
            });

            WalStateProposeMessage msg = new WalStateProposeMessage(opId, grpDesc.groupId(), grpDesc.deploymentId(),
                cctx.localNodeId(), caches, enabled);

            userFuts.put(opId, fut);

            try {
                cctx.discovery().sendCustomEvent(msg);

                if (log.isDebugEnabled())
                    log.debug("Initiated WAL state change operation: " + msg);
            }
            catch (Exception e) {
                IgniteCheckedException e0 =
                    new IgniteCheckedException("Failed to initiate WAL mode change due to unexpected exception.", e);

                fut.onDone(e0);
            }

            return fut;
        }
    }
| |
| /** |
| * Change local WAL state before exchange is done. This method will disable WAL for groups without partitions |
| * in OWNING state if such feature is enabled. |
| * |
| * @param fut Exchange future. |
| */ |
| public void disableGroupDurabilityForPreloading(GridDhtPartitionsExchangeFuture fut) { |
| if (fut.changedBaseline() |
| && IgniteSystemProperties.getBoolean(IGNITE_PENDING_TX_TRACKER_ENABLED) |
| || !IgniteSystemProperties.getBoolean(IGNITE_DISABLE_WAL_DURING_REBALANCING, DFLT_DISABLE_WAL_DURING_REBALANCING)) |
| return; |
| |
| Collection<CacheGroupContext> grpContexts = cctx.cache().cacheGroups(); |
| |
| for (CacheGroupContext grp : grpContexts) { |
| if (!grp.affinityNode() || !(grp.persistenceEnabled() || grp.cdcEnabled()) |
| || !grp.localWalEnabled() || !grp.rebalanceEnabled() || !grp.shared().isRebalanceEnabled()) |
| continue; |
| |
| List<GridDhtLocalPartition> locParts = grp.topology().localPartitions(); |
| |
| int cnt = 0; |
| |
| for (GridDhtLocalPartition locPart : locParts) { |
| if (locPart.state() == MOVING || locPart.state() == RENTING) |
| cnt++; |
| } |
| |
| if (!locParts.isEmpty() && cnt == locParts.size()) |
| grp.localWalEnabled(false, true); |
| } |
| } |
| |
    /**
     * Handle propose message in discovery thread.
     *
     * @param msg Message.
     */
    public void onProposeDiscovery(WalStateProposeMessage msg) {
        // Discovery may deliver the same custom event more than once.
        if (isDuplicate(msg))
            return;

        synchronized (mux) {
            if (disconnected)
                return;

            // Validate current caches state before deciding whether to process message further.
            if (validateProposeDiscovery(msg)) {
                if (log.isDebugEnabled())
                    log.debug("WAL state change message is valid (will continue processing): " + msg);

                CacheGroupDescriptor grpDesc = cacheProcessor().cacheGroupDescriptors().get(msg.groupId());

                assert grpDesc != null;

                IgnitePredicate<ClusterNode> nodeFilter = grpDesc.config().getNodeFilter();

                // Mark whether the local node hosts the group's data; non-affinity servers only acknowledge.
                boolean affNode = srv && (nodeFilter == null || nodeFilter.apply(cctx.localNode()));

                msg.affinityNode(affNode);

                if (grpDesc.addWalChangeRequest(msg)) {
                    // No other request in flight for this group: hand the message to the exchange thread now.
                    msg.exchangeMessage(msg);

                    if (log.isDebugEnabled())
                        log.debug("WAL state change message will be processed in exchange thread: " + msg);
                }
                else {
                    // Queued in the descriptor; processed when the preceding request finishes (see onFinishDiscovery).
                    if (log.isDebugEnabled())
                        log.debug("WAL state change message is added to pending set and will be processed later: " +
                            msg);
                }
            }
            else {
                if (log.isDebugEnabled())
                    log.debug("WAL state change message is invalid (will ignore): " + msg);
            }
        }
    }
| |
| /** |
| * Validate propose message. |
| * |
| * @param msg Message. |
| * @return {@code True} if message should be processed further, {@code false} if no further processing is needed. |
| */ |
| private boolean validateProposeDiscovery(WalStateProposeMessage msg) { |
| GridFutureAdapter<Boolean> userFut = userFuts.get(msg.operationId()); |
| |
| String errMsg = validate(msg); |
| |
| if (errMsg != null) { |
| completeWithError(userFut, errMsg); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Validate propose message. |
| * |
| * @param msg Message. |
| * @return Error message or {@code null} if everything is OK. |
| */ |
| @Nullable private String validate(WalStateProposeMessage msg) { |
| // Is group still there? |
| CacheGroupDescriptor grpDesc = cacheProcessor().cacheGroupDescriptors().get(msg.groupId()); |
| |
| if (grpDesc == null) |
| return "Failed to change WAL mode because some caches no longer exist: " + msg.caches().keySet(); |
| |
| // Are specified caches still there? |
| for (Map.Entry<String, IgniteUuid> cache : msg.caches().entrySet()) { |
| String cacheName = cache.getKey(); |
| |
| DynamicCacheDescriptor cacheDesc = cacheProcessor().cacheDescriptor(cacheName); |
| |
| if (cacheDesc == null || !F.eq(cacheDesc.deploymentId(), cache.getValue())) |
| return "Cache doesn't exist: " + cacheName; |
| } |
| |
| // Are there any new caches in the group? |
| HashSet<String> grpCacheNames = new HashSet<>(grpDesc.caches().keySet()); |
| |
| grpCacheNames.removeAll(msg.caches().keySet()); |
| |
| if (!grpCacheNames.isEmpty()) { |
| return "Cannot change WAL mode because not all cache names belonging to the " + |
| "group are provided [group=" + grpDesc.groupName() + ", missingCaches=" + grpCacheNames + ']'; |
| } |
| |
| return null; |
| } |
| |
    /**
     * Handle propose message which is synchronized with other cache state actions through exchange thread.
     * If operation is no-op (i.e. state is not changed), then no additional processing is needed, and coordinator will
     * trigger finish request right away. Otherwise all nodes start asynchronous checkpoint flush, and send responses
     * to coordinator. Once all responses are received, coordinator node will trigger finish message.
     *
     * @param msg Message.
     */
    public void onProposeExchange(WalStateProposeMessage msg) {
        if (!srv)
            return;

        synchronized (mux) {
            WalStateResult res = null;

            if (msg.affinityNode()) {
                // Affinity node, normal processing.
                CacheGroupContext grpCtx = cacheProcessor().cacheGroup(msg.groupId());

                if (grpCtx == null) {
                    // Related caches were destroyed concurrently.
                    res = new WalStateResult(msg, "Failed to change WAL mode because some caches " +
                        "no longer exist: " + msg.caches().keySet());
                }
                else {
                    if (F.eq(msg.enable(), grpCtx.globalWalEnabled()))
                        // Nothing changed -> no-op.
                        res = new WalStateResult(msg, false);
                    else {
                        // Initiate a checkpoint.
                        CheckpointProgress cpFut = triggerCheckpoint("wal-state-change-grp-" + msg.groupId());

                        if (cpFut != null) {
                            try {
                                // Wait for checkpoint mark synchronously before releasing the control.
                                cpFut.futureFor(LOCK_RELEASED).get();

                                if (msg.enable()) {
                                    grpCtx.globalWalEnabled(true);

                                    // Enable: it is enough to release cache operations once mark is finished because
                                    // not-yet-flushed dirty pages have been logged.
                                    // res stays null: the worker awaits full checkpoint completion asynchronously
                                    // and reports the local result itself (see WalStateChangeWorker.body()).
                                    WalStateChangeWorker worker = new WalStateChangeWorker(msg, cpFut);

                                    IgniteThread thread = new IgniteThread(worker);

                                    thread.setUncaughtExceptionHandler(new OomExceptionHandler(
                                        cctx.kernalContext()));

                                    thread.start();
                                }
                                else {
                                    // Disable: not-yet-flushed operations are not logged, so wait for them
                                    // synchronously in exchange thread. Otherwise, we cannot define a point in
                                    // when it is safe to continue cache operations.
                                    res = awaitCheckpoint(cpFut, msg);

                                    // WAL state is persisted after checkpoint if finished. Otherwise in case of crash
                                    // and restart we will think that WAL is enabled, but data might be corrupted.
                                    grpCtx.globalWalEnabled(false);
                                }
                            }
                            catch (Exception e) {
                                U.warn(log, "Failed to change WAL mode due to unexpected exception [" +
                                    "msg=" + msg + ']', e);

                                res = new WalStateResult(msg, "Failed to change WAL mode due to unexpected " +
                                    "exception (see server logs for more information): " + e.getMessage());
                            }
                        }
                        else {
                            res = new WalStateResult(msg, "Failed to initiate a checkpoint (checkpoint thread " +
                                "is not available).");
                        }
                    }
                }
            }
            else {
                // We cannot know result on non-affinity server node, so just complete operation with "false" flag,
                // which will be ignored anyway.
                res = new WalStateResult(msg, false);
            }

            if (res != null) {
                addResult(res);

                onCompletedLocally(res);
            }
        }
    }
| |
| /** |
| * Handle local operation completion. |
| * |
| * @param res Result. |
| */ |
| private void onCompletedLocally(WalStateResult res) { |
| assert res != null; |
| |
| synchronized (mux) { |
| ClusterNode crdNode = coordinator(); |
| |
| UUID opId = res.message().operationId(); |
| |
| WalStateAckMessage msg = new WalStateAckMessage(opId, res.message().affinityNode(), |
| res.changed(), res.errorMessage()); |
| |
| // Handle distributed completion. |
| if (crdNode.isLocal()) { |
| Collection<ClusterNode> srvNodes = cctx.discovery().aliveServerNodes(); |
| |
| Collection<UUID> srvNodeIds = new ArrayList<>(srvNodes.size()); |
| |
| for (ClusterNode srvNode : srvNodes) { |
| if (cctx.discovery().alive(srvNode)) |
| srvNodeIds.add(srvNode.id()); |
| } |
| |
| WalStateDistributedProcess proc = new WalStateDistributedProcess(res.message(), srvNodeIds); |
| |
| procs.put(res.message().operationId(), proc); |
| |
| unwindPendingAcks(proc); |
| |
| proc.onNodeFinished(cctx.localNodeId(), msg); |
| |
| sendFinishMessageIfNeeded(proc); |
| } |
| else { |
| // Just send message to coordinator. |
| try { |
| cctx.kernalContext().io().sendToGridTopic(crdNode, TOPIC_WAL, msg, SYSTEM_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Failed to send ack message to coordinator node [opId=" + opId + |
| ", node=" + crdNode.id() + ']'); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Unwind pending ack messages for the given distributed process. |
| * |
| * @param proc Process. |
| */ |
| private void unwindPendingAcks(WalStateDistributedProcess proc) { |
| assert Thread.holdsLock(mux); |
| |
| Iterator<WalStateAckMessage> iter = pendingAcks.iterator(); |
| |
| while (iter.hasNext()) { |
| WalStateAckMessage ackMsg = iter.next(); |
| |
| if (F.eq(proc.operationId(), ackMsg.operationId())) { |
| proc.onNodeFinished(ackMsg.senderNodeId(), ackMsg); |
| |
| iter.remove(); |
| } |
| } |
| } |
| |
| /** |
| * Handle ack message. |
| * |
| * @param msg Ack message. |
| */ |
| public void onAck(WalStateAckMessage msg) { |
| synchronized (mux) { |
| if (completedOpIds.contains(msg.operationId())) |
| // Skip stale messages. |
| return; |
| |
| WalStateDistributedProcess proc = procs.get(msg.operationId()); |
| |
| if (proc == null) |
| // If process if not initialized yet, add to pending set. |
| pendingAcks.add(msg); |
| else { |
| // Notify process on node completion. |
| proc.onNodeFinished(msg.senderNodeId(), msg); |
| |
| sendFinishMessageIfNeeded(proc); |
| } |
| } |
| } |
| |
    /**
     * Send finish message for the given distributed process if needed (i.e. once acks from all
     * tracked nodes have been received).
     *
     * @param proc Process.
     */
    private void sendFinishMessageIfNeeded(WalStateDistributedProcess proc) {
        if (proc.completed())
            sendFinishMessage(proc.prepareFinishMessage());
    }
| |
    /**
     * Send finish message through discovery.
     *
     * @param finishMsg Finish message.
     */
    private void sendFinishMessage(WalStateFinishMessage finishMsg) {
        try {
            cctx.discovery().sendCustomEvent(finishMsg);
        }
        catch (Exception e) {
            // Best effort: the failure is logged, there is no retry here.
            U.error(log, "Failed to send WAL mode change finish message due to unexpected exception: " + finishMsg, e);
        }
    }
| |
    /**
     * Handle finish message in discovery thread.
     *
     * @param msg Message.
     */
    public void onFinishDiscovery(WalStateFinishMessage msg) {
        // Discovery may deliver the same custom event more than once.
        if (isDuplicate(msg))
            return;

        synchronized (mux) {
            if (disconnected)
                return;

            // Complete user future, if any.
            GridFutureAdapter<Boolean> userFut = userFuts.get(msg.operationId());

            if (userFut != null) {
                if (msg.errorMessage() != null)
                    completeWithError(userFut, msg.errorMessage());
                else
                    complete(userFut, msg.changed());
            }

            // Clear pending data.
            WalStateResult res = ress.remove(msg.operationId());

            if (res == null && srv)
                U.warn(log, "Received finish message for unknown operation (will ignore): " + msg.operationId());

            procs.remove(msg.operationId());

            CacheGroupDescriptor grpDesc = cacheProcessor().cacheGroupDescriptors().get(msg.groupId());

            // Skip descriptor update if the group is gone or was re-created (deployment ID mismatch).
            if (grpDesc != null && F.eq(grpDesc.deploymentId(), msg.groupDeploymentId())) {
                // Toggle WAL mode in descriptor.
                if (msg.changed())
                    grpDesc.walEnabled(!grpDesc.walEnabled());

                // Remove now-outdated message from the queue.
                WalStateProposeMessage oldProposeMsg = grpDesc.nextWalChangeRequest();

                assert oldProposeMsg != null;
                assert F.eq(oldProposeMsg.operationId(), msg.operationId());

                grpDesc.removeWalChangeRequest();

                // Move next message to exchange thread.
                WalStateProposeMessage nextProposeMsg = grpDesc.nextWalChangeRequest();

                if (nextProposeMsg != null)
                    msg.exchangeMessage(nextProposeMsg);
            }

            if (srv) {
                // Remember operation ID to handle duplicates.
                completedOpIds.add(msg.operationId());

                // Remove possible stale messages.
                Iterator<WalStateAckMessage> ackIter = pendingAcks.iterator();

                while (ackIter.hasNext()) {
                    WalStateAckMessage ackMsg = ackIter.next();

                    if (F.eq(ackMsg.operationId(), msg.operationId()))
                        ackIter.remove();
                }
            }
        }
    }
| |
    /**
     * Handle node leave event.
     *
     * @param nodeId Node ID.
     */
    public void onNodeLeft(UUID nodeId) {
        if (!srv)
            return;

        synchronized (mux) {
            // Coordinator is resolved lazily; if it is still unknown, no operation is in flight.
            if (crdNode == null) {
                assert ress.isEmpty();
                assert procs.isEmpty();

                return;
            }

            if (F.eq(crdNode.id(), nodeId)) {
                // Coordinator exited, re-send to new, or initialize new distributed processes.
                crdNode = null;

                for (WalStateResult res : ress.values())
                    onCompletedLocally(res);
            }
            else if (F.eq(cctx.localNodeId(), crdNode.id())) {
                // Notify distributed processes on node leave.
                for (Map.Entry<UUID, WalStateDistributedProcess> procEntry : procs.entrySet()) {
                    WalStateDistributedProcess proc = procEntry.getValue();

                    proc.onNodeLeft(nodeId);

                    // The departed node may have been the last one the process was waiting for.
                    sendFinishMessageIfNeeded(proc);
                }
            }
        }
    }
| |
    /**
     * Create future completed with an error.
     *
     * @param errMsg Error message.
     * @return Future completed with an {@link IgniteCheckedException} carrying the given message.
     */
    @SuppressWarnings("Convert2Diamond")
    private static IgniteInternalFuture<Boolean> errorFuture(String errMsg) {
        return new GridFinishedFuture<Boolean>(new IgniteCheckedException(errMsg));
    }
| |
    /**
     * Complete user future with normal result. No-op if the future is {@code null}.
     *
     * @param userFut User future.
     * @param res Result.
     */
    private static void complete(@Nullable GridFutureAdapter<Boolean> userFut, boolean res) {
        if (userFut != null)
            userFut.onDone(res);
    }
| |
    /**
     * Complete user future with error. No-op if the future is {@code null}.
     *
     * @param userFut User future.
     * @param errMsg Error message.
     */
    private static void completeWithError(@Nullable GridFutureAdapter<Boolean> userFut, String errMsg) {
        if (userFut != null)
            userFut.onDone(new IgniteCheckedException(errMsg));
    }
| |
    /**
     * Shorthand for {@code cctx.cache()}.
     *
     * @return Cache processor.
     */
    private GridCacheProcessor cacheProcessor() {
        return cctx.cache();
    }
| |
| /** |
| * Get current coordinator node. |
| * |
| * @return Coordinator node. |
| */ |
| private ClusterNode coordinator() { |
| assert Thread.holdsLock(mux); |
| |
| if (crdNode != null) |
| return crdNode; |
| else { |
| ClusterNode res = null; |
| |
| for (ClusterNode node : cctx.discovery().aliveServerNodes()) { |
| if (res == null || res.order() > node.order()) |
| res = node; |
| } |
| |
| assert res != null; |
| |
| crdNode = res; |
| |
| return res; |
| } |
| } |
| |
| /** |
| * Check if discovery message has already been received. |
| * |
| * @param msg Message. |
| * @return {@code True} if this is a duplicate. |
| */ |
| private boolean isDuplicate(WalStateAbstractMessage msg) { |
| T2<UUID, Boolean> key; |
| |
| if (msg instanceof WalStateProposeMessage) |
| key = new T2<>(msg.operationId(), true); |
| else { |
| assert msg instanceof WalStateFinishMessage; |
| |
| key = new T2<>(msg.operationId(), false); |
| } |
| |
| if (!discoMsgIdHist.add(key)) { |
| U.warn(log, "Received duplicate WAL mode change discovery message (will ignore): " + msg); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Add locally result to pending map. |
| * |
| * @param res Result. |
| */ |
| private void addResult(WalStateResult res) { |
| ress.put(res.message().operationId(), res); |
| } |
| |
    /**
     * Force checkpoint.
     *
     * @param msg Checkpoint reason message.
     * @return Checkpoint future or {@code null} if failed to get checkpointer.
     */
    @Nullable private CheckpointProgress triggerCheckpoint(String msg) {
        return cctx.database().forceCheckpoint(msg);
    }
| |
| /** |
| * Await for the checkpoint to finish. |
| * |
| * @param cpFut Checkpoint future. |
| * @param msg Orignial message which triggered the process. |
| * @return Result. |
| */ |
| private WalStateResult awaitCheckpoint(CheckpointProgress cpFut, WalStateProposeMessage msg) { |
| WalStateResult res; |
| |
| try { |
| assert msg.affinityNode(); |
| |
| if (cpFut != null) |
| cpFut.futureFor(FINISHED).get(); |
| |
| res = new WalStateResult(msg, true); |
| } |
| catch (Exception e) { |
| U.warn(log, "Failed to change WAL mode due to unexpected exception [msg=" + msg + ']', e); |
| |
| res = new WalStateResult(msg, "Failed to change WAL mode due to unexpected exception " + |
| "(see server logs for more information): " + e.getMessage()); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Checks WAL disabled for cache group. |
| * |
| * @param grpId Group id. |
| * @return {@code True} if WAL disable for group. {@code False} If not. |
| */ |
| public boolean isDisabled(int grpId) { |
| CacheGroupContext ctx = cctx.cache().cacheGroup(grpId); |
| |
| return ctx != null && !ctx.walEnabled(); |
| } |
| |
| /** |
| * @return WAL disable context. |
| */ |
    /**
     * @return WAL disable context, or {@code null} if it has not been initialized yet
     *      (see {@link #runWithOutWAL(IgniteRunnable)} which checks for this).
     */
    public WALDisableContext walDisableContext() {
        return walDisableContext;
    }
| |
| /** |
| * None record will be logged in closure call. |
| * |
| * @param cls Closure to execute out of WAL scope. |
| * @throws IgniteCheckedException If operation failed. |
| */ |
| public void runWithOutWAL(IgniteRunnable cls) throws IgniteCheckedException { |
| WALDisableContext ctx = walDisableContext; |
| |
| if (ctx == null) |
| throw new IgniteCheckedException("Disable WAL context is not initialized."); |
| |
| ctx.execute(cls); |
| } |
| |
| /** |
| * WAL state change worker. |
| */ |
    /**
     * WAL state change worker. Awaits the supplied checkpoint, records the resulting
     * {@link WalStateResult} locally and invokes local completion handling.
     */
    private class WalStateChangeWorker extends GridWorker {
        /** Propose message which initiated the WAL state change. */
        private final WalStateProposeMessage msg;

        /** Checkpoint future to await before completing (may be {@code null}). */
        private final CheckpointProgress cpFut;

        /**
         * Constructor.
         *
         * @param msg Propose message.
         * @param cpFut Checkpoint future to await (may be {@code null} — see
         *      {@code awaitCheckpoint}, which tolerates a {@code null} future).
         */
        private WalStateChangeWorker(WalStateProposeMessage msg, CheckpointProgress cpFut) {
            super(cctx.igniteInstanceName(), "wal-state-change-worker-" + msg.groupId(), WalStateManager.this.log);

            this.msg = msg;
            this.cpFut = cpFut;
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            // Block until the checkpoint (if any) finishes; failures are converted
            // into an error result rather than propagated.
            WalStateResult res = awaitCheckpoint(cpFut, msg);

            // Record the result under its operation ID, then run local completion.
            addResult(res);

            onCompletedLocally(res);
        }
    }
| |
| /** |
| * Temporary storage for disabled WALs of group. |
| */ |
    /**
     * Temporary storage for disabled WALs of group.
     * <p>
     * Allows executing a closure with WAL logging suppressed ({@link #execute(IgniteRunnable)}),
     * persisting a marker in the metastorage for the duration so that a crash while WAL is
     * disabled can be detected on restart and the persistent space cleaned up
     * ({@link #onReadyForRead(ReadOnlyMetastorage)}).
     */
    public static class WALDisableContext implements MetastorageLifecycleListener {
        /** Metastorage key marking that WAL is currently disabled (crash marker). */
        public static final String WAL_DISABLED = "wal-disabled";

        /** Logger (may be {@code null}, see constructor). */
        private final IgniteLogger log;

        /** Database manager used for checkpoints and checkpoint read locks. */
        private final IgniteCacheDatabaseSharedManager dbMgr;

        /** Read-write metastorage; assigned when it becomes ready for read-write. */
        private volatile ReadWriteMetastorage metaStorage;

        /** Page store manager used to clean up persistent space after a crash with disabled WAL. */
        private final IgnitePageStoreManager pageStoreMgr;

        /** Set when a previous crash with disabled WAL was detected; triggers marker removal later. */
        private volatile boolean resetWalFlag;

        /** Current "WAL disabled" state, exposed via {@link #check()}. */
        private volatile boolean disableWal;

        /**
         * @param dbMgr Database manager.
         * @param pageStoreMgr Page store manager.
         * @param log Logger ({@code null} disables logging from this context).
         */
        public WALDisableContext(
            IgniteCacheDatabaseSharedManager dbMgr,
            IgnitePageStoreManager pageStoreMgr,
            @Nullable IgniteLogger log
        ) {
            this.dbMgr = dbMgr;
            this.pageStoreMgr = pageStoreMgr;
            this.log = log;
        }

        /**
         * Executes the given closure with WAL logging disabled.
         * <p>
         * Sequence: persist the disable marker, checkpoint, disable WAL, run the closure,
         * then (always) re-enable WAL, checkpoint again and remove the marker. The marker
         * written first ensures a crash mid-execution is detected on restart.
         *
         * @param cls Closure to execute with disabled WAL.
         * @throws IgniteCheckedException If execution failed.
         */
        public void execute(IgniteRunnable cls) throws IgniteCheckedException {
            if (cls == null)
                throw new IgniteCheckedException("Task to execute is not specified.");

            if (metaStorage == null)
                throw new IgniteCheckedException("Meta storage is not ready.");

            writeMetaStoreDisableWALFlag();

            dbMgr.waitForCheckpoint("Checkpoint before apply updates on recovery.");

            disableWAL(true);

            try {
                cls.run();
            }
            catch (IgniteException e) {
                throw new IgniteCheckedException(e);
            }
            finally {
                disableWAL(false);

                dbMgr.waitForCheckpoint("Checkpoint after apply updates on recovery.");

                removeMetaStoreDisableWALFlag();
            }
        }

        /**
         * Persists the {@link #WAL_DISABLED} marker under checkpoint read lock.
         *
         * @throws IgniteCheckedException If write meta store flag failed.
         */
        protected void writeMetaStoreDisableWALFlag() throws IgniteCheckedException {
            dbMgr.checkpointReadLock();

            try {
                metaStorage.write(WAL_DISABLED, Boolean.TRUE);
            }
            finally {
                dbMgr.checkpointReadUnlock();
            }
        }

        /**
         * Removes the {@link #WAL_DISABLED} marker under checkpoint read lock.
         *
         * @throws IgniteCheckedException If remove meta store flag failed.
         */
        protected void removeMetaStoreDisableWALFlag() throws IgniteCheckedException {
            dbMgr.checkpointReadLock();

            try {
                metaStorage.remove(WAL_DISABLED);
            }
            finally {
                dbMgr.checkpointReadUnlock();
            }
        }

        /**
         * Flips the in-memory WAL-disabled flag under checkpoint read lock.
         *
         * @param disable Flag wal disable.
         * @throws IgniteCheckedException If the flag could not be changed.
         */
        protected void disableWAL(boolean disable) throws IgniteCheckedException {
            dbMgr.checkpointReadLock();

            try {
                disableWal = disable;

                if (log != null)
                    log.info("WAL logging " + (disable ? "disabled" : "enabled"));
            }
            finally {
                dbMgr.checkpointReadUnlock();
            }
        }

        /** {@inheritDoc} */
        @Override public void onReadyForRead(ReadOnlyMetastorage ms) throws IgniteCheckedException {
            Boolean disabled = (Boolean)ms.read(WAL_DISABLED);

            // Node crash when WAL was disabled: persistent data may be inconsistent,
            // so wipe persistent space and checkpoint directories before recovery.
            if (disabled != null && disabled) {
                resetWalFlag = true;

                pageStoreMgr.cleanupPersistentSpace();

                dbMgr.cleanupTempCheckpointDirectory();

                dbMgr.cleanupCheckpointDirectory();
            }
        }

        /** {@inheritDoc} */
        @Override public void onReadyForReadWrite(ReadWriteMetastorage ms) throws IgniteCheckedException {
            // On new node start WAL always enabled. Remove flag from metastore.
            if (resetWalFlag)
                ms.remove(WAL_DISABLED);

            metaStorage = ms;
        }

        /**
         * @return {@code true} If WAL is disabled.
         */
        public boolean check() {
            return disableWal;
        }
    }
| |
| /** |
| * Checkpoint reason for enabling group durability. |
| * |
| * @param grpId Group id. |
| * @param topVer Topology version. |
| */ |
| public static String reason(long grpId, AffinityTopologyVersion topVer) { |
| return ENABLE_DURABILITY_AFTER_REBALANCING + grpId + "-" + topVer; |
| } |
| } |