| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.util; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.log4j.AppenderSkeleton; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.Logger; |
| import org.apache.log4j.spi.LoggingEvent; |
| |
| import java.util.*; |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class Log4jWarningErrorMetricsAppender extends AppenderSkeleton { |
| |
| public static final String LOG_METRICS_APPENDER = "RM_LOG_METRICS_APPENDER"; |
| static final int MAX_MESSAGE_SIZE = 2048; |
| |
| static public class Element { |
| public Long count; |
| public Long timestampSeconds; |
| |
| Element(Long count, Long timestampSeconds) { |
| this.count = count; |
| this.timestampSeconds = timestampSeconds; |
| } |
| } |
| |
| static class PurgeElement implements Comparable<PurgeElement> { |
| String message; |
| Long timestamp; |
| |
| PurgeElement(String message, Long timestamp) { |
| this.message = message; |
| this.timestamp = timestamp; |
| } |
| |
| public int compareTo(PurgeElement e) { |
| if (e == null) { |
| throw new NullPointerException("Null element passed to compareTo"); |
| } |
| int ret = this.timestamp.compareTo(e.timestamp); |
| if (ret != 0) { |
| return ret; |
| } |
| return this.message.compareTo(e.message); |
| } |
| |
| @Override |
| public boolean equals(Object e) { |
| if (!(e instanceof PurgeElement)) { |
| return false; |
| } |
| if (e == this) { |
| return true; |
| } |
| PurgeElement el = (PurgeElement) e; |
| return (this.message.equals(el.message)) |
| && (this.timestamp.equals(el.timestamp)); |
| } |
| |
| @Override |
| public int hashCode() { |
| return this.timestamp.hashCode(); |
| } |
| } |
| |
| Map<String, SortedMap<Long, Integer>> errors; |
| Map<String, SortedMap<Long, Integer>> warnings; |
| SortedMap<Long, Integer> errorsTimestampCount; |
| SortedMap<Long, Integer> warningsTimestampCount; |
| SortedSet<PurgeElement> errorsPurgeInformation; |
| SortedSet<PurgeElement> warningsPurgeInformation; |
| |
| Timer cleanupTimer; |
| long cleanupInterval; |
| long messageAgeLimitSeconds; |
| int maxUniqueMessages; |
| |
| final Object lock = new Object(); |
| |
| /** |
| * Create an appender to keep track of the errors and warnings logged by the |
| * system. It will keep purge messages older than 2 days. It will store upto |
| * the last 500 unique errors and the last 500 unique warnings. The thread to |
| * purge message will run every 5 minutes, unless the 500 message limit is hit |
| * earlier. |
| */ |
| public Log4jWarningErrorMetricsAppender() { |
| this(5 * 60, 24 * 60 * 60, 250); |
| } |
| |
| /** |
| * Create an appender to keep track of the errors and warnings logged by the |
| * system. |
| * |
| * @param cleanupIntervalSeconds |
| * the interval at which old messages are purged to prevent the |
| * message stores from growing unbounded |
| * @param messageAgeLimitSeconds |
| * the maximum age of a message in seconds before it is purged from |
| * the store |
| * @param maxUniqueMessages |
| * the maximum number of unique messages of each type we keep before |
| * we start purging |
| */ |
| public Log4jWarningErrorMetricsAppender(int cleanupIntervalSeconds, |
| long messageAgeLimitSeconds, int maxUniqueMessages) { |
| super(); |
| errors = new HashMap<>(); |
| warnings = new HashMap<>(); |
| errorsTimestampCount = new TreeMap<>(); |
| warningsTimestampCount = new TreeMap<>(); |
| errorsPurgeInformation = new TreeSet<>(); |
| warningsPurgeInformation = new TreeSet<>(); |
| |
| cleanupTimer = new Timer(); |
| cleanupInterval = cleanupIntervalSeconds * 1000; |
| cleanupTimer.schedule(new ErrorAndWarningsCleanup(), cleanupInterval); |
| this.messageAgeLimitSeconds = messageAgeLimitSeconds; |
| this.maxUniqueMessages = maxUniqueMessages; |
| this.setName(LOG_METRICS_APPENDER); |
| this.setThreshold(Level.WARN); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| protected void append(LoggingEvent event) { |
| String message = event.getRenderedMessage(); |
| String[] throwableStr = event.getThrowableStrRep(); |
| if (throwableStr != null) { |
| message = message + "\n" + StringUtils.join("\n", throwableStr); |
| message = |
| org.apache.commons.lang3.StringUtils.left(message, MAX_MESSAGE_SIZE); |
| } |
| int level = event.getLevel().toInt(); |
| |
| if (level == Level.WARN_INT || level == Level.ERROR_INT) { |
| // store second level information |
| Long eventTimeSeconds = event.getTimeStamp() / 1000; |
| Map<String, SortedMap<Long, Integer>> map; |
| SortedMap<Long, Integer> timestampsCount; |
| SortedSet<PurgeElement> purgeInformation; |
| if (level == Level.WARN_INT) { |
| map = warnings; |
| timestampsCount = warningsTimestampCount; |
| purgeInformation = warningsPurgeInformation; |
| } else { |
| map = errors; |
| timestampsCount = errorsTimestampCount; |
| purgeInformation = errorsPurgeInformation; |
| } |
| updateMessageDetails(message, eventTimeSeconds, map, timestampsCount, |
| purgeInformation); |
| } |
| } |
| |
| private void updateMessageDetails(String message, Long eventTimeSeconds, |
| Map<String, SortedMap<Long, Integer>> map, |
| SortedMap<Long, Integer> timestampsCount, |
| SortedSet<PurgeElement> purgeInformation) { |
| synchronized (lock) { |
| if (map.containsKey(message)) { |
| SortedMap<Long, Integer> tmp = map.get(message); |
| Long lastMessageTime = tmp.lastKey(); |
| int value = 1; |
| if (tmp.containsKey(eventTimeSeconds)) { |
| value = tmp.get(eventTimeSeconds) + 1; |
| } |
| tmp.put(eventTimeSeconds, value); |
| purgeInformation.remove(new PurgeElement(message, lastMessageTime)); |
| } else { |
| SortedMap<Long, Integer> value = new TreeMap<>(); |
| value.put(eventTimeSeconds, 1); |
| map.put(message, value); |
| if (map.size() > maxUniqueMessages * 2) { |
| cleanupTimer.cancel(); |
| cleanupTimer = new Timer(); |
| cleanupTimer.schedule(new ErrorAndWarningsCleanup(), 0); |
| } |
| } |
| purgeInformation.add(new PurgeElement(message, eventTimeSeconds)); |
| int newValue = 1; |
| if (timestampsCount.containsKey(eventTimeSeconds)) { |
| newValue = timestampsCount.get(eventTimeSeconds) + 1; |
| } |
| timestampsCount.put(eventTimeSeconds, newValue); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void close() { |
| cleanupTimer.cancel(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean requiresLayout() { |
| return false; |
| } |
| |
| /** |
| * Get the counts of errors in the time periods provided. Note that the counts |
| * provided by this function may differ from the ones provided by |
| * getErrorMessagesAndCounts since the message store is purged at regular |
| * intervals to prevent it from growing without bounds, while the store for |
| * the counts is purged less frequently. |
| * |
| * @param cutoffs |
| * list of timestamp cutoffs(in seconds) for which the counts are |
| * desired |
| * @return list of error counts in the time periods corresponding to cutoffs |
| */ |
| public List<Integer> getErrorCounts(List<Long> cutoffs) { |
| return this.getCounts(errorsTimestampCount, cutoffs); |
| } |
| |
| /** |
| * Get the counts of warnings in the time periods provided. Note that the |
| * counts provided by this function may differ from the ones provided by |
| * getWarningMessagesAndCounts since the message store is purged at regular |
| * intervals to prevent it from growing without bounds, while the store for |
| * the counts is purged less frequently. |
| * |
| * @param cutoffs |
| * list of timestamp cutoffs(in seconds) for which the counts are |
| * desired |
| * @return list of warning counts in the time periods corresponding to cutoffs |
| */ |
| public List<Integer> getWarningCounts(List<Long> cutoffs) { |
| return this.getCounts(warningsTimestampCount, cutoffs); |
| } |
| |
| private List<Integer> getCounts(SortedMap<Long, Integer> map, |
| List<Long> cutoffs) { |
| List<Integer> ret = new ArrayList<>(); |
| Long largestCutoff = Collections.min(cutoffs); |
| for (int i = 0; i < cutoffs.size(); ++i) { |
| ret.add(0); |
| } |
| synchronized (lock) { |
| Map<Long, Integer> submap = map.tailMap(largestCutoff); |
| for (Map.Entry<Long, Integer> entry : submap.entrySet()) { |
| for (int i = 0; i < cutoffs.size(); ++i) { |
| if (entry.getKey() >= cutoffs.get(i)) { |
| int tmp = ret.get(i); |
| ret.set(i, tmp + entry.getValue()); |
| } |
| } |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Get the errors and the number of occurrences for each of the errors for the |
| * time cutoffs provided. Note that the counts provided by this function may |
| * differ from the ones provided by getErrorCounts since the message store is |
| * purged at regular intervals to prevent it from growing without bounds, |
| * while the store for the counts is purged less frequently. |
| * |
| * @param cutoffs |
| * list of timestamp cutoffs(in seconds) for which the counts are |
| * desired |
| * @return list of maps corresponding for each cutoff provided; each map |
| * contains the error and the number of times the error occurred in |
| * the time period |
| */ |
| public List<Map<String, Element>> |
| getErrorMessagesAndCounts(List<Long> cutoffs) { |
| return this.getElementsAndCounts(errors, cutoffs, errorsPurgeInformation); |
| } |
| |
| /** |
| * Get the warning and the number of occurrences for each of the warnings for |
| * the time cutoffs provided. Note that the counts provided by this function |
| * may differ from the ones provided by getWarningCounts since the message |
| * store is purged at regular intervals to prevent it from growing without |
| * bounds, while the store for the counts is purged less frequently. |
| * |
| * @param cutoffs |
| * list of timestamp cutoffs(in seconds) for which the counts are |
| * desired |
| * @return list of maps corresponding for each cutoff provided; each map |
| * contains the warning and the number of times the error occurred in |
| * the time period |
| */ |
| public List<Map<String, Element>> getWarningMessagesAndCounts( |
| List<Long> cutoffs) { |
| return this.getElementsAndCounts(warnings, cutoffs, warningsPurgeInformation); |
| } |
| |
| private List<Map<String, Element>> getElementsAndCounts( |
| Map<String, SortedMap<Long, Integer>> map, List<Long> cutoffs, |
| SortedSet<PurgeElement> purgeInformation) { |
| if (purgeInformation.size() > maxUniqueMessages) { |
| ErrorAndWarningsCleanup cleanup = new ErrorAndWarningsCleanup(); |
| long cutoff = Time.now() - (messageAgeLimitSeconds * 1000); |
| cutoff = (cutoff / 1000); |
| cleanup.cleanupMessages(map, purgeInformation, cutoff, maxUniqueMessages); |
| } |
| List<Map<String, Element>> ret = new ArrayList<>(cutoffs.size()); |
| for (int i = 0; i < cutoffs.size(); ++i) { |
| ret.add(new HashMap<String, Element>()); |
| } |
| synchronized (lock) { |
| for (Map.Entry<String, SortedMap<Long, Integer>> element : map.entrySet()) { |
| for (int i = 0; i < cutoffs.size(); ++i) { |
| Map<String, Element> retMap = ret.get(i); |
| SortedMap<Long, Integer> qualifyingTimes = |
| element.getValue().tailMap(cutoffs.get(i)); |
| long count = 0; |
| for (Map.Entry<Long, Integer> entry : qualifyingTimes.entrySet()) { |
| count += entry.getValue(); |
| } |
| if (!qualifyingTimes.isEmpty()) { |
| retMap.put(element.getKey(), |
| new Element(count, qualifyingTimes.lastKey())); |
| } |
| } |
| } |
| } |
| return ret; |
| } |
| |
| // getters and setters for log4j |
| public long getCleanupInterval() { |
| return cleanupInterval; |
| } |
| |
| public void setCleanupInterval(long cleanupInterval) { |
| this.cleanupInterval = cleanupInterval; |
| } |
| |
| public long getMessageAgeLimitSeconds() { |
| return messageAgeLimitSeconds; |
| } |
| |
| public void setMessageAgeLimitSeconds(long messageAgeLimitSeconds) { |
| this.messageAgeLimitSeconds = messageAgeLimitSeconds; |
| } |
| |
| public int getMaxUniqueMessages() { |
| return maxUniqueMessages; |
| } |
| |
| public void setMaxUniqueMessages(int maxUniqueMessages) { |
| this.maxUniqueMessages = maxUniqueMessages; |
| } |
| |
| class ErrorAndWarningsCleanup extends TimerTask { |
| |
| @Override |
| public void run() { |
| long cutoff = Time.now() - (messageAgeLimitSeconds * 1000); |
| cutoff = (cutoff / 1000); |
| cleanupMessages(errors, errorsPurgeInformation, cutoff, maxUniqueMessages); |
| cleanupMessages(warnings, warningsPurgeInformation, cutoff, |
| maxUniqueMessages); |
| cleanupCounts(errorsTimestampCount, cutoff); |
| cleanupCounts(warningsTimestampCount, cutoff); |
| try { |
| cleanupTimer.schedule(new ErrorAndWarningsCleanup(), cleanupInterval); |
| } catch (IllegalStateException ie) { |
| // don't do anything since new timer is already scheduled |
| } |
| } |
| |
| void cleanupMessages(Map<String, SortedMap<Long, Integer>> map, |
| SortedSet<PurgeElement> purgeInformation, long cutoff, |
| int mapTargetSize) { |
| |
| PurgeElement el = new PurgeElement("", cutoff); |
| synchronized (lock) { |
| SortedSet<PurgeElement> removeSet = purgeInformation.headSet(el); |
| Iterator<PurgeElement> it = removeSet.iterator(); |
| while (it.hasNext()) { |
| PurgeElement p = it.next(); |
| map.remove(p.message); |
| it.remove(); |
| } |
| |
| // don't keep more mapTargetSize keys |
| if (purgeInformation.size() > mapTargetSize) { |
| Object[] array = purgeInformation.toArray(); |
| int cutoffIndex = purgeInformation.size() - mapTargetSize; |
| for (int i = 0; i < cutoffIndex; ++i) { |
| PurgeElement p = (PurgeElement) array[i]; |
| map.remove(p.message); |
| purgeInformation.remove(p); |
| } |
| } |
| } |
| } |
| |
| void cleanupCounts(SortedMap<Long, Integer> map, long cutoff) { |
| synchronized (lock) { |
| Iterator<Map.Entry<Long, Integer>> it = map.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<Long, Integer> element = it.next(); |
| if (element.getKey() < cutoff) { |
| it.remove(); |
| } |
| } |
| } |
| } |
| } |
| |
| // helper function |
| public static Log4jWarningErrorMetricsAppender findAppender() { |
| Enumeration appenders = Logger.getRootLogger().getAllAppenders(); |
| while(appenders.hasMoreElements()) { |
| Object obj = appenders.nextElement(); |
| if(obj instanceof Log4jWarningErrorMetricsAppender) { |
| return (Log4jWarningErrorMetricsAppender) obj; |
| } |
| } |
| return null; |
| } |
| } |