/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
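
/**
 * Tests {@link EntityLogInfo} and {@link DomainLogInfo}: group id matching
 * against log file names, and parsing of entity, domain, and deliberately
 * broken timeline log files stored on a MiniDFSCluster.
 */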
public class TestLogInfo {
private static final Path TEST_ROOT_DIR = new Path(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestLogInfo.class.getSimpleName());
private static final String TEST_ATTEMPT_DIR_NAME = "test_app";
private static final String TEST_ENTITY_FILE_NAME = "test_entity";
private static final String TEST_DOMAIN_FILE_NAME = "test_domain";
private static final String TEST_BROKEN_FILE_NAME = "test_broken";
private Configuration config = new YarnConfiguration();
private MiniDFSCluster hdfsCluster;
private FileSystem fs;
private FileContext fc;
  private FileContextTestHelper fileContextTestHelper =
      new FileContextTestHelper("/tmp/TestLogInfo");
private ObjectMapper objMapper;
private JsonFactory jsonFactory = new JsonFactory();
private JsonGenerator jsonGenerator;
private FSDataOutputStream outStream = null;
private FSDataOutputStream outStreamDomain = null;
private TimelineDomain testDomain;
private static final short FILE_LOG_DIR_PERMISSIONS = 0770;
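
  /**
   * Spins up a single-node MiniDFSCluster and seeds the test application
   * directory with an entity log, a domain log, and a deliberately broken
   * log file for the parsing tests below.
   */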
@Before
public void setup() throws Exception {
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
Path testAppDirPath = getTestRootPath(TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(testAppDirPath, new FsPermission(FILE_LOG_DIR_PERMISSIONS));
objMapper = PluginStoreTestUtils.createObjectMapper();
TimelineEntities testEntities = PluginStoreTestUtils.generateTestEntities();
writeEntitiesLeaveOpen(testEntities,
new Path(testAppDirPath, TEST_ENTITY_FILE_NAME));
testDomain = new TimelineDomain();
testDomain.setId("domain_1");
testDomain.setReaders(UserGroupInformation.getLoginUser().getUserName());
testDomain.setOwner(UserGroupInformation.getLoginUser().getUserName());
testDomain.setDescription("description");
writeDomainLeaveOpen(testDomain,
new Path(testAppDirPath, TEST_DOMAIN_FILE_NAME));
writeBrokenFile(new Path(testAppDirPath, TEST_BROKEN_FILE_NAME));
}
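
  /**
   * Closes the generator and streams opened in setup (null-checked in case
   * setup failed partway through) and shuts down the MiniDFSCluster.
   */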
@After
  public void tearDown() throws Exception {
    if (jsonGenerator != null) {
      jsonGenerator.close();
    }
    if (outStream != null) {
      outStream.close();
    }
    if (outStreamDomain != null) {
      outStreamDomain.close();
    }
    hdfsCluster.shutdown();
  }
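
  // matchesGroupId should match the group id exactly or as a
  // delimiter-bounded component of the file name, but never as a bare
  // prefix of a longer id.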
@Test
public void testMatchesGroupId() throws Exception {
String testGroupId = "app1_group1";
// Match
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
"app1_group1",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
"test_app1_group1",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
    // No match
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2_group1",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group2",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group12",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
// Check delimiters
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1_2",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1.dat",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
// Check file names shorter than group id
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
}
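
  // Parse the entity log once and verify the seeded entities, then append a
  // new entity to the still-open stream and verify that an incremental
  // re-parse picks it up.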
@Test
public void testParseEntity() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_ENTITY_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
testLogInfo.parseForStore(tdm, getTestRootPath(), true, jsonFactory, objMapper,
fs);
// Verify for the first batch
PluginStoreTestUtils.verifyTestEntities(tdm);
// Load new data
TimelineEntity entityNew = PluginStoreTestUtils
        .createEntity("id_3", "type_3", 789L, null, null,
null, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
writeEntitiesLeaveOpen(entityList,
new Path(getTestRootPath(TEST_ATTEMPT_DIR_NAME), TEST_ENTITY_FILE_NAME));
testLogInfo.parseForStore(tdm, getTestRootPath(), true, jsonFactory, objMapper,
fs);
// Verify the newly added data
TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(),
entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals("Failed to read out entity new",
entityNew.getStartTime(), entity3.getStartTime());
tdm.close();
}
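
  // Both the entity and the domain parsers should tolerate malformed JSON
  // without throwing.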
@Test
public void testParseBrokenEntity() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_BROKEN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_BROKEN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
    // Parsing should not throw, even though the file content is malformed
testLogInfo.parseForStore(tdm, getTestRootPath(), true, jsonFactory, objMapper,
fs);
domainLogInfo.parseForStore(tdm, getTestRootPath(), true, jsonFactory, objMapper,
fs);
tdm.close();
}
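
  // Parse the domain log and verify the stored domain round-trips its
  // readers, owner, and description.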
@Test
public void testParseDomain() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_DOMAIN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
domainLogInfo.parseForStore(tdm, getTestRootPath(), true, jsonFactory, objMapper,
fs);
// Verify domain data
TimelineDomain resultDomain = tdm.getDomain("domain_1",
UserGroupInformation.getLoginUser());
assertNotNull(resultDomain);
assertEquals(testDomain.getReaders(), resultDomain.getReaders());
assertEquals(testDomain.getOwner(), resultDomain.getOwner());
assertEquals(testDomain.getDescription(), resultDomain.getDescription());
}
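
  // Writes deliberately malformed JSON so the parsing tests can exercise
  // error handling against corrupt log data.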
  private void writeBrokenFile(Path logPath) throws IOException {
    String broken = "{ broken { [[]} broken";
    try (FSDataOutputStream out =
        PluginStoreTestUtils.createLogFile(logPath, fs)) {
      out.write(broken.getBytes(StandardCharsets.UTF_8));
    }
  }

  // TestLogInfo needs to keep its HDFS log files open across test methods,
  // so we maintain our own write methods instead of closing the stream
  // after each write.
private void writeEntitiesLeaveOpen(TimelineEntities entities, Path logPath)
throws IOException {
if (outStream == null) {
outStream = PluginStoreTestUtils.createLogFile(logPath, fs);
jsonGenerator = (new JsonFactory()).createJsonGenerator(outStream);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
}
for (TimelineEntity entity : entities.getEntities()) {
objMapper.writeValue(jsonGenerator, entity);
}
outStream.hflush();
}
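
  // Writes a domain record to the domain log, opening the shared stream on
  // first use and leaving it open (hflush only) so the file stays active.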
private void writeDomainLeaveOpen(TimelineDomain domain, Path logPath)
throws IOException {
if (outStreamDomain == null) {
outStreamDomain = PluginStoreTestUtils.createLogFile(logPath, fs);
}
    // The domain writer uses its own JSON generator to stay isolated from
    // the entity writers
JsonGenerator jsonGeneratorLocal
= (new JsonFactory()).createJsonGenerator(outStreamDomain);
jsonGeneratorLocal.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
objMapper.writeValue(jsonGeneratorLocal, domain);
outStreamDomain.hflush();
}
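
  // Resolve paths under the per-test root managed by FileContextTestHelper.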
private Path getTestRootPath() {
return fileContextTestHelper.getTestRootPath(fc);
}
private Path getTestRootPath(String pathString) {
return fileContextTestHelper.getTestRootPath(fc, pathString);
}
}