/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.security.auditlog;

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
import org.apache.eagle.security.traffic.HadoopLogAccumulatorBolt;

/**
 * HDFS audit log monitoring application: builds a Storm topology that ingests
 * HDFS audit logs from Kafka, parses them, enriches them with sensitivity and
 * IP-zone metadata, and emits the enriched stream to a Kafka sink.
 *
 * @since 8/11/16
 */
public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {

    /**
     * @return the bolt that parses raw HDFS audit log entries into structured events
     */
    @Override
    public BaseRichBolt getParserBolt(Config config) {
        return new HdfsAuditLogParserBolt(config);
    }

    @Override
    public StormTopology execute(Config config, StormEnvironment environment) {
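        // Topology layout: Kafka ingest spout -> audit log parser -> sensitivity join
        // -> IP zone join -> Kafka sink, plus an optional traffic-monitoring branch.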
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpoutProvider provider = new KafkaSpoutProvider();
        IRichSpout spout = provider.getSpout(config);
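        // Per-component parallelism (task counts) is read from the application config.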
        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
        int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
        int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
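
        // ----------------------
        // ingest -> parserBolt
        // ----------------------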
        builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
        BaseRichBolt parserBolt = getParserBolt(config);
        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks);
        boltDeclarer.shuffleGrouping("ingest");
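
        // ------------------------------
        // parserBolt -> sensitivityJoin
        // ------------------------------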
        HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new HdfsSensitivityDataEnrichBolt(config);
        BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks);
        // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
        sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");

        // ------------------------------
        // sensitivityJoin -> ipZoneJoin
        // ------------------------------
        IPZoneDataEnrichBolt ipZoneDataJoinBolt = new IPZoneDataEnrichBolt(config);
        BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks);
        // ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
        ipZoneDataJoinBoltDeclarer.shuffleGrouping("sensitivityJoin");

        // ------------------------
        // ipZoneJoin -> kafkaSink
        // ------------------------
        StormStreamSink sinkBolt = environment.getStreamSink("HDFS_AUDIT_LOG_ENRICHED_STREAM", config);
        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks);
        kafkaBoltDeclarer.shuffleGrouping("ipZoneJoin");
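
        // Optional traffic-monitoring branch: Hadoop JMX resource stream -> trafficParserBolt -> traffic sink.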
        if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
            builder.setSpout("trafficSpout", environment.getStreamSource("HADOOP_JMX_RESOURCE_STREAM", config), 1)
                    .setNumTasks(1);
            builder.setBolt("trafficParserBolt", new TrafficParserBolt(config), 1)
                    .setNumTasks(1)
                    .shuffleGrouping("trafficSpout");
            builder.setBolt("trafficSinkBolt", environment.getStreamSink("HDFS_AUDIT_LOG_TRAFFIC_STREAM", config), 1)
                    .setNumTasks(1)
                    .shuffleGrouping("trafficParserBolt");
        }
        return builder.createTopology();
    }

    @Override
    public String getSinkStreamName() {
        return "hdfs_audit_log_stream";
    }

    public static void main(String[] args) {
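        // Load the application configuration (application.conf plus system-property overrides) via Typesafe Config.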
        Config config = ConfigFactory.load();
        HdfsAuditLogApplication app = new HdfsAuditLogApplication();
        app.run(config);
    }
}