YARN-10471. Prevent logs for any container from becoming larger than a configurable size. Contributed by Eric Payne
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
index 0207010..26a204e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
@@ -78,4 +78,9 @@
    */
   public static final int KILLED_BY_CONTAINER_SCHEDULER = -108;
 
+  /**
+   * Container was terminated for generating excess log data.
+   */
+  public static final int KILLED_FOR_EXCESS_LOGS = -109;
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ba440e6..f11ebca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1820,6 +1820,25 @@
   public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED =
       false;
 
+  /** Enable switch for container log monitoring. */
+  public static final String NM_CONTAINER_LOG_MONITOR_ENABLED =
+      NM_PREFIX + "container-log-monitor.enable";
+  public static final boolean DEFAULT_NM_CONTAINER_LOG_MONITOR_ENABLED = false;
+  /** How often, in milliseconds, to monitor logs generated by containers. */
+  public static final String NM_CONTAINER_LOG_MON_INTERVAL_MS =
+      NM_PREFIX + "container-log-monitor.interval-ms";
+  public static final int DEFAULT_NM_CONTAINER_LOG_MON_INTERVAL_MS = 60000;
+  /** The disk space limit, in bytes, for a single container log directory. */
+  public static final String NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES =
+      NM_PREFIX + "container-log-monitor.dir-size-limit-bytes";
+  public static final long DEFAULT_NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES =
+      1000000000L;
+  /** The disk space limit, in bytes, for all of a container's logs. */
+  public static final String NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES =
+      NM_PREFIX + "container-log-monitor.total-size-limit-bytes";
+  public static final long DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES =
+      10000000000L;
+
   /** Enable/disable container metrics. */
   @Private
   public static final String NM_CONTAINER_METRICS_ENABLE =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7236e4f..910c100 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1577,6 +1577,34 @@
   </property>
 
   <property>
+    <description>Flag to enable the container log monitor which enforces
+      container log directory size limits.</description>
+    <name>yarn.nodemanager.container-log-monitor.enable</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>How often to check the usage of a container's log directories
+      in milliseconds</description>
+    <name>yarn.nodemanager.container-log-monitor.interval-ms</name>
+    <value>60000</value>
+  </property>
+
+  <property>
+    <description>The disk space limit, in bytes, for a single
+      container log directory</description>
+    <name>yarn.nodemanager.container-log-monitor.dir-size-limit-bytes</name>
+    <value>1000000000</value>
+  </property>
+
+  <property>
+    <description>The disk space limit, in bytes, for all of a container's
+      logs</description>
+    <name>yarn.nodemanager.container-log-monitor.total-size-limit-bytes</name>
+    <value>10000000000</value>
+  </property>
+
+  <property>
     <description>Class that calculates containers current resource utilization.
     If not set, the value for yarn.nodemanager.resource-calculator.class will
     be used.</description>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index d56ca65..a7bf73f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -29,6 +29,7 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -46,11 +47,14 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 
 import java.util.Arrays;
+import java.io.File;
 import java.util.Map;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +73,10 @@
 
   private long monitoringInterval;
   private MonitoringThread monitoringThread;
+  private int logCheckInterval;
+  private LogMonitorThread logMonitorThread;
+  private long logDirSizeLimit;
+  private long logTotalSizeLimit;
   private CGroupElasticMemoryController oomListenerThread;
   private boolean containerMetricsEnabled;
   private long containerMetricsPeriodMs;
@@ -94,6 +102,7 @@
   private boolean elasticMemoryEnforcement;
   private boolean strictMemoryEnforcement;
   private boolean containersMonitorEnabled;
+  private boolean logMonitorEnabled;
 
   private long maxVCoresAllottedForContainers;
 
@@ -122,6 +131,8 @@
 
     this.monitoringThread = new MonitoringThread();
 
+    this.logMonitorThread = new LogMonitorThread();
+
     this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
   }
 
@@ -133,6 +144,16 @@
             this.conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
                 YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS));
 
+    this.logCheckInterval =
+        conf.getInt(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MON_INTERVAL_MS);
+    this.logDirSizeLimit =
+        conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES,
+            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES);
+    this.logTotalSizeLimit =
+        conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES,
+            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES);
+
     this.resourceCalculatorPlugin =
         ResourceCalculatorPlugin.getContainersMonitorPlugin(this.conf);
     LOG.info(" Using ResourceCalculatorPlugin : "
@@ -217,6 +238,11 @@
         isContainerMonitorEnabled() && monitoringInterval > 0;
     LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled);
 
