// blob: 15e8afb2487dba6b21c1418949cce52081807d78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.GridTopic.TOPIC_SERVICES;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
/**
* Service deployment manager.
*
* @see ServiceDeploymentTask
* @see ServiceDeploymentActions
*/
public class ServiceDeploymentManager {
/** Busy lock: entered by the discovery and communication listeners, blocked by {@link #stopProcessing}. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** Services discovery messages listener, registered in the constructor. */
private final DiscoveryEventListener discoLsnr = new ServiceDiscoveryListener();
/** Services communication messages listener, registered in the constructor. */
private final GridMessageListener commLsnr = new ServiceCommunicationListener();
/** Services deployments tasks mapped by their deployment process id. */
private final Map<ServiceDeploymentProcessId, ServiceDeploymentTask> tasks = new ConcurrentHashMap<>();
/** Discovery events received while cluster state transition was in progress. */
private final List<PendingEventHolder> pendingEvts = new ArrayList<>();
/**
* Topology version of the latest post-processed deployment task; {@link AffinityTopologyVersion#NONE} until
* the first task is post-processed.
*/
private final AtomicReference<AffinityTopologyVersion> readyTopVer =
new AtomicReference<>(AffinityTopologyVersion.NONE);
/** Kernal context. */
private final GridKernalContext ctx;
/** Logger. */
private final IgniteLogger log;
/** Deployment worker; its thread is created in {@link #startProcessing()}. */
private final ServicesDeploymentWorker depWorker;
/**
* Upper bound, in milliseconds, for the delay between repeated "failed to wait" warnings of the deployment
* worker. Falls back to 30 minutes when the system property value is not positive.
*/
private final long dfltDumpTimeoutLimit;
/**
 * Creates the manager, then subscribes it to discovery events and to services communication messages.
 * <p/>
 * All fields are initialized <b>before</b> the listeners are registered: the listeners are inner class
 * instances which use {@code depWorker}, so an event delivered while the constructor is still running must
 * never observe a partially constructed manager.
 *
 * @param ctx Grid kernal context.
 */
ServiceDeploymentManager(@NotNull GridKernalContext ctx) {
    this.ctx = ctx;

    log = ctx.log(getClass());

    // The worker reads the logger of the enclosing instance, so it is created only after 'log' is assigned.
    depWorker = new ServicesDeploymentWorker();

    long limit = getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT, 0);

    // Non-positive value means "not configured": fall back to 30 minutes.
    dfltDumpTimeoutLimit = limit <= 0 ? 30 * 60_000 : limit;

    // Registration goes last: from this point listeners may be invoked.
    ctx.event().addDiscoveryEventListener(discoLsnr,
        EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT);

    ctx.io().addMessageListener(TOPIC_SERVICES, commLsnr);
}
/**
 * Launches the deployment worker in a dedicated {@link IgniteThread}.
 * <p/>
 * Expected to be invoked once: the worker must not have a runner thread yet.
 */
void startProcessing() {
    assert depWorker.runner() == null : "Method shouldn't be called twice during lifecycle;";

    final IgniteThread workerThread =
        new IgniteThread(ctx.igniteInstanceName(), "services-deployment-worker", depWorker);

    workerThread.start();
}
/**
 * Stops processing of services deployments tasks.
 * <p/>
 * While the busy lock is blocked, the method unsubscribes both listeners, cancels the deployment worker and
 * waits for it, drops queued tasks and pending events and completes every remaining task with the given error.
 *
 * @param stopErr Cause error of deployment manager stop.
 */
void stopProcessing(IgniteCheckedException stopErr) {
    busyLock.block();

    try {
        ctx.event().removeDiscoveryEventListener(discoLsnr);

        // The listener has to be removed from the very topic it was registered on in the constructor.
        // Passing the listener alone does not name that topic, so the listener would stay registered.
        ctx.io().removeMessageListener(TOPIC_SERVICES, commLsnr);

        U.cancel(depWorker);

        U.join(depWorker, log);

        depWorker.tasksQueue.clear();

        pendingEvts.clear();

        // Tasks which were not post-processed by the worker are completed with the stop cause.
        tasks.values().forEach(t -> t.completeError(stopErr));

        tasks.clear();
    }
    finally {
        busyLock.unblock();
    }
}
/**
 * Returns the topology version recorded by the deployment worker for the latest post-processed task.
 *
 * @return Ready topology version, {@link AffinityTopologyVersion#NONE} if no task was post-processed yet.
 */
