blob: dd09282c52b49664a4447729cdebe09dd54de0bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.streampipes.pe.simulator;
import net.acesinc.data.json.generator.EventGenerator;
import net.acesinc.data.json.generator.SimulationRunner;
import net.acesinc.data.json.generator.config.SimulationConfig;
import net.acesinc.data.json.generator.config.WorkflowConfig;
import net.acesinc.data.json.generator.log.EventLogger;
import net.acesinc.data.json.generator.log.KafkaLogger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StreamPipesSimulationRunner {
private static final Logger log = LogManager.getLogger(SimulationRunner.class);
private SimulationConfig config;
private List<EventGenerator> eventGenerators;
private List<Thread> eventGenThreads;
private boolean running;
private List<EventLogger> eventLoggers;
private Map<String, TopicAwareWorkflow> simulationWorkflows;
private String kafkaHost;
private Integer kafkaPort;
public StreamPipesSimulationRunner(SimulationConfig config, Map<String,
TopicAwareWorkflow>
simulationWorkflows, String kafkaHost, Integer kafkaPort) {
this.config = config;
this.kafkaHost = kafkaHost;
this.kafkaPort = kafkaPort;
eventGenerators = new ArrayList<>();
eventLoggers = new ArrayList<>();
eventGenThreads = new ArrayList<>();
this.simulationWorkflows = simulationWorkflows;
setupSimulation();
}
private Map<String, Object> makeProducerConfig(String topic) {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("broker.server", this.kafkaHost);
producerConfig.put("broker.port", this.kafkaPort);
producerConfig.put("flatten", false);
producerConfig.put("sync", false);
producerConfig.put("topic", topic);
return producerConfig;
}
private void setupSimulation() {
running = false;
for (WorkflowConfig workflowConfig : config.getWorkflows()) {
if (simulationWorkflows.containsKey(workflowConfig.getWorkflowFilename())) {
TopicAwareWorkflow w = simulationWorkflows.get(workflowConfig.getWorkflowFilename());
KafkaLogger kafkaLogger = new KafkaLogger(makeProducerConfig(w.getTargetTopic()));
eventLoggers.add(kafkaLogger);
final EventGenerator gen = new EventGenerator(w, workflowConfig,
Collections.singletonList(kafkaLogger));
log.info("Adding EventGenerator for [ " + workflowConfig.getWorkflowName() + "," + workflowConfig.getWorkflowFilename() + " ]");
eventGenerators.add(gen);
eventGenThreads.add(new Thread(gen));
}
}
}
public void startSimulation() {
log.info("Starting Simulation");
if (eventGenThreads.size() > 0) {
for (Thread t : eventGenThreads) {
t.start();
}
running = true;
}
}
public void stopSimulation() {
log.info("Stopping Simulation");
for (Thread t : eventGenThreads) {
t.interrupt();
}
for (EventLogger l : eventLoggers) {
l.shutdown();
}
running = false;
}
public boolean isRunning() {
return running;
}
}