/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.security.auditlog;

import backtype.storm.spout.SchemeAsMultiScheme;
import com.typesafe.config.Config;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
import org.apache.eagle.datastream.ExecutionEnvironments;
import org.apache.eagle.datastream.core.StreamProducer;
import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
import org.apache.eagle.partition.DataDistributionDao;
import org.apache.eagle.partition.PartitionAlgorithm;
import org.apache.eagle.partition.PartitionStrategy;
import org.apache.eagle.partition.PartitionStrategyImpl;
import org.apache.eagle.security.partition.DataDistributionDaoImpl;
import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
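
/**
 * Topology entry point for the HDFS audit log processor: consumes audit
 * events from Kafka, joins them with file-sensitivity and IP-zone metadata,
 * and feeds the result into the alert executor.
 */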
public class HdfsAuditLogProcessorMain {
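    /**
     * Builds the balanced-partition strategy from the eagle service connection
     * settings and the Kafka topic named in the config. Refresh interval and
     * statistic range both default to 60 minutes when not configured.
     */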
    public static PartitionStrategy createStrategy(Config config) {
        // TODO: Refactor the configuration structure so the eagle service path does not have to be rebuilt here. ~ hao
        String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
        String host = config.getString(servicePrefix + EagleConfigConstants.HOST);
        Integer port = config.getInt(servicePrefix + EagleConfigConstants.PORT);
        String username = config.getString(servicePrefix + EagleConfigConstants.USERNAME);
        String password = config.getString(servicePrefix + EagleConfigConstants.PASSWORD);
        String topic = config.getString("dataSourceConfig.topic");
        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
        String refreshKey = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
        Integer partitionRefreshIntervalInMin = config.hasPath(refreshKey) ? config.getInt(refreshKey) : 60;
        String rangeKey = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
        Integer kafkaStatisticRangeInMin = config.hasPath(rangeKey) ? config.getInt(rangeKey) : 60;
        return new PartitionStrategyImpl(dao, algorithm,
                partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE,
                kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
    }
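
    /**
     * Creates a Kafka spout provider whose scheme deserializes each message
     * and emits a (user, event) tuple, so downstream bolts can group by user.
     */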
    public static KafkaSourcedSpoutProvider createProvider(Config config) {
        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
            @Override
            public List<Object> deserialize(byte[] ser) {
                Object tmp = deserializer.deserialize(ser);
                // Guard against undeserializable messages before casting.
                if (tmp == null) {
                    return null;
                }
                @SuppressWarnings("unchecked")
                Map<String, Object> map = (Map<String, Object>) tmp;
                // Field 0 is the user (the default grouping key); field 1 is the full event.
                return Arrays.asList(map.get("user"), tmp);
            }
        };
        return new KafkaSourcedSpoutProvider() {
            @Override
            public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
                return new SchemeAsMultiScheme(scheme);
            }
        };
    }
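
    /**
     * Wires the topology using the default field grouping: tuples are
     * partitioned by the user field (tuple index 0).
     */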
    @SuppressWarnings("unchecked")
    public static void execWithDefaultPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
        StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(Arrays.asList(0));
        //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
        //source.streamUnion(reassembler)
        source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
              .flatMap(new IPZoneDataJoinExecutor())
              .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
        env.execute();
    }
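
    /**
     * Wires the same topology, but distributes spout output with the greedy
     * balanced-partition strategy instead of plain field grouping.
     */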
    @SuppressWarnings("unchecked")
    public static void execWithBalancedPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
        PartitionStrategy strategy = createStrategy(env.getConfig());
        StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(strategy);
        //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
        //source.streamUnion(reassembler)
        source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
              .flatMap(new IPZoneDataJoinExecutor())
              .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
        env.execute();
    }
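
    /**
     * Balanced partitioning is opt-in via eagleProps.balancePartitionEnabled.
     * A minimal illustrative config sketch follows, assuming the usual
     * EagleConfigConstants key names (eagleProps.eagleService.*); the values
     * are placeholders, not defaults shipped with Eagle:
     *
     * <pre>
     * eagleProps.eagleService.host = "localhost"
     * eagleProps.eagleService.port = 9099
     * eagleProps.eagleService.username = "admin"
     * eagleProps.eagleService.password = "secret"
     * eagleProps.balancePartitionEnabled = true
     * dataSourceConfig.topic = "hdfs_audit_log"
     * dataSourceConfig.deserializerClass = "some.package.YourKafkaDeserializer"
     * </pre>
     */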
    public static void main(String[] args) throws Exception {
        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
        Config config = env.getConfig();
        KafkaSourcedSpoutProvider provider = createProvider(config);
        boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled");
        if (balancePartition) {
            execWithBalancedPartition(env, provider);
        } else {
            execWithDefaultPartition(env, provider);
        }
    }
}