blob: 15b53b72c53e4c9a6c7db28e60148afc327d1ee3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers.integration.components;
import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.parsers.topology.ParserTopologyBuilder;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.KillOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An {@link InMemoryComponent} that builds and runs a Metron parser topology on an
 * in-process Storm {@link LocalCluster} for integration tests.
 *
 * <p>One topology is submitted for all configured sensor types; the topology name is the
 * sensor types joined with {@code "__"} (see {@link #getTopologyName()}).
 */
public class ParserTopologyComponent implements InMemoryComponent {

  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final Properties topologyProperties;
  private final String brokerUrl;
  // Not final: tests may swap the sensor set between runs via updateSensorTypes().
  private List<String> sensorTypes;
  private LocalCluster stormCluster;
  private final String outputTopic;
  private final String errorTopic;

  /**
   * Fluent builder for {@link ParserTopologyComponent}. Requires at least one sensor type
   * and an output topic; all other settings are optional.
   */
  public static class Builder {

    Properties topologyProperties;
    String brokerUrl;
    List<String> sensorTypes;
    String outputTopic;
    String errorTopic;

    public Builder withTopologyProperties(Properties topologyProperties) {
      this.topologyProperties = topologyProperties;
      return this;
    }

    public Builder withBrokerUrl(String brokerUrl) {
      this.brokerUrl = brokerUrl;
      return this;
    }

    public Builder withSensorTypes(List<String> sensorTypes) {
      this.sensorTypes = sensorTypes;
      return this;
    }

    public Builder withOutputTopic(String topic) {
      this.outputTopic = topic;
      return this;
    }

    public Builder withErrorTopic(String topic) {
      this.errorTopic = topic;
      return this;
    }

    /**
     * Builds the component.
     *
     * @return a new {@link ParserTopologyComponent}
     * @throws IllegalArgumentException if no sensor types or no output topic were supplied
     */
    public ParserTopologyComponent build() {
      if (sensorTypes == null || sensorTypes.isEmpty()) {
        throw new IllegalArgumentException("The sensor type must be defined.");
      }
      if (outputTopic == null) {
        throw new IllegalArgumentException("The output topic must be defined.");
      }
      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorTypes, outputTopic, errorTopic);
    }
  }

  /**
   * @param topologyProperties properties holding (at minimum) the ZooKeeper connect string
   *                           under {@link ZKServerComponent#ZOOKEEPER_PROPERTY}
   * @param brokerUrl          Kafka broker URL; may be null
   * @param sensorTypes        sensor types to parse; must be non-empty (enforced by {@link Builder})
   * @param outputTopic        topic to write parsed messages to
   * @param errorTopic         topic to write parse errors to; may be null
   */
  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, List<String> sensorTypes, String outputTopic, String errorTopic) {
    this.topologyProperties = topologyProperties;
    this.brokerUrl = brokerUrl;
    this.sensorTypes = sensorTypes;
    this.outputTopic = outputTopic;
    this.errorTopic = errorTopic;
  }

  /**
   * Replaces the sensor types used for subsequently started topologies.
   * Has no effect on a topology that is already running.
   */
  public void updateSensorTypes(List<String> sensorTypes) {
    this.sensorTypes = sensorTypes;
  }

  /**
   * Builds the parser topology and submits it to a freshly created {@link LocalCluster}.
   *
   * @throws UnableToStartException wrapping any failure during build or submission
   */
  @Override
  public void start() throws UnableToStartException {
    try {
      final Map<String, Object> stormConf = new HashMap<>();
      stormConf.put(Config.TOPOLOGY_DEBUG, true);
      ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build(
          topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY),
          Optional.ofNullable(brokerUrl),
          sensorTypes,
          // One spout and one parser executor/task per sensor; error writer sized 1.
          (x, y) -> Collections.nCopies(sensorTypes.size(), 1),
          (x, y) -> Collections.nCopies(sensorTypes.size(), 1),
          (x, y) -> 1,
          (x, y) -> 1,
          (x, y) -> 1,
          (x, y) -> 1,
          (x, y) -> Collections.nCopies(sensorTypes.size(), new HashMap<>()),
          (x, y) -> null,
          (x, y) -> outputTopic,
          (x, y) -> errorTopic,
          (x, y) -> {
            Config c = new Config();
            c.putAll(stormConf);
            return c;
          }
      );
      stormCluster = new LocalCluster();
      stormCluster.submitTopology(getTopologyName(), stormConf, topologyBuilder.getBuilder().createTopology());
    } catch (Exception e) {
      throw new UnableToStartException("Unable to start parser topology for sensorTypes: " + sensorTypes, e);
    }
  }

  /**
   * Kills the running topology (if any) and shuts the local cluster down, working around
   * Storm's occasional failure to stop worker slots cleanly. Always cleans the worker dir.
   */
  @Override
  public void stop() {
    if (stormCluster != null) {
      try {
        try {
          // Kill the topology directly instead of sitting through the wait period
          killTopology();
          stormCluster.shutdown();
        } catch (IllegalStateException ise) {
          // Storm sometimes fails with "It took over ... to shut down slot"; treat only
          // that specific failure as recoverable. getMessage() may be null, so guard it.
          String msg = ise.getMessage();
          if (msg == null || !(msg.contains("It took over") && msg.contains("to shut down slot"))) {
            throw ise;
          } else {
            assassinateSlots();
            LOG.error("Storm slots didn't shut down entirely cleanly *sigh*. " +
                "I gave them the old one-two-skadoo and killed the slots with prejudice. " +
                "If tests fail, we'll have to find a better way of killing them.", ise);
          }
        }
      } catch (Throwable t) {
        // Best-effort shutdown: log and continue so the worker dir still gets cleaned.
        LOG.error(t.getMessage(), t);
      } finally {
        cleanupWorkerDir();
      }
    }
  }

  /** Kills the topology without tearing down the cluster, so it can be resubmitted. */
  @Override
  public void reset() {
    if (stormCluster != null) {
      killTopology();
    }
  }

  /** Kills the running topology with zero wait, then sleeps briefly to let it die. */
  protected void killTopology() {
    KillOptions ko = new KillOptions();
    ko.set_wait_secs(0);
    stormCluster.killTopologyWithOpts(getTopologyName(), ko);
    try {
      // Actually wait for it to die.
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can observe the interruption.
      Thread.currentThread().interrupt();
    }
  }

  /** @return the topology name: all sensor types joined by {@code "__"} */
  protected String getTopologyName() {
    return StringUtils.join(sensorTypes, "__");
  }
}