blob: c19e7c669ce567a1d35a1a7bc6f6fca5687bf0b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.compute.utils;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.compute.task.ComputeJobRunner;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
/**
* Tests DSL for interactive tasks. "Interactive" means that you can send messages and get responses to/from running tasks.
*
* <p>For example, you can start {@link GlobalInteractiveMapReduceTask} on some node, get the name of worker node for split or reduce job,
* ask split or reduce job to complete successfully or throw exception. Also, this class gives useful assertions for task states.
*/
public final class InteractiveTasks {
/**
* ACK for {@link Signal#CONTINUE}. Offered into {@link #GLOBAL_CHANNEL} by a task that has received the signal. Used to check that the
* task is alive.
*/
private static final Object ACK = new Object();
/**
* Class-wide queue that carries signals from test code to {@link GlobalInteractiveMapReduceTask}. Test code offers a signal here and
* reads the task's response from {@link #GLOBAL_CHANNEL}. String job arguments that match a {@link Signal} name are also offered here
* (see {@link #offerArgsAsSignals(Object...)}).
*/
private static final BlockingQueue<Signal> GLOBAL_SIGNALS = new LinkedBlockingQueue<>();
/**
* Class-wide queue that carries responses from {@link GlobalInteractiveMapReduceTask} back to test code. Test code sends a signal via
* {@link #GLOBAL_SIGNALS} and polls the response from this queue.
*/
private static final BlockingQueue<Object> GLOBAL_CHANNEL = new LinkedBlockingQueue<>();
/**
* Number of {@link GlobalInteractiveMapReduceTask#split(TaskExecutionContext, Object...)} invocations that are running right now. The
* counter is incremented each time the method is entered and decremented when the method finishes (whatever the result is). Checked in
* {@link #clearState}.
*/
private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new AtomicInteger(0);
/**
* Number of {@link GlobalInteractiveMapReduceTask#reduce(Map)} invocations that are running right now. The counter is incremented each
* time the method is entered and decremented when the method finishes (whatever the result is). Checked in {@link #clearState}.
*/
private static final AtomicInteger RUNNING_GLOBAL_REDUCE_CNT = new AtomicInteger(0);
/**
* The timeout in seconds that defines how long we should wait for a response from a running task. Used by
* {@link GlobalApi#assertAlive()} when polling {@link #GLOBAL_CHANNEL}.
*/
private static final long WAIT_TIMEOUT_SECONDS = 15;
/**
 * Resets the shared communication queues. Must be called before each testing scenario.
 *
 * <p>Fails with an assertion error if a split or reduce method of {@link GlobalInteractiveMapReduceTask} is still running, because
 * clearing the queues underneath a running task would leave it in an undefined state.
 */
public static void clearState() {
    int activeSplits = RUNNING_GLOBAL_SPLIT_CNT.get();
    assertThat("Global split job is running. Can not clear global state. Please, stop the job first.", activeSplits, is(0));

    int activeReduces = RUNNING_GLOBAL_REDUCE_CNT.get();
    assertThat("Global reduce job is running. Can not clear global state. Please, stop the job first.", activeReduces, is(0));

    // Nothing is running, so it is safe to drop any leftover signals and responses.
    GLOBAL_SIGNALS.clear();
    GLOBAL_CHANNEL.clear();
}
/**
* Signals that are sent by test code to the tasks. The split and reduce methods of {@link GlobalInteractiveMapReduceTask} take signals
* from {@link #GLOBAL_SIGNALS} one by one and react to each of them; a signal that a method does not handle makes it throw
* {@link IllegalStateException}.
*/
private enum Signal {
/**
* Ask the task to keep running and to offer {@link #ACK} into {@link #GLOBAL_CHANNEL} as a response.
*/
CONTINUE,
/**
* Ask the task to throw a {@link RuntimeException}.
*/
THROW,
/**
* Ask the split method to return a list of job runners, one per cluster node. Not handled by the reduce method.
*/
SPLIT_RETURN_ALL_NODES,
/**
* Ask the reduce method to return the list of job results, each cast to {@link String}. Not handled by the split method.
*/
REDUCE_RETURN
}
/**
 * Blocks until the next signal is available in {@link #GLOBAL_SIGNALS} and returns it.
 *
 * @return The next signal sent to the task.
 * @throws RuntimeException If the waiting thread is interrupted; the original {@link InterruptedException} is the cause and the
 *     thread's interrupt flag is restored before throwing.
 */
private static Signal listenSignal() {
    try {
        return GLOBAL_SIGNALS.take();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so that the code up the stack (e.g. the executor running the task) can still observe
        // that the thread was interrupted; catching InterruptedException clears it.
        Thread.currentThread().interrupt();

        throw new RuntimeException(e);
    }
}
/**
 * Scans the job arguments and, for every string argument that is the name of a {@link Signal} constant, puts the corresponding
 * signal into {@link #GLOBAL_SIGNALS}. Non-string arguments and strings that do not name a signal are skipped.
 *
 * @param args Job args.
 */
private static void offerArgsAsSignals(Object... args) {
    for (Object candidate : args) {
        if (!(candidate instanceof String)) {
            continue;
        }

        Signal parsed;

        try {
            parsed = Signal.valueOf((String) candidate);
        } catch (IllegalArgumentException ignored) {
            // Ignore non-signal strings
            continue;
        }

        GLOBAL_SIGNALS.offer(parsed);
    }
}
/**
 * Interactive map reduce task that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}.
 *
 * <p>Both {@code split} and {@code reduce} block on {@link #GLOBAL_SIGNALS} and keep processing signals in a loop until one of them
 * makes the method return or throw.
 */
private static class GlobalInteractiveMapReduceTask implements MapReduceTask<List<String>> {
    /**
     * Waits for signals and reacts to them: {@link Signal#CONTINUE} is answered with {@link #ACK}, {@link Signal#THROW} makes the
     * method throw, {@link Signal#SPLIT_RETURN_ALL_NODES} makes it return one interactive job runner per cluster node.
     *
     * @param context Task execution context, used to obtain the list of cluster nodes.
     * @param args Task args; string args that match a {@link Signal} name are queued as signals before the loop starts.
     * @return Job runners, one per cluster node.
     * @throws RuntimeException If {@link Signal#THROW} is received.
     * @throws IllegalStateException If a signal that split does not handle is received.
     */
    @Override
    public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) {
        RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();

        try {
            // Kept inside the try block so that the running counter is decremented even if processing of args fails;
            // otherwise a failure here would make every subsequent clearState() call fail.
            offerArgsAsSignals(args);

            while (true) {
                Signal receivedSignal = listenSignal();

                switch (receivedSignal) {
                    case THROW:
                        throw new RuntimeException();
                    case CONTINUE:
                        GLOBAL_CHANNEL.offer(ACK);
                        break;
                    case SPLIT_RETURN_ALL_NODES:
                        return context.ignite().clusterNodes().stream().map(node ->
                                ComputeJobRunner.builder()
                                        .jobClassName(InteractiveJobs.interactiveJobName())
                                        .nodes(Set.of(node))
                                        .build()
                        ).collect(toList());
                    default:
                        throw new IllegalStateException("Unexpected value: " + receivedSignal);
                }
            }
        } finally {
            RUNNING_GLOBAL_SPLIT_CNT.decrementAndGet();
        }
    }

