| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import re |
| import sys |
| |
| pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *') |
| counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?') |
| |
| def parse(tail): |
| result = {} |
| for n,v in re.findall(pat, tail): |
| result[n] = v |
| return result |
| |
| mapStartTime = {} |
| mapEndTime = {} |
| reduceStartTime = {} |
| reduceShuffleTime = {} |
| reduceSortTime = {} |
| reduceEndTime = {} |
| reduceBytes = {} |
| |
| for line in sys.stdin: |
| words = line.split(" ",1) |
| event = words[0] |
| attrs = parse(words[1]) |
| if event == 'MapAttempt': |
| if attrs.has_key("START_TIME"): |
| mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"])/1000 |
| elif attrs.has_key("FINISH_TIME"): |
| mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000 |
| elif event == 'ReduceAttempt': |
| if attrs.has_key("START_TIME"): |
| reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000 |
| elif attrs.has_key("FINISH_TIME"): |
| reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"])/1000 |
| reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"])/1000 |
| reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000 |
| elif event == 'Task': |
| if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"): |
| for n,v in re.findall(counterPat, attrs["COUNTERS"]): |
| if n == "File Systems.HDFS bytes written": |
| reduceBytes[attrs["TASKID"]] = int(v) |
| |
| runningMaps = {} |
| shufflingReduces = {} |
| sortingReduces = {} |
| runningReduces = {} |
| startTime = min(reduce(min, mapStartTime.values()), |
| reduce(min, reduceStartTime.values())) |
| endTime = max(reduce(max, mapEndTime.values()), |
| reduce(max, reduceEndTime.values())) |
| |
| reduces = reduceBytes.keys() |
| reduces.sort() |
| |
| print "Name reduce-output-bytes shuffle-finish reduce-finish" |
| for r in reduces: |
| print r, reduceBytes[r], reduceShuffleTime[r] - startTime, |
| print reduceEndTime[r] - startTime |
| |
| print |
| |
| for t in range(startTime, endTime): |
| runningMaps[t] = 0 |
| shufflingReduces[t] = 0 |
| sortingReduces[t] = 0 |
| runningReduces[t] = 0 |
| |
| for map in mapStartTime.keys(): |
| for t in range(mapStartTime[map], mapEndTime[map]): |
| runningMaps[t] += 1 |
| for reduce in reduceStartTime.keys(): |
| for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]): |
| shufflingReduces[t] += 1 |
| for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]): |
| sortingReduces[t] += 1 |
| for t in range(reduceSortTime[reduce], reduceEndTime[reduce]): |
| runningReduces[t] += 1 |
| |
| print "time maps shuffle merge reduce" |
| for t in range(startTime, endTime): |
| print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t], |
| print runningReduces[t] |