blob: db93be488c6faf8de230bd40d712c7d3a9143581 [file]
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Workload Format Detection
Determines whether a workload is OpenSearch Benchmark format (requiring conversion)
or native Solr Orbit format (no conversion needed).
"""
import json
import logging
import os
logger = logging.getLogger(__name__)
# OpenSearch-specific operation types
OPENSEARCH_OPERATIONS = {
"create-index",
"delete-index",
"cluster-health",
"refresh",
"force-merge",
"index", # OpenSearch uses "index", Solr uses "bulk-index"
"search", # Could be either, check param-source
}
# Solr-specific operation types
SOLR_OPERATIONS = {
"create-collection",
"delete-collection",
"bulk-index",
"commit",
"optimize",
}
# OpenSearch-specific param sources
OPENSEARCH_PARAM_SOURCES = {
"opensearch-bulk-source",
"opensearch-search-source",
}
# Solr-specific param sources
SOLR_PARAM_SOURCES = {
"solr-bulk-source",
"solr-search-source",
}
def is_opensearch_workload(workload) -> bool:
"""
Detect if a workload is in OpenSearch Benchmark format.
Detection strategy (in order of priority):
1. Check for explicit "collections" key → Solr workload
2. Check for explicit "indices" key → OpenSearch workload
3. Check operation types in challenges
4. Check param-source values
5. Default to False (treat as Solr if unclear)
Args:
workload: Workload object or dict
Returns:
True if OpenSearch format (needs conversion), False if Solr format
"""
# Handle both Workload objects and dicts
if hasattr(workload, "indices"):
# Workload object - check for collections attribute
has_collections = hasattr(workload, "collections") and len(getattr(workload, "collections", [])) > 0
has_indices = len(workload.indices) > 0
if has_collections:
logger.debug("Detected Solr workload (has collections)")
return False
if has_indices:
logger.debug("Detected OpenSearch workload (has indices)")
return True
# For dicts, check keys directly
if isinstance(workload, dict):
if "collections" in workload:
logger.debug("Detected Solr workload (collections key)")
return False
if "indices" in workload:
logger.debug("Detected OpenSearch workload (indices key)")
return True
# Fallback: Check operation types and param sources
is_opensearch = _detect_from_operations(workload)
if is_opensearch:
logger.info("Detected OpenSearch workload format - conversion will be applied")
else:
logger.debug("Detected Solr workload format - no conversion needed")
return is_opensearch
def _detect_from_operations(workload) -> bool:
"""
Detect format by examining operations in challenges/test procedures.
Returns:
True if OpenSearch format, False if Solr format
"""
# Get test procedures (challenges)
test_procedures = []
if hasattr(workload, "test_procedures"):
test_procedures = workload.test_procedures
elif isinstance(workload, dict) and "challenges" in workload:
# Raw dict format
test_procedures = workload.get("challenges", [])
opensearch_score = 0
solr_score = 0
for test_proc in test_procedures:
# Get schedule
schedule = []
if hasattr(test_proc, "schedule"):
schedule = test_proc.schedule
elif isinstance(test_proc, dict):
schedule = test_proc.get("schedule", [])
for task in schedule:
# Get operation
operation = None
if hasattr(task, "operation"):
operation = task.operation
elif isinstance(task, dict):
operation = task.get("operation", {})
if not operation:
continue
# Get operation type
op_type = None
if hasattr(operation, "type"):
op_type = operation.type
elif isinstance(operation, dict):
op_type = operation.get("operation-type") or operation.get("type")
if op_type in OPENSEARCH_OPERATIONS:
opensearch_score += 2
if op_type in SOLR_OPERATIONS:
solr_score += 2
# Check param-source
param_source = None
if hasattr(operation, "param_source"):
param_source = operation.param_source
elif isinstance(operation, dict):
param_source = operation.get("param-source")
if param_source in OPENSEARCH_PARAM_SOURCES:
opensearch_score += 3
if param_source in SOLR_PARAM_SOURCES:
solr_score += 3
logger.debug(f"Detection scores - OpenSearch: {opensearch_score}, Solr: {solr_score}")
# If unclear, default to Solr (no conversion)
return opensearch_score > solr_score
def should_convert_workload(workload) -> bool:
"""
Convenience function - alias for is_opensearch_workload.
Args:
workload: Workload object or dict
Returns:
True if workload needs conversion (is OpenSearch format)
"""
return is_opensearch_workload(workload)
def is_opensearch_only_query(body) -> bool:
"""
Return True when the query body contains OpenSearch-only query features that
cannot be meaningfully translated to Solr — such as Painless/Groovy script
scoring, percolate, pinned, rank_feature, or intervals queries.
Queries identified this way should be SKIPPED during a Solr benchmark run
rather than silently degraded to ``q=*:*``.
Args:
body: The query body dict (non-dicts always return False)
Returns:
True if the body contains OS-only constructs that Solr cannot execute.
"""
if not isinstance(body, dict):
return False
query = body.get("query")
if not isinstance(query, dict):
return False
return _contains_os_only_node(query)
def _contains_os_only_node(node) -> bool:
"""Recursively check whether a query node contains OpenSearch-only features."""
if not isinstance(node, dict):
return False
# Script-based scoring (Painless, expression, Groovy, etc.)
if "function_score" in node:
for fn in node["function_score"].get("functions", []):
if "script_score" in fn:
return True
if "script_score" in node:
return True
# Other OpenSearch/Elasticsearch-only query types
for os_only_type in ("more_like_this", "percolate", "rank_feature", "pinned", "intervals"):
if os_only_type in node:
return True
# Recurse into bool clauses
if "bool" in node:
for clause_list in node["bool"].values():
if isinstance(clause_list, list):
for clause in clause_list:
if _contains_os_only_node(clause):
return True
elif isinstance(clause_list, dict):
if _contains_os_only_node(clause_list):
return True
return False
def is_opensearch_body(body) -> bool:
"""
Detect whether a query body dict is in OpenSearch DSL format.
Returns True when the body looks like an OpenSearch query, i.e. it contains
a "query" key whose value is a dict (not a plain Solr query string), OR when
it contains OpenSearch-specific aggregation keys ("aggs" / "aggregations").
Args:
body: The query body (any type; non-dicts always return False)
Returns:
True if the body is OpenSearch DSL format, False otherwise.
"""
if not isinstance(body, dict):
return False
query_val = body.get("query")
if isinstance(query_val, dict):
return True
if "aggs" in body or "aggregations" in body:
return True
return False
def has_opensearch_aggregations(body) -> bool:
"""
Return True when the body contains OpenSearch-style aggregation keys
("aggs" or "aggregations") that are not natively understood by Solr.
Args:
body: The query body dict (non-dicts always return False)
Returns:
True if the body contains OpenSearch aggregations.
"""
if not isinstance(body, dict):
return False
return "aggs" in body or "aggregations" in body
def is_opensearch_workload_path(workload_path: str) -> bool:
"""
Detect whether a workload directory contains an OpenSearch Benchmark workload.
Reads ``workload.json`` (or ``workload.jsonnet``) from the given directory as
raw JSON and checks for the presence of the ``"indices"`` key, which is the
definitive marker of an OSB workload. A ``"collections"`` key indicates a
Solr-native workload.
This function is the ONLY piece of conversion code that may be imported from
the benchmark run path (``test_run_orchestrator.py``). It does not import any
heavy conversion dependencies.
Args:
workload_path: Path to the workload directory.
Returns:
True if the workload is in OpenSearch Benchmark format (needs conversion).
False for Solr-native format, missing file, or any parse/IO error.
"""
for filename in ("workload.json", "workload.jsonnet"):
candidate = os.path.join(workload_path, filename)
if os.path.isfile(candidate):
try:
with open(candidate, encoding="utf-8") as fh:
data = json.load(fh)
if "indices" in data:
logger.debug("Detected OSB workload at '%s' (contains 'indices' key)", workload_path)
return True
if "collections" in data:
logger.debug("Detected Solr workload at '%s' (contains 'collections' key)", workload_path)
return False
except (OSError, json.JSONDecodeError) as exc:
logger.debug("Could not parse workload file '%s': %s — treating as Solr format", candidate, exc)
return False
return False