| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.util; |
| |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.slf4j.Logger; |
| |
| /** |
| * This is a debugging class that can be used by callers to track |
| * whether a specific lock is being held for too long and periodically |
| * log a warning and stack trace, if so. |
| * |
| * The logged warnings are throttled so that logs are not spammed. |
| * |
| * A new instance of InstrumentedLock can be created for each object |
| * that needs to be instrumented. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class InstrumentedLock implements Lock { |
| |
| private final Lock lock; |
| private final Logger logger; |
| private final String name; |
| private final Timer clock; |
| |
| /** Minimum gap between two lock warnings. */ |
| private final long minLoggingGap; |
| /** Threshold for detecting long lock held time. */ |
| private final long lockWarningThreshold; |
| |
| // Tracking counters for lock statistics. |
| private volatile long lockAcquireTimestamp; |
| private final AtomicLong lastHoldLogTimestamp; |
| private final AtomicLong lastWaitLogTimestamp; |
| private final SuppressedStats holdStats = new SuppressedStats(); |
| private final SuppressedStats waitStats = new SuppressedStats(); |
| |
| /** |
| * Create a instrumented lock instance which logs a warning message |
| * when lock held time is above given threshold. |
| * |
| * @param name the identifier of the lock object |
| * @param logger this class does not have its own logger, will log to the |
| * given logger instead |
| * @param minLoggingGapMs the minimum time gap between two log messages, |
| * this is to avoid spamming to many logs |
| * @param lockWarningThresholdMs the time threshold to view lock held |
| * time as being "too long" |
| */ |
| public InstrumentedLock(String name, Logger logger, long minLoggingGapMs, |
| long lockWarningThresholdMs) { |
| this(name, logger, new ReentrantLock(), |
| minLoggingGapMs, lockWarningThresholdMs); |
| } |
| |
| public InstrumentedLock(String name, Logger logger, Lock lock, |
| long minLoggingGapMs, long lockWarningThresholdMs) { |
| this(name, logger, lock, |
| minLoggingGapMs, lockWarningThresholdMs, new Timer()); |
| } |
| |
| @VisibleForTesting |
| InstrumentedLock(String name, Logger logger, Lock lock, |
| long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) { |
| this.name = name; |
| this.lock = lock; |
| this.clock = clock; |
| this.logger = logger; |
| minLoggingGap = minLoggingGapMs; |
| lockWarningThreshold = lockWarningThresholdMs; |
| lastHoldLogTimestamp = new AtomicLong( |
| clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold)); |
| lastWaitLogTimestamp = new AtomicLong(lastHoldLogTimestamp.get()); |
| } |
| |
| @Override |
| public void lock() { |
| long waitStart = clock.monotonicNow(); |
| lock.lock(); |
| check(waitStart, clock.monotonicNow(), false); |
| startLockTiming(); |
| } |
| |
| @Override |
| public void lockInterruptibly() throws InterruptedException { |
| long waitStart = clock.monotonicNow(); |
| lock.lockInterruptibly(); |
| check(waitStart, clock.monotonicNow(), false); |
| startLockTiming(); |
| } |
| |
| @Override |
| public boolean tryLock() { |
| if (lock.tryLock()) { |
| startLockTiming(); |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { |
| long waitStart = clock.monotonicNow(); |
| boolean retval = false; |
| if (lock.tryLock(time, unit)) { |
| startLockTiming(); |
| retval = true; |
| } |
| check(waitStart, clock.monotonicNow(), false); |
| return retval; |
| } |
| |
| @Override |
| public void unlock() { |
| long localLockReleaseTime = clock.monotonicNow(); |
| long localLockAcquireTime = lockAcquireTimestamp; |
| lock.unlock(); |
| check(localLockAcquireTime, localLockReleaseTime, true); |
| } |
| |
| @Override |
| public Condition newCondition() { |
| return lock.newCondition(); |
| } |
| |
| @VisibleForTesting |
| void logWarning(long lockHeldTime, SuppressedSnapshot stats) { |
| logger.warn(String.format("Lock held time above threshold(%d ms): " + |
| "lock identifier: %s " + |
| "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " + |
| "Longest suppressed LockHeldTimeMs=%d. " + |
| "The stack trace is: %s" , |
| lockWarningThreshold, name, lockHeldTime, stats.getSuppressedCount(), |
| stats.getMaxSuppressedWait(), |
| StringUtils.getStackTrace(Thread.currentThread()))); |
| } |
| |
| @VisibleForTesting |
| void logWaitWarning(long lockWaitTime, SuppressedSnapshot stats) { |
| logger.warn(String.format("Waited above threshold(%d ms) to acquire lock: " + |
| "lock identifier: %s " + |
| "waitTimeMs=%d ms. Suppressed %d lock wait warnings. " + |
| "Longest suppressed WaitTimeMs=%d. " + |
| "The stack trace is: %s", lockWarningThreshold, name, lockWaitTime, |
| stats.getSuppressedCount(), stats.getMaxSuppressedWait(), |
| StringUtils.getStackTrace(Thread.currentThread()))); |
| } |
| |
| /** |
| * Starts timing for the instrumented lock. |
| */ |
| protected void startLockTiming() { |
| lockAcquireTimestamp = clock.monotonicNow(); |
| } |
| |
| /** |
| * Log a warning if the lock was held for too long. |
| * |
| * Should be invoked by the caller immediately AFTER releasing the lock. |
| * |
| * @param acquireTime - timestamp just after acquiring the lock. |
| * @param releaseTime - timestamp just before releasing the lock. |
| * @param checkLockHeld checkLockHeld. |
| */ |
| protected void check(long acquireTime, long releaseTime, |
| boolean checkLockHeld) { |
| if (!logger.isWarnEnabled()) { |
| return; |
| } |
| |
| final long lockHeldTime = releaseTime - acquireTime; |
| if (lockWarningThreshold - lockHeldTime < 0) { |
| AtomicLong lastLogTime; |
| SuppressedStats stats; |
| if (checkLockHeld) { |
| lastLogTime = lastHoldLogTimestamp; |
| stats = holdStats; |
| } else { |
| lastLogTime = lastWaitLogTimestamp; |
| stats = waitStats; |
| } |
| long now; |
| long localLastLogTs; |
| do { |
| now = clock.monotonicNow(); |
| localLastLogTs = lastLogTime.get(); |
| long deltaSinceLastLog = now - localLastLogTs; |
| // check should print log or not |
| if (deltaSinceLastLog - minLoggingGap < 0) { |
| stats.incrementSuppressed(lockHeldTime); |
| return; |
| } |
| } while (!lastLogTime.compareAndSet(localLastLogTs, now)); |
| SuppressedSnapshot statsSnapshot = stats.snapshot(); |
| if (checkLockHeld) { |
| logWarning(lockHeldTime, statsSnapshot); |
| } else { |
| logWaitWarning(lockHeldTime, statsSnapshot); |
| } |
| } |
| } |
| |
| protected Lock getLock() { |
| return lock; |
| } |
| |
| protected Timer getTimer() { |
| return clock; |
| } |
| |
| /** |
| * Internal class to track statistics about suppressed log messages in an |
| * atomic way. |
| */ |
| private static class SuppressedStats { |
| private long suppressedCount = 0; |
| private long maxSuppressedWait = 0; |
| |
| /** |
| * Increments the suppressed counter and increases the max wait time if the |
| * passed wait is greater than the current maxSuppressedWait. |
| * @param wait The wait time for this suppressed message |
| */ |
| synchronized public void incrementSuppressed(long wait) { |
| suppressedCount++; |
| if (wait > maxSuppressedWait) { |
| maxSuppressedWait = wait; |
| } |
| } |
| |
| /** |
| * Captures the current value of the counts into a SuppressedSnapshot object |
| * and resets the values to zero. |
| * |
| * @return SuppressedSnapshot containing the current value of the counters |
| */ |
| synchronized public SuppressedSnapshot snapshot() { |
| SuppressedSnapshot snap = |
| new SuppressedSnapshot(suppressedCount, maxSuppressedWait); |
| suppressedCount = 0; |
| maxSuppressedWait = 0; |
| return snap; |
| } |
| } |
| |
| /** |
| * Immutable class to capture a snapshot of suppressed log message stats. |
| */ |
| protected static class SuppressedSnapshot { |
| private long suppressedCount = 0; |
| private long maxSuppressedWait = 0; |
| |
| public SuppressedSnapshot(long suppressedCount, long maxWait) { |
| this.suppressedCount = suppressedCount; |
| this.maxSuppressedWait = maxWait; |
| } |
| |
| public long getMaxSuppressedWait() { |
| return maxSuppressedWait; |
| } |
| |
| public long getSuppressedCount() { |
| return suppressedCount; |
| } |
| } |
| } |