blob: 9ed5d69bb132d53b3bf479d46a569695119ba9b8 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import argparse
import traceback
import sys
try:
from google.protobuf.text_format import Merge
from google.protobuf.text_format import MessageToString
except Exception as missingLib:
sys.exit("You need python protobuf library. Get it from: pip install protobuf")
try:
from proto import MLDataFormats_pb2
except Exception as missingLib:
sys.exit("Incompatible proto/MLDataFormats_pb2.py. Regenerate using: "+
"protoc -I=${PULSAR_PATH}/managed-ledger/src/main/proto --python_out=${PULSAR_PATH}/bin/proto/ "+
"${PULSAR_PATH}/managed-ledger/src/main/proto/MLDataFormats.proto")
try:
from kazoo.client import KazooClient
except Exception as missingLib:
sys.exit("You need Kazoo ZK client library. Get it from: pip install kazoo")
'''
This util provides API to access managed-ledger data and also
provides command line tool-access to execute these commands.
'''
# Znode prefix prepended to the --managedLedgerPath argument to build the
# full managed-ledger path that is queried in ZooKeeper.
managedLedgerPath = "/managed-ledgers/"
# Names of the supported commands; the script compares its first
# command-line argument against these values to pick the action to run.
printMlCommand = "print-managed-ledger"
deleteMlLedgerIds = "delete-managed-ledger-ids"
printCursorsCommands = "print-cursor"
updateMakDeleteCursor = "update-mark-delete-cursor"
'''
Returns the managed-ledger info stored at the given managed-ledger path
Parameters
----------
zk : KazooClient
Zookeeper-client instance used to read the znode.
mlPath : str
managed-ledger path
'''
def getManagedLedgerInfo(zk, mlPath):
    '''
    Read and decode the ManagedLedgerInfo stored at the given path.

    The znode payload is decoded as binary protobuf first; if that raises,
    it is decoded as protobuf text format instead.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance used to read the znode.
    mlPath : str
        managed-ledger path

    Returns
    -------
    The decoded ManagedLedgerInfo message, or None when reading or decoding
    failed (the error is printed, not raised).
    '''
    try:
        znode = zk.get(mlPath)
        payload = znode[0]
        info = MLDataFormats_pb2.ManagedLedgerInfo()
        try:
            info.ParseFromString(payload)
        except Exception:
            # Binary decoding failed: retry with the text representation.
            Merge(payload, info)
    except Exception as error:
        traceback.print_exc()
        print('Failed to get data for {} due to {}'.format(mlPath, repr(error)))
        return None
    return info
'''
Deletes specific ledgerIds from the managed-ledger info and writes the updated info back to zk
Parameters
----------
zk : KazooClient
Zookeeper-client instance used to read and update the znode.
mlPath : str
managed-ledger path
deletLedgerIds : set of int
ledger-ids to delete (eg: {123, 124})
'''
def deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIds):
    '''
    Remove the given ledger ids from a managed-ledger's info and store it back.

    The znode is decoded as binary protobuf, falling back to protobuf text
    format, and is written back in the same format it was read in. The write
    is conditional on the znode version that was read.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance used to read and update the znode.
    mlPath : str
        managed-ledger path
    deletLedgerIds : collection of int
        ledger-ids to remove; only membership tests are performed on it.

    Returns
    -------
    None. Failures are printed (with traceback) instead of being raised.
    '''
    try:
        # get managed-ledger info
        (mlData, mlStat) = zk.get(mlPath)
        mlInfo = MLDataFormats_pb2.ManagedLedgerInfo()
        isTextFormat = False
        try:
            mlInfo.ParseFromString(mlData)
        except Exception:
            Merge(mlData, mlInfo)
            isTextFormat = True
        ledgerInfoList = mlInfo.ledgerInfo
        # Walk backwards so that deleting an entry never shifts the indexes
        # of the entries that are still to be visited.
        for index in range(len(ledgerInfoList) - 1, -1, -1):
            if ledgerInfoList[index].ledgerId in deletLedgerIds:
                del ledgerInfoList[index]
        if isTextFormat:
            updatedMlInfo = MessageToString(mlInfo, as_utf8=True)
            # On Python 3 MessageToString returns text, but the znode value
            # has to be a byte string.
            if not isinstance(updatedMlInfo, bytes):
                updatedMlInfo = updatedMlInfo.encode('utf-8')
        else:
            updatedMlInfo = mlInfo.SerializeToString()
        # Passing the read version makes the update fail if the znode was
        # modified in the meantime.
        zk.set(mlPath, updatedMlInfo, version=mlStat.version)
        print('Updated {} with value\n{}'.format(mlPath, str(mlInfo)))
    except Exception as e:
        traceback.print_exc()
        print('Failed to delete ledgerIds for {} due to {}'.format(mlPath, repr(e)))
