| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import sys,re |
| from itertools import groupby |
| from bz2 import BZ2File |
| from gzip import GzipFile as GZFile |
| try: |
| from urllib.request import urlopen |
| except: |
| from urllib2 import urlopen as urlopen |
| |
class AMRawEvent(object):
    """One ``[HISTORY]`` record taken from an AM log line.

    Holds the captured timestamp text, the DAG id, the event name and the
    still-unparsed argument text (AMLog.parse fills these from regex groups).
    """

    def __init__(self, ts, dag, event, args):
        self.ts, self.dag, self.event, self.args = ts, dag, event, args

    def __repr__(self):
        fields = (self.dag, self.event, self.args)
        return "%s->%s (%s)" % fields
| |
def first(l):
    """Return the first item of ``l``, or None when ``l`` is empty.

    ``l`` may be any iterable: a list, tuple or string, but also a generator
    or a Python 3 ``filter``/``map`` object, which cannot be sliced.
    For one-shot iterators the returned item is consumed from ``l``.
    """
    return next(iter(l), None)
| |
def kv_add(d, k, v):
    """Store ``v`` under ``k`` in dict ``d``, collecting repeated keys.

    The first value for a key is stored as-is. A second value turns the
    entry into ``[first, second]``, and later values are appended to that
    list. ``d`` is modified in place; nothing is returned.

    Uses ``in`` rather than ``dict.has_key``, which does not exist on
    Python 3.
    """
    if k not in d:
        d[k] = v
        return
    old = d[k]
    if isinstance(old, list):
        # already collecting: extend the existing list object in place
        old.append(v)
    else:
        d[k] = [old, v]
| |
def csv_kv(args):
    """Parse comma-separated ``key=value`` text into a dict.

    Every comma-separated item is stripped of surrounding whitespace, then:

    * an item with no ``=`` becomes a bare key mapped to None;
    * an item with exactly one ``=`` is split into key and value;
    * an item with two or more ``=`` is ignored.

    Repeated keys are merged by ``kv_add`` (their values are collected into
    a list).
    """
    result = {}
    for item in (piece.strip() for piece in args.split(",")):
        equals = item.count("=")
        if equals == 0:
            kv_add(result, item, None)
        elif equals == 1:
            key, value = item.split("=")
            kv_add(result, key, value)
    return result
| |
class AppMaster(object):
    """Application master described by an ``AM_STARTED`` history event.

    ``zero`` is the AM ``startTime``. ``containers`` and ``dags`` stay None
    until AMLog.structure fills them in.
    """

    def __init__(self, raw):
        self.raw = raw
        fields = csv_kv(raw.args)
        self.kvs = fields
        self.name = fields["appAttemptId"]
        self.zero = int(fields["startTime"])
        # NOTE(review): parsing of initTime / appSubmitTime used to sit here,
        # commented out; neither value is currently used.
        self.containers = self.dags = None

    def __repr__(self):
        return "[%s started at %d]" % (self.name, self.zero)
| |
class DummyAppMaster(object):
    """Stand-in for AppMaster when the log holds no ``AM_STARTED`` event.

    It exposes the same attributes as AppMaster (duck typing) and uses the
    given DAG's start time as its time origin ``zero``.
    """

    def __init__(self, dag):
        self.raw, self.kvs = None, {}
        self.name = "Appmaster for %s" % dag.name
        self.zero = dag.start
        self.containers = self.dags = None
| |
class Container(object):
    """A container built from a ``CONTAINER_LAUNCHED`` history event.

    ``stop`` and ``status`` keep their defaults (-1 and 0) until
    AMLog.containers finds a matching ``CONTAINER_STOPPED`` event. ``node``
    stays "" until AMLog.structure copies it from an attempt.
    """

    def __init__(self, raw):
        self.raw = raw
        fields = csv_kv(raw.args)
        self.kvs = fields
        self.name = fields["containerId"]
        self.start = int(fields["launchTime"])
        self.stop, self.status, self.node = -1, 0, ""

    def __repr__(self):
        return "[%s start=%d]" % (self.name, self.start)