+    logMonitorEnabled =
+            conf.getBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
+                    YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MONITOR_ENABLED);
+    LOG.info("Container Log Monitor Enabled: "+ logMonitorEnabled);
+
     nodeCpuPercentageForYARN =
         NodeManagerHardwareUtils.getNodeCpuPercentage(this.conf);
 
@@ -286,13 +312,16 @@
     if (oomListenerThread != null) {
       oomListenerThread.start();
     }
+    if (logMonitorEnabled) {
+      this.logMonitorThread.start();
+    }
     super.serviceStart();
   }
 
   @Override
   protected void serviceStop() throws Exception {
+    stopped = true;
     if (containersMonitorEnabled) {
-      stopped = true;
       this.monitoringThread.interrupt();
       try {
         this.monitoringThread.join();
@@ -308,6 +337,13 @@
         }
       }
     }
+    if (logMonitorEnabled) {
+      this.logMonitorThread.interrupt();
+      try {
+        this.logMonitorThread.join();
+      } catch (InterruptedException e) {
+      }
+    }
     super.serviceStop();
   }
 
@@ -771,7 +807,8 @@
         }
       }
 
-      if (isMemoryOverLimit.isPresent() && isMemoryOverLimit.get()) {
+      if (isMemoryOverLimit.isPresent() && isMemoryOverLimit.get()
+          && trackingContainers.remove(containerId) != null) {
         // Virtual or physical memory over limit. Fail the container and
         // remove
         // the corresponding process tree
@@ -785,7 +822,6 @@
         eventDispatcher.getEventHandler().handle(
                 new ContainerKillEvent(containerId,
                       containerExitStatus, msg));
-        trackingContainers.remove(containerId);
         LOG.info("Removed ProcessTree with root " + pId);
       }
     }
@@ -853,6 +889,86 @@
     }
   }
 
