blob: 3b9ac865d274b30359e9dd471b33f05d30761452 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import java.net.URI;
import java.net.URL;
import java.util.List;
/**
* Execution Environment for remote execution with the Client.
*/
public class ContextEnvironment extends ExecutionEnvironment {
protected final ClusterClient<?> client;
protected final List<URL> jarFilesToAttach;
protected final List<URL> classpathsToAttach;
protected final List<URI> libjars;
protected final List<URI> files;
protected final ClassLoader userCodeClassLoader;
protected final SavepointRestoreSettings savepointSettings;
public ContextEnvironment(ClusterClient<?> remoteConnection, List<URL> jarFiles, List<URL> classpaths,
List<URI> libjars, List<URI> files,
ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
this.classpathsToAttach = classpaths;
this.libjars = libjars;
this.files = files;
this.userCodeClassLoader = userCodeClassLoader;
this.savepointSettings = savepointSettings;
}
@Override
public JobSubmissionResult executeInternal(String jobName, boolean detached) throws Exception {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
libjars, files, this.userCodeClassLoader);
JobSubmissionResult submissionResult = client.run(toRun, getParallelism(), savepointSettings, detached);
if (submissionResult.isJobExecutionResult()) {
this.lastJobExecutionResult = submissionResult.getJobExecutionResult();
}
return this.lastJobExecutionResult;
}
@Override
public void cancel(JobID jobId) {
throw new UnsupportedOperationException();
}
@Override
public void stop() {
this.client.shutDownCluster();
try {
this.client.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public String getExecutionPlan() throws Exception {
Plan plan = createProgramPlan("unnamed job");
OptimizedPlan op = ClusterClient.getOptimizedPlan(client.compiler, plan, getParallelism());
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
}
@Override
public void startNewSession() throws Exception {
client.endSession(jobID);
jobID = JobID.generate();
}
@Override
public String toString() {
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
+ ") : " + getIdString();
}
public ClusterClient<?> getClient() {
return this.client;
}
public List<URL> getJars(){
return jarFilesToAttach;
}
public List<URL> getClasspaths(){
return classpathsToAttach;
}
public List<URI> getLibjars() {
return libjars;
}
public List<URI> getFiles() {
return files;
}
public ClassLoader getUserCodeClassLoader() {
return userCodeClassLoader;
}
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}
// --------------------------------------------------------------------------------------------
static void setAsContext(ContextEnvironmentFactory factory) {
initializeContextEnvironment(factory);
}
static void unsetContext() {
resetContextEnvironment();
}
}