blob: 5b4b276684cc07beb1e0022113338ad4aa024df6 [file]
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenSearch Query DSL to Solr Query Syntax Translation
This module handles translation of OpenSearch Query DSL (JSON-based query language)
to Solr's Lucene query syntax.
IMPORTANT: This module should ONLY be used when converting OpenSearch workloads.
Native Solr workloads should not go through this translation layer.
"""
import logging
from datetime import datetime
from .field import normalize_field_name
logger = logging.getLogger(__name__)
def translate_opensearch_query(body: dict) -> dict:
"""
Translate an OpenSearch query DSL dict to Solr query parameters.
Supported patterns:
- ``match_all`` → ``*:*``
- ``term`` → ``field:value``
- ``terms`` → ``field:(v1 v2 v3)`` or ``{!terms f=field}v1,v2,...``
- ``match`` / ``match_phrase`` → ``field:value``
- ``range`` → ``field:[lo TO hi]``
- ``exists`` → ``field:[* TO *]``
- ``bool`` (must/should/must_not) → recursive translation in ``q``
- ``bool.filter`` → Solr ``fq`` parameters (supports large term lists)
- ``ids`` → ``id:(id1 id2 ...)``
Falls back to ``*:*`` for unrecognised patterns (logs warning).
Args:
body: OpenSearch query body dict with "query" key
Returns:
Dict with keys:
- ``"q"``: Solr query string for the ``q`` parameter
- ``"fq"``: list of Solr filter query strings for the ``fq`` parameter
Examples:
>>> translate_opensearch_query({"query": {"match_all": {}}})
{'q': '*:*', 'fq': []}
>>> translate_opensearch_query({"query": {"term": {"country": "US"}}})
{'q': 'country:US', 'fq': []}
"""
if not body or not isinstance(body, dict):
return {"q": "*:*", "fq": []}
query = body.get("query", {})
fq_list = []
# Top-level terms queries can be very large (thousands of values).
# Route them directly to fq using {!terms f=...} for efficiency.
if "terms" in query and len(query) == 1:
fq_str = _translate_node_for_fq(query)
if fq_str:
fq_list.append(fq_str)
return {"q": "*:*", "fq": fq_list}
q = _translate_query_node(query, fq_list=fq_list)
return {"q": q, "fq": fq_list}
def extract_sort_parameter(body: dict) -> str:
"""
Extract a Solr sort string from an OpenSearch sort clause.
Args:
body: OpenSearch query body dict with optional "sort" key
Returns:
Solr sort parameter string (e.g., "name_raw desc, _score asc")
or None if no sort clause present
Examples:
>>> extract_sort_parameter({"sort": [{"name.raw": "desc"}]})
'name_raw desc'
"""
if not isinstance(body, dict) or "sort" not in body:
return None
sort_clauses = body["sort"]
if isinstance(sort_clauses, dict):
sort_clauses = [sort_clauses]
solr_sorts = []
for clause in sort_clauses:
if isinstance(clause, str):
# Normalize field name before adding to sort
field = normalize_field_name(clause.split()[0] if " " in clause else clause)
suffix = " " + clause.split()[1] if " " in clause else ""
solr_sorts.append(field + suffix)
elif isinstance(clause, dict):
for field, order_info in clause.items():
if field == "_score":
continue
# Normalize field name
field = normalize_field_name(field)
if isinstance(order_info, dict):
order = order_info.get("order", "asc")
elif isinstance(order_info, str):
order = order_info
else:
order = "asc"
solr_sorts.append(f"{field} {order}")
return ", ".join(solr_sorts) if solr_sorts else None
# ---------------------------------------------------------------------------
# Internal helper functions
# ---------------------------------------------------------------------------
def _translate_query_node(node: dict, fq_list: list = None) -> str:
"""Recursively translate a single OpenSearch query node to Solr syntax.
Args:
node: OpenSearch query node dict
fq_list: Optional list to collect Solr fq filter strings. When provided,
bool.filter clauses are appended here instead of inlined in q.
"""
if not node or not isinstance(node, dict):
return "*:*"
if "match_all" in node:
return "*:*"
if "match_none" in node:
return "-*:*"
if "term" in node:
for field, value in node["term"].items():
v = value.get("value", value) if isinstance(value, dict) else value
field = normalize_field_name(field)
return f"{field}:{_escape_solr_value(v)}"
if "terms" in node:
for field, values in node["terms"].items():
if field.startswith("_"):
continue
field = normalize_field_name(field)
return _translate_terms_clause(field, values)
if "match" in node or "match_phrase" in node:
sub = node.get("match") or node.get("match_phrase")
if isinstance(sub, dict):
for field, value in sub.items():
# Skip empty field names or metadata fields
if not field or field.startswith("_"):
continue
v = value.get("query", value) if isinstance(value, dict) else value
field = normalize_field_name(field)
# For phrase queries, wrap in quotes if not already
if "match_phrase" in node and not (isinstance(v, str) and v.startswith('"')):
return f'{field}:"{_escape_solr_phrase(v)}"'
return f"{field}:{_escape_solr_value(v)}"
# If sub is not a dict or has no valid fields, fall back
logger.warning(
"match/match_phrase query has invalid structure: %s. Using *:*",
sub
)
return "*:*"
if "range" in node:
for field, bounds in node["range"].items():
field = normalize_field_name(field)
lo = bounds.get("gte", bounds.get("gt", "*"))
hi = bounds.get("lte", bounds.get("lt", "*"))
# Convert dates if format is specified (common for date fields)
os_format = bounds.get("format")
lo = _convert_date_to_solr_format(lo, os_format)
hi = _convert_date_to_solr_format(hi, os_format)
return f"{field}:[{lo} TO {hi}]"
if "exists" in node:
field = node["exists"].get("field", "*")
field = normalize_field_name(field)
return f"{field}:[* TO *]"
if "ids" in node:
values = node["ids"].get("values", [])
if values:
escaped = " ".join(_escape_solr_value(v) for v in values)
return f"id:({escaped})"
return "*:*"
if "bool" in node:
bool_q = node["bool"]
parts = []
def _add_to_q(clauses, prefix):
if not clauses:
return
if isinstance(clauses, dict):
clauses = [clauses]
for clause in clauses:
sub = _translate_query_node(clause, fq_list=fq_list)
if sub and sub != "*:*":
parts.append(f"{prefix}({sub})")
def _add_to_fq(clauses):
"""Translate bool.filter clauses to Solr fq parameters."""
if not clauses:
return
if isinstance(clauses, dict):
clauses = [clauses]
for clause in clauses:
fq_str = _translate_node_for_fq(clause)
if fq_str:
fq_list.append(fq_str)
_add_to_q(bool_q.get("must"), "+")
_add_to_q(bool_q.get("must_not"), "-")
# bool.filter → Solr fq when fq_list is available; otherwise inline in q
if fq_list is not None:
_add_to_fq(bool_q.get("filter"))
else:
_add_to_q(bool_q.get("filter"), "+")
shoulds = bool_q.get("should", [])
if isinstance(shoulds, dict):
shoulds = [shoulds]
should_parts = [_translate_query_node(s, fq_list=fq_list) for s in shoulds]
should_parts = [s for s in should_parts if s and s != "*:*"]
if should_parts:
parts.append("(" + " ".join(should_parts) + ")")
return " ".join(parts) if parts else "*:*"
# Unknown / untranslatable query node
logger.warning(
"Cannot translate OpenSearch query type '%s' to Solr syntax. "
"Falling back to q=*:* (results may not match workload intent). "
"Consider rewriting this operation as a native Solr workload task.",
list(node.keys()),
)
return "*:*"
def _translate_terms_clause(field: str, values: list) -> str:
"""
Translate a terms list to the most efficient Solr syntax.
- Small lists (≤100 terms): ``field:(v1 v2 ...)`` — standard Lucene OR clause
- Large lists (>100 terms): ``field:(v1 v2 ...)`` for q context, still works
but when used as an fq, callers should prefer ``{!terms f=field}v1,v2,...``
"""
escaped = " ".join(
f'"{_escape_solr_phrase(v)}"' if " " in str(v) else _escape_solr_value(v)
for v in values
)
return f"{field}:({escaped})"
def _translate_node_for_fq(node: dict) -> str:
"""
Translate a single query node to a Solr fq string.
For terms clauses, uses the efficient ``{!terms f=field}v1,v2,...`` syntax
which Solr handles as a cached bitset — ideal for large term lists in filters.
For other clause types, delegates to _translate_query_node() without fq_list
(filter sub-clauses are flattened into a single fq string).
"""
if not node or not isinstance(node, dict):
return None
# terms → {!terms f=field}v1,v2,... (Solr's efficient bitset filter)
if "terms" in node:
for field, values in node["terms"].items():
if field.startswith("_"):
continue
field = normalize_field_name(field)
if not values:
return None
# Join values as comma-separated (Solr {!terms} syntax)
joined = ",".join(str(v) for v in values)
return f"{{!terms f={field}}}{joined}"
# range, term, exists, match etc. — translate normally
return _translate_query_node(node, fq_list=None)
def _escape_solr_value(value) -> str:
"""Escape special Lucene/Solr query characters in a field value."""
special = r'+-&&||!(){}[]^"~*?:\/'
result = []
for char in str(value):
if char in special:
result.append('\\' + char)
else:
result.append(char)
return ''.join(result)
def _escape_solr_phrase(value) -> str:
"""
Escape a phrase value for Solr phrase queries.
For phrases, we only need to escape quotes (and backslashes).
Other special characters are OK within quotes.
"""
return str(value).replace('\\', '\\\\').replace('"', '\\"')
def translate_to_solr_json_dsl(body: dict) -> dict:
"""
Translate an OpenSearch query body to Solr JSON Query DSL format.
Output is a valid Solr JSON Query DSL body for POSTing to the
``/solr/{collection}/query`` endpoint (Mode 2 of SolrSearch runner).
The ``"query"`` value is always a string, making it Mode 2 compatible.
Args:
body: OpenSearch query body dict (may contain "query", "aggs", "size", "sort")
Returns:
Solr JSON Query DSL dict with keys:
- ``query``: Lucene query string
- ``filter``: list of filter query strings (omitted if empty)
- ``limit``: number of results (omitted if not specified)
- ``sort``: sort string (omitted if not specified)
- ``facet``: Solr JSON Facet API dict (omitted if no aggregations)
"""
if not body or not isinstance(body, dict):
return {"query": "*:*"}
fq_list = []
query_val = body.get("query")
if isinstance(query_val, dict):
translated = translate_opensearch_query(body)
q = translated["q"]
fq_list = translated["fq"]
else:
q = "*:*"
result = {"query": q}
if fq_list:
result["filter"] = fq_list
if "size" in body:
result["limit"] = body["size"]
sort_str = extract_sort_parameter(body)
if sort_str:
result["sort"] = sort_str
aggs = body.get("aggs") or body.get("aggregations")
if aggs and isinstance(aggs, dict):
facets = _convert_aggregations_to_facets(aggs)
if facets:
result["facet"] = facets
return result
def _convert_aggregations_to_facets(aggs: dict) -> dict:
"""
Convert OpenSearch aggregations to Solr JSON Facet API format.
Supported aggregation types:
- ``terms`` → ``{"type":"terms","field":...,"limit":n}``
- ``date_histogram`` → ``{"type":"range","field":...,"gap":"..."}``
- ``histogram`` → ``{"type":"range","field":...,"gap":n}``
- ``avg`` → ``"avg(field)"`` function expression
- ``sum`` → ``"sum(field)"`` function expression
- ``min`` → ``"min(field)"`` function expression
- ``max`` → ``"max(field)"`` function expression
- ``value_count`` → ``"countvals(field)"`` function expression
Nested aggregations within bucket aggs are recursively converted.
Unsupported aggregation types are skipped with a WARN log.
Args:
aggs: OpenSearch aggregations dict (the value of "aggs" or "aggregations")
Returns:
Solr JSON Facet API dict suitable for the ``"facet"`` key in JSON Query DSL
"""
if not aggs or not isinstance(aggs, dict):
return {}
result = {}
for agg_name, agg_def in aggs.items():
if not isinstance(agg_def, dict):
continue
entry = _convert_single_agg(agg_name, agg_def)
if entry is not None:
result[agg_name] = entry
return result
def _convert_single_agg(agg_name: str, agg_def: dict):
"""Convert a single named OpenSearch aggregation to a Solr facet entry."""
# --- bucket aggregations ---
if "terms" in agg_def:
terms_conf = agg_def["terms"]
field = normalize_field_name(terms_conf.get("field", ""))
if not field:
logger.warning("terms agg '%s' has no field — skipping", agg_name)
return None
facet_def = {
"type": "terms",
"field": field,
"limit": terms_conf.get("size", 10),
}
nested = agg_def.get("aggs") or agg_def.get("aggregations")
if nested:
sub = _convert_aggregations_to_facets(nested)
if sub:
facet_def["facet"] = sub
return facet_def
if "date_histogram" in agg_def:
dh_conf = agg_def["date_histogram"]
field = normalize_field_name(dh_conf.get("field", ""))
if not field:
logger.warning("date_histogram agg '%s' has no field — skipping", agg_name)
return None
interval = (
dh_conf.get("calendar_interval")
or dh_conf.get("fixed_interval")
or dh_conf.get("interval", "month")
)
gap = _calendar_interval_to_solr_gap(interval)
facet_def = {
"type": "range",
"field": field,
"gap": gap,
"mincount": 1,
"start": "NOW/YEAR-10YEAR",
"end": "NOW/YEAR+1YEAR",
}
nested = agg_def.get("aggs") or agg_def.get("aggregations")
if nested:
sub = _convert_aggregations_to_facets(nested)
if sub:
facet_def["facet"] = sub
return facet_def
if "histogram" in agg_def:
h_conf = agg_def["histogram"]
field = normalize_field_name(h_conf.get("field", ""))
if not field:
logger.warning("histogram agg '%s' has no field — skipping", agg_name)
return None
return {
"type": "range",
"field": field,
"gap": h_conf.get("interval", 1),
"mincount": 1,
"start": 0,
"end": 1000000,
}
# --- metric aggregations (function expressions) ---
for metric_type in ("avg", "sum", "min", "max"):
if metric_type in agg_def:
field = normalize_field_name(agg_def[metric_type].get("field", ""))
if not field:
logger.warning("%s agg '%s' has no field — skipping", metric_type, agg_name)
return None
return f"{metric_type}({field})"
if "value_count" in agg_def:
field = normalize_field_name(agg_def["value_count"].get("field", ""))
if not field:
logger.warning("value_count agg '%s' has no field — skipping", agg_name)
return None
return f"countvals({field})"
agg_type = next(iter(agg_def), "unknown")
logger.warning(
"Unsupported aggregation type '%s' (name='%s') — skipping in Solr conversion.",
agg_type, agg_name,
)
return None
def _calendar_interval_to_solr_gap(interval: str) -> str:
"""Convert an OpenSearch calendar_interval or fixed_interval to a Solr range gap string."""
mapping = {
"minute": "+1MINUTE",
"1m": "+1MINUTE",
"hour": "+1HOUR",
"1h": "+1HOUR",
"day": "+1DAY",
"1d": "+1DAY",
"week": "+7DAYS",
"1w": "+7DAYS",
"month": "+1MONTH",
"1m_month": "+1MONTH", # avoid conflict with 1m (minute)
"quarter": "+3MONTHS",
"1q": "+3MONTHS",
"year": "+1YEAR",
"1y": "+1YEAR",
}
return mapping.get(str(interval).lower(), "+1MONTH")
def _convert_date_to_solr_format(date_str, os_format=None) -> str:
"""
Convert an OpenSearch date string to Solr ISO 8601 format.
Args:
date_str: Date string in various OpenSearch formats
os_format: Optional OpenSearch date format pattern (e.g., "dd/MM/yyyy")
Returns:
ISO 8601 date string for Solr (e.g., "2015-01-01T00:00:00Z")
If the date is already in ISO format or conversion fails, returns the
original string unchanged.
"""
if not isinstance(date_str, str) or date_str in ("*", "now"):
return date_str
# Map OpenSearch date format patterns to Python strptime format
OS_TO_PYTHON_FORMAT = {
"dd/MM/yyyy": "%d/%m/%Y",
"MM/dd/yyyy": "%m/%d/%Y",
"yyyy-MM-dd": "%Y-%m-%d",
"yyyy/MM/dd": "%Y/%m/%d",
"dd-MM-yyyy": "%d-%m-%Y",
"MM-dd-yyyy": "%m-%d-%Y",
# Add more as needed
}
# If format is provided, use it to parse the date
if os_format:
python_fmt = OS_TO_PYTHON_FORMAT.get(os_format)
if python_fmt:
try:
dt = datetime.strptime(date_str, python_fmt)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
except ValueError:
logger.warning(f"Failed to parse date '{date_str}' with format '{os_format}'")
return date_str
else:
logger.warning(f"Unknown OpenSearch date format: '{os_format}'")
# Try common patterns if no format specified
for python_fmt in OS_TO_PYTHON_FORMAT.values():
try:
dt = datetime.strptime(date_str, python_fmt)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
except ValueError:
continue
# If it's already in ISO-like format, return as-is
# (handles cases like "2015-01-01T00:00:00Z" or partial ISO)
if "T" in date_str or len(date_str) == 10: # YYYY-MM-DD
return date_str
logger.warning(f"Could not parse date '{date_str}', using as-is")
return date_str