/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.security.auditlog;

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import com.typesafe.config.Config;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.partition.*;
import org.apache.eagle.security.partition.DataDistributionDaoImpl;
import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
import org.apache.eagle.security.traffic.HadoopLogAccumulatorBolt;
/**
 * Base Storm application for HDFS audit log monitoring. It wires the topology
 * ingest (Kafka spout) -> parserBolt -> sensitivityJoin -> ipZoneJoin -> kafkaSink,
 * with an optional logAccumulator branch off parserBolt for traffic monitoring.
 *
 * Since 8/10/16.
 */
public abstract class AbstractHdfsAuditLogApplication extends StormApplication {

    public static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
    public static final String PARSER_TASK_NUM = "topology.numOfParserTasks";
    public static final String SENSITIVITY_JOIN_TASK_NUM = "topology.numOfSensitivityJoinTasks";
    public static final String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
    public static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
    public static final String TRAFFIC_MONITOR_ENABLED = "dataSinkConfig.trafficMonitorEnabled";
    private static final String TRAFFIC_MONITOR_TASK_NUM = "topology.numOfTrafficMonitorTasks";
    @Override
    public StormTopology execute(Config config, StormEnvironment environment) {
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpoutProvider provider = new KafkaSpoutProvider();
        IRichSpout spout = provider.getSpout(config);

        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
        int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
        int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
        // Fall back to the parser parallelism when no traffic-monitor task count is configured.
        int numOfTrafficMonitorTasks = config.hasPath(TRAFFIC_MONITOR_TASK_NUM) ? config.getInt(TRAFFIC_MONITOR_TASK_NUM) : numOfParserTasks;

        builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);

        // ---------------------
        // ingest -> parserBolt
        // ---------------------
        BaseRichBolt parserBolt = getParserBolt(config);
        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks);
        boltDeclarer.shuffleGrouping("ingest");
        // Boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition");
        // if (useDefaultPartition) {
        //     boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
        // } else {
        //     boltDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
        // }
        // ------------------------------
        // parserBolt -> sensitivityJoin
        // ------------------------------
        HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new HdfsSensitivityDataEnrichBolt(config);
        BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks);
        // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
        sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");

        if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
            HadoopLogAccumulatorBolt auditLogAccumulator = new HadoopLogAccumulatorBolt(config);
            BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfTrafficMonitorTasks);
            auditLogAccumulatorDeclarer.setNumTasks(numOfTrafficMonitorTasks).shuffleGrouping("parserBolt");
        }

        // ------------------------------
        // sensitivityJoin -> ipZoneJoin
        // ------------------------------
        IPZoneDataEnrichBolt ipZoneDataJoinBolt = new IPZoneDataEnrichBolt(config);
        BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks);
        // ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
        ipZoneDataJoinBoltDeclarer.shuffleGrouping("sensitivityJoin");

        // ------------------------
        // ipZoneJoin -> kafkaSink
        // ------------------------
        StormStreamSink sinkBolt = environment.getStreamSink("hdfs_audit_log_stream", config);
        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks);
        kafkaBoltDeclarer.shuffleGrouping("ipZoneJoin");

        return builder.createTopology();
    }
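
    /**
     * Supplies the bolt that parses raw audit log lines read from Kafka;
     * subclasses provide the log-format-specific implementation.
     */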
    public abstract BaseRichBolt getParserBolt(Config config);
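
    /**
     * Returns the name of the stream the concrete application sinks events to.
     */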
    public abstract String getSinkStreamName();
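
    /**
     * Builds the partition strategy for the (currently disabled) custom grouping:
     * topic distribution statistics come from the Eagle service via
     * DataDistributionDaoImpl and partitions are assigned greedily.
     */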
    public static PartitionStrategy createStrategy(Config config) {
        // TODO: Refactor the configuration structure to avoid repeated config processing. ~ hao
        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
        String topic = config.getString("dataSourceConfig.topic");
        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
        // Both intervals default to 60 minutes when not configured.
        String refreshIntervalKey = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
        Integer partitionRefreshIntervalInMin = config.hasPath(refreshIntervalKey) ? config.getInt(refreshIntervalKey) : 60;
        String statisticRangeKey = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
        Integer kafkaStatisticRangeInMin = config.hasPath(statisticRangeKey) ? config.getInt(statisticRangeKey) : 60;
        return new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
    }
}
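
/*
 * Illustrative sketch (an assumption, not part of the original class): a minimal
 * concrete subclass showing how the two abstract hooks could be implemented.
 * Treat the parser bolt class and stream name below as placeholders for a
 * log-format-specific parser and the application's configured sink stream.
 *
 * public class SampleHdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
 *     @Override
 *     public BaseRichBolt getParserBolt(Config config) {
 *         return new HdfsAuditLogParserBolt(config); // hypothetical parser bolt
 *     }
 *
 *     @Override
 *     public String getSinkStreamName() {
 *         return "hdfs_audit_log_stream"; // matches the sink stream used in execute()
 *     }
 * }
 */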