blob: ebf81f2d47387f2222d9f30c8393603c618049ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.monitor;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.FileUtils;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.job.JobInstance;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This monitor class periodically checks for the presence
* of stale store directories and deletes them if the
* samza task that created it is not running
* for more than X days.
*/
public class LocalStoreMonitor implements Monitor {
private static final Clock CLOCK = SystemClock.instance();
private static final Logger LOG = LoggerFactory.getLogger(LocalStoreMonitor.class);
private static final String OFFSET_FILE_NAME = StorageManagerUtil.OFFSET_FILE_NAME_NEW;
private final JobsClient jobsClient;
private final LocalStoreMonitorConfig config;
private final LocalStoreMonitorMetrics localStoreMonitorMetrics;
public LocalStoreMonitor(LocalStoreMonitorConfig config,
LocalStoreMonitorMetrics localStoreMonitorMetrics,
JobsClient jobsClient) {
Preconditions.checkState(!Strings.isNullOrEmpty(config.getLocalStoreBaseDir()),
String.format("%s is not set in config.", LocalStoreMonitorConfig.CONFIG_LOCAL_STORE_DIR));
this.config = config;
this.jobsClient = jobsClient;
this.localStoreMonitorMetrics = localStoreMonitorMetrics;
}
/**
* This monitor method is invoked periodically to delete the stale state stores
* of dead jobs/tasks.
* @throws Exception if there was any problem in running the monitor.
*/
@Override
public void monitor() throws Exception {
File localStoreDir = new File(config.getLocalStoreBaseDir());
Preconditions.checkState(localStoreDir.isDirectory(),
String.format("LocalStoreDir: %s is not a directory", localStoreDir.getAbsolutePath()));
String localHostName = InetAddress.getLocalHost().getHostName();
for (JobInstance jobInstance : getHostAffinityEnabledJobs(localStoreDir)) {
File jobDir = new File(localStoreDir,
String.format("%s-%s", jobInstance.getJobName(), jobInstance.getJobId()));
try {
JobStatus jobStatus = jobsClient.getJobStatus(jobInstance);
LOG.info("Job: {} has the status: {}.", jobInstance, jobStatus);
for (Task task : jobsClient.getTasks(jobInstance)) {
LOG.info("Evaluating stores for task: {}", task);
for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) {
/**
* A task store is active if all of the following conditions are true:
* a) If the store is amongst the active stores of the task.
* b) If the job has been started.
* c) If the preferred host of the task is the localhost on which the monitor is run.
*/
if (jobStatus.hasBeenStarted()
&& task.getStoreNames().contains(storeName)
&& task.getPreferredHost().equals(localHostName)) {
LOG.info(String.format("Local store: %s is actively used by the task: %s.", storeName, task.getTaskName()));
} else {
LOG.info(String.format("Local store: %s not used by the task: %s.", storeName, task.getTaskName()));
markSweepTaskStore(new StorageManagerUtil().getTaskStoreDir(jobDir, storeName,
new TaskName(task.getTaskName()), TaskMode.Active));
}
}
}
} catch (Exception ex) {
localStoreMonitorMetrics.failedStoreDeletionAttempts.inc();
if (!config.getIgnoreFailures()) {
throw ex;
}
LOG.warn("Config: {} turned on, failures will be ignored. Local store cleanup for job: {} resulted in exception: {}.",
new Object[]{LocalStoreMonitorConfig.CONFIG_IGNORE_FAILURES, jobInstance, ex});
}
}
}
/**
* Helper method to find and return the list of host affinity enabled jobs on this NM.
* @param localStoreDirFile the location in which all stores of host affinity enabled jobs are persisted.
* @return the list of the host affinity enabled jobs that are installed on this NM.
*/
private static List<JobInstance> getHostAffinityEnabledJobs(File localStoreDirFile) {
List<JobInstance> jobInstances = new ArrayList<>();
for (File jobStore : localStoreDirFile.listFiles(File::isDirectory)) {
// Name of the jobStore(jobStorePath) is of the form : ${job.name}-${job.id}.
String jobStorePath = jobStore.getName();
int indexSeparator = jobStorePath.lastIndexOf("-");
if (indexSeparator != -1) {
jobInstances.add(new JobInstance(jobStorePath.substring(0, indexSeparator),
jobStorePath.substring(indexSeparator + 1)));
}
}
return jobInstances;
}
/**
* Role of this method is to garbage collect(mark-sweep) the task store.
* @param taskStoreDir store directory of the task to perform garbage collection.
*
* This method cleans up each of the task store directory in two phases.
*
* Phase 1:
* Delete the offset file in the task store if (curTime - lastModifiedTimeOfOffsetFile) > offsetTTL.
*
* Phase 2:
* Delete the task store directory if the offsetFile does not exist in task store directory.
*
* The separate phases are a safety precaution to prevent deleting a store that is currently being used.
* A running task will recreate the deleted offset file on the next commit. If a task is not running or
* running on a different host and gets moved to this host, it will not use a store without the offset file.
*
* Time interval between the two phases is controlled by this monitor scheduling
* interval in milli seconds.
*
* @throws IOException if there is an exception during the clean up of the task store files.
*/
private void markSweepTaskStore(File taskStoreDir) throws IOException {
String taskStorePath = taskStoreDir.getAbsolutePath();
File offsetFile = new File(taskStoreDir, OFFSET_FILE_NAME);
if (!offsetFile.exists()) {
LOG.info("Deleting the task store: {}, since it has no offset file.", taskStorePath);
long taskStoreSizeInBytes = taskStoreDir.getTotalSpace();
FileUtils.deleteDirectory(taskStoreDir);
localStoreMonitorMetrics.diskSpaceFreedInBytes.inc(taskStoreSizeInBytes);
localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
} else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) {
LOG.info("Deleting the offset file from the store: {}, since the last modified timestamp: {} "
+ "is older than the configured ttl: {}.",
taskStorePath, offsetFile.lastModified(), config.getOffsetFileTTL());
offsetFile.delete();
}
}
}