| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """Simple LDAP change detector and publisher via pypubsub.""" |
| |
| import sys |
| import yaml |
| import time |
| import requests |
| import ldap |
| import ldapurl |
| import ldap.ldapobject |
| import ldap.syncrepl |
| |
| |
class SyncReplClient(ldap.ldapobject.ReconnectLDAPObject, ldap.syncrepl.SyncreplConsumer):
    """LDAP syncrepl consumer that publishes entry changes to a pubsub URL.

    Every entry seen is remembered in ``changedb`` (entry UUID -> attribute
    dict, with the entry's DN stored under the ``'dn'`` key) so that the
    previous state can be sent along with each change. Additions and
    modifications are only published once the initial refresh has completed
    (``sync_done``); deletions are published whenever they are detected.
    """

    # Seconds to wait for the pubsub endpoint before giving up on a publish.
    # requests applies no timeout by default, so without this a stalled
    # endpoint would block the syncrepl polling loop indefinitely.
    pubsub_timeout = 10

    def __init__(self, *args, **kwargs):
        """Set up the tracking state, then initialise the LDAP connection.

        All arguments are passed through unchanged to
        ``ReconnectLDAPObject.__init__``.
        """
        self.cookie = None
        self.sync_done = False
        self.changedb = {}
        # UUIDs reported as still present during a refresh phase.
        self.__presentUUIDs = set()
        self.pubsub_url = "http://localhost:2069/private/ldap"

        ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)

    def syncrepl_get_cookie(self):
        """Return the last sync cookie stored, or None if there is none."""
        return self.cookie

    def syncrepl_set_cookie(self, cookie):
        """Remember the sync cookie (kept in memory only)."""
        self.cookie = cookie

    def syncrepl_entry(self, dn, attributes, uuid):
        """Record an added or modified entry and publish it once synced.

        An entry whose UUID is already in ``changedb`` counts as a
        ``'modify'`` (the stored attributes are sent as the old state);
        otherwise it is an ``'add'`` with an empty old state.
        """
        previous_attributes = {}
        if uuid in self.changedb:
            change_type = 'modify'
            previous_attributes = self.changedb[uuid]
        else:
            change_type = 'add'
        # Keep the DN with the attributes so a later deletion, which only
        # carries the UUID, can still report which entry went away.
        attributes['dn'] = dn
        self.changedb[uuid] = attributes
        # Entries received before the initial refresh is done are only
        # recorded, not published.
        if self.sync_done:
            print(f'Detected {change_type} of entry {dn!r}')
            self.post_change(dn, attributes, previous_attributes, change_type)

    def syncrepl_delete(self, uuids):
        """Publish and forget every known entry whose UUID is in ``uuids``.

        UUIDs that are not in ``changedb`` are ignored.
        """
        known_uuids = [uuid for uuid in uuids if uuid in self.changedb]
        for uuid in known_uuids:
            dn = self.changedb[uuid]['dn']
            print(f'Detected deletion of entry {dn!r}')
            self.post_change(dn, {}, self.changedb[uuid], 'delete')
            del self.changedb[uuid]

    def syncrepl_present(self, uuids, refreshDeletes=False):
        """Track present/deleted UUIDs reported during a refresh phase.

        * ``uuids is None``: the phase is over. Unless ``refreshDeletes`` is
          set, every tracked entry that was not reported present is treated
          as deleted. The present set is then reset.
        * ``uuids`` given with ``refreshDeletes`` True: those entries are
          deleted.
        * ``uuids`` given otherwise: those entries are noted as present.
        """
        if uuids is None:
            if refreshDeletes is False:
                # NOTE(review): the 'ldap_cookie' exclusion looks like a
                # leftover from a store that kept the cookie next to the
                # entries; nothing here puts that key in changedb.
                deleted_entries = [
                    uuid
                    for uuid in self.changedb
                    if uuid not in self.__presentUUIDs and uuid != 'ldap_cookie'
                ]
                self.syncrepl_delete(deleted_entries)
            self.__presentUUIDs = set()
        elif refreshDeletes is True:
            self.syncrepl_delete(uuids)
        else:
            self.__presentUUIDs.update(uuids)

    def syncrepl_refreshdone(self):
        """Mark the initial refresh as complete; later changes are published."""
        print('Initial sync done, polling for changes...')
        self.sync_done = True

    def post_change(self, dn, attributes, previous_attributes, change_type):
        """PUT a JSON change-set describing one entry change to ``pubsub_url``.

        The payload holds ``dn``, ``change_type``, ``old_attributes`` and
        ``new_attributes``, each passed through ``stringify``. Publishing is
        best-effort: any failure, including an HTTP error status, is printed
        and otherwise ignored.

        Returns:
            Always True, whether or not the publish succeeded.
        """
        print(f"Publishing change-set for {dn}")
        payload = {
            'dn': stringify(dn),
            'change_type': change_type,
            'old_attributes': stringify(previous_attributes),
            'new_attributes': stringify(attributes),
        }
        try:
            response = requests.put(
                self.pubsub_url, json=payload, timeout=self.pubsub_timeout
            )
            # Surface 4xx/5xx answers instead of dropping them silently.
            response.raise_for_status()
        except Exception as e:
            print(f"Could not push payload: {e}")
        return True

    def set_pubsub_url(self, url):
        """Override the URL that change-sets are published to."""
        self.pubsub_url = url