public AffinityTopologyVersion readyTopologyVersion() {
    final AffinityTopologyVersion ver = readyTopVer.get();

    return ver;
}
/**
 * Entry point for local discovery events which are not delivered through the regular listener, e.g. local
 * join and client reconnect events. The event is handled according to the current cluster state.
 *
 * @param evt Discovery event.
 * @param discoCache Discovery cache.
 * @param depActions Service deployment actions.
 */
void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache, ServiceDeploymentActions depActions) {
    checkClusterStateAndAddTask(evt, discoCache, depActions);
}
/**
 * Marks the beginning of a blocking section of the service deployment worker, see
 * {@link GridWorker#blockingSectionBegin()}.
 * <p/>
 * The caller has to be the service deployment worker thread.
 */
void deployerBlockingSectionBegin() {
    assert depWorker != null;
    assert Thread.currentThread() == depWorker.runner();

    depWorker.blockingSectionBegin();
}
/**
 * Marks the end of a blocking section of the service deployment worker, see
 * {@link GridWorker#blockingSectionEnd()}.
 * <p/>
 * The caller has to be the service deployment worker thread.
 */
void deployerBlockingSectionEnd() {
    assert depWorker != null;
    assert Thread.currentThread() == depWorker.runner();

    depWorker.blockingSectionEnd();
}
/**
 * Routes the given event depending on the cluster state taken from the discovery cache:
 * <pre>
 * - cluster state is in transition: the event is remembered in pending events;
 * - cluster is active: a deployment task is added for the event;
 * - cluster is inactive: the event is ignored (logged on debug level only);
 * </pre>
 * <b>Should be called from discovery thread.</b>
 *
 * @param evt Discovery event.
 * @param discoCache Discovery cache.
 * @param depActions Services deployment actions.
 */
private void checkClusterStateAndAddTask(@NotNull DiscoveryEvent evt, @NotNull DiscoCache discoCache,
    @Nullable ServiceDeploymentActions depActions) {
    if (discoCache.state().transition()) {
        pendingEvts.add(new PendingEventHolder(evt, discoCache.version(), depActions));

        return;
    }

    if (discoCache.state().active()) {
        addTask(evt, discoCache.version(), depActions);

        return;
    }

    if (log.isDebugEnabled())
        log.debug("Ignore event, cluster is inactive, evt=" + evt);
}
/**
 * Registers a deployment task for the given event and puts it to the worker's queue.
 * <p/>
 * The task is looked up (or created) by the deployment process id derived from the event. If the task
 * refuses to be enqueued, the event is skipped and only a debug message is logged.
 * <p/>
 * <b>Should be called from discovery thread.</b>
 *
 * @param evt Discovery event.
 * @param topVer Topology version.
 * @param depActions Services deployment actions.
 */
private void addTask(@NotNull DiscoveryEvent evt, @NotNull AffinityTopologyVersion topVer,
    @Nullable ServiceDeploymentActions depActions) {
    final ServiceDeploymentProcessId procId = deploymentId(evt, topVer);

    final ServiceDeploymentTask depTask = tasks.computeIfAbsent(procId,
        id -> new ServiceDeploymentTask(ctx, id));

    if (depTask.onEnqueued()) {
        assert depTask.event() == null && depTask.topologyVersion() == null;

        depTask.onEvent(evt, topVer, depActions);

        depWorker.tasksQueue.add(depTask);

        return;
    }

    if (log.isDebugEnabled()) {
        log.debug("Service deployment process hasn't been started for discovery event, because of " +
            "a task with the same deployment process id is already added (possible cause is message's" +
            " double delivering), evt=" + evt);
    }
}
/**
 * Builds the service deployment process id for an event: custom events are identified by the id of their
 * custom message, all other events by the topology version.
 *
 * @param evt Discovery event.
 * @param topVer Topology version.
 * @return Services deployment process id.
 */
