/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.dag.app;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGKillRequestEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;

import com.google.common.annotations.VisibleForTesting;

public class RecoveryParser {

  private static final Logger LOG = LoggerFactory.getLogger(RecoveryParser.class);

  private final DAGAppMaster dagAppMaster;
  private final FileSystem recoveryFS;
  private final Path recoveryDataDir;
  private final Path currentAttemptRecoveryDataDir;
  private final int recoveryBufferSize;
  private final int currentAttemptId;

  public RecoveryParser(DAGAppMaster dagAppMaster,
      FileSystem recoveryFS,
      Path recoveryDataDir,
      int currentAttemptId) throws IOException {
    this.dagAppMaster = dagAppMaster;
    this.recoveryFS = recoveryFS;
    this.recoveryDataDir = recoveryDataDir;
    this.currentAttemptId = currentAttemptId;
    this.currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
        currentAttemptId);
    recoveryBufferSize = dagAppMaster.getConfig().getInt(
        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
    this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir);
  }

  public static class RecoveredDAGData {
    public TezDAGID recoveredDagID = null;
    public DAGImpl recoveredDAG = null;
    public DAGState dagState = null;
    public boolean isCompleted = false;
    public boolean nonRecoverable = false;
    public boolean isSessionStopped = false;
    public String reason = null;
    public Map<String, LocalResource> cumulativeAdditionalResources = null;
  }

  private static void parseSummaryFile(FSDataInputStream inputStream)
      throws IOException {
    while (true) {
      RecoveryProtos.SummaryEventProto proto =
          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
      if (proto == null) {
        LOG.info("Reached end of summary stream");
        break;
      }
      LOG.info("[SUMMARY]"
          + " dagId=" + proto.getDagId()
          + ", timestamp=" + proto.getTimestamp()
          + ", event=" + HistoryEventType.values()[proto.getEventType()]);
    }
  }

  private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
      throws IOException {
    int eventTypeOrdinal = -1;
    try {
      eventTypeOrdinal = inputStream.readInt();
    } catch (EOFException eof) {
      return null;
    }
    if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
        HistoryEventType.values().length) {
      // Corrupt data
      // reached end
      throw new IOException("Corrupt data found when trying to read next event type"
          + ", eventTypeOrdinal=" + eventTypeOrdinal);
    }
    HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
    HistoryEvent event;
    switch (eventType) {
      case AM_LAUNCHED:
        event = new AMLaunchedEvent();
        break;
      case AM_STARTED:
        event = new AMStartedEvent();
        break;
      case DAG_SUBMITTED:
        event = new DAGSubmittedEvent();
        break;
      case DAG_INITIALIZED:
        event = new DAGInitializedEvent();
        break;
      case DAG_STARTED:
        event = new DAGStartedEvent();
        break;
      case DAG_COMMIT_STARTED:
        event = new DAGCommitStartedEvent();
        break;
      case DAG_FINISHED:
        event = new DAGFinishedEvent();
        break;
      case DAG_KILL_REQUEST:
        event = new DAGKillRequestEvent();
        break;
      case CONTAINER_LAUNCHED:
        event = new ContainerLaunchedEvent();
        break;
      case CONTAINER_STOPPED:
        event = new ContainerStoppedEvent();
        break;
      case VERTEX_INITIALIZED:
        event = new VertexInitializedEvent();
        break;
      case VERTEX_STARTED:
        event = new VertexStartedEvent();
        break;
      case VERTEX_PARALLELISM_UPDATED:
        event = new VertexParallelismUpdatedEvent();
        break;
      case VERTEX_COMMIT_STARTED:
        event = new VertexCommitStartedEvent();
        break;
      case VERTEX_GROUP_COMMIT_STARTED:
        event = new VertexGroupCommitStartedEvent();
        break;
      case VERTEX_GROUP_COMMIT_FINISHED:
        event = new VertexGroupCommitFinishedEvent();
        break;
      case VERTEX_FINISHED:
        event = new VertexFinishedEvent();
        break;
      case TASK_STARTED:
        event = new TaskStartedEvent();
        break;
      case TASK_FINISHED:
        event = new TaskFinishedEvent();
        break;
      case TASK_ATTEMPT_STARTED:
        event = new TaskAttemptStartedEvent();
        break;
      case TASK_ATTEMPT_FINISHED:
        event = new TaskAttemptFinishedEvent();
        break;
      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
        event = new VertexRecoverableEventsGeneratedEvent();
        break;
      default:
        throw new IOException("Invalid data found, unknown event type "
            + eventType);

    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Parsing event from input stream"
          + ", eventType=" + eventType);
    }
    try {
      event.fromProtoStream(inputStream);
    } catch (EOFException eof) {
      return null;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Parsed event from input stream"
          + ", eventType=" + eventType
          + ", event=" + event.toString());
    }
    return event;
  }





  public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream)
      throws IOException {
    List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
    while (true) {
      HistoryEvent historyEvent = getNextEvent(inputStream);
      if (historyEvent == null) {
        LOG.info("Reached end of stream");
        break;
      }
      historyEvents.add(historyEvent);
    }
    return historyEvents;
  }

  public static void main(String argv[]) throws IOException {
    // TODO clean up with better usage and error handling
    Configuration conf = new Configuration();
    String summaryPath = argv[0];
    List<String> dagPaths = new ArrayList<String>();
    if (argv.length > 1) {
      for (int i = 1; i < argv.length; ++i) {
        dagPaths.add(argv[i]);
      }
    }
    FileSystem fs = FileSystem.get(conf);
    LOG.info("Parsing Summary file " + summaryPath);
    parseSummaryFile(fs.open(new Path(summaryPath)));
    for (String dagPath : dagPaths) {
      LOG.info("Parsing DAG recovery file " + dagPath);
      List<HistoryEvent> historyEvents = parseDAGRecoveryFile(fs.open(new Path(dagPath)));
      for (HistoryEvent historyEvent : historyEvents) {
        LOG.info("Parsed event from recovery stream"
            + ", eventType=" + historyEvent.getEventType()
            + ", event=" + historyEvent);
      }
    }
  }

  private Path getSummaryPath(Path attemptRrecoveryDataDir) {
    return TezCommonUtils.getSummaryRecoveryPath(attemptRrecoveryDataDir);
  }

  private FSDataInputStream getSummaryStream(Path summaryPath)
      throws IOException {
    if (!recoveryFS.exists(summaryPath)) {
      return null;
    }
    return recoveryFS.open(summaryPath, recoveryBufferSize);
  }

  private Path getDAGRecoveryFilePath(Path recoveryDataDir,
      TezDAGID dagID) {
    return new Path(recoveryDataDir,
        dagID.toString() + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
  }

  @VisibleForTesting
  DAGSummaryData getLastCompletedOrInProgressDAG(
      Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
    DAGSummaryData inProgressDAG = null;
    DAGSummaryData lastCompletedDAG = null;
    for (Map.Entry<TezDAGID, DAGSummaryData> entry : dagSummaryDataMap.entrySet()) {
      if (!entry.getValue().completed) {
        if (inProgressDAG != null) {
          throw new RuntimeException("Multiple in progress DAGs seen"
              + ", dagId=" + inProgressDAG.dagId
              + ", dagId=" + entry.getKey());
        }
        inProgressDAG = entry.getValue();
      } else {
        if (lastCompletedDAG == null ||
            lastCompletedDAG.dagId.getId() < entry.getValue().dagId.getId()) {
          lastCompletedDAG = entry.getValue();
        }
      }
    }
    if (inProgressDAG == null) {
      return lastCompletedDAG;
    }
    return inProgressDAG;
  }

  @VisibleForTesting
  static class DAGSummaryData {

    final TezDAGID dagId;
    boolean completed = false;
    boolean dagCommitCompleted = true;
    DAGState dagState;
    Map<TezVertexID, Boolean> vertexCommitStatus =
        new HashMap<TezVertexID, Boolean>();
    Map<String, Boolean> vertexGroupCommitStatus =
        new HashMap<String, Boolean>();
    List<HistoryEvent> bufferedSummaryEvents =
        new ArrayList<HistoryEvent>();

    DAGSummaryData(TezDAGID dagId) {
      this.dagId = dagId;
    }

    void handleSummaryEvent(SummaryEventProto proto) throws IOException {
      HistoryEventType eventType =
          HistoryEventType.values()[proto.getEventType()];
      switch (eventType) {
        case DAG_SUBMITTED:
          completed = false;
          DAGSubmittedEvent dagSubmittedEvent = new DAGSubmittedEvent();
          dagSubmittedEvent.fromSummaryProtoStream(proto);
          break;
        case DAG_FINISHED:
          completed = true;
          dagCommitCompleted = true;
          DAGFinishedEvent dagFinishedEvent = new DAGFinishedEvent();
          dagFinishedEvent.fromSummaryProtoStream(proto);
          dagState = dagFinishedEvent.getState();
          break;
        case DAG_KILL_REQUEST:
          DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent();
          killRequestEvent.fromSummaryProtoStream(proto);
          bufferedSummaryEvents.add(killRequestEvent);
          break;
        case DAG_COMMIT_STARTED:
          dagCommitCompleted = false;
          break;
        case VERTEX_COMMIT_STARTED:
          VertexCommitStartedEvent vertexCommitStartedEvent =
              new VertexCommitStartedEvent();
          vertexCommitStartedEvent.fromSummaryProtoStream(proto);
          vertexCommitStatus.put(
              vertexCommitStartedEvent.getVertexID(), false);
          break;
        case VERTEX_FINISHED:
          VertexFinishedEvent vertexFinishedEvent =
              new VertexFinishedEvent();
          vertexFinishedEvent.fromSummaryProtoStream(proto);
          if (vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) {
            vertexCommitStatus.put(
                vertexFinishedEvent.getVertexID(), true);
            bufferedSummaryEvents.add(vertexFinishedEvent);
          }
          break;
        case VERTEX_GROUP_COMMIT_STARTED:
          VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
              new VertexGroupCommitStartedEvent();
          vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
          bufferedSummaryEvents.add(vertexGroupCommitStartedEvent);
          vertexGroupCommitStatus.put(
              vertexGroupCommitStartedEvent.getVertexGroupName(), false);
          break;
        case VERTEX_GROUP_COMMIT_FINISHED:
          VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
              new VertexGroupCommitFinishedEvent();
          vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
          bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent);
          vertexGroupCommitStatus.put(
              vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
          break;
        default:
          String message = "Found invalid summary event that was not handled"
              + ", eventType=" + eventType.name();
          throw new IOException(message);
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("dagId=").append(dagId);
      sb.append(", dagCompleted=").append(completed);
      if (!vertexCommitStatus.isEmpty()) {
        sb.append(", vertexCommitStatuses=[");
        for (Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) {
          sb.append("{ vertexId=").append(entry.getKey())
              .append(", committed=").append(entry.getValue()).append("}, ");
        }
        sb.append("]");
      }
      if (!vertexGroupCommitStatus.isEmpty()) {
        sb.append(", vertexGroupCommitStatuses=[");
        for (Entry<String, Boolean> entry : vertexGroupCommitStatus.entrySet()) {
          sb.append("{ vertexGroup=").append(entry.getKey())
              .append(", committed=").append(entry.getValue()).append("}, ");
        }
        sb.append("]");
      }
      return sb.toString();
    }
  }

  private String isDAGRecoverable(DAGSummaryData data) {
    if (!data.dagCommitCompleted) {
      return "DAG Commit was in progress, not recoverable"
          + ", dagId=" + data.dagId;
    }
    if (!data.vertexCommitStatus.isEmpty()) {
      for (Entry<TezVertexID, Boolean> entry : data.vertexCommitStatus.entrySet()) {
        if (!(entry.getValue().booleanValue())) {
          return "Vertex Commit was in progress, not recoverable"
              + ", dagId=" + data.dagId
              + ", vertexId=" + entry.getKey();
        }
      }
    }
    if (!data.vertexGroupCommitStatus.isEmpty()) {
      for (Entry<String, Boolean> entry : data.vertexGroupCommitStatus.entrySet()) {
        if (!(entry.getValue().booleanValue())) {
          return "Vertex Group Commit was in progress, not recoverable"
              + ", dagId=" + data.dagId
              + ", vertexGroup=" + entry.getKey();
        }
      }
    }
    return null;
  }

  private List<Path> getSummaryFiles() throws IOException {
    List<Path> summaryFiles = new ArrayList<Path>();
    for (int i = 1; i < currentAttemptId; ++i) {
      Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
      Path fatalErrorOccurred = new Path(attemptPath,
          RecoveryService.RECOVERY_FATAL_OCCURRED_DIR);
      if (recoveryFS.exists(fatalErrorOccurred)) {
        throw new IOException("Found that a fatal error occurred in"
            + " recovery during previous attempt, foundFile="
            + fatalErrorOccurred.toString());
      }
      Path summaryFile = getSummaryPath(attemptPath);
      if (recoveryFS.exists(summaryFile)) {
        summaryFiles.add(summaryFile);
      }
    }
    return summaryFiles;
  }

  private List<Path> getDAGRecoveryFiles(TezDAGID dagId) throws IOException {
    List<Path> recoveryFiles = new ArrayList<Path>();
    for (int i = 1; i < currentAttemptId; ++i) {
      Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
      Path recoveryFile = getDAGRecoveryFilePath(attemptPath, dagId);
      if (recoveryFS.exists(recoveryFile)) {
        recoveryFiles.add(recoveryFile);
      }
    }
    return recoveryFiles;
  }

  public RecoveredDAGData parseRecoveryData() throws IOException {
    int dagCounter = 0;
    Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
        new HashMap<TezDAGID, DAGSummaryData>();
    List<Path> summaryFiles = getSummaryFiles();
    for (Path summaryFile : summaryFiles) {
      FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryFile);
      LOG.info("Parsing summary file"
          + ", path=" + summaryFile.toString()
          + ", len=" + summaryFileStatus.getLen()
          + ", lastModTime=" + summaryFileStatus.getModificationTime());
      FSDataInputStream summaryStream = getSummaryStream(
          summaryFile);
      while (true) {
        RecoveryProtos.SummaryEventProto proto;
        try {
          proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
          if (proto == null) {
            LOG.info("Reached end of summary stream");
            break;
          }
        } catch (EOFException eof) {
          LOG.info("Reached end of summary stream");
          break;
        }
        HistoryEventType eventType =
            HistoryEventType.values()[proto.getEventType()];
        if (LOG.isDebugEnabled()) {
          LOG.debug("[RECOVERY SUMMARY]"
              + " dagId=" + proto.getDagId()
              + ", timestamp=" + proto.getTimestamp()
              + ", event=" + eventType);
        }
        TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
        if (dagId == null) {
          throw new IOException("null dagId, summary records may be corrupted");
        }
        if (dagCounter < dagId.getId()) {
          dagCounter = dagId.getId();
        }
        if (!dagSummaryDataMap.containsKey(dagId)) {
          dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
        }
        try {
          dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
        } catch (Exception e) {
          // any exception when parsing protobuf
          throw new IOException("Error when parsing summary event proto", e);
        }
      }
      summaryStream.close();
    }

    // Set counter for next set of DAGs & update dagNames Set in DAGAppMaster
    dagAppMaster.setDAGCounter(dagCounter);
    for (DAGSummaryData dagSummaryData: dagSummaryDataMap.values()){
      dagAppMaster.dagIDs.add(dagSummaryData.dagId.toString());
    }

    DAGSummaryData lastInProgressDAGData =
        getLastCompletedOrInProgressDAG(dagSummaryDataMap);
    if (lastInProgressDAGData == null) {
      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
      return null;
    }
    TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
    if (lastInProgressDAG == null) {
      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
      return null;
    }

    LOG.info("Checking if DAG is in recoverable state"
        + ", dagId=" + lastInProgressDAGData.dagId);

    final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
    if (lastInProgressDAGData.completed) {
      recoveredDAGData.isCompleted = true;
      recoveredDAGData.dagState = lastInProgressDAGData.dagState;
    }

    String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
    if (nonRecoverableReason != null) {
      LOG.warn("Found last inProgress DAG but not recoverable: "
          + lastInProgressDAGData);
      recoveredDAGData.nonRecoverable = true;
      recoveredDAGData.reason = nonRecoverableReason;
    }

    List<Path> dagRecoveryFiles = getDAGRecoveryFiles(lastInProgressDAG);
    boolean skipAllOtherEvents = false;
    Path lastRecoveryFile = null;
    for (Path dagRecoveryFile : dagRecoveryFiles) {
      if (skipAllOtherEvents) {
        LOG.warn("Other recovery files will be skipped due to error in the previous recovery file"
            + lastRecoveryFile);
        break;
      }
      lastRecoveryFile = dagRecoveryFile;
      LOG.info("Trying to recover dag from recovery file"
          + ", dagId=" + lastInProgressDAG.toString()
          + ", dagRecoveryFile=" + dagRecoveryFile);
      FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize);
      while (true) {
        HistoryEvent event;
        try {
          event = getNextEvent(dagRecoveryStream);
          if (event == null) {
            LOG.info("Reached end of dag recovery stream");
            break;
          }
        } catch (EOFException eof) {
          LOG.info("Reached end of dag recovery stream");
          break;
        } catch (IOException ioe) {
          LOG.warn("Corrupt data found when trying to read next event", ioe);
          break;
        }
        if (skipAllOtherEvents) {
          // hit an error - skip reading other events
          break;
        }
        HistoryEventType eventType = event.getEventType();
        switch (eventType) {
          case DAG_SUBMITTED:
          {
            DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
                lastInProgressDAG);
            recoveredDAGData.cumulativeAdditionalResources = submittedEvent
              .getCumulativeAdditionalLocalResources();
            recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
            dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
            if (recoveredDAGData.nonRecoverable) {
              skipAllOtherEvents = true;
            }
            break;
          }
          case DAG_INITIALIZED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            break;
          }
          case DAG_STARTED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            break;
          }
          case DAG_COMMIT_STARTED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            break;
          }
          case VERTEX_GROUP_COMMIT_STARTED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            break;
          }
          case VERTEX_GROUP_COMMIT_FINISHED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            break;
          }
          case DAG_KILL_REQUEST:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            break;
          }
          case DAG_FINISHED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            // If this is seen, nothing to recover
            assert recoveredDAGData.recoveredDAG != null;
            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
            recoveredDAGData.isCompleted = true;
            recoveredDAGData.dagState =
                ((DAGFinishedEvent) event).getState();
            skipAllOtherEvents = true;
            break;
          }
          case CONTAINER_LAUNCHED:
          {
            // Nothing to do for now
            break;
          }
          case VERTEX_INITIALIZED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_STARTED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            VertexStartedEvent vEvent = (VertexStartedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_PARALLELISM_UPDATED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_COMMIT_STARTED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case VERTEX_FINISHED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          case TASK_STARTED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            TaskStartedEvent tEvent = (TaskStartedEvent) event;
            Task task = recoveredDAGData.recoveredDAG.getVertex(
                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case TASK_FINISHED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
            Task task = recoveredDAGData.recoveredDAG.getVertex(
                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case TASK_ATTEMPT_STARTED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
            Task task =
                recoveredDAGData.recoveredDAG.getVertex(
                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
                        .getTask(tEvent.getTaskAttemptID().getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case TASK_ATTEMPT_FINISHED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
            Task task =
                recoveredDAGData.recoveredDAG.getVertex(
                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
                    .getTask(tEvent.getTaskAttemptID().getTaskID());
            task.restoreFromEvent(tEvent);
            break;
          }
          case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
          {
            LOG.info("Recovering from event"
                + ", eventType=" + eventType
                + ", event=" + event.toString());
            assert recoveredDAGData.recoveredDAG != null;
            VertexRecoverableEventsGeneratedEvent vEvent =
                (VertexRecoverableEventsGeneratedEvent) event;
            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
            v.restoreFromEvent(vEvent);
            break;
          }
          default:
            throw new RuntimeException("Invalid data found, unknown event type "
                + eventType);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("[DAG RECOVERY]"
              + " dagId=" + lastInProgressDAG
              + ", eventType=" + eventType
              + ", event=" + event.toString());
        }
      }
      dagRecoveryStream.close();
    }

    if (!recoveredDAGData.isCompleted
        && !recoveredDAGData.nonRecoverable) {
      if (lastInProgressDAGData.bufferedSummaryEvents != null
        && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
        for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
          assert recoveredDAGData.recoveredDAG != null;
          switch (bufferedEvent.getEventType()) {
            case VERTEX_GROUP_COMMIT_STARTED:
              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
              break;
            case VERTEX_GROUP_COMMIT_FINISHED:
              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
              break;
            case VERTEX_FINISHED:
              VertexFinishedEvent vertexFinishedEvent =
                  (VertexFinishedEvent) bufferedEvent;
              Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
                  vertexFinishedEvent.getVertexID());
              if (vertex == null) {
                recoveredDAGData.nonRecoverable = true;
                recoveredDAGData.reason = "All state could not be recovered"
                    + ", vertex completed but events not flushed"
                    + ", vertexId=" + vertexFinishedEvent.getVertexID();
              } else {
                vertex.restoreFromEvent(vertexFinishedEvent);
              }
              break;
            case DAG_KILL_REQUEST:
              DAGKillRequestEvent killRequestEvent = (DAGKillRequestEvent)bufferedEvent;
              recoveredDAGData.isSessionStopped = killRequestEvent.isSessionStopped();
              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
              break;
            default:
              throw new RuntimeException("Invalid data found in buffered summary events"
                  + ", unknown event type "
                  + bufferedEvent.getEventType());
          }
        }
      }
    }

    return recoveredDAGData;
  }

}
