blob: 68121d6ff36c68189ab31bacb85c6a978019ac2c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.airavata.datacat.agent.monitor.impl;
import org.apache.airavata.datacat.agent.dispatcher.DispatcherService;
import org.apache.airavata.datacat.agent.monitor.IMonitor;
import org.apache.airavata.datacat.agent.util.AgentProperties;
import org.apache.airavata.datacat.agent.util.Constants;
import org.apache.airavata.datacat.models.OutputMonitorMessage;
import org.apache.airavata.datacat.models.OutputMonitorMessageType;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.*;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static java.nio.file.StandardWatchEventKinds.*;
/**
* This monitor is specifically targeting the GridChem use case
*/
public class LocalFileSystemMonitor implements IMonitor {
private final Logger logger = LogManager.getLogger(LocalFileSystemMonitor.class);
private final WatchService watcher;
private final Map<WatchKey, Path> keys;
private boolean runMonitor = false;
private boolean trace = false;
private long waitTime;
private ArrayList<OutputInfo> snapshot;
private String serialisedFileName = "snapshot.ser";
private DispatcherService dispatcherService;
public LocalFileSystemMonitor() throws Exception {
this.watcher = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<WatchKey, Path>();
waitTime = Integer.parseInt(
AgentProperties.getInstance().getProperty(Constants.BATCH_MONITOR_WAIT_TIME, "1000"));
dispatcherService = DispatcherService.getInstance();
runMonitor = false;
if ((new File(serialisedFileName)).exists()) {
snapshot = (ArrayList<OutputInfo>) (new ObjectInputStream(new FileInputStream(serialisedFileName))).readObject();
} else {
logger.info("Starting Batch Monitor Scan...");
ArrayList<OutputInfo> currentFiles = getCurrentFilesList(
getCurrentFilesMap(AgentProperties.getInstance().getProperty(Constants.DATA_ROOT, "")));
ArrayList<OutputMonitorMessage> outputMonitorMessages = new ArrayList<OutputMonitorMessage>();
for (int i = 0; i < currentFiles.size(); i++) {
OutputMonitorMessage fileWatcherMessage = new OutputMonitorMessage();
//fileWatcherMessage.setOutputPath(currentFiles.get(i).getOutputName());
fileWatcherMessage.setOutputPath(currentFiles.get(i).getOutputPath());
fileWatcherMessage.setFileMonitorMessageType(OutputMonitorMessageType.FILE_CREATED);
outputMonitorMessages.add(fileWatcherMessage);
}
logger.info("Ending Batch Monitor Scan...");
for (int i = 0; i < outputMonitorMessages.size(); i++) {
logger.info("Adding file monitor message for "+ outputMonitorMessages.get(i).getOutputPath());
dispatcherService.addFileMonitorMessage(outputMonitorMessages.get(i));
}
ObjectOutputStream outputStream = new ObjectOutputStream(new FileOutputStream(serialisedFileName));
outputStream.writeObject(currentFiles);
this.snapshot = currentFiles;
}
}
/**
* Start directory monitoring
*/
@Override
public void startMonitor(Path path) throws IOException {
init(path);
runMonitor = true;
processEvents();
logger.info("Started directory watching");
}
/**
* Stop directory monitoring
*/
@Override
public void stopMonitor() {
runMonitor = false;
logger.info("Stopped directory watching");
}
/**
* Initializing the FileSystemMonitor
*
* @param path
* @throws java.io.IOException
*/
private void init(Path path) throws IOException {
this.trace = false;
logger.info("Scanning " + path);
registerAll(path);
logger.info("Done.");
// enable trace after initial registration
this.trace = true;
}
/**
* Register the given directory with the WatchService
*/
private void register(Path dir) throws IOException {
WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
if (trace) {
Path prev = keys.get(key);
if (prev == null) {
logger.info("register: " + dir);
} else {
if (!dir.equals(prev)) {
logger.info("update: " + prev + " -> " + dir);
}
}
}
keys.put(key, dir);
}
/**
* Register the given directory, and all its sub-directories, with the
* WatchService.
*/
private void registerAll(final Path start) throws IOException {
// register directory and sub-directories
Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
register(dir);
return FileVisitResult.CONTINUE;
}
});
}
/**
* Processing directory update events
*/
@SuppressWarnings("unchecked")
private void processEvents() {
(new Thread(new Runnable() {
@Override
public void run() {
while (runMonitor) {
try {
logger.info("Starting Batch Monitor Scan...");
Thread.sleep(waitTime);
HashMap<String, OutputInfo> currentFilesMap = getCurrentFilesMap(
AgentProperties.getInstance().getProperty(Constants.DATA_ROOT, ""));
ArrayList<OutputInfo> currentFilesList = getCurrentFilesList(currentFilesMap);
ArrayList<OutputMonitorMessage> outputMonitorMessages = new ArrayList<OutputMonitorMessage>();
for (int i = 0; i < snapshot.size(); i++) {
OutputInfo fSnap = snapshot.get(i);
OutputInfo fCurrent = currentFilesMap.get(fSnap.getOutputPath());
if (fCurrent == null) {
OutputMonitorMessage fileWatcherMessage = new OutputMonitorMessage();
fileWatcherMessage.setOutputPath(fSnap.getOutputName());
fileWatcherMessage.setOutputPath(fSnap.getOutputPath());
fileWatcherMessage.setFileMonitorMessageType(OutputMonitorMessageType.FILE_DELETED);
outputMonitorMessages.add(fileWatcherMessage);
} else {
if (fSnap.getLastModifiedTime() != fCurrent.getLastModifiedTime()) {
OutputMonitorMessage fileWatcherMessage = new OutputMonitorMessage();
fileWatcherMessage.setOutputPath(fSnap.getOutputName());
fileWatcherMessage.setOutputPath(fSnap.getOutputPath());
fileWatcherMessage.setFileMonitorMessageType(OutputMonitorMessageType.FILE_MODIFIED);
outputMonitorMessages.add(fileWatcherMessage);
}
currentFilesMap.remove(fSnap.getOutputPath());
}
}
ArrayList<OutputInfo> newlyCreatedFiles = new ArrayList<OutputInfo>(currentFilesMap.values());
for (int i = 0; i < newlyCreatedFiles.size(); i++) {
OutputInfo fNew = newlyCreatedFiles.get(i);
OutputMonitorMessage outputMonitorMessage = new OutputMonitorMessage();
outputMonitorMessage.setOutputPath(fNew.getOutputName());
outputMonitorMessage.setOutputPath(fNew.getOutputPath());
outputMonitorMessage.setFileMonitorMessageType(OutputMonitorMessageType.FILE_MODIFIED);
outputMonitorMessages.add(outputMonitorMessage);
}
logger.info("Ending Batch Monitor Scan...");
for (int i = 0; i < outputMonitorMessages.size(); i++) {
logger.info("Adding file monitor message for "+ outputMonitorMessages.get(i).getOutputPath());
dispatcherService.addFileMonitorMessage(outputMonitorMessages.get(i));
}
ObjectOutputStream outputStream = new ObjectOutputStream(new FileOutputStream(serialisedFileName));
outputStream.writeObject(currentFilesList);
logger.info("Successfully serialized new directory snapshot...");
snapshot = currentFilesList;
} catch (InterruptedException e) {
logger.error(e.toString());
} catch (IOException e){
logger.error(e.toString());
}
}
}
})).start();
}
private HashMap<String, OutputInfo> getCurrentFilesMap(String path) {
HashMap<String, OutputInfo> currentFiles = new HashMap<String, OutputInfo>();
File root = new File(path);
File[] list = root.listFiles();
if (list == null) return currentFiles;
for (File f : list) {
if (f.isDirectory()) {
OutputInfo newOutputInfo = new OutputInfo(f.getName(), f.getPath(), f.lastModified());
currentFiles.put(f.getPath(), newOutputInfo);
}
}
return currentFiles;
}
private ArrayList<OutputInfo> getCurrentFilesList(HashMap<String, OutputInfo> currentFilesMap) {
ArrayList<OutputInfo> currentFiles = new ArrayList<OutputInfo>(currentFilesMap.values());
return currentFiles;
}
}