+  /**
+   * Thread that periodically measures the log directories of every tracked
+   * container and kills a container whose logs exceed the per-directory
+   * limit or the total limit.
+   */
+  private class LogMonitorThread extends Thread {
+    LogMonitorThread() {
+      super("Container Log Monitor");
+    }
+
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        for (Entry<ContainerId, ProcessTreeInfo> entry :
+            trackingContainers.entrySet()) {
+          ContainerId containerId = entry.getKey();
+          ProcessTreeInfo ptInfo = entry.getValue();
+          Container container = context.getContainers().get(containerId);
+          if (container == null) {
+            continue;
+          }
+          try {
+            List<File> logDirs = ContainerLogsUtils.getContainerLogDirs(
+                containerId, container.getUser(), context);
+            long totalLogDataBytes = 0;
+            for (File dir : logDirs) {
+              long currentDirSizeBytes = FileUtil.getDU(dir);
+              totalLogDataBytes += currentDirSizeBytes;
+              String killMsg = null;
+              if (currentDirSizeBytes > logDirSizeLimit) {
+                killMsg = String.format(
+                    "Container [pid=%s,containerID=%s] is logging beyond "
+                        + "the container single log directory limit.%n"
+                        + "Limit: %d Log Directory Size: %d Log Directory: %s"
+                        + "%nKilling container.%n",
+                    ptInfo.getPID(), containerId, logDirSizeLimit,
+                    currentDirSizeBytes, dir);
+              } else if (totalLogDataBytes > logTotalSizeLimit) {
+                // The total is still being accumulated at this point, so
+                // the reported value is a lower bound (">=" in the message).
+                killMsg = String.format(
+                    "Container [pid=%s,containerID=%s] is logging beyond "
+                        + "the container total log limit.%n"
+                        + "Limit: %d Total Size: >=%d"
+                        + "%nKilling container.%n",
+                    ptInfo.getPID(), containerId, logTotalSizeLimit,
+                    totalLogDataBytes);
+              }
+              // Send the kill event only if this thread is the one that
+              // removed the container from trackingContainers.
+              if (killMsg != null
+                  && trackingContainers.remove(containerId) != null) {
+                LOG.warn(killMsg);
+                eventDispatcher.getEventHandler().handle(
+                    new ContainerKillEvent(containerId,
+                        ContainerExitStatus.KILLED_FOR_EXCESS_LOGS, killMsg));
+                LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
+                break;
+              }
+            }
+          } catch (Exception e) {
+            // Keep checking the remaining containers if one check fails.
+            LOG.warn("Uncaught exception in ContainersMonitorImpl "
+                + "while monitoring log usage for " + containerId, e);
+          }
+        }
+        try {
+          Thread.sleep(logCheckInterval);
+        } catch (InterruptedException e) {
+          LOG.info("Log monitor thread was interrupted. "
+              + "Stopping container log monitoring.");
+          // Restore the interrupt status and leave the loop so the thread
+          // really stops, as the message above states.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
   private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
     if (!containerMetricsEnabled || monitoringEvent == null) {
       return;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
index 02f6cea..5bd9eaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
@@ -21,9 +21,13 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -32,12 +36,14 @@
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
 import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
@@ -61,12 +67,19 @@
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@@ -76,6 +89,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
@@ -88,6 +102,7 @@
   static {
     LOG = LoggerFactory.getLogger(TestContainersMonitor.class);
   }
+
   @Before
   public void setup() throws IOException {
     conf.setClass(
@@ -353,6 +368,181 @@
             .build()));
   }
 
+  /**
+   * Verifies that a container is killed with
+   * {@link ContainerExitStatus#KILLED_FOR_EXCESS_LOGS} once a single log
+   * directory exceeds the configured per-directory size limit.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testContainerKillOnExcessLogDirectory() throws Exception {
+    final String user = "someuser";
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Application app = mock(Application.class);
+    doReturn(user).when(app).getUser();
+    doReturn(appId).when(app).getAppId();
+    Container container = mock(Container.class);
+    doReturn(cid).when(container).getContainerId();
+    doReturn(user).when(container).getUser();
+    File containerLogDir = new File(new File(localLogDir, appId.toString()),
+        cid.toString());
+    assertTrue("Could not create " + containerLogDir,
+        containerLogDir.mkdirs() || containerLogDir.isDirectory());
+    LocalDirsHandlerService mockDirsHandler =
+        mock(LocalDirsHandlerService.class);
+    doReturn(Collections.singletonList(localLogDir.getAbsolutePath()))
+        .when(mockDirsHandler).getLogDirsForRead();
+    Context ctx = new NMContext(context.getContainerTokenSecretManager(),
+        context.getNMTokenSecretManager(), mockDirsHandler,
+        context.getApplicationACLsManager(), context.getNMStateStore(),
+        false, conf);
+
+    Configuration monitorConf = new Configuration(conf);
+    monitorConf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    monitorConf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+    monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
+        false);
+    monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
+        true);
+    // Tiny per-directory limit that the log file written below will exceed.
+    monitorConf.setLong(
+        YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES, 10);
+    monitorConf.setLong(
+        YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES, 10000000);
+    monitorConf.setLong(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
+        10);
+
+    EventHandler mockHandler = mock(EventHandler.class);
+    AsyncDispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    doReturn(mockHandler).when(mockDispatcher).getEventHandler();
+    ContainersMonitor monitor = new ContainersMonitorImpl(
+        mock(ContainerExecutor.class), mockDispatcher, ctx);
+    monitor.init(monitorConf);
+    monitor.start();
+    Event event;
+    try {
+      ctx.getApplications().put(appId, app);
+      ctx.getContainers().put(cid, container);
+      monitor.handle(new ContainerStartMonitoringEvent(cid, 1, 1, 1, 0, 0));
+
+      try (PrintWriter fileWriter =
+          new PrintWriter(new File(containerLogDir, "log"))) {
+        fileWriter.write("This container is logging too much.");
+      }
+
+      ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+      verify(mockHandler, timeout(10000)).handle(captor.capture());
+      event = captor.getValue();
+    } finally {
+      monitor.stop();
+    }
+
+    assertTrue("Expected a kill event", event instanceof ContainerKillEvent);
+    ContainerKillEvent cke = (ContainerKillEvent) event;
+    assertEquals("Unexpected container exit status",
+        ContainerExitStatus.KILLED_FOR_EXCESS_LOGS,
+        cke.getContainerExitStatus());
+  }
+
+  /**
+   * Verifies that a container is killed with
+   * {@link ContainerExitStatus#KILLED_FOR_EXCESS_LOGS} once the combined
+   * size of its log directories exceeds the configured total size limit.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testContainerKillOnExcessTotalLogs() throws Exception {
+    final String user = "someuser";
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Application app = mock(Application.class);
+    doReturn(user).when(app).getUser();
+    doReturn(appId).when(app).getAppId();
+    Container container = mock(Container.class);
+    doReturn(cid).when(container).getContainerId();
+    doReturn(user).when(container).getUser();
+    File logDir1 = new File(localLogDir, "dir1");
+    File logDir2 = new File(localLogDir, "dir2");
+    List<String> logDirs = new ArrayList<>();
+    logDirs.add(logDir1.getAbsolutePath());
+    logDirs.add(logDir2.getAbsolutePath());
+    LocalDirsHandlerService mockDirsHandler =
+        mock(LocalDirsHandlerService.class);
+    doReturn(logDirs).when(mockDirsHandler).getLogDirsForRead();
+    Context ctx = new NMContext(context.getContainerTokenSecretManager(),
+        context.getNMTokenSecretManager(), mockDirsHandler,
+        context.getApplicationACLsManager(), context.getNMStateStore(),
+        false, conf);
+
+    File clogDir1 = new File(new File(logDir1, appId.toString()),
+        cid.toString());
+    assertTrue("Could not create " + clogDir1,
+        clogDir1.mkdirs() || clogDir1.isDirectory());
+    File clogDir2 = new File(new File(logDir2, appId.toString()),
+        cid.toString());
+    assertTrue("Could not create " + clogDir2,
+        clogDir2.mkdirs() || clogDir2.isDirectory());
+
+    Configuration monitorConf = new Configuration(conf);
+    monitorConf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    monitorConf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+    monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
+        false);
+    monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
+        true);
+    // Per-directory limit is generous; only the total limit can be exceeded.
+    monitorConf.setLong(
+        YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES, 100000);
+    monitorConf.setLong(
+        YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES, 15);
+    monitorConf.setLong(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
+        10);
+    monitorConf.set(YarnConfiguration.NM_LOG_DIRS, logDir1.getAbsolutePath()
+        + "," + logDir2.getAbsolutePath());
+
+    EventHandler mockHandler = mock(EventHandler.class);
+    AsyncDispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    doReturn(mockHandler).when(mockDispatcher).getEventHandler();
+    ContainersMonitor monitor = new ContainersMonitorImpl(
+        mock(ContainerExecutor.class), mockDispatcher, ctx);
+    monitor.init(monitorConf);
+    monitor.start();
+    Event event;
+    try {
+      ctx.getApplications().put(appId, app);
+      ctx.getContainers().put(cid, container);
+      monitor.handle(new ContainerStartMonitoringEvent(cid, 1, 1, 1, 0, 0));
+
+      try (PrintWriter writer = new PrintWriter(new File(clogDir1, "log"))) {
+        writer.write("0123456789");
+      }
+
+      // Logs in a single directory stay under the total limit: no kill yet.
+      Thread.sleep(1000);
+      verify(mockHandler, never()).handle(any(Event.class));
+
+      // Logs in the second directory push the total over the limit.
+      try (PrintWriter writer = new PrintWriter(new File(clogDir2, "log"))) {
+        writer.write("0123456789");
+      }
+
+      ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+      verify(mockHandler, timeout(10000)).handle(captor.capture());
+      event = captor.getValue();
+    } finally {
+      monitor.stop();
+    }
+
+    assertTrue("Expected a kill event", event instanceof ContainerKillEvent);
+    ContainerKillEvent cke = (ContainerKillEvent) event;
+    assertEquals("Unexpected container exit status",
+        ContainerExitStatus.KILLED_FOR_EXCESS_LOGS,
+        cke.getContainerExitStatus());
+  }
+
   @Test(timeout = 20000)
   public void testContainerMonitorMemFlags() {
     ContainersMonitor cm = null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
index dd7a195..3e23b63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
@@ -146,4 +146,20 @@
 	<property>
 		<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
 		<value>org.apache.hadoop.mapred.ShuffleHandler</value>
-	</property>
\ No newline at end of file
+	</property>
+
+Prevent Container Logs From Getting Too Big
+-------------------------------------------
+
+This allows a cluster admin to configure a cluster such that a container (for example, a task attempt) will be killed if its log directories exceed a configured size. This helps prevent logs from filling disks and also avoids the need to aggregate enormous logs.
+
+### Configuration
+
+The following parameters can be used to configure the container log dir sizes.
+
+| Configuration Name | Allowed Values | Description |
+|:---- |:---- |:---- |
+| `yarn.nodemanager.container-log-monitor.enable` | true, false | Flag to enable the container log monitor which enforces container log directory size limits. Default is false. |
+| `yarn.nodemanager.container-log-monitor.interval-ms` | Positive integer | How often to check the usage of a container's log directories in milliseconds. Default is 60000 ms. |
+| `yarn.nodemanager.container-log-monitor.dir-size-limit-bytes` | Long | The disk space limit, in bytes, for a single container log directory. Default is 1000000000. |
+| `yarn.nodemanager.container-log-monitor.total-size-limit-bytes` | Long | The disk space limit, in bytes, for all of a container's logs. Default is 10000000000. |