#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

'''
json2dot.py: Generates Graphviz representation of Nemo DAG::toString
This file is used as backend for https://service.jangho.io/nemo-dag/
'''

import json
import re
import sys

nextIdx = 0


def propertiesToString(properties):
  return '<BR/>'.join(
    ['{}={}'.format(re.sub('Property$', '', item[0].split('.')[-1]), item[1]) for item in sorted(properties.items())])


def getIdx():
  global nextIdx
  nextIdx += 1
  return nextIdx


def stateToColor(state):
  try:
    return {'READY': '#fffbe2',
            'EXECUTING': '#e2fbff',
            'COMPLETE': '#e2ffe5',
            'FAILED_RECOVERABLE': '#ffe2e2',
            'FAILED_UNRECOVERABLE': '#e2e2e2'}[state]
  except:
    return 'white'


class PlanState:
  def __init__(self, data):
    self.id = data['planId']
    self.stages = {}
    for stage in data['stages']:
      self.stages[stage['id']] = StageState(stage)

  @classmethod
  def empty(cls):
    return cls({'planId': None, 'stages': []})

  def get(self, id):
    try:
      return self.stages[id]
    except:
      return StageState.empty()


class StageState:
  def __init__(self, data):
    self.id = data['id']
    self.state = data['state']
    self.tasks = {}
    for irVertex in data['tasks']:
      self.tasks[irVertex['id']] = TaskState(irVertex)

  @classmethod
  def empty(cls):
    return cls({'id': None, 'state': None, 'tasks': []})

  def get(self, id):
    try:
      return self.tasks[id]
    except:
      return TaskState.empty()

  @property
  def taskStateSummary(self):
    stateToNumTasks = dict()
    for taskState in self.tasks.values():
      before = stateToNumTasks.get(taskState.state, 0)
      stateToNumTasks[taskState.state] = before + 1
    return '\\n'.join(['{}: {}'.format(state, stateToNumTasks[state])
                       for state in stateToNumTasks.keys()])


class TaskState:
  def __init__(self, data):
    self.id = data['id']
    self.state = data['state']

  @classmethod
  def empty(cls):
    return cls({'id': None, 'state': None})


class DAG:
  '''
  A class for converting DAG to Graphviz representation.
  JSON representation should be formatted like what toString method in DAG.java does.
  '''

  def __init__(self, dag, planState):
    self.vertices = {}
    self.edges = []
    for vertex in dag['vertices']:
      self.vertices[vertex['id']] = Vertex(vertex['id'], vertex['properties'], planState.get(vertex['id']))
    for edge in dag['edges']:
      self.edges.append(Edge(self.vertices[edge['src']], self.vertices[edge['dst']], edge['properties']))

  @property
  def dot(self):
    dot = ''
    for vertex in self.vertices.values():
      dot += vertex.dot
    for edge in self.edges:
      dot += edge.dot
    return dot


def Vertex(id, properties, state):
  try:
    return Stage(id, properties, state)
  except:
    pass
  try:
    return LoopVertex(id, properties)
  except:
    pass
  return NormalVertex(id, properties, state)


class NormalVertex:
  def __init__(self, id, properties, state):
    self.id = id
    self.properties = properties
    self.idx = getIdx()
    self.state = state.state

  @property
  def dot(self):
    color = 'black'
    try:
      placement = self.properties['executionProperties'][
        'org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty']
      if (placement == 'Transient'):
        color = 'orange'
      if (placement == 'Reserved'):
        color = 'green'
    except:
      pass
    label = self.id
    if self.state is not None:
      label += '<BR/>({})'.format(self.state)
    try:
      label += '<BR/>{}'.format(self.properties['source'])
    except:
      pass
    try:
      transform = self.properties['transform'].split(':')
      transform_name = transform[0]
      try:
        class_name = transform[1].split('{')[0].split('.')[-1].split('$')[0].split('@')[0]
      except IndexError:
        class_name = '?'
      label += '<BR/>{}:{}'.format(transform_name, class_name)
    except:
      pass
    if ('class' in self.properties and self.properties['class'] == 'AggregationBarrierVertex'):
      shape = ', shape=box'
      label += '<BR/>AggregationBarrier'
    else:
      shape = ''
    try:
      label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(
        propertiesToString(self.properties['executionProperties']))
    except:
      pass
    dot = '{} [label=<{}>, color={}, style=filled, fillcolor="{}"{}];'.format(self.idx, label, color,
                                                                              stateToColor(self.state), shape)
    return dot

  @property
  def oneVertex(self):
    return self

  @property
  def logicalEnd(self):
    return self.idx


