blob: 9ef6c9fcc993c3c2a09359a4aac88282ee5a484c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.felix.dm.impl;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.dm.Logger;
import org.osgi.service.log.LogService;
/**
* Allows you to enqueue tasks from multiple threads and then execute
* them on one thread sequentially. It assumes no more than one thread will
* try to execute the tasks and it will make an effort to pick the first
* task that comes along whilst making sure subsequent tasks return
* without waiting. <p>
*
* This class is <b>lock free</b> by design and ensures <b>"safe object publication"</b> between scheduling threads and
* actual executing thread: if one thread T1 schedules a task, but another thread T2 actually
* executes it, then all the objects from the T1 thread will be "safely published" to the executing T2 thread.
* Safe publication is ensured because we are using a ConcurrentLinkedQueue.
* (see [1], chapter 3.5.3 (Safe publication idioms).
*
* [1] Java Concurrency In Practice, Addison Wesley
*
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
public class SerialExecutor implements Executor {
/**
* All tasks scheduled are stored there and only one thread may run them.
**/
protected final ConcurrentLinkedQueue<Runnable> m_tasks = new ConcurrentLinkedQueue<Runnable>();
/**
* Thread currently executing the task queue.
**/
protected final AtomicReference<Thread> m_runningThread = new AtomicReference<>();
/**
* Logger used when a task execution throws an exception
**/
private final Logger m_logger;
/**
* Makes a new SerialExecutor
* @param logger used when a task execution throws an exception. Can be null if no exception should be logger.
*/
public SerialExecutor(Logger logger) {
m_logger = logger;
}
/**
* Enqueues a task for later execution. You must call {@link #execute()} in order
* to trigger the task execution, which may or may not be executed by
* your current thread.
*/
public void schedule(Runnable task) {
m_tasks.add(task); // No need to synchronize, m_tasks is a concurrent linked queue.
}
/**
* Executes any pending tasks, enqueued using the {@link SerialExecutor#schedule(Runnable)} method.
* This method is thread safe, so multiple threads can try to execute the pending
* tasks, but only the first will be used to actually do so. Other threads will return immediately.
*/
public void execute() {
Thread currentThread = Thread.currentThread();
if (m_runningThread.compareAndSet(null, currentThread)) {
runTasks(currentThread);
}
}
/**
* Schedules a task for execution, and then attempts to execute it. This method is thread safe, so
* multiple threads can try to execute a task but only the first will be executed, other threads will
* return immediately, and the first thread will execute the tasks scheduled by the other threads.<p>
* <p>
* This method is reentrant: if the current thread is currently being executed by this executor, then
* the task passed to this method will be executed immediately, from the current invoking thread
* (inline execution).
*/
public void execute(Runnable task) {
Thread currentThread = Thread.currentThread();
if (m_runningThread.get() == currentThread) {
runTask(task);
} else {
schedule(task);
execute();
}
}
/**
* Run all pending tasks
* @param currentRunninghread the current executing thread
*/
private void runTasks(Thread currentRunninghread) {
do {
try {
Runnable task;
ConcurrentLinkedQueue<Runnable> tasks = m_tasks;
while ((task = tasks.poll()) != null) {
runTask(task);
}
}
finally {
m_runningThread.set(null);
}
}
// We must test again if some tasks have been scheduled after our "while" loop above, but before the
// m_runningThread reference has been reset to null.
while (!m_tasks.isEmpty() && m_runningThread.compareAndSet(null, currentRunninghread));
}
/**
* Run a given task.
* @param task the task to execute.
*/
void runTask(Runnable command) {
try {
command.run();
}
catch (Throwable t) {
if (m_logger != null) {
m_logger.log(LogService.LOG_ERROR, "Error processing tasks", t);
} else {
t.printStackTrace();
}
}
}
@Override
public String toString() {
return "[Executor: queue size: " + m_tasks.size() + "]";
}
}