blob: 7b9d9a54c6b4a18546c84f9c2eafe6389962648a [file] [log] [blame]
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* * <p/>
* * http://www.apache.org/licenses/LICENSE-2.0
* * <p/>
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.eagle.security.auditlog;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import com.typesafe.config.Config;
import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
/**
 * Storm bolt that parses raw HDFS audit log lines into a structured map of
 * well-known audit fields (src, dst, host, timestamp, allowed, user, cmd)
 * and emits the map downstream as a single-field tuple.
 *
 * @since 8/10/16
 */
public class HdfsAuditLogParserBolt extends BaseRichBolt {
    // Storm serializes bolts when distributing the topology; pin the serial form.
    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
    private static final String DATASOURCE_TIMEZONE_PATH = "dataSourceConfig.timeZone";

    // Assigned by Storm in prepare(); cannot be final.
    private OutputCollector collector;
    // Configured once at construction time and serialized along with the bolt.
    private final HDFSAuditLogParser parser;

    /**
     * Creates the bolt, configuring the audit log parser with the data source's
     * time zone when {@code dataSourceConfig.timeZone} is present in the config,
     * otherwise falling back to the parser's default.
     *
     * @param config topology configuration, possibly carrying the time zone setting
     */
    public HdfsAuditLogParserBolt(Config config) {
        if (config.hasPath(DATASOURCE_TIMEZONE_PATH)) {
            TimeZone timeZone = TimeZone.getTimeZone(config.getString(DATASOURCE_TIMEZONE_PATH));
            parser = new HDFSAuditLogParser(timeZone);
        } else {
            parser = new HDFSAuditLogParser();
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Parses one raw audit log line (tuple field 0) into a {@link TreeMap} keyed by
     * the {@link HDFSAuditLogObject} field constants and emits it anchored to the
     * input tuple. The tuple is always acked — parse failures are logged and
     * dropped rather than replayed.
     *
     * @param input tuple whose first field is the raw audit log line
     */
    @Override
    public void execute(Tuple input) {
        String logLine = input.getString(0);
        try {
            HDFSAuditLogObject entity = parser.parse(logLine);
            if (entity == null) {
                // Guard explicitly instead of relying on an incidental NPE below.
                LOG.warn("No entity parsed from audit log message {}", logLine);
                return;
            }
            Map<String, Object> map = new TreeMap<>();
            map.put(HDFSAuditLogObject.HDFS_SRC_KEY, entity.src);
            map.put(HDFSAuditLogObject.HDFS_DST_KEY, entity.dst);
            map.put(HDFSAuditLogObject.HDFS_HOST_KEY, entity.host);
            map.put(HDFSAuditLogObject.HDFS_TIMESTAMP_KEY, entity.timestamp);
            map.put(HDFSAuditLogObject.HDFS_ALLOWED_KEY, entity.allowed);
            map.put(HDFSAuditLogObject.HDFS_USER_KEY, entity.user);
            map.put(HDFSAuditLogObject.HDFS_CMD_KEY, entity.cmd);
            collector.emit(input, Collections.singletonList(map));
        } catch (Exception ex) {
            LOG.error("Failing parse audit log message {}", logLine, ex);
        } finally {
            // Ack unconditionally so malformed lines are not replayed forever.
            collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("f1"));
    }
}