blob: 795c8027c9703eba69ef4e915a0d0d892ce13eba [file]
#!/usr/bin/env python3
"""ASFQuart - User session methods and decorators"""
import typing
from . import base, ldap
import time
import binascii
import quart.sessions
import asfquart
import asyncio
class ClientSession(dict):
    """A user session exposed both as attributes (``session.uid``) and as dict items.

    ClientSession subclasses dict so that it can be sent to quart in a format it can render.
    """

    def __init__(self, raw_data: dict):
        """Build a session from a raw session dict.

        Missing keys fall back to defaults: ``None`` for uid/dn/fullname,
        ``"<uid>@apache.org"`` for the email address, ``False`` for the flags,
        and fresh empty containers for committees, projects and metadata.
        Note that the raw keys ``pmcs`` and ``roleaccount`` are exposed as
        ``committees`` and ``isRole`` respectively.
        """
        super().__init__()
        fetch = raw_data.get
        uid = fetch("uid")
        # Insertion order matters: it becomes the key order of the dict view.
        fields = {
            "uid": uid,
            "dn": fetch("dn"),
            "fullname": fetch("fullname"),
            "email": fetch("email", f"{uid}@apache.org"),
            "isMember": fetch("isMember", False),
            "isChair": fetch("isChair", False),
            "isRoot": fetch("isRoot", False),
            "committees": fetch("pmcs", []),
            "projects": fetch("projects", []),
            "mfa": fetch("mfa", False),
            "isRole": fetch("roleaccount", False),
            # Free-form slot for whatever app-specific metadata is needed
            "metadata": fetch("metadata", {}),
        }
        for attr_name, attr_value in fields.items():
            setattr(self, attr_name, attr_value)
        # Mirror the instance attributes into the external dict representation
        self.update(vars(self))
async def read(expiry_time=86400*7, app=None) -> typing.Optional[ClientSession]:
"""Fetches a cookie-based session if found (and valid), and updates the last access timestamp
for the session."""
if app is None:
app = asfquart.APP
# We store the session cookie using the app.app_id identifier, to distinguish between
# two asfquart apps running on the same hostname.
cookie_id = app.app_id
if cookie_id in quart.session:
now = time.time()
cookie_expiry_deadline = now - expiry_time
session_dict = quart.session[cookie_id]
if isinstance(session_dict, dict):
session_update_timestamp = session_dict.get("uts", 0)
# If a session cookie has expired (not updated/used for seven days), we delete it instead of returning it
if session_update_timestamp < cookie_expiry_deadline:
del quart.session[cookie_id]
# If it's still valid, use it
else:
# Update the timestamp, since the session has been requested (and thus used)
session_dict["uts"] = now
return ClientSession(session_dict)
# Check for session providers in Auth header. These sessions are created ad-hoc, and do not linger in the
# quart session DB. Since quart.request is not defined inside testing frameworks, the bool(request) test
# asks the werkzeug LocalProxy wrapper whether a request exists or not, and bails if not.
elif bool(quart.request) and 'Authorization' in quart.request.headers:
match quart.request.authorization.type:
case "bearer": # Role accounts, PATs - TBD
if app.token_handler:
assert callable(app.token_handler), "app.token_handler is not a callable function!"
session_dict = None # Blank, in case we don't have a working callback.
# Async token handler?
if asyncio.iscoroutinefunction(app.token_handler):
session_dict = await app.token_handler(quart.request.authorization.token)
# Sync handler?
elif callable(app.token_handler):
session_dict = app.token_handler(quart.request.authorization.token)
# If token handler returns a dict, we have a session and should set it up
if session_dict:
return ClientSession(session_dict)
else:
print(f"Debug: No PAT handler registered to handle token {quart.request.authorization.token}")
case "basic": # Basic LDAP auth - will need to grab info from LDAP
if ldap.LDAP_SUPPORTED:
try:
auth_user = quart.request.authorization.parameters["username"]
auth_pwd = quart.request.authorization.parameters["password"]
ldap_client = ldap.LDAPClient(auth_user, auth_pwd)
ldap_affiliations = await ldap_client.get_affiliations()
# Convert to the usual session dict. TODO: add a single standardized parser/class for sessions
session_dict = {
"uid": auth_user,
"pmcs": ldap_affiliations[ldap.DEFAULT_OWNER_ATTR],
"projects": ldap_affiliations[ldap.DEFAULT_MEMBER_ATTR],
}
return ClientSession(session_dict)
except (binascii.Error, ValueError, KeyError) as e:
# binascii/ValueError == bad base64 auth string
# KeyError = missing username or password
raise base.ASFQuartException("Invalid Authorization header provided", errorcode=400)
case default:
raise base.ASFQuartException("Not implemented yet", errorcode=501)
def write(session_data: dict, app=None):
    """Store ``session_data`` as the cookie-based user session for this app.

    The data is saved in the quart session under the app's ``app_id`` (falling
    back to ``asfquart.APP`` when no app is given), together with a ``uts``
    last-access timestamp that is used for expiry checks later.
    """
    target_app = asfquart.APP if app is None else app
    session_key = target_app.app_id
    # Work on a shallow copy so the caller's dict is left untouched, and stamp it
    # with the current time.
    stamped = {**session_data, "uts": time.time()}
    quart.session[session_key] = stamped
def clear(app=None):
    """Remove this app's user session from the quart session, if there is one.

    The session is looked up under the app's ``app_id``; ``asfquart.APP`` is
    used when no app is given. A missing session is not an error.
    """
    target_app = app if app is not None else asfquart.APP
    session_key = target_app.app_id
    # pop() with a default never raises, so an absent session is silently ignored
    quart.session.pop(session_key, None)