# blob: e336d524fb1a6d8a5c21243c80feade4b05ae9e5 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os,sys,re,math,os.path
from collections import defaultdict
from itertools import groupby
from bz2 import BZ2File
from gzip import GzipFile as GZFile
try:
from urllib.request import urlopen
except:
from urllib2 import urlopen as urlopen
class AMRawEvent(object):
    """A single history line broken into its four captured parts.

    Attributes:
        ts: timestamp text captured from the log line.
        dag: text captured from the ``[DAG:...]`` tag.
        event: text captured from the ``[Event:...]`` tag.
        args: the unparsed remainder of the line.
    """

    def __init__(self, ts, dag, event, args):
        # Plain record: every field is stored exactly as it was handed in.
        self.ts, self.dag = ts, dag
        self.event, self.args = event, args

    def __repr__(self):
        shown = (self.dag, self.event, self.args)
        return "%s->%s (%s)" % shown
def first(l):
    """Return the first item of ``l``, or ``None`` when ``l`` is empty.

    Works on any iterable, not only on sliceable sequences, so lazy
    results such as the ``filter``/``map`` objects of Python 3 or a
    generator are accepted too.  Note that a one-shot iterator is
    advanced by one item as a side effect.
    """
    for item in l:
        # The very first element ends the search.
        return item
    return None
def kv_add(d, k, v):
    """Store ``v`` under ``k`` in ``d`` without losing earlier values.

    The first value for a key is stored as-is.  When the key is seen
    again the stored value is turned into a list (or the existing list is
    extended in place), so repeated keys accumulate in insertion order.

    Args:
        d: dictionary that is modified in place.
        k: key to add.
        v: value to record for ``k``.
    """
    # ``in`` instead of dict.has_key(), which no longer exists on Python 3.
    if k not in d:
        d[k] = v
        return
    oldv = d[k]
    if isinstance(oldv, list):
        oldv.append(v)
    else:
        d[k] = [oldv, v]
def csv_kv(args):
    """Turn a comma-separated ``key=value`` string into a dictionary.

    Each comma-separated token is stripped of surrounding whitespace and
    then handled by the number of ``=`` signs it contains:

    * none     -> the whole token becomes a key mapped to ``None``;
    * exactly 1 -> the token is split into key and value;
    * several  -> the token is skipped.

    Repeated keys are merged by ``kv_add``.
    """
    parsed = {}
    for token in args.split(","):
        token = token.strip()
        equals = token.count("=")
        if equals == 0:
            kv_add(parsed, token, None)
        elif equals == 1:
            name, value = token.split("=")
            kv_add(parsed, name, value)
        # Tokens with more than one "=" fall through and are ignored.
    return parsed
class AppMaster(object):
    """Application master described by a raw event's arguments.

    ``name`` comes from the ``appAttemptId`` argument and ``zero`` from
    ``startTime`` (converted to int).  ``containers`` and ``dags`` start
    out as ``None``; ``AMLog.structure`` assigns them later.
    """

    def __init__(self, raw):
        fields = csv_kv(raw.args)
        self.raw = raw
        self.kvs = fields
        self.name = fields["appAttemptId"]
        self.zero = int(fields["startTime"])
        # "initTime" and "appSubmitTime" are deliberately not read here.
        self.containers = None
        self.dags = None

    def __repr__(self):
        described = (self.name, self.zero)
        return "[%s started at %d]" % described
class Container(object):
    """Container described by a raw event's arguments.

    ``name`` is the ``containerId`` argument and ``start`` the
    ``launchTime`` argument as an int.  ``stop``, ``status`` and ``node``
    begin with placeholder values (-1, 0 and the empty string) and are
    overwritten by ``AMLog`` when matching information is found.
    """

    def __init__(self, raw):
        fields = csv_kv(raw.args)
        self.raw = raw
        self.kvs = fields
        self.name = fields["containerId"]
        self.start = int(fields["launchTime"])
        # Placeholders until a stop event / an attempt fills them in.
        self.stop, self.status, self.node = -1, 0, ""

    def __repr__(self):
        described = (self.name, self.start)
        return "[%s start=%d]" % described
