| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.catalina.valves; |
| |
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.servlet.ServletException;

import org.apache.catalina.LifecycleException;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
| |
| /** |
| * This valve allows to detect requests that take a long time to process, which might indicate that the thread that is |
| * processing it is stuck. |
| */ |
| public class StuckThreadDetectionValve extends ValveBase { |
| |
| /** |
| * Logger |
| */ |
| private static final Log log = LogFactory.getLog(StuckThreadDetectionValve.class); |
| |
| /** |
| * The string manager for this package. |
| */ |
| private static final StringManager sm = StringManager.getManager(Constants.Package); |
| |
| /** |
| * Keeps count of the number of stuck threads detected |
| */ |
| private final AtomicInteger stuckCount = new AtomicInteger(0); |
| |
| /** |
| * Keeps count of the number of stuck threads that have been interrupted |
| */ |
| private final AtomicLong interruptedThreadsCount = new AtomicLong(); |
| |
| /** |
| * In seconds. Default 600 (10 minutes). |
| */ |
| private int threshold = 600; |
| |
| /** |
| * In seconds. Default is -1 to disable interruption. |
| */ |
| private int interruptThreadThreshold; |
| |
| /** |
| * The only references we keep to actual running Thread objects are in this Map (which is automatically cleaned in |
| * invoke()s finally clause). That way, Threads can be GC'ed, even though the Valve still thinks they are stuck |
| * (caused by a long monitor interval) |
| */ |
| private final Map<Long,MonitoredThread> activeThreads = new ConcurrentHashMap<>(); |
| |
| private final Queue<CompletedStuckThread> completedStuckThreadsQueue = new ConcurrentLinkedQueue<>(); |
| |
| /** |
| * Specifies the threshold (in seconds) used when checking for stuck threads. If <=0, the detection is disabled. |
| * The default is 600 seconds. |
| * |
| * @param threshold The new threshold in seconds |
| */ |
| public void setThreshold(int threshold) { |
| this.threshold = threshold; |
| } |
| |
| /** |
| * @see #setThreshold(int) |
| * |
| * @return The current threshold in seconds |
| */ |
| public int getThreshold() { |
| return threshold; |
| } |
| |
| |
| public int getInterruptThreadThreshold() { |
| return interruptThreadThreshold; |
| } |
| |
| /** |
| * Specifies the threshold (in seconds) before stuck threads are interrupted. If <=0, the interruption is |
| * disabled. The default is -1. If >=0, the value must actually be >= threshold. |
| * |
| * @param interruptThreadThreshold The new thread interruption threshold in seconds |
| */ |
| public void setInterruptThreadThreshold(int interruptThreadThreshold) { |
| this.interruptThreadThreshold = interruptThreadThreshold; |
| } |
| |
| /** |
| * Required to enable async support. |
| */ |
| public StuckThreadDetectionValve() { |
| super(true); |
| } |
| |
| |
| @Override |
| protected void initInternal() throws LifecycleException { |
| super.initInternal(); |
| |
| if (log.isTraceEnabled()) { |
| log.trace("Monitoring stuck threads with threshold = " + threshold + " sec"); |
| } |
| } |
| |
| private void notifyStuckThreadDetected(MonitoredThread monitoredThread, long activeTime, int numStuckThreads) { |
| if (log.isWarnEnabled()) { |
| @SuppressWarnings("deprecation") |
| String msg = sm.getString("stuckThreadDetectionValve.notifyStuckThreadDetected", |
| monitoredThread.getThread().getName(), Long.valueOf(activeTime), monitoredThread.getStartTime(), |
| Integer.valueOf(numStuckThreads), monitoredThread.getRequestUri(), Integer.valueOf(threshold), |
| String.valueOf(monitoredThread.getThread().getId())); |
| // msg += "\n" + getStackTraceAsString(trace); |
| Throwable th = new Throwable(); |
| th.setStackTrace(monitoredThread.getThread().getStackTrace()); |
| log.warn(msg, th); |
| } |
| } |
| |
| private void notifyStuckThreadCompleted(CompletedStuckThread thread, int numStuckThreads) { |
| if (log.isWarnEnabled()) { |
| String msg = sm.getString("stuckThreadDetectionValve.notifyStuckThreadCompleted", thread.getName(), |
| Long.valueOf(thread.getTotalActiveTime()), Integer.valueOf(numStuckThreads), |
| String.valueOf(thread.getId())); |
| // Since the "stuck thread notification" is warn, this should also |
| // be warn |
| log.warn(msg); |
| } |
| } |
| |
| @Override |
| public void invoke(Request request, Response response) throws IOException, ServletException { |
| |
| if (threshold <= 0) { |
| // short-circuit if not monitoring stuck threads |
| getNext().invoke(request, response); |
| return; |
| } |
| |
| // Save the thread/runnable |
| // Keeping a reference to the thread object here does not prevent |
| // GC'ing, as the reference is removed from the Map in the finally clause |
| |
| Thread currentThread = Thread.currentThread(); |
| @SuppressWarnings("deprecation") |
| Long key = Long.valueOf(currentThread.getId()); |
| StringBuffer requestUrl = request.getRequestURL(); |
| if (request.getQueryString() != null) { |
| requestUrl.append('?'); |
| requestUrl.append(request.getQueryString()); |
| } |
| MonitoredThread monitoredThread = |
| new MonitoredThread(currentThread, requestUrl.toString(), interruptThreadThreshold > 0); |
| activeThreads.put(key, monitoredThread); |
| |
| try { |
| getNext().invoke(request, response); |
| } finally { |
| activeThreads.remove(key); |
| if (monitoredThread.markAsDone() == MonitoredThreadState.STUCK) { |
| if (monitoredThread.wasInterrupted()) { |
| interruptedThreadsCount.incrementAndGet(); |
| } |
| completedStuckThreadsQueue.add( |
| new CompletedStuckThread(monitoredThread.getThread(), monitoredThread.getActiveTimeInMillis())); |
| } |
| } |
| } |
| |
| @Override |
| public void backgroundProcess() { |
| super.backgroundProcess(); |
| |
| long thresholdInMillis = threshold * 1000L; |
| |
| // Check monitored threads, being careful that the request might have |
| // completed by the time we examine it |
| for (MonitoredThread monitoredThread : activeThreads.values()) { |
| long activeTime = monitoredThread.getActiveTimeInMillis(); |
| |
| if (activeTime >= thresholdInMillis && monitoredThread.markAsStuckIfStillRunning()) { |
| int numStuckThreads = stuckCount.incrementAndGet(); |
| notifyStuckThreadDetected(monitoredThread, activeTime, numStuckThreads); |
| } |
| if (interruptThreadThreshold > 0 && activeTime >= interruptThreadThreshold * 1000L) { |
| monitoredThread.interruptIfStuck(interruptThreadThreshold); |
| } |
| } |
| // Check if any threads previously reported as stuck, have finished. |
| for (CompletedStuckThread completedStuckThread = |
| completedStuckThreadsQueue.poll(); completedStuckThread != null; completedStuckThread = |
| completedStuckThreadsQueue.poll()) { |
| |
| int numStuckThreads = stuckCount.decrementAndGet(); |
| notifyStuckThreadCompleted(completedStuckThread, numStuckThreads); |
| } |
| } |
| |
| public int getStuckThreadCount() { |
| return stuckCount.get(); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public long[] getStuckThreadIds() { |
| List<Long> idList = new ArrayList<>(); |
| for (MonitoredThread monitoredThread : activeThreads.values()) { |
| if (monitoredThread.isMarkedAsStuck()) { |
| idList.add(Long.valueOf(monitoredThread.getThread().getId())); |
| } |
| } |
| |
| long[] result = new long[idList.size()]; |
| for (int i = 0; i < result.length; i++) { |
| result[i] = idList.get(i).longValue(); |
| } |
| return result; |
| } |
| |
| public String[] getStuckThreadNames() { |
| List<String> nameList = new ArrayList<>(); |
| for (MonitoredThread monitoredThread : activeThreads.values()) { |
| if (monitoredThread.isMarkedAsStuck()) { |
| nameList.add(monitoredThread.getThread().getName()); |
| } |
| } |
| return nameList.toArray(new String[0]); |
| } |
| |
| public long getInterruptedThreadsCount() { |
| return interruptedThreadsCount.get(); |
| } |
| |
| |
| private static class MonitoredThread { |
| |
| /** |
| * Reference to the thread to get a stack trace from background task |
| */ |
| private final Thread thread; |
| private final String requestUri; |
| private final long start; |
| private final AtomicInteger state = new AtomicInteger(MonitoredThreadState.RUNNING.ordinal()); |
| /** |
| * Semaphore to synchronize the stuck thread with the background-process thread. It's not used if the |
| * interruption feature is not active. |
| */ |
| private final Semaphore interruptionSemaphore; |
| /** |
| * Set to true after the thread is interrupted. No need to make it volatile since it is accessed right after |
| * acquiring the semaphore. |
| */ |
| private boolean interrupted; |
| |
| MonitoredThread(Thread thread, String requestUri, boolean interruptible) { |
| this.thread = thread; |
| this.requestUri = requestUri; |
| this.start = System.currentTimeMillis(); |
| if (interruptible) { |
| interruptionSemaphore = new Semaphore(1); |
| } else { |
| interruptionSemaphore = null; |
| } |
| } |
| |
| public Thread getThread() { |
| return this.thread; |
| } |
| |
| public String getRequestUri() { |
| return requestUri; |
| } |
| |
| public long getActiveTimeInMillis() { |
| return System.currentTimeMillis() - start; |
| } |
| |
| public Date getStartTime() { |
| return new Date(start); |
| } |
| |
| public boolean markAsStuckIfStillRunning() { |
| return this.state.compareAndSet(MonitoredThreadState.RUNNING.ordinal(), |
| MonitoredThreadState.STUCK.ordinal()); |
| } |
| |
| public MonitoredThreadState markAsDone() { |
| int val = this.state.getAndSet(MonitoredThreadState.DONE.ordinal()); |
| MonitoredThreadState threadState = MonitoredThreadState.values()[val]; |
| |
| if (threadState == MonitoredThreadState.STUCK && interruptionSemaphore != null) { |
| try { |
| // use the semaphore to synchronize with the background thread |
| // which might try to interrupt this current thread. |
| // Otherwise, the current thread might be interrupted after |
| // going out from here, maybe already serving a new request |
| this.interruptionSemaphore.acquire(); |
| } catch (InterruptedException e) { |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("stuckThreadDetectionValve.interrupted"), e); |
| } |
| } |
| // no need to release the semaphore, it will be GCed |
| } |
| // else the request went through before being marked as stuck, no need |
| // to sync against the semaphore |
| return threadState; |
| } |
| |
| boolean isMarkedAsStuck() { |
| return this.state.get() == MonitoredThreadState.STUCK.ordinal(); |
| } |
| |
| public boolean interruptIfStuck(long interruptThreadThreshold) { |
| if (!isMarkedAsStuck() || interruptionSemaphore == null || !this.interruptionSemaphore.tryAcquire()) { |
| // if the semaphore is already acquired, it means that the |
| // request thread got unstuck before we interrupted it |
| return false; |
| } |
| try { |
| if (log.isWarnEnabled()) { |
| @SuppressWarnings("deprecation") |
| String msg = sm.getString("stuckThreadDetectionValve.notifyStuckThreadInterrupted", |
| this.getThread().getName(), Long.valueOf(getActiveTimeInMillis()), this.getStartTime(), |
| this.getRequestUri(), Long.valueOf(interruptThreadThreshold), |
| String.valueOf(this.getThread().getId())); |
| Throwable th = new Throwable(); |
| th.setStackTrace(this.getThread().getStackTrace()); |
| log.warn(msg, th); |
| } |
| this.thread.interrupt(); |
| } finally { |
| this.interrupted = true; |
| this.interruptionSemaphore.release(); |
| } |
| return true; |
| } |
| |
| public boolean wasInterrupted() { |
| return interrupted; |
| } |
| } |
| |
| private static class CompletedStuckThread { |
| |
| private final String threadName; |
| private final long threadId; |
| private final long totalActiveTime; |
| |
| @SuppressWarnings("deprecation") |
| CompletedStuckThread(Thread thread, long totalActiveTime) { |
| this.threadName = thread.getName(); |
| this.threadId = thread.getId(); |
| this.totalActiveTime = totalActiveTime; |
| } |
| |
| public String getName() { |
| return this.threadName; |
| } |
| |
| public long getId() { |
| return this.threadId; |
| } |
| |
| public long getTotalActiveTime() { |
| return this.totalActiveTime; |
| } |
| } |
| |
| private enum MonitoredThreadState { |
| RUNNING, |
| STUCK, |
| DONE |
| } |
| } |