blob: 9fde0f9e1304774de26a727b6842a6de09c72fea [file] [log] [blame]
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.threadpool.serial;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadlocal.InternalThreadLocal;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Executor ensuring that all {@link Runnable} tasks submitted are executed in order
* using the provided {@link Executor}, and serially such that no two will ever be
* running at the same time.
*/
public final class SerializingExecutor implements Executor, Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(SerializingExecutor.class);
/**
* Use false to stop and true to run
*/
private final AtomicBoolean atomicBoolean = new AtomicBoolean();
private final Executor executor;
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
/**
* Creates a SerializingExecutor, running tasks using {@code executor}.
*
* @param executor Executor in which tasks should be run. Must not be null.
*/
public SerializingExecutor(Executor executor) {
this.executor = executor;
}
/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
*/
@Override
public void execute(Runnable r) {
runQueue.add(r);
schedule(r);
}
private void schedule(Runnable removable) {
if (atomicBoolean.compareAndSet(false, true)) {
boolean success = false;
try {
executor.execute(this);
success = true;
} finally {
// It is possible that at this point that there are still tasks in
// the queue, it would be nice to keep trying but the error may not
// be recoverable. So we update our state and propagate so that if
// our caller deems it recoverable we won't be stuck.
if (!success) {
if (removable != null) {
// This case can only be reached if 'this' was not currently running, and we failed to
// reschedule. The item should still be in the queue for removal.
// ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
// throw if the item to remove is null. If removable is present in the queue twice,
// the wrong one may be removed. It doesn't seem possible for this case to exist today.
// This is important to run in case of RejectedExecutionException, so that future calls
// to execute don't succeed and accidentally run a previous runnable.
runQueue.remove(removable);
}
atomicBoolean.set(false);
}
}
}
}
@Override
public void run() {
Runnable r;
try {
while ((r = runQueue.poll()) != null) {
try {
InternalThreadLocal.removeAll();
r.run();
} catch (RuntimeException e) {
LOGGER.error("Exception while executing runnable " + r, e);
} finally {
InternalThreadLocal.removeAll();
}
}
} finally {
atomicBoolean.set(false);
}
if (!runQueue.isEmpty()) {
// we didn't enqueue anything but someone else did.
schedule(null);
}
}
}