class DAG(object):
    """A DAG built from a raw event.

    ``name`` is the DAG id carried by the raw event; ``start``,
    ``finish`` and ``duration`` are the int values of the ``startTime``,
    ``finishTime`` and ``timeTaken`` arguments.
    """

    def __init__(self, raw):
        fields = csv_kv(raw.args)
        self.raw = raw
        self.name = raw.dag
        self.kvs = fields
        self.start = int(fields["startTime"])
        self.finish = int(fields["finishTime"])
        self.duration = int(fields["timeTaken"])

    def structure(self, vertexes):
        """Keep the vertexes whose ``dag`` equals this DAG's name."""
        mine = []
        for vertex in vertexes:
            if vertex.dag == self.name:
                mine.append(vertex)
        self.vertexes = mine

    def attempts(self):
        """Yield every attempt below this DAG's vertexes that names this DAG.

        ``structure`` must have been called first, because it is what
        creates ``self.vertexes``.
        """
        matching = (
            attempt
            for vertex in self.vertexes
            for task in vertex.tasks
            for attempt in task.attempts
            if attempt.dag == self.name
        )
        for attempt in matching:
            yield attempt

    def __repr__(self):
        described = (self.name, self.start, self.duration)
        return "%s (%d+%d)" % described
class Vertex(object):
    """A vertex built from a raw event.

    ``dag`` is the DAG id of the raw event and ``name`` the
    ``vertexName`` argument.  The six timing attributes are the int
    values of the corresponding event arguments.
    """

    def __init__(self, raw):
        fields = csv_kv(raw.args)
        self.raw = raw
        self.dag = raw.dag
        self.kvs = fields
        self.name = fields["vertexName"]
        # Requested / actual times for initialisation ...
        self.initZero = int(fields["initRequestedTime"])
        self.init = int(fields["initedTime"])
        # ... and for start-up.
        self.startZero = int(fields["startRequestedTime"])
        self.start = int(fields["startedTime"])
        self.finish = int(fields["finishTime"])
        self.duration = int(fields["timeTaken"])

    def structure(self, tasks):
        """Keep the tasks whose ``vertex`` equals this vertex's name."""
        mine = []
        for task in tasks:
            if task.vertex == self.name:
                mine.append(task)
        self.tasks = mine

    def __repr__(self):
        described = (self.name, self.start, self.duration)
        return "%s (%d+%d)" % described
class Task(object):
    """A task built from a raw event.

    ``vertex`` is the ``vertexName`` argument and ``name`` the ``taskId``
    argument; ``start``, ``finish`` and ``duration`` are the int values
    of ``startTime``, ``finishTime`` and ``timeTaken``.
    """

    def __init__(self, raw):
        fields = csv_kv(raw.args)
        self.raw = raw
        self.dag = raw.dag
        self.kvs = fields
        self.vertex = fields["vertexName"]
        self.name = fields["taskId"]
        self.start = int(fields["startTime"])
        self.finish = int(fields["finishTime"])
        self.duration = int(fields["timeTaken"])

    def structure(self, attempts):
        """Keep the attempts whose ``task`` equals this task's name."""
        mine = []
        for attempt in attempts:
            if attempt.task == self.name:
                mine.append(attempt)
        self.attempts = mine

    def __repr__(self):
        described = (self.name, self.start, self.duration)
        return "%s (%d+%d)" % described
