blob: 00602d48b41b155577ca75790897986df955a88c [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Asynchronous LDAP client."""
#
# TYPICAL USAGE:
#
# ### TBD. look at code for now.
#
import asyncio
import logging
import concurrent.futures
import bonsai
# Re-map the LDAPSearchScope constants to our namespace.
# These can now be used as (eg.) asfpy.aioldap.SCOPE.SUBTREE
from bonsai import LDAPSearchScope as SCOPE
LOGGER = logging.getLogger(__name__)
class ASF_LDAPConnection:
def __init__(self, client, executor):
# NOTE: must be instantiated within one of the EXECUTOR threads.
# Shared Executor holding one or more threads.
self.executor = executor
# bonsai.LDAPConnection ties itself to a loop. Thus, whatever
# loop is used to create the connection must also be used to
# perform its operations. We will construct that loop here,
# and use it for all bonsai actions. It does not have any
# thread affinity, until it is running. Thus, we can use this
# loop within any thread of the Executor.
self.loop = asyncio.new_event_loop()
# Hack around GnuTLS bug with async.
# See: https://github.com/noirello/bonsai/issues/25
# and: https://github.com/noirello/bonsai/issues/69
#
# In THIS executor thread, we'll perform the synchronous
# connection, so that the main thread's event loop will
# not be blocked.
async def do_connect():
### for testing: pretend this connection blocks
#import time; time.sleep(5)
# Tell bonsai to connect synchronously.
bonsai.set_connect_async(False)
try:
# Ties to self.loop (the running loop now).
return await client.connect(is_async=True)
finally:
# Make sure this always gets reset.
bonsai.set_connect_async(True)
# The underlying LDAP connection, tied to our loop.
self.conn = self.loop.run_until_complete(do_connect())
def close(self):
self.conn.close()
self.conn = None # ensure self is unusable
async def use_loop(self, loop, method, *args, **kw):
"Running in LOOP, run/wait for METHOD with *ARGS and **KW."
if loop is None:
loop = asyncio.get_running_loop()
def call_method():
# This synchronous function is now running within a thread
# of the Executor. Use some async to run the LDAP connection's
# method within our loop.
return self.loop.run_until_complete(method(*args, **kw))
# Wait within the caller's loop for the result.
return await loop.run_in_executor(self.executor, call_method)
async def search(self, base, attrs, scope=SCOPE.SUBTREE, loop=None):
return await self.use_loop(loop,
self.conn.search,
base,
scope,
attrlist=attrs)
async def whoami(self, loop=None):
return await self.use_loop(loop, self.conn.whoami)
### TBD ASF-specific custom methods? or use app-specific subclasses?
class ASF_LDAPClient:
CONNECTION_CLASS = ASF_LDAPConnection
def __init__(self, uri, binddn, bindpw):
self.client = bonsai.LDAPClient(uri)
self.client.set_credentials("SIMPLE", binddn, bindpw)
self.client.set_cert_policy("allow") # TODO: Load our cert(?)
self.executor = concurrent.futures.ThreadPoolExecutor(
thread_name_prefix='aioldap')
def connect(self, loop=None):
if loop is None:
loop = asyncio.get_running_loop()
# Run (blocking) in an executor thread.
def blocking_connect():
return self.CONNECTION_CLASS(self.client, self.executor)
future = loop.run_in_executor(self.executor, blocking_connect)
#print('CONN-FUTURE:', future)
return ConnectContextManager(future)
# For debugging, we want the ASF_ prefix, but callers can skip it.
LDAPClient = ASF_LDAPClient
class ConnectContextManager:
def __init__(self, future):
self.future = future
async def __aenter__(self):
self.conn = await self.future
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.conn.close()
def test_conns(client):
# Run three tasks in parallel: heartbeat, connA, connB. The latter
# two will open, run a couple LDAP queries, then close. Then re-open.
# These will run in one event loop (the LDAPClient has some private
# loops; no peeking).
#
# Should see: smooth heartbeat, even when an artifical delay is
# introduced to the connect() process.
# Some random services to read
SERVICE_BASE = 'cn=%s,ou=groups,ou=services,dc=apache,dc=org'
SERVICES = [ 'board', 'infrastructure-root', 'asf-secretary', ]
# Start hanging tasks off this.
loop = asyncio.new_event_loop()
t0 = loop.time()
def ts():
return f'[{loop.time() - t0 :.2f}]'
async def print_me():
async with client.connect() as conn:
print('ME:', await conn.whoami())
async def heartbeat():
while True:
print(f'{ts()} heartbeat')
await asyncio.sleep(2)
import random
async def conn_usage(name):
while True:
# Stagger the connections
await asyncio.sleep(random.randint(0, 3))
async with client.connect() as conn:
for _ in range(5):
s = random.choice(SERVICES)
rv = await conn.search(SERVICE_BASE % (s,), ['owner', 'member',])
print(f'{ts()} CONN[{name}]: RV=', rv)
await asyncio.sleep(random.randint(1, 3))
# between reconnections
await asyncio.sleep(random.randint(0, 5))
loop.create_task(print_me())
loop.create_task(heartbeat())
loop.create_task(conn_usage('A'))
loop.create_task(conn_usage('B'))
loop.run_forever()
if __name__ == '__main__':
import os, getpass
u = os.environ.get('AIOLDAP_USER') or getpass.getuser()
dn = 'uid=%s,ou=people,dc=apache,dc=org' % u
p = os.environ.get('AIOLDAP_PASSWORD') or getpass.getpass(f"Password for {u}: ")
c = ASF_LDAPClient('ldaps://ldap-us.apache.org:636', dn, p)
test_conns(c)