| = Concurrency Utilities for Java EE |
| :index-group: Other Features |
| :jbake-type: page |
| :jbake-status: published |
| |
| This example uses https://docs.oracle.com/javaee/7/tutorial/concurrency-utilities.htm[Concurrency Utilities for Java EE], also known as JSR 236. |
| |
| This standard allows application developers to use concurrency utilities managed by the application server. |
| As a result, developers no longer have to manage thread pools or threads manually. |
| In addition, the container cannot guarantee that other Java EE platform services work correctly inside a non-managed `Thread` object. For these reasons, managed threads are recommended whenever threads are needed. |
| More information can be found https://docs.oracle.com/javaee/7/tutorial/concurrency-utilities001.htm[here]. |
| |
| == Main Components of the Concurrency Utilities |
| |
| The standard specifies the main components of the concurrency utilities. In short, these components are managed objects that offer concurrency facilities. Because these objects are managed by the application server, they can be injected using either CDI or JNDI. More information can be found https://docs.oracle.com/javaee/7/tutorial/concurrency-utilities002.htm[here]. |
| |
| === ManagedExecutorService |
| |
| A `ManagedExecutorService` is an object that allows application developers to submit tasks asynchronously. Tasks are executed on threads that are managed by the container. |
| |
| ==== Example |
| |
| Here is a class that uses a `ManagedExecutorService` (full code can be found https://github.com/apache/tomee/blob/master/examples/concurrency-utils/src/main/java/org/superbiz/executor/ManagedService.java[here]): |
| |
| [source,java] |
| ---- |
| |
| @RequestScoped |
| public class ManagedService { |
| |
| @Resource |
| private ManagedExecutorService executor; |
| |
| public CompletableFuture<Integer> asyncTask(final int value) { |
| return CompletableFuture |
| .supplyAsync(longTask(value, 100, null), executor) |
| .thenApply(i -> i + 1); |
| } |
| |
| public CompletableFuture<Integer> asyncTaskWithException(final int value) { |
| return CompletableFuture |
| .supplyAsync(longTask(value, 100, "Planned exception"), executor) |
| .thenApply(i -> i + 1); |
| } |
| |
| private Supplier<Integer> longTask(final int value, |
| final int taskDurationMs, |
| final String errorMessage) { |
| return () -> { |
| if (nonNull(errorMessage)) { |
| throw new RuntimeException(errorMessage); |
| } |
| |
| try { |
| TimeUnit.MILLISECONDS.sleep(taskDurationMs); |
| } catch (InterruptedException e) { |
| throw new RuntimeException("Problem while waiting"); |
| } |
| return value + 1; |
| }; |
| } |
| |
| } |
| ---- |
| |
| The `ManagedExecutorService` object, being a managed object, is injected using the `@Resource` annotation. |
| |
| This example simulates a long-running computation, defined in the `longTask` method. |
| |
| The capabilities of `ManagedExecutorService` are demonstrated in the `asyncTask` and `asyncTaskWithException` methods. |
| Both methods invoke the `longTask` method defined above; each execution of `longTask` is performed on a thread managed by the container. |
| The `asyncTask` method simulates a successful execution, while the `asyncTaskWithException` method simulates an execution that throws an exception. |
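The way results and failures travel through such a `CompletableFuture` chain can be tried outside a container. The following is only a minimal sketch under stated assumptions: a plain JDK thread pool stands in for the injected `ManagedExecutorService`, and the class and method names (`SupplyAsyncSketch`, `successfulResult`, `failureMessage`) are hypothetical and not part of the example project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SupplyAsyncSketch {

    // Mirrors asyncTask: the supplier adds 1 on an executor thread, thenApply adds 1 more.
    static int successfulResult(final int value) {
        // Assumption: a plain JDK pool stands in for the container-managed executor.
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                    .supplyAsync(() -> value + 1, executor)
                    .thenApply(i -> i + 1)
                    .get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Mirrors asyncTaskWithException: the supplier throws, so get() reports the
    // failure as an ExecutionException whose cause is the original exception.
    static String failureMessage(final String errorMessage) {
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture
                    .<Integer>supplyAsync(() -> {
                        throw new RuntimeException(errorMessage);
                    }, executor)
                    .thenApply(i -> i + 1)
                    .get(5, TimeUnit.SECONDS);
            return null; // not reached, because get() throws
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(successfulResult(1));                 // prints 3
        System.out.println(failureMessage("Planned exception")); // prints Planned exception
    }
}
```

In the sketch, as in the service above, the `thenApply` stage is skipped when the supplier fails, and the caller only sees the failure when it calls `get`.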
| |
| The methods are used in the following test class (full example can be found https://github.com/apache/tomee/blob/master/examples/concurrency-utils/src/test/java/org/superbiz/executor/ManagedServiceTest.java[here]): |
| |
| [source,java] |
| ---- |
| |
| @RunWith(Arquillian.class) |
| public class ManagedServiceTest { |
| |
| @Inject |
| private ManagedService managedService; |
| |
| @Deployment() |
| public static final WebArchive app() { |
| return ShrinkWrap.create(WebArchive.class, "example.war") |
| .addClasses(ManagedService.class) |
| .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml"); |
| } |
| |
| @Test |
| public void managedInvocationTest() { |
| final CompletableFuture<Integer> future = managedService.asyncTask(1); |
| try { |
| assertEquals(3, future.get(200, TimeUnit.MILLISECONDS).intValue()); |
| } catch (Exception e) { |
| fail("Unexpected exception" + e); |
| } |
| } |
| |
| @Test(expected = TimeoutException.class) |
| public void managedInvocationTestWithTimeout() throws InterruptedException, ExecutionException, TimeoutException { |
| final CompletableFuture<Integer> future = managedService.asyncTask(1); |
| future.get(10, TimeUnit.MILLISECONDS); |
| } |
| |
| @Test |
| public void managedInvocationTestWithException() { |
| final CompletableFuture<Integer> future = managedService.asyncTaskWithException(1); |
| |
| try { |
| future.get(200, TimeUnit.MILLISECONDS); |
| } catch (ExecutionException e) { |
| assertEquals("Planned exception", e.getCause().getMessage()); |
| } catch (Exception e) { |
| fail("Unexpected exception" + e); |
| } |
| } |
| } |
| ---- |
| |
| === ManagedScheduledExecutorService |
| |
| A `ManagedScheduledExecutorService` is an object that allows developers to execute tasks asynchronously at specific times. The tasks are executed on threads started by the container. |
| |
| ==== Example |
| |
| Here is a class that uses a `ManagedScheduledExecutorService` (full code can be found https://github.com/apache/tomee/blob/master/examples/concurrency-utils/src/main/java/org/superbiz/executor/ManagedScheduledService.java[here]): |
| |
| [source,java] |
| ---- |
| |
| @RequestScoped |
| public class ManagedScheduledService { |
| |
| @Resource |
| private ManagedScheduledExecutorService executor; |
| |
| public Future<Integer> singleFixedDelayTask(final int value, |
| final String errorMessage) { |
| return executor.schedule( |
| longCallableTask(value, 10, errorMessage), 100, TimeUnit.MILLISECONDS); |
| } |
| |
| public ScheduledFuture<?> periodicFixedDelayTask(final int value, |
| final String errorMessage, |
| final CountDownLatch countDownLatch) { |
| return executor.scheduleAtFixedRate( |
| longRunnableTask(value, 10, errorMessage, countDownLatch), 0, 100, TimeUnit.MILLISECONDS); |
| } |
| |
| private Runnable longRunnableTask(final int value, |
| final int taskDurationMs, |
| final String errorMessage, |
| final CountDownLatch countDownLatch) { |
| return () -> { |
| failOrWait(taskDurationMs, errorMessage); |
| Integer result = value + 1; |
| countDownLatch.countDown(); |
| }; |
| } |
| |
| private Callable<Integer> longCallableTask(final int value, |
| final int taskDurationMs, |
| final String errorMessage) { |
| return () -> { |
| failOrWait(taskDurationMs, errorMessage); |
| return value + 1; |
| }; |
| } |
| |
| private void failOrWait(final int taskDurationMs, |
| final String errorMessage) { |
| if (nonNull(errorMessage)) { |
| throw new RuntimeException(errorMessage); |
| } |
| try { |
| TimeUnit.MILLISECONDS.sleep(taskDurationMs); |
| } catch (InterruptedException e) { |
| throw new RuntimeException("Problem while waiting"); |
| } |
| } |
| |
| } |
| ---- |
| |
| This example also defines a method, `longCallableTask`, that simulates a long-running computation. |
| |
| The `singleFixedDelayTask` method schedules a single long-running task (built by `longCallableTask`) whose execution starts after a delay of 100 ms. |
| The `periodicFixedDelayTask` method schedules a task to run periodically, every 100 ms, with an initial delay of 0. Despite its name, it calls `scheduleAtFixedRate`, so the 100 ms period is measured between the start times of consecutive runs, not from the end of one run to the start of the next. |
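The two scheduling styles can be illustrated with the standard `ScheduledExecutorService`, whose `schedule` and `scheduleAtFixedRate` methods are the ones the managed variant inherits. This is a rough sketch, not the project's code: it assumes a plain JDK scheduler in place of the injected one, and the names `SchedulingSketch`, `scheduleOnce` and `runPeriodically` are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SchedulingSketch {

    // One-shot task: runs once after delayMs and yields value + 1.
    static int scheduleOnce(final int value, final long delayMs) {
        // Assumption: a plain JDK scheduler stands in for the managed one.
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            final ScheduledFuture<Integer> future =
                    executor.schedule(() -> value + 1, delayMs, TimeUnit.MILLISECONDS);
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Periodic task: counts the latch down on every run, then is cancelled
    // by the caller once the expected number of runs has been observed.
    static boolean runPeriodically(final int runs, final long periodMs) {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch latch = new CountDownLatch(runs);
        try {
            final ScheduledFuture<?> future = executor.scheduleAtFixedRate(
                    latch::countDown, 0, periodMs, TimeUnit.MILLISECONDS);
            final boolean completed = latch.await(5, TimeUnit.SECONDS);
            future.cancel(true); // a periodic task never finishes on its own
            return completed && future.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println(scheduleOnce(1, 50));     // prints 2
        System.out.println(runPeriodically(4, 20));  // prints true
    }
}
```

A `CountDownLatch` is used here for the same reason as in the service: the `ScheduledFuture` of a periodic task carries no result, so the caller needs another signal to know how many runs have happened.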
| |
| The methods are used in the following test class (full code can be found https://github.com/apache/tomee/blob/master/examples/concurrency-utils/src/test/java/org/superbiz/executor/ManagedScheduledServiceTest.java[here]): |
| |
| [source,java] |
| ---- |
| |
| @RunWith(Arquillian.class) |
| public class ManagedScheduledServiceTest { |
| |
| @Inject |
| private ManagedScheduledService scheduledService; |
| |
| @Deployment() |
| public static final WebArchive app() { |
| return ShrinkWrap.create(WebArchive.class, "example.war") |
| .addClasses(ManagedScheduledService.class) |
| .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml"); |
| } |
| |
| @Test |
| public void singleFixedDelayTask() throws InterruptedException, ExecutionException, TimeoutException { |
| final Future<Integer> futureA = scheduledService.singleFixedDelayTask(1, null); |
| final Future<Integer> futureB = scheduledService.singleFixedDelayTask(50, null); |
| |
| assertEquals(2, futureA.get(200, TimeUnit.MILLISECONDS).intValue()); |
| assertEquals(51, futureB.get(200, TimeUnit.MILLISECONDS).intValue()); |
| |
| } |
| |
| @Test |
| public void periodicFixedDelayTask() throws InterruptedException { |
| final CountDownLatch countDownLatch = new CountDownLatch(4); // execute 4 times |
| final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, null, countDownLatch); |
| countDownLatch.await(500, TimeUnit.MILLISECONDS); |
| if (!scheduledFuture.isCancelled()) { |
| scheduledFuture.cancel(true); |
| } |
| } |
| |
| @Test |
| public void singleFixedDelayTaskWithException() { |
| final Future<Integer> future = scheduledService.singleFixedDelayTask(1, "Planned exception"); |
| try { |
| future.get(200, TimeUnit.MILLISECONDS); |
| } catch (ExecutionException e) { |
| assertEquals("Planned exception", e.getCause().getMessage()); |
| } catch (Exception e) { |
| fail("Unexpected exception" + e); |
| } |
| } |
| |
| @Test |
| public void periodicFixedDelayTaskWithException() { |
| final CountDownLatch countDownLatch = new CountDownLatch(1); |
| final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, "Planned exception", countDownLatch); |
| |
| try { |
| countDownLatch.await(200, TimeUnit.MILLISECONDS); |
| scheduledFuture.get(200, TimeUnit.MILLISECONDS); |
| } catch (ExecutionException e) { |
| assertEquals("Planned exception", e.getCause().getMessage()); |
| } catch (Exception e) { |
| fail("Unexpected exception" + e); |
| } |
| |
| if (!scheduledFuture.isCancelled()) { |
| scheduledFuture.cancel(true); |
| } |
| } |
| |
| } |
| ---- |
| |
| === ManagedThreadFactory |
| |
| A `ManagedThreadFactory` is an object that allows developers to create container-managed threads. |
| |
| ==== Example |
| |
| Here is a class that uses a `ManagedThreadFactory` (full code can be found https://github.com/apache/tomee/blob/master/examples/concurrency-utils/src/main/java/org/superbiz/executor/ThreadFactoryService.java[here]): |
| |
| [source,java] |
| ---- |
| |
| @RequestScoped |
| public class ThreadFactoryService { |
| |
| @Resource |
| private ManagedThreadFactory factory; |
| |
| public void asyncTask(final LongTask longTask) throws InterruptedException { |
| final Thread thread = factory.newThread(longTask); |
| thread.setName("pretty asyncTask"); |
| thread.start(); |
| } |
| |
| public void asyncHangingTask(final Runnable longTask) { |
| final Thread thread = factory.newThread(longTask); |
| thread.setName("pretty asyncHangingTask"); |
| thread.start(); |
| |
| if (thread.isAlive()) { |
| thread.interrupt(); |
| } |
| } |
| |
| public static class LongTask implements Runnable { |
| private final int value; |
| private final long taskDurationMs; |
| private final CountDownLatch countDownLatch; |
| private int result; |
| private AtomicBoolean isTerminated = new AtomicBoolean(false); |
| |
| public LongTask(final int value, |
| final long taskDurationMs, |
| final CountDownLatch countDownLatch) { |
| this.value = value; |
| this.taskDurationMs = taskDurationMs; |
| this.countDownLatch = countDownLatch; |
| } |
| |
| public int getResult() { |
| return result; |
| } |
| |
| public boolean getIsTerminated() { |
| return isTerminated.get(); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| TimeUnit.MILLISECONDS.sleep(taskDurationMs); |
| } catch (InterruptedException e) { |
| isTerminated.set(true); |
| countDownLatch.countDown(); |
| throw new RuntimeException("Problem while waiting"); |
| } |
| |
| result = value + 1; |
| countDownLatch.countDown(); |
| } |
| } |
| } |
| ---- |
| |
| This example defines a class, `LongTask`, that implements `Runnable` and executes a long-running task in its `run` method. |
| |
| The `asyncTask` method creates a managed thread (using the injected `ManagedThreadFactory`) and starts it. |
| The `asyncHangingTask` method also creates and starts a managed thread, but then interrupts it if it is still alive; `LongTask` reacts to the interrupt by setting its `isTerminated` flag. |
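The interrupt handling described above can be reproduced with any `java.util.concurrent.ThreadFactory`, the interface that `ManagedThreadFactory` extends. The sketch below is illustrative only: it assumes the JDK's default thread factory in place of the injected one, and `ThreadFactorySketch` and `startAndInterrupt` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ThreadFactorySketch {

    // Starts a thread that would sleep for a minute, interrupts it right away,
    // and reports whether the task observed the interruption.
    static boolean startAndInterrupt() {
        // Assumption: the JDK default factory stands in for ManagedThreadFactory.
        final ThreadFactory factory = Executors.defaultThreadFactory();
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        final CountDownLatch done = new CountDownLatch(1);

        final Thread thread = factory.newThread(() -> {
            try {
                TimeUnit.SECONDS.sleep(60);
            } catch (InterruptedException e) {
                interrupted.set(true); // same role as isTerminated in LongTask
            } finally {
                done.countDown();
            }
        });
        thread.setName("sketch-hanging-task");
        thread.start();
        thread.interrupt(); // sleep() throws InterruptedException in the task

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(final String[] args) {
        System.out.println(startAndInterrupt()); // prints true
    }
}
```

Interruption is cooperative: the thread stops only because the task is blocked in `sleep`, which responds to the interrupt. A task that never blocks and never checks its interrupt status would keep running.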
| |
| The following class tests these methods (full code can be found https://github.com/apache/tomee/blob/master/examples/concurrency-utils/src/test/java/org/superbiz/executor/ThreadFactoryServiceTest.java[here]): |
| |
| [source,java] |
| ---- |
| |
| @RunWith(Arquillian.class) |
| public class ThreadFactoryServiceTest { |
| |
| @Inject |
| private ThreadFactoryService factoryService; |
| |
| @Deployment() |
| public static final WebArchive app() { |
| return ShrinkWrap.create(WebArchive.class, "example.war") |
| .addClasses(ThreadFactoryService.class) |
| .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml"); |
| } |
| |
| @Test |
| public void asyncTask() throws InterruptedException { |
| final CountDownLatch countDownLatch = new CountDownLatch(1); |
| final LongTask longTask = new LongTask(1, 50, countDownLatch); |
| factoryService.asyncTask(longTask); |
| |
| countDownLatch.await(200, TimeUnit.MILLISECONDS); |
| |
| assertEquals(2, longTask.getResult()); |
| } |
| |
| @Test |
| public void asyncHangingTask() throws InterruptedException { |
| final CountDownLatch countDownLatch = new CountDownLatch(1); |
| final LongTask longTask = new LongTask(1, 1000000, countDownLatch); |
| |
| factoryService.asyncHangingTask(longTask); |
| |
| countDownLatch.await(200, TimeUnit.MILLISECONDS); |
| |
| assertTrue(longTask.getIsTerminated()); |
| } |
| } |
| ---- |
| |
| The full example project can be found https://github.com/apache/tomee/tree/master/examples/concurrency-utils[here]. |
| It is a Maven project, and all the tests can be executed by running the `mvn clean install` command. |