| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.testframework.junits.common.GridCommonTest; |
| import org.junit.Test; |
| |
| /** |
| * Grid distributed executor test. |
| */ |
| @GridCommonTest(group = "Thread Tests") |
| public class IgniteExecutorServiceTest extends GridCommonAbstractTest { |
| /** */ |
| public IgniteExecutorServiceTest() { |
| super(true); |
| } |
| |
| /** |
| * @throws Exception Thrown in case of test failure. |
| */ |
| @Test |
| public void testExecute() throws Exception { |
| Ignite ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| ExecutorService srvc = createExecutorService(ignite); |
| |
| srvc.execute(new Runnable() { |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| @Override public void run() { |
| System.out.println("Test message."); |
| |
| assert this.ignite != null; |
| } |
| }); |
| |
| srvc.execute(new TestRunnable()); |
| |
| srvc.shutdown(); |
| } |
| |
| /** |
| * @throws Exception Thrown in case of test failure. |
| */ |
| @Test |
| public void testSubmit() throws Exception { |
| Ignite ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| ExecutorService srvc = createExecutorService(ignite); |
| |
| Future<?> fut = srvc.submit(new TestRunnable()); |
| |
| Object res = fut.get(); |
| |
| info("Default Runnable result:" + res); |
| |
| assert res == null : "Failed to get valid default result for submitted Runnable: " + res; |
| |
| String val = "test-value"; |
| |
| fut = srvc.submit(new TestRunnable(), val); |
| |
| res = fut.get(); |
| |
| info("Defined Runnable result:" + res); |
| |
| assert val.equals(res) : "Failed to get valid predefined result for submitted Runnable: " + res; |
| |
| fut = srvc.submit(new TestCallable<>(val)); |
| |
| res = fut.get(); |
| |
| info("Callable result:" + res); |
| |
| assert val.equals(res) : "Failed to get valid result for submitted Callable: " + res; |
| |
| srvc.shutdown(); |
| } |
| |
| /** |
| * @throws Exception Thrown in case of test failure. |
| */ |
| @Test |
| public void testSubmitWithFutureTimeout() throws Exception { |
| Ignite ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| ExecutorService srvc = createExecutorService(ignite); |
| |
| Future<Integer> fut = srvc.submit(new TestCallable<>(3000)); // Just sleep for 3 seconds. |
| |
| boolean ok = true; |
| |
| try { |
| fut.get(1, TimeUnit.SECONDS); |
| |
| ok = false; |
| } |
| catch (TimeoutException e) { |
| info("Task timeout elapsed: " + e.getMessage()); |
| } |
| |
| assert ok : "Timeout must be thrown."; |
| |
| srvc.shutdown(); |
| } |
| |
| /** |
| * @throws Exception Thrown in case of test failure. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| @Test |
| public void testInvokeAll() throws Exception { |
| Ignite ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| ExecutorService srvc = createExecutorService(ignite); |
| |
| Collection<Callable<String>> cmds = new ArrayList<>(2); |
| |
| String val1 = "test-value-1"; |
| String val2 = "test-value-2"; |
| |
| cmds.add(new TestCallable<>(val1)); |
| cmds.add(new TestCallable<>(val2)); |
| |
| List<Future<String>> futs = srvc.invokeAll(cmds); |
| |
| assert futs != null; |
| assert futs.size() == 2; |
| |
| String res1 = futs.get(0).get(); |
| String res2 = futs.get(1).get(); |
| |
| assert val1.equals(res1) : "Failed to get valid result for first command: " + res1; |
| assert val2.equals(res2) : "Failed to get valid result for second command: " + res2; |
| |
| srvc.shutdown(); |
| } |
| |
| /** |
| * @throws Exception Thrown in case of test failure. |
| */ |
| @Test |
| public void testInvokeAllWithTimeout() throws Exception { |
| Ignite ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| ExecutorService srvc = createExecutorService(ignite); |
| |
| Collection<Callable<Integer>> cmds = new ArrayList<>(); |
| |
| cmds.add(new TestCallable<>(3000)); // Just sleeps for 3 seconds. |
| cmds.add(new TestCallable<>(3000)); // Just sleeps for 3 seconds. |
| |
| List<Future<Integer>> fut = srvc.invokeAll(cmds, 1, TimeUnit.SECONDS); |
| |
| assert fut != null; |
| assert fut.size() == 2; |
| |
| boolean ok = true; |
| |
| try { |
| fut.get(0).get(); |
| |
| ok = false; |
| } |
| catch (CancellationException e) { |
| info("First timeout task is cancelled: " + e.getMessage()); |
| } |
| |
| assert ok : "First task must be cancelled."; |
| |
| try { |
| fut.get(1).get(); |
| |
| ok = false; |
| } |
| catch (CancellationException e) { |
| info("Second timeout task is cancelled: " + e.getMessage()); |
| } |
| |
| assert ok : "Second task must be cancelled."; |
| |
| srvc.shutdown(); |
| } |
| |
| /** |
| * @throws Exception Thrown in case of test failure. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| @Test |
| public void testInvokeAny() throws Exception { |
| Ignite ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| ExecutorService srvc = createExecutorService(ignite); |
| |
| Collection<Callable<String>> cmds = new ArrayList<>(2); |
| |
| String val1 = "test-value-1"; |
| String val2 = "test-value-2"; |
| |
| cmds.add(new TestCallable<>(val1)); |
| cmds.add(new TestCallable<>(val2)); |
| |
| String res = srvc.invokeAny(cmds); |
| |
| info("Result: " + res); |
| |
| assert val1.equals(res) : "Failed to get valid result: " + res; |
| |
| srvc.shutdown(); |
| } |
| |
| /** |
| * @throws Exception Thrown in case of test failure. |
| */ |
| @Test |
| public void testInvokeAnyWithTimeout() throws Exception { |
| Ignite ignite = G.ignite(getTestIgniteInstanceName()); |
| |
| ExecutorService srvc = createExecutorService(ignite); |
| |
| Collection<Callable<Integer>> timeoutCmds = new ArrayList<>(2); |
| |
| timeoutCmds.add(new TestCallable<>(5000)); |
| timeoutCmds.add(new TestCallable<>(5000)); |
| |
| boolean ok = true; |
| |
| try { |
| srvc.invokeAny(timeoutCmds, 1, TimeUnit.SECONDS); |
| |
| ok = false; |
| } |
| catch (TimeoutException e) { |
| info("Task timeout elapsed: " + e.getMessage()); |
| } |
| |
| assert ok : "Timeout must be thrown."; |
| |
| srvc.shutdown(); |
| } |
| |
| /** |
| * @param ignite Grid instance. |
| * @return Thrown in case of test failure. |
| */ |
| private ExecutorService createExecutorService(Ignite ignite) { |
| assert ignite != null; |
| |
| return ignite.executorService(); |
| } |
| |
| /** |
| * @param <T> Type of the {@link Callable} argument. |
| */ |
| private static class TestCallable<T> implements Callable<T>, Serializable { |
| /** */ |
| private T data; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** |
| * @param data Data. |
| */ |
| TestCallable(T data) { |
| this.data = data; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public T call() throws Exception { |
| System.out.println("Test callable message."); |
| |
| assert ignite != null; |
| |
| if (data instanceof Integer) |
| Thread.sleep((Integer)data); |
| |
| return data; |
| } |
| } |
| |
| /** */ |
| private static class TestRunnable implements Runnable, Serializable { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| System.out.println("Test Runnable message."); |
| |
| assert ignite != null; |
| } |
| } |
| } |