blob: 658eb58f0fb91027e6dfbc150d9906cea64abb36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.autoscaler.standalone.flinkcluster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.types.Either;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.fail;
/** Test for {@link FlinkClusterJobListFetcher}. */
class FlinkClusterJobListFetcherTest {
/** Test whether the job list and confs are expected. */
@Test
void testFetchJobListAndConfigurationInfo() throws Exception {
var job1 =
new JobStatusMessage(
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
var job2 =
new JobStatusMessage(
new JobID(), "", JobStatus.CANCELLING, Instant.now().toEpochMilli());
Configuration expectedConf1 = new Configuration();
expectedConf1.setString("option_key1", "option_value1");
Configuration expectedConf2 = new Configuration();
expectedConf2.setString("option_key2", "option_value2");
expectedConf2.setString("option_key3", "option_value3");
var jobs = Map.of(job1.getJobId(), job1, job2.getJobId(), job2);
var configurations = Map.of(job1.getJobId(), expectedConf1, job2.getJobId(), expectedConf2);
var closeCounter = new AtomicLong();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getRestClusterClient(
Either.Left(List.of(job1, job2)),
Either.Left(
Map.of(
job1.getJobId(),
ConfigurationInfo.from(expectedConf1),
job2.getJobId(),
ConfigurationInfo.from(expectedConf2))),
closeCounter),
Duration.ofSeconds(10));
// Fetch multiple times and check whether the results are as expected each time
for (int i = 1; i <= 3; i++) {
var fetchedJobList = jobListFetcher.fetch();
// Check whether rest client is closed.
assertThat(closeCounter).hasValue(i);
assertThat(fetchedJobList).hasSize(2);
for (var jobContext : fetchedJobList) {
JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
Configuration expectedConf = configurations.get(jobContext.getJobID());
assertThat(expectedJobStatusMessage).isNotNull();
assertThat(jobContext.getJobStatus())
.isEqualTo(expectedJobStatusMessage.getJobState());
assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
}
}
}
/**
* Test whether the exception is expected after rest client fetches job list throws exception,
* and restClient can be closed normally.
*/
@Test
void testFetchJobListException() {
var expectedException = new RuntimeException("Expected exception.");
var closeCounter = new AtomicLong();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getRestClusterClient(
Either.Right(expectedException),
Either.Left(Map.of()),
closeCounter),
Duration.ofSeconds(10));
assertThatThrownBy(jobListFetcher::fetch).getCause().isEqualTo(expectedException);
assertThat(closeCounter).hasValue(1);
}
/**
* Test whether the exception is expected after rest client fetches conf throws exception, and
* restClient can be closed normally.
*/
@Test
void testFetchConfigurationException() {
var job1 =
new JobStatusMessage(
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
var expectedException = new RuntimeException("Expected exception.");
var closeCounter = new AtomicLong();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getRestClusterClient(
Either.Left(List.of(job1)),
Either.Right(expectedException),
closeCounter),
Duration.ofSeconds(10));
assertThatThrownBy(jobListFetcher::fetch).getRootCause().isEqualTo(expectedException);
assertThat(closeCounter).hasValue(1);
}
/**
* Test whether the exception is expected after rest client fetches job list timeout, and
* restClient can be closed normally.
*/
@Test
void testFetchJobListTimeout() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getTimeoutableRestClusterClient(null, null, closeFuture),
Duration.ofSeconds(2));
assertThat(closeFuture).isNotDone();
assertThatThrownBy(jobListFetcher::fetch).isInstanceOf(TimeoutException.class);
assertThat(closeFuture).isDone();
}
/**
* Test whether the exception is expected after rest client fetches conf timeout, and restClient
* can be closed normally.
*/
@Test
void testFetchConfigurationTimeout() {
var job1 =
new JobStatusMessage(
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getTimeoutableRestClusterClient(List.of(job1), null, closeFuture),
Duration.ofSeconds(2));
assertThat(closeFuture).isNotDone();
assertThatThrownBy(jobListFetcher::fetch)
.getRootCause()
.isInstanceOf(TimeoutException.class);
assertThat(closeFuture).isDone();
}
/**
* @param jobListOrException When listJobs is called, return jobList if Either is left, return
* failedFuture if Either is right.
* @param configurationsOrException When fetch job conf, return configuration if Either is left,
* return failedFuture if Either is right.
* @param closeCounter Increment the count each time the {@link RestClusterClient#close} is
* called
*/
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
getRestClusterClient(
Either<Collection<JobStatusMessage>, Throwable> jobListOrException,
Either<Map<JobID, ConfigurationInfo>, Throwable> configurationsOrException,
AtomicLong closeCounter) {
return conf ->
new RestClusterClient<>(
conf,
"test-cluster",
(c, e) -> new StandaloneClientHAServices("localhost")) {
@Override
public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
if (jobListOrException.isLeft()) {
return CompletableFuture.completedFuture(jobListOrException.left());
}
return CompletableFuture.failedFuture(jobListOrException.right());
}
@Override
public <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRequest(M h, U p, R r) {
if (h instanceof JobManagerJobConfigurationHeaders) {
if (configurationsOrException.isRight()) {
return CompletableFuture.failedFuture(
configurationsOrException.right());
}
var jobID = ((JobMessageParameters) p).jobPathParameter.getValue();
return (CompletableFuture<P>)
CompletableFuture.completedFuture(
configurationsOrException.left().get(jobID));
}
fail("Unknown request");
return null;
}
@Override
public void close() {
super.close();
closeCounter.incrementAndGet();
}
};
}
/**
* @param jobList When listJobs is called, return jobList if it's not null, don't complete
* future if it's null.
* @param configuration When fetch job conf, return configuration if it's not null, don't
* complete future if it's null.
* @param closeFuture Complete this closeFuture when {@link RestClusterClient#close} is called.
*/
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
getTimeoutableRestClusterClient(
@Nullable Collection<JobStatusMessage> jobList,
@Nullable ConfigurationInfo configuration,
CompletableFuture<Void> closeFuture) {
return conf ->
new RestClusterClient<>(
conf,
"test-cluster",
(c, e) -> new StandaloneClientHAServices("localhost")) {
@Override
public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
if (jobList == null) {
return new CompletableFuture<>();
}
return CompletableFuture.completedFuture(jobList);
}
@Override
public <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRequest(M h, U p, R r) {
if (h instanceof JobManagerJobConfigurationHeaders) {
if (configuration == null) {
return new CompletableFuture<>();
}
return (CompletableFuture<P>)
CompletableFuture.completedFuture(configuration);
}
fail("Unknown request");
return null;
}
@Override
public void close() {
super.close();
closeFuture.complete(null);
}
};
}
}