| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.reef.wake.impl; |
| |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.apache.reef.wake.AbstractEStage; |
| import org.apache.reef.wake.EventHandler; |
| import org.apache.reef.wake.StageConfiguration.*; |
| import org.apache.reef.wake.WakeParameters; |
| import org.apache.reef.wake.exception.WakeRuntimeException; |
| |
| import javax.inject.Inject; |
| import java.util.List; |
| import java.util.concurrent.*; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
| * Stage that executes an event handler with a thread pool. |
| * |
| * @param <T> type |
| */ |
| public final class ThreadPoolStage<T> extends AbstractEStage<T> { |
| |
| private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName()); |
| |
| private static final long SHUTDOWN_TIMEOUT = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT; |
| |
| private final EventHandler<T> handler; |
| private final EventHandler<Throwable> errorHandler; |
| private final ExecutorService executor; |
| private final int numThreads; |
| |
| /** |
| * Constructs a thread-pool stage. |
| * |
| * @param handler the event handler to execute |
| * @param numThreads the number of threads to use |
| * @throws WakeRuntimeException |
| */ |
| @Inject |
| public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler, |
| @Parameter(NumberOfThreads.class) final int numThreads) { |
| this(handler.getClass().getName(), handler, numThreads, null); |
| } |
| |
| /** |
| * Constructs a thread-pool stage. |
| * |
| * @param name the stage name |
| * @param handler the event handler to execute |
| * @param numThreads the number of threads to use |
| * @param errorHandler the error handler |
| * @throws WakeRuntimeException |
| */ |
| @Inject |
| public ThreadPoolStage(@Parameter(StageName.class) final String name, |
| @Parameter(StageHandler.class) final EventHandler<T> handler, |
| @Parameter(NumberOfThreads.class) final int numThreads, |
| @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) { |
| super(name); |
| this.handler = handler; |
| this.errorHandler = errorHandler; |
| if (numThreads <= 0) { |
| throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0"); |
| } |
| this.numThreads = numThreads; |
| this.executor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory(name)); |
| StageManager.instance().register(this); |
| } |
| |
| /** |
| * Constructs a thread-pool stage. |
| * |
| * @param name the stage name |
| * @param handler the event handler to execute |
| * @param numThreads the number of threads to use |
| * @throws WakeRuntimeException |
| */ |
| @Inject |
| public ThreadPoolStage(@Parameter(StageName.class) final String name, |
| @Parameter(StageHandler.class) final EventHandler<T> handler, |
| @Parameter(NumberOfThreads.class) final int numThreads) { |
| this(name, handler, numThreads, null); |
| } |
| |
| /** |
| * Constructs a thread-pool stage. |
| * |
| * @param handler the event handler to execute |
| * @param executor the external executor service provided |
| */ |
| @Inject |
| public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler, |
| @Parameter(StageExecutorService.class) final ExecutorService executor) { |
| this(handler.getClass().getName(), handler, executor); |
| } |
| |
| |
| /** |
| * Constructs a thread-pool stage. |
| * |
| * @param handler the event handler to execute |
| * @param executor the external executor service provided |
| * @param errorHandler the error handler |
| */ |
| @Inject |
| public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler, |
| @Parameter(StageExecutorService.class) final ExecutorService executor, |
| @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) { |
| this(handler.getClass().getName(), handler, executor, errorHandler); |
| } |
| |
| /** |
| * Constructs a thread-pool stage. |
| * |
| * @param name the stage name |
| * @param handler the event handler to execute |
| * @param executor the external executor service provided |
| * for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory} |
| */ |
| @Inject |
| public ThreadPoolStage(@Parameter(StageName.class) final String name, |
| @Parameter(StageHandler.class) final EventHandler<T> handler, |
| @Parameter(StageExecutorService.class) final ExecutorService executor) { |
| this(name, handler, executor, null); |
| } |
| |
| /** |
| * Constructs a thread-pool stage. |
| * |
| * @param name the stage name |
| * @param handler the event handler to execute |
| * @param executor the external executor service provided |
| * for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory} |
| * @param errorHandler the error handler |
| */ |
| @Inject |
| public ThreadPoolStage(@Parameter(StageName.class) final String name, |
| @Parameter(StageHandler.class) final EventHandler<T> handler, |
| @Parameter(StageExecutorService.class) final ExecutorService executor, |
| @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) { |
| super(name); |
| this.handler = handler; |
| this.errorHandler = errorHandler; |
| this.numThreads = 0; |
| this.executor = executor; |
| StageManager.instance().register(this); |
| } |
| |
| /** |
| * Handles the event using a thread in the thread pool. |
| * |
| * @param value the event |
| */ |
| @Override |
| @SuppressWarnings("checkstyle:illegalcatch") |
| public void onNext(final T value) { |
| beforeOnNext(); |
| try { |
| executor.submit(new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| handler.onNext(value); |
| } catch (final Throwable t) { |
| if (errorHandler != null) { |
| errorHandler.onNext(t); |
| } else { |
| LOG.log(Level.SEVERE, name + " Exception from event handler", t); |
| throw t; |
| } |
| } finally { |
| afterOnNext(); |
| } |
| } |
| |
| }); |
| } catch (final Exception e) { |
| LOG.log(Level.SEVERE, "Encountered error when submitting to executor in ThreadPoolStage."); |
| afterOnNext(); |
| throw e; |
| } |
| |
| } |
| |
| /** |
| * Closes resources. |
| */ |
| @Override |
| public void close() { |
| |
| if (closed.compareAndSet(false, true) && numThreads > 0) { |
| |
| LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: begin", this.name); |
| |
| executor.shutdown(); |
| |
| boolean isTerminated = false; |
| try { |
| isTerminated = executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); |
| } catch (final InterruptedException ex) { |
| LOG.log(Level.WARNING, "Interrupted closing ThreadPoolStage " + this.name, ex); |
| } |
| |
| if (!isTerminated) { |
| final List<Runnable> droppedRunnables = executor.shutdownNow(); |
| LOG.log(Level.SEVERE, |
| "Closing ThreadPoolStage {0}: Executor did not terminate in {1} ms. Dropping {2} tasks", |
| new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()}); |
| } |
| |
| if (!executor.isTerminated()) { |
| LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name); |
| } |
| |
| LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name); |
| } |
| } |
| |
| /** |
| * Returns true if resources are closed. |
| */ |
| public boolean isClosed() { |
| return closed.get() && executor.isTerminated(); |
| } |
| |
| /** |
| * Gets the queue length of this stage. |
| * |
| * @return the queue length |
| */ |
| public int getQueueLength() { |
| return ((ThreadPoolExecutor) executor).getQueue().size(); |
| } |
| |
| /** |
| * Gets the active count of this stage. |
| * @return the active count |
| */ |
| public int getActiveCount() { |
| return (int)(getInMeter().getCount() - getOutMeter().getCount()); |
| } |
| } |