blob: a90912e6885ecfbb5b89692f68f9c58cd64be922 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Log Handler which schedules deletion of log files based on the configured log
* retention time.
*/
public class NonAggregatingLogHandler extends AbstractService implements
LogHandler {
private static final Log LOG = LogFactory
.getLog(NonAggregatingLogHandler.class);
private final Dispatcher dispatcher;
private final DeletionService delService;
private final Map<ApplicationId, String> appOwners;
private final LocalDirsHandlerService dirsHandler;
private long deleteDelaySeconds;
private ScheduledThreadPoolExecutor sched;
public NonAggregatingLogHandler(Dispatcher dispatcher,
DeletionService delService, LocalDirsHandlerService dirsHandler) {
super(NonAggregatingLogHandler.class.getName());
this.dispatcher = dispatcher;
this.delService = delService;
this.dirsHandler = dirsHandler;
this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
}
@Override
public void init(Configuration conf) {
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
sched = createScheduledThreadPoolExecutor(conf);
super.init(conf);
}
@Override
public void stop() {
sched.shutdown();
boolean isShutdown = false;
try {
isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
sched.shutdownNow();
isShutdown = true;
}
if (!isShutdown) {
sched.shutdownNow();
}
super.stop();
}
@Override
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogHandlerAppStartedEvent appStartedEvent =
(LogHandlerAppStartedEvent) event;
this.appOwners.put(appStartedEvent.getApplicationId(),
appStartedEvent.getUser());
break;
case CONTAINER_FINISHED:
// Ignore
break;
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
// Schedule - so that logs are available on the UI till they're deleted.
LOG.info("Scheduling Log Deletion for application: "
+ appFinishedEvent.getApplicationId() + ", with delay of "
+ this.deleteDelaySeconds + " seconds");
sched.schedule(
new LogDeleterRunnable(appOwners.remove(appFinishedEvent
.getApplicationId()), appFinishedEvent.getApplicationId()),
this.deleteDelaySeconds, TimeUnit.SECONDS);
break;
default:
; // Ignore
}
}
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
Configuration conf) {
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
sched =
new ScheduledThreadPoolExecutor(conf.getInt(
YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
return sched;
}
class LogDeleterRunnable implements Runnable {
private String user;
private ApplicationId applicationId;
public LogDeleterRunnable(String user, ApplicationId applicationId) {
this.user = user;
this.applicationId = applicationId;
}
@Override
@SuppressWarnings("unchecked")
public void run() {
List<String> rootLogDirs =
NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
Path[] localAppLogDirs = new Path[rootLogDirs.size()];
int index = 0;
for (String rootLogDir : rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
index++;
}
// Inform the application before the actual delete itself, so that links
// to logs will no longer be there on NM web-UI.
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.applicationId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
NonAggregatingLogHandler.this.delService.delete(user, null,
localAppLogDirs);
}
@Override
public String toString() {
return "LogDeleter for AppId " + this.applicationId.toString()
+ ", owned by " + user;
}
}
}