| #!/usr/bin/env python3 |
| |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # Loggy (Jr) - A log file grobbler for Python 3 |
| |
| import time |
| import logging |
| import watchdog.observers |
| import watchdog.events |
| import os |
| import json |
| import re |
| import base64 |
| import hashlib |
| import elasticsearch |
| import elasticsearch.helpers |
| import threading |
| import socket |
| import yaml |
| import logtypes |
| import typing |
| |
# Quiet the elasticsearch client logger, or it'll litter syslog; what little
# remains (CRITICAL and above) goes to loggy.log instead.
| tracer = logging.getLogger("elasticsearch") |
| tracer.setLevel(logging.CRITICAL) |
| tracer.addHandler(logging.FileHandler("loggy.log")) |
| |
| DEBUG = False |
| YAML_FILE = "loggy.yaml" |
INDEX_PATTERN = "loggy-%Y-%m-%d" # strftime pattern for new index names, e.g. "loggy-2024-01-31"
MAX_PENDING_DOCS = 250 # If more than 250 log entries are pending, push to ES
MAX_IDLE_TIME = 15 # If we waited more than 15 seconds since the last push, push even if < 250 docs.
RSA_KEY = "/etc/ssh/ssh_host_rsa_key.pub" # RSA public key for SSH, if it exists.
| FINGERPRINT = "" |
| FINGERPRINT_SHA = "" |
| SUPPORTED_PROVIDERS = ("elasticsearch", "opensearch", ) # Supported database providers to look for in the config |
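
# Expected shape of loggy.yaml (a minimal sketch; only keys this script
# actually reads are shown, and the log type and field names are illustrative):
#
#   elasticsearch:
#     hosts:
#       - https://localhost:9200
#   paths:
#     - /var/log/httpd
#   rawfields:
#     apache_access: "clientip, vhost, request"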
| |
| |
def l2fp(txt: str) -> typing.Tuple[str, str]:
    """Convert an SSH public key line to its (MD5, SHA-256) fingerprints"""
| key = base64.b64decode(txt.strip().split()[1].encode("ascii")) |
| fp_plain = hashlib.md5(key).hexdigest() |
| fp_md5 = ":".join(a + b for a, b in zip(fp_plain[::2], fp_plain[1::2])) |
| fp_plain_sha = hashlib.sha256(key).digest() |
| fp_sha256 = base64.b64encode(fp_plain_sha).decode("ascii").rstrip("=") |
| return fp_md5, fp_sha256 |
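
# Illustration only (not a real key): for a line of the form
#   "ssh-rsa AAAAB3Nza... user@host"
# l2fp() returns a pair like ("ab:12:cd:34:...", "x1Jd..."), i.e. the
# colon-paired MD5 hex digest and the base64 SHA-256 digest minus "=" padding.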
| |
| |
| def who_am_i(): |
| """Returns the FQDN of the box the program runs on""" |
| try: |
| # Get local hostname (what you see in the terminal) |
| local_hostname = socket.gethostname() |
| # Get all address info segments for the local host |
| canonical_names = [ |
| address[3] |
| for address in socket.getaddrinfo(local_hostname, None, 0, socket.SOCK_DGRAM, 0, socket.AI_CANONNAME) |
| if address[3] |
| ] |
| # For each canonical name, see if we find $local_hostname.something.tld, and if so, return that. |
| if canonical_names: |
| prefix = f"{local_hostname}." |
| for name in canonical_names: |
| if name.startswith(prefix): |
| return name |
| # No match, just return the first occurrence. |
| return canonical_names[0] |
| except socket.error: |
| pass |
| # Fall back to socket.getfqdn |
| return socket.getfqdn() |
| |
| |
| class NodeThread(threading.Thread): |
| """Offloading thread for pushing entries to ES""" |
| def __init__(self, log_object: "ElasticLogger"): |
| super().__init__() |
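        # Adopt the pending batch by reference; push_if_needed() swaps in a
        # fresh list right after starting this thread, so no copy is needed.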
| self.json = log_object.pending |
| self.parent = log_object.parent |
| |
    def run(self):
        iname = time.strftime(INDEX_PATTERN)
        # Only probe/create each day's index once per process lifetime.
        if iname not in self.parent.indices:
            self.parent.indices.append(iname)
            if not self.parent.elastic.indices.exists(index=iname):
                mappings = {}
                for name, entry in self.parent.config.get("rawfields", {}).items():
                    map_js: typing.Dict[str, typing.Any] = {
                        "_all": {"enabled": True},
                        "properties": {
                            "@timestamp": {"store": True, "type": "date", "format": "yyyy/MM/dd HH:mm:ss"},
                            "@node": {"store": True, "type": "keyword"},
                            "status": {"store": True, "type": "long"},
                            "date": {"store": True, "type": "keyword"},
                            "geo_location": {"type": "geo_point", "geohash": True},
                        },
                    }
                    # Every field listed for this log type is stored verbatim as a keyword.
                    for field in entry.split(","):
                        x = field.strip()
                        map_js["properties"][x] = {"store": True, "type": "keyword"}
                    mappings[name] = map_js
                if not DEBUG:
                    self.parent.elastic.indices.create(
                        index=iname,
                        ignore=400,
                        body={
                            "settings": {
                                "index.mapping.ignore_malformed": True,
                                "number_of_shards": 2,
                                "number_of_replicas": 0,
                            },
                            "mappings": mappings,
                        },
                    )
                else:
                    print(mappings)
| |
| js_arr = [] |
| for js in self.json: |
| # GeoHash conversion |
| if "geo_lat" in js and "geo_long" in js: |
| try: |
| js["geo_location"] = {"lat": float(js["geo_lat"]), "lon": float(js["geo_long"])} |
| except ValueError: |
| pass |
| js["@version"] = 3 |
| js["@timestamp"] = time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime()) |
| js["host"] = self.parent.logger.nodename |
| js["@node"] = self.parent.logger.nodename |
| if FINGERPRINT: |
| js["@fingerprint"] = FINGERPRINT |
| js["@fingerprint_sha"] = FINGERPRINT_SHA |
| # Rogue string sometimes, we don't want that! |
| if "bytes" in js: |
| try: |
| js["bytes"] = int(js["bytes"]) |
| except ValueError: |
| js["bytes"] = 0 |
| if "request" in js and "url" not in js: |
| match = re.match(r"(GET|POST)\s+(.+)\s+HTTP/.+", js["request"]) |
| if match: |
| js["url"] = match.group(2) |
| if "bytes" in js and isinstance(js["bytes"], str) and js["bytes"].isdigit(): |
| js["bytes_int"] = int(js["bytes"]) |
| |
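            # One bulk action per entry; with _op_type "index", the bulk
            # helper takes the document body from "_source".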
            js_arr.append({"_op_type": "index", "_index": iname, "_source": js})
| |
        if js_arr:
| if DEBUG: |
| print(js_arr) |
| else: |
| try: |
| elasticsearch.helpers.bulk(self.parent.elastic, js_arr) |
| except elasticsearch.helpers.BulkIndexError as e: |
| print(e) |
| |
| |
| class LinuxHandler(watchdog.events.PatternMatchingEventHandler): |
| """Generic watchdog class, to be consumed and tweaked by Logger below""" |
| |
| def __init__(self, parent_logger: "Logger"): |
| super().__init__() |
| self.logger = parent_logger |
| |
| def process(self, event): |
| self.logger.process(event) |
| |
| def on_modified(self, event): |
| self.process(event) |
| |
| def on_created(self, event): |
| self.process(event) |
| |
| def on_deleted(self, event): |
| self.process(event) |
| |
| def on_moved(self, event): |
| self.process(event) |
| |
| |
| class Logger: |
| """Parent logger class for monitoring and reading log changes""" |
| |
| def __init__(self, config: dict): |
| self.config = config |
| self.nodename = who_am_i() |
| self.observer = watchdog.observers.Observer() |
| self.processor = LinuxHandler(self) |
        self.file_handles: typing.Dict[str, typing.TextIO] = {}  # path -> open file handle
        self.inodes: typing.Dict[int, str] = {}  # inode -> path
        self.inodes_path: typing.Dict[str, int] = {}  # path -> inode
| self.logs = ElasticParent(self) |
| |
| def monitor(self, paths): |
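        # Watch each configured directory recursively; config entries that
        # aren't directories are silently skipped.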
| for path in paths: |
| if os.path.isdir(path): |
| self.observer.schedule(self.processor, path, recursive=True) |
| self.observer.start() |
| |
| def parse_line(self, path, data): |
| for line in (line.rstrip() for line in data.split("\n")): |
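            # A line may carry an explicit type marker plus a JSON payload,
            # e.g. (illustrative fields only):
            #   <%JSON:httpd_access%> {"clientip": "203.0.113.7", "bytes": "1234"}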
| m = re.match(r"^<%JSON:([^>%]+)%>\s*(.+)", line) |
| if m: |
| try: |
| # Try normally |
| try: |
| js = json.loads(m.group(2)) |
| # In case \x[..] has been used, try again! |
| except json.JSONDecodeError: |
| js = json.loads(re.sub(r"\\x..", "?", m.group(2))) |
| js["filepath"] = path |
| js["timestamp"] = time.time() |
| js["logtype"] = m.group(1) |
| self.logs.append(js["logtype"], js) |
| except json.JSONDecodeError: |
| pass |
| else: |
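                # The local logtypes module is expected to expose two parallel
                # dicts: "regexes" (log type -> compiled regex with named
                # groups) and "tuples" (log type -> namedtuple factory whose
                # fields cover those groups plus filepath/logtype/timestamp).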
| for r in logtypes.regexes: |
| match = logtypes.regexes[r].match(line) |
| if match: |
| js = logtypes.tuples[r](filepath=path, logtype=r, timestamp=time.time(), **match.groupdict()) |
| self.logs.append(js.logtype, js._asdict()) |
| break |
| |
    def close_file(self, path: str):
        """Close a monitored file handle and drop its inode bookkeeping"""
        handle = self.file_handles.pop(path, None)
        if handle is not None:
            try:
                handle.close()
            except IOError:
                pass
        inode = self.inodes_path.pop(path, None)
        if inode is not None:
            self.inodes.pop(inode, None)

    def process(self, event):
        path = event.src_path
        # File moved away (typically log rotation): stop tracking the old path.
        if (event.event_type == "moved") and (path in self.file_handles):
            self.close_file(path)
| |
        # New file, or a modified file we aren't tracking yet (skip rotated .gz files)
        elif (
            event.event_type in ("modified", "created")
            and ".gz" not in path
            and path not in self.file_handles
        ):
| try: |
| idata = os.stat(path) |
| inode = idata.st_ino |
| if inode not in self.inodes: |
| # print("Opening: " + path) |
| self.file_handles[path] = open(path, "r") |
| # print("Started watching %s (%u)" % (path, inode)) |
| self.file_handles[path].seek(0, 2) |
| self.inodes[inode] = path |
| self.inodes_path[path] = inode |
| except IOError: |
| pass |
| elif event.event_type == "modified" and path in self.file_handles: |
| rd = 0 |
| data = "" |
| try: |
| while True: |
| line = self.file_handles[path].readline() |
| if not line: |
| break |
| else: |
| rd += len(line) |
| data += line |
| self.parse_line(path, data) |
| except (IOError, UnicodeDecodeError): |
| try: |
| self.file_handles[path].close() |
| except IOError: |
| pass |
| del self.file_handles[path] |
| inode = self.inodes_path[path] |
| del self.inodes[inode] |
        # File deleted? (close handle)
        elif event.event_type == "deleted" and path in self.file_handles:
            self.close_file(path)
| |
| |
| class ElasticParent: |
| def __init__(self, parent): |
| self.config = parent.config |
| self.logger = parent |
| self.loggers = {} |
| self.hosts = [] |
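        # Hosts from any supported provider block are pooled into one client;
        # this assumes a configured "opensearch" cluster still speaks the
        # Elasticsearch-compatible REST API that elasticsearch-py expects.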
| for provider in SUPPORTED_PROVIDERS: |
| if provider in parent.config: |
| self.hosts.extend(parent.config[provider].get("hosts", [])) |
| self.elastic = elasticsearch.Elasticsearch( |
| hosts=self.hosts, max_retries=5, retry_on_timeout=True |
| ) |
| self.indices = [] |
| |
| def append(self, log_type, data): |
| if log_type not in self.loggers: |
| self.loggers[log_type] = ElasticLogger(self) |
| self.loggers[log_type].append(data) |
| |
| def types(self): |
| for k, v in self.loggers.items(): |
| yield k, v |
| |
| |
| class ElasticLogger: |
| def __init__(self, parent: ElasticParent): |
| self.parent = parent |
| self.last_push = time.time() |
| self.sequence_id = 0 |
| self.sequence_time = 0 |
| self.pending: typing.List[dict] = [] |
| |
    def append(self, data: dict):
        now = int(time.time())
        # Reset the per-second counter whenever the clock ticks over, so each
        # second starts numbering from zero again.
        if self.sequence_time != now:
            self.sequence_time = now
            self.sequence_id = 0
        # Sequence ID lets us order entries within the same second in ES,
        # e.g. the first two entries in epoch second 1700000000 get
        # sequence_ids 17000000000000 and 17000000000001.
        data["sequence_id"] = (now * 10000) + self.sequence_id
        self.sequence_id += 1
        self.pending.append(data)
| |
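    # Called about once a second from the main loop; hand the batch to a
    # NodeThread so bulk indexing never blocks the file-watching threads.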
| def push_if_needed(self): |
| if self.pending: |
| now = time.time() |
            if now - self.last_push >= MAX_IDLE_TIME or len(self.pending) > MAX_PENDING_DOCS:
| nt = NodeThread(self) |
| nt.start() |
| self.pending = [] |
| self.last_push = now |
| |
| |
| if __name__ == "__main__": |
    with open(YAML_FILE) as config_file:
        yml = yaml.safe_load(config_file)
| logger = Logger(yml) |
| |
| print("Using %s as node name" % logger.nodename) |
| if os.path.exists(RSA_KEY): |
| with open(RSA_KEY, "r") as rsa: |
| FINGERPRINT, FINGERPRINT_SHA = l2fp(rsa.read()) |
| print("Identifying as %s" % FINGERPRINT) |
| |
| logger.monitor(yml["paths"]) |
| |
| try: |
| while True: |
| for t, logs in logger.logs.types(): |
| logs.push_if_needed() |
| time.sleep(1) |
| except KeyboardInterrupt: |
| logger.observer.stop() |
| logger.observer.join() |