blob: d77dda2fbcc9fe4fbaa6dd24717da1a43e6d5c9d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.configuration.updater;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
import org.apache.qpid.server.util.FutureHelper;
public class TaskExecutorImpl implements TaskExecutor
{
private static final String TASK_EXECUTION_THREAD_NAME = "Broker-Config";
private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorImpl.class);
private final PrincipalAccessor _principalAccessor;
private volatile Thread _taskThread;
private final AtomicBoolean _running = new AtomicBoolean();
private volatile ListeningExecutorService _executor;
private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor();
private final String _name;
public TaskExecutorImpl()
{
this(TASK_EXECUTION_THREAD_NAME, null);
}
public TaskExecutorImpl(final String name, PrincipalAccessor principalAccessor)
{
_name = name;
_principalAccessor = principalAccessor;
}
@Override
public boolean isRunning()
{
return _running.get();
}
@Override
public void start()
{
if (_running.compareAndSet(false, true))
{
LOGGER.debug("Starting task executor {}", _name);
_executor = MoreExecutors.listeningDecorator(new QpidByteBufferDisposingThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory()
{
@Override
public Thread
newThread(
final Runnable r)
{
_taskThread =
new TaskThread(
r,
_name,
TaskExecutorImpl.this);
return _taskThread;
}
}));
LOGGER.debug("Task executor is started");
}
}
@Override
public void stopImmediately()
{
if (_running.compareAndSet(true,false))
{
ExecutorService executor = _executor;
if (executor != null)
{
LOGGER.debug("Stopping task executor {} immediately", _name);
List<Runnable> cancelledTasks = executor.shutdownNow();
for (Runnable runnable : cancelledTasks)
{
if (runnable instanceof RunnableFuture<?>)
{
((RunnableFuture<?>) runnable).cancel(true);
}
}
_executor = null;
_taskThread = null;
LOGGER.debug("Task executor was stopped immediately. Number of unfinished tasks: " + cancelledTasks.size());
}
}
}
@Override
public void stop()
{
if (_running.compareAndSet(true, false))
{
ExecutorService executor = _executor;
if (executor != null)
{
LOGGER.debug("Stopping task executor {}", _name);
executor.shutdown();
_executor = null;
_taskThread = null;
LOGGER.debug("Task executor is stopped");
}
}
}
@Override
public <T, E extends Exception> ListenableFuture<T> submit(Task<T, E> userTask) throws E
{
return submitWrappedTask(new TaskLoggingWrapper<>(userTask));
}
private <T, E extends Exception> ListenableFuture<T> submitWrappedTask(TaskLoggingWrapper<T, E> task) throws E
{
checkState(task);
if (isTaskExecutorThread())
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("Running {} immediately", task);
}
T result = task.execute();
return Futures.immediateFuture(result);
}
else
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("Submitting {} to executor {}", task, _name);
}
return _executor.submit(new CallableWrapper<>(task));
}
}
@Override
public void execute(final Runnable command)
{
LOGGER.trace("Running runnable {} through executor interface", command);
_wrappedExecutor.execute(command);
}
@Override
public <T, E extends Exception> T run(Task<T, E> userTask) throws CancellationException, E
{
TaskLoggingWrapper<T, E> task = new TaskLoggingWrapper<>(userTask);
return FutureHelper.<T, E>await(submitWrappedTask(task));
}
private boolean isTaskExecutorThread()
{
return Thread.currentThread() == _taskThread;
}
private void checkState(Task<?, ?> task)
{
if (!_running.get())
{
LOGGER.error("Task executor {} is not in ACTIVE state, unable to execute : {} ", _name, task);
throw new IllegalStateException("Task executor " + _name + " is not in ACTIVE state");
}
}
private Subject getContextSubject()
{
Subject contextSubject = Subject.getSubject(AccessController.getContext());
if (contextSubject != null && _principalAccessor != null)
{
Principal additionalPrincipal = _principalAccessor.getPrincipal();
Set<Principal> principals = contextSubject.getPrincipals();
if (additionalPrincipal != null && !principals.contains(additionalPrincipal))
{
Set<Principal> extendedPrincipals = new HashSet<>(principals);
extendedPrincipals.add(additionalPrincipal);
contextSubject = new Subject(contextSubject.isReadOnly(),
extendedPrincipals,
contextSubject.getPublicCredentials(),
contextSubject.getPrivateCredentials());
}
}
return contextSubject;
}
private class TaskLoggingWrapper<T, E extends Exception> implements Task<T, E>
{
private final Task<T,E> _task;
public TaskLoggingWrapper(Task<T, E> task)
{
_task = task;
}
@Override
public T execute() throws E
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Performing {}", this);
}
boolean success = false;
T result = null;
try
{
result = _task.execute();
success = true;
}
finally
{
if (LOGGER.isDebugEnabled())
{
if (success)
{
LOGGER.debug("{} performed successfully with result: {}", this, result);
} else
{
LOGGER.debug("{} failed to perform successfully", this);
}
}
}
return result;
}
@Override
public String getObject()
{
return _task.getObject();
}
@Override
public String getAction()
{
return _task.getAction();
}
@Override
public String getArguments()
{
return _task.getArguments();
}
@Override
public String toString()
{
String arguments = getArguments();
if (arguments == null)
{
return String.format("Task['%s' on '%s']", getAction(), getObject());
}
return String.format("Task['%s' on '%s' with arguments '%s']", getAction(), getObject(), arguments);
}
}
private class CallableWrapper<T, E extends Exception> implements Callable<T>
{
private final Task<T, E> _userTask;
private final Subject _contextSubject;
private final AtomicReference<Throwable> _throwable;
public CallableWrapper(Task<T, E> userWork)
{
_userTask = userWork;
_contextSubject = getContextSubject();
_throwable = new AtomicReference<>();
}
@Override
public T call() throws Exception
{
T result = Subject.doAs(_contextSubject, new PrivilegedAction<T>()
{
@Override
public T run()
{
try
{
return _userTask.execute();
}
catch(Throwable t)
{
_throwable.set(t);
}
return null;
}
});
Throwable t = _throwable.get();
if (t != null)
{
if (t instanceof RuntimeException)
{
throw (RuntimeException) t;
}
else if (t instanceof Error)
{
throw (Error) t;
}
else
{
throw (Exception) t;
}
}
return result;
}
}
private static class ImmediateFuture<T> implements Future<T>
{
private T _result;
public ImmediateFuture(T result)
{
super();
_result = result;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return false;
}
@Override
public boolean isCancelled()
{
return false;
}
@Override
public boolean isDone()
{
return true;
}
@Override
public T get()
{
return _result;
}
@Override
public T get(long timeout, TimeUnit unit)
{
return get();
}
}
private class ImmediateIfSameThreadExecutor implements Executor
{
@Override
public void execute(final Runnable command)
{
if(isTaskExecutorThread()
|| (_executor == null && (Thread.currentThread() instanceof TaskThread
&& ((TaskThread)Thread.currentThread()).getTaskExecutor() == TaskExecutorImpl.this)))
{
command.run();
}
else
{
final Subject subject = getContextSubject();
_executor.execute(new Runnable()
{
@Override
public void run()
{
Subject.doAs(subject, new PrivilegedAction<Void>()
{
@Override
public Void run()
{
command.run();
return null;
}
});
}
});
}
}
}
private static class TaskThread extends Thread
{
private final TaskExecutorImpl _taskExecutor;
public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor)
{
super(r, name);
_taskExecutor = taskExecutor;
}
public TaskExecutorImpl getTaskExecutor()
{
return _taskExecutor;
}
}
@Override
public Factory getFactory()
{
return new Factory()
{
@Override
public TaskExecutor newInstance()
{
return new TaskExecutorImpl();
}
@Override
public TaskExecutor newInstance(final String name, PrincipalAccessor principalAccessor)
{
return new TaskExecutorImpl(name, principalAccessor);
}
};
}
}