| |
def stringify(obj):
    """Return ``obj`` with all bytes inside nested dicts/lists decoded to str.

    Dict values and list elements are converted recursively at any depth;
    dict keys and objects of any other type are returned unchanged. The
    input is never modified: dicts and lists are rebuilt as new objects.

    Bytes are decoded as UTF-8. Byte sequences that are not valid UTF-8
    (binary attributes, for example) are kept as ``\\xNN`` escapes rather
    than raising ``UnicodeDecodeError``.
    """
    if isinstance(obj, bytes):
        return obj.decode('utf-8', errors='backslashreplace')
    if isinstance(obj, dict):
        return {key: stringify(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [stringify(element) for element in obj]
    return obj
| |
def main(config):
    """Run the syncrepl consumer, reconnecting whenever the session ends.

    Args:
        config: Mapping with a required ``'ldapurl'`` entry (an LDAP URL
            giving server, base DN, attributes, scope and filter; bind
            credentials are read from its ``who``/``cred`` fields) and an
            optional ``'pubsuburl'`` entry overriding the client's default
            publish URL.

    Returns on KeyboardInterrupt while polling. Exits the process with
    status 1 if the server rejects the credentials.
    """
    ldap_url = ldapurl.LDAPUrl(config['ldapurl'])
    server_uri = ldap_url.initializeUrl()
    # LDAPUrl.scope is None when the URL names no scope, while SCOPE_BASE is
    # 0. A truthiness test (``scope or SUBTREE``) would therefore turn an
    # explicit base-scope URL into a subtree search.
    search_scope = ldap.SCOPE_SUBTREE if ldap_url.scope is None else ldap_url.scope
    while True:
        print(f'Connecting to {server_uri}...')
        # Prepare the LDAP server connection
        ldap_connection = SyncReplClient(server_uri)
        if config.get("pubsuburl"):
            ldap_connection.set_pubsub_url(config['pubsuburl'])

        # Now we log in to the LDAP server
        try:
            ldap_connection.simple_bind_s(ldap_url.who, ldap_url.cred)
        except ldap.INVALID_CREDENTIALS as err:
            # Retrying cannot fix bad credentials, so give up.
            print(f'Login to LDAP server failed: {err}')
            sys.exit(1)
        except ldap.SERVER_DOWN:
            print('LDAP server is down, going to retry.')
            time.sleep(5)
            continue

        # Commence the syncing
        print('Starting syncrepl...')
        ldap_search = ldap_connection.syncrepl_search(
            ldap_url.dn or '',
            search_scope,
            mode='refreshAndPersist',
            attrlist=ldap_url.attrs,
            filterstr=ldap_url.filterstr or '(objectClass=*)',
        )

        try:
            # Keep polling for as long as syncrepl_poll returns a true value.
            while ldap_connection.syncrepl_poll(all=1, msgid=ldap_search):
                pass
        except KeyboardInterrupt:
            # User asked to exit
            return
        except Exception as err:
            # Anything else: report it, wait, then reconnect with a fresh
            # client on the next pass of the outer loop.
            print(f'Unhandled exception, reconnecting in 5 seconds: {err}')
            time.sleep(5)
| |
if __name__ == '__main__':
    # Read the settings from pypubsub-ldap.yaml in the current working
    # directory. The context manager closes the file handle once parsing is
    # done instead of leaving it open for the life of the process.
    with open("pypubsub-ldap.yaml") as config_file:
        config = yaml.safe_load(config_file)
    main(config)