| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.timelineservice.documentstore; |
| |
| import com.microsoft.azure.cosmosdb.ConnectionPolicy; |
| import com.microsoft.azure.cosmosdb.ConsistencyLevel; |
| import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; |
| import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; |
| import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; |
| import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument; |
| import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEventSubDoc; |
| import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc; |
| import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument; |
| import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| /** |
| * This class consists of all the utils required for reading or writing |
| * documents for a {@link DocumentStoreVendor}. |
| */ |
| public final class DocumentStoreUtils { |
| |
| private DocumentStoreUtils(){} |
| |
| /** milliseconds in one day. */ |
| private static final long MILLIS_ONE_DAY = 86400000L; |
| |
| private static final String TIMELINE_STORE_TYPE = |
| YarnConfiguration.TIMELINE_SERVICE_PREFIX + "document-store-type"; |
| static final String TIMELINE_SERVICE_COSMOSDB_ENDPOINT = |
| "yarn.timeline-service.document-store.cosmos-db.endpoint"; |
| static final String TIMELINE_SERVICE_COSMOSDB_MASTER_KEY = |
| "yarn.timeline-service.document-store.cosmos-db.masterkey"; |
| static final String TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME = |
| "yarn.timeline-service.document-store.db-name"; |
| private static final String |
| DEFAULT_TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME = "timeline_service"; |
| |
| /** |
| * Checks whether the cosmosdb conf are set properly in yarn-site.xml conf. |
| * @param conf |
| * related to yarn |
| * @throws YarnException if required config properties are missing |
| */ |
| public static void validateCosmosDBConf(Configuration conf) |
| throws YarnException { |
| if (conf == null) { |
| throw new NullPointerException("Configuration cannot be null"); |
| } |
| if (isNullOrEmpty(conf.get(TIMELINE_SERVICE_COSMOSDB_ENDPOINT), |
| conf.get(TIMELINE_SERVICE_COSMOSDB_MASTER_KEY))) { |
| throw new YarnException("One or more CosmosDB configuration property is" + |
| " missing in yarn-site.xml"); |
| } |
| } |
| |
| /** |
| * Retrieves {@link DocumentStoreVendor} configured. |
| * @param conf |
| * related to yarn |
| * @return Returns the {@link DocumentStoreVendor} that is configured, else |
| * uses {@link DocumentStoreVendor#COSMOS_DB} as default |
| */ |
| public static DocumentStoreVendor getStoreVendor(Configuration conf) { |
| return DocumentStoreVendor.getStoreType(conf.get(TIMELINE_STORE_TYPE, |
| DocumentStoreVendor.COSMOS_DB.name())); |
| } |
| |
| /** |
| * Retrieves a {@link TimelineEvent} from {@link TimelineEntity#events}. |
| * @param timelineEntity |
| * from which the set of events are examined. |
| * @param eventType |
| * that has to be checked. |
| * @return {@link TimelineEvent} if found else null |
| */ |
| public static TimelineEvent fetchEvent(TimelineEntity timelineEntity, |
| String eventType) { |
| for (TimelineEvent event : timelineEntity.getEvents()) { |
| if (event.getId().equals(eventType)) { |
| return event; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Checks if the string is null or empty. |
| * @param values |
| * array of string to be checked |
| * @return false if any of the string is null or empty else true |
| */ |
| public static boolean isNullOrEmpty(String...values) { |
| if (values == null || values.length == 0) { |
| return true; |
| } |
| |
| for (String value : values) { |
| if (value == null || value.isEmpty()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Creates CosmosDB Async Document Client. |
| * @param conf |
| * to retrieve cosmos db endpoint and key |
| * @return async document client for CosmosDB |
| */ |
| public static AsyncDocumentClient createCosmosDBAsyncClient( |
| Configuration conf){ |
| return new AsyncDocumentClient.Builder() |
| .withServiceEndpoint(DocumentStoreUtils.getCosmosDBEndpoint(conf)) |
| .withMasterKeyOrResourceToken( |
| DocumentStoreUtils.getCosmosDBMasterKey(conf)) |
| .withConnectionPolicy(ConnectionPolicy.GetDefault()) |
| .withConsistencyLevel(ConsistencyLevel.Session) |
| .build(); |
| } |
| |
| /** |
| * Returns the timestamp of the day's start (which is midnight 00:00:00 AM) |
| * for a given input timestamp. |
| * |
| * @param timeStamp Timestamp. |
| * @return timestamp of that day's beginning (midnight) |
| */ |
| public static long getTopOfTheDayTimestamp(long timeStamp) { |
| return timeStamp - (timeStamp % MILLIS_ONE_DAY); |
| } |
| |
| /** |
| * Creates a composite key for storing {@link TimelineEntityDocument}. |
| * @param collectorContext |
| * of the timeline writer |
| * @param type |
| * of the entity |
| * @return composite key delimited with ! |
| */ |
| public static String constructTimelineEntityDocId(TimelineCollectorContext |
| collectorContext, String type) { |
| return String.format("%s!%s!%s!%d!%s!%s", |
| collectorContext.getClusterId(), collectorContext.getUserId(), |
| collectorContext.getFlowName(), collectorContext.getFlowRunId(), |
| collectorContext.getAppId(), type); |
| } |
| |
| /** |
| * Creates a composite key for storing {@link TimelineEntityDocument}. |
| * @param collectorContext |
| * of the timeline writer |
| * @param type |
| * of the entity |
| * @param id |
| * of the entity |
| * @return composite key delimited with ! |
| */ |
| public static String constructTimelineEntityDocId(TimelineCollectorContext |
| collectorContext, String type, String id) { |
| return String.format("%s!%s!%s!%d!%s!%s!%s", |
| collectorContext.getClusterId(), collectorContext.getUserId(), |
| collectorContext.getFlowName(), collectorContext.getFlowRunId(), |
| collectorContext.getAppId(), type, id); |
| } |
| |
| /** |
| * Creates a composite key for storing {@link FlowRunDocument}. |
| * @param collectorContext |
| * of the timeline writer |
| * @return composite key delimited with ! |
| */ |
| public static String constructFlowRunDocId(TimelineCollectorContext |
| collectorContext) { |
| return String.format("%s!%s!%s!%s", collectorContext.getClusterId(), |
| collectorContext.getUserId(), collectorContext.getFlowName(), |
| collectorContext.getFlowRunId()); |
| } |
| |
| /** |
| * Creates a composite key for storing {@link FlowActivityDocument}. |
| * @param collectorContext |
| * of the timeline writer |
| * @param eventTimestamp |
| * of the timeline entity |
| * @return composite key delimited with ! |
| */ |
| public static String constructFlowActivityDocId(TimelineCollectorContext |
| collectorContext, long eventTimestamp) { |
| return String.format("%s!%s!%s!%s", collectorContext.getClusterId(), |
| getTopOfTheDayTimestamp(eventTimestamp), |
| collectorContext.getUserId(), collectorContext.getFlowName()); |
| } |
| |
| private static String getCosmosDBEndpoint(Configuration conf) { |
| return conf.get(TIMELINE_SERVICE_COSMOSDB_ENDPOINT); |
| } |
| |
| private static String getCosmosDBMasterKey(Configuration conf) { |
| return conf.get(TIMELINE_SERVICE_COSMOSDB_MASTER_KEY); |
| } |
| |
| public static String getCosmosDBDatabaseName(Configuration conf) { |
| return conf.get(TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME, |
| getDefaultTimelineServiceDBName(conf)); |
| } |
| |
| private static String getDefaultTimelineServiceDBName( |
| Configuration conf) { |
| return getClusterId(conf) + "_" + |
| DEFAULT_TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME; |
| } |
| |
| private static String getClusterId(Configuration conf) { |
| return conf.get(YarnConfiguration.RM_CLUSTER_ID, |
| YarnConfiguration.DEFAULT_RM_CLUSTER_ID); |
| } |
| |
| private static boolean isTimeInRange(long time, long timeBegin, |
| long timeEnd) { |
| return (time >= timeBegin) && (time <= timeEnd); |
| } |
| |
| /** |
| * Checks if the {@link TimelineEntityFilters} are not matching for a given |
| * {@link TimelineEntity}. |
| * @param filters |
| * that has to be checked for an entity |
| * @param timelineEntity |
| * for which the filters would be applied |
| * @return true if any one of the filter is not matching else false |
| * @throws IOException if an unsupported filter is being matched. |
| */ |
| static boolean isFilterNotMatching(TimelineEntityFilters filters, |
| TimelineEntity timelineEntity) throws IOException { |
| if (timelineEntity.getCreatedTime() != null && !isTimeInRange(timelineEntity |
| .getCreatedTime(), filters.getCreatedTimeBegin(), |
| filters.getCreatedTimeEnd())) { |
| return true; |
| } |
| |
| if (filters.getRelatesTo() != null && |
| !filters.getRelatesTo().getFilterList().isEmpty() && |
| !TimelineStorageUtils.matchRelatesTo(timelineEntity, |
| filters.getRelatesTo())) { |
| return true; |
| } |
| |
| if (filters.getIsRelatedTo() != null && |
| !filters.getIsRelatedTo().getFilterList().isEmpty() && |
| !TimelineStorageUtils.matchIsRelatedTo(timelineEntity, |
| filters.getIsRelatedTo())) { |
| return true; |
| } |
| |
| if (filters.getInfoFilters() != null && |
| !filters.getInfoFilters().getFilterList().isEmpty() && |
| !TimelineStorageUtils.matchInfoFilters(timelineEntity, |
| filters.getInfoFilters())) { |
| return true; |
| } |
| |
| if (filters.getConfigFilters() != null && |
| !filters.getConfigFilters().getFilterList().isEmpty() && |
| !TimelineStorageUtils.matchConfigFilters(timelineEntity, |
| filters.getConfigFilters())) { |
| return true; |
| } |
| |
| if (filters.getMetricFilters() != null && |
| !filters.getMetricFilters().getFilterList().isEmpty() && |
| !TimelineStorageUtils.matchMetricFilters(timelineEntity, |
| filters.getMetricFilters())) { |
| return true; |
| } |
| |
| return filters.getEventFilters() != null && |
| !filters.getEventFilters().getFilterList().isEmpty() && |
| !TimelineStorageUtils.matchEventFilters(timelineEntity, |
| filters.getEventFilters()); |
| } |
| |
| /** |
| * Creates the final entity to be returned as the result. |
| * @param timelineEntityDocument |
| * which has all the information for the entity |
| * @param dataToRetrieve |
| * specifies filters and fields to retrieve |
| * @return {@link TimelineEntity} as the result |
| */ |
| public static TimelineEntity createEntityToBeReturned( |
| TimelineEntityDocument timelineEntityDocument, |
| TimelineDataToRetrieve dataToRetrieve) { |
| TimelineEntity entityToBeReturned = createTimelineEntity( |
| timelineEntityDocument.getType(), |
| timelineEntityDocument.fetchTimelineEntity()); |
| |
| entityToBeReturned.setIdentifier(new TimelineEntity.Identifier( |
| timelineEntityDocument.getType(), timelineEntityDocument.getId())); |
| entityToBeReturned.setCreatedTime( |
| timelineEntityDocument.getCreatedTime()); |
| entityToBeReturned.setInfo(timelineEntityDocument.getInfo()); |
| |
| if (dataToRetrieve.getFieldsToRetrieve() != null) { |
| fillFields(entityToBeReturned, timelineEntityDocument, |
| dataToRetrieve); |
| } |
| return entityToBeReturned; |
| } |
| |
| /** |
| * Creates the final entity to be returned as the result. |
| * @param timelineEntityDocument |
| * which has all the information for the entity |
| * @param confsToRetrieve |
| * specifies config filters to be applied |
| * @param metricsToRetrieve |
| * specifies metric filters to be applied |
| * |
| * @return {@link TimelineEntity} as the result |
| */ |
| public static TimelineEntity createEntityToBeReturned( |
| TimelineEntityDocument timelineEntityDocument, |
| TimelineFilterList confsToRetrieve, |
| TimelineFilterList metricsToRetrieve) { |
| TimelineEntity timelineEntity = timelineEntityDocument |
| .fetchTimelineEntity(); |
| if (confsToRetrieve != null) { |
| timelineEntity.setConfigs(DocumentStoreUtils.applyConfigFilter( |
| confsToRetrieve, timelineEntity.getConfigs())); |
| } |
| if (metricsToRetrieve != null) { |
| timelineEntity.setMetrics(DocumentStoreUtils.transformMetrics( |
| metricsToRetrieve, timelineEntityDocument.getMetrics())); |
| } |
| return timelineEntity; |
| } |
| |
| private static TimelineEntity createTimelineEntity(String type, |
| TimelineEntity timelineEntity) { |
| switch (TimelineEntityType.valueOf(type)) { |
| case YARN_APPLICATION: |
| return new ApplicationEntity(); |
| case YARN_FLOW_RUN: |
| return new FlowRunEntity(); |
| case YARN_FLOW_ACTIVITY: |
| FlowActivityEntity flowActivityEntity = |
| (FlowActivityEntity) timelineEntity; |
| FlowActivityEntity newFlowActivity = new FlowActivityEntity(); |
| newFlowActivity.addFlowRuns(flowActivityEntity.getFlowRuns()); |
| return newFlowActivity; |
| default: |
| return new TimelineEntity(); |
| } |
| } |
| |
| // fetch required fields for final entity to be returned |
| private static void fillFields(TimelineEntity finalEntity, |
| TimelineEntityDocument entityDoc, |
| TimelineDataToRetrieve dataToRetrieve) { |
| EnumSet<TimelineReader.Field> fieldsToRetrieve = |
| dataToRetrieve.getFieldsToRetrieve(); |
| if (fieldsToRetrieve.contains(TimelineReader.Field.ALL)) { |
| fieldsToRetrieve = EnumSet.allOf(TimelineReader.Field.class); |
| } |
| for (TimelineReader.Field field : fieldsToRetrieve) { |
| switch(field) { |
| case CONFIGS: |
| finalEntity.setConfigs(applyConfigFilter(dataToRetrieve |
| .getConfsToRetrieve(), entityDoc.getConfigs())); |
| break; |
| case METRICS: |
| finalEntity.setMetrics(transformMetrics(dataToRetrieve |
| .getMetricsToRetrieve(), entityDoc.getMetrics())); |
| break; |
| case INFO: |
| finalEntity.setInfo(entityDoc.getInfo()); |
| break; |
| case IS_RELATED_TO: |
| finalEntity.setIsRelatedToEntities(entityDoc.getIsRelatedToEntities()); |
| break; |
| case RELATES_TO: |
| finalEntity.setIsRelatedToEntities(entityDoc.getIsRelatedToEntities()); |
| break; |
| case EVENTS: |
| finalEntity.setEvents(transformEvents(entityDoc.getEvents().values())); |
| break; |
| default: |
| } |
| } |
| } |
| |
| /* Transforms Collection<Set<TimelineEventSubDoc>> to |
| NavigableSet<TimelineEvent> */ |
| private static NavigableSet<TimelineEvent> transformEvents( |
| Collection<Set<TimelineEventSubDoc>> eventSetColl) { |
| NavigableSet<TimelineEvent> timelineEvents = new TreeSet<>(); |
| for (Set<TimelineEventSubDoc> eventSubDocs : eventSetColl) { |
| for (TimelineEventSubDoc eventSubDoc : eventSubDocs) { |
| timelineEvents.add(eventSubDoc.fetchTimelineEvent()); |
| } |
| } |
| return timelineEvents; |
| } |
| |
| public static Set<TimelineMetric> transformMetrics( |
| TimelineFilterList metricsToRetrieve, |
| Map<String, Set<TimelineMetricSubDoc>> metrics) { |
| if (metricsToRetrieve == null || |
| hasDataToBeRetrieve(metricsToRetrieve, metrics.keySet())) { |
| Set<TimelineMetric> metricSet = new HashSet<>(); |
| for(Set<TimelineMetricSubDoc> metricSubDocs : metrics.values()) { |
| for(TimelineMetricSubDoc metricSubDoc : metricSubDocs) { |
| metricSet.add(metricSubDoc.fetchTimelineMetric()); |
| } |
| } |
| return metricSet; |
| } |
| return new HashSet<>(); |
| } |
| |
| public static Map<String, String> applyConfigFilter( |
| TimelineFilterList configsToRetrieve, Map<String, String> configs) { |
| if (configsToRetrieve == null || |
| hasDataToBeRetrieve(configsToRetrieve, configs.keySet())) { |
| return configs; |
| } |
| return new HashMap<>(); |
| } |
| |
| private static boolean hasDataToBeRetrieve( |
| TimelineFilterList timelineFilters, Set<String> dataSet) { |
| Set<String> dataToBeRetrieved = new HashSet<>(); |
| TimelinePrefixFilter timelinePrefixFilter; |
| for (TimelineFilter timelineFilter : timelineFilters.getFilterList()) { |
| timelinePrefixFilter = (TimelinePrefixFilter) timelineFilter; |
| dataToBeRetrieved.add(timelinePrefixFilter.getPrefix()); |
| } |
| switch (timelineFilters.getOperator()) { |
| case OR: |
| if (dataToBeRetrieved.size() == 0 || |
| !Collections.disjoint(dataSet, dataToBeRetrieved)) { |
| return true; |
| } |
| case AND: |
| if (dataToBeRetrieved.size() == 0 || |
| dataSet.containsAll(dataToBeRetrieved)) { |
| return true; |
| } |
| default: |
| return false; |
| } |
| } |
| } |