| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import argparse |
| import traceback |
| import sys |
| try: |
| from google.protobuf.text_format import Merge |
| from google.protobuf.text_format import MessageToString |
| except Exception as missingLib: |
| sys.exit("You need python protobuf library. Get it from: pip install protobuf") |
| |
| try: |
| from proto import MLDataFormats_pb2 |
| except Exception as missingLib: |
| sys.exit("Incompatible proto/MLDataFormats_pb2.py. Regenerate using: "+ |
| "protoc -I=${PULSAR_PATH}/managed-ledger/src/main/proto --python_out=${PULSAR_PATH}/bin/proto/ "+ |
| "${PULSAR_PATH}/managed-ledger/src/main/proto/MLDataFormats.proto") |
| |
| try: |
| from kazoo.client import KazooClient |
| except Exception as missingLib: |
| sys.exit("You need Kazoo ZK client library. Get it from: pip install kazoo") |
| |
| ''' |
| This util provides API to access managed-ledger data and also |
| provides command line tool-access to execute these commands. |
| ''' |
# Root znode prefix; the --managedLedgerPath command-line value is appended
# to it to build the full managed-ledger znode path.
managedLedgerPath = "/managed-ledgers/"
# Names of the commands accepted as the first command-line argument.
printMlCommand = "print-managed-ledger"
deleteMlLedgerIds = "delete-managed-ledger-ids"
printCursorsCommands = "print-cursor"
updateMakDeleteCursor = "update-mark-delete-cursor"
| |
'''
Returns managed-ledger info for the given managed-ledger path
Parameters
----------
    zk : KazooClient
        ZooKeeper client instance used to query ZooKeeper.
    mlPath : str
        managed-ledger path
'''
def getManagedLedgerInfo(zk, mlPath):
    '''
    Read the znode at mlPath and decode it into a ManagedLedgerInfo message.

    The payload is decoded as binary protobuf first; if that raises, it is
    decoded as protobuf text format instead. Any failure is reported on the
    console (traceback plus a one-line summary) and None is returned.
    '''
    try:
        znode = zk.get(mlPath)
        payload = znode[0]
        ledgerInfo = MLDataFormats_pb2.ManagedLedgerInfo()
        try:
            ledgerInfo.ParseFromString(payload)
        except Exception:
            # Not binary protobuf: fall back to the text representation.
            Merge(payload, ledgerInfo)
    except Exception as e:
        traceback.print_exc()
        print('Failed to get data for {} due to {}'.format(mlPath, repr(e)))
        return None
    else:
        return ledgerInfo
| |
| |
'''
Deletes specific ledgerIds from the managed-ledger info and writes the updated info back to zk
Parameters
----------
    zk : KazooClient
        ZooKeeper client instance used to query ZooKeeper.
    mlPath : str
        managed-ledger path
    deletLedgerIds : set of integers
        ledger-ids to delete (eg: 123,124), already parsed from the comma separated command-line value
'''
def deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIds):
    '''
    Remove the given ledgers from the ManagedLedgerInfo stored at mlPath.

    Parameters
    ----------
    zk : KazooClient
        ZooKeeper client used to read and write the znode.
    mlPath : str
        Full znode path of the managed-ledger.
    deletLedgerIds : collection of integers
        Ledger ids to drop; each ledgerInfo.ledgerId is tested for
        membership in it.

    The znode is written back in the same representation it was read in
    (binary protobuf, or protobuf text format when binary decoding failed).
    The version obtained by the read is passed to the write. Failures are
    printed with a traceback; nothing is raised and nothing is returned.
    '''
    try:
        # get managed-ledger info
        (mlData, mlStat) = zk.get(mlPath)
        mlInfo = MLDataFormats_pb2.ManagedLedgerInfo()
        isTextFormat = False
        try:
            mlInfo.ParseFromString(mlData)
        except Exception:
            Merge(mlData, mlInfo)
            isTextFormat = True

        ledgerInfoList = mlInfo.ledgerInfo

        # Walk backwards so that deleting by index never shifts an entry
        # that still has to be visited.
        for index in range(len(ledgerInfoList) - 1, -1, -1):
            if ledgerInfoList[index].ledgerId in deletLedgerIds:
                del ledgerInfoList[index]

        if isTextFormat:
            updatedMlInfo = MessageToString(mlInfo, as_utf8=True)
            # MessageToString returns text; ZooKeeper values must be bytes
            # (on Python 2 str already is bytes, so this is a no-op there).
            if not isinstance(updatedMlInfo, bytes):
                updatedMlInfo = updatedMlInfo.encode('utf-8')
        else:
            updatedMlInfo = mlInfo.SerializeToString()
        zk.set(mlPath, updatedMlInfo, version=mlStat.version)
        print('Updated {} with value\n{}'.format(mlPath, str(mlInfo)))

    except Exception as e:
        traceback.print_exc()
        print('Failed to delete ledgerIds for {} due to {}'.format(mlPath, repr(e)))
