| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.timeline; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.EnumSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.SortedSet; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; |
| import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; |
| import org.apache.hadoop.yarn.webapp.BadRequestException; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The class wrap over the timeline store and the ACLs manager. It does some non |
| * trivial manipulation of the timeline data before putting or after getting it |
| * from the timeline store, and checks the user's access to it. |
| * |
| */ |
| public class TimelineDataManager extends AbstractService { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TimelineDataManager.class); |
| @VisibleForTesting |
| public static final String DEFAULT_DOMAIN_ID = "DEFAULT"; |
| |
| private TimelineDataManagerMetrics metrics; |
| private TimelineStore store; |
| private TimelineACLsManager timelineACLsManager; |
| |
| public TimelineDataManager(TimelineStore store, |
| TimelineACLsManager timelineACLsManager) { |
| super(TimelineDataManager.class.getName()); |
| this.store = store; |
| this.timelineACLsManager = timelineACLsManager; |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| metrics = TimelineDataManagerMetrics.create(); |
| TimelineDomain domain = store.getDomain("DEFAULT"); |
| // it is okay to reuse an existing domain even if it was created by another |
| // user of the timeline server before, because it allows everybody to access. |
| if (domain == null) { |
| // create a default domain, which allows everybody to access and modify |
| // the entities in it. |
| domain = new TimelineDomain(); |
| domain.setId(DEFAULT_DOMAIN_ID); |
| domain.setDescription("System Default Domain"); |
| domain.setOwner( |
| UserGroupInformation.getCurrentUser().getShortUserName()); |
| domain.setReaders("*"); |
| domain.setWriters("*"); |
| store.put(domain); |
| } |
| super.serviceInit(conf); |
| } |
| |
| public interface CheckAcl { |
| boolean check(TimelineEntity entity) throws IOException; |
| } |
| |
| class CheckAclImpl implements CheckAcl { |
| final UserGroupInformation ugi; |
| |
| public CheckAclImpl(UserGroupInformation callerUGI) { |
| ugi = callerUGI; |
| } |
| |
| public boolean check(TimelineEntity entity) throws IOException { |
| try{ |
| return timelineACLsManager.checkAccess( |
| ugi, ApplicationAccessType.VIEW_APP, entity); |
| } catch (YarnException e) { |
| LOG.info("Error when verifying access for user " + ugi |
| + " on the events of the timeline entity " |
| + new EntityIdentifier(entity.getEntityId(), |
| entity.getEntityType()), e); |
| return false; |
| } |
| } |
| } |
| |
| /** |
| * Get the timeline entities that the given user have access to. The meaning |
| * of each argument has been documented with |
| * {@link TimelineReader#getEntities}. |
| * |
| * @see TimelineReader#getEntities |
| */ |
| public TimelineEntities getEntities( |
| String entityType, |
| NameValuePair primaryFilter, |
| Collection<NameValuePair> secondaryFilter, |
| Long windowStart, |
| Long windowEnd, |
| String fromId, |
| Long fromTs, |
| Long limit, |
| EnumSet<Field> fields, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| long startTime = Time.monotonicNow(); |
| metrics.incrGetEntitiesOps(); |
| try { |
| TimelineEntities entities = doGetEntities( |
| entityType, |
| primaryFilter, |
| secondaryFilter, |
| windowStart, |
| windowEnd, |
| fromId, |
| fromTs, |
| limit, |
| fields, |
| callerUGI); |
| metrics.incrGetEntitiesTotal(entities.getEntities().size()); |
| return entities; |
| } finally { |
| metrics.addGetEntitiesTime(Time.monotonicNow() - startTime); |
| } |
| } |
| |
| private TimelineEntities doGetEntities( |
| String entityType, |
| NameValuePair primaryFilter, |
| Collection<NameValuePair> secondaryFilter, |
| Long windowStart, |
| Long windowEnd, |
| String fromId, |
| Long fromTs, |
| Long limit, |
| EnumSet<Field> fields, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| TimelineEntities entities = null; |
| entities = store.getEntities( |
| entityType, |
| limit, |
| windowStart, |
| windowEnd, |
| fromId, |
| fromTs, |
| primaryFilter, |
| secondaryFilter, |
| fields, |
| new CheckAclImpl(callerUGI)); |
| |
| if (entities == null) { |
| return new TimelineEntities(); |
| } |
| return entities; |
| } |
| |
| /** |
| * Get the single timeline entity that the given user has access to. The |
| * meaning of each argument has been documented with |
| * {@link TimelineReader#getEntity}. |
| * |
| * @see TimelineReader#getEntity |
| */ |
| public TimelineEntity getEntity( |
| String entityType, |
| String entityId, |
| EnumSet<Field> fields, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| long startTime = Time.monotonicNow(); |
| metrics.incrGetEntityOps(); |
| try { |
| return doGetEntity(entityType, entityId, fields, callerUGI); |
| } finally { |
| metrics.addGetEntityTime(Time.monotonicNow() - startTime); |
| } |
| } |
| |
| private TimelineEntity doGetEntity( |
| String entityType, |
| String entityId, |
| EnumSet<Field> fields, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| TimelineEntity entity = null; |
| entity = |
| store.getEntity(entityId, entityType, fields); |
| if (entity != null) { |
| addDefaultDomainIdIfAbsent(entity); |
| // check ACLs |
| if (!timelineACLsManager.checkAccess( |
| callerUGI, ApplicationAccessType.VIEW_APP, entity)) { |
| entity = null; |
| } |
| } |
| return entity; |
| } |
| |
| /** |
| * Get the events whose entities the given user has access to. The meaning of |
| * each argument has been documented with |
| * {@link TimelineReader#getEntityTimelines}. |
| * |
| * @see TimelineReader#getEntityTimelines |
| */ |
| public TimelineEvents getEvents( |
| String entityType, |
| SortedSet<String> entityIds, |
| SortedSet<String> eventTypes, |
| Long windowStart, |
| Long windowEnd, |
| Long limit, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| long startTime = Time.monotonicNow(); |
| metrics.incrGetEventsOps(); |
| try { |
| TimelineEvents events = doGetEvents( |
| entityType, |
| entityIds, |
| eventTypes, |
| windowStart, |
| windowEnd, |
| limit, |
| callerUGI); |
| metrics.incrGetEventsTotal(events.getAllEvents().size()); |
| return events; |
| } finally { |
| metrics.addGetEventsTime(Time.monotonicNow() - startTime); |
| } |
| } |
| |
| private TimelineEvents doGetEvents( |
| String entityType, |
| SortedSet<String> entityIds, |
| SortedSet<String> eventTypes, |
| Long windowStart, |
| Long windowEnd, |
| Long limit, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| TimelineEvents events = null; |
| events = store.getEntityTimelines( |
| entityType, |
| entityIds, |
| limit, |
| windowStart, |
| windowEnd, |
| eventTypes); |
| if (events != null) { |
| Iterator<TimelineEvents.EventsOfOneEntity> eventsItr = |
| events.getAllEvents().iterator(); |
| while (eventsItr.hasNext()) { |
| TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next(); |
| try { |
| TimelineEntity entity = store.getEntity( |
| eventsOfOneEntity.getEntityId(), |
| eventsOfOneEntity.getEntityType(), |
| EnumSet.of(Field.PRIMARY_FILTERS)); |
| addDefaultDomainIdIfAbsent(entity); |
| // check ACLs |
| if (!timelineACLsManager.checkAccess( |
| callerUGI, ApplicationAccessType.VIEW_APP, entity)) { |
| eventsItr.remove(); |
| } |
| } catch (Exception e) { |
| LOG.warn("Error when verifying access for user " + callerUGI |
| + " on the events of the timeline entity " |
| + new EntityIdentifier(eventsOfOneEntity.getEntityId(), |
| eventsOfOneEntity.getEntityType()), e); |
| eventsItr.remove(); |
| } |
| } |
| } |
| if (events == null) { |
| return new TimelineEvents(); |
| } |
| return events; |
| } |
| |
| /** |
| * Store the timeline entities into the store and set the owner of them to the |
| * given user. |
| */ |
| public TimelinePutResponse postEntities( |
| TimelineEntities entities, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| long startTime = Time.monotonicNow(); |
| metrics.incrPostEntitiesOps(); |
| try { |
| return doPostEntities(entities, callerUGI); |
| } finally { |
| metrics.addPostEntitiesTime(Time.monotonicNow() - startTime); |
| } |
| } |
| |
| private TimelinePutResponse doPostEntities( |
| TimelineEntities entities, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| if (entities == null) { |
| return new TimelinePutResponse(); |
| } |
| metrics.incrPostEntitiesTotal(entities.getEntities().size()); |
| TimelineEntities entitiesToPut = new TimelineEntities(); |
| List<TimelinePutResponse.TimelinePutError> errors = |
| new ArrayList<TimelinePutResponse.TimelinePutError>(); |
| for (TimelineEntity entity : entities.getEntities()) { |
| |
| // if the domain id is not specified, the entity will be put into |
| // the default domain |
| if (entity.getDomainId() == null || |
| entity.getDomainId().length() == 0) { |
| entity.setDomainId(DEFAULT_DOMAIN_ID); |
| } |
| if (entity.getEntityId() == null || entity.getEntityType() == null) { |
| throw new BadRequestException("Incomplete entity without entity" |
| + " id/type"); |
| } |
| // check if there is existing entity |
| TimelineEntity existingEntity = null; |
| try { |
| existingEntity = |
| store.getEntity(entity.getEntityId(), entity.getEntityType(), |
| EnumSet.of(Field.PRIMARY_FILTERS)); |
| if (existingEntity != null) { |
| addDefaultDomainIdIfAbsent(existingEntity); |
| if (!existingEntity.getDomainId().equals(entity.getDomainId())) { |
| throw new YarnException("The domain of the timeline entity " |
| + "{ id: " + entity.getEntityId() + ", type: " |
| + entity.getEntityType() + " } is not allowed to be changed from " |
| + existingEntity.getDomainId() + " to " + entity.getDomainId()); |
| } |
| } |
| if (!timelineACLsManager.checkAccess( |
| callerUGI, ApplicationAccessType.MODIFY_APP, entity)) { |
| throw new YarnException(callerUGI |
| + " is not allowed to put the timeline entity " |
| + "{ id: " + entity.getEntityId() + ", type: " |
| + entity.getEntityType() + " } into the domain " |
| + entity.getDomainId() + "."); |
| } |
| } catch (Exception e) { |
| // Skip the entity which already exists and was put by others |
| LOG.warn("Skip the timeline entity: { id: " + entity.getEntityId() |
| + ", type: "+ entity.getEntityType() + " }", e); |
| TimelinePutResponse.TimelinePutError error = |
| new TimelinePutResponse.TimelinePutError(); |
| error.setEntityId(entity.getEntityId()); |
| error.setEntityType(entity.getEntityType()); |
| error.setErrorCode( |
| TimelinePutResponse.TimelinePutError.ACCESS_DENIED); |
| errors.add(error); |
| continue; |
| } |
| |
| entitiesToPut.addEntity(entity); |
| } |
| |
| TimelinePutResponse response = store.put(entitiesToPut); |
| // add the errors of timeline system filter key conflict |
| response.addErrors(errors); |
| return response; |
| } |
| |
| /** |
| * Add or update an domain. If the domain already exists, only the owner |
| * and the admin can update it. |
| */ |
| public void putDomain(TimelineDomain domain, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| long startTime = Time.monotonicNow(); |
| metrics.incrPutDomainOps(); |
| try { |
| doPutDomain(domain, callerUGI); |
| } finally { |
| metrics.addPutDomainTime(Time.monotonicNow() - startTime); |
| } |
| } |
| |
| private void doPutDomain(TimelineDomain domain, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| TimelineDomain existingDomain = |
| store.getDomain(domain.getId()); |
| if (existingDomain != null) { |
| if (!timelineACLsManager.checkAccess(callerUGI, existingDomain)) { |
| throw new YarnException(callerUGI.getShortUserName() + |
| " is not allowed to override an existing domain " + |
| existingDomain.getId()); |
| } |
| // Set it again in case ACLs are not enabled: The domain can be |
| // modified by every body, but the owner is not changed. |
| domain.setOwner(existingDomain.getOwner()); |
| } |
| store.put(domain); |
| // If the domain exists already, it is likely to be in the cache. |
| // We need to invalidate it. |
| if (existingDomain != null) { |
| timelineACLsManager.replaceIfExist(domain); |
| } |
| } |
| |
| /** |
| * Get a single domain of the particular ID. If callerUGI is not the owner |
| * or the admin of the domain, null will be returned. |
| */ |
| public TimelineDomain getDomain(String domainId, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| long startTime = Time.monotonicNow(); |
| metrics.incrGetDomainOps(); |
| try { |
| return doGetDomain(domainId, callerUGI); |
| } finally { |
| metrics.addGetDomainTime(Time.monotonicNow() - startTime); |
| } |
| } |
| |
| private TimelineDomain doGetDomain(String domainId, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| TimelineDomain domain = store.getDomain(domainId); |
| if (domain != null) { |
| if (timelineACLsManager.checkAccess(callerUGI, domain)) { |
| return domain; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Get all the domains that belong to the given owner. If callerUGI is not |
| * the owner or the admin of the domain, empty list is going to be returned. |
| */ |
| public TimelineDomains getDomains(String owner, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| long startTime = Time.monotonicNow(); |
| metrics.incrGetDomainsOps(); |
| try { |
| TimelineDomains domains = doGetDomains(owner, callerUGI); |
| metrics.incrGetDomainsTotal(domains.getDomains().size()); |
| return domains; |
| } finally { |
| metrics.addGetDomainsTime(Time.monotonicNow() - startTime); |
| } |
| } |
| |
| private TimelineDomains doGetDomains(String owner, |
| UserGroupInformation callerUGI) throws YarnException, IOException { |
| TimelineDomains domains = store.getDomains(owner); |
| boolean hasAccess = true; |
| if (domains.getDomains().size() > 0) { |
| // The owner for each domain is the same, just need to check one |
| hasAccess = timelineACLsManager.checkAccess( |
| callerUGI, domains.getDomains().get(0)); |
| } |
| if (hasAccess) { |
| return domains; |
| } else { |
| return new TimelineDomains(); |
| } |
| } |
| |
| private static void addDefaultDomainIdIfAbsent(TimelineEntity entity) { |
| // be compatible with the timeline data created before 2.6 |
| if (entity.getDomainId() == null) { |
| entity.setDomainId(DEFAULT_DOMAIN_ID); |
| } |
| } |
| |
| } |