| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.execution; |
| |
| import java.io.File; |
| import java.io.PrintWriter; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.application.descriptors.ApplicationDescriptor; |
| import org.apache.samza.application.descriptors.ApplicationDescriptorImpl; |
| import org.apache.samza.application.LegacyTaskApplication; |
| import org.apache.samza.config.ApplicationConfig; |
| import org.apache.samza.config.ClusterManagerConfig; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.JobConfig; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.config.ShellCommandConfig; |
| import org.apache.samza.config.StreamConfig; |
| import org.apache.samza.config.TaskConfig; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * This is a temporary helper class to include all common logic to generate {@link JobConfig}s for high- and low-level |
| * applications in {@link org.apache.samza.runtime.LocalApplicationRunner} and {@link org.apache.samza.runtime.RemoteApplicationRunner}. |
| * |
| * TODO: Fix SAMZA-1811 to consolidate this class with {@link ExecutionPlanner} |
| */ |
| public abstract class JobPlanner { |
| private static final Logger LOG = LoggerFactory.getLogger(JobPlanner.class); |
| |
| protected final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc; |
| protected final Config userConfig; |
| |
| JobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) { |
| this.appDesc = descriptor; |
| this.userConfig = descriptor.getConfig(); |
| } |
| |
| public abstract List<JobConfig> prepareJobs(); |
| |
| StreamManager buildAndStartStreamManager(Config config) { |
| StreamManager streamManager = new StreamManager(config); |
| streamManager.start(); |
| return streamManager; |
| } |
| |
| ExecutionPlan getExecutionPlan() { |
| return getExecutionPlan(null); |
| } |
| |
| /* package private */ |
| ExecutionPlan getExecutionPlan(String runId) { |
| Map<String, String> allowedUserConfig = new HashMap<>(userConfig); |
| Map<String, String> generatedConfig = new HashMap<>(); |
| |
| // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811 |
| // Don't generate any configurations for LegacyTaskApplications |
| if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) { |
| // Don't allow overriding task.inputs to a blank string |
| if (StringUtils.isBlank(userConfig.get(TaskConfig.INPUT_STREAMS))) { |
| allowedUserConfig.remove(TaskConfig.INPUT_STREAMS); |
| } |
| generatedConfig.putAll(getGeneratedConfig()); |
| } |
| |
| if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) { |
| allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED); |
| } |
| |
| // APP_RUN_ID should be generated for both LegacyTaskApplications & descriptor based applications |
| // This config is used in BATCH mode to create new intermediate streams on runs and in stream mode use by |
| // Container Placements to identify a deployment of Samza |
| if (StringUtils.isNoneEmpty(runId)) { |
| generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId); |
| } |
| |
| // merge user-provided configuration with generated configuration. generated configuration has lower priority. |
| Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig); |
| |
| // creating the StreamManager to get all input/output streams' metadata for planning |
| StreamManager streamManager = buildAndStartStreamManager(mergedConfig); |
| |
| try { |
| ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager); |
| return planner.plan(appDesc); |
| } finally { |
| streamManager.stop(); |
| } |
| } |
| |
| /** |
| * Write the execution plan JSON to a file |
| * @param planJson JSON representation of the plan |
| */ |
| final void writePlanJsonFile(String planJson) { |
| try { |
| String content = "plan='" + planJson + "'"; |
| String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR); |
| if (planPath != null && !planPath.isEmpty()) { |
| // Write the plan json to plan path |
| File file = new File(planPath + "/plan.json"); |
| file.setReadable(true, false); |
| PrintWriter writer = new PrintWriter(file, "UTF-8"); |
| writer.println(content); |
| writer.close(); |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to write execution plan json to file", e); |
| } |
| } |
| |
| private Map<String, String> getGeneratedConfig() { |
| Map<String, String> generatedConfig = new HashMap<>(); |
| |
| Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc); |
| generatedConfig.putAll(systemStreamConfigs); |
| |
| StreamConfig streamConfig = new StreamConfig(new MapConfig(generatedConfig)); |
| Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds()); |
| inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams |
| |
| final ApplicationConfig.ApplicationMode mode; |
| if (inputStreamIds.isEmpty()) { |
| mode = ApplicationConfig.ApplicationMode.STREAM; // use stream by default |
| } else { |
| mode = inputStreamIds.stream().allMatch(streamConfig::getIsBounded) |
| ? ApplicationConfig.ApplicationMode.BATCH |
| : ApplicationConfig.ApplicationMode.STREAM; |
| } |
| |
| generatedConfig.put(ApplicationConfig.APP_MODE, mode.name()); |
| |
| // adding app.class in the configuration, unless it is LegacyTaskApplication |
| if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) { |
| generatedConfig.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName()); |
| } |
| return generatedConfig; |
| } |
| |
| private Map<String, String> generateSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) { |
| Map<String, String> systemStreamConfigs = new HashMap<>(); |
| appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); |
| appDesc.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); |
| appDesc.getSystemDescriptors().forEach(sd -> systemStreamConfigs.putAll(sd.toConfig())); |
| appDesc.getDefaultSystemDescriptor().ifPresent(dsd -> |
| systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM, dsd.getSystemName())); |
| return systemStreamConfigs; |
| } |
| |
| /** |
| * Generates configs for a single job in app, job.id from app.id and job.name from app.name config |
| * If both job.id and app.id is defined, app.id takes precedence and job.id is set to value of app.id |
| * If both job.name and app.name is defined, app.name takes precedence and job.name is set to value of app.name |
| * |
| * @param userConfigs configs passed from user |
| * |
| */ |
| public static MapConfig generateSingleJobConfig(Map<String, String> userConfigs) { |
| Map<String, String> generatedConfig = new HashMap<>(userConfigs); |
| |
| if (!userConfigs.containsKey(JobConfig.JOB_NAME) && !userConfigs.containsKey(ApplicationConfig.APP_NAME)) { |
| throw new SamzaException("Samza app name should not be null, Please set either app.name (preferred) or job.name (deprecated) in configs"); |
| } |
| |
| if (userConfigs.containsKey(JobConfig.JOB_ID)) { |
| LOG.warn("{} is a deprecated configuration, use app.id instead.", JobConfig.JOB_ID); |
| } |
| |
| if (userConfigs.containsKey(JobConfig.JOB_NAME)) { |
| LOG.warn("{} is a deprecated configuration, use use app.name instead.", JobConfig.JOB_NAME); |
| } |
| |
| if (userConfigs.containsKey(ApplicationConfig.APP_NAME)) { |
| String appName = userConfigs.get(ApplicationConfig.APP_NAME); |
| LOG.info("app.name is defined, generating job.name equal to app.name value: {}", appName); |
| generatedConfig.put(JobConfig.JOB_NAME, appName); |
| } |
| |
| if (userConfigs.containsKey(ApplicationConfig.APP_ID)) { |
| String appId = userConfigs.get(ApplicationConfig.APP_ID); |
| LOG.info("app.id is defined, generating job.id equal to app.name value: {}", appId); |
| generatedConfig.put(JobConfig.JOB_ID, appId); |
| } |
| |
| return new MapConfig(generatedConfig); |
| } |
| |
| } |