| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.worker; |
| |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.internal.util.worker.GridWorkerListener; |
| import org.apache.ignite.lang.IgniteBiInClosure; |
| import org.jetbrains.annotations.NotNull; |
| |
| import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_BLOCKED; |
| import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; |
| |
| /** |
| * Workers registry. Maintains a set of workers currently running. |
| * Can perform periodic liveness checks for these workers on behalf of any of them. |
| */ |
| public class WorkersRegistry implements GridWorkerListener { |
| /** */ |
| private static final long DFLT_CHECK_INTERVAL = 3_000; |
| |
| /** Registered workers. */ |
| private final ConcurrentMap<String, GridWorker> registeredWorkers = new ConcurrentHashMap<>(); |
| |
| /** Whether workers' liveness checking enabled or not. */ |
| private volatile boolean livenessCheckEnabled = true; |
| |
| /** Points to the next worker to check. */ |
| private volatile Iterator<Map.Entry<String, GridWorker>> checkIter = registeredWorkers.entrySet().iterator(); |
| |
| /** It's safe to omit 'volatile' due to memory effects of lastChecker. */ |
| private long lastCheckTs = U.currentTimeMillis(); |
| |
| /** Last thread that performed the check. Null reference denotes "checking is in progress". */ |
| private final AtomicReference<Thread> lastChecker = new AtomicReference<>(Thread.currentThread()); |
| |
| /** */ |
| private final IgniteBiInClosure<GridWorker, FailureType> workerFailedHnd; |
| |
| /** |
| * Maximum inactivity period for system worker in milliseconds, when exceeded, worker is considered as blocked. |
| */ |
| private volatile long sysWorkerBlockedTimeout; |
| |
| /** Time in milliseconds between successive workers checks. */ |
| private final long checkInterval; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** |
| * @param workerFailedHnd Closure to invoke on worker failure. |
| * @param sysWorkerBlockedTimeout Maximum allowed worker heartbeat interval in milliseconds, non-positive value denotes |
| * infinite interval. |
| */ |
| public WorkersRegistry( |
| @NotNull IgniteBiInClosure<GridWorker, FailureType> workerFailedHnd, |
| long sysWorkerBlockedTimeout, |
| IgniteLogger log |
| ) { |
| this.workerFailedHnd = workerFailedHnd; |
| this.sysWorkerBlockedTimeout = U.ensurePositive(sysWorkerBlockedTimeout, Long.MAX_VALUE); |
| this.checkInterval = Math.min(DFLT_CHECK_INTERVAL, sysWorkerBlockedTimeout); |
| this.log = log; |
| } |
| |
| /** |
| * Adds worker to the registry. |
| * |
| * @param w Worker. |
| */ |
| public void register(GridWorker w) { |
| if (registeredWorkers.putIfAbsent(w.runner().getName(), w) != null) |
| throw new IllegalStateException("Worker is already registered [worker=" + w + ']'); |
| |
| checkIter = registeredWorkers.entrySet().iterator(); |
| } |
| |
| /** |
| * Removes worker from the registry. |
| * |
| * @param name Worker name. |
| */ |
| public void unregister(String name) { |
| registeredWorkers.remove(name); |
| |
| checkIter = registeredWorkers.entrySet().iterator(); |
| } |
| |
| /** |
| * Returns names of all registered workers. |
| * |
| * @return Registered worker names. |
| */ |
| public Collection<String> names() { |
| return registeredWorkers.keySet(); |
| } |
| |
| /** |
| * Returns worker with given name. |
| * |
| * @param name Name. |
| * @return Registered {@link GridWorker} with name {@code name} or {@code null} if not found. |
| */ |
| public GridWorker worker(String name) { |
| return registeredWorkers.get(name); |
| } |
| |
| /** */ |
| public boolean livenessCheckEnabled() { |
| return livenessCheckEnabled; |
| } |
| |
| /** */ |
| public void livenessCheckEnabled(boolean val) { |
| livenessCheckEnabled = val; |
| } |
| |
| /** |
| * Returns maximum inactivity period for system worker. When exceeded, worker is considered as blocked. |
| * |
| * @return Maximum inactivity period for system worker in milliseconds. |
| */ |
| public long getSystemWorkerBlockedTimeout() { |
| return sysWorkerBlockedTimeout == Long.MAX_VALUE ? 0 : sysWorkerBlockedTimeout; |
| } |
| |
| /** |
| * Sets maximum inactivity period for system worker. When exceeded, worker is considered as blocked. |
| * |
| * @param val Maximum inactivity period for system worker in milliseconds. |
| */ |
| public void setSystemWorkerBlockedTimeout(long val) { |
| sysWorkerBlockedTimeout = U.ensurePositive(val, Long.MAX_VALUE); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onStarted(GridWorker w) { |
| register(w); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onStopped(GridWorker w) { |
| if (!w.isCancelled()) |
| workerFailedHnd.apply(w, SYSTEM_WORKER_TERMINATION); |
| |
| unregister(w.runner().getName()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onIdle(GridWorker w) { |
| if (!livenessCheckEnabled) |
| return; |
| |
| Thread prevCheckerThread = lastChecker.get(); |
| |
| if (prevCheckerThread == null || |
| U.currentTimeMillis() - lastCheckTs <= checkInterval || |
| !lastChecker.compareAndSet(prevCheckerThread, null)) |
| return; |
| |
| try { |
| lastCheckTs = U.currentTimeMillis(); |
| |
| long workersToCheck = Math.max(registeredWorkers.size() * checkInterval / sysWorkerBlockedTimeout, 1); |
| |
| int workersChecked = 0; |
| |
| while (workersChecked < workersToCheck) { |
| if (!checkIter.hasNext()) |
| checkIter = registeredWorkers.entrySet().iterator(); |
| |
| GridWorker worker; |
| |
| try { |
| worker = checkIter.next().getValue(); |
| } |
| catch (NoSuchElementException e) { |
| return; |
| } |
| |
| Thread runner = worker.runner(); |
| |
| if (runner != null && runner != Thread.currentThread() && !worker.isCancelled()) { |
| if (!runner.isAlive()) { |
| // In normal operation GridWorker implementation guarantees: |
| // worker termination happens before its removal from registeredWorkers. |
| // That is, if worker is dead, but still resides in registeredWorkers |
| // then something went wrong, the only extra thing is to test |
| // whether the iterator refers to actual state of registeredWorkers. |
| GridWorker worker0 = registeredWorkers.get(worker.runner().getName()); |
| |
| if (worker0 != null && worker0 == worker) |
| workerFailedHnd.apply(worker, SYSTEM_WORKER_TERMINATION); |
| } |
| |
| long heartbeatDelay = U.currentTimeMillis() - worker.heartbeatTs(); |
| |
| if (heartbeatDelay > sysWorkerBlockedTimeout) { |
| GridWorker worker0 = registeredWorkers.get(worker.runner().getName()); |
| |
| if (worker0 != null && worker0 == worker) { |
| log.error("Blocked system-critical thread has been detected. " + |
| "This can lead to cluster-wide undefined behaviour " + |
| "[threadName=" + worker.name() + ", blockedFor=" + heartbeatDelay / 1000 + "s]"); |
| |
| U.dumpThread(worker.runner(), log); |
| |
| workerFailedHnd.apply(worker, SYSTEM_WORKER_BLOCKED); |
| } |
| |
| // Iterator should not be reset: |
| // otherwise we'll never iterate beyond the blocked worker, |
| // that may stay in the map for indefinite time. |
| } |
| } |
| |
| if (runner != Thread.currentThread()) |
| workersChecked++; |
| } |
| } |
| finally { |
| boolean set = lastChecker.compareAndSet(null, Thread.currentThread()); |
| |
| assert set; |
| } |
| } |
| } |