blob: 005db44703a77c01d1f8d33829d880b813e69758 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.impl;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.AbstractEStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.StageConfiguration.*;
import org.apache.reef.wake.WakeParameters;
import org.apache.reef.wake.exception.WakeRuntimeException;
import javax.inject.Inject;
import java.util.List;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Stage that executes an event handler with a thread pool.
*
* @param <T> type
*/
public final class ThreadPoolStage<T> extends AbstractEStage<T> {
private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName());
private static final long SHUTDOWN_TIMEOUT = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT;
private final EventHandler<T> handler;
private final EventHandler<Throwable> errorHandler;
private final ExecutorService executor;
private final int numThreads;
/**
* Constructs a thread-pool stage.
*
* @param handler the event handler to execute
* @param numThreads the number of threads to use
* @throws WakeRuntimeException
*/
@Inject
public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler,
@Parameter(NumberOfThreads.class) final int numThreads) {
this(handler.getClass().getName(), handler, numThreads, null);
}
/**
* Constructs a thread-pool stage.
*
* @param name the stage name
* @param handler the event handler to execute
* @param numThreads the number of threads to use
* @param errorHandler the error handler
* @throws WakeRuntimeException
*/
@Inject
public ThreadPoolStage(@Parameter(StageName.class) final String name,
@Parameter(StageHandler.class) final EventHandler<T> handler,
@Parameter(NumberOfThreads.class) final int numThreads,
@Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) {
super(name);
this.handler = handler;
this.errorHandler = errorHandler;
if (numThreads <= 0) {
throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0");
}
this.numThreads = numThreads;
this.executor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory(name));
StageManager.instance().register(this);
}
/**
* Constructs a thread-pool stage.
*
* @param name the stage name
* @param handler the event handler to execute
* @param numThreads the number of threads to use
* @throws WakeRuntimeException
*/
@Inject
public ThreadPoolStage(@Parameter(StageName.class) final String name,
@Parameter(StageHandler.class) final EventHandler<T> handler,
@Parameter(NumberOfThreads.class) final int numThreads) {
this(name, handler, numThreads, null);
}
/**
* Constructs a thread-pool stage.
*
* @param handler the event handler to execute
* @param executor the external executor service provided
*/
@Inject
public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler,
@Parameter(StageExecutorService.class) final ExecutorService executor) {
this(handler.getClass().getName(), handler, executor);
}
/**
* Constructs a thread-pool stage.
*
* @param handler the event handler to execute
* @param executor the external executor service provided
* @param errorHandler the error handler
*/
@Inject
public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler,
@Parameter(StageExecutorService.class) final ExecutorService executor,
@Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) {
this(handler.getClass().getName(), handler, executor, errorHandler);
}
/**
* Constructs a thread-pool stage.
*
* @param name the stage name
* @param handler the event handler to execute
* @param executor the external executor service provided
* for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory}
*/
@Inject
public ThreadPoolStage(@Parameter(StageName.class) final String name,
@Parameter(StageHandler.class) final EventHandler<T> handler,
@Parameter(StageExecutorService.class) final ExecutorService executor) {
this(name, handler, executor, null);
}
/**
* Constructs a thread-pool stage.
*
* @param name the stage name
* @param handler the event handler to execute
* @param executor the external executor service provided
* for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory}
* @param errorHandler the error handler
*/
@Inject
public ThreadPoolStage(@Parameter(StageName.class) final String name,
@Parameter(StageHandler.class) final EventHandler<T> handler,
@Parameter(StageExecutorService.class) final ExecutorService executor,
@Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) {
super(name);
this.handler = handler;
this.errorHandler = errorHandler;
this.numThreads = 0;
this.executor = executor;
StageManager.instance().register(this);
}
/**
* Handles the event using a thread in the thread pool.
*
* @param value the event
*/
@Override
@SuppressWarnings("checkstyle:illegalcatch")
public void onNext(final T value) {
beforeOnNext();
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
handler.onNext(value);
} catch (final Throwable t) {
if (errorHandler != null) {
errorHandler.onNext(t);
} else {
LOG.log(Level.SEVERE, name + " Exception from event handler", t);
throw t;
}
} finally {
afterOnNext();
}
}
});
} catch (final Exception e) {
LOG.log(Level.SEVERE, "Encountered error when submitting to executor in ThreadPoolStage.");
afterOnNext();
throw e;
}
}
/**
* Closes resources.
*/
@Override
public void close() {
if (closed.compareAndSet(false, true) && numThreads > 0) {
LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: begin", this.name);
executor.shutdown();
boolean isTerminated = false;
try {
isTerminated = executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ex) {
LOG.log(Level.WARNING, "Interrupted closing ThreadPoolStage " + this.name, ex);
}
if (!isTerminated) {
final List<Runnable> droppedRunnables = executor.shutdownNow();
LOG.log(Level.SEVERE,
"Closing ThreadPoolStage {0}: Executor did not terminate in {1} ms. Dropping {2} tasks",
new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()});
}
if (!executor.isTerminated()) {
LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name);
}
LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name);
}
}
/**
* Returns true if resources are closed.
*/
public boolean isClosed() {
return closed.get() && executor.isTerminated();
}
/**
* Gets the queue length of this stage.
*
* @return the queue length
*/
public int getQueueLength() {
return ((ThreadPoolExecutor) executor).getQueue().size();
}
/**
* Gets the active count of this stage.
* @return the active count
*/
public int getActiveCount() {
return (int)(getInMeter().getCount() - getOutMeter().getCount());
}
}