Log Feeder Shipper Descriptor

Top Level Descriptors

Input, filter and output configurations are defined in at least 3 different files. (Note: there can be multiple input configuration files, but only one output and one global configuration file.)

input.config-myservice.json example:

{
  "input" : [
  ],
  "filter" : [
  ]
}

output.config.json example:

{
  "output" : [
  ]
}

global.config.json example:

{
  "global" : {
    "source" : "file",
    "add_fields":{
      "cluster":"cl1"
    },
    "tail" : "true"
  }
}
| Path | Description | Default | Examples |
| --- | --- | --- | --- |
| /filter | A list of filter descriptions | EMPTY | {"filter" : [ {"filter": "json", "conditions": {"fields": { "type": ["logsearch_app", "logsearch_perf"]} } } ]} |
| /input | A list of input descriptions | EMPTY | {"input" : [ {"type": "myinput_service_type"}] } |
| /output | A list of output descriptors | {} | {"output": [{"is_enabled" : "true", "destination": "solr", "myfield": "myvalue"}]} |
| /global | A map that contains field/value pairs | EMPTY | {"global": {"myfield": "myvalue"}} |

Input Descriptor

Input configurations (for monitoring logs) can be defined in the input descriptor section. Example:

{
  "input" : [
    {
      "type": "simple_service",
      "rowtype": "service",
      "path": "/var/logs/my/service/service_sample.log",
      "group": "Ambari",
      "cache_enabled": "true",
      "cache_key_field": "log_message",
      "cache_size": "50"
    },
    {
      "type": "simple_service_json",
      "rowtype": "service",
      "path": "/var/logs/my/service/service_sample.json",
      "properties": {
        "myKey1" : "myValue1",
        "myKey2" : "myValue2"
      }
    },
    {
      "type": "simple_audit_service",
      "rowtype": "audit",
      "path": "/var/logs/my/service/service_audit_sample.log",
      "is_enabled": "true",
      "add_fields": {
        "logType": "AmbariAudit",
        "enforcer": "ambari-acl",
        "repoType": "1",
        "repo": "ambari",
        "level": "INFO"
      }
    },
    {
     "type": "wildcard_log_service",
     "rowtype": "service",
     "path": "/var/logs/my/service/*/service_audit_sample.log",
     "init_default_fields" : "true",
     "detach_interval_min": "50",
     "detach_time_min" : "300",
     "path_update_interval_min" : "10",
     "max_age_min" : "800"
    },
    {
      "type": "service_socket",
      "rowtype": "service",
      "port": 61999,
      "protocol" : "tcp",
      "secure" : "false",
      "source" : "socket",
      "log4j": "true"
    },
    {
      "type": "docker_service",
      "rowtype": "service",
      "docker" : "true",
      "default_log_levels" : [
         "FATAL", "ERROR", "WARN", "INFO", "DEBUG"
      ]
    }
  ]
}
| Path | Description | Default | Examples |
| --- | --- | --- | --- |
| /input/[]/add_fields | A map of field_name: field_value pairs that will be added to each row's data. | EMPTY | "cluster": "cluster_name" |
| /input/[]/cache_dedup_interval | The maximum interval in ms which may pass between two identical log messages to filter the latter out. | 1000 | 500 |
| /input/[]/cache_enabled | Allows the input to use a cache to filter out duplications. | false | true, false |
| /input/[]/cache_key_field | Specifies the field whose value the cache uses to find duplications. | log_message | some_field_prone_to_repeating_value |
| /input/[]/cache_last_dedup_enabled | Allows filtering out entries that are identical to the most recent one, regardless of their time. | false | true, false |
| /input/[]/cache_size | The number of entries to store in the cache. | 100 | 50 |
| /input/[]/checkpoint_interval_ms | The time interval in ms at which the checkpoint file should be updated. | 5000 | 10000 |
| /input/[]/class_name | Custom class which implements an input type. | EMPTY | org.example.MyInputSource |
| /input/[]/copy_file | Should the file be copied (only if not processed). | false | true, false |
| /input/[]/default_log_levels | Use these as default log levels for the input - overrides the global default log levels. | EMPTY | default_log_levels: ["INFO", "WARN"] |
| /input/[]/detach_interval_min | The period in minutes for checking which files are too old (default: 300). | 1800 | 60 |
| /input/[]/detach_time_min | The period in minutes after which the application flags a file as too old (default: 2000). | 2000 | 60 |
| /input/[]/docker | Input comes from a docker container. | false | true, false |
| /input/[]/gen_event_md5 | Generate an event_md5 field for each row by creating a hash of the row data. | true | true, false |
| /input/[]/group | Group of the input type. | EMPTY | Ambari, Yarn |
| /input/[]/init_default_fields | Init default fields (ip, path etc.) before applying the filter. | false | true, false |
| /input/[]/is_enabled | A flag to show if the input should be used. | true | true, false |
| /input/[]/log4j | Use Log4j serialized objects (e.g.: SocketAppender). | false | true |
| /input/[]/max_age_min | If the file has not been modified for this long (time value in minutes), the checkpoint file can be deleted. | 0 | 2000 |
| /input/[]/path | The path of the source; may contain '*' characters too. | EMPTY | /var/log/ambari-logsearch-logfeeder/logsearch-logfeeder.json, /var/log/zookeeper/zookeeper*.log |
| /input/[]/path_update_interval_min | The period in minutes for checking new files (default: 5; based on the detach values, it's possible that a new input won't be monitored). | 5 | 5 |
| /input/[]/port | Unique port for a specific socket input. | EMPTY | 61999 |
| /input/[]/process_file | Should the file be processed. | true | true, false |
| /input/[]/properties | Custom key/value pairs. | EMPTY | {k1 : v1, k2: v2} |
| /input/[]/protocol | Protocol type for the socket server (tcp / udp - udp is not supported right now). | tcp | udp, tcp |
| /input/[]/rowtype | The type of the row. | EMPTY | service, audit |
| /input/[]/secure | Use SSL. | false | true |
| /input/[]/source | The type of the input source. | EMPTY | file, s3_file |
| /input/[]/tail | The input should check for only the latest file matching the pattern, not all of them. | true | true, false |
| /input/[]/type | The log id for this source. | EMPTY | zookeeper, ambari_server |
| /input/[]/use_event_md5_as_id | Generate an id for each row by creating a hash of the row data. | false | true, false |
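The cache-related settings (cache_enabled, cache_key_field, cache_size, cache_dedup_interval, cache_last_dedup_enabled) can be illustrated with a minimal Python sketch. This mirrors only the semantics described in the table; the DedupCache class and its names are hypothetical, not the actual Log Feeder code:

```python
import time
from collections import OrderedDict

class DedupCache:
    """Sketch of the input dedup cache semantics: a bounded LRU keyed
    by the value of cache_key_field, which drops a row when the same
    key was seen within cache_dedup_interval ms."""

    def __init__(self, size=100, dedup_interval_ms=1000, last_dedup=False):
        self.size = size                   # cache_size
        self.interval = dedup_interval_ms  # cache_dedup_interval
        self.last_dedup = last_dedup       # cache_last_dedup_enabled
        self.seen = OrderedDict()          # key -> last-seen time (ms)
        self.most_recent = None

    def is_duplicate(self, key, now_ms=None):
        now = int(time.time() * 1000) if now_ms is None else now_ms
        dup = key in self.seen and now - self.seen[key] <= self.interval
        # cache_last_dedup_enabled: also drop a repeat of the most
        # recent entry regardless of how much time has passed
        if self.last_dedup and key == self.most_recent:
            dup = True
        self.seen[key] = now
        self.seen.move_to_end(key)
        if len(self.seen) > self.size:     # evict the oldest entry
            self.seen.popitem(last=False)
        self.most_recent = key
        return dup

cache = DedupCache(size=50, dedup_interval_ms=1000)
cache.is_duplicate("same message", now_ms=0)     # False - first sighting
cache.is_duplicate("same message", now_ms=500)   # True  - within interval
cache.is_duplicate("same message", now_ms=2000)  # False - interval passed
```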

Filter Descriptor

Filter configurations can be defined in the filter descriptor section.

  • Sample 1 for example (json - simple_service_json):
{"level":"WARN","file":"ClientCnxn.java","thread_name":"zkCallback-6-thread-10-SendThread(c6402.ambari.apache.org:2181)","line_number":1102,"log_message":"Session 0x355e0023b38001d for server null, unexpected error, closing socket connection and attempting reconnect\njava.net.SocketException: Network is unreachable\n\tat sun.nio.ch.Net.connect0(Native Method)\n\tat sun.nio.ch.Net.connect(Net.java:454)\n\tat sun.nio.ch.Net.connect(Net.java:446)\n\tat sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)\n\tat org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277)\n\tat org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287)\n\tat org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:967)\n\tat org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003)\n","logger_name":"org.apache.zookeeper.ClientCnxn","logtime":"1468406756757"}
  • Sample 2 for example (grok - simple_service):
2016-07-13 10:45:49,640 [WARN] Sample log line 1 - warn level
that is a multiline
2016-07-13 10:45:49,640 [ERROR] Sample log line 2 - error level
2016-07-13 10:45:50,351 [INFO] Sample log line 3 - info level
  • Sample 3 for example (grok + key/value - ambari_audit):
2016-10-03T16:26:13.333Z, User(admin), RemoteIp(192.168.64.1), Operation(User login), Roles(
    Ambari: Ambari Administrator
), Status(Success)
2016-10-03T16:26:54.834Z, User(admin), RemoteIp(192.168.64.1), Operation(Repository update), RequestType(PUT), url(http://c6401.ambari.apache.org:8080/api/v1/stacks/HDP/versions/2.5/operating_systems/redhat6/repositories/HDP-UTILS-1.1.0.21), ResultStatus(200 OK), Stack(HDP), Stack version(2.5), OS(redhat6), Repo id(HDP-UTILS-1.1.0.21), Base URL(http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6)
2016-10-03T16:26:54.845Z, User(admin), RemoteIp(192.168.64.1), Operation(Repository update), RequestType(PUT), url(http://c6401.ambari.apache.org:8080/api/v1/stacks/HDP/versions/2.5/operating_systems/redhat7/repositories/HDP-2.5), ResultStatus(200 OK), Stack(HDP), Stack version(2.5), OS(redhat7), Repo id(HDP-2.5), Base URL(http://public-repo-1.hortonworks.com/HDP/centos7/2.x/updates/2.5.0.0/)
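As an illustration of how a grok message_pattern parses Sample 2, here is a rough plain-regex equivalent in Python. The expanded regexes are approximations of the grok patterns (%{TIMESTAMP_ISO8601}, %{LOGLEVEL}, %{GREEDYDATA}) for illustration only, not the real grok pattern dictionary:

```python
import re

# Plain-regex approximations of the grok patterns used for the
# simple_service input - illustrative only.
MESSAGE_PATTERN = re.compile(
    r"(?m)^(?P<logtime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s*"
    r"\[(?P<level>[A-Z]+)\]\s*(?P<log_message>.*)",
    re.DOTALL,
)

def parse_entry(entry):
    """Parse one (possibly multi-line) log entry into row fields."""
    m = MESSAGE_PATTERN.match(entry)
    return m.groupdict() if m else None

row = parse_entry(
    "2016-07-13 10:45:49,640 [WARN] Sample log line 1 - warn level\n"
    "that is a multiline"
)
# row: {'logtime': '2016-07-13 10:45:49,640', 'level': 'WARN',
#       'log_message': 'Sample log line 1 - warn level\nthat is a multiline'}
```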

Example:

{
  "input" : [
  ],
  "filter": [
    {
      "filter": "json",
      "conditions": {
        "fields": {
          "type": [
            "simple_service_json"
          ]
        }
      }
    },
    {
      "filter": "grok",
      "deep_extract": "false",
      "conditions":{
        "fields":{
          "type":[
            "simple_service",
            "simple_audit_service",
            "docker_service"
          ]
        }
      },
      "log4j_format":"# can be anything - only use it as a marker/helper as it is not supported yet",
      "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
      "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}\\[%{LOGLEVEL:level}\\]%{SPACE}%{GREEDYDATA:log_message}}",
      "post_map_values":{
        "logtime":{
          "map_date":{
            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
          }
        }
      }
    },
    {
      "filter": "grok",
      "conditions": {
        "fields": {
          "type": [
            "ambari_audit"
          ]
        }
      },
      "log4j_format": "%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n",
      "multiline_pattern": "^(%{TIMESTAMP_ISO8601:evtTime})",
      "message_pattern": "(?m)^%{TIMESTAMP_ISO8601:evtTime},%{SPACE}%{GREEDYDATA:log_message}",
      "post_map_values": {
        "evtTime": {
          "map_date": {
            "target_date_pattern": "yyyy-MM-dd'T'HH:mm:ss.SSSXX"
          }
        }
      }
    },
    {
      "filter": "keyvalue",
      "sort_order": 1,
      "conditions": {
        "fields": {
          "type": [
            "ambari_audit"
          ]
        }
      },
      "source_field": "log_message",
      "field_split": ", ",
      "value_borders": "()",
      "post_map_values": {
        "User": {
          "map_field_value": {
            "pre_value": "null",
            "post_value": "unknown"
          },
          "map_field_name": {
            "new_field_name": "reqUser"
          }
        }
      }
    }
  ]
}
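The keyvalue filter above (source_field "log_message", field_split ", ", value_borders "()") can be sketched in Python as follows. This is a simplified illustration, not the actual implementation; it ignores corner cases such as values that contain the split string (e.g. the multi-line Roles value in Sample 3):

```python
import re

def keyvalue_filter(log_message, field_split=", ", value_borders="()"):
    """Split log_message on field_split, then extract Key(Value)
    pairs using the configured value borders (simplified sketch)."""
    left, right = (re.escape(c) for c in value_borders)
    pair = re.compile(r"(?s)^(.+?)%s(.*)%s$" % (left, right))
    fields = {}
    for part in log_message.split(field_split):
        m = pair.match(part.strip())
        if m:
            fields[m.group(1)] = m.group(2)
    return fields

print(keyvalue_filter(
    "User(admin), RemoteIp(192.168.64.1), Operation(User login), Status(Success)"
))
# {'User': 'admin', 'RemoteIp': '192.168.64.1',
#  'Operation': 'User login', 'Status': 'Success'}
```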
| Path | Description | Default | Examples |
| --- | --- | --- | --- |
| /filter/[]/conditions | The conditions of which input to filter. | EMPTY | |
| /filter/[]/conditions/fields | The fields of the input element whose values should be met. | EMPTY | "fields": {"type": ["hdfs_audit", "hdfs_datanode"]} |
| /filter/[]/conditions/fields/type | The acceptable values for the type field in the input element. | EMPTY | "ambari_server", "spark_jobhistory_server", "spark_thriftserver", "livy_server" |
| /filter/[]/deep_extract | Keep the full named regex collection for grok filters. | EMPTY | true |
| /filter/[]/field_split | The string that splits the key-value pairs. | \t | , |
| /filter/[]/filter | The type of the filter. | EMPTY | grok, keyvalue, json |
| /filter/[]/is_enabled | A flag to show if the filter should be used. | true | true, false |
| /filter/[]/log4j_format | The log4j pattern of the log; not used, it is only there for documentation. | EMPTY | %d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n |
| /filter/[]/message_pattern | The grok pattern used to parse the log entry. | EMPTY | (?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}-%{SPACE}%{LOGLEVEL:level}%{SPACE}\[%{DATA:thread_name}\@%{INT:line_number}\]%{SPACE}-%{SPACE}%{GREEDYDATA:log_message} |
| /filter/[]/multiline_pattern | The grok pattern that shows that the line is not a log line on its own but part of a multi-line entry. | EMPTY | ^(%{TIMESTAMP_ISO8601:logtime}) |
| /filter/[]/post_map_values | Mappings done after the filter provided its result. | EMPTY | |
| /filter/[]/remove_source_field | Remove the source field after the filter is applied. | false | true, false |
| /filter/[]/skip_on_error | Skip the filter if an error occurs while applying the grok filter. | EMPTY | true |
| /filter/[]/sort_order | Describes the order in which the filters should be applied. | EMPTY | 1, 3 |
| /filter/[]/source_field | The source field of the filter; must be set for keyvalue filters. | log_message | field_further_to_filter |
| /filter/[]/value_borders | The borders around the value; must be 2 characters long, the first before it, the second after it. | EMPTY | (), [], {} |
| /filter/[]/value_split | The string that separates keys from values. | = | :, -> |
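The multiline_pattern mechanism can be sketched as follows: a line matching the pattern starts a new entry, while any other line is appended to the previous entry. This is a simplified Python illustration using a plain-regex stand-in for %{TIMESTAMP_ISO8601}, not the actual feeder logic:

```python
import re

def group_multiline(lines,
                    multiline_pattern=r"^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}"):
    """Group raw lines into log entries: a line matching
    multiline_pattern starts a new entry; any other line is a
    continuation of the previous entry (simplified sketch)."""
    start = re.compile(multiline_pattern)
    entries = []
    for line in lines:
        if start.match(line) or not entries:
            entries.append(line)
        else:
            entries[-1] += "\n" + line
    return entries

entries = group_multiline([
    "2016-07-13 10:45:49,640 [WARN] Sample log line 1 - warn level",
    "that is a multiline",
    "2016-07-13 10:45:49,640 [ERROR] Sample log line 2 - error level",
])
# 2 entries; the first one includes the continuation line
```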

Mapper Descriptor

Mapper configurations are defined inside filters; they can alter fields. Example:

{
  "input": [
  ],
  "filter": [
    {
      "filter": "keyvalue",
      "sort_order": 1,
      "conditions": {
        "fields": {
          "type": [
            "ambari_audit"
          ]
        }
      },
      "source_field": "log_message",
      "field_split": ", ",
      "value_borders": "()",
      "post_map_values": {
        "Status": {
          "map_field_value": {
            "pre_value": "null",
            "post_value": "unknown"
          },
          "map_field_name": {
            "new_field_name": "ws_status"
          }
        },
        "StatusWithRepeatedKeys": [
          {
            "map_field_value": {
              "pre_value": "Failed",
              "post_value": "0"
            }
          },
          {
            "map_field_value": {
              "pre_value": "Failed to queue",
              "post_value": "0"
            }
          }
        ]
      }
    }
  ]
}
| Path | Description | Default | Examples |
| --- | --- | --- | --- |
| /filter/[]/post_map_values/{field_name}/[]/map_anonymize/hide_char | The character to hide with. | * | X, - |
| /filter/[]/post_map_values/{field_name}/[]/map_anonymize/pattern | The pattern used to identify the parts to anonymize. The parts to hide should be marked with the <hide> string. | EMPTY | Some secret is here: <hide>, and another one is here: <hide> |
| /filter/[]/post_map_values/{field_name}/[]/map_copy/copy_name | The name of the copied field. | EMPTY | new_name |
| /filter/[]/post_map_values/{field_name}/[]/map_custom/class_name | Custom class which implements a mapper type. | EMPTY | org.example.MyMapper |
| /filter/[]/post_map_values/{field_name}/[]/map_custom/properties | Custom key/value pairs. | EMPTY | {k1 : v1, k2: v2} |
| /filter/[]/post_map_values/{field_name}/[]/map_date/src_date_pattern | If specified, the mapper converts from this format to the target, and also adds a missing year. | EMPTY | MMM dd HH:mm:ss |
| /filter/[]/post_map_values/{field_name}/[]/map_date/target_date_pattern | If 'epoch', the field is parsed as seconds from 1970; otherwise the content is used as the pattern. | EMPTY | yyyy-MM-dd HH:mm:ss,SSS; epoch |
| /filter/[]/post_map_values/{field_name}/[]/map_field_name/new_field_name | The name of the renamed field. | EMPTY | new_name |
| /filter/[]/post_map_values/{field_name}/[]/map_field_value/post_value | The value the field is changed to. | EMPTY | new_value |
| /filter/[]/post_map_values/{field_name}/[]/map_field_value/pre_value | The value that the field must match (ignoring case) to be mapped. | EMPTY | old_value |
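A minimal Python sketch of how post_map_values could be applied, covering the map_field_value and map_field_name mappers used in the example above. The function name and structure are illustrative only, not the actual Log Feeder implementation:

```python
def apply_post_map_values(row, post_map_values):
    """Apply mapper descriptors to a parsed row (simplified sketch).
    A field may carry one mapper dict or a list of them (the
    repeated-keys form); mappers are applied in order."""
    for field, mapper_defs in post_map_values.items():
        if field not in row:
            continue
        if isinstance(mapper_defs, dict):
            mapper_defs = [mapper_defs]
        for mapper_def in mapper_defs:
            for name, args in mapper_def.items():
                if name == "map_field_value":
                    # rewrite the value if it matches pre_value (ignoring case)
                    if str(row[field]).lower() == args["pre_value"].lower():
                        row[field] = args["post_value"]
                elif name == "map_field_name":
                    # rename the field; later mappers see the new name
                    row[args["new_field_name"]] = row.pop(field)
                    field = args["new_field_name"]
    return row

row = apply_post_map_values(
    {"Status": "null"},
    {"Status": {
        "map_field_value": {"pre_value": "null", "post_value": "unknown"},
        "map_field_name": {"new_field_name": "ws_status"},
    }},
)
# row == {'ws_status': 'unknown'}
```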

Output Descriptor

Output configurations can be defined in the output descriptor section. Any extra, output-specific key/value pairs are supported as well. Example:

{
  "output": [
    {
      "is_enabled": "true",
      "comment": "Output to solr for service logs",
      "collection" : "hadoop_logs",
      "destination": "solr",
      "zk_connect_string": "localhost:9983",
      "type": "service",
      "conditions": {
        "fields": {
          "rowtype": [
            "service"
          ]
        }
      }
    },
    {
      "comment": "Output to solr for audit records",
      "is_enabled": "true",
      "collection" : "audit_logs",
      "destination": "solr",
      "zk_connect_string": "localhost:9983",
      "type": "audit",
      "conditions": {
        "fields": {
          "rowtype": [
            "audit"
          ]
        }
      }
    }
  ]
}
| Path | Description | Default | Examples |
| --- | --- | --- | --- |
| /output/[]/conditions | The conditions of which input to filter. | EMPTY | |
| /output/[]/conditions/fields | The fields of the input element whose values should be met. | EMPTY | "fields": {"type": ["hdfs_audit", "hdfs_datanode"]} |
| /output/[]/conditions/fields/type | The acceptable values for the type field in the input element. | EMPTY | "ambari_server", "spark_jobhistory_server", "spark_thriftserver", "livy_server" |
| /output/[]/is_enabled | A flag to show if the output should be used. | true | true, false |
| /output/[]/destination | Alias of a supported output (e.g.: solr). The class-alias mapping should exist in the alias config. | EMPTY | "solr", "hdfs" |
| /output/[]/type | Output type name; right now it can be service or audit. | EMPTY | "service", "audit" |
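The condition handling above can be illustrated with a short Python sketch that routes a parsed row to every enabled output whose conditions match. The function names are hypothetical; the real routing lives inside the Log Feeder:

```python
def conditions_match(row, conditions):
    """Every field listed under conditions/fields must hold one of
    the accepted values in the row."""
    fields = conditions.get("fields", {})
    return all(row.get(name) in accepted for name, accepted in fields.items())

def route(row, outputs):
    """Return the outputs that should receive this row."""
    return [
        o for o in outputs
        if o.get("is_enabled", "true") == "true"
        and conditions_match(row, o.get("conditions", {}))
    ]

outputs = [
    {"is_enabled": "true", "collection": "hadoop_logs", "destination": "solr",
     "conditions": {"fields": {"rowtype": ["service"]}}},
    {"is_enabled": "true", "collection": "audit_logs", "destination": "solr",
     "conditions": {"fields": {"rowtype": ["audit"]}}},
]
targets = route({"rowtype": "service", "log_message": "..."}, outputs)
# targets -> only the hadoop_logs output
```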