# blob: 4ced85dbebd7fa56fcfd99aa502086803ff39b82
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any
from urllib.parse import urljoin
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from pyhugegraph.utils.constants import Constants
from pyhugegraph.utils.huge_config import HGraphConfig
from pyhugegraph.utils.log import log
from pyhugegraph.utils.util import ResponseValidation
class HGraphSession:
    """
    Thin wrapper around a ``requests.Session`` that adds retry handling,
    default credentials/headers/timeout taken from the configuration, and
    URL resolution relative to the configured graph.
    """

    def __init__(
        self,
        cfg: "HGraphConfig",
        retries: int = 3,
        backoff_factor: float = 0.1,
        status_forcelist=(500, 502, 504),
        session: "requests.Session | None" = None,
    ):
        """
        Initialize the HGraphSession object.

        :param cfg: The client configuration. This class reads ``username``,
            ``password``, ``timeout``, ``url``, ``gs_supported``, ``graphspace``
            and ``graph_name`` from it.
        :param retries: The maximum number of retries.
        :param backoff_factor: The backoff factor, used to calculate the interval between retries.
        :param status_forcelist: A list of status codes that trigger a retry.
        :param session: An optional requests.Session instance, for testing or advanced use cases.
        """
        # The annotations above that name third-party/project types are quoted
        # so they are not evaluated when the class body is executed.
        self._cfg = cfg
        self._retries = retries
        self._backoff_factor = backoff_factor
        self._status_forcelist = status_forcelist
        self._auth = (cfg.username, cfg.password)
        self._headers = {"Content-Type": Constants.HEADER_CONTENT_TYPE}
        self._timeout = cfg.timeout
        # Compare against None explicitly so that any injected session object
        # is honoured regardless of its truthiness.
        self._session = session if session is not None else requests.Session()
        self.__configure_session()

    def __configure_session(self):
        """
        Configure the retry strategy and connection adapter for the session.
        """
        retry_strategy = Retry(
            total=self._retries,
            read=self._retries,
            connect=self._retries,
            backoff_factor=self._backoff_factor,
            status_forcelist=self._status_forcelist,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        # The same adapter (and therefore the same retry policy) serves both schemes.
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)
        # NOTE(review): kept for backward compatibility. ``keep_alive`` does not
        # appear to be an attribute that requests.Session consults, so this is
        # presumably a no-op - confirm before relying on it.
        self._session.keep_alive = False
        log.debug(
            "Session configured with retries=%s and backoff_factor=%s",
            self._retries,
            self._backoff_factor,
        )

    @property
    def cfg(self):
        """
        Get the configuration information of the current instance.

        :return: The configuration object that was passed to the constructor.
        """
        return self._cfg

    def resolve(self, path: str):
        """
        Constructs the full URL for the given pathinfo based on the session context and API version.

        :param path: The pathinfo to be appended to the base URL.
        :return: The fully resolved URL as a string, with leading and trailing
            "/" characters stripped.

        When path is "/some/things":
        - Since path starts with "/", it is considered an absolute path,
          and urljoin will replace the path part of the base URL.
        - Assuming the base URL is "http://127.0.0.1:8000/graphspaces/default/graphs/test_graph/"
        - The result will be "http://127.0.0.1:8000/some/things"

        When path is "some/things":
        - Since path is a relative path, urljoin will append it to the path part of the base URL.
        - Assuming the base URL is "http://127.0.0.1:8000/graphspaces/default/graphs/test_graph/"
        - The result will be "http://127.0.0.1:8000/graphspaces/default/graphs/test_graph/some/things"
        """
        # The trailing "/" makes urljoin treat the configured URL as a directory,
        # so the graph prefix is appended instead of replacing the last segment.
        root = f"{self._cfg.url}/"
        if self._cfg.gs_supported:
            prefix = f"graphspaces/{self._cfg.graphspace}/graphs/{self._cfg.graph_name}/"
        else:
            prefix = f"graphs/{self._cfg.graph_name}/"
        base = urljoin(root, prefix)
        return urljoin(base, path).strip("/")

    def close(self):
        """
        Close the underlying requests session.
        """
        self._session.close()

    def _request_options(self, overrides: dict) -> dict:
        """
        Build the keyword arguments for a single request.

        Session defaults (auth, headers, timeout) are applied first; values in
        ``overrides`` win. Headers are merged key by key so a caller can add or
        replace individual headers without dropping the default Content-Type.

        :param overrides: Keyword arguments supplied by the caller of ``request``.
        :return: A new dict; neither ``overrides`` nor the session defaults are mutated.
        """
        options = dict(overrides)
        merged_headers = dict(self._headers)
        caller_headers = options.pop("headers", None)
        if caller_headers:
            merged_headers.update(caller_headers)
        options["headers"] = merged_headers
        options.setdefault("auth", self._auth)
        options.setdefault("timeout", self._timeout)
        return options

    def request(
        self,
        path: str,
        method: str = "GET",
        validator=None,
        **kwargs: Any,
    ) -> dict:
        """
        Send an HTTP request to the resolved URL and validate the response.

        :param path: The pathinfo, resolved to a full URL via ``resolve``.
        :param method: The HTTP method name; it is lower-cased and looked up as
            a method on the session (e.g. "GET" -> ``session.get``).
        :param validator: A callable invoked as
            ``validator(response, method=method, path=path)``. Defaults to a new
            ``ResponseValidation`` instance.
        :param kwargs: Extra keyword arguments forwarded to the session method.
            ``headers`` are merged over the session defaults; ``auth`` and
            ``timeout`` override the session defaults when given.
        :return: Whatever the validator returns.
        """
        if validator is None:
            validator = ResponseValidation()
        url = self.resolve(path)
        send = getattr(self._session, method.lower())
        # Merging here (instead of passing auth/headers/timeout next to **kwargs)
        # avoids "got multiple values for keyword argument" when a caller
        # supplies one of them.
        response: "requests.Response" = send(url, **self._request_options(kwargs))
        # Lazy %-formatting: the message is only built when DEBUG is enabled.
        # Only the caller-supplied kwargs are logged, never the merged options,
        # so the credentials in ``auth`` do not end up in the log.
        log.debug(
            "Request: %s %s validator=%s kwargs=%s %s",
            method,
            url,
            validator,
            kwargs,
            response,
        )
        return validator(response, method=method, path=path)