    /**
     * Waits for signals and reacts to them: {@link Signal#CONTINUE} is answered with {@link #ACK}, {@link Signal#THROW} makes the
     * method throw, {@link Signal#REDUCE_RETURN} makes it return the job results.
     *
     * @param results Job results keyed by UUID; every value is cast to {@link String}.
     * @return List of job results.
     * @throws RuntimeException If {@link Signal#THROW} is received.
     * @throws IllegalStateException If a signal that reduce does not handle is received.
     */
    @Override
    public List<String> reduce(Map<UUID, ?> results) {
        RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet();

        try {
            while (true) {
                Signal receivedSignal = listenSignal();

                switch (receivedSignal) {
                    case THROW:
                        throw new RuntimeException();
                    case CONTINUE:
                        GLOBAL_CHANNEL.offer(ACK);
                        break;
                    case REDUCE_RETURN:
                        return results.values().stream()
                                .map(String.class::cast)
                                .collect(toList());
                    default:
                        throw new IllegalStateException("Unexpected value: " + receivedSignal);
                }
            }
        } finally {
            RUNNING_GLOBAL_REDUCE_CNT.decrementAndGet();
        }
    }
}
/**
 * API for the interaction with {@link GlobalInteractiveMapReduceTask}.
 */
public static final class GlobalApi {
    /**
     * Checks that {@link GlobalInteractiveMapReduceTask} is alive: sends {@link Signal#CONTINUE} and asserts that the task responds
     * with {@link #ACK} within {@link #WAIT_TIMEOUT_SECONDS} seconds.
     *
     * @throws InterruptedException If the calling thread is interrupted while waiting for the response.
     */
    public static void assertAlive() throws InterruptedException {
        GLOBAL_SIGNALS.offer(Signal.CONTINUE);

        // The reason string turns an otherwise opaque "was null" failure (poll timed out) into an actionable message.
        assertThat(
                "Task did not acknowledge the CONTINUE signal within " + WAIT_TIMEOUT_SECONDS + " seconds.",
                GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                equalTo(ACK)
        );
    }

    /**
     * Finishes the split job, returning job runners for all nodes.
     */
    public static void finishSplit() {
        GLOBAL_SIGNALS.offer(Signal.SPLIT_RETURN_ALL_NODES);
    }

    /**
     * Asks the currently running split or reduce method to throw an exception.
     */
    public static void throwException() {
        GLOBAL_SIGNALS.offer(Signal.THROW);
    }

    /**
     * Finishes the reduce job, returning final result.
     */
    public static void finishReduce() {
        GLOBAL_SIGNALS.offer(Signal.REDUCE_RETURN);
    }

    /**
     * Returns the class name of the interactive map reduce task.
     *
     * @return Fully qualified class name of {@link GlobalInteractiveMapReduceTask}.
     */
    public static String name() {
        return GlobalInteractiveMapReduceTask.class.getName();
    }
}
}