class Attempt(object):
    """A task attempt assembled from its STARTED and FINISHED events.

    The arguments of both events are merged into ``kvs``; where a key is
    present in both, the value from the FINISHED event wins.

    Args:
        pair: iterable of raw events belonging to one attempt.  It is
            expected to contain one ``TASK_ATTEMPT_STARTED`` and one
            ``TASK_ATTEMPT_FINISHED`` event; if either is missing the
            constructor fails with ``AttributeError`` because ``None`` is
            dereferenced.
    """

    def __init__(self, pair):
        # ``pair`` may be a one-shot iterator (e.g. a ``map`` object on
        # Python 3).  It is scanned twice below, so materialise it once.
        events = list(pair)
        # Lists (not lazy ``filter`` objects) are handed to ``first``.
        start = first([e for e in events if e.event == "TASK_ATTEMPT_STARTED"])
        finish = first([e for e in events if e.event == "TASK_ATTEMPT_FINISHED"])
        self.raw = finish
        self.dag = finish.dag
        self.kvs = csv_kv(start.args)
        self.kvs.update(csv_kv(finish.args))
        self.name = self.kvs["taskAttemptId"]
        # Drop the trailing "_<attempt number>" and swap the prefix to get
        # the id of the owning task.
        self.task = self.name[:self.name.rfind("_")].replace("attempt", "task")
        # The id must have exactly seven "_"-separated parts; only the last
        # two (task number, attempt number) are used.
        (_, _, _, _, _, taskid, attemptid) = self.name.split("_")
        self.tasknum = int(taskid)
        self.attemptnum = int(attemptid)
        self.vertex = self.kvs["vertexName"]
        self.start = int(self.kvs["startTime"])
        self.finish = int(self.kvs["finishTime"])
        self.duration = int(self.kvs["timeTaken"])
        self.container = self.kvs["containerId"]
        self.node = self.kvs["nodeId"]

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
def open_file(f):
    """Open a log source and return a file-like object for reading.

    The kind of source is chosen from the name, checked in this order:

    * names ending in ``.gz``  -> ``GZFile``
    * names ending in ``.bz2`` -> ``BZ2File``
    * names starting with ``http://`` or ``https://`` -> ``urlopen``
    * anything else -> the builtin ``open``

    The suffix checks come first, so a URL ending in ``.gz``/``.bz2`` is
    still treated as a local compressed file.  The caller is responsible
    for closing the returned object.
    """
    if f.endswith(".gz"):
        return GZFile(f)
    if f.endswith(".bz2"):
        return BZ2File(f)
    # Accept both URL schemes; previously an https:// URL fell through to
    # open() and failed as a missing local file.
    if f.startswith(("http://", "https://")):
        return urlopen(f)
    return open(f)
