| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.util.distributed; |
| |
| import java.io.Serializable; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.function.BiFunction; |
| import java.util.function.Function; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridTopic; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.CI3; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| |
| /** |
 * Distributed process is a cluster-wide process that accumulates single node results in order to complete.
| * <p> |
| * The process consists of the following phases: |
| * <ol> |
| * <li>The initial request starts the process. The {@link InitMessage} sent via discovery.</li> |
| * <li>Each server node processes an initial request and sends the single node result to the coordinator. The |
| * {@link SingleNodeMessage} sent via communication.</li> |
| * <li>The coordinator accumulate all single nodes results and finish process. The {@link FullMessage} sent via |
| * discovery.</li> |
| * </ol> |
| * <p> |
 * Several processes of the same type can be started at the same time.
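 * <p>
 * A minimal usage sketch (the {@code MyRequest}/{@code MyResult} types and the helper methods are illustrative,
 * not part of this API):
 * <pre>{@code
 * DistributedProcess<MyRequest, MyResult> proc = new DistributedProcess<>(
 *     ctx,
 *     DistributedProcessType.TEST_PROCESS,
 *     req -> new GridFinishedFuture<>(doLocalWork(req)), // Local phase: completes with this node's result.
 *     (procId, results, errors) -> {                     // Finish phase: called on every node.
 *         if (errors.isEmpty())
 *             applyResults(results);
 *     });
 *
 * proc.start(UUID.randomUUID(), new MyRequest());
 * }</pre>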
| * |
| * @param <I> Request type. |
| * @param <R> Result type. |
| * @see InitMessage |
| * @see FullMessage |
| */ |
| public class DistributedProcess<I extends Serializable, R extends Serializable> { |
| /** Process type. */ |
| private final DistributedProcessType type; |
| |
| /** Active processes. */ |
| private final ConcurrentHashMap<UUID, Process> processes = new ConcurrentHashMap<>(1); |
| |
    /** Synchronization mutex for coordinator initialization and operations on the {@code remaining} collection. */
| private final Object mux = new Object(); |
| |
| /** Kernal context. */ |
| private final GridKernalContext ctx; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
    /** Factory that creates a custom {@link InitMessage} for distributed process initialization. */
    private final BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory;
| |
| /** |
| * @param ctx Kernal context. |
| * @param type Process type. |
     * @param exec Executes an action and returns a future with the single node result to send to the coordinator.
     * @param finish Finish process closure. Called on each node when all single node results have been received.
| */ |
| public DistributedProcess( |
| GridKernalContext ctx, |
| DistributedProcessType type, |
| Function<I, IgniteInternalFuture<R>> exec, |
| CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish |
| ) { |
| this(ctx, type, exec, finish, (id, req) -> new InitMessage<>(id, type, req)); |
| } |
| |
| /** |
| * @param ctx Kernal context. |
| * @param type Process type. |
     * @param exec Executes an action and returns a future with the single node result to send to the coordinator.
     * @param finish Finish process closure. Called on each node when all single node results have been received.
     * @param initMsgFactory Factory that creates a custom {@link InitMessage} for distributed process initialization.
| */ |
| public DistributedProcess( |
| GridKernalContext ctx, |
| DistributedProcessType type, |
| Function<I, IgniteInternalFuture<R>> exec, |
| CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish, |
| BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory |
| ) { |
| this.ctx = ctx; |
| this.type = type; |
| this.initMsgFactory = initMsgFactory; |
| |
| log = ctx.log(getClass()); |
| |
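        // Listens for the InitMessage delivered via discovery: starts the local execution and, if the local node
        // is the coordinator, starts tracking single node results from all server nodes.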
| ctx.discovery().setCustomEventListener(InitMessage.class, (topVer, snd, msg) -> { |
| if (msg.type() != type.ordinal()) |
| return; |
| |
| Process p = processes.computeIfAbsent(msg.processId(), id -> new Process(msg.processId())); |
| |
            // May already be completed in case of duplicate message delivery.
| if (p.initFut.isDone()) |
| return; |
| |
| ClusterNode crd = coordinator(); |
| |
| if (crd == null) { |
| p.initFut.onDone(); |
| |
| onAllServersLeft(); |
| |
| return; |
| } |
| |
| p.crdId = crd.id(); |
| |
| if (crd.isLocal()) |
| initCoordinator(p, topVer); |
| |
| IgniteInternalFuture<R> fut = exec.apply((I)msg.request()); |
| |
| fut.listen(f -> { |
| if (f.error() != null) |
| p.resFut.onDone(f.error()); |
| else |
| p.resFut.onDone(f.result()); |
| |
| if (!ctx.clientNode()) { |
| assert crd != null; |
| |
| sendSingleMessage(p); |
| } |
| }); |
| |
| p.initFut.onDone(); |
| }); |
| |
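        // Listens for the FullMessage delivered via discovery: it carries the accumulated results and errors and
        // completes the process on every node.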
| ctx.discovery().setCustomEventListener(FullMessage.class, (topVer, snd, msg0) -> { |
| if (msg0.type() != type.ordinal()) |
| return; |
| |
| FullMessage<R> msg = (FullMessage<R>)msg0; |
| |
| Process p = processes.get(msg.processId()); |
| |
| if (p == null) { |
                log.warning("Received the finish distributed process message for an uninitialized process " +
                    "(possible cause is duplicate message delivery) [msg=" + msg + ']');
| |
| return; |
| } |
| |
| finish.apply(p.id, msg.result(), msg.error()); |
| |
| processes.remove(msg.processId()); |
| }); |
| |
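        // Listens for single node results sent to the coordinator via communication.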
| ctx.io().addMessageListener(GridTopic.TOPIC_DISTRIBUTED_PROCESS, (nodeId, msg0, plc) -> { |
            if (msg0 instanceof SingleNodeMessage && ((SingleNodeMessage)msg0).type() == type.ordinal()) {
                SingleNodeMessage<R> msg = (SingleNodeMessage<R>)msg0;

                onSingleNodeMessageReceived(msg, nodeId);
            }
| }); |
| |
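        // Re-elects the coordinator or completes the process when a node leaves the cluster or fails.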
| ctx.event().addDiscoveryEventListener((evt, discoCache) -> { |
| UUID leftNodeId = evt.eventNode().id(); |
| |
| for (Process p : processes.values()) { |
| p.initFut.listen(fut -> { |
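                    // The node that left was the coordinator: elect a new one and resend the local result to it.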
| if (F.eq(leftNodeId, p.crdId)) { |
| ClusterNode crd = coordinator(); |
| |
| if (crd == null) { |
| onAllServersLeft(); |
| |
| return; |
| } |
| |
| p.crdId = crd.id(); |
| |
| if (crd.isLocal()) |
| initCoordinator(p, discoCache.version()); |
| |
| if (!ctx.clientNode()) |
| p.resFut.listen(f -> sendSingleMessage(p)); |
| } |
| else if (F.eq(ctx.localNodeId(), p.crdId)) { |
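                        // The local node is the coordinator: stop waiting for a result from the node that left.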
| boolean isEmpty = false; |
| |
| synchronized (mux) { |
| if (p.remaining.remove(leftNodeId)) |
| isEmpty = p.remaining.isEmpty(); |
| } |
| |
| if (isEmpty) |
| finishProcess(p); |
| } |
| }); |
| } |
| }, EVT_NODE_FAILED, EVT_NODE_LEFT); |
| } |
| |
| /** |
     * Starts the distributed process.
| * |
| * @param id Process id. |
| * @param req Initial request. |
| */ |
| public void start(UUID id, I req) { |
| try { |
| ctx.discovery().sendCustomEvent(initMsgFactory.apply(id, req)); |
| } |
| catch (IgniteCheckedException e) { |
| log.warning("Unable to start process.", e); |
| } |
| } |
| |
| /** |
     * Initializes the process coordinator.
| * |
| * @param p Process. |
| * @param topVer Topology version. |
| */ |
| private void initCoordinator(Process p, AffinityTopologyVersion topVer) { |
| synchronized (mux) { |
| if (p.initCrdFut.isDone()) |
| return; |
| |
| assert p.remaining.isEmpty(); |
| |
| p.remaining.addAll(F.viewReadOnly(ctx.discovery().serverNodes(topVer), F.node2id())); |
| |
| p.initCrdFut.onDone(); |
| } |
| } |
| |
| /** |
| * Sends single node message to coordinator. |
| * |
| * @param p Process. |
| */ |
| private void sendSingleMessage(Process p) { |
| assert p.resFut.isDone(); |
| |
| SingleNodeMessage<R> singleMsg = new SingleNodeMessage<>(p.id, type, p.resFut.result(), |
| (Exception)p.resFut.error()); |
| |
| UUID crdId = p.crdId; |
| |
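        // The local node is the coordinator: process the result directly instead of sending it over communication.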
| if (F.eq(ctx.localNodeId(), crdId)) |
| onSingleNodeMessageReceived(singleMsg, crdId); |
| else { |
| try { |
| ctx.io().sendToGridTopic(crdId, GridTopic.TOPIC_DISTRIBUTED_PROCESS, singleMsg, SYSTEM_POOL); |
| } |
| catch (ClusterTopologyCheckedException e) { |
                // The coordinator has failed. The single message will be sent when a new coordinator is initialized.
| if (log.isDebugEnabled()) { |
                    log.debug("Failed to send a single message to the coordinator [crdId=" + crdId +
                        ", processId=" + p.id + ", error=" + e.getMessage() + ']');
| } |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Unable to send message to coordinator.", e); |
| |
| ctx.failure().process(new FailureContext(CRITICAL_ERROR, |
| new Exception("Unable to send message to coordinator.", e))); |
| } |
| } |
| } |
| |
| /** |
| * Processes the received single node message. |
| * |
| * @param msg Message. |
| * @param nodeId Node id. |
| */ |
| private void onSingleNodeMessageReceived(SingleNodeMessage<R> msg, UUID nodeId) { |
| Process p = processes.computeIfAbsent(msg.processId(), id -> new Process(msg.processId())); |
| |
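        // Handle the result only after coordinator initialization has completed on the local node.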
| p.initCrdFut.listen(f -> { |
| boolean isEmpty; |
| |
| synchronized (mux) { |
| if (p.remaining.remove(nodeId)) |
| p.singleMsgs.put(nodeId, msg); |
| |
| isEmpty = p.remaining.isEmpty(); |
| } |
| |
| if (isEmpty) |
| finishProcess(p); |
| }); |
| } |
| |
| /** |
     * Creates and sends the finish message when all single node results have been received.
| * |
| * @param p Process. |
| */ |
| private void finishProcess(Process p) { |
| HashMap<UUID, R> res = new HashMap<>(); |
| |
| HashMap<UUID, Exception> err = new HashMap<>(); |
| |
| p.singleMsgs.forEach((uuid, msg) -> { |
| if (msg.hasError()) |
| err.put(uuid, msg.error()); |
| else |
| res.put(uuid, msg.response()); |
| }); |
| |
| FullMessage<R> msg = new FullMessage<>(p.id, type, res, err); |
| |
| try { |
| ctx.discovery().sendCustomEvent(msg); |
| } |
| catch (IgniteCheckedException e) { |
            log.warning("Unable to send finish message.", e);
| } |
| } |
| |
| /** Handles case when all server nodes have left the grid. */ |
| private void onAllServersLeft() { |
| processes.clear(); |
| } |
| |
    /** @return Cluster coordinator, or {@code null} if it cannot be determined. */
| private @Nullable ClusterNode coordinator() { |
| return U.oldest(ctx.discovery().aliveServerNodes(), null); |
| } |
| |
| /** The process meta information. */ |
| private class Process { |
| /** Process id. */ |
| private final UUID id; |
| |
| /** Init coordinator future. */ |
| private final GridFutureAdapter<Void> initCrdFut = new GridFutureAdapter<>(); |
| |
| /** Coordinator node id. */ |
| private volatile UUID crdId; |
| |
| /** Init process future. */ |
| private final GridFutureAdapter<Void> initFut = new GridFutureAdapter<>(); |
| |
        /** IDs of remaining nodes from which single node results have not yet been received. */
| private final Set<UUID> remaining = new GridConcurrentHashSet<>(); |
| |
| /** Future for a local action result. */ |
| private final GridFutureAdapter<R> resFut = new GridFutureAdapter<>(); |
| |
| /** Nodes results. */ |
| private final ConcurrentHashMap<UUID, SingleNodeMessage<R>> singleMsgs = new ConcurrentHashMap<>(); |
| |
| /** @param id Process id. */ |
| private Process(UUID id) { |
| this.id = id; |
| } |
| } |
| |
    /** Defines distributed process types. */
| public enum DistributedProcessType { |
| /** For test purposes only. */ |
| TEST_PROCESS, |
| |
| /** |
| * Master key change prepare process. |
| * |
| * @see GridEncryptionManager |
| */ |
| MASTER_KEY_CHANGE_PREPARE, |
| |
| /** |
| * Master key change finish process. |
| * |
| * @see GridEncryptionManager |
| */ |
| MASTER_KEY_CHANGE_FINISH, |
| |
| /** |
| * Start snapshot procedure. |
| * |
| * @see IgniteSnapshotManager |
| */ |
| START_SNAPSHOT, |
| |
| /** |
| * End snapshot procedure. |
| * |
| * @see IgniteSnapshotManager |
| */ |
| END_SNAPSHOT, |
| |
| /** |
         * Cache group encryption key change prepare phase.
| */ |
| CACHE_GROUP_KEY_CHANGE_PREPARE, |
| |
| /** |
         * Cache group encryption key change perform phase.
| */ |
| CACHE_GROUP_KEY_CHANGE_FINISH, |
| |
| /** |
| * Rotate performance statistics. |
| */ |
| PERFORMANCE_STATISTICS_ROTATE, |
| |
| /** |
| * Cache group restore prepare phase. |
| */ |
| RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, |
| |
| /** |
| * Cache group restore preload phase. |
| */ |
| RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, |
| |
| /** |
| * Cache group restore cache start phase. |
| */ |
| RESTORE_CACHE_GROUP_SNAPSHOT_START, |
| |
| /** |
| * Cache group restore rollback phase. |
| */ |
| RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK |
| } |
| } |