blob: 9c43ddef92a116e5dd88e2c807ded9c055799ec1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Log Handler which schedules deletion of log files based on the configured log
* retention time.
*/
public class NonAggregatingLogHandler extends AbstractService implements
LogHandler {
private static final Logger LOG =
LoggerFactory.getLogger(NonAggregatingLogHandler.class);
private final Dispatcher dispatcher;
private final DeletionService delService;
private final Map<ApplicationId, String> appOwners;
private final LocalDirsHandlerService dirsHandler;
private final NMStateStoreService stateStore;
private long deleteDelaySeconds;
private ScheduledThreadPoolExecutor sched;
public NonAggregatingLogHandler(Dispatcher dispatcher,
DeletionService delService, LocalDirsHandlerService dirsHandler,
NMStateStoreService stateStore) {
super(NonAggregatingLogHandler.class.getName());
this.dispatcher = dispatcher;
this.delService = delService;
this.dirsHandler = dirsHandler;
this.stateStore = stateStore;
this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
sched = createScheduledThreadPoolExecutor(conf);
super.serviceInit(conf);
recover();
}
@Override
protected void serviceStop() throws Exception {
if (sched != null) {
sched.shutdown();
boolean isShutdown = false;
try {
isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
sched.shutdownNow();
isShutdown = true;
}
if (!isShutdown) {
sched.shutdownNow();
}
}
super.serviceStop();
}
FileContext getLocalFileContext(Configuration conf) {
try {
return FileContext.getLocalFSFileContext(conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access local fs");
}
}
private void recover() throws IOException {
if (stateStore.canRecover()) {
RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
long now = System.currentTimeMillis();
for (Map.Entry<ApplicationId, LogDeleterProto> entry :
state.getLogDeleterMap().entrySet()) {
ApplicationId appId = entry.getKey();
LogDeleterProto proto = entry.getValue();
long deleteDelayMsec = proto.getDeletionTime() - now;
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling deletion of " + appId + " logs in "
+ deleteDelayMsec + " msec");
}
LogDeleterRunnable logDeleter =
new LogDeleterRunnable(proto.getUser(), appId);
try {
sched.schedule(logDeleter, deleteDelayMsec, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
// Handling this event in local thread before starting threads
// or after calling sched.shutdownNow().
logDeleter.run();
}
}
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogHandlerAppStartedEvent appStartedEvent =
(LogHandlerAppStartedEvent) event;
this.appOwners.put(appStartedEvent.getApplicationId(),
appStartedEvent.getUser());
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(appStartedEvent.getApplicationId(),
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED));
break;
case CONTAINER_FINISHED:
// Ignore
break;
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
ApplicationId appId = appFinishedEvent.getApplicationId();
// Schedule - so that logs are available on the UI till they're deleted.
LOG.info("Scheduling Log Deletion for application: "
+ appId + ", with delay of "
+ this.deleteDelaySeconds + " seconds");
String user = appOwners.remove(appId);
if (user == null) {
LOG.error("Unable to locate user for " + appId);
// send LOG_HANDLING_FAILED out
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
break;
}
LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId);
long deletionTimestamp = System.currentTimeMillis()
+ this.deleteDelaySeconds * 1000;
LogDeleterProto deleterProto = LogDeleterProto.newBuilder()
.setUser(user)
.setDeletionTime(deletionTimestamp)
.build();
try {
stateStore.storeLogDeleter(appId, deleterProto);
} catch (IOException e) {
LOG.error("Unable to record log deleter state", e);
}
try {
sched.schedule(logDeleter, this.deleteDelaySeconds,
TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
// Handling this event in local thread before starting threads
// or after calling sched.shutdownNow().
logDeleter.run();
}
break;
default:
; // Ignore
}
}
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
Configuration conf) {
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
sched =
new HadoopScheduledThreadPoolExecutor(conf.getInt(
YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
return sched;
}
class LogDeleterRunnable implements Runnable {
private String user;
private ApplicationId applicationId;
public LogDeleterRunnable(String user, ApplicationId applicationId) {
this.user = user;
this.applicationId = applicationId;
}
@Override
@SuppressWarnings("unchecked")
public void run() {
List<Path> localAppLogDirs = new ArrayList<Path>();
FileContext lfs = getLocalFileContext(getConfig());
for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
Path logDir = new Path(rootLogDir, applicationId.toString());
try {
lfs.getFileStatus(logDir);
localAppLogDirs.add(logDir);
} catch (UnsupportedFileSystemException ue) {
LOG.warn("Unsupported file system used for log dir " + logDir, ue);
continue;
} catch (IOException ie) {
continue;
}
}
// Inform the application before the actual delete itself, so that links
// to logs will no longer be there on NM web-UI.
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.applicationId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
if (localAppLogDirs.size() > 0) {
FileDeletionTask deletionTask = new FileDeletionTask(
NonAggregatingLogHandler.this.delService, user, null,
localAppLogDirs);
NonAggregatingLogHandler.this.delService.delete(deletionTask);
}
try {
NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
this.applicationId);
} catch (IOException e) {
LOG.error("Error removing log deletion state", e);
}
}
@Override
public String toString() {
return "LogDeleter for AppId " + this.applicationId.toString()
+ ", owned by " + user;
}
}
}