| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.client.api.impl; |
| |
| import java.io.Closeable; |
| import java.io.Flushable; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.Map.Entry; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.codehaus.jackson.JsonFactory; |
| import org.codehaus.jackson.JsonGenerator; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.map.SerializationConfig.Feature; |
| import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; |
| import org.codehaus.jackson.util.MinimalPrettyPrinter; |
| import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; |
| |
| import com.sun.jersey.api.client.Client; |
| |
| /** |
| * A simple writer class for storing Timeline data in any storage that |
| * implements a basic FileSystem interface. |
| * This writer is used for ATSv1.5. |
| */ |
| @Private |
| @Unstable |
| public class FileSystemTimelineWriter extends TimelineWriter{ |
| |
| private static final Log LOG = LogFactory |
| .getLog(FileSystemTimelineWriter.class); |
| |
| // App log directory must be readable by group so server can access logs |
| // and writable by group so it can be deleted by server |
| private static final short APP_LOG_DIR_PERMISSIONS = 0770; |
| // Logs must be readable by group so server can access them |
| private static final short FILE_LOG_PERMISSIONS = 0640; |
| private static final String DOMAIN_LOG_PREFIX = "domainlog-"; |
| private static final String SUMMARY_LOG_PREFIX = "summarylog-"; |
| private static final String ENTITY_LOG_PREFIX = "entitylog-"; |
| |
| private Path activePath = null; |
| private FileSystem fs = null; |
| private Set<String> summaryEntityTypes; |
| private ObjectMapper objMapper = null; |
| private long flushIntervalSecs; |
| private long cleanIntervalSecs; |
| private long ttl; |
| private LogFDsCache logFDsCache = null; |
| private boolean isAppendSupported; |
| private final AttemptDirCache attemptDirCache; |
| |
| public FileSystemTimelineWriter(Configuration conf, |
| UserGroupInformation authUgi, Client client, URI resURI) |
| throws IOException { |
| super(authUgi, client, resURI); |
| |
| Configuration fsConf = new Configuration(conf); |
| fsConf.setBoolean("dfs.client.retry.policy.enabled", true); |
| String retryPolicy = |
| fsConf.get(YarnConfiguration. |
| TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC, |
| YarnConfiguration. |
| DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC); |
| fsConf.set("dfs.client.retry.policy.spec", retryPolicy); |
| |
| activePath = new Path(fsConf.get( |
| YarnConfiguration |
| .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, |
| YarnConfiguration |
| .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT)); |
| fs = FileSystem.newInstance(activePath.toUri(), fsConf); |
| |
| // raise FileNotFoundException if the path is not found |
| fs.getFileStatus(activePath); |
| summaryEntityTypes = new HashSet<String>( |
| conf.getStringCollection(YarnConfiguration |
| .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES)); |
| |
| flushIntervalSecs = conf.getLong( |
| YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS, |
| YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT); |
| |
| cleanIntervalSecs = conf.getLong( |
| YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS, |
| YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT); |
| |
| ttl = conf.getLong( |
| YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS, |
| YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT); |
| |
| long timerTaskTTL = conf.getLong( |
| YarnConfiguration.TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS, |
| YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT); |
| |
| logFDsCache = |
| new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl, |
| timerTaskTTL); |
| |
| this.isAppendSupported = conf.getBoolean( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); |
| |
| boolean storeInsideUserDir = conf.getBoolean( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR, |
| false); |
| |
| objMapper = createObjectMapper(); |
| |
| int attemptDirCacheSize = conf.getInt( |
| YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE, |
| YarnConfiguration |
| .DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE); |
| |
| attemptDirCache = new AttemptDirCache(attemptDirCacheSize, fs, activePath, |
| authUgi, storeInsideUserDir); |
| |
| if (LOG.isDebugEnabled()) { |
| StringBuilder debugMSG = new StringBuilder(); |
| debugMSG.append( |
| YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS |
| + "=" + flushIntervalSecs + ", " + |
| YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS |
| + "=" + cleanIntervalSecs + ", " + |
| YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS |
| + "=" + ttl + ", " + |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND |
| + "=" + isAppendSupported + ", " + |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR |
| + "=" + storeInsideUserDir + ", " + |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR |
| + "=" + activePath); |
| |
| if (summaryEntityTypes != null && !summaryEntityTypes.isEmpty()) { |
| debugMSG.append(", " + YarnConfiguration |
| .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES |
| + " = " + summaryEntityTypes); |
| } |
| LOG.debug(debugMSG.toString()); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "FileSystemTimelineWriter writing to " + activePath; |
| } |
| |
| @Override |
| public TimelinePutResponse putEntities( |
| ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, |
| TimelineEntity... entities) throws IOException, YarnException { |
| if (appAttemptId == null) { |
| return putEntities(entities); |
| } |
| |
| List<TimelineEntity> entitiesToDBStore = new ArrayList<TimelineEntity>(); |
| List<TimelineEntity> entitiesToSummaryCache |
| = new ArrayList<TimelineEntity>(); |
| List<TimelineEntity> entitiesToEntityCache |
| = new ArrayList<TimelineEntity>(); |
| Path attemptDir = attemptDirCache.getAppAttemptDir(appAttemptId); |
| |
| for (TimelineEntity entity : entities) { |
| if (summaryEntityTypes.contains(entity.getEntityType())) { |
| entitiesToSummaryCache.add(entity); |
| } else { |
| if (groupId != null) { |
| entitiesToEntityCache.add(entity); |
| } else { |
| entitiesToDBStore.add(entity); |
| } |
| } |
| } |
| |
| if (!entitiesToSummaryCache.isEmpty()) { |
| Path summaryLogPath = |
| new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Writing summary log for " + appAttemptId.toString() + " to " |
| + summaryLogPath); |
| } |
| this.logFDsCache.writeSummaryEntityLogs(fs, summaryLogPath, objMapper, |
| appAttemptId, entitiesToSummaryCache, isAppendSupported); |
| } |
| |
| if (!entitiesToEntityCache.isEmpty()) { |
| Path entityLogPath = |
| new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Writing entity log for " + groupId.toString() + " to " |
| + entityLogPath); |
| } |
| this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper, |
| appAttemptId, groupId, entitiesToEntityCache, isAppendSupported); |
| } |
| |
| if (!entitiesToDBStore.isEmpty()) { |
| putEntities(entitiesToDBStore.toArray( |
| new TimelineEntity[entitiesToDBStore.size()])); |
| } |
| |
| return new TimelinePutResponse(); |
| } |
| |
| @Override |
| public void putDomain(ApplicationAttemptId appAttemptId, |
| TimelineDomain domain) throws IOException, YarnException { |
| if (appAttemptId == null) { |
| putDomain(domain); |
| } else { |
| writeDomain(appAttemptId, domain); |
| } |
| } |
| |
| @Override |
| public synchronized void close() throws Exception { |
| if (logFDsCache != null) { |
| LOG.debug("Closing cache"); |
| logFDsCache.flush(); |
| } |
| IOUtils.cleanup(LOG, logFDsCache, fs); |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| if (logFDsCache != null) { |
| LOG.debug("Flushing cache"); |
| logFDsCache.flush(); |
| } |
| } |
| |
| private ObjectMapper createObjectMapper() { |
| ObjectMapper mapper = new ObjectMapper(); |
| mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector()); |
| mapper.setSerializationInclusion(Inclusion.NON_NULL); |
| mapper.configure(Feature.CLOSE_CLOSEABLE, false); |
| mapper.configure(Feature.FLUSH_AFTER_WRITE_VALUE, false); |
| return mapper; |
| } |
| |
| private void writeDomain(ApplicationAttemptId appAttemptId, |
| TimelineDomain domain) throws IOException { |
| Path domainLogPath = |
| new Path(attemptDirCache.getAppAttemptDir(appAttemptId), |
| DOMAIN_LOG_PREFIX + appAttemptId.toString()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Writing domains for " + appAttemptId.toString() + " to " |
| + domainLogPath); |
| } |
| this.logFDsCache.writeDomainLog( |
| fs, domainLogPath, objMapper, domain, isAppendSupported); |
| } |
| |
| private static class DomainLogFD extends LogFD { |
| public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, |
| boolean isAppendSupported) throws IOException { |
| super(fs, logPath, objMapper, isAppendSupported); |
| } |
| |
| public void writeDomain(TimelineDomain domain) |
| throws IOException { |
| getObjectMapper().writeValue(getJsonGenerator(), domain); |
| updateLastModifiedTime(Time.monotonicNow()); |
| } |
| } |
| |
| private static class EntityLogFD extends LogFD { |
| public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, |
| boolean isAppendSupported) throws IOException { |
| super(fs, logPath, objMapper, isAppendSupported); |
| } |
| |
| public void writeEntities(List<TimelineEntity> entities) |
| throws IOException { |
| if (writerClosed()) { |
| prepareForWrite(); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Writing entity list of size " + entities.size()); |
| } |
| for (TimelineEntity entity : entities) { |
| getObjectMapper().writeValue(getJsonGenerator(), entity); |
| } |
| updateLastModifiedTime(Time.monotonicNow()); |
| } |
| } |
| |
| private static class LogFD { |
| private FSDataOutputStream stream; |
| private ObjectMapper objMapper; |
| private JsonGenerator jsonGenerator; |
| private long lastModifiedTime; |
| private final boolean isAppendSupported; |
| private final ReentrantLock fdLock = new ReentrantLock(); |
| private final FileSystem fs; |
| private final Path logPath; |
| |
| public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, |
| boolean isAppendSupported) throws IOException { |
| this.fs = fs; |
| this.logPath = logPath; |
| this.isAppendSupported = isAppendSupported; |
| this.objMapper = objMapper; |
| prepareForWrite(); |
| } |
| |
| public void close() { |
| if (stream != null) { |
| IOUtils.cleanup(LOG, jsonGenerator); |
| IOUtils.cleanup(LOG, stream); |
| stream = null; |
| jsonGenerator = null; |
| } |
| } |
| |
| public void flush() throws IOException { |
| if (stream != null) { |
| jsonGenerator.flush(); |
| stream.hflush(); |
| } |
| } |
| |
| public long getLastModifiedTime() { |
| return this.lastModifiedTime; |
| } |
| |
| protected void prepareForWrite() throws IOException{ |
| this.stream = createLogFileStream(fs, logPath); |
| this.jsonGenerator = new JsonFactory().createJsonGenerator(stream); |
| this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); |
| this.lastModifiedTime = Time.monotonicNow(); |
| } |
| |
| protected boolean writerClosed() { |
| return stream == null; |
| } |
| |
| private FSDataOutputStream createLogFileStream(FileSystem fileSystem, |
| Path logPathToCreate) |
| throws IOException { |
| FSDataOutputStream streamToCreate; |
| if (!isAppendSupported) { |
| logPathToCreate = |
| new Path(logPathToCreate.getParent(), |
| (logPathToCreate.getName() + "_" + Time.monotonicNow())); |
| } |
| if (!fileSystem.exists(logPathToCreate)) { |
| streamToCreate = fileSystem.create(logPathToCreate, false); |
| fileSystem.setPermission(logPathToCreate, |
| new FsPermission(FILE_LOG_PERMISSIONS)); |
| } else { |
| streamToCreate = fileSystem.append(logPathToCreate); |
| } |
| return streamToCreate; |
| } |
| |
| public void lock() { |
| this.fdLock.lock(); |
| } |
| |
| public void unlock() { |
| this.fdLock.unlock(); |
| } |
| |
| protected JsonGenerator getJsonGenerator() { |
| return jsonGenerator; |
| } |
| |
| protected ObjectMapper getObjectMapper() { |
| return objMapper; |
| } |
| |
| protected void updateLastModifiedTime(long updatedTime) { |
| this.lastModifiedTime = updatedTime; |
| } |
| } |
| |
| private static class LogFDsCache implements Closeable, Flushable{ |
| private DomainLogFD domainLogFD; |
| private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs; |
| private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, |
| EntityLogFD>> entityLogFDs; |
| private Timer flushTimer = null; |
| private Timer cleanInActiveFDsTimer = null; |
| private Timer monitorTaskTimer = null; |
| private final long ttl; |
| private final ReentrantLock domainFDLocker = new ReentrantLock(); |
| private final ReentrantLock summaryTableLocker = new ReentrantLock(); |
| private final ReentrantLock entityTableLocker = new ReentrantLock(); |
| private final ReentrantLock summaryTableCopyLocker = new ReentrantLock(); |
| private final ReentrantLock entityTableCopyLocker = new ReentrantLock(); |
| private volatile boolean serviceStopped = false; |
| private volatile boolean timerTaskStarted = false; |
| private final ReentrantLock timerTaskLocker = new ReentrantLock(); |
| private final long flushIntervalSecs; |
| private final long cleanIntervalSecs; |
| private final long timerTaskRetainTTL; |
| private volatile long timeStampOfLastWrite = Time.monotonicNow(); |
| private final ReadLock timerTasksMonitorReadLock; |
| private final WriteLock timerTasksMonitorWriteLock; |
| |
| public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, |
| long ttl, long timerTaskRetainTTL) { |
| domainLogFD = null; |
| summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>(); |
| entityLogFDs = new HashMap<ApplicationAttemptId, |
| HashMap<TimelineEntityGroupId, EntityLogFD>>(); |
| this.ttl = ttl * 1000; |
| this.flushIntervalSecs = flushIntervalSecs; |
| this.cleanIntervalSecs = cleanIntervalSecs; |
| long timerTaskRetainTTLVar = timerTaskRetainTTL * 1000; |
| if (timerTaskRetainTTLVar > this.ttl) { |
| this.timerTaskRetainTTL = timerTaskRetainTTLVar; |
| } else { |
| this.timerTaskRetainTTL = this.ttl + 2 * 60 * 1000; |
| LOG.warn("The specific " + YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS + " : " |
| + timerTaskRetainTTL + " is invalid, because it is less than or " |
| + "equal to " + YarnConfiguration |
| .TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + ttl + ". Use " |
| + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " |
| + ttl + " + 120s instead."); |
| } |
| ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| this.timerTasksMonitorReadLock = lock.readLock(); |
| this.timerTasksMonitorWriteLock = lock.writeLock(); |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| try { |
| this.domainFDLocker.lock(); |
| if (domainLogFD != null) { |
| domainLogFD.flush(); |
| } |
| } finally { |
| this.domainFDLocker.unlock(); |
| } |
| |
| flushSummaryFDMap(copySummaryLogFDs(summanyLogFDs)); |
| |
| flushEntityFDMap(copyEntityLogFDs(entityLogFDs)); |
| } |
| |
| private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs( |
| Map<ApplicationAttemptId, EntityLogFD> summanyLogFDsToCopy) { |
| try { |
| summaryTableCopyLocker.lock(); |
| return new HashMap<ApplicationAttemptId, EntityLogFD>( |
| summanyLogFDsToCopy); |
| } finally { |
| summaryTableCopyLocker.unlock(); |
| } |
| } |
| |
| private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, |
| EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId, |
| HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) { |
| try { |
| entityTableCopyLocker.lock(); |
| return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId, |
| EntityLogFD>>(entityLogFDsToCopy); |
| } finally { |
| entityTableCopyLocker.unlock(); |
| } |
| } |
| |
| private void flushSummaryFDMap(Map<ApplicationAttemptId, |
| EntityLogFD> logFDs) throws IOException { |
| if (!logFDs.isEmpty()) { |
| for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs |
| .entrySet()) { |
| EntityLogFD logFD = logFDEntry.getValue(); |
| try { |
| logFD.lock(); |
| logFD.flush(); |
| } finally { |
| logFD.unlock(); |
| } |
| } |
| } |
| } |
| |
| private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap< |
| TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException { |
| if (!logFDs.isEmpty()) { |
| for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId, |
| EntityLogFD>> logFDMapEntry : logFDs.entrySet()) { |
| HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap |
| = logFDMapEntry.getValue(); |
| for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry |
| : logFDMap.entrySet()) { |
| EntityLogFD logFD = logFDEntry.getValue(); |
| try { |
| logFD.lock(); |
| logFD.flush(); |
| } finally { |
| logFD.unlock(); |
| } |
| } |
| } |
| } |
| } |
| |
| private class FlushTimerTask extends TimerTask { |
| @Override |
| public void run() { |
| try { |
| flush(); |
| } catch (Exception e) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(e); |
| } |
| } |
| } |
| } |
| |
| private void cleanInActiveFDs() { |
| long currentTimeStamp = Time.monotonicNow(); |
| try { |
| this.domainFDLocker.lock(); |
| if (domainLogFD != null) { |
| if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) { |
| domainLogFD.close(); |
| domainLogFD = null; |
| } |
| } |
| } finally { |
| this.domainFDLocker.unlock(); |
| } |
| |
| cleanInActiveSummaryFDsforMap(copySummaryLogFDs(summanyLogFDs), |
| currentTimeStamp); |
| |
| cleanInActiveEntityFDsforMap(copyEntityLogFDs(entityLogFDs), |
| currentTimeStamp); |
| } |
| |
| private void cleanInActiveSummaryFDsforMap( |
| Map<ApplicationAttemptId, EntityLogFD> logFDs, |
| long currentTimeStamp) { |
| if (!logFDs.isEmpty()) { |
| for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs |
| .entrySet()) { |
| EntityLogFD logFD = logFDEntry.getValue(); |
| try { |
| logFD.lock(); |
| if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) { |
| logFD.close(); |
| } |
| } finally { |
| logFD.unlock(); |
| } |
| } |
| } |
| } |
| |
| private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId, |
| HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs, |
| long currentTimeStamp) { |
| if (!logFDs.isEmpty()) { |
| for (Entry<ApplicationAttemptId, HashMap< |
| TimelineEntityGroupId, EntityLogFD>> logFDMapEntry |
| : logFDs.entrySet()) { |
| HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap |
| = logFDMapEntry.getValue(); |
| for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry |
| : logFDMap.entrySet()) { |
| EntityLogFD logFD = logFDEntry.getValue(); |
| try { |
| logFD.lock(); |
| if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) { |
| logFD.close(); |
| } |
| } finally { |
| logFD.unlock(); |
| } |
| } |
| } |
| } |
| } |
| |
| private class CleanInActiveFDsTask extends TimerTask { |
| @Override |
| public void run() { |
| try { |
| cleanInActiveFDs(); |
| } catch (Exception e) { |
| LOG.warn(e); |
| } |
| } |
| } |
| |
| private class TimerMonitorTask extends TimerTask { |
| @Override |
| public void run() { |
| try { |
| timerTasksMonitorWriteLock.lock(); |
| monitorTimerTasks(); |
| } finally { |
| timerTasksMonitorWriteLock.unlock(); |
| } |
| } |
| } |
| |
| private void monitorTimerTasks() { |
| if (Time.monotonicNow() - this.timeStampOfLastWrite |
| >= this.timerTaskRetainTTL) { |
| cancelAndCloseTimerTasks(); |
| |
| timerTaskStarted = false; |
| } else { |
| if (this.monitorTaskTimer != null) { |
| this.monitorTaskTimer.schedule(new TimerMonitorTask(), |
| this.timerTaskRetainTTL); |
| } |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| |
| serviceStopped = true; |
| |
| cancelAndCloseTimerTasks(); |
| } |
| |
| private void cancelAndCloseTimerTasks() { |
| if (flushTimer != null) { |
| flushTimer.cancel(); |
| flushTimer = null; |
| } |
| |
| if (cleanInActiveFDsTimer != null) { |
| cleanInActiveFDsTimer.cancel(); |
| cleanInActiveFDsTimer = null; |
| } |
| |
| if (monitorTaskTimer != null) { |
| monitorTaskTimer.cancel(); |
| monitorTaskTimer = null; |
| } |
| |
| try { |
| this.domainFDLocker.lock(); |
| if (domainLogFD != null) { |
| domainLogFD.close(); |
| domainLogFD = null; |
| } |
| } finally { |
| this.domainFDLocker.unlock(); |
| } |
| |
| closeSummaryFDs(summanyLogFDs); |
| |
| closeEntityFDs(entityLogFDs); |
| } |
| |
| private void closeEntityFDs(Map<ApplicationAttemptId, |
| HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) { |
| try { |
| entityTableLocker.lock(); |
| if (!logFDs.isEmpty()) { |
| for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId, |
| EntityLogFD>> logFDMapEntry : logFDs.entrySet()) { |
| HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap |
| = logFDMapEntry.getValue(); |
| for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry |
| : logFDMap.entrySet()) { |
| EntityLogFD logFD = logFDEntry.getValue(); |
| try { |
| logFD.lock(); |
| logFD.close(); |
| } finally { |
| logFD.unlock(); |
| } |
| } |
| } |
| } |
| } finally { |
| entityTableLocker.unlock(); |
| } |
| } |
| |
| private void closeSummaryFDs( |
| Map<ApplicationAttemptId, EntityLogFD> logFDs) { |
| try { |
| summaryTableLocker.lock(); |
| if (!logFDs.isEmpty()) { |
| for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry |
| : logFDs.entrySet()) { |
| EntityLogFD logFD = logFDEntry.getValue(); |
| try { |
| logFD.lock(); |
| logFD.close(); |
| } finally { |
| logFD.unlock(); |
| } |
| } |
| } |
| } finally { |
| summaryTableLocker.unlock(); |
| } |
| } |
| |
| public void writeDomainLog(FileSystem fs, Path logPath, |
| ObjectMapper objMapper, TimelineDomain domain, |
| boolean isAppendSupported) throws IOException { |
| checkAndStartTimeTasks(); |
| try { |
| this.domainFDLocker.lock(); |
| if (this.domainLogFD != null) { |
| this.domainLogFD.writeDomain(domain); |
| } else { |
| this.domainLogFD = |
| new DomainLogFD(fs, logPath, objMapper, isAppendSupported); |
| this.domainLogFD.writeDomain(domain); |
| } |
| } finally { |
| this.domainFDLocker.unlock(); |
| } |
| } |
| |
| public void writeEntityLogs(FileSystem fs, Path entityLogPath, |
| ObjectMapper objMapper, ApplicationAttemptId appAttemptId, |
| TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity, |
| boolean isAppendSupported) throws IOException{ |
| checkAndStartTimeTasks(); |
| writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId, |
| groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs); |
| } |
| |
| private void writeEntityLogs(FileSystem fs, Path logPath, |
| ObjectMapper objMapper, ApplicationAttemptId attemptId, |
| TimelineEntityGroupId groupId, List<TimelineEntity> entities, |
| boolean isAppendSupported, Map<ApplicationAttemptId, HashMap< |
| TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException { |
| HashMap<TimelineEntityGroupId, EntityLogFD>logMapFD |
| = logFDs.get(attemptId); |
| if (logMapFD != null) { |
| EntityLogFD logFD = logMapFD.get(groupId); |
| if (logFD != null) { |
| try { |
| logFD.lock(); |
| if (serviceStopped) { |
| return; |
| } |
| logFD.writeEntities(entities); |
| } finally { |
| logFD.unlock(); |
| } |
| } else { |
| createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId, |
| entities, isAppendSupported, logFDs); |
| } |
| } else { |
| createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId, |
| entities, isAppendSupported, logFDs); |
| } |
| } |
| |
| private void createEntityFDandWrite(FileSystem fs, Path logPath, |
| ObjectMapper objMapper, ApplicationAttemptId attemptId, |
| TimelineEntityGroupId groupId, List<TimelineEntity> entities, |
| boolean isAppendSupported, Map<ApplicationAttemptId, HashMap< |
| TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException{ |
| try { |
| entityTableLocker.lock(); |
| if (serviceStopped) { |
| return; |
| } |
| HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap = |
| logFDs.get(attemptId); |
| if (logFDMap == null) { |
| logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>(); |
| } |
| EntityLogFD logFD = logFDMap.get(groupId); |
| if (logFD == null) { |
| logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported); |
| } |
| try { |
| logFD.lock(); |
| logFD.writeEntities(entities); |
| try { |
| entityTableCopyLocker.lock(); |
| logFDMap.put(groupId, logFD); |
| logFDs.put(attemptId, logFDMap); |
| } finally { |
| entityTableCopyLocker.unlock(); |
| } |
| } finally { |
| logFD.unlock(); |
| } |
| } finally { |
| entityTableLocker.unlock(); |
| } |
| } |
| |
| public void writeSummaryEntityLogs(FileSystem fs, Path logPath, |
| ObjectMapper objMapper, ApplicationAttemptId attemptId, |
| List<TimelineEntity> entities, boolean isAppendSupported) |
| throws IOException { |
| checkAndStartTimeTasks(); |
| writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities, |
| isAppendSupported, this.summanyLogFDs); |
| } |
| |
| private void writeSummmaryEntityLogs(FileSystem fs, Path logPath, |
| ObjectMapper objMapper, ApplicationAttemptId attemptId, |
| List<TimelineEntity> entities, boolean isAppendSupported, |
| Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException { |
| EntityLogFD logFD = null; |
| logFD = logFDs.get(attemptId); |
| if (logFD != null) { |
| try { |
| logFD.lock(); |
| if (serviceStopped) { |
| return; |
| } |
| logFD.writeEntities(entities); |
| } finally { |
| logFD.unlock(); |
| } |
| } else { |
| createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities, |
| isAppendSupported, logFDs); |
| } |
| } |
| |
| private void createSummaryFDAndWrite(FileSystem fs, Path logPath, |
| ObjectMapper objMapper, ApplicationAttemptId attemptId, |
| List<TimelineEntity> entities, boolean isAppendSupported, |
| Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException { |
| try { |
| summaryTableLocker.lock(); |
| if (serviceStopped) { |
| return; |
| } |
| EntityLogFD logFD = logFDs.get(attemptId); |
| if (logFD == null) { |
| logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported); |
| } |
| try { |
| logFD.lock(); |
| logFD.writeEntities(entities); |
| try { |
| summaryTableCopyLocker.lock(); |
| logFDs.put(attemptId, logFD); |
| } finally { |
| summaryTableCopyLocker.unlock(); |
| } |
| } finally { |
| logFD.unlock(); |
| } |
| } finally { |
| summaryTableLocker.unlock(); |
| } |
| } |
| |
| private void createAndStartTimerTasks() { |
| this.flushTimer = |
| new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", |
| true); |
| this.flushTimer.schedule(new FlushTimerTask(), flushIntervalSecs * 1000, |
| flushIntervalSecs * 1000); |
| |
| this.cleanInActiveFDsTimer = |
| new Timer(LogFDsCache.class.getSimpleName() |
| + "cleanInActiveFDsTimer", true); |
| this.cleanInActiveFDsTimer.schedule(new CleanInActiveFDsTask(), |
| cleanIntervalSecs * 1000, cleanIntervalSecs * 1000); |
| |
| this.monitorTaskTimer = |
| new Timer(LogFDsCache.class.getSimpleName() + "MonitorTimer", |
| true); |
| this.monitorTaskTimer.schedule(new TimerMonitorTask(), |
| this.timerTaskRetainTTL); |
| } |
| |
| private void checkAndStartTimeTasks() { |
| try { |
| this.timerTasksMonitorReadLock.lock(); |
| this.timeStampOfLastWrite = Time.monotonicNow(); |
| if(!timerTaskStarted) { |
| try { |
| timerTaskLocker.lock(); |
| if (!timerTaskStarted) { |
| createAndStartTimerTasks(); |
| timerTaskStarted = true; |
| } |
| } finally { |
| timerTaskLocker.unlock(); |
| } |
| } |
| } finally { |
| this.timerTasksMonitorReadLock.unlock(); |
| } |
| } |
| } |
| |
| private static class AttemptDirCache { |
| private final int attemptDirCacheSize; |
| private final Map<ApplicationAttemptId, Path> attemptDirCache; |
| private final FileSystem fs; |
| private final Path activePath; |
| private final UserGroupInformation authUgi; |
| private final boolean storeInsideUserDir; |
| |
| public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath, |
| UserGroupInformation ugi, boolean storeInsideUserDir) { |
| this.attemptDirCacheSize = cacheSize; |
| this.attemptDirCache = |
| new LinkedHashMap<ApplicationAttemptId, Path>( |
| attemptDirCacheSize, 0.75f, true) { |
| private static final long serialVersionUID = 1L; |
| @Override |
| protected boolean removeEldestEntry( |
| Map.Entry<ApplicationAttemptId, Path> eldest) { |
| return size() > attemptDirCacheSize; |
| } |
| }; |
| this.fs = fs; |
| this.activePath = activePath; |
| this.authUgi = ugi; |
| this.storeInsideUserDir = storeInsideUserDir; |
| } |
| |
| public Path getAppAttemptDir(ApplicationAttemptId attemptId) |
| throws IOException { |
| Path attemptDir = this.attemptDirCache.get(attemptId); |
| if (attemptDir == null) { |
| synchronized(this) { |
| attemptDir = this.attemptDirCache.get(attemptId); |
| if (attemptDir == null) { |
| attemptDir = createAttemptDir(attemptId); |
| attemptDirCache.put(attemptId, attemptDir); |
| } |
| } |
| } |
| return attemptDir; |
| } |
| |
| private Path createAttemptDir(ApplicationAttemptId appAttemptId) |
| throws IOException { |
| Path appDir = createApplicationDir(appAttemptId.getApplicationId()); |
| |
| Path attemptDir = new Path(appDir, appAttemptId.toString()); |
| if (FileSystem.mkdirs(fs, attemptDir, |
| new FsPermission(APP_LOG_DIR_PERMISSIONS))) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("New attempt directory created - " + attemptDir); |
| } |
| } |
| return attemptDir; |
| } |
| |
| private Path createApplicationDir(ApplicationId appId) throws IOException { |
| Path appRootDir = getAppRootDir(authUgi.getShortUserName()); |
| Path appDir = new Path(appRootDir, appId.toString()); |
| if (FileSystem.mkdirs(fs, appDir, |
| new FsPermission(APP_LOG_DIR_PERMISSIONS))) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("New app directory created - " + appDir); |
| } |
| } |
| return appDir; |
| } |
| |
| private Path getAppRootDir(String user) throws IOException { |
| if (!storeInsideUserDir) { |
| return activePath; |
| } |
| Path userDir = new Path(activePath, user); |
| if (FileSystem.mkdirs(fs, userDir, |
| new FsPermission(APP_LOG_DIR_PERMISSIONS))) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("New user directory created - " + userDir); |
| } |
| } |
| return userDir; |
| } |
| } |
| } |