| |
'''
Returns managed-ledger cursor info for every cursor under the given managed-ledger path
Parameters
----------
    zk : KazooClient
        ZooKeeper client instance used to query ZooKeeper.
    mlPath : str
        managed-ledger path
Returns
-------
    dict mapping each cursor name to its managed-cursor info
'''
def getManagedCursorInfo(zk, mlPath):
    '''
    Collect the ManagedCursorInfo of every child znode of mlPath.

    Returns a dict keyed by child (cursor) name. Each payload is decoded as
    binary protobuf first and as protobuf text format if that raises. On any
    failure the error is printed and None is returned.
    '''
    def readCursor(name):
        # Decode a single cursor znode, binary first and text as fallback.
        payload = zk.get(mlPath + "/" + name)[0]
        info = MLDataFormats_pb2.ManagedCursorInfo()
        try:
            info.ParseFromString(payload)
        except Exception:
            Merge(payload, info)
        return info

    try:
        return {name: readCursor(name) for name in zk.get_children(mlPath)}
    except Exception as e:
        traceback.print_exc()
        print('Failed to get ml-cursor {} due to {}'.format(mlPath, repr(e)))
| |
| |
'''
Update mark-delete position of the given managed-cursor into zk
Parameters
----------
    zk : KazooClient
        ZooKeeper client instance used to query ZooKeeper.
    cursorPath : str
        managed-cursor path (<managed-ledger path>/<cursor name>)
    markDeleteLedgerId : integer
        ledgerId part of the mark-delete position <ledgerId>:<entryId> (eg. 123 for 123:1)
    markDeleteEntryId : integer
        entryId part of the mark-delete position <ledgerId>:<entryId> (eg. 1 for 123:1)
'''
def updateCursorMarkDelete(zk, cursorPath, markDeleteLedgerId, markDeleteEntryId):
    '''
    Overwrite the mark-delete position stored in a managed-cursor znode.

    Parameters
    ----------
    zk : KazooClient
        ZooKeeper client used to read and write the znode.
    cursorPath : str
        Full znode path of the cursor.
    markDeleteLedgerId : integer
        Value assigned to the cursor's markDeleteLedgerId field.
    markDeleteEntryId : integer
        Value assigned to the cursor's markDeleteEntryId field.

    The znode is written back in the same representation it was read in
    (binary protobuf, or protobuf text format when binary decoding failed).
    The version obtained by the read is passed to the write. Failures are
    printed with a traceback; nothing is raised and nothing is returned.
    '''
    try:
        (cursorData, cursorStat) = zk.get(cursorPath)
        cursorInfo = MLDataFormats_pb2.ManagedCursorInfo()
        isTextFormat = False
        try:
            cursorInfo.ParseFromString(cursorData)
        except Exception:
            Merge(cursorData, cursorInfo)
            isTextFormat = True
        cursorInfo.markDeleteLedgerId = markDeleteLedgerId
        cursorInfo.markDeleteEntryId = markDeleteEntryId
        if isTextFormat:
            sData = MessageToString(cursorInfo, as_utf8=True)
            # MessageToString returns text; ZooKeeper values must be bytes
            # (on Python 2 str already is bytes, so this is a no-op there).
            if not isinstance(sData, bytes):
                sData = sData.encode('utf-8')
        else:
            sData = cursorInfo.SerializeToString()
        zk.set(cursorPath, sData, version=cursorStat.version)
        print('Updated {} with value \n{}'.format(cursorPath, cursorInfo))
    except Exception as e:
        traceback.print_exc()
        print('Failed to update ml-cursor {} due to {}'.format(cursorPath, repr(e)))
