blob: 03159005e8679bfc516fbf801f2468c87375b83c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.fuseki.async;
import static java.lang.String.format;
import java.util.*;
import java.util.concurrent.*;
import org.apache.jena.fuseki.Fuseki;
import org.apache.jena.fuseki.server.DataService;
import org.apache.jena.fuseki.servlets.ServletOps;
/** The set of currently active and recently completed tasks. */
public class AsyncPool
{
// Max concurrent tasks.
private static int nMaxThreads = 4;
// Number of finished tasks kept.
private static int MAX_FINISHED = 20;
// A ThreadPoolExecutor with
// * 0 to nMaxThreads
// * no queue of waiting tasks (tasks execute or are rejected)
// * dormant threads released after 120s.
//
// SynchronousQueue is a BlockingQueue that has zero length - it accepts and
// delivers an item or rejects immediately, no delay by queueing.
private ExecutorService executor = new ThreadPoolExecutor(0, nMaxThreads,
120L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(true));
private final Object mutex = new Object();
private long counter = 0;
private Map<String, AsyncTask> runningTasks = new LinkedHashMap<>();
private Map<String, AsyncTask> finishedTasks = new LinkedHashMap<>();
// Finite FIFO of finished tasks.
private LinkedList<AsyncTask> finishedTasksList = new LinkedList<>();
private static AsyncPool instance = new AsyncPool();
public static AsyncPool get() { return instance; }
private AsyncPool() { }
public AsyncTask submit(Runnable task, String displayName, DataService dataService, long requestId) {
synchronized(mutex) {
String taskId = Long.toString(++counter);
Fuseki.serverLog.info(format("Task : %s : %s",taskId, displayName));
Callable<Object> c = ()->{
try { task.run(); }
catch (Throwable th) {
Fuseki.serverLog.error(format("Exception in task %s execution", taskId), th);
}
return null;
};
AsyncTask asyncTask = new AsyncTask(c, this, taskId, displayName, dataService, requestId);
try {
/* Future<Object> future = */ executor.submit(asyncTask);
runningTasks.put(taskId, asyncTask);
return asyncTask;
} catch (RejectedExecutionException ex) {
ServletOps.errorBadRequest("Async task request rejected - exceeds the limit of "+nMaxThreads+" tasks");
return null;
}
}
}
public Collection<AsyncTask> tasks() {
synchronized(mutex) {
List<AsyncTask> x = new ArrayList<>(runningTasks.size()+finishedTasks.size());
x.addAll(runningTasks.values());
x.addAll(finishedTasks.values());
return x;
}
}
public void finished(AsyncTask task) {
synchronized(mutex) {
String id = task.getTaskId();
runningTasks.remove(id);
// Reduce old tasks list
while ( finishedTasksList.size() >= MAX_FINISHED ) {
AsyncTask oldTask = finishedTasksList.removeFirst();
finishedTasks.remove(oldTask.getTaskId());
}
finishedTasks.put(id, task);
finishedTasksList.add(task);
}
}
public AsyncTask getRunningTask(String taskId) {
synchronized(mutex) {
return runningTasks.get(taskId);
}
}
/**
* Get a task - whether running or finished. Return null for no such task and also
* retired tasks. Only a limited number of old, finished tasks are remembered.
*/
public AsyncTask getTask(String taskId) {
synchronized(mutex) {
AsyncTask task = runningTasks.get(taskId);
if ( task != null )
return task;
return finishedTasks.get(taskId);
}
}
}