class AMLog(object):
    """History events read from one log source, plus helpers to model them.

    On construction every line of the source is run through ``parse`` and
    the resulting ``AMRawEvent`` objects are kept, in file order, in the
    list ``self.events``.  The remaining methods build model objects
    (``AppMaster``, ``Container``, ``DAG``, ``Vertex``, ``Task``,
    ``Attempt``) from those events.
    """

    def init(self):
        """Compile the pattern for history lines into ``self.MAIN_RE``."""
        ID = r'[^\]]*'
        TS = r'[0-9:\-, ]*'
        MAIN_RE = r'^(?P<ts>%(ts)s) INFO [(?P<thread>%(id)s)] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:(?P<dag>%(id)s)][Event:(?P<event>%(id)s)]: (?P<args>.*)'
        # Escape the literal brackets of the template *before* the
        # character classes (which need their brackets) are substituted in.
        MAIN_RE = MAIN_RE.replace('[', r'\[').replace(']', r'\]')
        MAIN_RE = MAIN_RE % {'ts': TS, 'id': ID}
        self.MAIN_RE = re.compile(MAIN_RE)

    def __init__(self, f):
        """Read the source named ``f`` and collect its history events.

        Args:
            f: name handed to ``open_file``.
        """
        self.init()
        events = []
        fp = open_file(f)
        try:
            for line in fp:
                ev = self.parse(self._text(line).strip())
                if ev:
                    events.append(ev)
        finally:
            # Do not leak the handle, even when a line fails to parse.
            fp.close()
        # A real list: the events are iterated many times by the methods below.
        self.events = events

    @staticmethod
    def _text(line):
        """Return ``line`` as ``str``; byte lines are decoded as UTF-8."""
        if isinstance(line, str):
            return line
        # Compressed sources yield bytes on Python 3.
        return line.decode("utf-8", "replace")

    def structure(self):
        """Link all model objects together and return the ``AppMaster``.

        Attempts are attached to tasks, tasks to vertexes and vertexes to
        DAGs; each container learns its node from the attempts that ran in
        it.  The returned object carries ``containers`` (a dict keyed by
        container name) and ``dags`` (a list).

        Fails with ``AttributeError`` when the log has no ``AM_STARTED``
        event and with ``KeyError`` when an attempt names a container
        that has no ``CONTAINER_LAUNCHED`` event.
        """
        am = self.appmaster()  # freshly built on every call
        containers = dict((c.name, c) for c in self.containers())
        dags = self.dags()
        vertexes = self.vertexes()
        tasks = self.tasks()
        attempts = self.attempts()
        for t in tasks:
            t.structure(attempts)
        for v in vertexes:
            v.structure(tasks)
        for d in dags:
            d.structure(vertexes)
        for a in attempts:
            containers[a.container].node = a.node
        am.containers = containers
        am.dags = dags
        return am

    def appmaster(self):
        """Return an ``AppMaster`` for the first ``AM_STARTED`` event, or ``None``."""
        return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"])

    def containers(self):
        """Return a ``Container`` per ``CONTAINER_LAUNCHED`` event.

        ``CONTAINER_STOPPED`` events fill in ``stop`` and ``status`` of the
        container with the same id; stop events for unknown containers
        are ignored.
        """
        containers = [Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED"]
        containermap = dict((c.name, c) for c in containers)
        for ev in self.events:
            if ev.event != "CONTAINER_STOPPED":
                continue
            kvs = csv_kv(ev.args)
            # ``in`` instead of dict.has_key(), which is gone on Python 3.
            if kvs["containerId"] in containermap:
                stopped = containermap[kvs["containerId"]]
                stopped.stop = int(kvs["stoppedTime"])
                stopped.status = int(kvs["exitStatus"])
        return containers

    def dags(self):
        """Return a ``DAG`` per ``DAG_FINISHED`` event."""
        return [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"]

    def vertexes(self):
        """Return a ``Vertex`` per ``VERTEX_FINISHED`` event (yes, not vertices)."""
        return [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"]

    def tasks(self):
        """Return a ``Task`` per ``TASK_FINISHED`` event."""
        return [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"]

    def attempts(self):
        """Return an ``Attempt`` per distinct ``taskAttemptId``.

        STARTED and FINISHED events are grouped by their attempt id and
        each group is handed to ``Attempt`` as a list.
        """
        wanted = ("TASK_ATTEMPT_STARTED", "TASK_ATTEMPT_FINISHED")
        by_id = lambda item: item[0]
        raw = [(csv_kv(ev.args)["taskAttemptId"], ev)
               for ev in self.events if ev.event in wanted]
        # Sort on the id only: the events themselves define no ordering, so
        # comparing whole tuples breaks as soon as two ids are equal.
        raw.sort(key=by_id)
        return [Attempt([ev for (_, ev) in group])
                for (_, group) in groupby(raw, key=by_id)]

    def parse(self, l):
        """Turn one log line into an ``AMRawEvent``.

        Returns ``None`` for lines without the ``[HISTORY]`` marker and
        for lines that carry the marker but do not match ``MAIN_RE``.
        """
        if "[HISTORY]" not in l:
            return None
        m = self.MAIN_RE.match(l)
        if m is None:
            # Marker present but the layout is not the expected one: skip
            # the line instead of failing on a None match.
            return None
        return AMRawEvent(m.group("ts"), m.group("dag"), m.group("event"), m.group("args"))
def main(argv):
    """Print one line per task attempt found in a log.

    The log named by ``argv[0]`` is parsed and structured as
    AM -> dag -> vertex -> task -> attempt (plus AM -> container); for
    every attempt of every DAG the list
    ``[vertex, name, container, start, finish]`` is printed.

    Args:
        argv: command-line arguments without the program name.

    Returns:
        ``2`` when no log name was given (a usage line is written to
        stderr), otherwise ``None``.
    """
    if not argv:
        sys.stderr.write("usage: <script> <am-log>[.gz|.bz2] | <url>\n")
        return 2
    tree = AMLog(argv[0]).structure()
    for d in tree.dags:
        for a in d.attempts():
            # Parenthesised single argument: prints the same text under
            # both the Python 2 print statement and the Python 3 function.
            print([a.vertex, a.name, a.container, a.start, a.finish])


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))