blob: f8e068455c91f122e929b72df0bd8e18cedd4f79 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;
import java.io.File;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a temporary helper class to include all common logic to generate {@link JobConfig}s for high- and low-level
* applications in {@link org.apache.samza.runtime.LocalApplicationRunner} and {@link org.apache.samza.runtime.RemoteApplicationRunner}.
*
* TODO: Fix SAMZA-1811 to consolidate this class with {@link ExecutionPlanner}
*/
public abstract class JobPlanner {
private static final Logger LOG = LoggerFactory.getLogger(JobPlanner.class);
protected final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
protected final Config userConfig;
JobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
this.appDesc = descriptor;
this.userConfig = descriptor.getConfig();
}
public abstract List<JobConfig> prepareJobs();
StreamManager buildAndStartStreamManager(Config config) {
StreamManager streamManager = new StreamManager(config);
streamManager.start();
return streamManager;
}
ExecutionPlan getExecutionPlan() {
return getExecutionPlan(null);
}
/* package private */
ExecutionPlan getExecutionPlan(String runId) {
Map<String, String> allowedUserConfig = new HashMap<>(userConfig);
Map<String, String> generatedConfig = new HashMap<>();
// TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
// Don't generate any configurations for LegacyTaskApplications
if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
// Don't allow overriding task.inputs to a blank string
if (StringUtils.isBlank(userConfig.get(TaskConfig.INPUT_STREAMS))) {
allowedUserConfig.remove(TaskConfig.INPUT_STREAMS);
}
generatedConfig.putAll(getGeneratedConfig(runId));
}
if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) {
allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED);
}
// merge user-provided configuration with generated configuration. generated configuration has lower priority.
Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
// creating the StreamManager to get all input/output streams' metadata for planning
StreamManager streamManager = buildAndStartStreamManager(mergedConfig);
try {
ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager);
return planner.plan(appDesc);
} finally {
streamManager.stop();
}
}
/**
* Write the execution plan JSON to a file
* @param planJson JSON representation of the plan
*/
final void writePlanJsonFile(String planJson) {
try {
String content = "plan='" + planJson + "'";
String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
if (planPath != null && !planPath.isEmpty()) {
// Write the plan json to plan path
File file = new File(planPath + "/plan.json");
file.setReadable(true, false);
PrintWriter writer = new PrintWriter(file, "UTF-8");
writer.println(content);
writer.close();
}
} catch (Exception e) {
LOG.warn("Failed to write execution plan json to file", e);
}
}
private Map<String, String> getGeneratedConfig(String runId) {
Map<String, String> generatedConfig = new HashMap<>();
if (StringUtils.isNoneEmpty(runId)) {
generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
}
Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
generatedConfig.putAll(systemStreamConfigs);
StreamConfig streamConfig = new StreamConfig(new MapConfig(generatedConfig));
Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
final ApplicationConfig.ApplicationMode mode;
if (inputStreamIds.isEmpty()) {
mode = ApplicationConfig.ApplicationMode.STREAM; // use stream by default
} else {
mode = inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
? ApplicationConfig.ApplicationMode.BATCH
: ApplicationConfig.ApplicationMode.STREAM;
}
generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
// adding app.class in the configuration, unless it is LegacyTaskApplication
if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {
generatedConfig.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
}
return generatedConfig;
}
private Map<String, String> generateSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
Map<String, String> systemStreamConfigs = new HashMap<>();
appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
appDesc.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
appDesc.getSystemDescriptors().forEach(sd -> systemStreamConfigs.putAll(sd.toConfig()));
appDesc.getDefaultSystemDescriptor().ifPresent(dsd ->
systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM, dsd.getSystemName()));
return systemStreamConfigs;
}
/**
* Generates configs for a single job in app, job.id from app.id and job.name from app.name config
* If both job.id and app.id is defined, app.id takes precedence and job.id is set to value of app.id
* If both job.name and app.name is defined, app.name takes precedence and job.name is set to value of app.name
*
* @param userConfigs configs passed from user
*
*/
public static MapConfig generateSingleJobConfig(Map<String, String> userConfigs) {
Map<String, String> generatedConfig = new HashMap<>(userConfigs);
if (!userConfigs.containsKey(JobConfig.JOB_NAME) && !userConfigs.containsKey(ApplicationConfig.APP_NAME)) {
throw new SamzaException("Samza app name should not be null, Please set either app.name (preferred) or job.name (deprecated) in configs");
}
if (userConfigs.containsKey(JobConfig.JOB_ID)) {
LOG.warn("{} is a deprecated configuration, use app.id instead.", JobConfig.JOB_ID);
}
if (userConfigs.containsKey(JobConfig.JOB_NAME)) {
LOG.warn("{} is a deprecated configuration, use use app.name instead.", JobConfig.JOB_NAME);
}
if (userConfigs.containsKey(ApplicationConfig.APP_NAME)) {
String appName = userConfigs.get(ApplicationConfig.APP_NAME);
LOG.info("app.name is defined, generating job.name equal to app.name value: {}", appName);
generatedConfig.put(JobConfig.JOB_NAME, appName);
}
if (userConfigs.containsKey(ApplicationConfig.APP_ID)) {
String appId = userConfigs.get(ApplicationConfig.APP_ID);
LOG.info("app.id is defined, generating job.id equal to app.name value: {}", appId);
generatedConfig.put(JobConfig.JOB_ID, appId);
}
return new MapConfig(generatedConfig);
}
}