blob: de0a26b89fac3dbc5a79adcf6b5b12982526df4e [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import re
import shlex
import typing
"""
This is the query de-fuzzer library for Foal.
It turns a URL search query into an ES query
"""
def defuzz(formdata: dict, nodate: bool = False, list_override: typing.Optional[str] = None) -> dict:
# Default to 30 day date range
daterange = {"gt": "now-30d", "lt": "now+1d"}
# Custom date range?
# If a month is the only thing, fake start and end
if "date" in formdata and "e" not in formdata:
formdata["s"] = formdata["date"]
formdata["e"] = formdata["date"]
# classic start and end month params
if "s" in formdata and "e" in formdata:
assert re.match(r"\d{4}-\d{1,2}$", formdata["s"]), "Keyword 's' must be of type YYYY-MM"
assert re.match(r"\d{4}-\d{1,2}$", formdata["e"]), "Keyword 'e' must be of type YYYY-MM"
syear, smonth = formdata["s"].split("-", 1)
eyear, emonth = formdata["e"].split("-", 1)
_estart, eend = calendar.monthrange(int(eyear), int(emonth))
daterange = {
"gt": "%04u/%02u/01 00:00:00" % (int(syear), int(smonth)),
"lt": "%04u/%02u/%02u 23:59:59" % (int(eyear), int(emonth), eend),
}
# Advanced date formatting
elif "d" in formdata:
# The more/less than N days/weeks/months/years ago
m = re.match(r"^([a-z]+)=([0-9Mwyd]+)$", formdata["d"])
if m:
t = m.group(1)
r = m.group(2)
if t == "lte" and r:
daterange = {"gt": "now-%s" % r}
elif t == "gte" and r:
daterange = {"lt": "now-%s" % r}
# simple one month listing
m = re.match(r"^(\d\d\d\d-\d+)$", formdata["d"])
if m:
xdate = m.group(1)
dyear, dmonth = xdate.split("-", 1)
daterange = {
"gte": "%04u-%02u-01||/M" % (int(dyear), int(dmonth)),
"lte": "%04u-%02u-01||/M" % (int(dyear), int(dmonth)),
"format": "yyyy-MM-dd",
}
# dfr and dto defining a time span
m = re.match(r"^dfr=(\d\d\d\d-\d+-\d+)\|dto=(\d\d\d\d-\d+-\d+)$", formdata["d"])
if m:
dfr = m.group(1)
dto = m.group(2)
syear, smonth, sday = dfr.split("-", 1)
eyear, emonth, eday = dto.split("-", 1)
daterange = {
"gt": "%04u/%02u/%02u 00:00:00" % (int(syear), int(smonth), int(sday)),
"lt": "%04u/%02u/%02u 23:59:59" % (int(eyear), int(emonth), int(eday)),
}
# List parameter(s)
fqdn = formdata.get("domain", "*") # If left out entirely, assume wildcard search
listname = formdata.get("list", "*") # If left out entirely, assume wildcard search
if list_override: # Certain requests use the full list ID as a single variable. Allow for that if so.
assert list_override.count("@") == 1, "list_override must contain exactly one @ character"
listname, fqdn = list_override.split("@", 1)
assert fqdn, "You must specify a domain part of the mailing list(s) to search, or * for wildcard search."
assert listname, "You must specify a list part of the mailing list(s) to search, or * for wildcard search."
assert '@' not in listname, "The list component of the List ID(s) cannot contain @, please use both list and domain keywords for searching."
list_raw = "<%s.%s>" % (listname, fqdn)
# Default is to look in a specific list
query_list_hash: typing.Dict = {"term": {"list_raw": list_raw}}
# *@fqdn match?
if listname == "*" and fqdn != "*":
query_list_hash = {"wildcard": {"list_raw": {"value": "*.%s>" % fqdn}}}
# listname@* match?
if listname != "*" and fqdn == "*":
query_list_hash = {"wildcard": {"list_raw": "<%s.*>" % listname}}
# *@* ??
if listname == "*" and fqdn == "*":
query_list_hash = {"wildcard": {"list_raw": "*"}}
must = [query_list_hash]
must_not = []
# Append date range if not excluded
if not nodate:
must.append({"range": {"date": daterange}})
# Query string search:
# - foo bar baz: find emails with these words
# - orange -apples: fond email with oranges but not apples
# - "this sentence": find emails with this exact string
if "q" in formdata:
qs = formdata["q"].replace(":", "")
bits = shlex.split(qs)
should = []
shouldnot = []
for bit in bits:
force_positive = False
# Translate -- into a positive '-', so you can find "-1" etc
if bit[0:1] == "--":
force_positive = True
bit = bit[1:]
# Negatives
if bit[0] == "-" and not force_positive:
# Quoted?
if bit.find(" ") != -1:
bit = '"' + bit + '"'
shouldnot.append(bit[1:])
# Positives
else:
# Quoted?
if bit.find(" ") != -1:
bit = '"' + bit + '"'
should.append(bit)
if len(should) > 0:
must.append(
{
"query_string": {
"fields": ["subject", "from", "body"],
"query": " AND ".join(should),
}
}
)
if len(shouldnot) > 0:
must_not.append(
{
"query_string": {
"fields": ["subject", "from", "body"],
"query": " AND ".join(shouldnot),
}
}
)
# Header parameters
for header in ["from", "subject", "body", "to"]:
hname = "header_%s" % header
if hname in formdata:
hvalue = formdata[hname] # .replace('"', "")
must.append({"match": {header: {"query": hvalue}}})
thebool = {"must": must}
if len(must_not) > 0:
thebool["must_not"] = must_not
return thebool