blob: 956e9e9df739810a18f93f0048435465bb89c469 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.log.Log;
/** Test class to verify RollingLevelDBTimelineStore. */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils {
private FileContext fsContext;
private File fsPath;
private Configuration config = new YarnConfiguration();
@Before
public void setup() throws Exception {
fsContext = FileContext.getLocalFSFileContext();
fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
fsPath.getAbsolutePath());
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
store = new RollingLevelDBTimelineStore();
store.init(config);
store.start();
loadTestEntityData();
loadVerificationEntityData();
loadTestDomainData();
}
@After
public void tearDown() throws Exception {
store.stop();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
}
@Test
public void testRootDirPermission() throws IOException {
FileSystem fs = FileSystem.getLocal(new YarnConfiguration());
FileStatus file = fs.getFileStatus(new Path(fsPath.getAbsolutePath(),
RollingLevelDBTimelineStore.FILENAME));
assertNotNull(file);
assertEquals(RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK,
file.getPermission());
}
@Test
public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity();
((RollingLevelDBTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity();
loadTestEntityData();
}
@Test
public void testGetEntities() throws IOException {
super.testGetEntities();
}
@Test
public void testGetEntitiesWithFromId() throws IOException {
super.testGetEntitiesWithFromId();
}
@Test
public void testGetEntitiesWithFromTs() throws IOException {
// feature not supported
}
@Test
public void testGetEntitiesWithPrimaryFilters() throws IOException {
super.testGetEntitiesWithPrimaryFilters();
}
@Test
public void testGetEntitiesWithSecondaryFilters() throws IOException {
super.testGetEntitiesWithSecondaryFilters();
}
@Test
public void testGetEvents() throws IOException {
super.testGetEvents();
}
@Test
public void testCacheSizes() {
Configuration conf = new Configuration();
assertEquals(10000,
RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
assertEquals(10000,
RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
conf.setInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
10001);
assertEquals(10001,
RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
conf = new Configuration();
conf.setInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
10002);
assertEquals(10002,
RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
}
@Test
public void testCheckVersion() throws IOException {
RollingLevelDBTimelineStore dbStore = (RollingLevelDBTimelineStore) store;
// default version
Version defaultVersion = dbStore.getCurrentVersion();
Assert.assertEquals(defaultVersion, dbStore.loadVersion());
// compatible version
Version compatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
dbStore.storeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
restartTimelineStore();
dbStore = (RollingLevelDBTimelineStore) store;
// overwrite the compatible version
Assert.assertEquals(defaultVersion, dbStore.loadVersion());
// incompatible version
Version incompatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
dbStore.storeVersion(incompatibleVersion);
try {
restartTimelineStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for timeline store"));
}
}
@Test
public void testValidateConfig() throws IOException {
Configuration copyConfig = new YarnConfiguration(config);
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration.TIMELINE_SERVICE_TTL_MS));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration
.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration
.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration
.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE));
}
config = copyConfig;
restartTimelineStore();
}
private void restartTimelineStore() throws IOException {
// need to close so leveldb releases database lock
if (store != null) {
store.close();
}
store = new RollingLevelDBTimelineStore();
store.init(config);
store.start();
}
@Test
public void testGetDomain() throws IOException {
super.testGetDomain();
}
@Test
public void testGetDomains() throws IOException {
super.testGetDomains();
}
@Test
public void testRelatingToNonExistingEntity() throws IOException {
TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
entityToStore.setEntityId("TEST_ENTITY_ID_1");
entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
store.put(entities);
TimelineEntity entityToGet =
store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
Assert.assertEquals("TEST_ENTITY_TYPE_1",
entityToGet.getRelatedEntities().keySet().iterator().next());
Assert.assertEquals("TEST_ENTITY_ID_1",
entityToGet.getRelatedEntities().values().iterator().next()
.iterator().next());
}
@Test
public void testRelatingToEntityInSamePut() throws IOException {
TimelineEntity entityToRelate = new TimelineEntity();
entityToRelate.setEntityType("TEST_ENTITY_TYPE_2");
entityToRelate.setEntityId("TEST_ENTITY_ID_2");
entityToRelate.setDomainId("TEST_DOMAIN");
TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
entityToStore.setEntityId("TEST_ENTITY_ID_1");
entityToStore.setDomainId("TEST_DOMAIN");
entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
entities.addEntity(entityToRelate);
store.put(entities);
TimelineEntity entityToGet =
store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("TEST_DOMAIN", entityToGet.getDomainId());
Assert.assertEquals("TEST_ENTITY_TYPE_1",
entityToGet.getRelatedEntities().keySet().iterator().next());
Assert.assertEquals("TEST_ENTITY_ID_1",
entityToGet.getRelatedEntities().values().iterator().next()
.iterator().next());
}
@Test
public void testRelatingToOldEntityWithoutDomainId() throws IOException {
// New entity is put in the default domain
TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType("NEW_ENTITY_TYPE_1");
entityToStore.setEntityId("NEW_ENTITY_ID_1");
entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
store.put(entities);
TimelineEntity entityToGet =
store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
Assert.assertEquals("NEW_ENTITY_TYPE_1",
entityToGet.getRelatedEntities().keySet().iterator().next());
Assert.assertEquals("NEW_ENTITY_ID_1",
entityToGet.getRelatedEntities().values().iterator().next()
.iterator().next());
// New entity is not put in the default domain
entityToStore = new TimelineEntity();
entityToStore.setEntityType("NEW_ENTITY_TYPE_2");
entityToStore.setEntityId("NEW_ENTITY_ID_2");
entityToStore.setDomainId("NON_DEFAULT");
entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
entities = new TimelineEntities();
entities.addEntity(entityToStore);
TimelinePutResponse response = store.put(entities);
Assert.assertEquals(1, response.getErrors().size());
Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION,
response.getErrors().get(0).getErrorCode());
entityToGet =
store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
// Still have one related entity
Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size());
Assert.assertEquals(1, entityToGet.getRelatedEntities().values()
.iterator().next().size());
}
public void testStorePerformance() throws IOException {
TimelineEntity entityToStorePrep = new TimelineEntity();
entityToStorePrep.setEntityType("TEST_ENTITY_TYPE_PREP");
entityToStorePrep.setEntityId("TEST_ENTITY_ID_PREP");
entityToStorePrep.setDomainId("TEST_DOMAIN");
entityToStorePrep.addRelatedEntity("TEST_ENTITY_TYPE_2",
"TEST_ENTITY_ID_2");
entityToStorePrep.setStartTime(0L);
TimelineEntities entitiesPrep = new TimelineEntities();
entitiesPrep.addEntity(entityToStorePrep);
store.put(entitiesPrep);
long start = System.currentTimeMillis();
int num = 1000000;
Log.info("Start test for " + num);
final String tezTaskAttemptId = "TEZ_TA";
final String tezEntityId = "attempt_1429158534256_0001_1_00_000000_";
final String tezTaskId = "TEZ_T";
final String tezDomainId = "Tez_ATS_application_1429158534256_0001";
TimelineEntity entityToStore = new TimelineEntity();
TimelineEvent startEvt = new TimelineEvent();
entityToStore.setEntityType(tezTaskAttemptId);
startEvt.setEventType("TASK_ATTEMPT_STARTED");
startEvt.setTimestamp(0);
entityToStore.addEvent(startEvt);
entityToStore.setDomainId(tezDomainId);
entityToStore.addPrimaryFilter("status", "SUCCEEDED");
entityToStore.addPrimaryFilter("applicationId",
"application_1429158534256_0001");
entityToStore.addPrimaryFilter("TEZ_VERTEX_ID",
"vertex_1429158534256_0001_1_00");
entityToStore.addPrimaryFilter("TEZ_DAG_ID", "dag_1429158534256_0001_1");
entityToStore.addPrimaryFilter("TEZ_TASK_ID",
"task_1429158534256_0001_1_00_000000");
entityToStore.setStartTime(0L);
entityToStore.addOtherInfo("startTime", 0);
entityToStore.addOtherInfo("inProgressLogsURL",
"localhost:8042/inProgressLogsURL");
entityToStore.addOtherInfo("completedLogsURL", "");
entityToStore.addOtherInfo("nodeId", "localhost:54450");
entityToStore.addOtherInfo("nodeHttpAddress", "localhost:8042");
entityToStore.addOtherInfo("containerId",
"container_1429158534256_0001_01_000002");
entityToStore.addOtherInfo("status", "RUNNING");
entityToStore.addRelatedEntity(tezTaskId, "TEZ_TASK_ID_1");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
for (int i = 0; i < num; ++i) {
entityToStore.setEntityId(tezEntityId + i);
store.put(entities);
}
long duration = System.currentTimeMillis() - start;
Log.info("Duration for " + num + ": " + duration);
}
public static void main(String[] args) throws Exception {
TestRollingLevelDBTimelineStore store =
new TestRollingLevelDBTimelineStore();
store.setup();
store.testStorePerformance();
store.tearDown();
}
}