| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.kubernetes.operator.service; |
| |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.api.java.tuple.Tuple3; |
| import org.apache.flink.api.java.tuple.Tuple4; |
| import org.apache.flink.client.program.rest.RestClusterClient; |
| import org.apache.flink.configuration.CheckpointingOptions; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.JobManagerOptions; |
| import org.apache.flink.configuration.MemorySize; |
| import org.apache.flink.configuration.TaskManagerOptions; |
| import org.apache.flink.core.execution.SavepointFormatType; |
| import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; |
| import org.apache.flink.kubernetes.operator.TestUtils; |
| import org.apache.flink.kubernetes.operator.TestingClusterClient; |
| import org.apache.flink.kubernetes.operator.TestingRestClient; |
| import org.apache.flink.kubernetes.operator.api.FlinkDeployment; |
| import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; |
| import org.apache.flink.kubernetes.operator.api.spec.JobSpec; |
| import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; |
| import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; |
| import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; |
| import org.apache.flink.kubernetes.operator.api.status.JobStatus; |
| import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType; |
| import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; |
| import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; |
| import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; |
| import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; |
| import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; |
| import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException; |
| import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult; |
| import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult; |
| import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; |
| import org.apache.flink.runtime.clusterframework.types.ResourceID; |
| import org.apache.flink.runtime.clusterframework.types.ResourceProfile; |
| import org.apache.flink.runtime.execution.ExecutionState; |
| import org.apache.flink.runtime.instance.HardwareDescription; |
| import org.apache.flink.runtime.messages.Acknowledge; |
| import org.apache.flink.runtime.messages.webmonitor.JobDetails; |
| import org.apache.flink.runtime.rest.RestClient; |
| import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; |
| import org.apache.flink.runtime.rest.handler.async.TriggerResponse; |
| import org.apache.flink.runtime.rest.messages.DashboardConfiguration; |
| import org.apache.flink.runtime.rest.messages.MessageHeaders; |
| import org.apache.flink.runtime.rest.messages.MessageParameters; |
| import org.apache.flink.runtime.rest.messages.RequestBody; |
| import org.apache.flink.runtime.rest.messages.ResponseBody; |
| import org.apache.flink.runtime.rest.messages.TriggerId; |
| import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo; |
| import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters; |
| import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters; |
| import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters; |
| import org.apache.flink.runtime.rest.messages.job.metrics.Metric; |
| import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; |
| import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; |
| import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; |
| import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; |
| import org.apache.flink.runtime.rest.util.RestMapperUtils; |
| import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException; |
| import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration; |
| import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders; |
| import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody; |
| import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; |
| import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; |
| import org.apache.flink.util.ConfigurationException; |
| import org.apache.flink.util.FlinkRuntimeException; |
| import org.apache.flink.util.SerializedThrowable; |
| import org.apache.flink.util.concurrent.Executors; |
| import org.apache.flink.util.function.TriFunction; |
| |
| import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; |
| |
| import io.fabric8.kubernetes.api.model.DeletionPropagation; |
| import io.fabric8.kubernetes.api.model.ListMeta; |
| import io.fabric8.kubernetes.api.model.ObjectMeta; |
| import io.fabric8.kubernetes.api.model.PodList; |
| import io.fabric8.kubernetes.api.model.WatchEvent; |
| import io.fabric8.kubernetes.api.model.apps.Deployment; |
| import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; |
| import io.fabric8.kubernetes.api.model.apps.DeploymentList; |
| import io.fabric8.kubernetes.api.model.apps.DeploymentListBuilder; |
| import io.fabric8.kubernetes.client.KubernetesClient; |
| import io.fabric8.kubernetes.client.KubernetesClientException; |
| import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; |
| import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; |
| import org.junit.jupiter.api.Assertions; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.MethodSource; |
| import org.junit.jupiter.params.provider.ValueSource; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.HttpURLConnection; |
| import java.net.ServerSocket; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.flink.kubernetes.operator.api.status.SavepointFormatType.NATIVE; |
| import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; |
| import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE; |
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| |
| /** |
| * @link FlinkService unit tests |
| */ |
| @EnableKubernetesMockClient(crud = true) |
| public class AbstractFlinkServiceTest { |
| |
| @TempDir Path tempDir; |
| File testJar; |
| |
| private KubernetesClient client; |
| private KubernetesMockServer mockServer; |
| private final Configuration configuration = new Configuration(); |
| |
| private final FlinkConfigManager configManager = new FlinkConfigManager(configuration); |
| private FlinkOperatorConfiguration operatorConfig; |
| private ExecutorService executorService; |
| |
| private ArtifactManager artifactManager; |
| |
| @BeforeEach |
| public void setup() { |
| configuration.set(KubernetesConfigOptions.CLUSTER_ID, TestUtils.TEST_DEPLOYMENT_NAME); |
| configuration.set(KubernetesConfigOptions.NAMESPACE, TestUtils.TEST_NAMESPACE); |
| configuration.set(FLINK_VERSION, FlinkVersion.v1_18); |
| operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration); |
| executorService = Executors.newDirectExecutorService(); |
| testJar = tempDir.resolve("test.jar").toFile(); |
| artifactManager = |
| new ArtifactManager(configManager) { |
| @Override |
| public File fetch( |
| String jarURI, Configuration flinkConfiguration, String targetDirStr) |
| throws IOException { |
| Files.writeString(testJar.toPath(), "test"); |
| return testJar; |
| } |
| }; |
| } |
| |
| @ParameterizedTest |
| @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") |
| public void sessionJobSubmissionTest(FlinkVersion flinkVersion) throws Exception { |
| var jarRuns = new ArrayList<JarRunRequestBody>(); |
| var flinkService = |
| getTestingService( |
| (h, p, b) -> { |
| if (b instanceof JarRunRequestBody) { |
| jarRuns.add((JarRunRequestBody) b); |
| return CompletableFuture.completedFuture(null); |
| } else if (h instanceof JarUploadHeaders) { |
| return CompletableFuture.completedFuture( |
| new JarUploadResponseBody("test")); |
| } else if (h instanceof JarDeleteHeaders) { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| throw new UnsupportedOperationException("Unknown request"); |
| }); |
| var session = TestUtils.buildSessionCluster(flinkVersion); |
| session.getStatus() |
| .getReconciliationStatus() |
| .serializeAndSetLastReconciledSpec(session.getSpec(), session); |
| |
| var job = TestUtils.buildSessionJob(); |
| var deployConf = configManager.getSessionJobConfig(session, job.getSpec()); |
| flinkService.submitJobToSessionCluster( |
| job.getMetadata(), job.getSpec(), JobID.generate(), deployConf, null); |
| |
| // Make sure that deploy conf was passed to jar run |
| if (flinkVersion.isEqualOrNewer(FlinkVersion.v1_17)) { |
| assertEquals(deployConf.toMap(), jarRuns.get(0).getFlinkConfiguration().toMap()); |
| } else { |
| assertTrue(jarRuns.get(0).getFlinkConfiguration().toMap().isEmpty()); |
| } |
| } |
| |
| @Test |
| public void jarRunErrorHandlingTest() throws Exception { |
| List<JarRunRequestBody> jarRuns = new ArrayList<>(); |
| AtomicBoolean deleted = new AtomicBoolean(false); |
| var flinkService = |
| getTestingService( |
| (h, p, b) -> { |
| if (b instanceof JarRunRequestBody) { |
| jarRuns.add((JarRunRequestBody) b); |
| return CompletableFuture.failedFuture( |
| new Exception("RunException")); |
| } else if (h instanceof JarDeleteHeaders) { |
| deleted.set(true); |
| return CompletableFuture.failedFuture( |
| new Exception("DeleteException")); |
| } |
| |
| fail(); |
| return null; |
| }); |
| |
| var job = TestUtils.buildSessionJob(); |
| var jobId = new JobID(); |
| |
| assertThrows( |
| FlinkRuntimeException.class, |
| () -> |
| flinkService.runJar( |
| job.getSpec().getJob(), |
| jobId, |
| new JarUploadResponseBody("test"), |
| configuration, |
| null)); |
| assertEquals(jobId, jarRuns.get(0).getJobId()); |
| assertTrue(deleted.get()); |
| } |
| |
| private TestingService getTestingService( |
| TriFunction< |
| MessageHeaders<?, ?, ?>, |
| MessageParameters, |
| RequestBody, |
| CompletableFuture<ResponseBody>> |
| requestProcessor) |
| throws Exception { |
| var testingClusterClient = new TestingClusterClient<String>(configuration); |
| testingClusterClient.setRequestProcessor(requestProcessor); |
| var testingRestClient = new TestingRestClient(configuration); |
| testingRestClient.setRequestProcessor(requestProcessor); |
| return new TestingService(testingClusterClient, testingRestClient); |
| } |
| |
| @Test |
| public void cancelJobWithStatelessUpgradeModeTest() throws Exception { |
| final TestingClusterClient<String> testingClusterClient = |
| new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); |
| final CompletableFuture<JobID> cancelFuture = new CompletableFuture<>(); |
| testingClusterClient.setCancelFunction( |
| jobID -> { |
| cancelFuture.complete(jobID); |
| return CompletableFuture.completedFuture(Acknowledge.get()); |
| }); |
| |
| var flinkService = new TestingService(testingClusterClient); |
| |
| JobID jobID = JobID.generate(); |
| FlinkDeployment deployment = TestUtils.buildApplicationCluster(); |
| JobStatus jobStatus = deployment.getStatus().getJobStatus(); |
| jobStatus.setJobId(jobID.toHexString()); |
| ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); |
| |
| deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); |
| deployment.getStatus().getJobStatus().setState("RUNNING"); |
| flinkService.cancelJob( |
| deployment, |
| UpgradeMode.STATELESS, |
| configManager.getObserveConfig(deployment), |
| false); |
| assertTrue(cancelFuture.isDone()); |
| assertEquals(jobID, cancelFuture.get()); |
| assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint) |
| throws Exception { |
| var testingClusterClient = |
| new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); |
| CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture = |
| new CompletableFuture<>(); |
| var savepointPath = "file:///path/of/svp-1"; |
| configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); |
| testingClusterClient.setStopWithSavepointFunction( |
| (jobID, advanceToEndOfEventTime, savepointDir) -> { |
| stopWithSavepointFuture.complete( |
| new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir)); |
| return CompletableFuture.completedFuture(savepointPath); |
| }); |
| |
| var flinkService = new TestingService(testingClusterClient); |
| |
| JobID jobID = JobID.generate(); |
| FlinkDeployment deployment = TestUtils.buildApplicationCluster(); |
| deployment |
| .getSpec() |
| .getFlinkConfiguration() |
| .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); |
| deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); |
| JobStatus jobStatus = deployment.getStatus().getJobStatus(); |
| jobStatus.setJobId(jobID.toHexString()); |
| jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); |
| ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); |
| |
| flinkService.cancelJob( |
| deployment, |
| UpgradeMode.SAVEPOINT, |
| configManager.getObserveConfig(deployment), |
| deleteAfterSavepoint); |
| assertTrue(stopWithSavepointFuture.isDone()); |
| assertEquals(jobID, stopWithSavepointFuture.get().f0); |
| assertFalse(stopWithSavepointFuture.get().f1); |
| assertEquals(savepointPath, stopWithSavepointFuture.get().f2); |
| assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation()); |
| |
| assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name()); |
| assertEquals( |
| deployment.getStatus().getJobManagerDeploymentStatus(), |
| deleteAfterSavepoint |
| ? JobManagerDeploymentStatus.MISSING |
| : JobManagerDeploymentStatus.READY); |
| if (deleteAfterSavepoint) { |
| assertEquals( |
| List.of( |
| Tuple2.of( |
| deployment.getMetadata().getNamespace(), |
| deployment.getMetadata().getName())), |
| flinkService.deleted); |
| } else { |
| assertTrue(flinkService.deleted.isEmpty()); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| public void cancelJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnSavepoint) |
| throws Exception { |
| var testingClusterClient = |
| new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); |
| CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture = |
| new CompletableFuture<>(); |
| var savepointPath = "file:///path/of/svp-1"; |
| configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); |
| |
| testingClusterClient.setStopWithSavepointFunction( |
| (jobID, advanceToEndOfEventTime, savepointDir) -> { |
| stopWithSavepointFuture.complete( |
| new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir)); |
| return CompletableFuture.completedFuture(savepointPath); |
| }); |
| |
| var flinkService = new TestingService(testingClusterClient); |
| |
| JobID jobID = JobID.generate(); |
| FlinkDeployment deployment = TestUtils.buildApplicationCluster(); |
| deployment |
| .getSpec() |
| .getFlinkConfiguration() |
| .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); |
| deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); |
| JobStatus jobStatus = deployment.getStatus().getJobStatus(); |
| jobStatus.setJobId(jobID.toHexString()); |
| jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); |
| ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); |
| |
| if (drainOnSavepoint) { |
| deployment |
| .getSpec() |
| .getFlinkConfiguration() |
| .put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true"); |
| deployment |
| .getSpec() |
| .getFlinkConfiguration() |
| .put(KubernetesOperatorConfigOptions.DRAIN_ON_SAVEPOINT_DELETION.key(), "true"); |
| } |
| |
| flinkService.cancelJob( |
| deployment, |
| UpgradeMode.SAVEPOINT, |
| configManager.getObserveConfig(deployment), |
| true); |
| assertTrue(stopWithSavepointFuture.isDone()); |
| assertEquals(jobID, stopWithSavepointFuture.get().f0); |
| assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation()); |
| assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name()); |
| |
| if (drainOnSavepoint) { |
| assertTrue(stopWithSavepointFuture.get().f1); |
| } else { |
| assertFalse(stopWithSavepointFuture.get().f1); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| public void cancelSessionJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnSavepoint) |
| throws Exception { |
| var testingClusterClient = |
| new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); |
| CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture = |
| new CompletableFuture<>(); |
| var savepointPath = "file:///path/of/svp-1"; |
| configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); |
| |
| testingClusterClient.setStopWithSavepointFunction( |
| (jobID, advanceToEndOfEventTime, savepointDir) -> { |
| stopWithSavepointFuture.complete( |
| new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir)); |
| return CompletableFuture.completedFuture(savepointPath); |
| }); |
| |
| var flinkService = new TestingService(testingClusterClient); |
| |
| JobID jobID = JobID.generate(); |
| var session = TestUtils.buildSessionCluster(configuration.get(FLINK_VERSION)); |
| session.getStatus() |
| .getReconciliationStatus() |
| .serializeAndSetLastReconciledSpec(session.getSpec(), session); |
| var job = TestUtils.buildSessionJob(); |
| |
| job.getSpec() |
| .getFlinkConfiguration() |
| .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); |
| |
| JobStatus jobStatus = job.getStatus().getJobStatus(); |
| jobStatus.setJobId(jobID.toHexString()); |
| jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); |
| ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration()); |
| |
| if (drainOnSavepoint) { |
| job.getSpec() |
| .getFlinkConfiguration() |
| .put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true"); |
| job.getSpec() |
| .getFlinkConfiguration() |
| .put(KubernetesOperatorConfigOptions.DRAIN_ON_SAVEPOINT_DELETION.key(), "true"); |
| } |
| var deployConf = configManager.getSessionJobConfig(session, job.getSpec()); |
| |
| flinkService.cancelSessionJob(job, UpgradeMode.SAVEPOINT, deployConf); |
| assertTrue(stopWithSavepointFuture.isDone()); |
| assertEquals(jobID, stopWithSavepointFuture.get().f0); |
| assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation()); |
| assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name()); |
| |
| if (drainOnSavepoint) { |
| assertTrue(stopWithSavepointFuture.get().f1); |
| } else { |
| assertFalse(stopWithSavepointFuture.get().f1); |
| } |
| } |
| |
| @Test |
| public void cancelJobWithLastStateUpgradeModeTest() throws Exception { |
| var deployment = TestUtils.buildApplicationCluster(); |
| ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); |
| var testingClusterClient = |
| new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); |
| var flinkService = new TestingService(testingClusterClient); |
| |
| JobID jobID = JobID.generate(); |
| JobStatus jobStatus = deployment.getStatus().getJobStatus(); |
| jobStatus.setJobId(jobID.toHexString()); |
| |
| flinkService.cancelJob( |
| deployment, |
| UpgradeMode.LAST_STATE, |
| configManager.getObserveConfig(deployment), |
| false); |
| assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); |
| } |
| |
| @Test |
| public void deletionPropagationTest() { |
| var propagation = new ArrayList<DeletionPropagation>(); |
| TestingService flinkService = |
| new TestingService(null) { |
| @Override |
| protected void deleteClusterInternal( |
| String ns, |
| String clusterId, |
| Configuration conf, |
| DeletionPropagation deletionPropagation) { |
| propagation.add(deletionPropagation); |
| } |
| }; |
| |
| flinkService.deleteClusterDeployment( |
| new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true); |
| assertEquals(DeletionPropagation.FOREGROUND, propagation.get(0)); |
| |
| configuration.set( |
| KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION, |
| DeletionPropagation.BACKGROUND); |
| operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration); |
| |
| flinkService = |
| new TestingService(null) { |
| @Override |
| protected void deleteClusterInternal( |
| String ns, |
| String clusterId, |
| Configuration conf, |
| DeletionPropagation deletionPropagation) { |
| propagation.add(deletionPropagation); |
| } |
| }; |
| flinkService.deleteClusterDeployment( |
| new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true); |
| assertEquals(DeletionPropagation.BACKGROUND, propagation.get(1)); |
| } |
| |
| @Test |
| public void triggerSavepointTest() throws Exception { |
| CompletableFuture<Tuple3<JobID, String, Boolean>> triggerSavepointFuture = |
| new CompletableFuture<>(); |
| String savepointPath = "file:///path/of/svp"; |
| configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); |
| var flinkService = |
| getTestingService( |
| (headers, parameters, requestBody) -> { |
| triggerSavepointFuture.complete( |
| new Tuple3<>( |
| ((SavepointTriggerMessageParameters) parameters) |
| .jobID.getValue(), |
| ((SavepointTriggerRequestBody) requestBody) |
| .getTargetDirectory() |
| .get(), |
| ((SavepointTriggerRequestBody) requestBody) |
| .isCancelJob())); |
| return CompletableFuture.completedFuture( |
| new TriggerResponse(new TriggerId())); |
| }); |
| |
| var jobID = JobID.generate(); |
| var flinkDeployment = TestUtils.buildApplicationCluster(); |
| ReconciliationUtils.updateStatusForDeployedSpec(flinkDeployment, new Configuration()); |
| JobStatus jobStatus = new JobStatus(); |
| jobStatus.setJobId(jobID.toString()); |
| flinkDeployment.getStatus().setJobStatus(jobStatus); |
| flinkService.triggerSavepoint( |
| flinkDeployment.getStatus().getJobStatus().getJobId(), |
| SnapshotTriggerType.MANUAL, |
| flinkDeployment.getStatus().getJobStatus().getSavepointInfo(), |
| configuration); |
| assertTrue(triggerSavepointFuture.isDone()); |
| assertEquals(jobID, triggerSavepointFuture.get().f0); |
| assertEquals(savepointPath, triggerSavepointFuture.get().f1); |
| assertFalse(triggerSavepointFuture.get().f2); |
| } |
| |
| @Test |
| public void testTriggerCheckpoint() throws Exception { |
| final CompletableFuture<JobID> triggerCheckpointFuture = new CompletableFuture<>(); |
| var flinkService = |
| getTestingService( |
| (headers, parameters, requestBody) -> { |
| triggerCheckpointFuture.complete( |
| ((CheckpointTriggerMessageParameters) parameters) |
| .jobID.getValue()); |
| return CompletableFuture.completedFuture( |
| new TriggerResponse(new TriggerId())); |
| }); |
| |
| final JobID jobID = JobID.generate(); |
| final FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster(); |
| ReconciliationUtils.updateStatusForDeployedSpec(flinkDeployment, new Configuration()); |
| JobStatus jobStatus = new JobStatus(); |
| jobStatus.setJobId(jobID.toString()); |
| flinkDeployment.getStatus().setJobStatus(jobStatus); |
| flinkService.triggerCheckpoint( |
| flinkDeployment.getStatus().getJobStatus().getJobId(), |
| SnapshotTriggerType.MANUAL, |
| flinkDeployment.getStatus().getJobStatus().getCheckpointInfo(), |
| configuration); |
| assertTrue(triggerCheckpointFuture.isDone()); |
| assertEquals(jobID, triggerCheckpointFuture.get()); |
| } |
| |
| @Test |
| public void disposeSavepointTest() throws Exception { |
| var savepointPath = "file:///path/of/svp"; |
| var tested = new AtomicBoolean(false); |
| configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); |
| var flinkService = |
| getTestingService( |
| (h, p, r) -> { |
| if (r instanceof SavepointDisposalRequest) { |
| var dr = (SavepointDisposalRequest) r; |
| assertEquals(savepointPath, dr.getSavepointPath()); |
| tested.set(true); |
| return CompletableFuture.completedFuture(null); |
| } |
| fail("unknown request"); |
| return null; |
| }); |
| flinkService.disposeSavepoint(savepointPath, configuration); |
| assertTrue(tested.get()); |
| } |
| |
| @Test |
| public void nativeSavepointFormatTest() throws Exception { |
| runNativeSavepointFormatTest(false); |
| } |
| |
| @Test |
| public void testSavepointCompletesButJobFailsAfterwards() throws Exception { |
| runNativeSavepointFormatTest(true); |
| } |
| |
| private void runNativeSavepointFormatTest(boolean failAfterSavepointCompletes) |
| throws Exception { |
| final TestingClusterClient<String> testingClusterClient = |
| new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); |
| final JobID jobID = JobID.generate(); |
| final String savepointPath = "file:///path/of/svp"; |
| final CompletableFuture<Tuple4<JobID, String, Boolean, SavepointFormatType>> |
| triggerSavepointFuture = new CompletableFuture<>(); |
| configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); |
| testingClusterClient.setRequestProcessor( |
| (headers, parameters, requestBody) -> { |
| triggerSavepointFuture.complete( |
| new Tuple4<>( |
| ((SavepointTriggerMessageParameters) parameters) |
| .jobID.getValue(), |
| ((SavepointTriggerRequestBody) requestBody) |
| .getTargetDirectory() |
| .get(), |
| ((SavepointTriggerRequestBody) requestBody).isCancelJob(), |
| ((SavepointTriggerRequestBody) requestBody).getFormatType())); |
| return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId())); |
| }); |
| final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>> |
| stopWithSavepointFuture = new CompletableFuture<>(); |
| testingClusterClient.setStopWithSavepointFormat( |
| (id, formatType, savepointDir) -> { |
| if (failAfterSavepointCompletes) { |
| stopWithSavepointFuture.completeExceptionally( |
| new CompletionException( |
| new SerializedThrowable( |
| new StopWithSavepointStoppingException( |
| savepointPath, jobID)))); |
| } else { |
| stopWithSavepointFuture.complete( |
| new Tuple3<>(id, formatType, savepointDir)); |
| } |
| return CompletableFuture.completedFuture(savepointPath); |
| }); |
| |
| var flinkService = new TestingService(testingClusterClient); |
| |
| final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); |
| deployment |
| .getSpec() |
| .getFlinkConfiguration() |
| .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); |
| deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); |
| JobStatus jobStatus = deployment.getStatus().getJobStatus(); |
| jobStatus.setJobId(jobID.toHexString()); |
| jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); |
| ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); |
| |
| jobStatus.setJobId(jobID.toString()); |
| deployment.getStatus().setJobStatus(jobStatus); |
| flinkService.triggerSavepoint( |
| deployment.getStatus().getJobStatus().getJobId(), |
| SnapshotTriggerType.MANUAL, |
| deployment.getStatus().getJobStatus().getSavepointInfo(), |
| new Configuration(configuration) |
| .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE)); |
| assertTrue(triggerSavepointFuture.isDone()); |
| assertEquals(jobID, triggerSavepointFuture.get().f0); |
| assertEquals(savepointPath, triggerSavepointFuture.get().f1); |
| assertFalse(triggerSavepointFuture.get().f2); |
| assertEquals(SavepointFormatType.NATIVE, triggerSavepointFuture.get().f3); |
| |
| flinkService.cancelJob( |
| deployment, |
| UpgradeMode.SAVEPOINT, |
| new Configuration(configManager.getObserveConfig(deployment)) |
| .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE), |
| false); |
| |
| assertTrue(stopWithSavepointFuture.isDone()); |
| assertEquals( |
| failAfterSavepointCompletes, stopWithSavepointFuture.isCompletedExceptionally()); |
| |
| var lastSavepoint = |
| deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint(); |
| assertEquals(NATIVE, lastSavepoint.getFormatType()); |
| assertEquals(savepointPath, lastSavepoint.getLocation()); |
| assertEquals(jobID.toHexString(), deployment.getStatus().getJobStatus().getJobId()); |
| } |
| |
| @Test |
| public void getLastCheckpointTest() throws Exception { |
| ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); |
| |
| var responseContainer = new ArrayList<CheckpointHistoryWrapper>(); |
| var flinkService = |
| getTestingService( |
| (headers, parameters, requestBody) -> { |
| if (headers instanceof CustomCheckpointingStatisticsHeaders) { |
| return CompletableFuture.completedFuture(responseContainer.get(0)); |
| } |
| fail("unknown request"); |
| return null; |
| }); |
| |
| String responseWithHistory = |
| "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":{\"className\":\"completed\",\"id\":96,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212837604,\"latest_ack_timestamp\":1653212837621,\"checkpointed_size\":28437,\"state_size\":28437,\"end_to_end_duration\":17,\"alignment_buffered\":0,\"processed_data\":560,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96\",\"discarded\":false},\"savepoint\":{\"className\":\"completed\",\"id\":51,\"status\":\"COMPLETED\",\"is_savepoint\":true,\"trigger_timestamp\":1653212748176,\"latest_ack_timestamp\":1653212748233,\"checkpointed_size\":53670,\"state_size\":53670,\"end_to_end_duration\":57,\"alignment_buffered\":0,\"processed_data\":483,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"SAVEPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-e8ea2482ce4f\",\"discarded\":false},\"failed\":null,\"restored\":{\"id\":27,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-5930e5326ca7\"}},\"history\":[{\"className\":\"completed\",\"id\":96,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212837604,\"latest_ack_timestamp\":1653212837621,\"checkpointed_size\":28437,\"state_size\":28437,\"end_to_end_duration\":17,\"alignment_buffered\":0,\"processed_data\":560,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96\",\"discarded\":false},{\"className\":\"completed\",\"id\":95,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212835603,\"latest_ack_timestamp\":1653212835622,\"checkpointed_size\":28473,\"state_size\":28473,\"end_to_end_duration\":19,\"alignment_buffered\":0,\"processed_data\":42,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-95\",\"discarded\":true},{\"className\":\"completed\",\"id\":94,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212833603,\"latest_ack_timestamp\":1653212833623,\"checkpointed_size\":27969,\"state_size\":27969,\"end_to_end_duration\":20,\"alignment_buffered\":0,\"processed_data\":28,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-94\",\"discarded\":true},{\"className\":\"completed\",\"id\":93,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212831603,\"latest_ack_timestamp\":1653212831621,\"checkpointed_size\":28113,\"state_size\":28113,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":138,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-93\",\"discarded\":true},{\"className\":\"completed\",\"id\":92,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212829603,\"latest_ack_timestamp\":1653212829621,\"checkpointed_size\":28293,\"state_size\":28293,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":196,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-92\",\"discarded\":true},{\"className\":\"completed\",\"id\":91,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212827603,\"latest_ack_timestamp\":1653212827629,\"checkpointed_size\":27969,\"state_size\":27969,\"end_to_end_duration\":26,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-91\",\"discarded\":true},{\"className\":\"completed\",\"id\":90,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212825603,\"latest_ack_timestamp\":1653212825641,\"checkpointed_size\":27735,\"state_size\":27735,\"end_to_end_duration\":38,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-90\",\"discarded\":true},{\"className\":\"completed\",\"id\":89,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212823603,\"latest_ack_timestamp\":1653212823618,\"checkpointed_size\":28545,\"state_size\":28545,\"end_to_end_duration\":15,\"alignment_buffered\":0,\"processed_data\":364,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-89\",\"discarded\":true},{\"className\":\"completed\",\"id\":88,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212821603,\"latest_ack_timestamp\":1653212821619,\"checkpointed_size\":28275,\"state_size\":28275,\"end_to_end_duration\":16,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-88\",\"discarded\":true},{\"className\":\"completed\",\"id\":87,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212819604,\"latest_ack_timestamp\":1653212819622,\"checkpointed_size\":28518,\"state_size\":28518,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-87\",\"discarded\":true}]}"; |
| String responseWithoutHistory = |
| "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":null,\"savepoint\":null,\"failed\":null,\"restored\":{\"id\":27,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-5930e5326ca7\"}},\"history\":[]}"; |
| String responseWithoutHistoryInternal = |
| "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":null,\"savepoint\":null,\"failed\":null,\"restored\":{\"id\":27,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"<checkpoint-not-externally-addressable>\"}},\"history\":[]}"; |
| |
| responseContainer.add( |
| objectMapper.readValue(responseWithHistory, CheckpointHistoryWrapper.class)); |
| var checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration()); |
| assertEquals( |
| "file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96", |
| checkpointOpt.get().getLocation()); |
| |
| responseContainer.set( |
| 0, objectMapper.readValue(responseWithoutHistory, CheckpointHistoryWrapper.class)); |
| checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration()); |
| assertEquals( |
| "file:/flink-data/savepoints/savepoint-000000-5930e5326ca7", |
| checkpointOpt.get().getLocation()); |
| |
| responseContainer.set( |
| 0, |
| objectMapper.readValue( |
| responseWithoutHistoryInternal, CheckpointHistoryWrapper.class)); |
| try { |
| flinkService.getLastCheckpoint(new JobID(), new Configuration()); |
| fail(); |
| } catch (RecoveryFailureException dpe) { |
| |
| } |
| } |
| |
| @Test |
| public void fetchSavepointInfoTest() throws Exception { |
| var triggerId = new TriggerId(); |
| var jobId = new JobID(); |
| var response = new AtomicReference<AsynchronousOperationResult<SavepointInfo>>(); |
| var flinkService = |
| getTestingService( |
| (h, p, r) -> { |
| if (p instanceof SavepointStatusMessageParameters) { |
| var params = (SavepointStatusMessageParameters) p; |
| assertEquals(jobId, params.jobIdPathParameter.getValue()); |
| assertEquals(triggerId, params.triggerIdPathParameter.getValue()); |
| if (response.get() == null) { |
| return CompletableFuture.failedFuture(new Exception("fail")); |
| } |
| return CompletableFuture.completedFuture(response.get()); |
| } |
| fail("unknown request"); |
| return null; |
| }); |
| |
| response.set(AsynchronousOperationResult.completed(new SavepointInfo("l", null))); |
| assertEquals( |
| SavepointFetchResult.completed("l"), |
| flinkService.fetchSavepointInfo( |
| triggerId.toString(), jobId.toString(), configuration)); |
| |
| response.set(AsynchronousOperationResult.inProgress()); |
| assertEquals( |
| SavepointFetchResult.pending(), |
| flinkService.fetchSavepointInfo( |
| triggerId.toString(), jobId.toString(), configuration)); |
| |
| response.set( |
| AsynchronousOperationResult.completed( |
| new SavepointInfo( |
| null, new SerializedThrowable(new Exception("testErr"))))); |
| assertTrue( |
| flinkService |
| .fetchSavepointInfo(triggerId.toString(), jobId.toString(), configuration) |
| .getError() |
| .contains("testErr")); |
| |
| response.set(null); |
| assertTrue( |
| flinkService |
| .fetchSavepointInfo(triggerId.toString(), jobId.toString(), configuration) |
| .getError() |
| .contains("fail")); |
| } |
| |
| @Test |
| public void fetchCheckpointInfoTest() throws Exception { |
| var triggerId = new TriggerId(); |
| var jobId = new JobID(); |
| var response = new AtomicReference<AsynchronousOperationResult<CheckpointInfo>>(); |
| var flinkService = |
| getTestingService( |
| (h, p, r) -> { |
| if (p instanceof CheckpointStatusMessageParameters) { |
| var params = (CheckpointStatusMessageParameters) p; |
| assertEquals(jobId, params.jobIdPathParameter.getValue()); |
| assertEquals(triggerId, params.triggerIdPathParameter.getValue()); |
| if (response.get() == null) { |
| return CompletableFuture.failedFuture(new Exception("fail")); |
| } |
| return CompletableFuture.completedFuture(response.get()); |
| } |
| fail("unknown request"); |
| return null; |
| }); |
| |
| response.set(AsynchronousOperationResult.completed(new CheckpointInfo(123L, null))); |
| assertEquals( |
| CheckpointFetchResult.completed(), |
| flinkService.fetchCheckpointInfo( |
| triggerId.toString(), jobId.toString(), configuration)); |
| |
| response.set(AsynchronousOperationResult.inProgress()); |
| assertEquals( |
| CheckpointFetchResult.pending(), |
| flinkService.fetchCheckpointInfo( |
| triggerId.toString(), jobId.toString(), configuration)); |
| |
| response.set( |
| AsynchronousOperationResult.completed( |
| new CheckpointInfo( |
| null, new SerializedThrowable(new Exception("testErr"))))); |
| assertTrue( |
| flinkService |
| .fetchCheckpointInfo(triggerId.toString(), jobId.toString(), configuration) |
| .getError() |
| .contains("testErr")); |
| |
| response.set(null); |
| assertTrue( |
| flinkService |
| .fetchCheckpointInfo(triggerId.toString(), jobId.toString(), configuration) |
| .getError() |
| .contains("fail")); |
| } |
| |
| @Test |
| public void removeOperatorConfigTest() { |
| var key = "kubernetes.operator.meyKey"; |
| var deployConfig = Configuration.fromMap(Map.of("kubernetes.operator.meyKey", "v")); |
| var newConf = AbstractFlinkService.removeOperatorConfigs(deployConfig); |
| assertFalse(newConf.containsKey(key)); |
| } |
| |
| @Test |
| public void getMetricsTest() throws Exception { |
| var jobId = new JobID(); |
| var metricNames = List.of("m1", "m2"); |
| var flinkService = |
| getTestingService( |
| (h, p, r) -> { |
| if (p instanceof JobMetricsMessageParameters) { |
| var jmmp = ((JobMetricsMessageParameters) p); |
| assertEquals(jobId, jmmp.jobPathParameter.getValue()); |
| var output = |
| jmmp.metricsFilterParameter.getValue().stream() |
| .map(s -> new Metric(s, s)) |
| .collect(Collectors.toList()); |
| return CompletableFuture.completedFuture( |
| new MetricCollectionResponseBody(output)); |
| } |
| fail("unknown request"); |
| return null; |
| }); |
| assertEquals( |
| Map.of("m1", "m1", "m2", "m2"), |
| flinkService.getMetrics(configuration, jobId.toHexString(), metricNames)); |
| } |
| |
| @Test |
| public void getClusterInfoTest() throws Exception { |
| var config = new CustomDashboardConfiguration(); |
| var testVersion = "testVersion"; |
| var testRevision = "testRevision"; |
| config.setFlinkVersion(testVersion); |
| config.setFlinkRevision(testRevision); |
| |
| var tmInfo = |
| new TaskManagerInfo( |
| ResourceID.generate(), |
| "", |
| 0, |
| 0, |
| 0L, |
| 0, |
| 0, |
| ResourceProfile.UNKNOWN, |
| ResourceProfile.UNKNOWN, |
| new HardwareDescription(1, 0L, 0L, 0L), |
| new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), |
| null); |
| var tmsInfo = new TaskManagersInfo(List.of(tmInfo)); |
| |
| var flinkService = |
| getTestingService( |
| (h, p, r) -> { |
| if (h instanceof CustomDashboardConfigurationHeaders) { |
| return CompletableFuture.completedFuture(config); |
| } else if (h instanceof TaskManagersHeaders) { |
| return CompletableFuture.completedFuture(tmsInfo); |
| } |
| fail("unknown request"); |
| return null; |
| }); |
| |
| var conf = new Configuration(); |
| conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000)); |
| conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000)); |
| |
| assertEquals( |
| Map.of( |
| DashboardConfiguration.FIELD_NAME_FLINK_VERSION, |
| testVersion, |
| DashboardConfiguration.FIELD_NAME_FLINK_REVISION, |
| testRevision, |
| AbstractFlinkService.FIELD_NAME_TOTAL_CPU, |
| "2.0", |
| AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, |
| "" + MemorySize.ofMebiBytes(1000).getBytes() * 2), |
| flinkService.getClusterInfo(conf)); |
| } |
| |
| @Test |
| public void effectiveStatusTest() { |
| JobDetails allRunning = |
| getJobDetails( |
| org.apache.flink.api.common.JobStatus.RUNNING, |
| Tuple2.of(ExecutionState.RUNNING, 4)); |
| assertEquals( |
| org.apache.flink.api.common.JobStatus.RUNNING, |
| AbstractFlinkService.getEffectiveStatus(allRunning)); |
| |
| JobDetails allRunningOrFinished = |
| getJobDetails( |
| org.apache.flink.api.common.JobStatus.RUNNING, |
| Tuple2.of(ExecutionState.RUNNING, 2), |
| Tuple2.of(ExecutionState.FINISHED, 2)); |
| assertEquals( |
| org.apache.flink.api.common.JobStatus.RUNNING, |
| AbstractFlinkService.getEffectiveStatus(allRunningOrFinished)); |
| |
| JobDetails allRunningOrScheduled = |
| getJobDetails( |
| org.apache.flink.api.common.JobStatus.RUNNING, |
| Tuple2.of(ExecutionState.RUNNING, 2), |
| Tuple2.of(ExecutionState.SCHEDULED, 2)); |
| assertEquals( |
| org.apache.flink.api.common.JobStatus.CREATED, |
| AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled)); |
| |
| JobDetails allFinished = |
| getJobDetails( |
| org.apache.flink.api.common.JobStatus.FINISHED, |
| Tuple2.of(ExecutionState.FINISHED, 4)); |
| assertEquals( |
| org.apache.flink.api.common.JobStatus.FINISHED, |
| AbstractFlinkService.getEffectiveStatus(allFinished)); |
| } |
| |
| private JobDetails getJobDetails( |
| org.apache.flink.api.common.JobStatus status, |
| Tuple2<ExecutionState, Integer>... tasksPerState) { |
| int[] countPerState = new int[ExecutionState.values().length]; |
| for (var taskPerState : tasksPerState) { |
| countPerState[taskPerState.f0.ordinal()] = taskPerState.f1; |
| } |
| int numTasks = Arrays.stream(countPerState).sum(); |
| return new JobDetails( |
| new JobID(), |
| "test-job", |
| System.currentTimeMillis(), |
| -1, |
| 0, |
| status, |
| System.currentTimeMillis(), |
| countPerState, |
| numTasks); |
| } |
| |
| @Test |
| public void isJobManagerReadyTest() throws Exception { |
| AtomicReference<String> url = new AtomicReference<>(); |
| var clusterClient = |
| new TestingClusterClient<String>(configuration) { |
| @Override |
| public String getWebInterfaceURL() { |
| return url.get(); |
| } |
| }; |
| var flinkService = new TestingService(clusterClient); |
| |
| assertThrows( |
| FlinkRuntimeException.class, |
| () -> flinkService.isJobManagerPortReady(configuration)); |
| |
| int port = 6868; |
| url.set("http://127.0.0.1:" + port); |
| |
| assertFalse(flinkService.isJobManagerPortReady(configuration)); |
| try (var socket = new ServerSocket(port)) { |
| assertTrue(flinkService.isJobManagerPortReady(configuration)); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource( |
| strings = {"http://127.0.0.1:8081", "http://dev-test:8081", "http://dev-test.01:8081"}) |
| void testValidSocketAddresses(String inputAddress) throws Exception { |
| |
| var clusterClient = |
| new TestingClusterClient<String>(configuration) { |
| @Override |
| public String getWebInterfaceURL() { |
| return inputAddress; |
| } |
| }; |
| var flinkService = new TestingService(clusterClient); |
| |
| assertDoesNotThrow( |
| () -> { |
| flinkService.getSocketAddress(clusterClient); |
| }); |
| } |
| |
| @Test |
| public void testBlockingDeploymentDeletion() { |
| String deploymentName = "test-cluster"; |
| String namespace = "test-namespace"; |
| String getUrl = |
| String.format( |
| "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s", |
| namespace, deploymentName); |
| String watchUrl = |
| String.format( |
| "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&allowWatchBookmarks=true&watch=true", |
| namespace, deploymentName); |
| |
| var flinkService = new TestingService(null); |
| |
| Deployment deployment = |
| new DeploymentBuilder() |
| .withNewMetadata() |
| .withName(deploymentName) |
| .withNamespace(namespace) |
| .endMetadata() |
| .build(); |
| |
| DeploymentList deploymentList = |
| new DeploymentListBuilder() |
| .withMetadata(new ListMeta()) |
| .withItems(deployment) |
| .build(); |
| |
| mockServer |
| .expect() |
| .get() |
| .withPath(getUrl) |
| .andReturn(HttpURLConnection.HTTP_OK, deploymentList) |
| .always(); |
| |
| long deleteDelay = 1000; |
| mockServer |
| .expect() |
| .get() |
| .withPath(watchUrl) |
| .andUpgradeToWebSocket() |
| .open() |
| .waitFor(deleteDelay) |
| .andEmit(new WatchEvent(deployment, "DELETED")) |
| .done() |
| .always(); |
| |
| long start = System.currentTimeMillis(); |
| long remainingMillis = |
| flinkService |
| .deleteDeploymentBlocking( |
| "Test", |
| client.apps() |
| .deployments() |
| .inNamespace(namespace) |
| .withName(deploymentName), |
| DeletionPropagation.BACKGROUND, |
| Duration.ofMillis(10000)) |
| .toMillis(); |
| long deleteTime = System.currentTimeMillis() - start; |
| |
| // We make sure that delete waits until it gets the event |
| // This logic is not the best but seems to be good enough to capture the expectation |
| assertTrue(deleteTime > deleteDelay / 2); |
| assertEquals(3, mockServer.getRequestCount()); |
| assertTrue(remainingMillis > 0); |
| assertTrue(remainingMillis < 10000 - deleteDelay / 2); |
| |
| // Test actual timeout |
| remainingMillis = |
| flinkService |
| .deleteDeploymentBlocking( |
| "Test", |
| client.apps() |
| .deployments() |
| .inNamespace(namespace) |
| .withName(deploymentName), |
| DeletionPropagation.BACKGROUND, |
| Duration.ofMillis(10)) |
| .toMillis(); |
| assertEquals(0, remainingMillis); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(ints = {HttpURLConnection.HTTP_NOT_FOUND, HttpURLConnection.HTTP_BAD_REQUEST}) |
| public void testBlockingDeletionWaitErrorHandling(int errorCode) { |
| int reqCount = mockServer.getRequestCount(); |
| String deploymentName = "test-cluster"; |
| String namespace = "test-namespace"; |
| String getUrl = |
| String.format( |
| "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s", |
| namespace, deploymentName); |
| |
| // Throw error when we try to wait for deletion |
| mockServer |
| .expect() |
| .get() |
| .withPath(getUrl) |
| .andReply( |
| errorCode, |
| recordedRequest -> { |
| // Send error after a short delay |
| try { |
| Thread.sleep(10); |
| } catch (Exception e) { |
| } |
| return null; |
| }) |
| .always(); |
| |
| var remaining = |
| AbstractFlinkService.deleteBlocking( |
| "Test", |
| () -> |
| client.apps() |
| .deployments() |
| .inNamespace(namespace) |
| .withName(deploymentName), |
| Duration.ofMillis(1000)); |
| |
| assertEquals(1, mockServer.getRequestCount() - reqCount); |
| assertTrue(remaining.toMillis() < 1000); |
| } |
| |
| @Test |
| public void testBlockingDeletionDeleteCallErrorHandling() { |
| // Non not-found errors should be thrown |
| Assertions.assertThrows( |
| KubernetesClientException.class, |
| () -> |
| AbstractFlinkService.deleteBlocking( |
| "Test", |
| () -> { |
| throw new KubernetesClientException( |
| null, HttpURLConnection.HTTP_BAD_REQUEST, null); |
| }, |
| Duration.ofMillis(1000))); |
| |
| // Not found errors should be ignored |
| var remaining = |
| AbstractFlinkService.deleteBlocking( |
| "Test", |
| () -> { |
| Thread.sleep(10); |
| throw new KubernetesClientException( |
| null, HttpURLConnection.HTTP_NOT_FOUND, null); |
| }, |
| Duration.ofMillis(1000)); |
| |
| assertTrue(remaining.toMillis() > 0); |
| assertTrue(remaining.toMillis() < 1000); |
| } |
| |
| class TestingService extends AbstractFlinkService { |
| |
| RestClusterClient<String> clusterClient; |
| RestClient restClient; |
| List<Tuple2<String, String>> deleted = new ArrayList<>(); |
| |
| Map<Tuple2<String, String>, PodList> jmPods = new HashMap<>(); |
| Map<Tuple2<String, String>, PodList> tmPods = new HashMap<>(); |
| |
| TestingService(RestClusterClient<String> clusterClient) { |
| this(clusterClient, null); |
| } |
| |
| TestingService(RestClusterClient<String> clusterClient, RestClient restClient) { |
| super( |
| client, |
| AbstractFlinkServiceTest.this.artifactManager, |
| AbstractFlinkServiceTest.this.executorService, |
| AbstractFlinkServiceTest.this.operatorConfig); |
| this.clusterClient = clusterClient; |
| this.restClient = restClient; |
| } |
| |
| @Override |
| public RestClusterClient<String> getClusterClient(Configuration config) { |
| return clusterClient; |
| } |
| |
| @Override |
| protected RestClient getRestClient(Configuration conf) throws ConfigurationException { |
| return restClient; |
| } |
| |
| @Override |
| protected PodList getJmPodList(String namespace, String clusterId) { |
| return jmPods.getOrDefault(Tuple2.of(namespace, clusterId), new PodList()); |
| } |
| |
| @Override |
| protected PodList getTmPodList(String namespace, String clusterId) { |
| return tmPods.getOrDefault(Tuple2.of(namespace, clusterId), new PodList()); |
| } |
| |
| @Override |
| protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void deploySessionCluster(Configuration conf) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void cancelJob( |
| FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public boolean scale(FlinkResourceContext<?> resourceContext, Configuration conf) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| protected void deleteClusterInternal( |
| String namespace, |
| String cluserId, |
| Configuration conf, |
| DeletionPropagation deletionPropagation) { |
| deleted.add(Tuple2.of(namespace, cluserId)); |
| } |
| } |
| } |