| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.client.program; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.JobSubmissionResult; |
| import org.apache.flink.api.common.Plan; |
| import org.apache.flink.api.java.ExecutionEnvironment; |
| import org.apache.flink.optimizer.plan.OptimizedPlan; |
| import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; |
| import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; |
| |
| import java.net.URI; |
| import java.net.URL; |
| import java.util.List; |
| |
| /** |
| * Execution Environment for remote execution with the Client. |
| */ |
| public class ContextEnvironment extends ExecutionEnvironment { |
| |
| protected final ClusterClient<?> client; |
| |
| protected final List<URL> jarFilesToAttach; |
| |
| protected final List<URL> classpathsToAttach; |
| |
| protected final List<URI> libjars; |
| |
| protected final List<URI> files; |
| |
| protected final ClassLoader userCodeClassLoader; |
| |
| protected final SavepointRestoreSettings savepointSettings; |
| |
| public ContextEnvironment(ClusterClient<?> remoteConnection, List<URL> jarFiles, List<URL> classpaths, |
| List<URI> libjars, List<URI> files, |
| ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) { |
| this.client = remoteConnection; |
| this.jarFilesToAttach = jarFiles; |
| this.classpathsToAttach = classpaths; |
| this.libjars = libjars; |
| this.files = files; |
| this.userCodeClassLoader = userCodeClassLoader; |
| this.savepointSettings = savepointSettings; |
| } |
| |
| @Override |
| public JobSubmissionResult executeInternal(String jobName, boolean detached) throws Exception { |
| Plan p = createProgramPlan(jobName); |
| JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach, |
| libjars, files, this.userCodeClassLoader); |
| JobSubmissionResult submissionResult = client.run(toRun, getParallelism(), savepointSettings, detached); |
| if (submissionResult.isJobExecutionResult()) { |
| this.lastJobExecutionResult = submissionResult.getJobExecutionResult(); |
| } |
| return this.lastJobExecutionResult; |
| } |
| |
| @Override |
| public void cancel(JobID jobId) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void stop() { |
| this.client.shutDownCluster(); |
| try { |
| this.client.shutdown(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @Override |
| public String getExecutionPlan() throws Exception { |
| Plan plan = createProgramPlan("unnamed job"); |
| |
| OptimizedPlan op = ClusterClient.getOptimizedPlan(client.compiler, plan, getParallelism()); |
| PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator(); |
| return gen.getOptimizerPlanAsJSON(op); |
| } |
| |
| @Override |
| public void startNewSession() throws Exception { |
| client.endSession(jobID); |
| jobID = JobID.generate(); |
| } |
| |
| @Override |
| public String toString() { |
| return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) |
| + ") : " + getIdString(); |
| } |
| |
| public ClusterClient<?> getClient() { |
| return this.client; |
| } |
| |
| public List<URL> getJars(){ |
| return jarFilesToAttach; |
| } |
| |
| public List<URL> getClasspaths(){ |
| return classpathsToAttach; |
| } |
| |
| public List<URI> getLibjars() { |
| return libjars; |
| } |
| |
| public List<URI> getFiles() { |
| return files; |
| } |
| |
| public ClassLoader getUserCodeClassLoader() { |
| return userCodeClassLoader; |
| } |
| |
| public SavepointRestoreSettings getSavepointRestoreSettings() { |
| return savepointSettings; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| static void setAsContext(ContextEnvironmentFactory factory) { |
| initializeContextEnvironment(factory); |
| } |
| |
| static void unsetContext() { |
| resetContextEnvironment(); |
| } |
| } |