blob: 052216d6644c27ce1ea42e56d69e1e4d441450f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the queryable-state logic of the {@link JobMaster}. */
class JobMasterQueryableStateTest {
private static final Time testingTimeout = Time.seconds(10L);
private static TestingRpcService rpcService;
private static final int PARALLELISM = 4;
private static final JobVertex JOB_VERTEX_1;
private static final JobVertex JOB_VERTEX_2;
private static final JobGraph JOB_GRAPH;
static {
JOB_VERTEX_1 = new JobVertex("v1");
JOB_VERTEX_1.setParallelism(PARALLELISM);
JOB_VERTEX_1.setInvokableClass(AbstractInvokable.class);
JOB_VERTEX_2 = new JobVertex("v2");
JOB_VERTEX_2.setParallelism(PARALLELISM);
JOB_VERTEX_2.setInvokableClass(AbstractInvokable.class);
// explicit configuration required for #testRegisterAndUnregisterKvState
JOB_VERTEX_1.setMaxParallelism(16);
JOB_VERTEX_2.setMaxParallelism(16);
final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
JOB_VERTEX_1.setSlotSharingGroup(slotSharingGroup);
JOB_VERTEX_2.setSlotSharingGroup(slotSharingGroup);
JOB_GRAPH = JobGraphTestUtils.streamingJobGraph(JOB_VERTEX_1, JOB_VERTEX_2);
JOB_GRAPH.setJobType(JobType.STREAMING);
}
@BeforeAll
private static void setupClass() {
rpcService = new TestingRpcService();
}
@AfterEach
private void teardown() throws Exception {
rpcService.clearGateways();
}
@AfterAll
private static void teardownClass() {
if (rpcService != null) {
rpcService.closeAsync();
rpcService = null;
}
}
@Test
void testRequestKvStateWithoutRegistration() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
jobMaster.start();
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());
assertThatThrownBy(
() ->
jobMasterGateway
.requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown")
.get())
.satisfies(FlinkAssertions.anyCauseMatches(UnknownKvStateLocation.class));
}
}
@Test
void testRequestKvStateOfWrongJob() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
jobMaster.start();
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());
assertThatThrownBy(
() ->
jobMasterGateway
.requestKvStateLocation(new JobID(), "unknown")
.get())
.satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
}
}
@Test
void testRequestKvStateWithIrrelevantRegistration() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
jobMaster.start();
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());
// register an irrelevant KvState
assertThatThrownBy(
() ->
registerKvState(
jobMasterGateway,
new JobID(),
new JobVertexID(),
"any-name"))
.satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
}
}
@Test
void testRegisterKvState() throws Exception {
try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
jobMaster.start();
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());
final String registrationName = "register-me";
final KvStateID kvStateID = new KvStateID();
final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
final InetSocketAddress address =
new InetSocketAddress(InetAddress.getLocalHost(), 1029);
jobMasterGateway
.notifyKvStateRegistered(
JOB_GRAPH.getJobID(),
JOB_VERTEX_1.getID(),
keyGroupRange,
registrationName,
kvStateID,
address)
.get();
final KvStateLocation location =
jobMasterGateway
.requestKvStateLocation(JOB_GRAPH.getJobID(), registrationName)
.get();
assertThat(location.getJobId()).isEqualTo(JOB_GRAPH.getJobID());
assertThat(location.getJobVertexId()).isEqualTo(JOB_VERTEX_1.getID());
assertThat(location.getNumKeyGroups()).isEqualTo(JOB_VERTEX_1.getMaxParallelism());
assertThat(location.getNumRegisteredKeyGroups()).isOne();
assertThat(keyGroupRange.getNumberOfKeyGroups()).isOne();
assertThat(location.getKvStateID(keyGroupRange.getStartKeyGroup()))
.isEqualTo(kvStateID);
assertThat(location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()))
.isEqualTo(address);
}
}
@Test
void testUnregisterKvState() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
jobMaster.start();
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());
final String registrationName = "register-me";
final KvStateID kvStateID = new KvStateID();
final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
final InetSocketAddress address =
new InetSocketAddress(InetAddress.getLocalHost(), 1029);
jobMasterGateway
.notifyKvStateRegistered(
JOB_GRAPH.getJobID(),
JOB_VERTEX_1.getID(),
keyGroupRange,
registrationName,
kvStateID,
address)
.get();
jobMasterGateway
.notifyKvStateUnregistered(
JOB_GRAPH.getJobID(),
JOB_VERTEX_1.getID(),
keyGroupRange,
registrationName)
.get();
assertThatThrownBy(
() -> {
jobMasterGateway
.requestKvStateLocation(
JOB_GRAPH.getJobID(), registrationName)
.get();
})
.as("Expected to fail with an UnknownKvStateLocation.")
.isInstanceOf(Exception.class)
.hasCauseInstanceOf(UnknownKvStateLocation.class);
}
}
@Test
void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
jobMaster.start();
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());
// duplicate registration fails task
final String registrationName = "duplicate-me";
registerKvState(
jobMasterGateway, JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), registrationName);
assertThatThrownBy(
() ->
registerKvState(
jobMasterGateway,
JOB_GRAPH.getJobID(),
JOB_VERTEX_2.getID(),
registrationName))
.as("Expected to fail because of clashing registration message.")
.isInstanceOf(Exception.class)
.hasMessageContaining("Registration name clash");
assertThat(jobMasterGateway.requestJobStatus(testingTimeout).get())
.satisfies(
(Consumer<JobStatus>)
jobStatus -> {
assert jobStatus == JobStatus.FAILED
|| jobStatus == JobStatus.FAILING;
});
}
}
private static void registerSlotsRequiredForJobExecution(
JobMasterGateway jobMasterGateway, JobID jobId)
throws ExecutionException, InterruptedException {
final OneShotLatch oneTaskSubmittedLatch = new OneShotLatch();
final TaskExecutorGateway taskExecutorGateway =
new TestingTaskExecutorGatewayBuilder()
.setSubmitTaskConsumer(
(taskDeploymentDescriptor, jobMasterId) -> {
oneTaskSubmittedLatch.trigger();
return CompletableFuture.completedFuture(Acknowledge.get());
})
.createTestingTaskExecutorGateway();
JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
rpcService,
jobMasterGateway,
jobId,
PARALLELISM,
taskExecutorGateway,
testingTimeout);
oneTaskSubmittedLatch.await();
}
private static void registerKvState(
KvStateRegistryGateway stateRegistryGateway,
JobID jobId,
JobVertexID jobVertexId,
String registrationName)
throws UnknownHostException, ExecutionException, InterruptedException {
stateRegistryGateway
.notifyKvStateRegistered(
jobId,
jobVertexId,
new KeyGroupRange(0, 0),
registrationName,
new KvStateID(),
new InetSocketAddress(InetAddress.getLocalHost(), 1233))
.get();
}
}