private ServiceDeploymentProcessId deploymentId(@NotNull DiscoveryEvent evt,
    @NotNull AffinityTopologyVersion topVer) {
    if (evt instanceof DiscoveryCustomEvent) {
        final DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;

        return new ServiceDeploymentProcessId(customEvt.customMessage().id());
    }

    return new ServiceDeploymentProcessId(topVer);
}
/**
 * Captures the data of a {@link DiscoveryCustomEvent} in a fresh instance, so that the custom message stays
 * available after it is nullified by {@link GridDhtPartitionsExchangeFuture#onDone}.
 * <p/>
 * Events carrying {@link ServiceChangeBatchRequest} are returned as is, without copying.
 *
 * @param evt Discovery event.
 * @return Discovery event to process.
 */
private DiscoveryCustomEvent copyIfNeeded(@NotNull DiscoveryCustomEvent evt) {
    final DiscoveryCustomMessage customMsg = evt.customMessage();

    assert customMsg != null : "DiscoveryCustomMessage has been nullified concurrently, evt=" + evt;

    final boolean needCopy = !(customMsg instanceof ServiceChangeBatchRequest);

    if (needCopy) {
        final DiscoveryCustomEvent copy = new DiscoveryCustomEvent();

        copy.node(evt.node());
        copy.customMessage(customMsg);
        copy.eventNode(evt.eventNode());
        copy.affinityTopologyVersion(evt.affinityTopologyVersion());

        return copy;
    }

    return evt;
}
/**
 * High priority listener of services related discovery events.
 * <p/>
 * It has to be notified before the PME's listener, because PME may nullify the custom message of a
 * {@link DiscoveryCustomEvent} before this listener gets a chance to capture it.
 */
private class ServiceDiscoveryListener implements DiscoveryEventListener, HighPriorityListener {
    /** {@inheritDoc} */
    @Override public void onEvent(final DiscoveryEvent evt, final DiscoCache discoCache) {
        if (!enterBusy())
            return;

        try {
            final UUID sndId = evt.eventNode().id();
            final int type = evt.type();

            assert sndId != null : "Event's node id shouldn't be null.";
            assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED
                || type == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event was received, evt=" + evt;

            if (type == EVT_DISCOVERY_CUSTOM_EVT)
                onCustomEvent((DiscoveryCustomEvent)evt, discoCache, sndId);
            else
                onNodeEvent(evt, discoCache, sndId, type);
        }
        finally {
            leaveBusy();
        }
    }

    /**
     * Handles node join, left and failed events.
     *
     * @param evt Discovery event.
     * @param discoCache Discovery cache.
     * @param sndId Id of the event's node.
     * @param type Event type.
     */
    private void onNodeEvent(DiscoveryEvent evt, DiscoCache discoCache, UUID sndId, int type) {
        // Running tasks are notified about the departed node before the event itself is handled.
        if (type == EVT_NODE_LEFT || type == EVT_NODE_FAILED) {
            for (ServiceDeploymentTask depTask : tasks.values())
                depTask.onNodeLeft(sndId);
        }

        checkClusterStateAndAddTask(evt, discoCache, null);
    }

    /**
     * Dispatches a custom discovery event by the type of its custom message.
     *
     * @param evt Custom discovery event.
     * @param discoCache Discovery cache.
     * @param sndId Id of the event's node.
     */
    private void onCustomEvent(DiscoveryCustomEvent evt, DiscoCache discoCache, UUID sndId) {
        final DiscoveryCustomMessage msg = evt.customMessage();

        if (msg instanceof ChangeGlobalStateFinishMessage) {
            drainPendingEvents((ChangeGlobalStateFinishMessage)msg);

            return;
        }

        if (msg instanceof ServiceClusterDeploymentResultBatch) {
            onFullDeploymentsMessage((ServiceClusterDeploymentResultBatch)msg, sndId);

            return;
        }

        if (msg instanceof CacheAffinityChangeMessage) {
            addTask(copyIfNeeded(evt), discoCache.version(), null);

            return;
        }

        final ServiceDeploymentActions actions;

        if (msg instanceof ChangeGlobalStateMessage)
            actions = ((ChangeGlobalStateMessage)msg).servicesDeploymentActions();
        else if (msg instanceof ServiceChangeBatchRequest)
            actions = ((ServiceChangeBatchRequest)msg).servicesDeploymentActions();
        else if (msg instanceof DynamicCacheChangeBatch)
            actions = ((DynamicCacheChangeBatch)msg).servicesDeploymentActions();
        else
            actions = null;

        // Messages without deployment actions do not start a deployment process.
        if (actions != null)
            addTask(copyIfNeeded(evt), discoCache.version(), actions);
    }

