#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Loggy (Jr) - A log file grobbler for Python 3

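# The script reads its settings from loggy.yaml in the working directory. The
# sketch below is illustrative only (hosts, paths and field lists are
# placeholders); the section names are the ones this script actually looks up
# ('elasticsearch', 'analyzer' and 'RawFields'):
#
#   elasticsearch:
#     primary:
#       host: localhost
#       port: 9200
#       ssl: false
#       prefix: ''
#   analyzer:
#     paths:
#       - /var/log/
#   RawFields:
#     apache_access: request, referer, user_agent
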
import time
import logging
import watchdog.observers
import watchdog.events
import os
import json
import re
import collections
import configparser
import base64
import hashlib
import elasticsearch
import elasticsearch.helpers
import threading
import socket
import yaml

# Disable most ES logging, or it'll litter syslog
tracer = logging.getLogger('elasticsearch')
tracer.setLevel(logging.CRITICAL)
tracer.addHandler(logging.FileHandler('loggy.log'))

DEBUG = False
inodes = {}
inodes_path = {}
filehandles = {}
mytags = ''

json_pending = {}
last_push = {}
config = None
dd_config = None
gotindex = {}
fp = {}
tag_overrides = {}

RSA_KEY = '/etc/ssh/ssh_host_rsa_key.pub'
FINGERPRINT = ''
FINGERPRINT_SHA = ''


es = None
hostname = socket.getfqdn()

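# Illustrative example (not from the original file) of a combined-format access
# log line that the 'apache_access' pattern below will match:
#   127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 "-" "Mozilla/5.0"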
regexes = {
    'apache_access': re.compile(
        r"(?P<client_ip>[\d\.]+)\s"
        r"(?P<identity>\S*)\s"
        r"(?P<user>\S*)\s"
        r"\[(?P<time>.*?)\]\s"
        r'"(?P<request>.*?)"\s'
        r"(?P<status>\d+)\s"
        r"(?P<bytes>\S*)\s"
        r'"(?P<referer>.*?)"\s'
        r'"(?P<user_agent>.*?)"\s*'
    ),
    'apache_error': re.compile(
        r"\[(?P<date>.*?)\]\s+"
        r"\[(?P<module>.*?)\]\s+"
        r"\[(?P<pid>.*?)\]\s+"
        r"\[client\s+(?P<client_ip>[0-9.]+):\d+\]\s+"
        r"(?P<message>.+)"
    ),
    'syslog': re.compile(
        r"(?P<date>\S+\s+\d+\s+\d+:\d+:\d+)\s+(<[0-9.]+>\s+)?"
        r"(?P<host>\S+)\s+"
        r"(?P<type>\S+):\s+"
        r"(?P<message>.+)"
    ),
    'fail2ban': re.compile(
        r"(?P<date>\S+ \d+:\d+:[\d,]+)\s+"
        r"(?P<type>fail2ban\.[^:]+):\s+"
        r"(?P<message>.+)"
    ),
    'rsync': re.compile(
        r"(?P<date>\S+ \d+:\d+:[\d,]+)\s+"
        r"\[(?P<pid>[\S.]+)\]\s+"
        r"(?P<message>.+)"
    ),
    'pylogs': re.compile(
        r"(?P<date>\S+ \S+)\s+\[pylog\]\s+"
        r"\[(?P<type>[\S.]+)\]:\s+"
        r"(?P<message>.+)"
    ),
    'qmail': re.compile(
        r"(?P<mid>@[a-f0-9]+)\s+"
        r"(?P<message>.+)"
    ),
    'lastlog': re.compile(
        r"(?P<user>[a-z0-9]+)\s+(?P<term>(pts/\d+|tty\d+|system))\s+"
        r"(?P<stats>.+)"
    )
}


# The names must agree with the regexes above
tuples = {
    'apache_access': collections.namedtuple('apache_access', [
        'client_ip', 'identity', 'user', 'time', 'request',
        'status', 'bytes', 'referer', 'user_agent',
        'filepath', 'logtype', 'timestamp']
    ),
    'apache_error': collections.namedtuple('apache_error', [
        'date', 'module', 'pid', 'client_ip', 'message',
        'filepath', 'logtype', 'timestamp']
    ),
    'syslog': collections.namedtuple('syslog', [
        'date', 'host', 'type', 'message',
        'filepath', 'logtype', 'timestamp']
    ),
    'fail2ban': collections.namedtuple('fail2ban', [
        'date', 'type', 'message',
        'filepath', 'logtype', 'timestamp']
    ),
    'rsync': collections.namedtuple('rsync', [
        'date', 'pid', 'message',
        'filepath', 'logtype', 'timestamp']
    ),
    'pylogs': collections.namedtuple('pylogs', [
        'date', 'type', 'message',
        'filepath', 'logtype', 'timestamp']
    ),
    'qmail': collections.namedtuple('qmail', [
        'mid', 'message',
        'filepath', 'logtype', 'timestamp']
    ),
    'lastlog': collections.namedtuple('lastlog', [
        'user', 'term', 'stats',
        'filepath', 'logtype', 'timestamp']
    )
}


def l2fp(txt):
    # Derive MD5 and SHA256 fingerprints (in the formats printed by ssh-keygen)
    # from an OpenSSH public key line.
    key = base64.b64decode(txt.strip().split()[1].encode('ascii'))
    fp_plain = hashlib.md5(key).hexdigest()
    fp_md5 = ':'.join(a + b for a, b in zip(fp_plain[::2], fp_plain[1::2]))
    fp_plain_sha = hashlib.sha256(key).digest()
    # b64encode returns bytes in Python 3; decode before stripping the padding
    fp_sha256 = base64.b64encode(fp_plain_sha).decode('ascii').rstrip('=')
    return fp_md5, fp_sha256


class NodeThread(threading.Thread):
    # Bulk-pushes one batch of pending JSON documents to Elasticsearch,
    # creating the daily index (and its mappings) the first time it is seen.
    def assign(self, json_blob, logtype, xes):
        self.json = json_blob
        self.logtype = logtype
        self.xes = xes

    def run(self):
        global gotindex, config, json_pending
        # print("Pushing %u json objects" % len(json_pending))
        iname = time.strftime("loggy-%Y.%m.%d")
        if iname not in gotindex:
            gotindex[iname] = True
            if not self.xes.indices.exists(index=iname):
                mappings = {}
                # 'RawFields' is assumed to be a section of loggy.yaml mapping a
                # log type to a comma-separated list of fields to keep unanalyzed.
                for entry in config.get('RawFields', {}):
                    js = {
                        "_all": {"enabled": True},
                        "properties": {
                            "@timestamp": {"store": True, "type": "date", "format": "yyyy/MM/dd HH:mm:ss"},
                            "@node": {"store": True, "type": "string", "index": "not_analyzed"},
                            "status": {"store": True, "type": "long"},
                            "date": {"store": True, "type": "string", "index": "not_analyzed"},
                            "geo_location": {"type": "geo_point", "geohash": True}
                        }
                    }
                    for field in config['RawFields'][entry].split(","):
                        x = field.strip()
                        js['properties'][x] = {"store": True, "type": "string", "index": "not_analyzed",
                                               "fields": {"keyword": {"type": "keyword"}}}
                    mappings[entry] = js
                res = None
                if not DEBUG:
                    res = self.xes.indices.create(index=iname, ignore=400, body={
                        "settings": {
                            "index.mapping.ignore_malformed": True,
                            "number_of_shards": 2,
                            "number_of_replicas": 0
                        },
                        "mappings": mappings
                    })
                else:
                    print(mappings)
                if 'loggy-indices' not in json_pending:
                    json_pending['loggy-indices'] = []
                    last_push['loggy-indices'] = time.time()
                json_pending['loggy-indices'].append({
                    '@node': hostname,
                    'index_created': iname,
                    'logtype': 'loggy-indices',
                    '@timestamp': time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime()),
                    'res': res,
                    'mappings': mappings
                })

        js_arr = []
        for entry in self.json:
            js = entry
            # GeoHash conversion
            if 'geo_lat' in js and 'geo_long' in js:
                try:
                    js['geo_location'] = {
                        "lat": float(js['geo_lat']),
                        "lon": float(js['geo_long'])
                    }
                except (TypeError, ValueError):
                    pass
            js['@version'] = 2
            js['@timestamp'] = time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime())
            js['host'] = hostname
            js['@node'] = hostname
            js['@fingerprint'] = FINGERPRINT
            js['@fingerprint_sha'] = FINGERPRINT_SHA
            # js['@rsa_key_mtime'] = RSA_KEY_MTIME
            # Rogue string sometimes, we don't want that!
            if 'bytes' in js:
                try:
                    js['bytes'] = int(js['bytes'])
                except (TypeError, ValueError):
                    js['bytes'] = 0
            if mytags:
                js['@tags'] = mytags
            if 'request' in js and 'url' not in js:
                match = re.match(r"(GET|POST)\s+(.+)\s+HTTP/.+", js['request'])
                if match:
                    js['url'] = match.group(2)
            if 'bytes' in js and isinstance(js['bytes'], str) and js['bytes'].isdigit():
                js['bytes_int'] = int(js['bytes'])

            js_arr.append({
                '_op_type': 'index',
                '_index': iname,
                '_type': self.logtype,
                'doc': js,
                '_source': js
            })

        if len(js_arr) > 0:
            if DEBUG:
                print(js_arr)
            else:
                try:
                    elasticsearch.helpers.bulk(self.xes, js_arr)
                except Exception as err:
                    print(err)


def connect_es(config):
    # Build an Elasticsearch client from the 'primary' and (optional) 'backup'
    # entries of the 'elasticsearch' section of the config.
    esa = []
    for w in ['primary', 'backup']:
        if w in config['elasticsearch']:
            h = config['elasticsearch'][w]['host']
            p = config['elasticsearch'][w].get('port', 9200)
            s = config['elasticsearch'][w].get('ssl', False)
            u = config['elasticsearch'][w].get('prefix', '')
            esa.append({
                'host': h,
                'port': p,
                'use_ssl': s,
                'url_prefix': u
            })
            print("Using http%s://%s:%u/%s as %s" % ("s" if s else "", h, p, u, w))

    esx = elasticsearch.Elasticsearch(
        esa,
        max_retries=5,
        retry_on_timeout=True
    )
    return esx


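# Besides the regexes above, applications can hand over pre-structured events,
# one JSON document per line, prefixed with a log type tag. Illustrative
# example (the tag name and fields are whatever the producing application
# chooses, not fixed by this script):
#   <%JSON:httpd_access%> {"client_ip": "127.0.0.1", "status": 200}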
def parse_line(path, data):
    global json_pending, config
    for line in (l.rstrip() for l in data.split("\n")):
        m = re.match(r"^<%JSON:([^>%]+)%>\s*(.+)", line)
        if m:
            try:
                # Try normally
                try:
                    js = json.loads(m.group(2))
                # In case \x[..] has been used, try again!
                except ValueError:
                    js = json.loads(re.sub(r"\\x..", "?", m.group(2)))
                js['filepath'] = path
                js['timestamp'] = time.time()
                js['logtype'] = m.group(1)
                if js['logtype'] not in json_pending:
                    json_pending[js['logtype']] = []
                    last_push[js['logtype']] = time.time()
                    # print("got our first valid json as " + js['logtype'] + "!")
                json_pending[js['logtype']].append(js)
            except Exception:
                pass
        else:
            for r in regexes:
                match = regexes[r].match(line)
                if match:
                    js = tuples[r](filepath=path, logtype=r, timestamp=time.time(), **match.groupdict())
                    if js.logtype not in json_pending:
                        json_pending[js.logtype] = []
                        last_push[js.logtype] = js.timestamp
                    json_pending[r].append(js._asdict())
                    break


class LinuxHandler(watchdog.events.PatternMatchingEventHandler):
    # Tails the files watchdog reports on: opens newly created or modified
    # files, reads appended data on each modification, and drops handles when
    # a file is moved or deleted.
    def process(self, event):
        global filehandles, inodes, inodes_path
        path = event.src_path
        if (event.event_type == 'moved') and (path in filehandles):
            # print("File moved, closing original handle")
            try:
                filehandles[path].close()
            except Exception as err:
                pass
                # print(err)
            del filehandles[path]
            inode = inodes_path[path]
            del inodes[inode]

        elif (event.event_type == 'modified' or event.event_type == 'created') and (
                ".gz" not in path) and path not in filehandles:
            try:
                idata = os.stat(path)
                inode = idata.st_ino
                if inode not in inodes:
                    print("Opening: " + path)
                    filehandles[path] = open(path, "r")
                    print("Started watching %s (%u)" % (path, inode))
                    filehandles[path].seek(0, 2)
                    inodes[inode] = path
                    inodes_path[path] = inode
                    print(path, filehandles[path])
            except Exception as err:
                # print(err)
                pass
        elif event.event_type == 'modified' and path in filehandles:
            # print(path + " was modified")
            rd = 0
            data = ""
            try:
                while True:
                    line = filehandles[path].readline()
                    if not line:
                        break
                    else:
                        rd += len(line)
                        data += line
                # print("Read %u bytes from %s" % (rd, path))
                parse_line(path, data)
            except Exception as err:
                try:
                    # print("Could not utilize " + path + ", closing..")
                    filehandles[path].close()
                except Exception as err:
                    # print(err)
                    pass
                del filehandles[path]
                inode = inodes_path[path]
                del inodes[inode]
        # File deleted? (close handle)
        elif event.event_type == 'deleted':
            if path in filehandles:
                # print("Closed " + path)
                try:
                    filehandles[path].close()
                except Exception as err:
                    print(err)
                del filehandles[path]
                inode = inodes_path[path]
                del inodes[inode]
                # print("Stopped watching " + path)

    def on_modified(self, event):
        self.process(event)

    def on_created(self, event):
        self.process(event)

    def on_deleted(self, event):
        self.process(event)

    def on_moved(self, event):
        self.process(event)


if __name__ == "__main__":
    with open("loggy.yaml") as f:
        config = yaml.safe_load(f)
    if os.path.exists('/etc/dd-agent/datadog.conf'):
        # datadog.conf is INI-style, so read it with configparser
        dd_config = configparser.ConfigParser()
        dd_config.read('/etc/dd-agent/datadog.conf')
        if dd_config.has_option('Main', 'tags'):
            mytags = dd_config.get('Main', 'tags')
    if hostname in tag_overrides:
        mytags = tag_overrides[hostname]

    # print("Using %s as node name" % hostname)
    try:
        with open(RSA_KEY, 'r') as rsa:
            FINGERPRINT, FINGERPRINT_SHA = l2fp(rsa.read())
            # print("Identifying as %s" % FINGERPRINT)
    except Exception:
        pass
    xes = connect_es(config)

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    observer = watchdog.observers.Observer()
    for path in config['analyzer']['paths']:
        if os.path.isdir(path):
            observer.schedule(LinuxHandler(), path, recursive=True)
    observer.start()
    try:
        while True:
            # Iterate over a snapshot of the keys; pusher threads may add new
            # log types (e.g. 'loggy-indices') while we loop.
            for x in list(json_pending):
                if x not in last_push:
                    last_push[x] = time.time()
                if len(json_pending[x]) > 0 and ((time.time() > (last_push[x] + 15)) or len(json_pending[x]) >= 500):
                    if x not in fp:
                        fp[x] = True
                        # print("First push for " + x + "!")
                    t = NodeThread()
                    t.assign(json_pending[x], x, xes)
                    t.start()
                    json_pending[x] = []
                    last_push[x] = time.time()
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()