blob: 65dd0a5f7fe6ca49468c169b8211062aadcccb91 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import re
import shlex
import typing
"""
This is the query de-fuzzer library for Foal.
It turns a URL search query into an ES query
"""
def defuzz(formdata: dict, nodate: bool = False, list_override: typing.Optional[str] = None) -> dict:
# Default to 30 day date range
daterange = {"gt": "now-30d", "lt": "now+1d"}
# Custom date range?
# If a month is the only thing, fake start and end
if "date" in formdata and "e" not in formdata:
formdata["s"] = formdata["date"]
formdata["e"] = formdata["date"]
# classic start and end month params
if "s" in formdata and "e" in formdata:
if not re.match(r"\d{4}-\d{1,2}$", formdata["s"]):
raise ValueError("Keyword 's' must be of type YYYY-MM")
if not re.match(r"\d{4}-\d{1,2}$", formdata["e"]):
raise ValueError("Keyword 'e' must be of type YYYY-MM")
syear, smonth = formdata["s"].split("-", 1)
eyear, emonth = formdata["e"].split("-", 1)
_estart, eend = calendar.monthrange(int(eyear), int(emonth))
daterange = {
"gt": "%04u/%02u/01 00:00:00" % (int(syear), int(smonth)),
"lt": "%04u/%02u/%02u 23:59:59" % (int(eyear), int(emonth), eend),
}
# days ago to start, and number of days to match
elif "dfrom" in formdata and "dto" in formdata:
dfrom = formdata["dfrom"]
dto = formdata["dto"]
if re.match(r"\d+$", dfrom) and re.match(r"\d+$", dto):
ef = int(dfrom)
et = int(dto)
if ef > 0 and et > 0:
if et > ef:
et = ef # avoid overruning into the future
daterange = {
"gte": "now-%dd" % ef,
"lte": "now-%dd" % (ef - et),
}
else:
raise ValueError("Keywords 'dfrom' and 'dto' must be numeric")
# Advanced date formatting
elif "d" in formdata:
# The more/less than N days/weeks/months/years ago
m = re.match(r"^(lte|gte)=([0-9]+[Mwyd])$", formdata["d"])
if m:
t = m.group(1)
r = m.group(2)
if t == "lte" and r:
daterange = {"gt": "now-%s" % r}
elif t == "gte" and r:
daterange = {"lt": "now-%s" % r}
else:
# simple one month listing
m = re.match(r"^(\d\d\d\d-\d+)$", formdata["d"])
if m:
xdate = m.group(1)
dyear, dmonth = xdate.split("-", 1)
daterange = {
"gte": "%04u-%02u-01||/M" % (int(dyear), int(dmonth)),
"lte": "%04u-%02u-01||/M" % (int(dyear), int(dmonth)),
"format": "yyyy-MM-dd",
}
else:
# dfr and dto defining a time span
m = re.match(r"^dfr=(\d\d\d\d-\d+-\d+)\|dto=(\d\d\d\d-\d+-\d+)$", formdata["d"])
if m:
dfr = m.group(1)
dto = m.group(2)
syear, smonth, sday = dfr.split("-", 2)
eyear, emonth, eday = dto.split("-", 2)
daterange = {
"gt": "%04u/%02u/%02u 00:00:00" % (int(syear), int(smonth), int(sday)),
"lt": "%04u/%02u/%02u 23:59:59" % (int(eyear), int(emonth), int(eday)),
}
# List parameter(s)
if list_override: # Certain requests use the full list ID as a single variable. Allow for that if so.
if not list_override.count("@") == 1:
raise ValueError("list_override must contain exactly one @ character")
listname, fqdn = list_override.split("@", 1)
else:
fqdn = formdata.get("domain", '') # Must be provided
listname = formdata.get("list", '') # Must be provided
if not fqdn:
raise ValueError("You must specify a domain part of the mailing list(s) to search, or * for wildcard search.")
if not listname:
raise ValueError("You must specify a list part of the mailing list(s) to search, or * for wildcard search.")
if "@" in listname:
raise ValueError("The list component of the List ID(s) cannot contain @, please use both list and domain keywords for searching.")
list_raw = "<%s.%s>" % (listname, fqdn)
# Default is to look in a specific list
query_list_hash: typing.Dict = {"term": {"list_raw": list_raw}}
# *@fqdn match?
if listname == "*" and fqdn != "*":
query_list_hash = {"wildcard": {"list_raw": {"value": "*.%s>" % fqdn}}}
# listname@* match?
if listname != "*" and fqdn == "*":
query_list_hash = {"wildcard": {"list_raw": "<%s.*>" % listname}}
# *@* ??
if listname == "*" and fqdn == "*":
query_list_hash = {"wildcard": {"list_raw": "*"}}
must = [query_list_hash]
must_not = []
# Append date range if not excluded
if not nodate:
must.append({"range": {"date": daterange}})
# Query string search:
# - foo bar baz: find emails with these words
# - orange -apples: fond email with oranges but not apples
# - "this sentence": find emails with this exact string
if "q" in formdata:
qs = formdata["q"].replace(":", "")
try:
bits = shlex.split(qs)
except ValueError: # Uneven number of quotes, default to split on whitespace instead
bits = qs.split()
query_should_match = []
query_should_not_match = []
for bit in bits:
force_positive = False
# Translate -- into a positive '-', so you can find "-1" etc
if bit[0:2] == "--":
force_positive = True
bit = bit[1:]
# Negatives
if bit[0] == "-" and not force_positive:
query_should_not_match.append(bit[1:])
# Positives
else:
query_should_match.append(bit)
if query_should_match:
query_should_match_expanded = []
for x in query_should_match:
query_should_match_expanded.append(
{
"bool": {
"should": [
{
"multi_match": {
"fields": ["from", "body", "subject"],
"query": x,
"type": "phrase",
},
},
]
}
}
)
xmust = {"bool": {"minimum_should_match": len(query_should_match), "should": query_should_match_expanded}}
must.append(xmust)
for x in query_should_not_match:
must_not.append(
{
"match": {
"subject": x,
}
}
)
must_not.append(
{
"match": {
"from": x,
}
}
)
must_not.append(
{
"match": {
"body": x,
}
}
)
# Header parameters
for header in ["from", "subject", "body", "to"]:
hname = "header_%s" % header
if hname in formdata:
hvalue = formdata[hname]
must.append({"match_phrase": {header: hvalue}})
query_as_bool = {"must": must}
if must_not:
query_as_bool["must_not"] = must_not
return query_as_bool