| |
| |
| |
'''
Prints managed-ledger info for the given managed-ledger path
Parameters
----------
    zk : KazooClient
        ZooKeeper client instance used to query ZooKeeper.
    mlPath : str
        managed-ledger path

eg:
    print-managed-ledger --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test
'''
def printManagedLedgerCommand(zk, mlPath):
    '''
    Look up the managed-ledger info for mlPath and write it to stdout.
    '''
    mlInfo = getManagedLedgerInfo(zk, mlPath)
    print(mlInfo)
| |
| |
| ''' |
| print managed-ledger cursor info for given managed-cursor path |
| Parameters |
| ---------- |
| zk : KazooClient |
| Zookeeper-client instance to query zk-client. |
| mlPath : str |
| managed-ledger path |
| cursorName : str |
| managed-cursor path |
| |
| eg: |
| print-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 |
| ''' |
def printManagedCursorCommand(zk, mlPath, cursorName):
    '''
    Print the info of the cursor called cursorName under mlPath.

    Without a cursor name a usage hint is printed instead. Any error raised
    while looking the cursor up is reported as "No cursor found" together
    with a traceback.
    '''
    try:
        if not cursorName:
            print('Usage: --command {} [--cursorName]'.format(printCursorsCommands))
            return
        cursorsByName = getManagedCursorInfo(zk, mlPath)
        print(cursorsByName[cursorName])
    except Exception:
        traceback.print_exc()
        print('No cursor found for {}/{}'.format(mlPath, cursorName))
| |
| ''' |
| delete specific ledgerIds from the managed-ledger info |
| Parameters |
| ---------- |
| zk : KazooClient |
| Zookeeper-client instance to query zk-client. |
| mlPath : str |
| managed-ledger path |
| deleteLedgerIds : str |
| comma separated deleting ledger-ids (eg: 123,124) |
| eg: |
| delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --ledgerIds 3 |
| ''' |
def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds):
    '''
    Command handler: delete the listed ledger ids from a managed-ledger.

    Parameters
    ----------
    zk : KazooClient
        ZooKeeper client passed through to the delete routine.
    mlPath : str
        Full znode path of the managed-ledger.
    deleteLedgerIds : str
        Comma separated ledger ids (eg: 123,124). When empty or None a usage
        hint is printed and nothing is deleted.

    Every id is converted to an integer before anything is written; a
    non-numeric id therefore aborts the command. Errors are printed with a
    traceback and are not re-raised.
    '''
    try:
        if not deleteLedgerIds:
            print('Usage: --command {} [--ledgerIds]'.format(deleteMlLedgerIds))
            return
        # int() rather than long(): long does not exist on Python 3, and on
        # Python 2 int() already promotes to long for large values.
        ledgerIdSet = {int(ledgerId) for ledgerId in deleteLedgerIds.split(",")}
        deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, ledgerIdSet)
    except Exception as e:
        traceback.print_exc()
        print('Failed to delete ml-ledger_ids {} due to {}'.format(mlPath, repr(e)))
