| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Modifications by Apache Solr contributors; see git log for details. |
| # Licensed under the Apache License, Version 2.0. |
| # |
| # The OpenSearch Contributors require contributions made to |
| # this file be licensed under the Apache-2.0 license or a |
| # compatible open source license. |
| # Modifications Copyright OpenSearch Contributors. See |
| # GitHub history for details. |
| # Licensed to Elasticsearch B.V. under one or more contributor |
| # license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright |
| # ownership. Elasticsearch B.V. licenses this file to you under |
| # the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import contextvars |
| import time |
| |
| |
| class RequestContextManager: |
| """ |
| Ensures that request context span the defined scope and allow nesting of request contexts with proper propagation. |
| This means that we can span a top-level request context, open sub-request contexts that can be used to measure |
| individual timings and still measure the proper total time on the top-level request context. |
| """ |
| def __init__(self, request_context_holder): |
| self.ctx_holder = request_context_holder |
| self.ctx = None |
| self.token = None |
| |
| async def __aenter__(self): |
| self.ctx, self.token = self.ctx_holder.init_request_context() |
| return self |
| |
| @property |
| def request_start(self): |
| return self.ctx.get("request_start") |
| |
| @property |
| def request_end(self): |
| end_list = self.ctx.get("request_end_list", []) |
| client_end = self.ctx.get("client_request_end") |
| if not end_list or client_end is None: |
| return None |
| valid = [v for v in end_list if v < client_end] |
| return max(valid) if valid else None |
| |
| @property |
| def client_request_start(self): |
| return self.ctx.get("client_request_start") |
| |
| @property |
| def client_request_end(self): |
| return self.ctx.get("client_request_end") |
| |
| async def __aexit__(self, exc_type, exc_val, exc_tb): |
| # propagate earliest request start and most recent request end to parent |
| client_request_start = self.client_request_start |
| client_request_end = self.client_request_end |
| request_start = self.request_start |
| request_end = self.request_end |
| self.ctx_holder.restore_context(self.token) |
| # don't attempt to restore these values on the top-level context as they don't exist |
| if self.token.old_value != contextvars.Token.MISSING: |
| self.ctx_holder.update_request_start(request_start) |
| self.ctx_holder.update_request_end(request_end) |
| self.ctx_holder.update_client_request_start(client_request_start) |
| self.ctx_holder.update_client_request_end(client_request_end) |
| self.token = None |
| return False |
| |
| |
| class RequestContextHolder: |
| """ |
| Holds request context variables. This class is only meant to be used together with RequestContextManager. |
| """ |
| request_context = contextvars.ContextVar("benchmark_request_context") |
| |
| def new_request_context(self): |
| return RequestContextManager(self) |
| |
| @classmethod |
| def init_request_context(cls): |
| ctx = {} |
| token = cls.request_context.set(ctx) |
| return ctx, token |
| |
| @classmethod |
| def restore_context(cls, token): |
| cls.request_context.reset(token) |
| |
| @classmethod |
| def update_request_start(cls, new_request_start): |
| meta = cls.request_context.get() |
| # this can happen if multiple requests are sent on the wire for one logical request (e.g. scrolls) |
| if "request_start" not in meta and "client_request_start" in meta: |
| meta["request_start"] = new_request_start |
| |
| @classmethod |
| def update_request_end(cls, new_request_end): |
| meta = cls.request_context.get() |
| if "request_end_list" not in meta: |
| meta["request_end_list"] = [] |
| meta["request_end_list"].append(new_request_end) |
| |
| @classmethod |
| def update_client_request_start(cls, new_client_request_start): |
| meta = cls.request_context.get() |
| if "client_request_start" not in meta: |
| meta["client_request_start"] = new_client_request_start |
| |
| @classmethod |
| def update_client_request_end(cls, new_client_request_end): |
| meta = cls.request_context.get() |
| meta["client_request_end"] = new_client_request_end |
| |
| @classmethod |
| def on_client_request_start(cls): |
| cls.update_client_request_start(time.perf_counter()) |
| |
| @classmethod |
| def on_client_request_end(cls): |
| cls.update_client_request_end(time.perf_counter()) |
| |
| @classmethod |
| def on_request_start(cls): |
| cls.update_request_start(time.perf_counter()) |
| |
| @classmethod |
| def on_request_end(cls): |
| cls.update_request_end(time.perf_counter()) |
| |
| @classmethod |
| def return_raw_response(cls): |
| ctx = cls.request_context.get() |
| ctx["raw_response"] = True |