'''
Returns the managed-cursor info of every cursor found under the given managed-ledger path,
as a dict keyed by cursor name
Parameters
----------
zk : KazooClient
Zookeeper-client instance used to list and read the cursor znodes.
mlPath : str
managed-ledger path
'''
def getManagedCursorInfo(zk, mlPath):
    '''
    Read and decode every cursor stored under a managed-ledger path.

    Each child znode of ``mlPath`` is decoded as binary protobuf first; if
    that raises, it is decoded as protobuf text format instead.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance used to list and read the cursor znodes.
    mlPath : str
        managed-ledger path

    Returns
    -------
    dict mapping each child znode name to its decoded ManagedCursorInfo, or
    None when listing, reading or decoding failed (the error is printed).
    '''
    try:
        childNames = zk.get_children(mlPath)
        infoByCursor = {}
        for name in childNames:
            payload = zk.get("/".join((mlPath, name)))[0]
            info = MLDataFormats_pb2.ManagedCursorInfo()
            try:
                info.ParseFromString(payload)
            except Exception:
                # Binary decoding failed: retry with the text representation.
                Merge(payload, info)
            infoByCursor[name] = info
    except Exception as error:
        traceback.print_exc()
        print('Failed to get ml-cursor {} due to {}'.format(mlPath, repr(error)))
        return None
    return infoByCursor
'''
Updates the mark-delete position of the given managed-cursor in zk
Parameters
----------
zk : KazooClient
Zookeeper-client instance used to read and update the znode.
cursorPath : str
full managed-cursor path (<managed-ledger path>/<cursor name>)
markDeleteLedgerId : int
ledger-id of the new mark-delete position (eg. 123)
markDeleteEntryId : int
entry-id of the new mark-delete position (eg. 1)
'''
def updateCursorMarkDelete(zk, cursorPath, markDeleteLedgerId, markDeleteEntryId):
    '''
    Overwrite the mark-delete position of a managed-cursor znode.

    The znode is decoded as binary protobuf, falling back to protobuf text
    format, and is written back in the same format it was read in. The write
    is conditional on the znode version that was read.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance used to read and update the znode.
    cursorPath : str
        full path of the managed-cursor znode
    markDeleteLedgerId : int
        value stored into the cursor's markDeleteLedgerId field
    markDeleteEntryId : int
        value stored into the cursor's markDeleteEntryId field

    Returns
    -------
    None. Failures are printed (with traceback) instead of being raised.
    '''
    try:
        (cursorData, cursorStat) = zk.get(cursorPath)
        cursorInfo = MLDataFormats_pb2.ManagedCursorInfo()
        isTextFormat = False
        try:
            cursorInfo.ParseFromString(cursorData)
        except Exception:
            Merge(cursorData, cursorInfo)
            isTextFormat = True
        cursorInfo.markDeleteLedgerId = markDeleteLedgerId
        cursorInfo.markDeleteEntryId = markDeleteEntryId
        if isTextFormat:
            sData = MessageToString(cursorInfo, as_utf8=True)
            # On Python 3 MessageToString returns text, but the znode value
            # has to be a byte string.
            if not isinstance(sData, bytes):
                sData = sData.encode('utf-8')
        else:
            sData = cursorInfo.SerializeToString()
        # Passing the read version makes the update fail if the znode was
        # modified in the meantime.
        zk.set(cursorPath, sData, version=cursorStat.version)
        print('Updated {} with value \n{}'.format(cursorPath, cursorInfo))
    except Exception as e:
        traceback.print_exc()
        print('Failed to update ml-cursor {} due to {}'.format(cursorPath, repr(e)))
'''
Prints the managed-ledger info for the given managed-ledger path
Parameters
----------
zk : KazooClient
Zookeeper-client instance used to read the znode.
mlPath : str
managed-ledger path
eg:
print-managed-ledger --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test
'''
def printManagedLedgerCommand(zk, mlPath):
    '''
    Print the managed-ledger info stored at the given managed-ledger path.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance passed through to getManagedLedgerInfo.
    mlPath : str
        managed-ledger path
    '''
    ledgerInfo = getManagedLedgerInfo(zk, mlPath)
    print(ledgerInfo)
'''
print managed-ledger cursor info for given managed-cursor path
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
cursorName : str
managed-cursor path
eg:
print-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1
'''
def printManagedCursorCommand(zk, mlPath, cursorName):
    '''
    Print the info of one named cursor of a managed-ledger.

    Prints a usage hint when no cursor name is given, and a
    "No cursor found" message (with traceback) when the lookup fails.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance passed through to getManagedCursorInfo.
    mlPath : str
        managed-ledger path
    cursorName : str
        name of the cursor to print
    '''
    try:
        if not cursorName:
            print('Usage: --command {} [--cursorName]'.format(printCursorsCommands))
            return
        allCursors = getManagedCursorInfo(zk, mlPath)
        print(allCursors[cursorName])
    except Exception:
        traceback.print_exc()
        print('No cursor found for {}/{}'.format(mlPath, cursorName))