class LoopVertex:
  def __init__(self, id, properties):
    self.id = id
    self.dag = DAG(properties['DAG'], PlanState.empty())
    self.remaining_iteration = properties['remainingIteration']
    self.executionProperties = properties['executionProperties']
    self.incoming = properties['dagIncomingEdges']
    self.outgoing = properties['dagOutgoingEdges']
    self.edgeMapping = properties['edgeWithLoopToEdgeWithInternalVertex']
    self.idx = getIdx()

  @property
  def dot(self):
    label = self.id
    try:
      label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
    except:
      pass
    label += '<BR/>(Remaining iteration: {})'.format(self.remaining_iteration)
    dot = 'subgraph cluster_{} {{'.format(self.idx)
    dot += 'label = "{}";'.format(label)
    dot += self.dag.dot
    dot += '}'
    return dot

  @property
  def oneVertex(self):
    return next(iter(self.dag.vertices.values())).oneVertex

  @property
  def logicalEnd(self):
    return 'cluster_{}'.format(self.idx)

  def internalSrcFor(self, edgeWithLoopId):
    edgeId = self.edgeMapping[edgeWithLoopId]
    vertexId = list(filter(lambda v: edgeId in self.outgoing[v], self.outgoing))[0]
    return self.dag.vertices[vertexId]

  def internalDstFor(self, edgeWithLoopId):
    edgeId = self.edgeMapping[edgeWithLoopId]
    vertexId = list(filter(lambda v: edgeId in self.incoming[v], self.incoming))[0]
    return self.dag.vertices[vertexId]


class Stage:
  def __init__(self, id, properties, state):
    self.id = id
    self.properties = properties
    self.stageDAG = DAG(properties['irDag'], PlanState.empty())
    self.idx = getIdx()
    self.state = state
    self.executionProperties = self.properties['executionProperties']

  @property
  def dot(self):
    if self.state.state is None:
      state = ''
    else:
      state = ' ({})'.format(self.state.state)
    label = '{}{}'.format(self.id, state)
    if self.state.tasks:
      label += '<BR/><BR/>{} Task(s):<BR/>{}'.format(len(self.state.tasks), self.state.taskStateSummary)
    label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
    dot = 'subgraph cluster_{} {{'.format(self.idx)
    dot += 'label = <{}>;'.format(label)
    dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
    dot += self.stageDAG.dot
    dot += '}'
    return dot

  @property
  def oneVertex(self):
    return next(iter(self.stageDAG.vertices.values())).oneVertex

  @property
  def logicalEnd(self):
    return 'cluster_{}'.format(self.idx)


def Edge(src, dst, properties):
  try:
    return StageEdge(src, dst, properties)
  except:
    pass
  try:
    return RuntimeEdge(src, dst, properties)
  except:
    pass
  try:
    return IREdge(src, dst, properties)
  except:
    pass
  return NormalEdge(src, dst, properties)


class NormalEdge:
  def __init__(self, src, dst, properties):
    self.src = src
    self.dst = dst

  @property
  def dot(self):
    return '{} -> {} [ltail = {}, lhead = {}];'.format(self.src.oneVertex.idx, self.dst.oneVertex.idx,
                                                       self.src.logicalEnd, self.dst.logicalEnd)


class IREdge:
  def __init__(self, src, dst, properties):
    self.src = src
    self.dst = dst
    self.id = properties['id']
    self.executionProperties = properties['executionProperties']

  @property
  def dot(self):
    src = self.src
    dst = self.dst
    try:
      src = src.internalSrcFor(self.id)
    except:
      pass
    try:
      dst = dst.internalDstFor(self.id)
    except:
      pass
    label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.id, propertiesToString(self.executionProperties))
    return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                                                                     dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd,
                                                                     label)


class StageEdge:
  def __init__(self, src, dst, properties):
    self.src = src.stageDAG.vertices[properties['externalSrcVertexId']]
    self.dst = dst.stageDAG.vertices[properties['externalDstVertexId']]
    self.runtimeEdgeId = properties['runtimeEdgeId']
    self.executionProperties = properties['executionProperties']

  @property
  def dot(self):
    label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId,
                                                             propertiesToString(self.executionProperties))
    return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                                                                     self.dst.oneVertex.idx, self.src.logicalEnd,
                                                                     self.dst.logicalEnd, label)


class RuntimeEdge:
  def __init__(self, src, dst, properties):
    self.src = src
    self.dst = dst
    self.runtimeEdgeId = properties['runtimeEdgeId']
    self.executionProperties = properties['executionProperties']

  @property
  def dot(self):
    label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId,
                                                             propertiesToString(self.executionProperties))
    return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                                                                     self.dst.oneVertex.idx, self.src.logicalEnd,
                                                                     self.dst.logicalEnd, label)


def jsonToDot(jsonDict):
  try:
    dag = DAG(jsonDict['dag'], PlanState(jsonDict['planState']))
  except:
    dag = DAG(jsonDict, PlanState.empty())
  return 'digraph dag {compound=true; nodesep=1.0; forcelabels=true;' + dag.dot + '}'


if __name__ == "__main__":
  print(jsonToDot(json.loads(sys.stdin.read())))
