# blob: 73538718442d5a1534744cd9d4a120178e64f733 [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import logging
import time
import zipfile
from pathlib import Path
import requests
from solrorbit.context import RequestContextHolder
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class SolrClientError(Exception):
    """Root of the exception hierarchy raised by SolrAdminClient operations."""
class CollectionAlreadyExistsError(SolrClientError):
    """Signals that create_collection() was asked to create a collection that is already present."""
class CollectionNotFoundError(SolrClientError):
    """Signals that delete_collection() was asked to delete a collection that does not exist."""
# ---------------------------------------------------------------------------
# SolrAdminClient
# ---------------------------------------------------------------------------
class SolrAdminClient:
    """
    Thin wrapper around requests.Session for Solr admin operations.

    Handles collection management, configset upload, version detection,
    cluster status, and metrics retrieval, using both V2 (``/api``) and
    V1 (``/solr/admin``) endpoints. High-frequency data operations
    (indexing, search, commit, optimize) use pysolr directly in runner.py.

    Not thread-safe — each worker process creates its own instance.
    """

    def __init__(self, host: str, port: int = 8983,
                 username: str = None, password: str = None,
                 tls: bool = False, timeout: int = 30):
        """
        Store connection settings; no network traffic happens here.

        Args:
            host: Solr host name or address.
            port: Solr HTTP port.
            username: Basic-auth user name. Auth is only configured when both
                ``username`` and ``password`` are truthy.
            password: Basic-auth password.
            tls: Use ``https`` instead of ``http`` when True.
            timeout: Timeout (seconds) passed to every request.
        """
        scheme = "https" if tls else "http"
        self.base_url = f"{scheme}://{host}:{port}"
        self.api_url = f"{self.base_url}/api"
        self.timeout = timeout
        self._username = username
        self._password = password
        self._session = None  # created lazily on first use

    def _get_session(self) -> requests.Session:
        """Return the shared session, creating it on first call (lazy init)."""
        if self._session is None:
            self._session = requests.Session()
            # Disable automatic proxy detection (trust_env=False) to avoid hanging
            # on macOS after fork() — CFNetwork proxy detection is not fork-safe.
            self._session.trust_env = False
            if self._username and self._password:
                self._session.auth = (self._username, self._password)
            self._session.headers.update({"Accept": "application/json"})
        return self._session

    # ------------------------------------------------------------------
    # Version detection
    # ------------------------------------------------------------------
    def info(self) -> dict:
        """Return parsed JSON from GET /api/node/system."""
        resp = self._get("/api/node/system")
        return resp.json()

    def get_version(self) -> str:
        """
        Detect Solr version via GET /api/node/system.

        Returns:
            The version string, e.g. "9.7.0".

        Raises:
            SolrClientError: if the response lacks ``lucene.solr-spec-version``.
        """
        data = self.info()
        try:
            return data["lucene"]["solr-spec-version"]
        except KeyError as exc:
            raise SolrClientError(
                f"Could not parse Solr version from /api/node/system response: {data}"
            ) from exc

    def get_major_version(self) -> int:
        """Return the major version integer (9 or 10)."""
        version = self.get_version()
        return int(version.split(".")[0])

    def wait_for_cluster_ready(self, timeout: int = 60, **kwargs) -> None:
        """
        Poll GET /api/node/system until Solr responds or timeout is exceeded.

        At least one probe is always made, even when ``timeout`` is zero or
        negative, and the pause between probes never extends past the deadline.

        Args:
            timeout: Maximum seconds to wait (default 60).
            **kwargs: Accepted and ignored.

        Raises:
            SolrClientError: if no probe succeeded before the deadline; the
                last probe failure is attached as the cause.
        """
        deadline = time.monotonic() + timeout
        last_exc = None
        while True:
            try:
                self.info()
                return
            except Exception as exc:  # pylint: disable=broad-except
                # Any failure (connection refused, HTTP error, bad JSON) just
                # means "not ready yet" while the node is starting up.
                last_exc = exc
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(2, remaining))
        raise SolrClientError(
            f"Solr cluster did not become ready within {timeout}s. Last error: {last_exc}"
        ) from last_exc

    # ------------------------------------------------------------------
    # Configset management
    # ------------------------------------------------------------------
    def upload_configset(self, name: str, configset_dir: str) -> None:
        """
        Zip the configset directory and upload it via the Solr V1 API.

        Uses: POST /solr/admin/configs?action=UPLOAD&name={name}
        The directory must contain a conf/ sub-directory with at minimum
        schema.xml (or managed-schema) and solrconfig.xml.

        Args:
            name: Configset name to register on the cluster.
            configset_dir: Local path to the directory containing conf/.

        Raises:
            SolrClientError: if the directory is missing or Solr rejects the upload.
        """
        zip_bytes = self._build_configset_zip(configset_dir)
        # Use V1 API for configsets (V2 API not available in Solr 9.x)
        url = f"{self.base_url}/solr/admin/configs"
        params = {"action": "UPLOAD", "name": name}
        resp = self._get_session().post(
            url,
            params=params,
            data=zip_bytes,
            headers={"Content-Type": "application/zip"},
            timeout=self.timeout,
        )
        self._raise_for_solr_error(resp, f"upload configset '{name}'")
        logger.info("Uploaded configset '%s' from '%s'", name, configset_dir)

    def delete_configset(self, name: str) -> None:
        """
        Delete a configset via the Solr V1 API.

        Uses: POST /solr/admin/configs?action=DELETE&name={name}

        Raises:
            SolrClientError: if Solr returns a non-success status.
        """
        # Use V1 API for configsets (V2 API not available in Solr 9.x)
        url = f"{self.base_url}/solr/admin/configs"
        params = {"action": "DELETE", "name": name}
        resp = self._get_session().post(
            url,
            params=params,
            timeout=self.timeout,
        )
        self._raise_for_solr_error(resp, f"delete configset '{name}'")
        logger.info("Deleted configset '%s'", name)

    # ------------------------------------------------------------------
    # Collection management
    # ------------------------------------------------------------------
    def create_collection(self, name: str, configset: str,
                          num_shards: int = 1, replication_factor: int = 1,
                          tlog_replicas: int = 0, pull_replicas: int = 0,
                          wait_for_active_shards: int = 1) -> None:
        """
        Create a Solr collection via POST /api/collections.

        The configset must already exist on the cluster (call upload_configset first).
        ``replication_factor`` maps to ``nrtReplicas`` in the Solr V2 API (they are semantically
        identical: NRT replicas that participate in indexing and serving queries in real time).

        Args:
            name: Collection name.
            configset: Name of the configset to use.
            num_shards: Sent as ``numShards``.
            replication_factor: Sent as ``nrtReplicas``.
            tlog_replicas: Sent as ``tlogReplicas``.
            pull_replicas: Sent as ``pullReplicas``.
            wait_for_active_shards: Currently unused; it is not part of the
                request payload (``waitForFinalState`` is always sent as True).

        Raises:
            CollectionAlreadyExistsError: on HTTP 400 whose body mentions "already exists".
            SolrClientError: on any other non-success status.
        """
        payload = {
            "name": name,
            "config": configset,
            "numShards": num_shards,
            "nrtReplicas": replication_factor,
            "tlogReplicas": tlog_replicas,
            "pullReplicas": pull_replicas,
            "waitForFinalState": True,
        }
        resp = self._get_session().post(
            f"{self.api_url}/collections",
            json=payload,
            timeout=self.timeout,
        )
        if resp.status_code == 400:
            body = self._try_parse_json(resp)
            if "already exists" in str(body).lower():
                raise CollectionAlreadyExistsError(
                    f"Collection '{name}' already exists"
                )
        self._raise_for_solr_error(resp, f"create collection '{name}'")
        logger.info("Created collection '%s' (shards=%d, nrt=%d, tlog=%d, pull=%d)",
                    name, num_shards, replication_factor, tlog_replicas, pull_replicas)

    def delete_collection(self, name: str) -> None:
        """
        Delete a Solr collection via DELETE /api/collections/{name}.

        Raises:
            CollectionNotFoundError: on HTTP 404, or HTTP 400 whose error
                message contains "could not find collection".
            SolrClientError: on any other non-success status.
        """
        resp = self._get_session().delete(
            f"{self.api_url}/collections/{name}",
            timeout=self.timeout,
        )
        if resp.status_code == 404:
            raise CollectionNotFoundError(f"Collection '{name}' not found")
        # Solr 9.x may return 400 with "Could not find collection" instead of 404
        if resp.status_code == 400:
            msg = self._extract_error_msg(self._try_parse_json(resp))
            if "could not find collection" in msg.lower():
                raise CollectionNotFoundError(f"Collection '{name}' not found")
        self._raise_for_solr_error(resp, f"delete collection '{name}'")
        logger.info("Deleted collection '%s'", name)

    # ------------------------------------------------------------------
    # Cluster status
    # ------------------------------------------------------------------
    def get_cluster_status(self) -> dict:
        """Return cluster state via GET /api/cluster.

        Returns the ``"cluster"`` entry when present, otherwise the whole body.
        """
        data = self._get("/api/cluster").json()  # parse the body only once
        return data.get("cluster", data)

    def get_clusterstatus(self) -> dict:
        """Return full CLUSTERSTATUS response dict via V1 Collections API."""
        resp = self._get_session().get(
            f"{self.base_url}/solr/admin/collections",
            params={"action": "CLUSTERSTATUS", "wt": "json"},
            timeout=self.timeout,
        )
        self._raise_for_solr_error(resp, "CLUSTERSTATUS")
        return resp.json()

    def get_core_status(self, core_name: str) -> dict:
        """Return Core STATUS for a specific core (leader replica) via V1 Cores API.

        Returns an empty dict when the response has no entry for ``core_name``.
        """
        resp = self._get_session().get(
            f"{self.base_url}/solr/admin/cores",
            params={"action": "STATUS", "core": core_name, "wt": "json"},
            timeout=self.timeout,
        )
        self._raise_for_solr_error(resp, f"Core STATUS for '{core_name}'")
        return resp.json().get("status", {}).get(core_name, {})

    def list_collections(self) -> list:
        """Return list of collection names via Collections API LIST action."""
        resp = self._get_session().get(
            f"{self.base_url}/solr/admin/collections",
            params={"action": "LIST", "wt": "json"},
            timeout=self.timeout,
        )
        self._raise_for_solr_error(resp, "LIST collections")
        return resp.json().get("collections", [])

    def get_luke_stats(self, collection: str) -> dict:
        """Return Luke index stats for a collection (numDocs, segmentCount, etc.)."""
        resp = self._get_session().get(
            f"{self.base_url}/solr/{collection}/admin/luke",
            params={"numTerms": "0", "wt": "json"},
            timeout=self.timeout,
        )
        self._raise_for_solr_error(resp, f"Luke stats for '{collection}'")
        return resp.json().get("index", {})

    def count_documents(self, collection: str) -> int:
        """Return the number of documents in a collection via rows=0 select query."""
        resp = self._get_session().get(
            f"{self.base_url}/solr/{collection}/select",
            params={"q": "*:*", "rows": 0, "wt": "json"},
            timeout=self.timeout,
        )
        self._raise_for_solr_error(resp, f"count documents in '{collection}'")
        return resp.json()["response"]["numFound"]

    def get_schema(self, collection: str) -> dict:
        """Return the schema of a collection via GET /solr/{collection}/schema.

        Returns the ``"schema"`` entry when present, otherwise the whole body.
        """
        data = self._get(f"/solr/{collection}/schema").json()  # parse the body only once
        return data.get("schema", data)

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------
    def get_node_metrics(self):
        """
        Retrieve node metrics via GET /solr/admin/metrics.

        Both Solr 9.x and Solr 10.x use the same endpoint. The response format
        differs by version and is detected via the Content-Type header:
        - application/json → Solr 9.x custom JSON → returns parsed dict
        - text/plain → Solr 10.x Prometheus text → returns raw str
        The telemetry device is responsible for parsing the format-specific response.

        Note: unlike the other methods, HTTP errors here surface through
        ``Response.raise_for_status()`` rather than as SolrClientError.
        """
        resp = self._get_session().get(f"{self.base_url}/solr/admin/metrics", timeout=self.timeout)
        resp.raise_for_status()
        content_type = resp.headers.get("Content-Type", "")
        if "text/plain" in content_type:
            return resp.text
        return resp.json()

    def is_cloud_mode(self) -> bool:
        """
        Return True if this Solr node is running in SolrCloud (ZooKeeper) mode.

        Calls the Collections CLUSTERSTATUS API, which only succeeds in cloud mode.
        Returns False if Solr responds with a 400 indicating standalone/user-managed mode.
        Raises SolrClientError on connection errors or unexpected failures.
        """
        try:
            resp = self._get_session().get(
                f"{self.base_url}/solr/admin/collections",
                params={"action": "CLUSTERSTATUS", "wt": "json"},
                timeout=self.timeout,
            )
        except requests.exceptions.RequestException as exc:
            raise SolrClientError(
                f"Could not connect to Solr at {self.base_url}: {exc}"
            ) from exc
        if resp.ok:
            return True
        # HTTP 400 with a message about SolrCloud/user-managed → definitely standalone mode.
        # The helper tolerates non-dict bodies (e.g. a proxy's error page), so a
        # malformed body falls through to the SolrClientError below.
        msg = self._extract_error_msg(self._try_parse_json(resp)).lower()
        if resp.status_code == 400 and any(
            kw in msg for kw in ("solrcloud", "user-managed", "collections api")
        ):
            return False
        # Auth error, unexpected status, etc. — surface to caller
        raise SolrClientError(
            f"Unexpected response from CLUSTERSTATUS (HTTP {resp.status_code}): {resp.text[:300]}"
        )

    # ------------------------------------------------------------------
    # Raw request (for the raw-request workload operation)
    # ------------------------------------------------------------------
    def raw_request(self, method: str, path: str,
                    body=None, headers: dict = None) -> requests.Response:
        """
        Send an arbitrary HTTP request to a Solr endpoint.

        The response is returned as-is; no status checking is performed.

        Args:
            method: HTTP method ("GET", "POST", "DELETE", etc.)
            path: URL path relative to http://{host}:{port}/ (e.g. "/api/cluster")
            body: Request body (dict or list → serialized as JSON,
                str or bytes → sent as-is, anything else → no body)
            headers: Additional request headers
        """
        url = f"{self.base_url}{path}"
        req_headers = dict(headers or {})
        kwargs = {"timeout": self.timeout, "headers": req_headers}
        if isinstance(body, (dict, list)):
            # Lists matter too: JSON arrays are valid request bodies.
            kwargs["json"] = body
        elif isinstance(body, (str, bytes)):
            kwargs["data"] = body
        return self._get_session().request(method.upper(), url, **kwargs)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _get(self, path: str) -> requests.Response:
        """GET ``base_url + path`` and raise SolrClientError on a non-success status."""
        resp = self._get_session().get(f"{self.base_url}{path}", timeout=self.timeout)
        self._raise_for_solr_error(resp, f"GET {path}")
        return resp

    def _raise_for_solr_error(self, resp: requests.Response, operation: str) -> None:
        """Raise SolrClientError describing *operation* unless ``resp.ok``.

        The message uses ``error.msg`` from the JSON body when available and
        falls back to the raw response text otherwise.
        """
        if resp.ok:
            return
        msg = self._extract_error_msg(self._try_parse_json(resp), default=resp.text)
        raise SolrClientError(
            f"Solr {operation} failed (HTTP {resp.status_code}): {msg}"
        )

    @staticmethod
    def _extract_error_msg(body, default: str = "") -> str:
        """Return ``body["error"]["msg"]`` as a string, or *default*.

        *default* is returned when the body is not a dict, has no dict-valued
        ``"error"`` entry, or that entry has no ``"msg"`` key.
        """
        if not isinstance(body, dict):
            return default
        error = body.get("error")
        if not isinstance(error, dict):
            return default
        msg = error.get("msg", default)
        return msg if isinstance(msg, str) else str(msg)

    @staticmethod
    def _try_parse_json(resp: requests.Response) -> dict:
        """Best-effort JSON parse of a response body.

        Returns the parsed value (normally a dict, but any JSON value is
        possible) or ``{}`` when the body is not valid JSON.
        """
        try:
            return resp.json()
        except ValueError:
            # requests raises a ValueError subclass for undecodable bodies.
            return {}

    @staticmethod
    def _build_configset_zip(configset_dir: str) -> bytes:
        """
        Walk configset_dir and produce an in-memory ZIP suitable for
        POST /solr/admin/configs?action=UPLOAD.

        Files are added in sorted order with paths relative to configset_dir.

        Raises:
            SolrClientError: if configset_dir is not an existing directory
                (otherwise an empty archive would be uploaded silently).
        """
        root = Path(configset_dir)
        if not root.is_dir():
            raise SolrClientError(
                f"Configset directory '{configset_dir}' does not exist or is not a directory"
            )
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
            for file_path in sorted(root.rglob("*")):
                if file_path.is_file():
                    zf.write(file_path, file_path.relative_to(root).as_posix())
        return buf.getvalue()
