blob: accdd124686fa1c2e4e4f274404d904b9e915a58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.translation;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.container.BeamContainerRunner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.config.factories.PropertiesConfigFactory;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.job.yarn.YarnJobFactory;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
import org.apache.samza.serializers.ByteSerdeFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Builder that assembles the Samza {@link Config} used by the Beam Samza runner at runtime.
 *
 * <p>{@link #build()} layers config entries in this order, with later layers overwriting earlier
 * ones for the same key: entries added through {@link #put}/{@link #putAll}; the runner's framework
 * defaults for the selected execution environment; the user's config file and config overrides;
 * and finally the app/job name and id plus the serialized pipeline options.
 */
public class ConfigBuilder {
  private static final Logger LOG = LoggerFactory.getLogger(ConfigBuilder.class);

  private static final String APP_RUNNER_CLASS = "app.runner.class";
  private static final String YARN_PACKAGE_PATH = "yarn.package.path";
  private static final String JOB_FACTORY_CLASS = "job.factory.class";

  /** Accumulates config entries; converted to a {@link MapConfig} by {@link #build()}. */
  private final Map<String, String> config = new HashMap<>();

  private final SamzaPipelineOptions options;

  /**
   * Creates a builder for the given pipeline options.
   *
   * @param options options that select the execution environment, locate user configs, and supply
   *     the job name/instance
   */
  public ConfigBuilder(SamzaPipelineOptions options) {
    this.options = options;
  }

  /**
   * Adds a single config entry.
   *
   * <p>Note: {@link #build()} applies framework defaults, user configs and job naming after these
   * entries, so any of those layers overwrites an entry with the same key.
   *
   * @param name config key
   * @param property config value
   */
  public void put(String name, String property) {
    config.put(name, property);
  }

  /**
   * Adds all given config entries. The same overwrite caveat as {@link #put} applies.
   *
   * @param properties entries to add
   */
  public void putAll(Map<String, String> properties) {
    config.putAll(properties);
  }

  /**
   * Builds the final Samza config.
   *
   * <p>Applies framework configs, then user configs (config file plus overrides), then sets the
   * app/job name and id from the options and stores the Base64-serialized pipeline options under
   * {@code beamPipelineOptions}. The result is validated against the selected execution
   * environment.
   *
   * @return the assembled config
   * @throws RuntimeException wrapping any failure, including validation failures (which are raised
   *     internally as {@link IllegalArgumentException})
   */
  public Config build() {
    try {
      // apply framework configs
      config.putAll(createSystemConfig(options));

      // apply user configs
      config.putAll(createUserConfig(options));

      config.put(ApplicationConfig.APP_NAME, options.getJobName());
      config.put(ApplicationConfig.APP_ID, options.getJobInstance());
      config.put(JobConfig.JOB_NAME(), options.getJobName());
      config.put(JobConfig.JOB_ID(), options.getJobInstance());

      config.put(
          "beamPipelineOptions",
          Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));

      validateConfigs(options, config);
      return new MapConfig(config);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Loads user-supplied configs: the optional config file (parsed by the configured
   * {@link ConfigFactory}) followed by the config overrides from the options.
   *
   * @throws Exception if the config factory cannot be instantiated or fails to read the file
   * @throws IllegalArgumentException if a {@link PropertiesConfigFactory} is used and the file does
   *     not exist
   */
  private static Map<String, String> createUserConfig(SamzaPipelineOptions options)
      throws Exception {
    final Map<String, String> userConfig = new HashMap<>();

    // If user provides a config file, use it as base configs.
    final String configFilePath = options.getConfigFilePath();
    if (StringUtils.isNotEmpty(configFilePath)) {
      LOG.info("configFilePath: {}", configFilePath);

      final File configFile = new File(configFilePath);
      final URI configUri = configFile.toURI();
      final ConfigFactory configFactory =
          options.getConfigFactory().getDeclaredConstructor().newInstance();

      LOG.info("configFactory: {}", configFactory.getClass().getName());

      // Config file must exist for default properties config
      // TODO: add check to all non-empty files once we don't need to
      // pass the command-line args through the containers
      if (configFactory instanceof PropertiesConfigFactory) {
        checkArgument(configFile.exists(), "Config file %s does not exist", configFilePath);
      }

      userConfig.putAll(configFactory.getConfig(configUri));
    }

    // Apply override on top
    if (options.getConfigOverride() != null) {
      userConfig.putAll(options.getConfigOverride());
    }

    return userConfig;
  }

  /**
   * Validates that a standalone (ZooKeeper-coordinated) run uses {@link LocalApplicationRunner},
   * {@link ZkJobCoordinatorFactory}, and has a ZooKeeper connect string configured.
   *
   * @throws IllegalArgumentException if any requirement is not met
   */
  private static void validateZKStandAloneRun(Map<String, String> config) {
    checkArgument(
        config.containsKey(APP_RUNNER_CLASS),
        "Config %s not found for %s Deployment",
        APP_RUNNER_CLASS,
        SamzaExecutionEnvironment.STANDALONE);
    // Constant-first equals: a key explicitly mapped to null fails validation rather than NPE-ing.
    checkArgument(
        LocalApplicationRunner.class.getName().equals(config.get(APP_RUNNER_CLASS)),
        "Config %s must be set to %s for %s Deployment",
        APP_RUNNER_CLASS,
        LocalApplicationRunner.class.getName(),
        SamzaExecutionEnvironment.STANDALONE);
    checkArgument(
        config.containsKey(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY),
        "Config %s not found for %s Deployment",
        JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
        SamzaExecutionEnvironment.STANDALONE);
    checkArgument(
        ZkJobCoordinatorFactory.class
            .getName()
            .equals(config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY)),
        "Config %s must be set to %s for %s Deployment",
        JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
        ZkJobCoordinatorFactory.class.getName(),
        SamzaExecutionEnvironment.STANDALONE);
    checkArgument(
        config.containsKey(ZkConfig.ZK_CONNECT),
        "Config %s not found for %s Deployment",
        ZkConfig.ZK_CONNECT,
        SamzaExecutionEnvironment.STANDALONE);
  }

  /**
   * Validates a YARN run. It requires a package path and a job factory class. The app runner class
   * must be unset, or be {@link RemoteApplicationRunner} or {@link BeamContainerRunner}.
   *
   * @throws IllegalArgumentException if any requirement is not met
   */
  private static void validateYarnRun(Map<String, String> config) {
    checkArgument(
        config.containsKey(YARN_PACKAGE_PATH),
        "Config %s not found for %s Deployment",
        YARN_PACKAGE_PATH,
        SamzaExecutionEnvironment.YARN);
    final String appRunner = config.get(APP_RUNNER_CLASS);
    // The message lists both accepted runners; it previously named only RemoteApplicationRunner.
    checkArgument(
        appRunner == null
            || RemoteApplicationRunner.class.getName().equals(appRunner)
            || BeamContainerRunner.class.getName().equals(appRunner),
        "Config %s must be set to %s or %s for %s Deployment",
        APP_RUNNER_CLASS,
        RemoteApplicationRunner.class.getName(),
        BeamContainerRunner.class.getName(),
        SamzaExecutionEnvironment.YARN);
    checkArgument(
        config.containsKey(JOB_FACTORY_CLASS),
        "Config %s not found for %s Deployment",
        JOB_FACTORY_CLASS,
        SamzaExecutionEnvironment.YARN);
  }

  /**
   * Returns the default Samza config for a local deployment in a single JVM. It uses a
   * pass-through job coordinator and a single-container grouper, sets {@code task.commit.ms} to -1,
   * and sets a generated run id.
   */
  @VisibleForTesting
  public static Map<String, String> localRunConfig() {
    // Default Samza config using local deployment of a single JVM
    return ImmutableMap.<String, String>builder()
        .put(APP_RUNNER_CLASS, LocalApplicationRunner.class.getName())
        .put(
            JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
            PassthroughJobCoordinatorFactory.class.getName())
        .put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName())
        .put(TaskConfig.COMMIT_MS(), "-1")
        .put("processor.id", "1")
        .put(
            // TODO: remove after SAMZA-1531 is resolved
            ApplicationConfig.APP_RUN_ID,
            String.valueOf(System.currentTimeMillis())
                + "-"
                // use the first 8 hex digits (most significant bits) of a random UUID to avoid
                // collisions between runs started in the same millisecond
                + UUID.randomUUID().toString().substring(0, 8))
        .build();
  }

  /**
   * Returns the default Samza config for a YARN deployment: {@link RemoteApplicationRunner} with
   * {@link YarnJobFactory}.
   */
  public static Map<String, String> yarnRunConfig() {
    // Default Samza config using yarn deployment
    return ImmutableMap.<String, String>builder()
        .put(APP_RUNNER_CLASS, RemoteApplicationRunner.class.getName())
        .put(JOB_FACTORY_CLASS, YarnJobFactory.class.getName())
        .build();
  }

  /**
   * Returns the default Samza config for a standalone deployment: {@link LocalApplicationRunner}
   * with {@link ZkJobCoordinatorFactory}.
   */
  public static Map<String, String> standAloneRunConfig() {
    // Default Samza config using stand alone deployment
    return ImmutableMap.<String, String>builder()
        .put(APP_RUNNER_CLASS, LocalApplicationRunner.class.getName())
        .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName())
        .build();
  }

  /**
   * Builds the framework configs. These cover the RocksDB-backed {@code beamStore} with byte
   * serdes, an optional changelog plus host affinity when state is durable, and the
   * environment-specific runner defaults.
   *
   * <p>Guava's {@code ImmutableMap.Builder} rejects duplicate keys at {@code build()}, so the
   * environment presets must not repeat keys set here.
   */
  private static Map<String, String> createSystemConfig(SamzaPipelineOptions options) {
    ImmutableMap.Builder<String, String> configBuilder =
        ImmutableMap.<String, String>builder()
            .put(
                "stores.beamStore.factory",
                "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
            .put("stores.beamStore.key.serde", "byteSerde")
            .put("stores.beamStore.msg.serde", "byteSerde")
            .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName());

    if (options.getStateDurable()) {
      LOG.info("stateDurable is enabled");
      configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore"));
      configBuilder.put("job.host-affinity.enabled", "true");
    }

    LOG.info("Execution environment is {}", options.getSamzaExecutionEnvironment());
    switch (options.getSamzaExecutionEnvironment()) {
      case YARN:
        configBuilder.putAll(yarnRunConfig());
        break;
      case STANDALONE:
        configBuilder.putAll(standAloneRunConfig());
        break;
      default: // LOCAL
        configBuilder.putAll(localRunConfig());
        break;
    }

    // TODO: remove after we sort out Samza task wrapper
    configBuilder.put("samza.li.task.wrapper.enabled", "false");

    return configBuilder.build();
  }

  /**
   * Validates the assembled config against the selected execution environment. YARN and
   * STANDALONE are checked; all other environments are accepted as-is.
   */
  private static void validateConfigs(SamzaPipelineOptions options, Map<String, String> config) {
    // validate execution environment
    switch (options.getSamzaExecutionEnvironment()) {
      case YARN:
        validateYarnRun(config);
        break;
      case STANDALONE:
        validateZKStandAloneRun(config);
        break;
      default:
        // do nothing
        break;
    }
  }

  /**
   * Returns the changelog topic name for a store, formatted as
   * {@code <jobName>-<jobInstance>-<storeName>-changelog}.
   */
  static String getChangelogTopic(SamzaPipelineOptions options, String storeName) {
    return String.format(
        "%s-%s-%s-changelog", options.getJobName(), options.getJobInstance(), storeName);
  }
}