| |
| ''' |
| Update mark-delete position of the given managed-cursor |
| Parameters |
| ---------- |
| zk : KazooClient |
| Zookeeper-client instance to query zk-client. |
| mlPath : str |
| managed-ledger path |
| cursorName : str |
| managed-cursor path |
| markDeletePosition: str |
| markDeletePosition combination of <ledgerId>:<entryId> (eg. 123:1) |
| |
| eg: |
| update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1 |
| ''' |
def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition):
    '''
    Command handler: set the mark-delete position of one cursor.

    Parameters
    ----------
    zk : KazooClient
        ZooKeeper client passed through to the update routine.
    mlPath : str
        Full znode path of the managed-ledger.
    cursorName : str
        Name of the cursor; its znode is mlPath + "/" + cursorName.
    markDeletePosition : str
        Position in the form <ledgerId>:<entryId> (eg. 123:1).

    A usage hint is printed when cursorName or markDeletePosition is missing,
    and a format hint when the position is not two colon separated integers.
    Other errors are printed with a traceback and are not re-raised.
    '''
    try:
        if not cursorName or not markDeletePosition:
            print('Usage: --command {} [--cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor))
            return
        try:
            # int() rather than long(): long does not exist on Python 3.
            # ValueError covers both a wrong number of parts (unpacking)
            # and a non-integer part (int()).
            ledgerId, entryId = [int(part) for part in markDeletePosition.split(":")]
        except ValueError:
            print("markDeletePosition must be in format <ledger_id>:<entry_id>")
            return
        updateCursorMarkDelete(zk, mlPath + "/" + cursorName, ledgerId, entryId)
    except Exception as e:
        traceback.print_exc()
        print('Failed to update ml-cursor {}/{} due to {}'.format(mlPath, cursorName, repr(e)))
| |
# Command-line entry point:
#   <script> <command> --zkServer <host:port> --managedLedgerPath <path> [options]
# '==' (not 'in') so that only a real script run matches; 'in' is a substring
# test and would also be true for module names such as 'main'.
if __name__ == '__main__':

    commandHelpText = 'Managed-ledger command: \n{}, {}, {}, {}'.format(printMlCommand, deleteMlLedgerIds, printCursorsCommands, updateMakDeleteCursor)

    try:
        command = sys.argv[1]
    except IndexError:
        print('ERROR: Pass command as a first argument, supported {}\n\n'.format(commandHelpText))
        # Without a command there is nothing to dispatch to.
        sys.exit(1)
    arguments = sys.argv[2:]
    parser = argparse.ArgumentParser()
    parser.add_argument("--zkServer", "-zk", required=True, help="ZooKeeperServer:port")
    parser.add_argument("--managedLedgerPath", "-mlp", required=True, help="Managed-ledger path")
    parser.add_argument("--ledgerIds", "-lid", required=False, help="Delete ledger ids: comma separated")
    parser.add_argument("--cursorName", "-cn", required=False, help="Managed-ledger cursor name")
    parser.add_argument("--cursorMarkDelete", "-cm", required=False, help="Cursor mark delete position: <ledger_id>:<entry_id>")
    args = parser.parse_args(arguments)

    zkSrvr = args.zkServer
    mlPath = managedLedgerPath + args.managedLedgerPath
    deleteLedgerIds = args.ledgerIds
    cursorName = args.cursorName
    cursorMarkDelete = args.cursorMarkDelete

    zk = KazooClient(hosts=zkSrvr)
    zk.start()
    try:
        if command == printMlCommand:
            printManagedLedgerCommand(zk, mlPath)
        elif command == deleteMlLedgerIds:
            deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds)
        elif command == printCursorsCommands:
            printManagedCursorCommand(zk, mlPath, cursorName)
        elif command == updateMakDeleteCursor:
            updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, cursorMarkDelete)
        else:
            print('{} command not found. supported {}, pass command as a first argument'.format(command, commandHelpText))
    finally:
        # Always release the ZooKeeper session, whatever the command did.
        zk.stop()
        zk.close()