| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; |
| |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.verify; |
| |
| import java.io.File; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.DrainDispatcher; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; |
| import org.apache.hadoop.yarn.server.nodemanager.DeletionService; |
| import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; |
| import org.apache.hadoop.yarn.util.BuilderUtils; |
| import org.junit.Test; |
| import org.mockito.exceptions.verification.WantedButNotInvoked; |
| |
| public class TestNonAggregatingLogHandler { |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testLogDeletion() { |
| DeletionService delService = mock(DeletionService.class); |
| Configuration conf = new YarnConfiguration(); |
| String user = "testuser"; |
| |
| File[] localLogDirs = new File[2]; |
| localLogDirs[0] = |
| new File("target", this.getClass().getName() + "-localLogDir0") |
| .getAbsoluteFile(); |
| localLogDirs[1] = |
| new File("target", this.getClass().getName() + "-localLogDir1") |
| .getAbsoluteFile(); |
| String localLogDirsString = |
| localLogDirs[0].getAbsolutePath() + "," |
| + localLogDirs[1].getAbsolutePath(); |
| |
| conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); |
| conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); |
| conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l); |
| |
| DrainDispatcher dispatcher = createDispatcher(conf); |
| EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); |
| dispatcher.register(ApplicationEventType.class, appEventHandler); |
| |
| LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); |
| dirsHandler.init(conf); |
| |
| ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); |
| ApplicationAttemptId appAttemptId1 = |
| BuilderUtils.newApplicationAttemptId(appId1, 1); |
| ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); |
| |
| NonAggregatingLogHandler logHandler = |
| new NonAggregatingLogHandler(dispatcher, delService, dirsHandler); |
| logHandler.init(conf); |
| logHandler.start(); |
| |
| logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, |
| ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); |
| |
| logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); |
| |
| logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); |
| |
| Path[] localAppLogDirs = new Path[2]; |
| localAppLogDirs[0] = |
| new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); |
| localAppLogDirs[1] = |
| new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); |
| |
| // 5 seconds for the delete which is a separate thread. |
| long verifyStartTime = System.currentTimeMillis(); |
| WantedButNotInvoked notInvokedException = null; |
| boolean matched = false; |
| while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) { |
| try { |
| verify(delService).delete(eq(user), (Path) eq(null), |
| eq(localAppLogDirs[0]), eq(localAppLogDirs[1])); |
| matched = true; |
| } catch (WantedButNotInvoked e) { |
| notInvokedException = e; |
| try { |
| Thread.sleep(50l); |
| } catch (InterruptedException i) { |
| } |
| } |
| } |
| if (!matched) { |
| throw notInvokedException; |
| } |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testDelayedDelete() { |
| DeletionService delService = mock(DeletionService.class); |
| Configuration conf = new YarnConfiguration(); |
| String user = "testuser"; |
| |
| File[] localLogDirs = new File[2]; |
| localLogDirs[0] = |
| new File("target", this.getClass().getName() + "-localLogDir0") |
| .getAbsoluteFile(); |
| localLogDirs[1] = |
| new File("target", this.getClass().getName() + "-localLogDir1") |
| .getAbsoluteFile(); |
| String localLogDirsString = |
| localLogDirs[0].getAbsolutePath() + "," |
| + localLogDirs[1].getAbsolutePath(); |
| |
| conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); |
| conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); |
| |
| conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l); |
| |
| DrainDispatcher dispatcher = createDispatcher(conf); |
| EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); |
| dispatcher.register(ApplicationEventType.class, appEventHandler); |
| |
| LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); |
| dirsHandler.init(conf); |
| |
| ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); |
| ApplicationAttemptId appAttemptId1 = |
| BuilderUtils.newApplicationAttemptId(appId1, 1); |
| ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); |
| |
| NonAggregatingLogHandler logHandler = |
| new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService, |
| dirsHandler); |
| logHandler.init(conf); |
| logHandler.start(); |
| |
| logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, |
| ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); |
| |
| logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); |
| |
| logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); |
| |
| Path[] localAppLogDirs = new Path[2]; |
| localAppLogDirs[0] = |
| new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); |
| localAppLogDirs[1] = |
| new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); |
| |
| ScheduledThreadPoolExecutor mockSched = |
| ((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched; |
| |
| verify(mockSched).schedule(any(Runnable.class), eq(10800l), |
| eq(TimeUnit.SECONDS)); |
| } |
| |
| @Test |
| public void testStop() throws Exception { |
| NonAggregatingLogHandler aggregatingLogHandler = |
| new NonAggregatingLogHandler(null, null, null); |
| |
| // It should not throw NullPointerException |
| aggregatingLogHandler.stop(); |
| |
| NonAggregatingLogHandlerWithMockExecutor logHandler = |
| new NonAggregatingLogHandlerWithMockExecutor(null, null, null); |
| logHandler.init(new Configuration()); |
| logHandler.stop(); |
| verify(logHandler.mockSched).shutdown(); |
| verify(logHandler.mockSched) |
| .awaitTermination(eq(10l), eq(TimeUnit.SECONDS)); |
| verify(logHandler.mockSched).shutdownNow(); |
| } |
| |
| private class NonAggregatingLogHandlerWithMockExecutor extends |
| NonAggregatingLogHandler { |
| |
| private ScheduledThreadPoolExecutor mockSched; |
| |
| public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher, |
| DeletionService delService, LocalDirsHandlerService dirsHandler) { |
| super(dispatcher, delService, dirsHandler); |
| } |
| |
| @Override |
| ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( |
| Configuration conf) { |
| mockSched = mock(ScheduledThreadPoolExecutor.class); |
| return mockSched; |
| } |
| |
| } |
| |
| private DrainDispatcher createDispatcher(Configuration conf) { |
| DrainDispatcher dispatcher = new DrainDispatcher(); |
| dispatcher.init(conf); |
| dispatcher.start(); |
| return dispatcher; |
| } |
| } |