| |
class DummyContainer(object):
    """Stand-in for a container that an attempt ran in but that has no
    ``CONTAINER_LAUNCHED`` event in the log.

    It exposes the same attributes as Container. The id, start time and
    node are taken from the attempt.
    """

    def __init__(self, attempt):
        self.raw = None
        self.kvs = {}
        self.name = attempt.container
        self.start = attempt.start
        self.stop = -1
        self.status = 0
        # AMLog.structure copies the attempt's node onto real containers.
        # Do the same here. Previously a dummy container used by a single
        # attempt kept node=None even though that node was known.
        self.node = attempt.node
| |
class DAG(object):
    """A finished DAG, built from a ``DAG_FINISHED`` history event.

    ``vertexes`` exists only after ``structure`` has been called. Until
    then ``attempts`` fails with AttributeError when it is iterated.
    """

    def __init__(self, raw):
        self.raw = raw
        self.name = raw.dag
        fields = self.kvs = csv_kv(raw.args)
        self.start = int(fields["startTime"])
        self.finish = int(fields["finishTime"])
        self.duration = int(fields["timeTaken"])

    def structure(self, vertexes):
        """Keep the vertices whose ``dag`` id equals this DAG's id."""
        own = self.name
        self.vertexes = [vx for vx in vertexes if vx.dag == own]

    def attempts(self):
        """Lazily yield this DAG's task attempts (those whose ``dag`` matches)."""
        nested = (att for vx in self.vertexes for tk in vx.tasks for att in tk.attempts)
        for att in nested:
            if att.dag == self.name:
                yield att

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
| |
class Vertex(object):
    """A finished vertex, built from a ``VERTEX_FINISHED`` history event.

    Timestamps read from the event arguments:

    * ``initZero`` / ``init``: initRequestedTime / initedTime
    * ``startZero`` / ``start``: startRequestedTime / startedTime
    * ``finish`` / ``duration``: finishTime / timeTaken

    ``tasks`` exists only after ``structure`` has been called.
    """

    def __init__(self, raw):
        self.raw = raw
        self.dag = raw.dag
        self.kvs = csv_kv(raw.args)
        self.name = self.kvs["vertexName"]
        self.initZero = int(self.kvs["initRequestedTime"])
        self.init = int(self.kvs["initedTime"])
        self.startZero = int(self.kvs["startRequestedTime"])
        self.start = int(self.kvs["startedTime"])
        self.finish = int(self.kvs["finishTime"])
        self.duration = int(self.kvs["timeTaken"])

    def structure(self, tasks):
        """Attach the tasks that belong to this vertex of this DAG.

        Tasks are matched on vertex name AND DAG id. ``vertexName`` is a name,
        not a DAG-scoped id, and nothing here guarantees it is unique across
        the DAGs of one AM log. Matching on the name alone would attach
        same-named vertices' tasks from other DAGs. (DAG.attempts re-filters
        by DAG, presumably for this reason.)
        """
        self.tasks = [t for t in tasks if t.vertex == self.name and t.dag == self.dag]

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
| |
| |
class Task(object):
    """A finished task, built from a ``TASK_FINISHED`` history event.

    ``attempts`` exists only after ``structure`` has been called.
    """

    def __init__(self, raw):
        self.raw = raw
        self.dag = raw.dag
        kv = self.kvs = csv_kv(raw.args)
        self.vertex = kv["vertexName"]
        self.name = kv["taskId"]
        self.start = int(kv["startTime"])
        self.finish = int(kv["finishTime"])
        self.duration = int(kv["timeTaken"])

    def structure(self, attempts):
        """Keep the attempts whose ``task`` id equals this task's id."""
        self.attempts = list(filter(lambda att: att.task == self.name, attempts))

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
| |
class Attempt(object):
    """One task attempt, merged from its STARTED and FINISHED history events.

    ``pair`` is an iterable of AMRawEvent objects that share one
    taskAttemptId (AMLog.attempts builds these groups). The arguments of
    both events are merged into ``self.kvs``, with FINISHED values
    overriding STARTED ones.

    Raises ValueError when the group lacks a TASK_ATTEMPT_STARTED or a
    TASK_ATTEMPT_FINISHED event. That presumably happens with a truncated
    log or an attempt still running when the log ended.
    """

    def __init__(self, pair):
        # Materialize once. Callers may pass a one-shot iterator (e.g. a
        # Python 3 map/filter object), which the first search would exhaust.
        events = list(pair)
        start = next((ev for ev in events if ev.event == "TASK_ATTEMPT_STARTED"), None)
        finish = next((ev for ev in events if ev.event == "TASK_ATTEMPT_FINISHED"), None)
        if start is None or finish is None:
            raise ValueError("incomplete task attempt: started=%r finished=%r" % (start, finish))
        self.raw = finish
        self.dag = finish.dag
        self.kvs = csv_kv(start.args)
        self.kvs.update(csv_kv(finish.args))
        self.finish = int(self.kvs["finishTime"])
        self.duration = int(self.kvs["timeTaken"])
        self.name = self.kvs["taskAttemptId"]
        # The attempt id has seven "_"-separated fields, the last two being
        # the task number and the attempt number. Dropping the last field and
        # renaming the "attempt" prefix gives the owning task's id.
        self.task = self.name[:self.name.rfind("_")].replace("attempt", "task")
        _, _, _, _, _, taskid, attemptid = self.name.split("_")
        self.tasknum = int(taskid)
        self.attemptnum = int(attemptid)
        self.vertex = self.kvs["vertexName"]
        self.start = int(self.kvs["startTime"])
        self.container = self.kvs["containerId"]
        self.node = self.kvs["nodeId"]

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
| |
| |
def open_file(f):
    """Open the AM log ``f`` for line-by-line reading.

    ``f`` may be:

    * a path ending in ``.gz`` (read through GzipFile);
    * a path ending in ``.bz2`` (read through BZ2File);
    * an ``http://`` or ``https://`` URL (read through urlopen);
    * any other local path, opened with ``open``.

    The caller owns the returned object and must close it.

    NOTE(review): the compression suffix is checked first, so a URL ending
    in .gz/.bz2 is treated as a local file path and fails to open.
    """
    if f.endswith(".gz"):
        return GZFile(f)
    if f.endswith(".bz2"):
        return BZ2File(f)
    if f.startswith(("http://", "https://")):
        return urlopen(f)
    return open(f)
