| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.compute.utils; |
| |
| import static java.util.stream.Collectors.toList; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.is; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.compute.task.ComputeJobRunner; |
| import org.apache.ignite.compute.task.MapReduceTask; |
| import org.apache.ignite.compute.task.TaskExecutionContext; |
| |
| /** |
| * Tests DSL for interactive tasks. "Interactive" means that you can send messages and get responses to/from running tasks. |
| * |
| * <p>For example, you can start {@link GlobalInteractiveMapReduceTask} on some node, get the name of worker node for split or reduce job, |
| * ask split or reduce job to complete successfully or throw exception. Also, this class gives useful assertions for task states. |
| */ |
| public final class InteractiveTasks { |
| /** |
| * ACK for {@link Signal#CONTINUE}. Returned by a task that has received the signal. Used to check that the task is alive. |
| */ |
| private static final Object ACK = new Object(); |
| |
| /** |
| * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveMapReduceTask} and test code. You can send a |
| * signal to the task via this channel and get a response from it via {@link #GLOBAL_CHANNEL}. |
| */ |
| private static final BlockingQueue<Signal> GLOBAL_SIGNALS = new LinkedBlockingQueue<>(); |
| |
| /** |
| * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveMapReduceTask} and test code. You can send a |
| * signal to the task via {@link #GLOBAL_SIGNALS} and get a response from it via this channel. |
| */ |
| private static final BlockingQueue<Object> GLOBAL_CHANNEL = new LinkedBlockingQueue<>(); |
| |
| /** |
| * This counter indicated how many {@link GlobalInteractiveMapReduceTask#split(TaskExecutionContext, Object...)} methods are running |
| * now. This counter increased each time the {@link GlobalInteractiveMapReduceTask#split(TaskExecutionContext, Object...)} is called and |
| * decreased when the method is finished (whatever the result is). Checked in {@link #clearState}. |
| */ |
| private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new AtomicInteger(0); |
| |
| /** |
| * This counter indicated how many {@link GlobalInteractiveMapReduceTask#reduce(Map)} methods are running now. This counter increased |
| * each time the {@link GlobalInteractiveMapReduceTask#reduce(Map)} is called and decreased when the method is finished (whatever the |
| * result is). Checked in {@link #clearState}. |
| */ |
| private static final AtomicInteger RUNNING_GLOBAL_REDUCE_CNT = new AtomicInteger(0); |
| |
| /** |
| * The timeout in seconds that defines how long should we wait for async calls. Almost all methods use this timeout. |
| */ |
| private static final long WAIT_TIMEOUT_SECONDS = 15; |
| |
| /** |
| * Clear global state. Must be called before each testing scenario. |
| */ |
| public static void clearState() { |
| assertThat( |
| "Global split job is running. Can not clear global state. Please, stop the job first.", |
| RUNNING_GLOBAL_SPLIT_CNT.get(), |
| is(0) |
| ); |
| assertThat( |
| "Global reduce job is running. Can not clear global state. Please, stop the job first.", |
| RUNNING_GLOBAL_REDUCE_CNT.get(), |
| is(0) |
| ); |
| |
| GLOBAL_SIGNALS.clear(); |
| GLOBAL_CHANNEL.clear(); |
| } |
| |
| /** |
| * Signals that are sent by test code to the tasks. |
| */ |
| private enum Signal { |
| /** |
| * Signal to the task to continue running and send ACK as a response. |
| */ |
| CONTINUE, |
| |
| /** |
| * Ask task to throw an exception. |
| */ |
| THROW, |
| |
| /** |
| * Ask split method to return a list of jobs which will be executed on all nodes. |
| */ |
| SPLIT_RETURN_ALL_NODES, |
| |
| /** |
| * Ask reduce method to return a concatenation of jobs results. |
| */ |
| REDUCE_RETURN |
| } |
| |
| private static Signal listenSignal() { |
| try { |
| return GLOBAL_SIGNALS.take(); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * If any of the args are strings, convert them to signals and offer them to the job. |
| * |
| * @param args Job args. |
| */ |
| private static void offerArgsAsSignals(Object... args) { |
| for (Object arg : args) { |
| if (arg instanceof String) { |
| String signal = (String) arg; |
| try { |
| GLOBAL_SIGNALS.offer(Signal.valueOf(signal)); |
| } catch (IllegalArgumentException ignored) { |
| // Ignore non-signal strings |
| } |
| } |
| } |
| } |
| |
| /** |
| * Interactive map reduce task that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}. |
| */ |
| private static class GlobalInteractiveMapReduceTask implements MapReduceTask<List<String>> { |
| @Override |
| public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) { |
| RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet(); |
| |
| offerArgsAsSignals(args); |
| |
| try { |
| while (true) { |
| Signal receivedSignal = listenSignal(); |
| switch (receivedSignal) { |
| case THROW: |
| throw new RuntimeException(); |
| case CONTINUE: |
| GLOBAL_CHANNEL.offer(ACK); |
| break; |
| case SPLIT_RETURN_ALL_NODES: |
| return context.ignite().clusterNodes().stream().map(node -> |
| ComputeJobRunner.builder() |
| .jobClassName(InteractiveJobs.interactiveJobName()) |
| .nodes(Set.of(node)) |
| .build() |
| ).collect(toList()); |
| default: |
| throw new IllegalStateException("Unexpected value: " + receivedSignal); |
| } |
| } |
| } finally { |
| RUNNING_GLOBAL_SPLIT_CNT.decrementAndGet(); |
| } |
| } |
| |
| @Override |
| public List<String> reduce(Map<UUID, ?> results) { |
| RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet(); |
| try { |
| while (true) { |
| Signal receivedSignal = listenSignal(); |
| switch (receivedSignal) { |
| case THROW: |
| throw new RuntimeException(); |
| case CONTINUE: |
| GLOBAL_CHANNEL.offer(ACK); |
| break; |
| case REDUCE_RETURN: |
| return results.values().stream() |
| .map(String.class::cast) |
| .collect(toList()); |
| default: |
| throw new IllegalStateException("Unexpected value: " + receivedSignal); |
| } |
| } |
| } finally { |
| RUNNING_GLOBAL_REDUCE_CNT.decrementAndGet(); |
| } |
| } |
| } |
| |
| /** |
| * API for the interaction with {@link GlobalInteractiveMapReduceTask}. |
| */ |
| public static final class GlobalApi { |
| /** |
| * Checks that {@link GlobalInteractiveMapReduceTask} is alive. |
| */ |
| public static void assertAlive() throws InterruptedException { |
| GLOBAL_SIGNALS.offer(Signal.CONTINUE); |
| assertThat(GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), equalTo(ACK)); |
| } |
| |
| /** |
| * Finishes the split job, returning job runners for all nodes. |
| */ |
| public static void finishSplit() { |
| GLOBAL_SIGNALS.offer(Signal.SPLIT_RETURN_ALL_NODES); |
| } |
| |
| /** |
| * Finishes the split job, returning job runners for all nodes. |
| */ |
| public static void throwException() { |
| GLOBAL_SIGNALS.offer(Signal.THROW); |
| } |
| |
| /** |
| * Finishes the reduce job, returning final result. |
| */ |
| public static void finishReduce() { |
| GLOBAL_SIGNALS.offer(Signal.REDUCE_RETURN); |
| } |
| |
| public static String name() { |
| return GlobalInteractiveMapReduceTask.class.getName(); |
| } |
| } |
| } |