/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.OutputBlockedInvokable;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation;
import static org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder.newBuilder;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
/**
* Tests for submission logic of the {@link TaskExecutor}.
*/
public class TaskExecutorSubmissionTest extends TestLogger {
@Rule
public final TestName testName = new TestName();
private static final Time timeout = Time.milliseconds(10000L);
private JobID jobId = new JobID();
private MetricRegistryImpl metricRegistry;
private TestingRpcService rpcService;
private String metricQueryServiceAddress;
@Before
public void setup() {
rpcService = new TestingRpcService();
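// Start a dedicated metric query service; its address is handed to the TaskSubmissionTestEnvironment builder below.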
metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
metricRegistry.startQueryService(rpcService, new ResourceID("mqs"));
metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
}
@After
public void teardown() throws ExecutionException, InterruptedException {
if (rpcService != null) {
rpcService.stopService().get();
}
if (metricRegistry != null) {
metricRegistry.shutdown().get();
}
}
/**
* Tests that we can submit a task to the TaskManager given that we've allocated a slot there.
*/
@Test(timeout = 10000L)
public void testTaskSubmission() throws Exception {
final ExecutionAttemptID eid = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, FutureCompletingInvokable.class);
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setSlotSize(1)
.addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable taskSlotTable = env.getTaskSlotTable();
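// A slot must be allocated for the job before the TaskExecutor accepts the task submission.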
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
taskRunningFuture.get();
}
}
/**
* Tests that the TaskManager sends a proper exception back to the sender if the submit task
* message fails.
*/
@Test(timeout = 10000L)
public void testSubmitTaskFailure() throws Exception {
final ExecutionAttemptID eid = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor(
"test task",
eid,
BlockingNoOpInvokable.class,
0); // this will make the submission fail because the number of key groups must be >= 1
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
} catch (Exception e) {
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
}
}
/**
* Tests that we can cancel a task on the TaskManager after we have submitted it.
*/
@Test(timeout = 10000L)
public void testTaskSubmissionAndCancelling() throws Exception {
final ExecutionAttemptID eid1 = new ExecutionAttemptID();
final ExecutionAttemptID eid2 = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd1 = createTestTaskDeploymentDescriptor("test task", eid1, BlockingNoOpInvokable.class);
final TaskDeploymentDescriptor tdd2 = createTestTaskDeploymentDescriptor("test task", eid2, BlockingNoOpInvokable.class);
final CompletableFuture<Void> task1RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task2RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task1CanceledFuture = new CompletableFuture<>();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setSlotSize(2)
.addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture)
.addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture)
.addTaskManagerActionListener(eid1, ExecutionState.CANCELED, task1CanceledFuture)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
task1RunningFuture.get();
taskSlotTable.allocateSlot(1, jobId, tdd2.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd2, env.getJobMasterId(), timeout).get();
task2RunningFuture.get();
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), ExecutionState.RUNNING);
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), ExecutionState.RUNNING);
tmGateway.cancelTask(eid1, timeout);
task1CanceledFuture.get();
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), ExecutionState.CANCELED);
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), ExecutionState.RUNNING);
}
}
/**
* Tests that submitted tasks will fail when attempting to send/receive data if no
* ResultPartitions/InputGates are set up.
*/
@Test(timeout = 10000L)
public void testGateChannelEdgeMismatch() throws Exception {
final ExecutionAttemptID eid1 = new ExecutionAttemptID();
final ExecutionAttemptID eid2 = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd1 =
createTestTaskDeploymentDescriptor("Sender", eid1, TestingAbstractInvokables.Sender.class);
final TaskDeploymentDescriptor tdd2 =
createTestTaskDeploymentDescriptor("Receiver", eid2, TestingAbstractInvokables.Receiver.class);
final CompletableFuture<Void> task1RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task2RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task1FailedFuture = new CompletableFuture<>();
final CompletableFuture<Void> task2FailedFuture = new CompletableFuture<>();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture)
.addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture)
.addTaskManagerActionListener(eid1, ExecutionState.FAILED, task1FailedFuture)
.addTaskManagerActionListener(eid2, ExecutionState.FAILED, task2FailedFuture)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.setSlotSize(2)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
task1RunningFuture.get();
taskSlotTable.allocateSlot(1, jobId, tdd2.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd2, env.getJobMasterId(), timeout).get();
task2RunningFuture.get();
task1FailedFuture.get();
task2FailedFuture.get();
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), ExecutionState.FAILED);
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), ExecutionState.FAILED);
}
}
@Test(timeout = 10000L)
public void testRunJobWithForwardChannel() throws Exception {
ResourceID producerLocation = ResourceID.generate();
NettyShuffleDescriptor sdd =
createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
TaskDeploymentDescriptor tdd1 = createSender(sdd);
TaskDeploymentDescriptor tdd2 = createReceiver(sdd);
ExecutionAttemptID eid1 = tdd1.getExecutionAttemptId();
ExecutionAttemptID eid2 = tdd2.getExecutionAttemptId();
final CompletableFuture<Void> task1RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task2RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task1FinishedFuture = new CompletableFuture<>();
final CompletableFuture<Void> task2FinishedFuture = new CompletableFuture<>();
final JobMasterId jobMasterId = JobMasterId.generate();
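// The testing JobMaster gateway acknowledges every scheduleOrUpdateConsumers notification so the
// pipelined forward channel between sender and receiver can be consumed.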
TestingJobMasterGateway testingJobMasterGateway =
new TestingJobMasterGatewayBuilder()
.setFencingTokenSupplier(() -> jobMasterId)
.setScheduleOrUpdateConsumersFunction(
resultPartitionID -> CompletableFuture.completedFuture(Acknowledge.get()))
.build();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setResourceID(producerLocation)
.setSlotSize(2)
.addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture)
.addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture)
.addTaskManagerActionListener(eid1, ExecutionState.FINISHED, task1FinishedFuture)
.addTaskManagerActionListener(eid2, ExecutionState.FINISHED, task2FinishedFuture)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.setJobMasterId(jobMasterId)
.setJobMasterGateway(testingJobMasterGateway)
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
task1RunningFuture.get();
taskSlotTable.allocateSlot(1, jobId, tdd2.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd2, jobMasterId, timeout).get();
task2RunningFuture.get();
task1FinishedFuture.get();
task2FinishedFuture.get();
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), ExecutionState.FINISHED);
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), ExecutionState.FINISHED);
}
}
/**
* This test creates two tasks. The sender sends data but fails to send the
* state update back to the job manager. The second task blocks until it is canceled.
*/
@Test(timeout = 10000L)
public void testCancellingDependentAndStateUpdateFails() throws Exception {
ResourceID producerLocation = ResourceID.generate();
NettyShuffleDescriptor sdd =
createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
TaskDeploymentDescriptor tdd1 = createSender(sdd);
TaskDeploymentDescriptor tdd2 = createReceiver(sdd);
ExecutionAttemptID eid1 = tdd1.getExecutionAttemptId();
ExecutionAttemptID eid2 = tdd2.getExecutionAttemptId();
final CompletableFuture<Void> task1RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task2RunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> task1FailedFuture = new CompletableFuture<>();
final CompletableFuture<Void> task2CanceledFuture = new CompletableFuture<>();
final JobMasterId jobMasterId = JobMasterId.generate();
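// Fail task execution state updates coming from the sender (eid1) with an ExecutionGraphException;
// acknowledge all other updates.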
TestingJobMasterGateway testingJobMasterGateway =
new TestingJobMasterGatewayBuilder()
.setFencingTokenSupplier(() -> jobMasterId)
.setUpdateTaskExecutionStateFunction(taskExecutionState -> {
if (taskExecutionState != null && taskExecutionState.getID().equals(eid1)) {
return FutureUtils.completedExceptionally(
new ExecutionGraphException("The execution attempt " + eid2 + " was not found."));
} else {
return CompletableFuture.completedFuture(Acknowledge.get());
}
})
.build();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setResourceID(producerLocation)
.setSlotSize(2)
.addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture)
.addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture)
.addTaskManagerActionListener(eid1, ExecutionState.FAILED, task1FailedFuture)
.addTaskManagerActionListener(eid2, ExecutionState.CANCELED, task2CanceledFuture)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.setJobMasterId(jobMasterId)
.setJobMasterGateway(testingJobMasterGateway)
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
task1RunningFuture.get();
taskSlotTable.allocateSlot(1, jobId, tdd2.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd2, jobMasterId, timeout).get();
task2RunningFuture.get();
task1FailedFuture.get();
assertSame(taskSlotTable.getTask(eid1).getExecutionState(), ExecutionState.FAILED);
tmGateway.cancelTask(eid2, timeout);
task2CanceledFuture.get();
assertSame(taskSlotTable.getTask(eid2).getExecutionState(), ExecutionState.CANCELED);
}
}
/**
* Tests that repeated remote {@link PartitionNotFoundException}s ultimately fail the receiver.
*/
@Test(timeout = 10000L)
public void testRemotePartitionNotFound() throws Exception {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
// Remote location (on the same TM though) for the partition
NettyShuffleDescriptor sdd =
NettyShuffleDescriptorBuilder.newBuilder().setDataPort(dataPort).buildRemote();
TaskDeploymentDescriptor tdd = createReceiver(sdd);
ExecutionAttemptID eid = tdd.getExecutionAttemptId();
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setSlotSize(2)
.addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
.addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.setConfiguration(config)
.setLocalCommunication(false)
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
taskRunningFuture.get();
taskFailedFuture.get();
assertThat(taskSlotTable.getTask(eid).getFailureCause(), instanceOf(PartitionNotFoundException.class));
}
}
/**
* Tests that the TaskManager fails the task if the partition update fails.
*/
@Test
public void testUpdateTaskInputPartitionsFailure() throws Exception {
final ExecutionAttemptID eid = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, BlockingNoOpInvokable.class);
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
final ShuffleEnvironment<?, ?> shuffleEnvironment = mock(ShuffleEnvironment.class, Mockito.RETURNS_MOCKS);
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setShuffleEnvironment(shuffleEnvironment)
.setSlotSize(1)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
.addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
taskRunningFuture.get();
final ResourceID producerLocation = env.getTaskExecutor().getResourceID();
NettyShuffleDescriptor shuffleDescriptor =
createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
final PartitionInfo partitionUpdate = new PartitionInfo(new IntermediateDataSetID(), shuffleDescriptor);
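// Let the mocked ShuffleEnvironment throw on this partition update, which must fail the task with an IOException.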
doThrow(new IOException()).when(shuffleEnvironment).updatePartitionInfo(eid, partitionUpdate);
final CompletableFuture<Acknowledge> updateFuture = tmGateway.updatePartitions(
eid,
Collections.singletonList(partitionUpdate),
timeout);
updateFuture.get();
taskFailedFuture.get();
Task task = taskSlotTable.getTask(tdd.getExecutionAttemptId());
assertThat(task.getExecutionState(), is(ExecutionState.FAILED));
assertThat(task.getFailureCause(), instanceOf(IOException.class));
}
}
/**
* Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
*/
@Test(timeout = 10000L)
public void testLocalPartitionNotFound() throws Exception {
ResourceID producerLocation = ResourceID.generate();
NettyShuffleDescriptor shuffleDescriptor =
createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
TaskDeploymentDescriptor tdd = createReceiver(shuffleDescriptor);
ExecutionAttemptID eid = tdd.getExecutionAttemptId();
Configuration config = new Configuration();
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setResourceID(producerLocation)
.setSlotSize(1)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
.addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
.setConfiguration(config)
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
taskRunningFuture.get();
taskFailedFuture.get();
assertSame(taskSlotTable.getTask(eid).getExecutionState(), ExecutionState.FAILED);
assertThat(taskSlotTable.getTask(eid).getFailureCause(), instanceOf(PartitionNotFoundException.class));
}
}
/**
* Tests that a failing schedule or update consumers call leads to the failure of the respective
* task.
*
* <p>IMPORTANT: We have to make sure that the invokable's cancel method is called, because only
* then the future is completed. We do this by not eagerly deploying consumer tasks and by requiring
* the invokable to fill one memory segment. The filled memory segment triggers the scheduling of
* the downstream operator since the result partition is pipelined. After the memory segment has
* been filled, the invokable blocks and we wait for the task failure caused by the failed
* schedule or update consumers call.
*/
@Test(timeout = 10000L)
public void testFailingScheduleOrUpdateConsumers() throws Exception {
final Configuration configuration = new Configuration();
// set the memory segment to the smallest size possible, because we have to fill one
// memory buffer to trigger the schedule or update consumers message to the downstream
// operators
configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
NettyShuffleDescriptor sdd =
createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), ResourceID.generate());
TaskDeploymentDescriptor tdd = createSender(sdd, TestingAbstractInvokables.TestInvokableRecordCancel.class);
ExecutionAttemptID eid = tdd.getExecutionAttemptId();
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
final Exception exception = new Exception("Failed schedule or update consumers");
final JobMasterId jobMasterId = JobMasterId.generate();
TestingJobMasterGateway testingJobMasterGateway =
new TestingJobMasterGatewayBuilder()
.setFencingTokenSupplier(() -> jobMasterId)
.setUpdateTaskExecutionStateFunction(resultPartitionID -> FutureUtils.completedExceptionally(exception))
.build();
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setSlotSize(1)
.setConfiguration(configuration)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
.setJobMasterId(jobMasterId)
.setJobMasterGateway(testingJobMasterGateway)
.useRealNonMockShuffleEnvironment()
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
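// Reset the static cancellation future so that state from a previous test run does not leak into this one.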
TestingAbstractInvokables.TestInvokableRecordCancel.resetGotCanceledFuture();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, jobMasterId, timeout).get();
taskRunningFuture.get();
CompletableFuture<Boolean> cancelFuture = TestingAbstractInvokables.TestInvokableRecordCancel.gotCanceled();
assertTrue(cancelFuture.get());
assertTrue(ExceptionUtils.findThrowableWithMessage(taskSlotTable.getTask(eid).getFailureCause(), exception.getMessage()).isPresent());
}
}
// ------------------------------------------------------------------------
// Back pressure request
// ------------------------------------------------------------------------
/**
* Tests requesting back pressure samples for a task.
*/
@Test(timeout = 20000L)
public void testRequestTaskBackPressure() throws Exception {
final NettyShuffleDescriptor shuffleDescriptor = newBuilder().buildLocal();
final TaskDeploymentDescriptor tdd = createSender(shuffleDescriptor, OutputBlockedInvokable.class);
final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> taskCanceledFuture = new CompletableFuture<>();
final Configuration configuration = new Configuration();
configuration.set(WebOptions.BACKPRESSURE_NUM_SAMPLES, 40);
configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
try (final TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId)
.setSlotSize(1)
.setMetricQueryServiceAddress(metricQueryServiceAddress)
.setConfiguration(configuration)
.useRealNonMockShuffleEnvironment()
.addTaskManagerActionListener(executionAttemptID, ExecutionState.RUNNING, taskRunningFuture)
.addTaskManagerActionListener(executionAttemptID, ExecutionState.CANCELED, taskCanceledFuture)
.build()) {
final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
final TaskSlotTable taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
taskRunningFuture.get();
// 1) trigger request for non-existing task.
final int requestId = 1234;
final ExecutionAttemptID nonExistTaskEid = new ExecutionAttemptID();
final CompletableFuture<TaskBackPressureResponse> failedRequestFuture =
tmGateway.requestTaskBackPressure(nonExistTaskEid, requestId, timeout);
try {
failedRequestFuture.get();
} catch (Exception e) {
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getMessage(), startsWith("Cannot request back pressure"));
}
// 2) trigger request for the blocking task.
double backPressureRatio = 0;
for (int i = 0; i < 5; ++i) {
CompletableFuture<TaskBackPressureResponse> successfulRequestFuture =
tmGateway.requestTaskBackPressure(executionAttemptID, i, timeout);
TaskBackPressureResponse response = successfulRequestFuture.get();
assertEquals(response.getRequestId(), i);
assertEquals(response.getExecutionAttemptID(), executionAttemptID);
if ((backPressureRatio = response.getBackPressureRatio()) >= 1.0) {
break;
}
}
assertEquals("Task was not back pressured in given time.", 1.0, backPressureRatio, 0.0);
// 3) trigger request for the blocking task, but cancel it before request finishes.
final int sleepTime = 1000;
CompletableFuture<TaskBackPressureResponse> canceledRequestFuture =
tmGateway.requestTaskBackPressure(executionAttemptID, requestId, timeout);
Thread.sleep(sleepTime);
tmGateway.cancelTask(executionAttemptID, timeout);
taskCanceledFuture.get();
TaskBackPressureResponse responseAfterCancel = canceledRequestFuture.get();
assertEquals(executionAttemptID, responseAfterCancel.getExecutionAttemptID());
assertEquals(requestId, responseAfterCancel.getRequestId());
assertTrue(responseAfterCancel.getBackPressureRatio() > 0);
}
}
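// Creates a deployment descriptor for a sender task that produces the single result partition
// described by the given shuffle descriptor.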
private TaskDeploymentDescriptor createSender(NettyShuffleDescriptor shuffleDescriptor) throws IOException {
return createSender(shuffleDescriptor, TestingAbstractInvokables.Sender.class);
}
private TaskDeploymentDescriptor createSender(
NettyShuffleDescriptor shuffleDescriptor,
Class<? extends AbstractInvokable> abstractInvokable) throws IOException {
PartitionDescriptor partitionDescriptor = PartitionDescriptorBuilder
.newBuilder()
.setPartitionId(shuffleDescriptor.getResultPartitionID().getPartitionId())
.build();
ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(
partitionDescriptor,
shuffleDescriptor,
1,
true);
return createTestTaskDeploymentDescriptor(
"Sender",
shuffleDescriptor.getResultPartitionID().getProducerId(),
abstractInvokable,
1,
Collections.singletonList(resultPartitionDeploymentDescriptor),
Collections.emptyList());
}
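// Creates a deployment descriptor for a receiver task that consumes the partition described by the
// given shuffle descriptor through one pipelined input gate.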
private TaskDeploymentDescriptor createReceiver(NettyShuffleDescriptor shuffleDescriptor) throws IOException {
InputGateDeploymentDescriptor inputGateDeploymentDescriptor = new InputGateDeploymentDescriptor(
new IntermediateDataSetID(),
ResultPartitionType.PIPELINED,
0,
new ShuffleDescriptor[] {shuffleDescriptor});
return createTestTaskDeploymentDescriptor(
"Receiver",
new ExecutionAttemptID(),
TestingAbstractInvokables.Receiver.class,
1,
Collections.emptyList(),
Collections.singletonList(inputGateDeploymentDescriptor));
}
private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
String taskName,
ExecutionAttemptID eid,
Class<? extends AbstractInvokable> abstractInvokable
) throws IOException {
return createTestTaskDeploymentDescriptor(taskName, eid, abstractInvokable, 1);
}
private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
String taskName,
ExecutionAttemptID eid,
Class<? extends AbstractInvokable> abstractInvokable,
int maxNumberOfSubtasks
) throws IOException {
return createTestTaskDeploymentDescriptor(taskName,
eid,
abstractInvokable,
maxNumberOfSubtasks,
Collections.emptyList(),
Collections.emptyList());
}
private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
String taskName,
ExecutionAttemptID eid,
Class<? extends AbstractInvokable> abstractInvokable,
int maxNumberOfSubtasks,
Collection<ResultPartitionDeploymentDescriptor> producedPartitions,
Collection<InputGateDeploymentDescriptor> inputGates
) throws IOException {
Preconditions.checkNotNull(producedPartitions);
Preconditions.checkNotNull(inputGates);
return createTaskDeploymentDescriptor(
jobId, testName.getMethodName(), eid,
new SerializedValue<>(new ExecutionConfig()), taskName, maxNumberOfSubtasks, 0, 1, 0,
new Configuration(), new Configuration(), abstractInvokable.getName(),
producedPartitions,
inputGates,
Collections.emptyList(),
Collections.emptyList(),
0);
}
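// Builds a complete TaskDeploymentDescriptor with inline (non-offloaded) serialized job and task information.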
static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
JobID jobId,
String jobName,
ExecutionAttemptID executionAttemptId,
SerializedValue<ExecutionConfig> serializedExecutionConfig,
String taskName,
int maxNumberOfSubtasks,
int subtaskIndex,
int numberOfSubtasks,
int attemptNumber,
Configuration jobConfiguration,
Configuration taskConfiguration,
String invokableClassName,
Collection<ResultPartitionDeploymentDescriptor> producedPartitions,
Collection<InputGateDeploymentDescriptor> inputGates,
Collection<PermanentBlobKey> requiredJarFiles,
Collection<URL> requiredClasspaths,
int targetSlotNumber) throws IOException {
JobInformation jobInformation = new JobInformation(
jobId,
jobName,
serializedExecutionConfig,
jobConfiguration,
requiredJarFiles,
requiredClasspaths);
TaskInformation taskInformation = new TaskInformation(
new JobVertexID(),
taskName,
numberOfSubtasks,
maxNumberOfSubtasks,
invokableClassName,
taskConfiguration);
SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
return new TaskDeploymentDescriptor(
jobId,
new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
executionAttemptId,
new AllocationID(),
subtaskIndex,
attemptNumber,
targetSlotNumber,
null,
producedPartitions,
inputGates);
}
/**
* Test invokable which completes the given future when executed.
*/
public static class FutureCompletingInvokable extends AbstractInvokable {
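// Static so that the test can await completion without holding a reference to the invokable instance.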
static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();
public FutureCompletingInvokable(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
COMPLETABLE_FUTURE.complete(true);
}
}
}