| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.logaggregation; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.PrintWriter; |
| import java.io.Writer; |
| import java.net.URL; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.servlet.http.HttpServletRequest; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; |
| import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; |
| import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; |
| import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; |
| import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; |
| import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.TFileAggregatedLogsBlock; |
| import org.apache.hadoop.yarn.webapp.YarnWebParams; |
| import org.apache.hadoop.yarn.webapp.View.ViewContext; |
| import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest; |
| import org.apache.hadoop.yarn.webapp.view.BlockForTest; |
| import org.apache.hadoop.yarn.webapp.view.HtmlBlock; |
| import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest; |
| import org.junit.Test; |
| |
| import static org.mockito.Mockito.*; |
| |
| import com.google.inject.Inject; |
| |
| import static org.junit.Assert.*; |
| |
| /** |
| * Test AggregatedLogsBlock. AggregatedLogsBlock should check user, aggregate a |
| * logs into one file and show this logs or errors into html code |
| * |
| */ |
| public class TestAggregatedLogsBlock { |
| /** |
| * Bad user. User 'owner' is trying to read logs without access |
| */ |
| @Test |
| public void testAccessDenied() throws Exception { |
| |
| FileUtil.fullyDelete(new File("target/logs")); |
| Configuration configuration = getConfiguration(); |
| |
| writeLogs("target/logs/logs/application_0_0001/container_0_0001_01_000001"); |
| |
| writeLog(configuration, "owner"); |
| |
| |
| ByteArrayOutputStream data = new ByteArrayOutputStream(); |
| PrintWriter printWriter = new PrintWriter(data); |
| HtmlBlock html = new HtmlBlockForTest(); |
| HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); |
| TFileAggregatedLogsBlockForTest aggregatedBlock |
| = getTFileAggregatedLogsBlockForTest(configuration, "owner", |
| "container_0_0001_01_000001", "localhost:1234"); |
| aggregatedBlock.render(block); |
| |
| block.getWriter().flush(); |
| String out = data.toString(); |
| assertTrue(out |
| .contains("User [owner] is not authorized to view the logs for entity")); |
| |
| } |
| |
| @Test |
| public void testBlockContainsPortNumForUnavailableAppLog() { |
| FileUtil.fullyDelete(new File("target/logs")); |
| Configuration configuration = getConfiguration(); |
| |
| String nodeName = configuration.get(YarnConfiguration.NM_WEBAPP_ADDRESS, |
| YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS); |
| AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest( |
| configuration, "admin", "container_0_0001_01_000001", nodeName); |
| ByteArrayOutputStream data = new ByteArrayOutputStream(); |
| PrintWriter printWriter = new PrintWriter(data); |
| HtmlBlock html = new HtmlBlockForTest(); |
| HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); |
| aggregatedBlock.render(block); |
| |
| block.getWriter().flush(); |
| String out = data.toString(); |
| assertTrue(out.contains(nodeName)); |
| } |
| |
| /** |
| * try to read bad logs |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testBadLogs() throws Exception { |
| |
| FileUtil.fullyDelete(new File("target/logs")); |
| Configuration configuration = getConfiguration(); |
| |
| writeLogs("target/logs/logs/application_0_0001/container_0_0001_01_000001"); |
| |
| writeLog(configuration, "owner"); |
| |
| AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest( |
| configuration, "admin", "container_0_0001_01_000001"); |
| ByteArrayOutputStream data = new ByteArrayOutputStream(); |
| PrintWriter printWriter = new PrintWriter(data); |
| HtmlBlock html = new HtmlBlockForTest(); |
| HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); |
| aggregatedBlock.render(block); |
| |
| block.getWriter().flush(); |
| String out = data.toString(); |
| assertTrue(out |
| .contains("Logs not available for entity. Aggregation may not be complete, Check back later or try the nodemanager at localhost:1234")); |
| |
| } |
| |
| /** |
| * Reading from logs should succeed and they should be shown in the |
| * AggregatedLogsBlock html. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testAggregatedLogsBlock() throws Exception { |
| |
| FileUtil.fullyDelete(new File("target/logs")); |
| Configuration configuration = getConfiguration(); |
| |
| writeLogs("target/logs/logs/application_0_0001/container_0_0001_01_000001"); |
| |
| writeLog(configuration, "admin"); |
| |
| ByteArrayOutputStream data = new ByteArrayOutputStream(); |
| PrintWriter printWriter = new PrintWriter(data); |
| HtmlBlock html = new HtmlBlockForTest(); |
| HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); |
| TFileAggregatedLogsBlockForTest aggregatedBlock |
| = getTFileAggregatedLogsBlockForTest(configuration, "admin", |
| "container_0_0001_01_000001", "localhost:1234"); |
| aggregatedBlock.render(block); |
| |
| block.getWriter().flush(); |
| String out = data.toString(); |
| assertTrue(out.contains("test log1")); |
| assertTrue(out.contains("test log2")); |
| assertTrue(out.contains("test log3")); |
| } |
| |
| /** |
| * Reading from logs should succeed (from a HAR archive) and they should be |
| * shown in the AggregatedLogsBlock html. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testAggregatedLogsBlockHar() throws Exception { |
| FileUtil.fullyDelete(new File("target/logs")); |
| Configuration configuration = getConfiguration(); |
| |
| URL harUrl = ClassLoader.getSystemClassLoader() |
| .getResource("application_1440536969523_0001.har"); |
| assertNotNull(harUrl); |
| String path = "target/logs/admin/logs/application_1440536969523_0001" + |
| "/application_1440536969523_0001.har"; |
| FileUtils.copyDirectory(new File(harUrl.getPath()), new File(path)); |
| |
| ByteArrayOutputStream data = new ByteArrayOutputStream(); |
| PrintWriter printWriter = new PrintWriter(data); |
| HtmlBlock html = new HtmlBlockForTest(); |
| HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); |
| TFileAggregatedLogsBlockForTest aggregatedBlock |
| = getTFileAggregatedLogsBlockForTest(configuration, "admin", |
| "container_1440536969523_0001_01_000001", "host1:1111"); |
| aggregatedBlock.render(block); |
| |
| block.getWriter().flush(); |
| String out = data.toString(); |
| assertTrue(out.contains("Hello stderr")); |
| assertTrue(out.contains("Hello stdout")); |
| assertTrue(out.contains("Hello syslog")); |
| |
| aggregatedBlock = getTFileAggregatedLogsBlockForTest( |
| configuration, "admin", |
| "container_1440536969523_0001_01_000002", "host2:2222"); |
| data = new ByteArrayOutputStream(); |
| printWriter = new PrintWriter(data); |
| html = new HtmlBlockForTest(); |
| block = new BlockForTest(html, printWriter, 10, false); |
| aggregatedBlock.render(block); |
| block.getWriter().flush(); |
| out = data.toString(); |
| assertTrue(out.contains("Goodbye stderr")); |
| assertTrue(out.contains("Goodbye stdout")); |
| assertTrue(out.contains("Goodbye syslog")); |
| } |
| |
| /** |
| * Log files was deleted. |
| * @throws Exception |
| */ |
| @Test |
| public void testNoLogs() throws Exception { |
| |
| FileUtil.fullyDelete(new File("target/logs")); |
| Configuration configuration = getConfiguration(); |
| |
| File f = new File("target/logs/logs/application_0_0001/container_0_0001_01_000001"); |
| if (!f.exists()) { |
| assertTrue(f.mkdirs()); |
| } |
| writeLog(configuration, "admin"); |
| |
| ByteArrayOutputStream data = new ByteArrayOutputStream(); |
| PrintWriter printWriter = new PrintWriter(data); |
| HtmlBlock html = new HtmlBlockForTest(); |
| HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); |
| TFileAggregatedLogsBlockForTest aggregatedBlock |
| = getTFileAggregatedLogsBlockForTest(configuration, "admin", |
| "container_0_0001_01_000001", "localhost:1234"); |
| aggregatedBlock.render(block); |
| |
| block.getWriter().flush(); |
| String out = data.toString(); |
| assertTrue(out.contains("No logs available for container container_0_0001_01_000001")); |
| |
| } |
| |
| private Configuration getConfiguration() { |
| Configuration configuration = new YarnConfiguration(); |
| configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); |
| configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs"); |
| configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); |
| configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); |
| return configuration; |
| } |
| |
| private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( |
| Configuration configuration, String user, String containerId) { |
| return getAggregatedLogsBlockForTest(configuration, user, containerId, |
| "localhost:1234"); |
| } |
| |
| private TFileAggregatedLogsBlockForTest getTFileAggregatedLogsBlockForTest( |
| Configuration configuration, String user, String containerId, |
| String nodeName) { |
| HttpServletRequest request = mock(HttpServletRequest.class); |
| when(request.getRemoteUser()).thenReturn(user); |
| ViewContext mockContext = mock(ViewContext.class); |
| TFileAggregatedLogsBlockForTest aggregatedBlock |
| = new TFileAggregatedLogsBlockForTest(mockContext, |
| configuration); |
| aggregatedBlock.setRequest(request); |
| aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID, |
| containerId); |
| aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME, |
| nodeName); |
| aggregatedBlock.moreParams().put(YarnWebParams.APP_OWNER, user); |
| aggregatedBlock.moreParams().put("start", ""); |
| aggregatedBlock.moreParams().put("end", ""); |
| aggregatedBlock.moreParams().put(YarnWebParams.ENTITY_STRING, "entity"); |
| return aggregatedBlock; |
| } |
| |
| |
| private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( |
| Configuration configuration, String user, String containerId, |
| String nodeName) { |
| HttpServletRequest request = mock(HttpServletRequest.class); |
| when(request.getRemoteUser()).thenReturn(user); |
| AggregatedLogsBlockForTest aggregatedBlock = new AggregatedLogsBlockForTest( |
| configuration); |
| aggregatedBlock.setRequest(request); |
| aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID, containerId); |
| aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME, nodeName); |
| aggregatedBlock.moreParams().put(YarnWebParams.APP_OWNER, user); |
| aggregatedBlock.moreParams().put("start", ""); |
| aggregatedBlock.moreParams().put("end", ""); |
| aggregatedBlock.moreParams().put(YarnWebParams.ENTITY_STRING, "entity"); |
| return aggregatedBlock; |
| } |
| |
| private void writeLog(Configuration configuration, String user) |
| throws Exception { |
| ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1); |
| ApplicationAttemptId appAttemptId = ApplicationAttemptIdPBImpl.newInstance(appId, 1); |
| ContainerId containerId = ContainerIdPBImpl.newContainerId(appAttemptId, 1); |
| |
| String path = "target/logs/" + user |
| + "/logs/application_0_0001/localhost_1234"; |
| File f = new File(path); |
| if (!f.getParentFile().exists()) { |
| assertTrue(f.getParentFile().mkdirs()); |
| } |
| List<String> rootLogDirs = Arrays.asList("target/logs/logs"); |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| |
| LogAggregationFileControllerFactory factory |
| = new LogAggregationFileControllerFactory(configuration); |
| LogAggregationFileController fileController = factory |
| .getFileControllerForWrite(); |
| try { |
| Map<ApplicationAccessType, String> appAcls = new HashMap<>(); |
| appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); |
| NodeId nodeId = NodeId.newInstance("localhost", 1234); |
| LogAggregationFileControllerContext context |
| = new LogAggregationFileControllerContext( |
| new Path(path), new Path(path), false, 3600, |
| appId, appAcls, nodeId, ugi); |
| fileController.initializeWriter(context); |
| fileController.write( |
| new AggregatedLogFormat.LogKey("container_0_0001_01_000001"), |
| new AggregatedLogFormat.LogValue(rootLogDirs, containerId, |
| UserGroupInformation.getCurrentUser().getShortUserName())); |
| } finally { |
| fileController.closeWriter(); |
| } |
| } |
| |
| private void writeLogs(String dirName) throws Exception { |
| File f = new File(dirName + File.separator + "log1"); |
| if (!f.getParentFile().exists()) { |
| assertTrue(f.getParentFile().mkdirs()); |
| } |
| |
| writeLog(dirName + File.separator + "log1", "test log1"); |
| writeLog(dirName + File.separator + "log2", "test log2"); |
| writeLog(dirName + File.separator + "log3", "test log3"); |
| } |
| |
| private void writeLog(String fileName, String text) throws Exception { |
| File f = new File(fileName); |
| Writer writer = new FileWriter(f); |
| writer.write(text); |
| writer.flush(); |
| writer.close(); |
| } |
| |
| private static class TFileAggregatedLogsBlockForTest |
| extends TFileAggregatedLogsBlock { |
| |
| private Map<String, String> params = new HashMap<String, String>(); |
| private HttpServletRequest request; |
| |
| @Inject |
| TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) { |
| super(ctx, conf); |
| } |
| |
| public void render(Block html) { |
| super.render(html); |
| } |
| |
| @Override |
| public Map<String, String> moreParams() { |
| return params; |
| } |
| |
| public HttpServletRequest request() { |
| return request; |
| } |
| |
| public void setRequest(HttpServletRequest request) { |
| this.request = request; |
| } |
| } |
| } |