blob: 9c30036e46e19f25eba29ecbff6b1ae752f880fd [file] [log] [blame]
package org.apache.samoa.topology.impl;
/*
* #%L
* SAMOA
* %%
* Copyright (C) 2014 - 2015 Apache Software Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.List;
import java.util.Set;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.Topology;
import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
import org.apache.samoa.utils.SamzaConfigFactory;
import org.apache.samoa.utils.SystemsUtils;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.JobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class will submit a list of Samza jobs with the Configs generated from the input topology
*
* @author Anh Thu Vu
*
*/
public class SamzaEngine {
private static final Logger logger = LoggerFactory.getLogger(SamzaEngine.class);
/*
* Singleton instance
*/
private static SamzaEngine engine = new SamzaEngine();
private String zookeeper;
private String kafka;
private int kafkaReplicationFactor;
private boolean isLocalMode;
private String yarnPackagePath;
private String yarnConfHome;
private String kryoRegisterFile;
private int amMem;
private int containerMem;
private int piPerContainerRatio;
private int checkpointFrequency;
private void _submitTopology(SamzaTopology topology) {
// Setup SamzaConfigFactory
SamzaConfigFactory configFactory = new SamzaConfigFactory();
configFactory.setLocalMode(isLocalMode)
.setZookeeper(zookeeper)
.setKafka(kafka)
.setYarnPackage(yarnPackagePath)
.setAMMemory(amMem)
.setContainerMemory(containerMem)
.setPiPerContainerRatio(piPerContainerRatio)
.setKryoRegisterFile(kryoRegisterFile)
.setCheckpointFrequency(checkpointFrequency)
.setReplicationFactor(kafkaReplicationFactor);
// Generate the list of Configs
List<MapConfig> configs;
try {
// ConfigFactory generate a list of configs
// Serialize a map of PIs and store in a file in the jar at jarFilePath
// (in dat/ folder)
configs = configFactory.getMapConfigsForTopology(topology);
} catch (Exception e) {
e.printStackTrace();
return;
}
// Create kafka streams
Set<Stream> streams = topology.getStreams();
for (Stream stream : streams) {
SamzaStream samzaStream = (SamzaStream) stream;
List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams();
for (SamzaSystemStream systemStream : systemStreams) {
// all streams should be kafka streams
SystemsUtils.createKafkaTopic(systemStream.getStream(), systemStream.getParallelism(), kafkaReplicationFactor);
}
}
// Submit the jobs with those configs
for (MapConfig config : configs) {
logger.info("Config:{}", config);
JobRunner jobRunner = new JobRunner(config);
jobRunner.run();
}
}
private void _setupSystemsUtils() {
// Setup Utils
if (!isLocalMode)
SystemsUtils.setHadoopConfigHome(yarnConfHome);
SystemsUtils.setZookeeper(zookeeper);
}
/*
* Setter methods
*/
public static SamzaEngine getEngine() {
return engine;
}
public SamzaEngine setZooKeeper(String zk) {
this.zookeeper = zk;
return this;
}
public SamzaEngine setKafka(String kafka) {
this.kafka = kafka;
return this;
}
public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
this.kafkaReplicationFactor = replicationFactor;
return this;
}
public SamzaEngine setCheckpointFrequency(int freq) {
this.checkpointFrequency = freq;
return this;
}
public SamzaEngine setLocalMode(boolean isLocal) {
this.isLocalMode = isLocal;
return this;
}
public SamzaEngine setYarnPackage(String yarnPackagePath) {
this.yarnPackagePath = yarnPackagePath;
return this;
}
public SamzaEngine setConfigHome(String configHome) {
this.yarnConfHome = configHome;
return this;
}
public SamzaEngine setAMMemory(int mem) {
this.amMem = mem;
return this;
}
public SamzaEngine setContainerMemory(int mem) {
this.containerMem = mem;
return this;
}
public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
this.piPerContainerRatio = piPerContainer;
return this;
}
public SamzaEngine setKryoRegisterFile(String registerFile) {
this.kryoRegisterFile = registerFile;
return this;
}
/**
* Submit a list of Samza jobs correspond to the submitted topology
*
* @param topo
* the submitted topology
*/
public static void submitTopology(SamzaTopology topo) {
// Setup SystemsUtils
engine._setupSystemsUtils();
// Submit topology
engine._submitTopology(topo);
}
}