    /**
     * Handles the end of a cluster state transition: events collected during the transition are either
     * turned into deployment tasks (cluster became active) or dropped.
     *
     * @param finishMsg State change finish message.
     */
    private void drainPendingEvents(ChangeGlobalStateFinishMessage finishMsg) {
        if (finishMsg.clusterActive()) {
            for (PendingEventHolder holder : pendingEvts)
                addTask(holder.evt, holder.topVer, holder.depActions);
        }
        else if (log.isDebugEnabled()) {
            for (PendingEventHolder holder : pendingEvts)
                log.debug("Ignore event, cluster is inactive: " + holder.evt);
        }

        pendingEvts.clear();
    }

    /**
     * Passes a full deployments message to the task of the corresponding deployment process, if there is one.
     *
     * @param batch Full deployments message.
     * @param sndId Id of the event's node.
     */
    private void onFullDeploymentsMessage(ServiceClusterDeploymentResultBatch batch, UUID sndId) {
        if (log.isDebugEnabled()) {
            log.debug("Received services full deployments message : " +
                "[locId=" + ctx.localNodeId() + ", snd=" + sndId + ", msg=" + batch + ']');
        }

        final ServiceDeploymentProcessId procId = batch.deploymentId();

        assert procId != null;

        final ServiceDeploymentTask depTask = tasks.get(procId);

        // May be null in case of double delivering.
        if (depTask != null)
            depTask.onReceiveFullDeploymentsMessage(batch);
    }

    /** {@inheritDoc} */
    @Override public int order() {
        return 0;
    }
}
/**
 * Immutable holder of a discovery event which was received while cluster state transition was in progress,
 * together with the data required to add a deployment task for it later.
 */
private static class PendingEventHolder {
    /** Discovery event. */
    private final DiscoveryEvent evt;

    /** Topology version. */
    private final AffinityTopologyVersion topVer;

    /** Services deployment actions. */
    private final ServiceDeploymentActions depActions;

    /**
     * @param evt Discovery event.
     * @param topVer Topology version.
     * @param depActions Services deployment actions.
     */
    private PendingEventHolder(DiscoveryEvent evt,
        AffinityTopologyVersion topVer, ServiceDeploymentActions depActions) {
        this.evt = evt;
        this.topVer = topVer;
        this.depActions = depActions;
    }
}
/**
 * Listener of services communication messages: passes single node deployment results to the task of the
 * corresponding deployment process, creating the task if it is not registered yet.
 */
private class ServiceCommunicationListener implements GridMessageListener {
    /** {@inheritDoc} */
    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
        if (!enterBusy())
            return;

