| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import sys |
| from datetime import datetime |
| from optparse import OptionParser |
| |
| # Version of this tool software |
| MAJOR_VERSION = 1 |
| MINOR_VERSION = 1 |
| # === Version history === |
| # 2011-11-16 1.1: Bugfixs: |
| # QPID-3623 - Incorrect handling of transactions |
| # QPID-3624 - Replace argparse lib with optparse so tool can be used on Python 2.6. |
| # 2011-11-07 1.0: Initial checkin |
| # QPID-3579: Initial version checked in |
| |
| |
| # AMQP 0-10 commands - these increment the command counter |
| EXEC_COMMANDS = ["ExecutionSync", "ExecutionResult", "ExecutionException", "MessageTransfer", "MessageAccept", |
| "MessageReject", "MessageRelease", "MessageAcquire", "MessageResume", "MessageSubscribe", |
| "MessageCancel", "MessageSetFlowMode", "MessageFlow", "MessageFlush", "MessageStop", "TxSelect", |
| "TxCommit", "TxRollback", "DtxSelect", "DtxStart", "DtxEnd", "DtxCommit", "DtxForget", "DtxGetTimeout", |
| "DtxPrepare", "DtxRecover", "DtxRollback", "DtxSetTimeout", "ExchangeDeclare", "ExchangeDelete", |
| "ExchangeQuery", "ExchangeBind", "ExchangeUnbind", "ExchangeBound", "QueueDeclare", "QueueDelete", |
| "QueuePurge", "QueueQuery", "FileQos", "FileQosOk", "FileConsume", "FileConsumeOk", "FileCancel", |
| "FileOpen", "FileOpenOk", "FileStage", "FilePublish", "FileReturn", "FileDeliver", "FileAck", |
| "FileReject", "StreamQos", "StreamQosOk", "StreamConsume", "StreamConsumeOk", "StreamCancel", |
| "StreamPublish", "StreamReturn", "StreamDeliver"] |
| HEADER_STR = " -line ----------timestamp -----------connection ssn recv send- txn-- operation---------->" |
| |
| PROGRESS_LINES_PER_DOT = 100000 |
| |
| class LogLevel: |
| CRITICAL = (1, "critical") |
| ERROR = (2, "error") |
| WARNING = (3, "warning") |
| NOTICE = (4, "notice") |
| INFO = (5, "info") |
| DEBUG = (6, "debug") |
| TRACE = (7, "trace") |
| @staticmethod |
| def get_level(level): |
| if level == LogLevel.CRITICAL[1]: return LogLevel.CRITICAL |
| if level == LogLevel.ERROR[1]: return LogLevel.ERROR |
| if level == LogLevel.WARNING[1]: return LogLevel.WARNING |
| if level == LogLevel.NOTICE[1]: return LogLevel.NOTICE |
| if level == LogLevel.INFO[1]: return LogLevel.INFO |
| if level == LogLevel.DEBUG[1]: return LogLevel.DEBUG |
| if level == LogLevel.TRACE[1]: return LogLevel.TRACE |
| raise Exception("Unknown log level: %s" % level) |
| |
| class LogLine: |
| def __init__(self, line_no, line): |
| self.line_no = line_no |
| self.timestamp = datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S") |
| self.level = LogLevel.get_level(line[20:].split(" ")[0]) |
| self.line = line[21 + len(self.level[1]):].strip() |
| self.cmd_cnt = None |
| self.txn_cnt = None |
| def __str__(self): |
| if self.contains("RECV"): cnt_str = "R" |
| else: cnt_str = " S" |
| if self.cmd_cnt is not None: cnt_str += str(self.cmd_cnt) |
| set_index = self.find("{") |
| header_index = self.find("header") |
| content_index = self.find("content") |
| if self.txn_cnt is None: |
| txn_cnt_str = "" |
| else: |
| txn_cnt_str = "T%d" % self.txn_cnt |
| if header_index != -1 and header_index < set_index: op_str = " + " + self.line[header_index:self.line.rfind("]")] |
| elif content_index != -1 and set_index == -1: op_str = " + " + self.line[content_index:self.line.rfind("]")] |
| else: op_str = self.line[set_index+1:self.line.rfind("}")] |
| return " %7d %19s %22s %3d %-10s %-5s %s" % (self.line_no, self.timestamp.isoformat(" "), |
| self.get_identifier_remote_addr(), self.get_channel(), |
| cnt_str, txn_cnt_str, op_str) |
| def contains(self, string): |
| return self.line.find(string) != -1 |
| def find(self, string): |
| return self.line.find(string) |
| def get_channel(self): |
| return int(self.get_named_value("channel")) |
| def get_identifier(self): |
| return self.line.partition("[")[2].partition("]")[0] |
| def get_identifier_remote_addr(self): |
| return self.get_identifier().partition("-")[2] |
| def get_named_value(self, name): |
| return self.line.partition("%s=" % name)[2].partition(";")[0] |
| def get_msg_accept_range(self): |
| str_nums = self.get_named_value("transfers").strip(" {[]}").split(",") |
| return range(int(str_nums[0]), int(str_nums[1]) + 1) |
| def is_log_level(self, level): |
| if self.level is None: return None |
| return level[0] == self.level[0] |
| def is_frame(self): |
| return self.contains("Frame[") |
| |
| class ConnectionProperty: |
| def __init__(self, line): |
| self.addr = line.get_identifier_remote_addr() |
| self.channel = line.get_channel() |
| self.ops = [line] |
| def add_op(self, line): |
| self.ops.append(line) |
| |
| class Connection(ConnectionProperty): |
| def __init__(self, line): |
| ConnectionProperty.__init__(self, line) |
| self.session_list = [] # Keeps session creation order |
| self.session_dict = {} # For looking up by channel no. |
| def __str__(self): |
| return "Connection %s (ops=%d; sessions=%d):" % (self.addr, len(self.ops), len(self.session_dict)) |
| def add_session(self, session): |
| self.session_list.append(session) |
| self.session_dict[session.channel] = session |
| def get_session(self, channel): |
| return self.session_dict[channel] |
| |
| class Session(ConnectionProperty): |
| def __init__(self, line): |
| ConnectionProperty.__init__(self, line) |
| self.name = line.get_named_value("name") |
| self.send_cnt = 0 |
| self.recv_cnt = 0 |
| self.txn_flag = False |
| self.txn_cnt = 0 |
| self.recv_cmds = {} # For looking up by cmd no |
| self.send_cmds = {} # For looking up by cmd no |
| def __str__(self): |
| if self.txn_flag: |
| return " + Session %d (name=%s send-cmds=%d recv-cmds=%d txns=%d):" % (self.channel, self.name, |
| self.send_cnt, self.recv_cnt, |
| self.txn_cnt) |
| return " + Session %d (name=%s send-cmds=%d recv-cmds=%d non-txn):" % (self.channel, self.name, self.send_cnt, |
| self.recv_cnt) |
| def incr_recv_cnt(self, line): |
| self.recv_cmds[self.recv_cnt] = line |
| self.recv_cnt += 1 |
| def incr_send_cnt(self, line): |
| self.send_cmds[self.send_cnt] = line |
| self.send_cnt += 1 |
| def set_send_txn_cnt(self, cmd): |
| self.send_cmds[cmd].txn_cnt = self.txn_cnt |
| |
| class TraceAnalysis: |
| def __init__(self): |
| self.connection_list = [] # Keeps connection creation order |
| self.connection_dict = {} # For looking up by connection address |
| parser = OptionParser(usage="%prog [options] trace-file", version="%%prog %d.%d" % (MAJOR_VERSION, MINOR_VERSION), |
| description="A tool to structure and display Qpid broker trace logs.") |
| parser.add_option("--connection-summary", action="store_true", default=False, dest="connection_summary", |
| help="Hide connection details, provide one-line summary") |
| parser.add_option("--session-summary", action="store_true", default=False, dest="session_summary", |
| help="Hide session details, provide one-line summary") |
| parser.add_option("--summary", "-s", action="store_true", default=False, dest="summary", |
| help="Hide both connection and session details. Equivalent to --connection-summary and" |
| "--session-summary") |
| self.opts, self.args = parser.parse_args() |
| if len(self.args) == 0: raise Exception("Missing trace-file argument") |
| def analyze_trace(self): |
| lcnt = 0 |
| print "Reading trace file %s:" % self.args[0] |
| log_file = open(self.args[0], "r") |
| try: |
| for fline in log_file: |
| lcnt += 1 |
| try: |
| lline = LogLine(lcnt, fline) |
| if lline.is_log_level(LogLevel.TRACE) and lline.is_frame(): |
| if lline.contains("{ConnectionStartBody"): |
| conn = Connection(lline) |
| self.connection_list.append(conn) |
| self.connection_dict[conn.addr] = conn |
| elif lline.contains("{Connection"): |
| self.connection_dict[lline.get_identifier_remote_addr()].add_op(lline) |
| elif lline.contains("{SessionAttachBody"): |
| ssn = Session(lline) |
| self.connection_dict[ssn.addr].add_session(ssn) |
| else: |
| ssn = self.connection_dict[lline.get_identifier_remote_addr()].get_session(lline.get_channel()) |
| ssn.add_op(lline) |
| if lline.line[lline.find("{") + 1 : lline.find("Body")] in EXEC_COMMANDS: |
| if lline.contains("RECV"): |
| lline.cmd_cnt = ssn.recv_cnt |
| if ssn.txn_flag: |
| if lline.contains("MessageAcceptBody"): |
| lline.txn_cnt = ssn.txn_cnt |
| for cmd in lline.get_msg_accept_range(): |
| ssn.set_send_txn_cnt(cmd) |
| if lline.contains("MessageTransferBody"): lline.txn_cnt = ssn.txn_cnt |
| ssn.incr_recv_cnt(lline) |
| elif lline.contains("SEND") or lline.contains("SENT"): |
| lline.cmd_cnt = ssn.send_cnt |
| ssn.incr_send_cnt(lline) |
| # TODO: This treatment will probably break down for DTX |
| if lline.contains("xSelectBody"): |
| ssn.txn_flag = True |
| elif lline.contains("xCommitBody") or lline.contains("xRollbackBody"): |
| lline.txn_cnt = ssn.txn_cnt |
| ssn.txn_cnt += 1 |
| except KeyboardInterrupt, e: raise e |
| except: pass |
| if (lcnt + 1) % PROGRESS_LINES_PER_DOT == 0: |
| sys.stdout.write(".") |
| sys.stdout.flush() |
| finally: log_file.close() |
| if lcnt > PROGRESS_LINES_PER_DOT: print |
| print "Read and analyzed", lcnt, "lines." |
| def print_analysis(self): |
| if len(self.connection_list) > 0: |
| for c in self.connection_list: |
| print |
| print c |
| if not self.opts.connection_summary and not self.opts.summary: |
| print HEADER_STR |
| for o in c.ops: |
| print o |
| for s in c.session_list: |
| print s |
| if not self.opts.session_summary and not self.opts.summary: |
| print HEADER_STR |
| for o in s.ops: |
| print o |
| else: |
| print "No trace-level entries found in log." |
| |
| def check_python_version(major, minor, micro): |
| if sys.version_info < (major, minor, micro): |
| print "Incorrect Python version: %s found; >= %d.%d.%d needed." % (sys.version.split()[0], major, minor, micro) |
| sys.exit(-1) |
| |
| # === Main program === |
| |
| if __name__ == '__main__': |
| check_python_version(2, 4, 0) |
| t = TraceAnalysis() |
| t.analyze_trace() |
| t.print_analysis() |
| |