| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.client.cli; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.InvalidProgramException; |
| import org.apache.flink.api.common.JobExecutionResult; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.JobSubmissionResult; |
| import org.apache.flink.api.common.accumulators.AccumulatorHelper; |
| import org.apache.flink.client.deployment.ClusterDescriptor; |
| import org.apache.flink.client.deployment.ClusterSpecification; |
| import org.apache.flink.client.program.ClusterClient; |
| import org.apache.flink.client.program.PackagedProgram; |
| import org.apache.flink.client.program.PackagedProgramUtils; |
| import org.apache.flink.client.program.ProgramInvocationException; |
| import org.apache.flink.client.program.ProgramMissingJobException; |
| import org.apache.flink.client.program.ProgramParametrizationException; |
| import org.apache.flink.configuration.ConfigConstants; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.CoreOptions; |
| import org.apache.flink.configuration.GlobalConfiguration; |
| import org.apache.flink.configuration.JobManagerOptions; |
| import org.apache.flink.configuration.RestOptions; |
| import org.apache.flink.core.fs.FileSystem; |
| import org.apache.flink.optimizer.DataStatistics; |
| import org.apache.flink.optimizer.Optimizer; |
| import org.apache.flink.optimizer.costs.DefaultCostEstimator; |
| import org.apache.flink.optimizer.plan.FlinkPlan; |
| import org.apache.flink.optimizer.plan.OptimizedPlan; |
| import org.apache.flink.optimizer.plan.StreamingPlan; |
| import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; |
| import org.apache.flink.runtime.akka.AkkaUtils; |
| import org.apache.flink.runtime.client.JobStatusMessage; |
| import org.apache.flink.runtime.jobgraph.JobGraph; |
| import org.apache.flink.runtime.jobgraph.JobStatus; |
| import org.apache.flink.runtime.messages.Acknowledge; |
| import org.apache.flink.runtime.messages.JobManagerMessages; |
| import org.apache.flink.runtime.security.SecurityConfiguration; |
| import org.apache.flink.runtime.security.SecurityUtils; |
| import org.apache.flink.runtime.util.EnvironmentInformation; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.FlinkException; |
| import org.apache.flink.util.Preconditions; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.Options; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URL; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| import scala.concurrent.duration.FiniteDuration; |
| |
| import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION; |
| import static org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION; |
| import static org.apache.flink.client.program.ClusterClient.MAX_SLOTS_UNKNOWN; |
| |
| /** |
| * Implementation of a simple command line frontend for executing programs. |
| */ |
| public class CliFrontend { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); |
| |
| // actions |
| private static final String ACTION_RUN = "run"; |
| private static final String ACTION_INFO = "info"; |
| private static final String ACTION_LIST = "list"; |
| private static final String ACTION_CANCEL = "cancel"; |
| private static final String ACTION_STOP = "stop"; |
| private static final String ACTION_SAVEPOINT = "savepoint"; |
| private static final String ACTION_MODIFY = "modify"; |
| |
| // configuration dir parameters |
| private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf"; |
| private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf"; |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| private final Configuration configuration; |
| |
| private final List<CustomCommandLine<?>> customCommandLines; |
| |
| private final Options customCommandLineOptions; |
| |
| private final FiniteDuration clientTimeout; |
| |
| private final int defaultParallelism; |
| |
| private final boolean isNewMode; |
| |
| public CliFrontend( |
| Configuration configuration, |
| List<CustomCommandLine<?>> customCommandLines) throws Exception { |
| this.configuration = Preconditions.checkNotNull(configuration); |
| this.customCommandLines = Preconditions.checkNotNull(customCommandLines); |
| |
| try { |
| FileSystem.initialize(this.configuration); |
| } catch (IOException e) { |
| throw new Exception("Error while setting the default " + |
| "filesystem scheme from configuration.", e); |
| } |
| |
| this.customCommandLineOptions = new Options(); |
| |
| for (CustomCommandLine<?> customCommandLine : customCommandLines) { |
| customCommandLine.addGeneralOptions(customCommandLineOptions); |
| customCommandLine.addRunOptions(customCommandLineOptions); |
| } |
| |
| this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration); |
| this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); |
| |
| this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Getter & Setter |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Getter which returns a copy of the associated configuration. |
| * |
| * @return Copy of the associated configuration |
| */ |
| public Configuration getConfiguration() { |
| Configuration copiedConfiguration = new Configuration(); |
| |
| copiedConfiguration.addAll(configuration); |
| |
| return copiedConfiguration; |
| } |
| |
| public Options getCustomCommandLineOptions() { |
| return customCommandLineOptions; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Execute Actions |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Executions the run action. |
| * |
| * @param args Command line arguments for the run action. |
| */ |
| protected void run(String[] args) throws Exception { |
| LOG.info("Running 'run' command."); |
| |
| final Options commandOptions = CliFrontendParser.getRunCommandOptions(); |
| |
| final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); |
| |
| final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true); |
| |
| final RunOptions runOptions = new RunOptions(commandLine); |
| |
| // evaluate help flag |
| if (runOptions.isPrintHelp()) { |
| CliFrontendParser.printHelpForRun(customCommandLines); |
| return; |
| } |
| |
| if (runOptions.getJarFilePath() == null) { |
| throw new CliArgsException("The program JAR file was not specified."); |
| } |
| |
| final PackagedProgram program; |
| try { |
| LOG.info("Building program from JAR file"); |
| program = buildProgram(runOptions); |
| } |
| catch (FileNotFoundException e) { |
| throw new CliArgsException("Could not build the program from JAR file.", e); |
| } |
| |
| final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine); |
| |
| try { |
| runProgram(customCommandLine, commandLine, runOptions, program); |
| } finally { |
| program.deleteExtractedLibraries(); |
| } |
| } |
| |
| private <T> void runProgram( |
| CustomCommandLine<T> customCommandLine, |
| CommandLine commandLine, |
| RunOptions runOptions, |
| PackagedProgram program) throws ProgramInvocationException, FlinkException { |
| final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); |
| |
| try { |
| final T clusterId = customCommandLine.getClusterId(commandLine); |
| |
| final ClusterClient<T> client; |
| |
| // directly deploy the job if the cluster is started in job mode and detached |
| if (isNewMode && clusterId == null && runOptions.getDetachedMode()) { |
| int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism(); |
| |
| final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism); |
| |
| final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); |
| client = clusterDescriptor.deployJobCluster( |
| clusterSpecification, |
| jobGraph, |
| runOptions.getDetachedMode()); |
| |
| logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID()); |
| |
| try { |
| client.shutdown(); |
| } catch (Exception e) { |
| LOG.info("Could not properly shut down the client.", e); |
| } |
| } else { |
| if (clusterId != null) { |
| client = clusterDescriptor.retrieve(clusterId); |
| } else { |
| // also in job mode we have to deploy a session cluster because the job |
| // might consist of multiple parts (e.g. when using collect) |
| final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); |
| client = clusterDescriptor.deploySessionCluster(clusterSpecification); |
| } |
| |
| try { |
| client.setPrintStatusDuringExecution(runOptions.getStdoutLogging()); |
| client.setDetached(runOptions.getDetachedMode()); |
| LOG.debug("Client slots is set to {}", client.getMaxSlots()); |
| |
| LOG.debug("{}", runOptions.getSavepointRestoreSettings()); |
| |
| int userParallelism = runOptions.getParallelism(); |
| LOG.debug("User parallelism is set to {}", userParallelism); |
| if (client.getMaxSlots() != MAX_SLOTS_UNKNOWN && userParallelism == -1) { |
| logAndSysout("Using the parallelism provided by the remote cluster (" |
| + client.getMaxSlots() + "). " |
| + "To use another parallelism, set it at the ./bin/flink client."); |
| userParallelism = client.getMaxSlots(); |
| } else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) { |
| userParallelism = defaultParallelism; |
| } |
| |
| executeProgram(program, client, userParallelism); |
| } finally { |
| if (clusterId == null && !client.isDetached()) { |
| // terminate the cluster only if we have started it before and if it's not detached |
| try { |
| client.shutDownCluster(); |
| } catch (final Exception e) { |
| LOG.info("Could not properly terminate the Flink cluster.", e); |
| } |
| } |
| |
| try { |
| client.shutdown(); |
| } catch (Exception e) { |
| LOG.info("Could not properly shut down the client.", e); |
| } |
| } |
| } |
| } finally { |
| try { |
| clusterDescriptor.close(); |
| } catch (Exception e) { |
| LOG.info("Could not properly close the cluster descriptor.", e); |
| } |
| } |
| } |
| |
| /** |
| * Executes the info action. |
| * |
| * @param args Command line arguments for the info action. |
| */ |
| protected void info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException { |
| LOG.info("Running 'info' command."); |
| |
| final Options commandOptions = CliFrontendParser.getInfoCommandOptions(); |
| |
| final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true); |
| |
| InfoOptions infoOptions = new InfoOptions(commandLine); |
| |
| // evaluate help flag |
| if (infoOptions.isPrintHelp()) { |
| CliFrontendParser.printHelpForInfo(); |
| return; |
| } |
| |
| if (infoOptions.getJarFilePath() == null) { |
| throw new CliArgsException("The program JAR file was not specified."); |
| } |
| |
| // -------- build the packaged program ------------- |
| |
| LOG.info("Building program from JAR file"); |
| final PackagedProgram program = buildProgram(infoOptions); |
| |
| try { |
| int parallelism = infoOptions.getParallelism(); |
| if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) { |
| parallelism = defaultParallelism; |
| } |
| |
| LOG.info("Creating program plan dump"); |
| |
| Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration); |
| FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism); |
| |
| String jsonPlan = null; |
| if (flinkPlan instanceof OptimizedPlan) { |
| jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan); |
| } else if (flinkPlan instanceof StreamingPlan) { |
| jsonPlan = ((StreamingPlan) flinkPlan).getStreamingPlanAsJSON(); |
| } |
| |
| if (jsonPlan != null) { |
| System.out.println("----------------------- Execution Plan -----------------------"); |
| System.out.println(jsonPlan); |
| System.out.println("--------------------------------------------------------------"); |
| } |
| else { |
| System.out.println("JSON plan could not be generated."); |
| } |
| |
| String description = program.getDescription(); |
| if (description != null) { |
| System.out.println(); |
| System.out.println(description); |
| } |
| else { |
| System.out.println(); |
| System.out.println("No description provided."); |
| } |
| } |
| finally { |
| program.deleteExtractedLibraries(); |
| } |
| } |
| |
| /** |
| * Executes the list action. |
| * |
| * @param args Command line arguments for the list action. |
| */ |
| protected void list(String[] args) throws Exception { |
| LOG.info("Running 'list' command."); |
| |
| final Options commandOptions = CliFrontendParser.getListCommandOptions(); |
| |
| final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); |
| |
| final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); |
| |
| ListOptions listOptions = new ListOptions(commandLine); |
| |
| // evaluate help flag |
| if (listOptions.isPrintHelp()) { |
| CliFrontendParser.printHelpForList(customCommandLines); |
| return; |
| } |
| |
| final boolean running; |
| final boolean scheduled; |
| |
| // print running and scheduled jobs if not option supplied |
| if (!listOptions.getRunning() && !listOptions.getScheduled()) { |
| running = true; |
| scheduled = true; |
| } else { |
| running = listOptions.getRunning(); |
| scheduled = listOptions.getScheduled(); |
| } |
| |
| final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); |
| |
| runClusterAction( |
| activeCommandLine, |
| commandLine, |
| clusterClient -> listJobs(clusterClient, running, scheduled)); |
| |
| } |
| |
| private <T> void listJobs( |
| ClusterClient<T> clusterClient, |
| boolean running, |
| boolean scheduled) throws FlinkException { |
| Collection<JobStatusMessage> jobDetails; |
| try { |
| CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs(); |
| |
| logAndSysout("Waiting for response..."); |
| jobDetails = jobDetailsFuture.get(); |
| |
| } catch (Exception e) { |
| Throwable cause = ExceptionUtils.stripExecutionException(e); |
| throw new FlinkException("Failed to retrieve job list.", cause); |
| } |
| |
| LOG.info("Successfully retrieved list of jobs"); |
| |
| SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss"); |
| Comparator<JobStatusMessage> startTimeComparator = (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime()); |
| |
| final List<JobStatusMessage> runningJobs = new ArrayList<>(); |
| final List<JobStatusMessage> scheduledJobs = new ArrayList<>(); |
| jobDetails.forEach(details -> { |
| if (details.getJobState() == JobStatus.CREATED) { |
| scheduledJobs.add(details); |
| } else if (!details.getJobState().isGloballyTerminalState()) { |
| runningJobs.add(details); |
| } |
| }); |
| |
| if (running) { |
| if (runningJobs.size() == 0) { |
| System.out.println("No running jobs."); |
| } |
| else { |
| runningJobs.sort(startTimeComparator); |
| |
| System.out.println("------------------ Running/Restarting Jobs -------------------"); |
| for (JobStatusMessage runningJob : runningJobs) { |
| System.out.println(dateFormat.format(new Date(runningJob.getStartTime())) |
| + " : " + runningJob.getJobId() + " : " + runningJob.getJobName() + " (" + runningJob.getJobState() + ")"); |
| } |
| System.out.println("--------------------------------------------------------------"); |
| } |
| } |
| if (scheduled) { |
| if (scheduledJobs.size() == 0) { |
| System.out.println("No scheduled jobs."); |
| } |
| else { |
| scheduledJobs.sort(startTimeComparator); |
| |
| System.out.println("----------------------- Scheduled Jobs -----------------------"); |
| for (JobStatusMessage scheduledJob : scheduledJobs) { |
| System.out.println(dateFormat.format(new Date(scheduledJob.getStartTime())) |
| + " : " + scheduledJob.getJobId() + " : " + scheduledJob.getJobName()); |
| } |
| System.out.println("--------------------------------------------------------------"); |
| } |
| } |
| } |
| |
| /** |
| * Executes the STOP action. |
| * |
| * @param args Command line arguments for the stop action. |
| */ |
| protected void stop(String[] args) throws Exception { |
| LOG.info("Running 'stop' command."); |
| |
| final Options commandOptions = CliFrontendParser.getStopCommandOptions(); |
| |
| final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); |
| |
| final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); |
| |
| StopOptions stopOptions = new StopOptions(commandLine); |
| |
| // evaluate help flag |
| if (stopOptions.isPrintHelp()) { |
| CliFrontendParser.printHelpForStop(customCommandLines); |
| return; |
| } |
| |
| String[] stopArgs = stopOptions.getArgs(); |
| JobID jobId; |
| |
| if (stopArgs.length > 0) { |
| String jobIdString = stopArgs[0]; |
| jobId = parseJobId(jobIdString); |
| } else { |
| throw new CliArgsException("Missing JobID"); |
| } |
| |
| final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); |
| |
| logAndSysout("Stopping job " + jobId + '.'); |
| |
| runClusterAction( |
| activeCommandLine, |
| commandLine, |
| clusterClient -> { |
| try { |
| clusterClient.stop(jobId); |
| } catch (Exception e) { |
| throw new FlinkException("Could not stop the job " + jobId + '.', e); |
| } |
| }); |
| |
| logAndSysout("Stopped job " + jobId + '.'); |
| } |
| |
| /** |
| * Executes the CANCEL action. |
| * |
| * @param args Command line arguments for the cancel action. |
| */ |
| protected void cancel(String[] args) throws Exception { |
| LOG.info("Running 'cancel' command."); |
| |
| final Options commandOptions = CliFrontendParser.getCancelCommandOptions(); |
| |
| final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); |
| |
| final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); |
| |
| CancelOptions cancelOptions = new CancelOptions(commandLine); |
| |
| // evaluate help flag |
| if (cancelOptions.isPrintHelp()) { |
| CliFrontendParser.printHelpForCancel(customCommandLines); |
| return; |
| } |
| |
| final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); |
| |
| final String[] cleanedArgs = cancelOptions.getArgs(); |
| |
| if (cancelOptions.isWithSavepoint()) { |
| final JobID jobId; |
| final String targetDirectory; |
| |
| if (cleanedArgs.length > 0) { |
| jobId = parseJobId(cleanedArgs[0]); |
| targetDirectory = cancelOptions.getSavepointTargetDirectory(); |
| } else { |
| jobId = parseJobId(cancelOptions.getSavepointTargetDirectory()); |
| targetDirectory = null; |
| } |
| |
| if (targetDirectory == null) { |
| logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); |
| } else { |
| logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); |
| } |
| |
| runClusterAction( |
| activeCommandLine, |
| commandLine, |
| clusterClient -> { |
| final String savepointPath; |
| try { |
| savepointPath = clusterClient.cancelWithSavepoint(jobId, targetDirectory); |
| } catch (Exception e) { |
| throw new FlinkException("Could not cancel job " + jobId + '.', e); |
| } |
| logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); |
| }); |
| } else { |
| final JobID jobId; |
| |
| if (cleanedArgs.length > 0) { |
| jobId = parseJobId(cleanedArgs[0]); |
| } else { |
| throw new CliArgsException("Missing JobID. Specify a JobID to cancel a job."); |
| } |
| |
| logAndSysout("Cancelling job " + jobId + '.'); |
| |
| runClusterAction( |
| activeCommandLine, |
| commandLine, |
| clusterClient -> { |
| try { |
| clusterClient.cancel(jobId); |
| } catch (Exception e) { |
| throw new FlinkException("Could not cancel job " + jobId + '.', e); |
| } |
| }); |
| |
| logAndSysout("Cancelled job " + jobId + '.'); |
| } |
| } |
| |
| /** |
| * Executes the SAVEPOINT action. |
| * |
| * @param args Command line arguments for the savepoint action. |
| */ |
| protected void savepoint(String[] args) throws Exception { |
| LOG.info("Running 'savepoint' command."); |
| |
| final Options commandOptions = CliFrontendParser.getSavepointCommandOptions(); |
| |
| final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); |
| |
| final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); |
| |
| final SavepointOptions savepointOptions = new SavepointOptions(commandLine); |
| |
| // evaluate help flag |
| if (savepointOptions.isPrintHelp()) { |
| CliFrontendParser.printHelpForSavepoint(customCommandLines); |
| return; |
| } |
| |
| final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); |
| |
| if (savepointOptions.isDispose()) { |
| runClusterAction( |
| activeCommandLine, |
| commandLine, |
| clusterClient -> disposeSavepoint(clusterClient, savepointOptions.getSavepointPath())); |
| } else { |
| String[] cleanedArgs = savepointOptions.getArgs(); |
| |
| final JobID jobId; |
| |
| if (cleanedArgs.length >= 1) { |
| String jobIdString = cleanedArgs[0]; |
| |
| jobId = parseJobId(jobIdString); |
| } else { |
| throw new CliArgsException("Missing JobID. " + |
| "Specify a Job ID to trigger a savepoint."); |
| } |
| |
| final String savepointDirectory; |
| if (cleanedArgs.length >= 2) { |
| savepointDirectory = cleanedArgs[1]; |
| } else { |
| savepointDirectory = null; |
| } |
| |
| // Print superfluous arguments |
| if (cleanedArgs.length >= 3) { |
| logAndSysout("Provided more arguments than required. Ignoring not needed arguments."); |
| } |
| |
| runClusterAction( |
| activeCommandLine, |
| commandLine, |
| clusterClient -> triggerSavepoint(clusterClient, jobId, savepointDirectory)); |
| } |
| |
| } |
| |
| /** |
| * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint} |
| * message to the job manager. |
| */ |
| private String triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, String savepointDirectory) throws FlinkException { |
| logAndSysout("Triggering savepoint for job " + jobId + '.'); |
| CompletableFuture<String> savepointPathFuture = clusterClient.triggerSavepoint(jobId, savepointDirectory); |
| |
| logAndSysout("Waiting for response..."); |
| |
| final String savepointPath; |
| |
| try { |
| savepointPath = savepointPathFuture.get(); |
| } |
| catch (Exception e) { |
| Throwable cause = ExceptionUtils.stripExecutionException(e); |
| throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause); |
| } |
| |
| logAndSysout("Savepoint completed. Path: " + savepointPath); |
| logAndSysout("You can resume your program from this savepoint with the run command."); |
| |
| return savepointPath; |
| } |
| |
| /** |
| * Sends a {@link JobManagerMessages.DisposeSavepoint} message to the job manager. |
| */ |
| private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath) throws FlinkException { |
| Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " + |
| "Usage: bin/flink savepoint -d <savepoint-path>"); |
| |
| logAndSysout("Disposing savepoint '" + savepointPath + "'."); |
| |
| final CompletableFuture<Acknowledge> disposeFuture = clusterClient.disposeSavepoint(savepointPath); |
| |
| logAndSysout("Waiting for response..."); |
| |
| try { |
| disposeFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS); |
| } catch (Exception e) { |
| throw new FlinkException("Disposing the savepoint '" + savepointPath + "' failed.", e); |
| } |
| |
| logAndSysout("Savepoint '" + savepointPath + "' disposed."); |
| } |
| |
| protected void modify(String[] args) throws CliArgsException, FlinkException { |
| LOG.info("Running 'modify' command."); |
| |
| final Options commandOptions = CliFrontendParser.getModifyOptions(); |
| |
| final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); |
| |
| final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); |
| |
| if (commandLine.hasOption(HELP_OPTION.getOpt())) { |
| CliFrontendParser.printHelpForModify(customCommandLines); |
| } |
| |
| final JobID jobId; |
| final String[] modifyArgs = commandLine.getArgs(); |
| |
| if (modifyArgs.length > 0) { |
| jobId = parseJobId(modifyArgs[0]); |
| } else { |
| throw new CliArgsException("Missing JobId"); |
| } |
| |
| final int newParallelism; |
| if (commandLine.hasOption(MODIFY_PARALLELISM_OPTION.getOpt())) { |
| try { |
| newParallelism = Integer.parseInt(commandLine.getOptionValue(MODIFY_PARALLELISM_OPTION.getOpt())); |
| } catch (NumberFormatException e) { |
| throw new CliArgsException("Could not parse the parallelism which is supposed to be an integer.", e); |
| } |
| } else { |
| throw new CliArgsException("Missing new parallelism."); |
| } |
| |
| final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); |
| |
| logAndSysout("Modify job " + jobId + '.'); |
| runClusterAction( |
| activeCommandLine, |
| commandLine, |
| clusterClient -> { |
| CompletableFuture<Acknowledge> rescaleFuture = clusterClient.rescaleJob(jobId, newParallelism); |
| |
| try { |
| rescaleFuture.get(); |
| } catch (Exception e) { |
| throw new FlinkException("Could not rescale job " + jobId + '.', ExceptionUtils.stripExecutionException(e)); |
| } |
| logAndSysout("Rescaled job " + jobId + ". Its new parallelism is " + newParallelism + '.'); |
| } |
| ); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Interaction with programs and JobManager |
| // -------------------------------------------------------------------------------------------- |
| |
| protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException { |
| logAndSysout("Starting execution of program"); |
| |
| final JobSubmissionResult result = client.run(program, parallelism); |
| |
| if (null == result) { |
| throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " + |
| "ExecutionEnvironment.execute()"); |
| } |
| |
| if (result.isJobExecutionResult()) { |
| logAndSysout("Program execution finished"); |
| JobExecutionResult execResult = result.getJobExecutionResult(); |
| System.out.println("Job with JobID " + execResult.getJobID() + " has finished."); |
| System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms"); |
| Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults(); |
| if (accumulatorsResult.size() > 0) { |
| System.out.println("Accumulator Results: "); |
| System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult)); |
| } |
| } else { |
| logAndSysout("Job has been submitted with JobID " + result.getJobID()); |
| } |
| } |
| |
| /** |
| * Creates a Packaged program from the given command line options. |
| * |
| * @return A PackagedProgram (upon success) |
| * @throws java.io.FileNotFoundException |
| * @throws org.apache.flink.client.program.ProgramInvocationException |
| */ |
| protected PackagedProgram buildProgram(ProgramOptions options) |
| throws FileNotFoundException, ProgramInvocationException { |
| String[] programArgs = options.getProgramArgs(); |
| String jarFilePath = options.getJarFilePath(); |
| List<URL> classpaths = options.getClasspaths(); |
| |
| final List<URI> libjars = options.getLibjars(); |
| final List<URI> files = options.getFiles(); |
| |
| if (jarFilePath == null) { |
| throw new IllegalArgumentException("The program JAR file was not specified."); |
| } |
| |
| File jarFile = new File(jarFilePath); |
| |
| // Check if JAR file exists |
| if (!jarFile.exists()) { |
| throw new FileNotFoundException("JAR file does not exist: " + jarFile); |
| } |
| else if (!jarFile.isFile()) { |
| throw new FileNotFoundException("JAR file is not a file: " + jarFile); |
| } |
| |
| // Get assembler class |
| String entryPointClass = options.getEntryPointClassName(); |
| |
| PackagedProgram program = entryPointClass == null ? |
| new PackagedProgram(jarFile, classpaths, libjars, files, programArgs) : |
| new PackagedProgram(jarFile, classpaths, entryPointClass, libjars, files, programArgs); |
| |
| program.setSavepointRestoreSettings(options.getSavepointRestoreSettings()); |
| |
| return program; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Logging and Exception Handling |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Displays an exception message for incorrect command line arguments. |
| * |
| * @param e The exception to display. |
| * @return The return code for the process. |
| */ |
| private static int handleArgException(CliArgsException e) { |
| LOG.error("Invalid command line arguments.", e); |
| |
| System.out.println(e.getMessage()); |
| System.out.println(); |
| System.out.println("Use the help option (-h or --help) to get help on the command."); |
| return 1; |
| } |
| |
| /** |
| * Displays an optional exception message for incorrect program parametrization. |
| * |
| * @param e The exception to display. |
| * @return The return code for the process. |
| */ |
| private static int handleParametrizationException(ProgramParametrizationException e) { |
| LOG.error("Program has not been parametrized properly.", e); |
| System.err.println(e.getMessage()); |
| return 1; |
| } |
| |
| /** |
| * Displays a message for a program without a job to execute. |
| * |
| * @return The return code for the process. |
| */ |
| private static int handleMissingJobException() { |
| System.err.println(); |
| System.err.println("The program didn't contain a Flink job. " + |
| "Perhaps you forgot to call execute() on the execution environment."); |
| return 1; |
| } |
| |
| /** |
| * Displays an exception message. |
| * |
| * @param t The exception to display. |
| * @return The return code for the process. |
| */ |
| private static int handleError(Throwable t) { |
| LOG.error("Error while running the command.", t); |
| |
| System.err.println(); |
| System.err.println("------------------------------------------------------------"); |
| System.err.println(" The program finished with the following exception:"); |
| System.err.println(); |
| |
| if (t.getCause() instanceof InvalidProgramException) { |
| System.err.println(t.getCause().getMessage()); |
| StackTraceElement[] trace = t.getCause().getStackTrace(); |
| for (StackTraceElement ele: trace) { |
| System.err.println("\t" + ele); |
| if (ele.getMethodName().equals("main")) { |
| break; |
| } |
| } |
| } else { |
| t.printStackTrace(); |
| } |
| return 1; |
| } |
| |
| private static void logAndSysout(String message) { |
| LOG.info(message); |
| System.out.println(message); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Internal methods |
| // -------------------------------------------------------------------------------------------- |
| |
| private JobID parseJobId(String jobIdString) throws CliArgsException { |
| JobID jobId; |
| try { |
| jobId = JobID.fromHexString(jobIdString); |
| } catch (IllegalArgumentException e) { |
| throw new CliArgsException(e.getMessage()); |
| } |
| return jobId; |
| } |
| |
| /** |
| * Retrieves the {@link ClusterClient} from the given {@link CustomCommandLine} and runs the given |
| * {@link ClusterAction} against it. |
| * |
| * @param activeCommandLine to create the {@link ClusterDescriptor} from |
| * @param commandLine containing the parsed command line options |
| * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}. |
| * @param <T> type of the cluster id |
| * @throws FlinkException if something goes wrong |
| */ |
| private <T> void runClusterAction(CustomCommandLine<T> activeCommandLine, CommandLine commandLine, ClusterAction<T> clusterAction) throws FlinkException { |
| final ClusterDescriptor<T> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine); |
| |
| final T clusterId = activeCommandLine.getClusterId(commandLine); |
| |
| if (clusterId == null) { |
| throw new FlinkException("No cluster id was specified. Please specify a cluster to which " + |
| "you would like to connect."); |
| } else { |
| try { |
| final ClusterClient<T> clusterClient = clusterDescriptor.retrieve(clusterId); |
| |
| try { |
| clusterAction.runAction(clusterClient); |
| } finally { |
| try { |
| clusterClient.shutdown(); |
| } catch (Exception e) { |
| LOG.info("Could not properly shut down the cluster client.", e); |
| } |
| } |
| } finally { |
| try { |
| clusterDescriptor.close(); |
| } catch (Exception e) { |
| LOG.info("Could not properly close the cluster descriptor.", e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Internal interface to encapsulate cluster actions which are executed via |
| * the {@link ClusterClient}. |
| * |
| * @param <T> type of the cluster id |
| */ |
| @FunctionalInterface |
| private interface ClusterAction<T> { |
| |
| /** |
| * Run the cluster action with the given {@link ClusterClient}. |
| * |
| * @param clusterClient to run the cluster action against |
| * @throws FlinkException if something goes wrong |
| */ |
| void runAction(ClusterClient<T> clusterClient) throws FlinkException; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Entry point for executable |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Parses the command line arguments and starts the requested action. |
| * |
| * @param args command line arguments of the client. |
| * @return The return code of the program |
| */ |
| public int parseParameters(String[] args) { |
| |
| // check for action |
| if (args.length < 1) { |
| CliFrontendParser.printHelp(customCommandLines); |
| System.out.println("Please specify an action."); |
| return 1; |
| } |
| |
| // get action |
| String action = args[0]; |
| |
| // remove action from parameters |
| final String[] params = Arrays.copyOfRange(args, 1, args.length); |
| |
| try { |
| // do action |
| switch (action) { |
| case ACTION_RUN: |
| run(params); |
| return 0; |
| case ACTION_LIST: |
| list(params); |
| return 0; |
| case ACTION_INFO: |
| info(params); |
| return 0; |
| case ACTION_CANCEL: |
| cancel(params); |
| return 0; |
| case ACTION_STOP: |
| stop(params); |
| return 0; |
| case ACTION_SAVEPOINT: |
| savepoint(params); |
| return 0; |
| case ACTION_MODIFY: |
| modify(params); |
| return 0; |
| case "-h": |
| case "--help": |
| CliFrontendParser.printHelp(customCommandLines); |
| return 0; |
| case "-v": |
| case "--version": |
| String version = EnvironmentInformation.getVersion(); |
| String commitID = EnvironmentInformation.getRevisionInformation().commitId; |
| System.out.print("Version: " + version); |
| System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID); |
| return 0; |
| default: |
| System.out.printf("\"%s\" is not a valid action.\n", action); |
| System.out.println(); |
| System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); |
| System.out.println(); |
| System.out.println("Specify the version option (-v or --version) to print Flink version."); |
| System.out.println(); |
| System.out.println("Specify the help option (-h or --help) to get help on the command."); |
| return 1; |
| } |
| } catch (CliArgsException ce) { |
| return handleArgException(ce); |
| } catch (ProgramParametrizationException ppe) { |
| return handleParametrizationException(ppe); |
| } catch (ProgramMissingJobException pmje) { |
| return handleMissingJobException(); |
| } catch (Exception e) { |
| return handleError(e); |
| } |
| } |
| |
| /** |
| * Submits the job based on the arguments. |
| */ |
| public static void main(final String[] args) { |
| EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args); |
| |
| // 1. find the configuration directory |
| final String configurationDirectory = getConfigurationDirectoryFromEnv(); |
| |
| // 2. load the global configuration |
| final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory); |
| |
| // 3. load the custom command lines |
| final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines( |
| configuration, |
| configurationDirectory); |
| |
| try { |
| final CliFrontend cli = new CliFrontend( |
| configuration, |
| customCommandLines); |
| |
| SecurityUtils.install(new SecurityConfiguration(cli.configuration)); |
| int retCode = SecurityUtils.getInstalledContext() |
| .runSecured(() -> cli.parseParameters(args)); |
| System.exit(retCode); |
| } |
| catch (Throwable t) { |
| LOG.error("Fatal error while running command line interface.", t); |
| t.printStackTrace(); |
| System.exit(31); |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Miscellaneous Utilities |
| // -------------------------------------------------------------------------------------------- |
| |
| public static String getConfigurationDirectoryFromEnv() { |
| String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); |
| |
| if (location != null) { |
| if (new File(location).exists()) { |
| return location; |
| } |
| else { |
| throw new RuntimeException("The configuration directory '" + location + "', specified in the '" + |
| ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist."); |
| } |
| } |
| else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) { |
| location = CONFIG_DIRECTORY_FALLBACK_1; |
| } |
| else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { |
| location = CONFIG_DIRECTORY_FALLBACK_2; |
| } |
| else { |
| throw new RuntimeException("The configuration directory was not specified. " + |
| "Please specify the directory containing the configuration file through the '" + |
| ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable."); |
| } |
| return location; |
| } |
| |
| /** |
| * Writes the given job manager address to the associated configuration object. |
| * |
| * @param address Address to write to the configuration |
| * @param config The configuration to write to |
| */ |
| public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { |
| config.setString(JobManagerOptions.ADDRESS, address.getHostString()); |
| config.setInteger(JobManagerOptions.PORT, address.getPort()); |
| config.setString(RestOptions.ADDRESS, address.getHostString()); |
| config.setInteger(RestOptions.PORT, address.getPort()); |
| } |
| |
| public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { |
| List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2); |
| |
| // Command line interface of the YARN session, with a special initialization here |
| // to prefix all options with y/yarn. |
| // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the |
| // active CustomCommandLine in order and DefaultCLI isActive always return true. |
| final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli"; |
| try { |
| customCommandLines.add( |
| loadCustomCommandLine(flinkYarnSessionCLI, |
| configuration, |
| configurationDirectory, |
| "y", |
| "yarn")); |
| } catch (NoClassDefFoundError | Exception e) { |
| LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e); |
| } |
| |
| // Command line interface of the kubernetes session, with a special initialization here |
| // to prefix all options with k/kubernetes. |
| final String flinkKubernetesSessionCLI = "org.apache.flink.kubernetes.cli.FlinkKubernetesSessionCli"; |
| try { |
| customCommandLines.add( |
| loadCustomCommandLine(flinkKubernetesSessionCLI, |
| configuration, |
| configurationDirectory, |
| "k", |
| "kubernetes")); |
| } catch (NoClassDefFoundError | Exception e) { |
| LOG.warn("Could not load CLI class {}.", flinkKubernetesSessionCLI, e); |
| } |
| |
| if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) { |
| customCommandLines.add(new DefaultCLI(configuration)); |
| } else { |
| customCommandLines.add(new LegacyCLI(configuration)); |
| } |
| |
| return customCommandLines; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Custom command-line |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Gets the custom command-line for the arguments. |
| * @param commandLine The input to the command-line. |
| * @return custom command-line which is active (may only be one at a time) |
| */ |
| public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) { |
| for (CustomCommandLine<?> cli : customCommandLines) { |
| if (cli.isActive(commandLine)) { |
| return cli; |
| } |
| } |
| throw new IllegalStateException("No command-line ran."); |
| } |
| |
| /** |
| * Loads a class from the classpath that implements the CustomCommandLine interface. |
| * @param className The fully-qualified class name to load. |
| * @param params The constructor parameters |
| */ |
| private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException { |
| |
| Class<? extends CustomCommandLine> customCliClass = |
| Class.forName(className).asSubclass(CustomCommandLine.class); |
| |
| // construct class types from the parameters |
| Class<?>[] types = new Class<?>[params.length]; |
| for (int i = 0; i < params.length; i++) { |
| Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null."); |
| types[i] = params[i].getClass(); |
| } |
| |
| Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types); |
| |
| return constructor.newInstance(params); |
| } |
| |
| } |