'''
delete specific ledgerIds from the managed-ledger info
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
deleteLedgerIds : str
comma separated deleting ledger-ids (eg: 123,124)
eg:
delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --ledgerIds 3
'''
def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds):
    '''
    Parse a comma separated list of ledger-ids and delete them from the
    managed-ledger info.

    Prints a usage hint when no ledger-ids are given. Any failure, including
    a ledger-id that is not an integer, is printed instead of being raised.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance passed through to
        deleteLedgerIdsFromManagedLedgerInfo.
    mlPath : str
        managed-ledger path
    deleteLedgerIds : str
        comma separated deleting ledger-ids (eg: 123,124)
    '''
    try:
        if deleteLedgerIds:
            # int() is used instead of the Python 2-only long(); on Python 2
            # it still yields a long for values that do not fit in an int.
            deletLedgerIdSet = {int(ledgerId) for ledgerId in deleteLedgerIds.split(",")}
            deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIdSet)
        else:
            print('Usage: --command {} [--ledgerIds]'.format(deleteMlLedgerIds))
    except Exception as e:
        traceback.print_exc()
        print('Failed to delete ml-ledger_ids {} due to {}'.format(mlPath, repr(e)))
'''
Update mark-delete position of the given managed-cursor
Parameters
----------
zk : KazooClient
Zookeeper-client instance to query zk-client.
mlPath : str
managed-ledger path
cursorName : str
managed-cursor path
markDeletePosition: str
markDeletePosition combination of <ledgerId>:<entryId> (eg. 123:1)
eg:
update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1
'''
def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition):
    '''
    Parse a <ledgerId>:<entryId> position and store it as the mark-delete
    position of the given cursor.

    Prints a usage hint when the cursor name or the position is missing, and
    a format hint when the position does not consist of exactly two
    ':'-separated parts. Any failure, including non-integer ids, is printed
    instead of being raised.

    Parameters
    ----------
    zk : KazooClient
        Zookeeper-client instance passed through to updateCursorMarkDelete.
    mlPath : str
        managed-ledger path
    cursorName : str
        name of the cursor; appended to mlPath to build the cursor path
    markDeletePosition : str
        combination of <ledgerId>:<entryId> (eg. 123:1)
    '''
    try:
        if not cursorName or not markDeletePosition:
            print('Usage: --command {} [--cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor))
            return
        positionPair = markDeletePosition.split(":")
        if len(positionPair) != 2:
            print("markDeletePosition must be in format <ledger_id>:<entry_id>")
            return
        # int() is used instead of the Python 2-only long(); on Python 2 it
        # still yields a long for values that do not fit in an int.
        markDeleteLedgerId = int(positionPair[0])
        markDeleteEntryId = int(positionPair[1])
        updateCursorMarkDelete(zk, mlPath + "/" + cursorName, markDeleteLedgerId, markDeleteEntryId)
    except Exception as e:
        traceback.print_exc()
        print('Failed to update ml-cursor {}/{} due to {}'.format(mlPath, cursorName, repr(e)))
# Command-line entry point:
#   <script> <command> --zkServer <host:port> --managedLedgerPath <path> [options]
# '==' is required here: the previous substring test ("in") was also true for
# module names such as "main" and would have run the tool on import.
if __name__ == '__main__':
    commandHelpText = 'Managed-ledger command: \n{}, {}, {}, {}'.format(printMlCommand, deleteMlLedgerIds, printCursorsCommands, updateMakDeleteCursor)
    try:
        command = sys.argv[1]
    except IndexError:
        print('ERROR: Pass command as a first argument, supported {}\n\n'.format(commandHelpText))
        # Without a command there is nothing to dispatch to; stop here
        # instead of continuing with an undefined command.
        sys.exit(1)
    # Everything after the command is parsed as options.
    arguments = sys.argv[2:]
    parser = argparse.ArgumentParser()
    parser.add_argument("--zkServer", "-zk", required=True, help="ZooKeeperServer:port")
    parser.add_argument("--managedLedgerPath", "-mlp", required=True, help="Managed-ledger path")
    parser.add_argument("--ledgerIds", "-lid", required=False, help="Delete ledger ids: comma separated")
    parser.add_argument("--cursorName", "-cn", required=False, help="Managed-ledger cursor name")
    parser.add_argument("--cursorMarkDelete", "-cm", required=False, help="Cursor mark delete position: <ledger_id>:<entry_id>")
    args = parser.parse_args(arguments)
    zkSrvr = args.zkServer
    mlPath = managedLedgerPath + args.managedLedgerPath
    deleteLedgerIds = args.ledgerIds
    cursorName = args.cursorName
    cursorMarkDelete = args.cursorMarkDelete
    zk = KazooClient(hosts=zkSrvr)
    zk.start()
    try:
        if command == printMlCommand:
            printManagedLedgerCommand(zk, mlPath)
        elif command == deleteMlLedgerIds:
            deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds)
        elif command == printCursorsCommands:
            printManagedCursorCommand(zk, mlPath, cursorName)
        elif command == updateMakDeleteCursor:
            updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, cursorMarkDelete)
        else:
            print('{} command not found. supported {}, pass command as a first argument'.format(command, commandHelpText))
    finally:
        # Always release the ZooKeeper session, whatever the command did.
        zk.stop()
        zk.close()