blob: b3d948380290890f8476ba641f28a77f08d5d5e3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Service to watch a file for changes. A thread periodically checks the file
* modification time and uses the provided {@link FileChangeListener} to notify
* a consumer.
*/
public class FileWatchService {
final static Logger LOG = LoggerFactory.getLogger(FileWatchService.class);
// Default time to wait between checking the file.
static final long DEFAULT_CHECK_INTERVAL_MS = 10 * 1000;
// Time between checking for changes. Mutable for unit tests.
private long checkIntervalMs_ = DEFAULT_CHECK_INTERVAL_MS;
// Future returned by scheduleAtFixedRate(), needed to stop the checking thread.
private ScheduledFuture<?> fileCheckFuture_;
private final AtomicBoolean running_;
private final FileChangeListener changeListener_; // Used to notify when changes occur.
private final File file_; // The file to check for changes.
private boolean alreadyWarned_; // Avoid repeatedly warning if the file is missing
private long prevChange_; // Time of the last observed change
/**
* Listener used to notify of file changes.
*/
public interface FileChangeListener {
/**
* Called when the file changes.
*/
void onFileChange();
}
public FileWatchService(File file, FileChangeListener listener) {
Preconditions.checkNotNull(file);
Preconditions.checkNotNull(listener);
Preconditions.checkArgument(file.exists());
running_ = new AtomicBoolean(false);
file_ = file;
changeListener_ = listener;
prevChange_ = 0L;
alreadyWarned_ = false;
}
/**
* Set the time (in milliseconds) to wait between checking the file for changes.
* Only used in tests.
*/
@VisibleForTesting
public void setCheckIntervalMs(long checkIntervalMs) {
checkIntervalMs_ = checkIntervalMs;
}
/**
* Checks if the file has changed since the last observed change and if so,
* notifies the listener.
*/
private void checkFile() {
if (file_.exists()) {
long lastChange = file_.lastModified();
if (lastChange > prevChange_) {
changeListener_.onFileChange();
prevChange_ = lastChange;
alreadyWarned_ = false;
}
} else {
if (!alreadyWarned_) {
LOG.warn("File does not exist: {}", file_.getPath());
alreadyWarned_ = true;
}
}
}
/**
* Starts the thread to check for file changes. Continues checking for file changes
* every 'checkIntervalMs_' milliseconds until stop() is called.
*/
public synchronized void start() {
Preconditions.checkState(!running_.get());
running_.set(true);
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FileWatchThread(" + file_.getPath() + ")-%d")
.build());
fileCheckFuture_ = executor.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
checkFile();
} catch (SecurityException e) {
LOG.warn("Not allowed to check read file existence: " + file_.getPath(), e);
}
}
}, 0L, checkIntervalMs_, TimeUnit.MILLISECONDS);
}
/**
* Stops the file watching thread.
*/
public synchronized void stop() {
Preconditions.checkState(running_.get());
running_.set(false);
fileCheckFuture_.cancel(false);
}
}