/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.client.gateway.local;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.local.result.Result;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * The helper class to deploy a table program on the cluster.
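 *
 * <p>A minimal usage sketch; the {@code context}, {@code jobGraph}, and {@code result}
 * instances are illustrative and assumed to be created by the caller:
 *
 * <pre>{@code
 * ProgramDeployer<String> deployer =
 *     new ProgramDeployer<>(context, "my-query", jobGraph, result, true);
 * deployer.run(); // blocks until the cluster reports a job execution result
 * JobExecutionResult jobResult = deployer.fetchExecutionResult();
 * }</pre>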
 */
public class ProgramDeployer<C> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(ProgramDeployer.class);

    private final ExecutionContext<C> context;
    private final JobGraph jobGraph;
    private final String jobName;
    private final Result<C> result;
    private final boolean awaitJobResult;

    private final BlockingQueue<JobExecutionResult> executionResultBucket;

    /**
     * Deploys a table program on the cluster.
     *
     * @param context        context with deployment information
     * @param jobName        job name of the Flink job to be submitted
     * @param jobGraph       Flink job graph
     * @param result         result that receives information about the target cluster
     * @param awaitJobResult if true, block until the cluster returns a job execution result
     */
    public ProgramDeployer(
            ExecutionContext<C> context,
            String jobName,
            JobGraph jobGraph,
            Result<C> result,
            boolean awaitJobResult) {
        this.context = context;
        this.jobGraph = jobGraph;
        this.jobName = jobName;
        this.result = result;
        this.awaitJobResult = awaitJobResult;
        executionResultBucket = new LinkedBlockingDeque<>(1);
    }

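    /**
     * Submits the job to the cluster. If {@code awaitJobResult} is set, this call blocks
     * until the cluster reports a job execution result and buffers it for
     * {@link #fetchExecutionResult()}.
     */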
    @Override
    public void run() {
        LOG.info("Submitting job {} for query {}", jobGraph.getJobID(), jobName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitting job {} with the following environment: \n{}",
                jobGraph.getJobID(), context.getMergedEnvironment());
        }
        deployJob(context, jobGraph, result);
    }

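    /**
     * Returns and removes the buffered job execution result if one is available, or
     * {@code null} otherwise. {@link #run()} buffers at most one result.
     */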
    public JobExecutionResult fetchExecutionResult() {
        return executionResultBucket.poll();
    }

    /**
     * Deploys a job. Depending on the deployment, this either starts a new job cluster or
     * reuses an existing one. It saves the cluster information in the result and, if
     * requested, blocks until the job completes.
     */
    private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
        // create or retrieve cluster and deploy job
        try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
            try {
                // new cluster
                if (context.getClusterId() == null) {
                    deployJobOnNewCluster(clusterDescriptor, jobGraph, result, context.getClassLoader());
                }
                // reuse existing cluster
                else {
                    deployJobOnExistingCluster(context.getClusterId(), clusterDescriptor, jobGraph, result);
                }
            } catch (Exception e) {
                throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
            }
        } catch (SqlExecutionException e) {
            throw e;
        } catch (Exception e) {
            throw new SqlExecutionException("Could not locate a cluster.", e);
        }
    }

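    /**
     * Deploys a dedicated job cluster with the given job attached and, if requested,
     * waits for the job execution result.
     */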
    private <T> void deployJobOnNewCluster(
            ClusterDescriptor<T> clusterDescriptor,
            JobGraph jobGraph,
            Result<T> result,
            ClassLoader classLoader) throws Exception {
        ClusterClient<T> clusterClient = null;
        try {
            // deploy job cluster with job attached
            clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
            // save information about the new cluster
            result.setClusterInformation(clusterClient.getClusterId(), clusterClient.getWebInterfaceURL());
            // get result
            if (awaitJobResult) {
                // we need to hard cast for now; requestJobResult() is not available on the generic ClusterClient
                final JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
                    .requestJobResult(jobGraph.getJobID())
                    .get()
                    .toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
                executionResultBucket.add(jobResult);
            }
        } finally {
            try {
                if (clusterClient != null) {
                    clusterClient.shutdown();
                }
            } catch (Exception e) {
                // ignore; shutting down the client is best-effort
            }
        }
    }

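    /**
     * Submits the job to an existing cluster identified by the given cluster id and,
     * if requested, waits for the job execution result.
     */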
    private <T> void deployJobOnExistingCluster(
            T clusterId,
            ClusterDescriptor<T> clusterDescriptor,
            JobGraph jobGraph,
            Result<T> result) throws Exception {
        ClusterClient<T> clusterClient = null;
        try {
            // retrieve existing cluster
            clusterClient = clusterDescriptor.retrieve(clusterId);
            String webInterfaceUrl;
            // retrieving the web interface URL might fail on legacy pre-FLIP-6 code paths
            // TODO remove this once we drop support for legacy deployment code
            try {
                webInterfaceUrl = clusterClient.getWebInterfaceURL();
            } catch (Exception e) {
                webInterfaceUrl = "N/A";
            }
            // save the cluster information
            result.setClusterInformation(clusterClient.getClusterId(), webInterfaceUrl);
            // submit job (and get result)
            if (awaitJobResult) {
                clusterClient.setDetached(false);
                final JobExecutionResult jobResult = clusterClient
                    .submitJob(jobGraph, context.getClassLoader())
                    .getJobExecutionResult(); // throws exception if job fails
                executionResultBucket.add(jobResult);
            } else {
                clusterClient.setDetached(true);
                clusterClient.submitJob(jobGraph, context.getClassLoader());
            }
        } finally {
            try {
                if (clusterClient != null) {
                    clusterClient.shutdown();
                }
            } catch (Exception e) {
                // ignore; shutting down the client is best-effort
            }
        }
    }
}