| |
class AMLog(object):
    """Parser for the ``[HISTORY]`` records in a Tez application-master log.

    The constructor reads the whole log (any source ``open_file`` accepts)
    and keeps every line that parses as a history record in ``self.events``.
    Its other methods turn those raw events into AppMaster, Container, DAG,
    Vertex, Task and Attempt objects. ``structure`` links them into one tree.
    """

    def init(self):
        """Compile ``self.MAIN_RE``, the pattern for one ``[HISTORY]`` log line.

        Named groups: ``ts``, ``thread``, ``dag``, ``event`` and ``args``.
        """
        ID = r'[^\]]*'
        TS = r'[0-9:\-, ]*'
        MAIN_RE = r'^(?P<ts>%(ts)s) [?INFO]? [(?P<thread>%(id)s)] \|?((HistoryEventHandler.criticalEvents)|((org.apache.tez.dag.)?history.HistoryEventHandler))\|?: [HISTORY][DAG:(?P<dag>%(id)s)][Event:(?P<event>%(id)s)]: (?P<args>.*)'
        # Escape the template's literal brackets BEFORE substituting ID/TS,
        # whose character classes must keep their brackets unescaped. Raw
        # strings avoid the invalid "\[" escape-sequence warning.
        MAIN_RE = MAIN_RE.replace('[', r'\[').replace(']', r'\]')
        MAIN_RE = MAIN_RE % {'ts': TS, 'id': ID}
        self.MAIN_RE = re.compile(MAIN_RE)

    def __init__(self, f):
        fp = open_file(f)
        try:
            self.init()
            events = []
            for line in fp:
                # Under Python 3 the gzip/bz2/url sources yield bytes, which
                # the str regex cannot match, so decode them. Python 2 lines
                # are already str. Assumes a UTF-8 log; TODO confirm.
                if not isinstance(line, str):
                    line = line.decode("utf-8", "replace")
                ev = self.parse(line.strip())
                if ev:
                    events.append(ev)
            # A real list: the helpers below each iterate it again.
            self.events = events
        finally:
            fp.close()

    def structure(self):
        """Link the parsed events into one tree and return its root.

        Shape: AM -> dags -> vertexes -> tasks -> attempts, plus
        AM.containers, a dict keyed by container id. Containers seen only in
        attempts are filled in with DummyContainer. Without an AM_STARTED
        event, a DummyAppMaster built from the first DAG is the root.

        Raises ValueError when the log has neither an AM_STARTED nor a
        DAG_FINISHED event, because no root can be built then.
        """
        am = self.appmaster()  # a fresh object built from the events
        containers = dict((c.name, c) for c in self.containers())
        dags = self.dags()
        vertexes = self.vertexes()
        tasks = self.tasks()
        attempts = self.attempts()
        for t in tasks:
            t.structure(attempts)
        for v in vertexes:
            v.structure(tasks)
        for d in dags:
            d.structure(vertexes)
        for a in attempts:
            if a.container in containers:
                # Attempts carry the node id; copy it onto the container.
                containers[a.container].node = a.node
            else:
                containers[a.container] = DummyContainer(a)
        if not am:
            if not dags:
                raise ValueError("log has no AM_STARTED or DAG_FINISHED history events")
            am = DummyAppMaster(dags[0])
        am.containers = containers
        am.dags = dags
        return am

    def appmaster(self):
        """Return an AppMaster for the first AM_STARTED event, or None."""
        return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"])

    def containers(self):
        """Return one Container per CONTAINER_LAUNCHED event.

        Stop time and exit status are filled in from any CONTAINER_STOPPED
        event with the same containerId.
        """
        containers = [Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED"]
        containermap = dict((c.name, c) for c in containers)
        for ev in self.events:
            if ev.event == "CONTAINER_STOPPED":
                kvs = csv_kv(ev.args)
                c = containermap.get(kvs["containerId"])
                if c is not None:
                    c.stop = int(kvs["stoppedTime"])
                    c.status = int(kvs["exitStatus"])
        return containers

    def dags(self):
        """Return one DAG per DAG_FINISHED event (unfinished DAGs are not reported)."""
        return [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"]

    def vertexes(self):
        """Return one Vertex per VERTEX_FINISHED event. (Yes, not "vertices".)"""
        return [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"]

    def tasks(self):
        """Return one Task per TASK_FINISHED event."""
        return [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"]

    def attempts(self):
        """Return one Attempt per taskAttemptId.

        TASK_ATTEMPT_STARTED and TASK_ATTEMPT_FINISHED events are grouped by
        their ``taskAttemptId``, and each group is passed to Attempt as a list.
        """
        attempt_id = lambda item: item[0]
        raw = [(csv_kv(ev.args)["taskAttemptId"], ev) for ev in self.events
               if ev.event in ("TASK_ATTEMPT_STARTED", "TASK_ATTEMPT_FINISHED")]
        # Sort on the id only. Sorting whole tuples falls through to comparing
        # AMRawEvent objects on ties (arbitrary order on Python 2, TypeError on
        # Python 3). The sort is stable, so each group keeps log order.
        raw.sort(key=attempt_id)
        return [Attempt([ev for _, ev in group])
                for _, group in groupby(raw, key=attempt_id)]

    def parse(self, l):
        """Return an AMRawEvent for a ``[HISTORY]`` line, otherwise None.

        Lines without the ``[HISTORY]`` marker return None. So do marked
        lines that do not match ``MAIN_RE``; those also get a warning on
        stderr instead of crashing on ``None.group``.
        """
        if "[HISTORY]" not in l:
            return None
        m = self.MAIN_RE.match(l)
        if m is None:
            sys.stderr.write("skipping unparsable [HISTORY] line: %s\n" % l)
            return None
        return AMRawEvent(m.group("ts"), m.group("dag"), m.group("event"), m.group("args"))
| |
def main(argv):
    """Print one line per task attempt of every finished DAG in an AM log.

    ``argv`` holds the command-line arguments without the program name.
    ``argv[0]`` is the log location handed to AMLog: a local path (optionally
    .gz/.bz2 compressed) or a URL that ``open_file`` accepts.

    Each output line is ``[vertex, attempt id, container id, start, finish]``.
    Returns the exit status: 0 on success, or 1 (after a usage message on
    stderr) when no log location is given.
    """
    if not argv:
        sys.stderr.write("usage: %s <am-log-file-or-url>\n" % sys.argv[0])
        return 1
    tree = AMLog(argv[0]).structure()
    # AM -> dag -> vertex -> task -> attempt
    # AM -> container
    for d in tree.dags:
        for a in d.attempts():
            # single-argument print(...) prints the same on Python 2 and 3
            print([a.vertex, a.name, a.container, a.start, a.finish])
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))