blob: e99424c55406d0b9cb9446b0a0144fdc0b9d91b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.autoscaler.standalone.flinkcluster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.standalone.JobListFetcher;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.util.function.FunctionWithException;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** Fetch JobAutoScalerContext based on flink cluster. */
public class FlinkClusterJobListFetcher
implements JobListFetcher<JobID, JobAutoScalerContext<JobID>> {
private final FunctionWithException<Configuration, RestClusterClient<String>, Exception>
restClientGetter;
private final Duration restClientTimeout;
public FlinkClusterJobListFetcher(
FunctionWithException<Configuration, RestClusterClient<String>, Exception>
restClientGetter,
Duration restClientTimeout) {
this.restClientGetter = restClientGetter;
this.restClientTimeout = restClientTimeout;
}
@Override
public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
try (var restClusterClient = restClientGetter.apply(new Configuration())) {
return restClusterClient
.listJobs()
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
.stream()
.map(
jobStatusMessage -> {
try {
return generateJobContext(restClusterClient, jobStatusMessage);
} catch (Throwable e) {
throw new RuntimeException(
"generateJobContext throw exception", e);
}
})
.collect(Collectors.toList());
}
}
private JobAutoScalerContext<JobID> generateJobContext(
RestClusterClient<String> restClusterClient, JobStatusMessage jobStatusMessage)
throws Exception {
var jobId = jobStatusMessage.getJobId();
var conf = getConfiguration(restClusterClient, jobId);
return new JobAutoScalerContext<>(
jobId,
jobId,
jobStatusMessage.getJobState(),
conf,
new UnregisteredMetricsGroup(),
() -> restClientGetter.apply(conf));
}
private Configuration getConfiguration(RestClusterClient<String> restClusterClient, JobID jobId)
throws Exception {
var jobParameters = new JobMessageParameters();
jobParameters.jobPathParameter.resolve(jobId);
var configurationInfo =
restClusterClient
.sendRequest(
JobManagerJobConfigurationHeaders.getInstance(),
jobParameters,
EmptyRequestBody.getInstance())
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
var conf = new Configuration();
configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue()));
return conf;
}
}