/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timeline;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
 * Utility methods related to the ATS v1.5 plugin storage tests.
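 *
 * <p>A typical test can wire these helpers together roughly as follows.
 * This is only a sketch: {@code dfsCluster} is assumed to be a running
 * {@link org.apache.hadoop.hdfs.MiniDFSCluster}, {@code logPath} an entity
 * log file under the active directory, and {@code MyPlugin} a hypothetical
 * reader plugin that the test supplies itself.
 * <pre>{@code
 * YarnConfiguration conf = PluginStoreTestUtils.prepareConfiguration(
 *     new YarnConfiguration(), dfsCluster);
 * // Tests must register their own reader plugin:
 * conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
 *     MyPlugin.class.getName());
 * FileSystem fs = PluginStoreTestUtils.prepareFileSystemForPluginStore(
 *     dfsCluster.getFileSystem());
 * PluginStoreTestUtils.writeEntities(
 *     PluginStoreTestUtils.generateTestEntities(), logPath, fs);
 * }</pre>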
 */
public class PluginStoreTestUtils {

  /**
   * For a given file system, set up directories ready to test the plugin
   * storage.
   *
   * @param fs a {@link FileSystem} object that the plugin storage will work
   *           with
   * @return the same file system, with the active and done directories
   *         created, ready for plugin storage tests
   * @throws IOException if the directories cannot be created
   */
  public static FileSystem prepareFileSystemForPluginStore(FileSystem fs)
      throws IOException {
    Path activeDir = new Path(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT
    );
    Path doneDir = new Path(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
    );

    fs.mkdirs(activeDir);
    fs.mkdirs(doneDir);
    return fs;
  }

  /**
   * Prepare a configuration for plugin tests. This method also adds the mini
   * DFS cluster's info to the configuration.
   * Note: the test program needs to set up the reader plugin by itself.
   *
   * @param conf the configuration to populate
   * @param dfsCluster the mini DFS cluster whose URI becomes the default
   *                   file system
   * @return the modified configuration
   */
  public static YarnConfiguration prepareConfiguration(YarnConfiguration conf,
      MiniDFSCluster dfsCluster) {
    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
        dfsCluster.getURI().toString());
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
    conf.setLong(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
        1);
    conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
        EntityGroupFSTimelineStore.class.getName());
    return conf;
  }

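  /**
   * Create (or overwrite) a log file at the given path for tests to write
   * timeline entities into.
   */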
  static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
      throws IOException {
    return fs.create(logPath, true);
  }

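  /**
   * Create an {@link ObjectMapper} that honors JAXB annotations and omits
   * null fields when serializing timeline entities to JSON.
   */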
  static ObjectMapper createObjectMapper() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setAnnotationIntrospector(
        new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()));
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    return mapper;
  }

  /**
   * Create sample entities for testing.
   *
   * @return two timeline entities in a {@link TimelineEntities} object
   */
  static TimelineEntities generateTestEntities() {
    TimelineEntities entities = new TimelineEntities();
    Map<String, Set<Object>> primaryFilters = new HashMap<>();
    Set<Object> l1 = new HashSet<>();
    l1.add("username");
    Set<Object> l2 = new HashSet<>();
    l2.add(Integer.MAX_VALUE);
    Set<Object> l3 = new HashSet<>();
    l3.add("123abc");
    Set<Object> l4 = new HashSet<>();
    l4.add((long) Integer.MAX_VALUE + 1L);
    primaryFilters.put("user", l1);
    primaryFilters.put("appname", l2);
    primaryFilters.put("other", l3);
    primaryFilters.put("long", l4);
    Map<String, Object> secondaryFilters = new HashMap<>();
    secondaryFilters.put("startTime", 123456);
    secondaryFilters.put("status", "RUNNING");
    Map<String, Object> otherInfo1 = new HashMap<>();
    otherInfo1.put("info1", "val1");
    otherInfo1.putAll(secondaryFilters);

    String entityId1 = "id_1";
    String entityType1 = "type_1";
    String entityId2 = "id_2";
    String entityType2 = "type_2";

    Map<String, Set<String>> relatedEntities = new HashMap<>();
    relatedEntities.put(entityType2, Collections.singleton(entityId2));

    TimelineEvent ev3 = createEvent(789L, "launch_event", null);
    TimelineEvent ev4 = createEvent(0L, "init_event", null);
    List<TimelineEvent> events = new ArrayList<>();
    events.add(ev3);
    events.add(ev4);
    entities.addEntity(createEntity(entityId2, entityType2, 456L, events, null,
        null, null, "domain_id_1"));

    TimelineEvent ev1 = createEvent(123L, "start_event", null);
    entities.addEntity(createEntity(entityId1, entityType1, 123L,
        Collections.singletonList(ev1), relatedEntities, primaryFilters,
        otherInfo1, "domain_id_1"));
    return entities;
  }

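  /**
   * Verify that the two entities created by {@link #generateTestEntities()}
   * can be read back through the given {@link TimelineDataManager}.
   */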
  static void verifyTestEntities(TimelineDataManager tdm)
      throws YarnException, IOException {
    TimelineEntity entity1 = tdm.getEntity("type_1", "id_1",
        EnumSet.allOf(TimelineReader.Field.class),
        UserGroupInformation.getLoginUser());
    TimelineEntity entity2 = tdm.getEntity("type_2", "id_2",
        EnumSet.allOf(TimelineReader.Field.class),
        UserGroupInformation.getLoginUser());
    assertNotNull(entity1);
    assertNotNull(entity2);
    assertEquals((Long) 123L, entity1.getStartTime(),
        "Failed to read out entity 1");
    assertEquals((Long) 456L, entity2.getStartTime(),
        "Failed to read out entity 2");
  }

  /**
   * Create a test entity. Any of the collection-typed arguments may be null.
   */
  static TimelineEntity createEntity(String entityId, String entityType,
      Long startTime, List<TimelineEvent> events,
      Map<String, Set<String>> relatedEntities,
      Map<String, Set<Object>> primaryFilters,
      Map<String, Object> otherInfo, String domainId) {
    TimelineEntity entity = new TimelineEntity();
    entity.setEntityId(entityId);
    entity.setEntityType(entityType);
    entity.setStartTime(startTime);
    entity.setEvents(events);
    if (relatedEntities != null) {
      for (Map.Entry<String, Set<String>> e : relatedEntities.entrySet()) {
        for (String v : e.getValue()) {
          entity.addRelatedEntity(e.getKey(), v);
        }
      }
    } else {
      entity.setRelatedEntities(null);
    }
    entity.setPrimaryFilters(primaryFilters);
    entity.setOtherInfo(otherInfo);
    entity.setDomainId(domainId);
    return entity;
  }

  /**
   * Create a test event.
   */
  static TimelineEvent createEvent(long timestamp, String type,
      Map<String, Object> info) {
    TimelineEvent event = new TimelineEvent();
    event.setTimestamp(timestamp);
    event.setEventType(type);
    event.setEventInfo(info);
    return event;
  }

  /**
   * Write timeline entities to a file system, one JSON-serialized entity per
   * line.
   *
   * @param entities the entities to write
   * @param logPath the path of the log file to create
   * @param fs the file system to write to
   * @throws IOException if the log file cannot be written
   */
  static void writeEntities(TimelineEntities entities, Path logPath,
      FileSystem fs) throws IOException {
    FSDataOutputStream outStream = createLogFile(logPath, fs);
    JsonGenerator jsonGenerator
        = new JsonFactory().createGenerator((OutputStream) outStream);
    // Separate the serialized entities with newlines so that readers can
    // scan the log file line by line.
    jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
    ObjectMapper objMapper = createObjectMapper();
    for (TimelineEntity entity : entities.getEntities()) {
      objMapper.writeValue(jsonGenerator, entity);
    }
    // Close the generator first so that any buffered JSON is flushed before
    // the underlying stream is closed.
    jsonGenerator.close();
    outStream.close();
  }

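  /**
   * Create and initialize a {@link TimelineDataManager} backed by the given
   * timeline store.
   */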
  static TimelineDataManager getTdmWithStore(Configuration config,
      TimelineStore store) {
    TimelineACLsManager aclManager = new TimelineACLsManager(config);
    TimelineDataManager tdm = new TimelineDataManager(store, aclManager);
    tdm.init(config);
    return tdm;
  }

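  /**
   * Create and initialize a {@link TimelineDataManager} backed by a fresh
   * in-memory timeline store.
   */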
  static TimelineDataManager getTdmWithMemStore(Configuration config) {
    TimelineStore store = new MemoryTimelineStore("MemoryStore.test");
    return getTdmWithStore(config, store);
  }

}