blob: c137e76cb0497615e026bf3dfaef7a586d57dc0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.epam.dlab.backendapi.core.response.folderlistener;
import com.epam.dlab.backendapi.core.FileHandlerCallback;
import com.epam.dlab.backendapi.core.response.folderlistener.WatchItem.ItemStatus;
import com.epam.dlab.backendapi.core.response.handlers.dao.CallbackHandlerDao;
import com.epam.dlab.exceptions.DlabException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import static com.epam.dlab.backendapi.core.Constants.JSON_EXTENSION;
/**
* Listen the directories for the files creation and runs the file processing by {@link AsyncFileHandler}.
*/
public class FolderListener implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(FolderListener.class);
/**
* Timeout of the check the file creation in milliseconds.
*/
public static final long LISTENER_TIMEOUT_MILLLIS = 1000;
/**
* Timeout of the idle for the folder listener in milliseconds.
*/
public static final long LISTENER_IDLE_TIMEOUT_MILLLIS = 600L * 1000L;
/**
* Timeout of waiting for the directory creation in milliseconds.
*/
private static final long WAIT_DIR_TIMEOUT_MILLIS = 500;
/**
* List of the folder listeners.
*/
private static final List<FolderListener> listeners = new ArrayList<>();
/**
* Thread of the folder listener.
*/
private Thread thread;
/**
* List of the file handles.
*/
private WatchItemList itemList;
/**
* Flag of listening status.
*/
private boolean isListen = false;
/**
* Time when expired of idle for folder listener in milliseconds.
*/
private long expiredIdleMillis = 0;
private FolderListener() {
}
/**
* Creates thread of the folder listener
*
* @param directoryName Name of directory.
* @param dao
*/
private FolderListener(String directoryName, CallbackHandlerDao dao) {
itemList = new WatchItemList(directoryName, dao);
}
/**
* Appends the file handler for processing to the folder listener and returns instance of the file handler.
*
* @param directoryName Name of directory for listen.
* @param fileHandlerCallback File handler for processing.
* @param timeoutMillis Timeout waiting for the file creation in milliseconds.
* @param fileLengthCheckDelay Timeout waiting for the file writing in milliseconds.
* @param callbackHandlerDao callbackHandlerDao for handlers
* @return Instance of the file handler.
*/
public static WatchItem listen(String directoryName, FileHandlerCallback fileHandlerCallback,
long timeoutMillis, long fileLengthCheckDelay,
CallbackHandlerDao callbackHandlerDao) {
return listen(directoryName, fileHandlerCallback, timeoutMillis, fileLengthCheckDelay, null,
callbackHandlerDao);
}
/**
* Appends the file handler for processing to the folder listener for the existing file and returns
* instance of the file handler. If the file name is <b>null</b> this means that file does not exist
* and equal to call method
* {@link FolderListener#listen(String, FileHandlerCallback, long, long, CallbackHandlerDao)}.
*
* @param directoryName Name of directory for listen.
* @param fileHandlerCallback File handler for processing.
* @param timeoutMillis Timeout waiting for the file creation in milliseconds.
* @param fileLengthCheckDelay Timeout waiting for the file writing in milliseconds.
* @param fileName file name.
* @param callbackHandlerDao callbackHandlerDao for handlers
* @return Instance of the file handler.
*/
public static WatchItem listen(String directoryName, FileHandlerCallback fileHandlerCallback, long timeoutMillis,
long fileLengthCheckDelay, String fileName, CallbackHandlerDao callbackHandlerDao) {
FolderListener listener;
WatchItem item;
LOGGER.trace("Looking for folder listener to folder \"{}\" ...", directoryName);
synchronized (listeners) {
for (int i = 0; i < listeners.size(); i++) {
listener = listeners.get(i);
if (listener.itemList.getDirectoryName().equals(directoryName)) {
if (listener.isAlive()) {
LOGGER.debug("Folder listener \"{}\" found. Append file handler for UUID {}",
directoryName, fileHandlerCallback.getUUID());
item = listener.itemList.append(fileHandlerCallback, timeoutMillis, fileLengthCheckDelay,
fileName);
return item;
} else {
LOGGER.warn("Folder listener \"{}\" is dead and will be removed", directoryName);
listeners.remove(i);
break;
}
}
}
LOGGER.debug("Folder listener \"{}\" not found. Create new listener and append file handler for UUID {}",
directoryName, fileHandlerCallback.getUUID());
listener = new FolderListener(directoryName, callbackHandlerDao);
item = listener.itemList.append(fileHandlerCallback, timeoutMillis, fileLengthCheckDelay, fileName);
listeners.add(listener);
listener.start();
}
return item;
}
/**
* Terminates all the folder listeners.
*/
public static void terminateAll() {
FolderListener[] array;
synchronized (listeners) {
array = listeners.toArray(new FolderListener[listeners.size()]);
}
for (int i = 0; i < array.length; i++) {
array[i].terminate();
}
}
/**
* Returns the list of folder listeners.
*/
public static List<FolderListener> getListeners() {
return listeners;
}
/**
* Starts the thread of the folder listener.
*/
protected void start() {
thread = new Thread(this, getClass().getSimpleName() + "-" + listeners.size());
thread.start();
}
/**
* Terminates the thread of the folder listener.
*/
protected void terminate() {
if (thread != null) {
LOGGER.debug("Folder listener \"{}\" will be terminate", getDirectoryName());
thread.interrupt();
}
}
/**
* Returns <b>true</b> if the folder listener thread is running and is alive, otherwise <b>false</b>.
*/
public boolean isAlive() {
return (thread != null && thread.isAlive());
}
/**
* Returns <b>true</b> if the folder listener is listening the folder.
*/
public boolean isListen() {
return isListen;
}
/**
* Returns the list of the file handlers.
*/
public WatchItemList getItemList() {
return itemList;
}
/**
* Returns the full name of directory.
*/
public String getDirectoryName() {
return itemList.getDirectoryFullName();
}
/**
* Waiting for the directory creation and returns <b>true</b> if it exists or created.
* If timeout has expired and directory was not created returns <b>false</b>
*/
private boolean waitForDirectory() throws InterruptedException {
File file = new File(getDirectoryName());
if (file.exists()) {
return true;
} else {
LOGGER.trace("Folder listener \"{}\" waiting for the directory creation", getDirectoryName());
}
long expiredTimeMillis = itemList.get(0).getExpiredTimeMillis();
while (expiredTimeMillis >= System.currentTimeMillis()) {
Thread.sleep(WAIT_DIR_TIMEOUT_MILLIS);
if (file.exists()) {
return true;
}
}
LOGGER.error("Folder listener \"{}\" error. Timeout has expired and directory does not exist",
getDirectoryName());
return false;
}
/**
* Initializes the thread of the folder listener. Returns <b>true</b> if the initialization
* completed successfully. Returns <b>false</b> if all the file handlers has been processed
* or initialization fails.
*/
private boolean init() {
LOGGER.trace("Folder listener initializing for \"{}\" ...", getDirectoryName());
try {
if (!waitForDirectory()) {
return false;
}
} catch (InterruptedException e) {
LOGGER.debug("Folder listener \"{}\" has been interrupted", getDirectoryName());
Thread.currentThread().interrupt();
return false;
}
processStatusItems();
if (itemList.size() == 0) {
LOGGER.debug("Folder listener \"{}\" have no files and will be finished", getDirectoryName());
return false;
}
LOGGER.trace("Folder listener has been initialized for \"{}\" ...", getDirectoryName());
return true;
}
/**
* Process all the file handlers if need and removes all expired, processed or interrupted
* the file handlers from the list of the file handlers.
*/
private void processStatusItems() {
int i = 0;
if (itemList.size() > 0) {
expiredIdleMillis = 0;
}
itemList.processItemAll();
synchronized (itemList) {
while (i < itemList.size()) {
final WatchItem item = itemList.get(i);
final ItemStatus status = item.getStatus();
final String uuid = item.getFileHandlerCallback().getUUID();
switch (status) {
case WAIT_FOR_FILE:
case FILE_CAPTURED:
case INPROGRESS:
// Skip
i++;
continue;
case TIMEOUT_EXPIRED:
LOGGER.warn("Folder listener \"{}\" remove expired file handler for UUID {}", getDirectoryName
(), uuid);
try {
item.getFileHandlerCallback().handleError("Request timeout expired");
} catch (Exception e) {
LOGGER.error("Folder listener \"{}\" caused exception for UUID {}", getDirectoryName(),
uuid, e);
}
break;
case IS_DONE:
if (item.getFutureResult()) {
LOGGER.trace("Folder listener \"{}\" remove processed file handler for UUID {}, handler " +
"result is {}", getDirectoryName(), uuid, item.getFutureResult());
} else {
LOGGER.warn("Folder listener \"{}\" remove processed file handler for UUID {}, handler " +
"result is {}", getDirectoryName(), uuid, item.getFutureResult());
}
break;
case IS_CANCELED:
LOGGER.debug("Folder listener \"{}\" remove canceled file handler for UUID {}",
getDirectoryName(), uuid);
break;
case IS_FAILED:
LOGGER.warn("Folder listener \"{}\" remove failed file handler for UUID {}", getDirectoryName
(), uuid);
break;
case IS_INTERRUPTED:
LOGGER.debug("Folder listener \"{}\" remove iterrupted file handler for UUID {}",
getDirectoryName(), uuid);
break;
default:
continue;
}
itemList.remove(i);
}
}
if (expiredIdleMillis == 0 && itemList.size() == 0) {
expiredIdleMillis = System.currentTimeMillis() + LISTENER_IDLE_TIMEOUT_MILLLIS;
}
}
/**
* Removes the listener from the list of folder listeners if the the file handler list is empty
* and idle time has expired or if <b>force</b> flag has been set to <b>true</b>.
*
* @param force the flag of remove the folder listener immediately.
* @return <b>true</b> if the folder listener has been removed otherwise <b>false</>.
*/
private boolean removeListener(boolean force) {
synchronized (listeners) {
if (force || (expiredIdleMillis != 0 && expiredIdleMillis < System.currentTimeMillis())) {
for (int i = 0; i < listeners.size(); i++) {
if (listeners.get(i) == this) {
isListen = false;
listeners.remove(i);
LOGGER.debug("Folder listener \"{}\" has been removed from pool", getDirectoryName());
return true;
}
}
}
}
return false;
}
/**
* Find and return the list of files to process.
*/
private String[] getNewFiles() {
File dir = new File(getDirectoryName());
return dir.list((File dir1, String name) -> {
if (name.toLowerCase().endsWith(JSON_EXTENSION)) {
WatchItem item = itemList.getItem(name);
return (item != null && item.getStatus() == ItemStatus.WAIT_FOR_FILE);
}
return false;
});
}
/**
* Waiting for files and process it.
*/
private void pollFile() {
try {
isListen = true;
while (true) {
String[] fileList = getNewFiles();
if (fileList != null) {
for (String fileName : fileList) {
LOGGER.trace("Folder listener \"{}\" handes the file {}", getDirectoryName(), fileName);
processItem(fileName);
}
}
processStatusItems();
if (removeListener(false)) {
LOGGER.debug("Folder listener \"{}\" have no files and will be finished", getDirectoryName());
break;
}
Thread.sleep(LISTENER_TIMEOUT_MILLLIS);
}
} catch (InterruptedException e) {
removeListener(true);
LOGGER.debug("Folder listener \"{}\" has been interrupted", getDirectoryName());
Thread.currentThread().interrupt();
} catch (Exception e) {
removeListener(true);
LOGGER.error("Folder listener for \"{}\" closed with error.", getDirectoryName(), e);
throw new DlabException("Folder listener for \"" + getDirectoryName() + "\" closed with error. " + e
.getLocalizedMessage(), e);
}
}
private void processItem(String fileName) {
try {
WatchItem item = itemList.getItem(fileName);
item.setFileName(fileName);
if (itemList.processItem(item)) {
LOGGER.debug("Folder listener \"{}\" processes the file {}", getDirectoryName(),
fileName);
}
} catch (Exception e) {
LOGGER.warn("Folder listener \"{}\" has got exception for check or process the file {}",
getDirectoryName(), fileName, e);
}
}
@Override
public void run() {
if (init()) {
pollFile();
} else {
LOGGER.warn("Folder listener has not been initialized for \"{}\"", getDirectoryName());
removeListener(true);
}
}
}