| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.iq80.leveldb.DBIterator; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong; |
| import static org.junit.Assert.assertEquals; |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { |
| private FileContext fsContext; |
| private File fsPath; |
| |
| @Before |
| public void setup() throws Exception { |
| fsContext = FileContext.getLocalFSFileContext(); |
| Configuration conf = new Configuration(); |
| fsPath = new File("target", this.getClass().getSimpleName() + |
| "-tmpDir").getAbsoluteFile(); |
| fsContext.delete(new Path(fsPath.getAbsolutePath()), true); |
| conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, |
| fsPath.getAbsolutePath()); |
| conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); |
| store = new LeveldbTimelineStore(); |
| store.init(conf); |
| store.start(); |
| loadTestData(); |
| loadVerificationData(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| store.stop(); |
| fsContext.delete(new Path(fsPath.getAbsolutePath()), true); |
| } |
| |
| @Test |
| public void testGetSingleEntity() throws IOException { |
| super.testGetSingleEntity(); |
| ((LeveldbTimelineStore)store).clearStartTimeCache(); |
| super.testGetSingleEntity(); |
| loadTestData(); |
| } |
| |
| @Test |
| public void testGetEntities() throws IOException { |
| super.testGetEntities(); |
| } |
| |
| @Test |
| public void testGetEntitiesWithFromId() throws IOException { |
| super.testGetEntitiesWithFromId(); |
| } |
| |
| @Test |
| public void testGetEntitiesWithFromTs() throws IOException { |
| super.testGetEntitiesWithFromTs(); |
| } |
| |
| @Test |
| public void testGetEntitiesWithPrimaryFilters() throws IOException { |
| super.testGetEntitiesWithPrimaryFilters(); |
| } |
| |
| @Test |
| public void testGetEntitiesWithSecondaryFilters() throws IOException { |
| super.testGetEntitiesWithSecondaryFilters(); |
| } |
| |
| @Test |
| public void testGetEvents() throws IOException { |
| super.testGetEvents(); |
| } |
| |
| @Test |
| public void testCacheSizes() { |
| Configuration conf = new Configuration(); |
| assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf)); |
| assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf)); |
| conf.setInt( |
| YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, |
| 10001); |
| assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf)); |
| conf = new Configuration(); |
| conf.setInt( |
| YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, |
| 10002); |
| assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf)); |
| } |
| |
| private boolean deleteNextEntity(String entityType, byte[] ts) |
| throws IOException, InterruptedException { |
| DBIterator iterator = null; |
| DBIterator pfIterator = null; |
| try { |
| iterator = ((LeveldbTimelineStore)store).getDbIterator(false); |
| pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false); |
| return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts, |
| iterator, pfIterator, false); |
| } finally { |
| IOUtils.cleanup(null, iterator, pfIterator); |
| } |
| } |
| |
| @Test |
| public void testGetEntityTypes() throws IOException { |
| List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes(); |
| assertEquals(4, entityTypes.size()); |
| assertEquals(entityType1, entityTypes.get(0)); |
| assertEquals(entityType2, entityTypes.get(1)); |
| assertEquals(entityType4, entityTypes.get(2)); |
| assertEquals(entityType5, entityTypes.get(3)); |
| } |
| |
| @Test |
| public void testDeleteEntities() throws IOException, InterruptedException { |
| assertEquals(2, getEntities("type_1").size()); |
| assertEquals(1, getEntities("type_2").size()); |
| |
| assertEquals(false, deleteNextEntity(entityType1, |
| writeReverseOrderedLong(122l))); |
| assertEquals(2, getEntities("type_1").size()); |
| assertEquals(1, getEntities("type_2").size()); |
| |
| assertEquals(true, deleteNextEntity(entityType1, |
| writeReverseOrderedLong(123l))); |
| List<TimelineEntity> entities = getEntities("type_2"); |
| assertEquals(1, entities.size()); |
| verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap( |
| entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS, |
| EMPTY_MAP, entities.get(0)); |
| entities = getEntitiesWithPrimaryFilter("type_1", userFilter); |
| assertEquals(1, entities.size()); |
| verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, |
| primaryFilters, otherInfo, entities.get(0)); |
| |
| ((LeveldbTimelineStore)store).discardOldEntities(-123l); |
| assertEquals(1, getEntities("type_1").size()); |
| assertEquals(0, getEntities("type_2").size()); |
| assertEquals(3, ((LeveldbTimelineStore)store).getEntityTypes().size()); |
| |
| ((LeveldbTimelineStore)store).discardOldEntities(123l); |
| assertEquals(0, getEntities("type_1").size()); |
| assertEquals(0, getEntities("type_2").size()); |
| assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); |
| assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); |
| } |
| |
| @Test |
| public void testDeleteEntitiesPrimaryFilters() |
| throws IOException, InterruptedException { |
| Map<String, Set<Object>> primaryFilter = |
| Collections.singletonMap("user", Collections.singleton( |
| (Object) "otheruser")); |
| TimelineEntities atsEntities = new TimelineEntities(); |
| atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b, |
| entityType1, 789l, Collections.singletonList(ev2), null, primaryFilter, |
| null))); |
| TimelinePutResponse response = store.put(atsEntities); |
| assertEquals(0, response.getErrors().size()); |
| |
| NameValuePair pfPair = new NameValuePair("user", "otheruser"); |
| List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1", |
| pfPair); |
| assertEquals(1, entities.size()); |
| verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2), |
| EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0)); |
| |
| entities = getEntitiesWithPrimaryFilter("type_1", userFilter); |
| assertEquals(2, entities.size()); |
| verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, |
| primaryFilters, otherInfo, entities.get(0)); |
| verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, |
| primaryFilters, otherInfo, entities.get(1)); |
| |
| ((LeveldbTimelineStore)store).discardOldEntities(-123l); |
| assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); |
| assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); |
| |
| ((LeveldbTimelineStore)store).discardOldEntities(123l); |
| assertEquals(0, getEntities("type_1").size()); |
| assertEquals(0, getEntities("type_2").size()); |
| assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); |
| |
| assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); |
| assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); |
| } |
| |
| @Test |
| public void testFromTsWithDeletion() |
| throws IOException, InterruptedException { |
| long l = System.currentTimeMillis(); |
| assertEquals(2, getEntitiesFromTs("type_1", l).size()); |
| assertEquals(1, getEntitiesFromTs("type_2", l).size()); |
| assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, |
| l).size()); |
| ((LeveldbTimelineStore)store).discardOldEntities(123l); |
| assertEquals(0, getEntitiesFromTs("type_1", l).size()); |
| assertEquals(0, getEntitiesFromTs("type_2", l).size()); |
| assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, |
| l).size()); |
| assertEquals(0, getEntities("type_1").size()); |
| assertEquals(0, getEntities("type_2").size()); |
| assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, |
| l).size()); |
| loadTestData(); |
| assertEquals(0, getEntitiesFromTs("type_1", l).size()); |
| assertEquals(0, getEntitiesFromTs("type_2", l).size()); |
| assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, |
| l).size()); |
| assertEquals(2, getEntities("type_1").size()); |
| assertEquals(1, getEntities("type_2").size()); |
| assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); |
| } |
| |
| } |