As Apache Eagle consumes the data via Kafka[^KAFKA] topics in some topologies, such as HDFS audit log. To enable the full function of monitoring, a user needs to stream its data into a Kafka topic.

There are two ways to do that. The first one is Logstash, which naturally supports Kafka as the output plugin; the second one is to install a namenode log4j Kafka appender.


  • Step 1: Create a Kafka topic as the streaming input.

    Here is an sample Kafka command to create topic ‘sandbox_hdfs_audit_log’

    cd <kafka-home>
    bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sandbox_hdfs_audit_log
  • Step 2: Install Logstash-kafka plugin

    • For Logstash 1.5.x, logstash-kafka has been intergrated into logstash-input-kafka and logstash-output-kafka, and released with the 1.5 version of Logstash. So you can directly use it.

    • For Logstash 1.4.x, a user should install logstash-kafka firstly. Notice that this version does not support partition_key_format.

  • Step 3: Create a Logstash configuration file under ${LOGSTASH_HOME}/conf. Here is a sample.

      input {
          file {
              type => "hdp-nn-audit"
              path => "/path/to/audit.log"
              start_position => end
              sincedb_path => "/var/log/logstash/"
          if [type] == "hdp-nn-audit" {
      	   grok {
      	       match => ["message", "ugi=(?<user>([\w\d\-]+))@|ugi=(?<user>([\w\d\-]+))/[\w\d\-.]+@|ugi=(?<user>([\w\d.\-_]+))[\s(]+"]
      output {
          if [type] == "hdp-nn-audit" {
              kafka {
                  codec => plain {
                      format => "%{message}"
                  broker_list => "localhost:9092"
                  topic_id => "sandbox_hdfs_audit_log"
                  request_required_acks => 0
                  request_timeout_ms => 10000
                  producer_type => "async"
                  message_send_max_retries => 3
                  retry_backoff_ms => 100
                  queue_buffering_max_ms => 5000
                  queue_enqueue_timeout_ms => 5000
                  batch_num_messages => 200
                  send_buffer_bytes => 102400
                  client_id => "hdp-nn-audit"
                  partition_key_format => "%{user}"
              # stdout { codec => rubydebug }
  • Step 4: Start Logstash

    bin/logstash -f conf/sample.conf
  • Step 5: Check whether logs are flowing into the kafka topic specified by topic_id

Log4j Kafka Appender

Notice that if you use Ambari[^AMBARI], such as in sandbox, you must follow below steps via Ambari UI. In addition, restarting namenode is required.

  • Step 1: Create a Kafka topic. Here is a example Kafka command for creating topic “sandbox_hdfs_audit_log”

    cd <kafka-home>
    bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sandbox_hdfs_audit_log
  • Step 2: Configure $HADOOP_CONF_DIR/, and add a log4j appender “KAFKA_HDFS_AUDIT” to hdfs audit logging

    log4j.appender.KAFKA_HDFS_AUDIT.Layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n

    HDFS LOG4J Configuration

  • Step 3: Edit $HADOOP_CONF_DIR/, and add the reference to KAFKA_HDFS_AUDIT to HADOOP_NAMENODE_OPTS.


    HDFS Environment Configuration

  • Step 4: Edit $HADOOP_CONF_DIR/, and append the following command to it.

    export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:/path/to/eagle/lib/log4jkafka/lib/*

    HDFS Environment Configuration

  • Step 5: save the changes and restart the namenode.

  • Step 6: Check whether logs are flowing into Topic sandbox_hdfs_audit_log

    $ /usr/hdp/current/kafka-broker/bin/ --zookeeper localhost:2181 --topic sandbox_hdfs_audit_log


