blob: 48cb85308c0a9098bd845ca8baa9a221e29bc400 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.view;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Thread.sleep;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.ZipFile;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
public class ViewDirectoryWatcher implements DirectoryWatcher {
public static final int FIXED_FILE_COUNTER = 30;
public static final int FILE_CHECK_INTERVAL_MILLIS = 200;
// Global configuration
@Inject
Configuration configuration;
// View Registry
@Inject
ViewRegistry viewRegistry;
private WatchService watchService;
// Executor service on which the watcher will run
private ExecutorService executorService = Executors.newSingleThreadExecutor();
private Future<?> watchTask;
private static Log LOG = LogFactory.getLog(ViewDirectoryWatcher.class);
// Callbacks to hook into file processing
private List<Function<Path, Boolean>> hooks = Lists.newArrayList(loggingHook());
public void addHook(Function<Path, Boolean> hook) {
hooks.add(hook);
}
private Function<Path, Boolean> loggingHook() {
return new Function<Path, Boolean>() {
@Nullable
@Override
public Boolean apply(@Nullable Path path) {
LOG.info("Finished processing the view definition for" + path);
return true;
}
};
}
public void start() {
try {
Path path = buildWatchService();
Runnable task = startWatching(path);
watchTask = executorService.submit(task);
} catch (Exception e) {
LOG.error("There were errors in starting the view directory watcher. This task will not run", e);
}
}
@SuppressWarnings("unchecked")
private static <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>) event;
}
private Runnable startWatching(final Path path) {
return new Runnable() {
@Override
public void run() {
try {
while (true) {
// wait for key , park the thread meanwhile
WatchKey key = watchService.take();
LOG.info("Watcher Key was signalled");
for (WatchEvent<?> event : key.pollEvents()) {
LOG.info("Watcher recieved poll event");
WatchEvent<Path> ev = cast(event);
Path resolvedPath = path.resolve(ev.context());
LOG.info(String.format("Event %s: %s\n", ev.kind(), resolvedPath));
if (!canBlockTillFileAvailable(resolvedPath)) {
LOG.info("Watcher detected that the file was either empty or corrupt");
continue;
}
if (!verify(resolvedPath)) {
LOG.info("The uploaded file was 1> Empty 2> Not a regular file or 3> Not a valid Jar archive file");
continue;
}
try {
LOG.info("Starting view extraction");
viewRegistry.readViewArchive(resolvedPath);
// fire registered hooks
for (Function<Path, Boolean> hook : hooks) {
hook.apply(resolvedPath);
}
} catch (Exception e) {
LOG.error("Cannot read the view archive, offending file: " + resolvedPath, e);
}
}
// reset key
if (!key.reset()) {
//watch key is invalid, break out
LOG.error("The watch key could not be reset, Directory watcher will not run anymore");
break;
}
}
} catch (InterruptedException x) {
LOG.info("Cancelling the directory watcher", x);
return;
}
}
};
}
/**
* Routine to make the file watcher block the thread till the file is completely copied
* Check the length of the file continuously till there are 20 consecutive intervals when
* the file length does not change
* FILE_CHECK_INTERVAL_MILLIS defines the check interval both for detecting empty files
* and subsequent checks to detect if a file has finished copying
*
* The process which copies the jar into the views dir is external and we dont really
* know when it would finish, this is also highly OS and FS dependent. The following routine
* introduces a heuristic to detect when a file has finished copying by looking at subsequent
* lengths of the file which was detected as being created
*
* This would block for ~ 7 seconds in most cases
*
*
* @param resolvedPath
* @return false if the file check failed, true otherwise
*/
private boolean canBlockTillFileAvailable(Path resolvedPath) throws InterruptedException {
long oldLength;
long newSize;
long emptyCheck = 0;
int fixed = 0;
// get the underlying file
File file = resolvedPath.toAbsolutePath().toFile();
// empty file check
while (file.length() == 0 && emptyCheck < 5) {
sleep(FILE_CHECK_INTERVAL_MILLIS);
emptyCheck++;
}
// The file seems to be empty
if (emptyCheck == 5)
return false;
// check the file size
oldLength = file.length();
// Check if file copy is done
while (true) {
LOG.info("Waiting for file to be completely copied");
sleep(FILE_CHECK_INTERVAL_MILLIS);
newSize = file.length();
if (newSize > oldLength) {
oldLength = newSize;
continue;
} else if (oldLength == newSize) {
fixed++;
} else {
// this can never happen,
return false;
}
if (fixed > FIXED_FILE_COUNTER) {
LOG.info("File " + resolvedPath + " has finished copying");
return true;
}
}
}
/**
* Sanity check to validate if the detected path is a valid archive file
* is not a directory, also check that the file is not empty
*
* @param resolvedPath
* @return
*/
private boolean verify(Path resolvedPath) {
ZipFile zipFile = null;
try {
File file = resolvedPath.toAbsolutePath().toFile();
checkArgument(!file.isDirectory());
checkArgument(file.length() > 0);
zipFile = new ZipFile(file);
} catch (Exception e) {
LOG.info("Verification failed ", e);
return false;
} finally {
if (zipFile != null) {
try {
zipFile.close();
} catch (IOException e) {
}
}
}
return true;
}
private Path buildWatchService() throws IOException {
// Get the directory for view Archives
//Attach a file watcher at this directory, Extracted work directory will be ignored
File viewsDir = configuration.getViewsDir();
Path path = Paths.get(viewsDir.getAbsolutePath());
watchService = path.getFileSystem().newWatchService();
//Watch vews directory for creation events
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
return path;
}
@Override
public boolean isRunning() {
if (watchTask != null)
return !(watchTask.isDone());
return false;
}
@Override
public void stop() {
watchTask.cancel(true);
}
}