        try {
            // Messages of any other type are silently skipped.
            if (!(msg instanceof ServiceSingleNodeDeploymentResultBatch))
                return;

            final ServiceSingleNodeDeploymentResultBatch batch = (ServiceSingleNodeDeploymentResultBatch)msg;

            if (log.isDebugEnabled()) {
                log.debug("Received services single deployments message : " +
                    "[locId=" + ctx.localNodeId() + ", snd=" + nodeId + ", msg=" + batch + ']');
            }

            final ServiceDeploymentTask depTask = tasks.computeIfAbsent(batch.deploymentId(),
                depId -> new ServiceDeploymentTask(ctx, depId));

            depTask.onReceiveSingleDeploymentsMessage(nodeId, batch);
        }
        finally {
            leaveBusy();
        }
    }
}
/**
* Services deployment worker.
* <p/>
* Processes deployment tasks one at a time: takes a task from the queue, initializes it and waits for its
* completion before taking the next one.
*/
private class ServicesDeploymentWorker extends GridWorker {
/** Queue to process. */
private final LinkedBlockingQueue<ServiceDeploymentTask> tasksQueue = new LinkedBlockingQueue<>();
/**
* Creates the worker named "services-deployment-worker" which uses the manager's logger and the workers
* registry of the kernal context.
*/
private ServicesDeploymentWorker() {
super(ctx.igniteInstanceName(), "services-deployment-worker",
ServiceDeploymentManager.this.log, ctx.workersRegistry());
}
/**
* Main loop of the worker, runs until the worker is cancelled.
* <p/>
* Any termination which is not caused by cancellation is reported to the failure processor.
*/
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
Throwable err = null;
try {
ServiceDeploymentTask task;
while (!isCancelled()) {
onIdle();
// Waiting for the next task is wrapped into a blocking section.
blockingSectionBegin();
try {
task = tasksQueue.take();
}
finally {
blockingSectionEnd();
}
// Cancelled while waiting: the interrupt flag is set, the taken task is still initialized below.
// NOTE(review): presumably this makes blocking calls of the task fail fast - confirm.
if (isCancelled())
Thread.currentThread().interrupt();
task.init();
// A single wait attempt lasts twice the configured network timeout.
final long dumpTimeout = 2 * ctx.config().getNetworkTimeout();
long dumpCnt = 0;
long nextDumpTime = 0;
// Wait in attempts of 'dumpTimeout' until the task completes or the worker is cancelled.
while (true) {
try {
blockingSectionBegin();
try {
task.waitForComplete(dumpTimeout);
}
finally {
blockingSectionEnd();
}
taskPostProcessing(task);
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
// Attempt timed out: leave on cancellation, otherwise warn (not more often than allowed) and retry.
if (isCancelled())
return;
if (nextDumpTime <= U.currentTimeMillis()) {
log.warning("Failed to wait service deployment process or timeout had been" +
" reached, timeout=" + dumpTimeout +
(log.isDebugEnabled() ? ", task=" + task : ", taskDepId=" + task.deploymentId()));
// Delay before the next warning grows with every warning and is capped by the dump timeout limit.
long nextTimeout = dumpTimeout * (2 + dumpCnt++);
nextDumpTime = U.currentTimeMillis() + Math.min(nextTimeout, dfltDumpTimeoutLimit);
}
}
catch (ClusterTopologyServerNotFoundException e) {
// The error is logged and the task is post-processed like a completed one.
U.error(log, e);
taskPostProcessing(task);
break;
}
}
}
}
catch (InterruptedException | IgniteInterruptedCheckedException e) {
// Restore the interrupt flag; interruption counts as an error only if the worker was not cancelled.
Thread.currentThread().interrupt();
if (!isCancelled())
err = e;
}
catch (Throwable t) {
err = t;
}
finally {
// Leaving the loop without an error and without cancellation is an unexpected termination.
if (err == null && !isCancelled())
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly.");
// Out of memory is reported as a critical error, any other error as a system worker termination.
if (err instanceof OutOfMemoryError)
ctx.failure().process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
/**
* Does additional actions after task's completion: stores the task's topology version as the ready one and
* removes the task from the tasks map.
*
* @param task Deployment task which processing is finished.
*/
private void taskPostProcessing(ServiceDeploymentTask task) {
AffinityTopologyVersion readyVer = readyTopVer.get();
readyTopVer.compareAndSet(readyVer, task.topologyVersion());
tasks.remove(task.deploymentId());
}
}
/**
 * Tries to enter the busy state guarded by the manager's busy lock.
 *
 * @return {@code true} if entered to busy state, {@code false} otherwise.
 */
private boolean enterBusy() {
    final boolean entered = busyLock.enterBusy();

    return entered;
}
/**
 * Leaves the busy state previously entered with {@link #enterBusy()}.
 */
private void leaveBusy() {
    busyLock.leaveBusy();
}
}