As Apache Eagle consumes the data via Kafka[^KAFKA] topics in some topologies, such as HDFS audit log. To enable the full function of monitoring, a user needs to stream its data into a Kafka topic.
There are two ways to do that. The first one is Logstash, which naturally supports Kafka as the output plugin; the second one is to install a namenode log4j Kafka appender.
Step 1: Create a Kafka topic as the streaming input.
Here is an sample Kafka command to create topic ‘sandbox_hdfs_audit_log’
cd <kafka-home> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sandbox_hdfs_audit_log
Step 2: Install Logstash-kafka plugin
For Logstash 1.5.x, logstash-kafka has been intergrated into logstash-input-kafka and logstash-output-kafka, and released with the 1.5 version of Logstash. So you can directly use it.
For Logstash 1.4.x, a user should install logstash-kafka firstly. Notice that this version does not support partition_key_format.
Step 3: Create a Logstash configuration file under ${LOGSTASH_HOME}/conf. Here is a sample.
input { file { type => "hdp-nn-audit" path => "/path/to/audit.log" start_position => end sincedb_path => "/var/log/logstash/" } } filter{ if [type] == "hdp-nn-audit" { grok { match => ["message", "ugi=(?<user>([\w\d\-]+))@|ugi=(?<user>([\w\d\-]+))/[\w\d\-.]+@|ugi=(?<user>([\w\d.\-_]+))[\s(]+"] } } } output { if [type] == "hdp-nn-audit" { kafka { codec => plain { format => "%{message}" } broker_list => "localhost:9092" topic_id => "sandbox_hdfs_audit_log" request_required_acks => 0 request_timeout_ms => 10000 producer_type => "async" message_send_max_retries => 3 retry_backoff_ms => 100 queue_buffering_max_ms => 5000 queue_enqueue_timeout_ms => 5000 batch_num_messages => 200 send_buffer_bytes => 102400 client_id => "hdp-nn-audit" partition_key_format => "%{user}" } # stdout { codec => rubydebug } } }
Step 4: Start Logstash
bin/logstash -f conf/sample.conf
Step 5: Check whether logs are flowing into the kafka topic specified by topic_id
Notice that if you use Ambari[^AMBARI], such as in sandbox, you must follow below steps via Ambari UI. In addition, restarting namenode is required.
Step 1: Create a Kafka topic. Here is a example Kafka command for creating topic “sandbox_hdfs_audit_log”
cd <kafka-home> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sandbox_hdfs_audit_log
Step 2: Configure $HADOOP_CONF_DIR/log4j.properties, and add a log4j appender “KAFKA_HDFS_AUDIT” to hdfs audit logging
log4j.appender.KAFKA_HDFS_AUDIT=org.apache.eagle.log4j.kafka.KafkaLog4jAppender log4j.appender.KAFKA_HDFS_AUDIT.Topic=sandbox_hdfs_audit_log log4j.appender.KAFKA_HDFS_AUDIT.BrokerList=sandbox.hortonworks.com:6667 log4j.appender.KAFKA_HDFS_AUDIT.KeyClass=org.apache.eagle.log4j.kafka.hadoop.AuditLogKeyer log4j.appender.KAFKA_HDFS_AUDIT.Layout=org.apache.log4j.PatternLayout log4j.appender.KAFKA_HDFS_AUDIT.Layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n log4j.appender.KAFKA_HDFS_AUDIT.ProducerType=async #log4j.appender.KAFKA_HDFS_AUDIT.BatchSize=1 #log4j.appender.KAFKA_HDFS_AUDIT.QueueSize=1
Step 3: Edit $HADOOP_CONF_DIR/hadoop-env.sh, and add the reference to KAFKA_HDFS_AUDIT to HADOOP_NAMENODE_OPTS.
-Dhdfs.audit.logger=INFO,DRFAAUDIT,KAFKA_HDFS_AUDIT
Step 4: Edit $HADOOP_CONF_DIR/hadoop-env.sh, and append the following command to it.
export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:/path/to/eagle/lib/log4jkafka/lib/*
Step 5: save the changes and restart the namenode.
Step 6: Check whether logs are flowing into Topic sandbox_hdfs_audit_log
$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sandbox_hdfs_audit_log
[^AMBARI]:all mentions of “ambari” on this page represent Apache Ambari. [^KAFKA]:All mentions of “kafka” on this page represent Apache Kafka.