blob: c52b391e35a5d54e6feb7f04e8e43051a7f19a2f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any
import jwt
class JWTSigner:
"""
Signs and verifies JWT Token. Used to authorise and verify requests.
:param secret_key: key used to sign the request
:param expiration_time_in_seconds: time after which the token becomes invalid (in seconds)
:param audience: audience that the request is expected to have
:param leeway_in_seconds: leeway that allows for a small clock skew between the two parties
:param algorithm: algorithm used for signing
"""
def __init__(
self,
secret_key: str,
expiration_time_in_seconds: int,
audience: str,
leeway_in_seconds: int = 5,
algorithm: str = "HS512",
):
self._secret_key = secret_key
self._expiration_time_in_seconds = expiration_time_in_seconds
self._audience = audience
self._leeway_in_seconds = leeway_in_seconds
self._algorithm = algorithm
def generate_signed_token(self, extra_payload: dict[str, Any]) -> str:
"""
Generate JWT with extra payload added.
:param extra_payload: extra payload that is added to the signed token
:return: signed token
"""
jwt_dict = {
"aud": self._audience,
"iat": datetime.utcnow(),
"nbf": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
}
jwt_dict.update(extra_payload)
token = jwt.encode(
jwt_dict,
self._secret_key,
algorithm=self._algorithm,
)
return token
def verify_token(self, token: str) -> dict[str, Any]:
payload = jwt.decode(
token,
self._secret_key,
leeway=timedelta(seconds=self._leeway_in_seconds),
algorithms=[self._algorithm],
options={
"verify_signature": True,
"require": ["exp", "iat", "nbf"],
},
audience=self._audience,
)
return payload