# ---------------------------------------------------------------------------
# SolrClient — unified client used by runners and telemetry devices
# ---------------------------------------------------------------------------
class SolrClient(RequestContextHolder):  # pylint: disable=too-many-public-methods
    """
    Single unified Solr client. Wraps SolrAdminClient (admin/HTTP) and pysolr.Solr
    (indexing/search) as private implementation details.

    All runners and telemetry devices receive a SolrClient and call methods
    on it directly — SolrAdminClient and pysolr.Solr are never referenced
    externally.
    """

    class _NoOpTransport:
        """Placeholder exposing an awaitable ``close()`` that does nothing."""

        async def close(self):
            pass

    def __init__(self, host="localhost", port=8983, username=None, password=None,
                 tls=False, timeout=30):
        """
        Build the admin client eagerly; pysolr clients are created on demand.

        Args:
            host: Solr host name or address.
            port: Solr HTTP port.
            username: Basic-auth user name (used only together with ``password``).
            password: Basic-auth password.
            tls: Use ``https`` when True.
            timeout: Request timeout in seconds for both admin and pysolr clients.
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._tls = tls
        self._timeout = timeout
        self._admin = SolrAdminClient(host=host, port=port, username=username,
                                      password=password, tls=tls, timeout=timeout)
        self._pysolr_clients = {}  # collection → pysolr.Solr (created lazily)
        self.transport = SolrClient._NoOpTransport()

    # ------------------------------------------------------------------
    # Admin / cluster operations (delegated to _admin)
    # ------------------------------------------------------------------
    def info(self) -> dict:
        """Return cluster info in a Solr Orbit-compatible shape: {name, version.number}.

        The version is obtained through the admin client's get_version(), so a
        malformed response raises SolrClientError rather than a bare KeyError.
        """
        return {
            "name": "Apache Solr",
            "version": {"number": self._admin.get_version()},
        }

    def get_version(self):
        """Return the Solr version string (delegates to the admin client)."""
        return self._admin.get_version()

    def get_major_version(self):
        """Return the Solr major version as an int (delegates to the admin client)."""
        return self._admin.get_major_version()

    def get_cluster_status(self):
        """Return cluster state from GET /api/cluster (delegates to the admin client)."""
        return self._admin.get_cluster_status()

    def get_node_metrics(self):
        """Return node metrics as a dict or raw text (delegates to the admin client)."""
        return self._admin.get_node_metrics()

    def get_clusterstatus(self):
        """Return the full V1 CLUSTERSTATUS response (delegates to the admin client)."""
        return self._admin.get_clusterstatus()

    def get_core_status(self, core_name):
        """Return V1 core STATUS for ``core_name`` (delegates to the admin client)."""
        return self._admin.get_core_status(core_name)

    def list_collections(self):
        """Return the list of collection names (delegates to the admin client)."""
        return self._admin.list_collections()

    def get_luke_stats(self, collection):
        """Return Luke index stats for ``collection`` (delegates to the admin client)."""
        return self._admin.get_luke_stats(collection)

    def upload_configset(self, name, path):
        """Zip and upload the configset directory at ``path`` under ``name``."""
        return self._admin.upload_configset(name, path)

    def delete_configset(self, name):
        """Delete the configset ``name`` (delegates to the admin client)."""
        return self._admin.delete_configset(name)

    def create_collection(self, name, *args, **kwargs):
        """Create collection ``name``; extra arguments are forwarded unchanged."""
        return self._admin.create_collection(name, *args, **kwargs)

    def delete_collection(self, name, **kwargs):
        """Delete collection ``name``; keyword arguments are forwarded unchanged."""
        return self._admin.delete_collection(name, **kwargs)

    def wait_for_cluster_ready(self, **kwargs):
        """Block until Solr answers or the timeout elapses (delegates to the admin client)."""
        return self._admin.wait_for_cluster_ready(**kwargs)

    def is_cloud_mode(self) -> bool:
        """Return True when the node runs in SolrCloud mode (delegates to the admin client)."""
        return self._admin.is_cloud_mode()

    def raw_request(self, method, path, body=None, headers=None):
        """Send an arbitrary HTTP request and return the raw response."""
        return self._admin.raw_request(method, path, body=body, headers=headers)

    def count_documents(self, collection):
        """Return the document count of ``collection`` (delegates to the admin client)."""
        return self._admin.count_documents(collection)

    def get_schema(self, collection):
        """Return the schema of ``collection`` (delegates to the admin client)."""
        return self._admin.get_schema(collection)

    # Expose the admin client's internal _get for telemetry devices that
    # need direct access to /api/node/system and similar endpoints.
    def _get(self, path: str):
        return self._admin._get(path)

    # ------------------------------------------------------------------
    # Data operations (delegated to pysolr.Solr, per collection)
    # ------------------------------------------------------------------
    def _get_pysolr(self, collection: str):
        """Return (lazily-created, cached) pysolr.Solr for the given collection."""
        import pysolr  # pylint: disable=import-outside-toplevel
        if collection not in self._pysolr_clients:
            scheme = "https" if self._tls else "http"
            url = f"{scheme}://{self._host}:{self._port}/solr/{collection}"
            session = requests.Session()
            session.trust_env = False  # fork-safe on macOS (no CFNetwork proxy detection)
            if self._username and self._password:
                session.auth = (self._username, self._password)
            self._pysolr_clients[collection] = pysolr.Solr(
                url, timeout=self._timeout, always_commit=False, session=session)
        return self._pysolr_clients[collection]

    def add(self, collection, docs, **kwargs):
        """Index ``docs`` into ``collection`` via pysolr; kwargs are forwarded."""
        return self._get_pysolr(collection).add(docs, **kwargs)

    def search(self, collection, q, **kwargs):
        """Run query ``q`` against ``collection`` via pysolr; kwargs are forwarded."""
        return self._get_pysolr(collection).search(q, **kwargs)

    def commit(self, collection, **kwargs):
        """Issue a commit on ``collection`` via pysolr; kwargs are forwarded."""
        return self._get_pysolr(collection).commit(**kwargs)

    def optimize(self, collection, **kwargs):
        """Issue an optimize on ``collection`` via pysolr; kwargs are forwarded."""
        return self._get_pysolr(collection).optimize(**kwargs)
class ClientFactory:
    """
    Factory that creates SolrClient instances from cluster host configuration.
    """

    def __init__(self, hosts, client_options):
        """
        Args:
            hosts: Sequence of host entries; only the first one is used. Each
                entry is either a dict with ``host``/``port`` keys or a
                ``"host[:port]"`` string.
            client_options: Mapping of client options (copied); ``None`` is
                treated as empty.
        """
        self._hosts = hosts
        self._client_options = dict(client_options or {})
        self.logger = logging.getLogger(__name__)

    def _parse_host(self):
        """Return ``(host, port)`` from the first host entry.

        Missing or empty values fall back to ``"localhost"`` and ``8983``.
        A non-numeric port suffix in a string entry is ignored and the
        default port is used.
        """
        entry = self._hosts[0] if self._hosts else {}
        if isinstance(entry, dict):
            # ``or`` also covers keys that are present but None/empty.
            return entry.get("host") or "localhost", int(entry.get("port") or 8983)
        parts = str(entry).rsplit(":", 1)
        # rsplit always returns at least one element, so test the value itself:
        # ":8983" and "" must not produce an empty host name.
        host = parts[0] or "localhost"
        port = int(parts[1]) if len(parts) == 2 and parts[1].isdigit() else 8983
        return host, port

    def create(self):
        """Build a SolrClient for the first configured host.

        Reads ``basic_auth_user``, ``basic_auth_password`` and ``use_ssl``
        from the client options.
        """
        host, port = self._parse_host()
        return SolrClient(
            host=host,
            port=port,
            username=self._client_options.get("basic_auth_user"),
            password=self._client_options.get("basic_auth_password"),
            tls=self._client_options.get("use_ssl", False),
        )

    def create_async(self):
        """Return the same kind of client as create()."""
        return self.create()