package org.apache.samoa.utils;
/*
 * #%L
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.samoa.topology.EntranceProcessingItem;
import org.apache.samoa.topology.IProcessingItem;
import org.apache.samoa.topology.ProcessingItem;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.impl.SamoaSystemFactory;
import org.apache.samoa.topology.impl.SamzaEntranceProcessingItem;
import org.apache.samoa.topology.impl.SamzaProcessingItem;
import org.apache.samoa.topology.impl.SamzaStream;
import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
import org.apache.samoa.topology.impl.SamzaTopology;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.local.LocalJobFactory;
import org.apache.samza.job.yarn.YarnJobFactory;
import org.apache.samza.system.kafka.KafkaSystemFactory;
/**
 * Generates the configs used to submit Samza jobs for an input topology (one config per
 * PI/EntrancePI in the topology).
 *
 * @author Anh Thu Vu
 */
public class SamzaConfigFactory {
public static final String SYSTEM_NAME = "samoa";
private static final String DEFAULT_ZOOKEEPER = "localhost:2181";
private static final String DEFAULT_BROKER_LIST = "localhost:9092";

// Separators used when building composite config values
public static final String COMMA = ",";
public static final String COLON = ":";
public static final String DOT = ".";
// '$' cannot be passed safely through Samza configs, so it is replaced by '?'
public static final char DOLLAR_SIGN = '$';
public static final char QUESTION_MARK = '?';

public static final String SHUFFLE = "shuffle";
public static final String KEY = "key";
public static final String BROADCAST = "broadcast";

// JOB
// Every key must be a real, non-empty Samza config key: several properties sharing the empty
// key "" would silently overwrite each other in the config map.
public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class";
public static final String JOB_NAME_KEY = "job.name";
public static final String YARN_PACKAGE_KEY = "yarn.package.path";
public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb";
public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb";
public static final String CONTAINER_COUNT_KEY = "yarn.container.count";

// TASK (SAMZA original)
public static final String TASK_CLASS_KEY = "task.class";
public static final String TASK_INPUTS_KEY = "task.inputs";

// TASK (extra)
public static final String FILE_KEY = "task.processor.file";
public static final String FILESYSTEM_KEY = "task.processor.filesystem";
public static final String ENTRANCE_INPUT_KEY = "task.entrance.input";
public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs";
public static final String YARN_CONF_HOME_KEY = "yarn.config.home";

// Kafka system properties; these are relative to the "systems.<systemName>." prefix
public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect";
public static final String BROKER_URI_KEY = "producer.metadata.broker.list";
public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages";
public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type";
public static final String SERDE_REGISTRATION_KEY = "kryo.register";

// Instance variables
/** true: jobs use LocalJobFactory; false (default): jobs use YarnJobFactory. */
private boolean isLocalMode;
/** ZooKeeper connect string written to every Kafka system's consumer config. */
private String zookeeper;
/** Kafka broker list written to every Kafka system's producer config. */
private String kafkaBrokerList;
/** Value of task.checkpoint.replication.factor. */
private int replicationFactor;
/** YARN application-master memory in MB (YARN mode only). */
private int amMemory;
/** YARN container memory in MB; the task heap is set to 75% of it (YARN mode only). */
private int containerMemory;
/** Max number of PI instances per container; must be set to a positive value before generating configs. */
private int piPerContainerRatio;
/** In ms. NOTE(review): not read anywhere in the visible code; the commit interval is a fixed literal. */
private int checkpointFrequency; // in ms
/** Path of the YARN job package (YARN mode only). */
private String jarPath;
/** Optional Kryo registration file; null means no registrations are added. */
private String kryoRegisterFile = null;
/**
 * Creates a factory in YARN (non-local) mode that points at the default ZooKeeper
 * ({@code localhost:2181}) and Kafka broker ({@code localhost:9092}), with a 1-minute checkpoint
 * frequency and a checkpoint replication factor of 1.
 */
public SamzaConfigFactory() {
  replicationFactor = 1;
  checkpointFrequency = 60000; // 1 minute, in ms
  kafkaBrokerList = DEFAULT_BROKER_LIST;
  zookeeper = DEFAULT_ZOOKEEPER;
  isLocalMode = false;
}
* Setter methods
public SamzaConfigFactory setYarnPackage(String packagePath) {
this.jarPath = packagePath;
return this;
public SamzaConfigFactory setLocalMode(boolean isLocal) {
this.isLocalMode = isLocal;
return this;
public SamzaConfigFactory setZookeeper(String zk) {
this.zookeeper = zk;
return this;
public SamzaConfigFactory setKafka(String brokerList) {
this.kafkaBrokerList = brokerList;
return this;
public SamzaConfigFactory setCheckpointFrequency(int freq) {
this.checkpointFrequency = freq;
return this;
public SamzaConfigFactory setReplicationFactor(int replicationFactor) {
this.replicationFactor = replicationFactor;
return this;
public SamzaConfigFactory setAMMemory(int mem) {
this.amMemory = mem;
return this;
public SamzaConfigFactory setContainerMemory(int mem) {
this.containerMemory = mem;
return this;
public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) {
this.piPerContainerRatio = piPerContainer;
return this;
public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) {
this.kryoRegisterFile = kryoRegister;
return this;
* Generate a map of all config properties for the input SamzaProcessingItem
private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception {
Map<String, String> map = getBasicSystemConfig();
// Set job name, task class, task inputs (from SamzaProcessingItem)
setJobName(map, pi.getName());
setTaskClass(map, SamzaProcessingItem.class.getName());
StringBuilder streamNames = new StringBuilder();
boolean first = true;
for (SamzaSystemStream stream : pi.getInputStreams()) {
if (!first)
streamNames.append(stream.getSystem() + DOT + stream.getStream());
if (first)
first = false;
setTaskInputs(map, streamNames.toString());
// Processor file
setFileName(map, filename);
setFileSystem(map, filesystem);
List<String> nameList = new ArrayList<String>();
// Default kafka system: kafka0: sync producer
// This system is always required: it is used for checkpointing
setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);
// Output streams: set kafka systems
for (SamzaStream stream : pi.getOutputStreams()) {
boolean found = false;
for (String name : nameList) {
if (stream.getSystemName().equals(name)) {
found = true;
if (!found) {
setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize());
// Input streams: set kafka systems
for (SamzaSystemStream stream : pi.getInputStreams()) {
boolean found = false;
for (String name : nameList) {
if (stream.getSystem().equals(name)) {
found = true;
if (!found) {
setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1);
// Checkpointing
setValue(map, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
setValue(map, "task.checkpoint.system", "kafka0");
setValue(map, "", "1000");
setValue(map, "task.checkpoint.replication.factor", Integer.toString(this.replicationFactor));
// Number of containers
setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio);
return map;
* Generate a map of all config properties for the input SamzaProcessingItem
public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) {
Map<String, String> map = getBasicSystemConfig();
// Set job name, task class (from SamzaEntranceProcessingItem)
setJobName(map, epi.getName());
setTaskClass(map, SamzaEntranceProcessingItem.class.getName());
// Input for the entrance task (from our custom consumer)
setTaskInputs(map, SYSTEM_NAME + "." + epi.getName());
// Output from entrance task
// Since entrancePI should have only 1 output stream
// there is no need for checking the batch size, setting different system
// names
// The custom consumer (samoa system) does not suuport reading from a
// specific index
// => no need for checkpointing
SamzaStream outputStream = (SamzaStream) epi.getOutputStream();
// Set samoa system factory
setValue(map, "systems." + SYSTEM_NAME + ".samza.factory", SamoaSystemFactory.class.getName());
// Set Kafka system (only if there is an output stream)
if (outputStream != null)
setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList,
// Processor file
setFileName(map, filename);
setFileSystem(map, filesystem);
// Number of containers
setNumberOfContainers(map, 1, this.piPerContainerRatio);
return map;
* Generate a list of map (of config properties) for all PIs and EPI in the
* input topology
public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) throws Exception {
List<Map<String, String>> maps = new ArrayList<Map<String, String>>();
// File to write serialized objects
String filename = topology.getTopologyName() + ".dat";
Path dirPath = FileSystems.getDefault().getPath("dat");
Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), filename);
String dstPath = filePath.toString();
String resPath;
String filesystem;
if (this.isLocalMode) {
filesystem = SystemsUtils.LOCAL_FS;
File dir = dirPath.toFile();
if (!dir.exists())
else {
filesystem = SystemsUtils.HDFS;
// Correct system name for streams
// Add all PIs to a collection (map)
Map<String, Object> piMap = new HashMap<String, Object>();
Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems();
Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems();
for (EntranceProcessingItem epi : entranceProcessingItems) {
SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
piMap.put(sepi.getName(), sepi);
for (IProcessingItem pi : processingItems) {
SamzaProcessingItem spi = (SamzaProcessingItem) pi;
piMap.put(spi.getName(), spi);
// Serialize all PIs
boolean serialized = false;
if (this.isLocalMode) {
serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath);
resPath = dstPath;
else {
resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath);
serialized = resPath != null;
if (!serialized) {
throw new Exception("Fail serialize map of PIs to file");
// MapConfig for all PIs
for (EntranceProcessingItem epi : entranceProcessingItems) {
SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem));
for (IProcessingItem pi : processingItems) {
SamzaProcessingItem spi = (SamzaProcessingItem) pi;
maps.add(this.getMapForPI(spi, resPath, filesystem));
return maps;
* Construct a list of MapConfigs for a Topology
* @return the list of MapConfigs
* @throws Exception
public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception {
List<MapConfig> configs = new ArrayList<MapConfig>();
List<Map<String, String>> maps = this.getMapsForTopology(topology);
for (Map<String, String> map : maps) {
configs.add(new MapConfig(map));
return configs;
public void setSystemNameForStreams(Set<Stream> streams) {
Map<Integer, String> batchSizeMap = new HashMap<Integer, String>();
batchSizeMap.put(1, "kafka0"); // default system with sync producer
int counter = 0;
for (Stream stream : streams) {
SamzaStream samzaStream = (SamzaStream) stream;
String systemName = batchSizeMap.get(samzaStream.getBatchSize());
if (systemName == null) {
// Add new system
systemName = "kafka" + Integer.toString(counter);
batchSizeMap.put(samzaStream.getBatchSize(), systemName);
* Generate a map with common properties for PIs and EPI
private Map<String, String> getBasicSystemConfig() {
Map<String, String> map = new HashMap<String, String>();
// Job & Task
if (this.isLocalMode)
map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName());
else {
map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName());
// yarn
map.put(YARN_PACKAGE_KEY, jarPath);
map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory));
map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory));
map.put(CONTAINER_COUNT_KEY, "1");
map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome());
// Task opts (Heap size = 0.75 container memory)
int heapSize = (int) (0.75 * this.containerMemory);
map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M -XX:+PrintGCDateStamps");
map.put(JOB_NAME_KEY, "");
map.put(TASK_CLASS_KEY, "");
map.put(TASK_INPUTS_KEY, "");
// register serializer
map.put("serializers.registry.kryo.class", SamzaKryoSerdeFactory.class.getName());
// Serde registration
setKryoRegistration(map, this.kryoRegisterFile);
return map;
* Helper methods to set different properties in the input map
private static void setJobName(Map<String, String> map, String jobName) {
map.put(JOB_NAME_KEY, jobName);
private static void setFileName(Map<String, String> map, String filename) {
map.put(FILE_KEY, filename);
private static void setFileSystem(Map<String, String> map, String filesystem) {
map.put(FILESYSTEM_KEY, filesystem);
private static void setTaskClass(Map<String, String> map, String taskClass) {
map.put(TASK_CLASS_KEY, taskClass);
private static void setTaskInputs(Map<String, String> map, String inputs) {
map.put(TASK_INPUTS_KEY, inputs);
private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) {
if (kryoRegisterFile != null) {
String value = readKryoRegistration(kryoRegisterFile);
private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) {
int res = parallelism / piPerContainer;
if (parallelism % piPerContainer != 0)
map.put(CONTAINER_COUNT_KEY, Integer.toString(res));
private static void setKafkaSystem(Map<String, String> map, String systemName, String zk, String brokers,
int batchSize) {
map.put("systems." + systemName + ".samza.factory", KafkaSystemFactory.class.getName());
map.put("systems." + systemName + ".samza.msg.serde", "kryo");
map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk);
map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers);
map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, Integer.toString(batchSize));
map.put("systems." + systemName + ".samza.offset.default", "oldest");
if (batchSize > 1) {
map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "async");
else {
map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync");
// Set custom properties
private static void setValue(Map<String, String> map, String key, String value) {
map.put(key, value);
* Helper method to parse Kryo registration file
private static String readKryoRegistration(String filePath) {
FileInputStream fis = null;
Properties props = new Properties();
StringBuilder result = new StringBuilder();
try {
fis = new FileInputStream(filePath);
boolean first = true;
String value = null;
for (String k : props.stringPropertyNames()) {
if (!first)
first = false;
// Need to avoid the dollar sign as samza pass all the properties in
// the config to containers via commandline parameters/enviroment
// variables
// We might escape the dollar sign, but it's more complicated than
// replacing it with something else
result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
value = props.getProperty(k);
if (value != null && value.trim().length() > 0) {
result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
} catch (IOException e) {
// TODO Auto-generated catch block
} finally {
if (fis != null)
try {
} catch (IOException e) {
// TODO Auto-generated catch block
return result.toString();