/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.beam.runners.samza.translation; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.io.File; |
| import java.net.URI; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| import org.apache.beam.runners.core.construction.SerializablePipelineOptions; |
| import org.apache.beam.runners.core.serialization.Base64Serializer; |
| import org.apache.beam.runners.samza.SamzaExecutionEnvironment; |
| import org.apache.beam.runners.samza.SamzaPipelineOptions; |
| import org.apache.beam.runners.samza.container.BeamContainerRunner; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.samza.config.ApplicationConfig; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.ConfigFactory; |
| import org.apache.samza.config.JobConfig; |
| import org.apache.samza.config.JobCoordinatorConfig; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.config.TaskConfig; |
| import org.apache.samza.config.ZkConfig; |
| import org.apache.samza.config.factories.PropertiesConfigFactory; |
| import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; |
| import org.apache.samza.job.yarn.YarnJobFactory; |
| import org.apache.samza.runtime.LocalApplicationRunner; |
| import org.apache.samza.runtime.RemoteApplicationRunner; |
| import org.apache.samza.serializers.ByteSerdeFactory; |
| import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; |
| import org.apache.samza.zk.ZkJobCoordinatorFactory; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Builder class to generate configs for BEAM samza runner during runtime. */ |
| public class ConfigBuilder { |
| private static final Logger LOG = LoggerFactory.getLogger(ConfigBuilder.class); |
| |
| private static final String APP_RUNNER_CLASS = "app.runner.class"; |
| private static final String YARN_PACKAGE_PATH = "yarn.package.path"; |
| private static final String JOB_FACTORY_CLASS = "job.factory.class"; |
| |
| private final Map<String, String> config = new HashMap<>(); |
| private final SamzaPipelineOptions options; |
| |
| public ConfigBuilder(SamzaPipelineOptions options) { |
| this.options = options; |
| } |
| |
| public void put(String name, String property) { |
| config.put(name, property); |
| } |
| |
| public void putAll(Map<String, String> properties) { |
| config.putAll(properties); |
| } |
| |
| public Config build() { |
| try { |
| // apply framework configs |
| config.putAll(createSystemConfig(options)); |
| |
| // apply user configs |
| config.putAll(createUserConfig(options)); |
| |
| config.put(ApplicationConfig.APP_NAME, options.getJobName()); |
| config.put(ApplicationConfig.APP_ID, options.getJobInstance()); |
| config.put(JobConfig.JOB_NAME(), options.getJobName()); |
| config.put(JobConfig.JOB_ID(), options.getJobInstance()); |
| |
| config.put( |
| "beamPipelineOptions", |
| Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options))); |
| |
| validateConfigs(options, config); |
| |
| return new MapConfig(config); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private static Map<String, String> createUserConfig(SamzaPipelineOptions options) |
| throws Exception { |
| final Map<String, String> config = new HashMap<>(); |
| |
| // apply user configs |
| final String configFilePath = options.getConfigFilePath(); |
| |
| // If user provides a config file, use it as base configs. |
| if (StringUtils.isNoneEmpty(configFilePath)) { |
| LOG.info("configFilePath: " + configFilePath); |
| |
| final File configFile = new File(configFilePath); |
| final URI configUri = configFile.toURI(); |
| final ConfigFactory configFactory = |
| options.getConfigFactory().getDeclaredConstructor().newInstance(); |
| |
| LOG.info("configFactory: " + configFactory.getClass().getName()); |
| |
| // Config file must exist for default properties config |
| // TODO: add check to all non-empty files once we don't need to |
| // pass the command-line args through the containers |
| if (configFactory instanceof PropertiesConfigFactory) { |
| checkArgument(configFile.exists(), "Config file %s does not exist", configFilePath); |
| } |
| |
| config.putAll(configFactory.getConfig(configUri)); |
| } |
| // Apply override on top |
| if (options.getConfigOverride() != null) { |
| config.putAll(options.getConfigOverride()); |
| } |
| |
| return config; |
| } |
| |
| private static void validateZKStandAloneRun(Map<String, String> config) { |
| checkArgument( |
| config.containsKey(APP_RUNNER_CLASS), |
| "Config %s not found for %s Deployment", |
| APP_RUNNER_CLASS, |
| SamzaExecutionEnvironment.STANDALONE); |
| checkArgument( |
| config.get(APP_RUNNER_CLASS).equals(LocalApplicationRunner.class.getName()), |
| "Config %s must be set to %s for %s Deployment", |
| APP_RUNNER_CLASS, |
| LocalApplicationRunner.class.getName(), |
| SamzaExecutionEnvironment.STANDALONE); |
| checkArgument( |
| config.containsKey(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY), |
| "Config %s not found for %s Deployment", |
| JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, |
| SamzaExecutionEnvironment.STANDALONE); |
| checkArgument( |
| config |
| .get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY) |
| .equals(ZkJobCoordinatorFactory.class.getName()), |
| "Config %s must be set to %s for %s Deployment", |
| JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, |
| ZkJobCoordinatorFactory.class.getName(), |
| SamzaExecutionEnvironment.STANDALONE); |
| checkArgument( |
| config.containsKey(ZkConfig.ZK_CONNECT), |
| "Config %s not found for %s Deployment", |
| ZkConfig.ZK_CONNECT, |
| SamzaExecutionEnvironment.STANDALONE); |
| } |
| |
| private static void validateYarnRun(Map<String, String> config) { |
| checkArgument( |
| config.containsKey(YARN_PACKAGE_PATH), |
| "Config %s not found for %s Deployment", |
| YARN_PACKAGE_PATH, |
| SamzaExecutionEnvironment.YARN); |
| final String appRunner = config.get(APP_RUNNER_CLASS); |
| checkArgument( |
| appRunner == null |
| || RemoteApplicationRunner.class.getName().equals(appRunner) |
| || BeamContainerRunner.class.getName().equals(appRunner), |
| "Config %s must be set to %s for %s Deployment", |
| APP_RUNNER_CLASS, |
| RemoteApplicationRunner.class.getName(), |
| SamzaExecutionEnvironment.YARN); |
| checkArgument( |
| config.containsKey(JOB_FACTORY_CLASS), |
| "Config %s not found for %s Deployment", |
| JOB_FACTORY_CLASS, |
| SamzaExecutionEnvironment.YARN); |
| } |
| |
| @VisibleForTesting |
| public static Map<String, String> localRunConfig() { |
| // Default Samza config using local deployment of a single JVM |
| return ImmutableMap.<String, String>builder() |
| .put(APP_RUNNER_CLASS, LocalApplicationRunner.class.getName()) |
| .put( |
| JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, |
| PassthroughJobCoordinatorFactory.class.getName()) |
| .put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()) |
| .put(TaskConfig.COMMIT_MS(), "-1") |
| .put("processor.id", "1") |
| .put( |
| // TODO: remove after SAMZA-1531 is resolved |
| ApplicationConfig.APP_RUN_ID, |
| String.valueOf(System.currentTimeMillis()) |
| + "-" |
| // use the most significant bits in UUID (8 digits) to avoid collision |
| + UUID.randomUUID().toString().substring(0, 8)) |
| .build(); |
| } |
| |
| public static Map<String, String> yarnRunConfig() { |
| // Default Samza config using yarn deployment |
| return ImmutableMap.<String, String>builder() |
| .put(APP_RUNNER_CLASS, RemoteApplicationRunner.class.getName()) |
| .put(JOB_FACTORY_CLASS, YarnJobFactory.class.getName()) |
| .build(); |
| } |
| |
| public static Map<String, String> standAloneRunConfig() { |
| // Default Samza config using stand alone deployment |
| return ImmutableMap.<String, String>builder() |
| .put(APP_RUNNER_CLASS, LocalApplicationRunner.class.getName()) |
| .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName()) |
| .build(); |
| } |
| |
| private static Map<String, String> createSystemConfig(SamzaPipelineOptions options) { |
| ImmutableMap.Builder<String, String> configBuilder = |
| ImmutableMap.<String, String>builder() |
| .put( |
| "stores.beamStore.factory", |
| "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory") |
| .put("stores.beamStore.key.serde", "byteSerde") |
| .put("stores.beamStore.msg.serde", "byteSerde") |
| .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName()); |
| |
| if (options.getStateDurable()) { |
| LOG.info("stateDurable is enabled"); |
| configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore")); |
| configBuilder.put("job.host-affinity.enabled", "true"); |
| } |
| |
| LOG.info("Execution environment is " + options.getSamzaExecutionEnvironment()); |
| switch (options.getSamzaExecutionEnvironment()) { |
| case YARN: |
| configBuilder.putAll(yarnRunConfig()); |
| break; |
| case STANDALONE: |
| configBuilder.putAll(standAloneRunConfig()); |
| break; |
| default: // LOCAL |
| configBuilder.putAll(localRunConfig()); |
| break; |
| } |
| |
| // TODO: remove after we sort out Samza task wrapper |
| configBuilder.put("samza.li.task.wrapper.enabled", "false"); |
| |
| return configBuilder.build(); |
| } |
| |
| private static void validateConfigs(SamzaPipelineOptions options, Map<String, String> config) { |
| |
| // validate execution environment |
| switch (options.getSamzaExecutionEnvironment()) { |
| case YARN: |
| validateYarnRun(config); |
| break; |
| case STANDALONE: |
| validateZKStandAloneRun(config); |
| break; |
| default: |
| // do nothing |
| break; |
| } |
| } |
| |
| static String getChangelogTopic(SamzaPipelineOptions options, String storeName) { |
| return String.format( |
| "%s-%s-%s-changelog", options.getJobName(), options.getJobInstance(), storeName); |
| } |
| } |