/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.stram;

import java.io.*;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;

/**
 * Write ahead log for DAG changes.
 * Operations need to be registered with the journal instance before writing.
 * Registered prototype instances will be used to apply changes on read.
 *
 * @since 0.9.2
 */
public final class Journal
{
  private final static Logger LOG = LoggerFactory.getLogger(Journal.class);

  private enum RecoverableOperation
  {
    OPERATOR_STATE             (PTOperator.SET_OPERATOR_STATE),
    CONTAINER_STATE            (PTContainer.SET_CONTAINER_STATE),
    OPERATOR_PROPERTY          (StreamingContainerManager.SET_OPERATOR_PROPERTY),
    PHYSICAL_OPERATOR_PROPERTY (StreamingContainerManager.SET_PHYSICAL_OPERATOR_PROPERTY);

    private static final Map<Class<? extends Recoverable>, Integer> classToId;

    static
    {
      final ImmutableMap.Builder<Class<? extends Recoverable>, Integer> builder = ImmutableMap.builder();
      for (RecoverableOperation recoverableOperation : RecoverableOperation.values()) {
        builder.put(recoverableOperation.operation.getClass(), recoverableOperation.ordinal());
      }
      classToId = builder.build();
    }

    private final Recoverable operation;

    RecoverableOperation(Recoverable operation)
    {
      this.operation = operation;
    }

    private static RecoverableOperation get(int id)
    {
      return (id < values().length)? values()[id] : null;
    }

    private static Integer getId(Class<? extends Recoverable> operationClass)
    {
      return classToId.get(operationClass);
    }
  }

  public interface Recoverable
  {
    void read(Object object, Input in) throws KryoException;
    void write(Output out) throws KryoException;
  }

  private final StreamingContainerManager scm;
  private final AtomicReference<Output> output;
  private final AtomicBoolean replayMode;

  public Journal(StreamingContainerManager scm)
  {
    this.scm = scm;
    output = new AtomicReference<Output>();
    replayMode = new AtomicBoolean(false);
  }

  public void setOutputStream(@Nullable final OutputStream out) throws IOException
  {
    final Output output;
    if (out != null) {
      output = new Output(4096, -1) {
        @Override
        public void flush() throws KryoException {
          super.flush();
          // Kryo does not flush internal output stream during flush. We need to flush it explicitly.
          try {
            getOutputStream().flush();
          } catch (IOException e) {
            throw new KryoException(e);
          }
        }
      };
      output.setOutputStream(out);
    } else {
      output = null;
    }

    final Output oldOut = this.output.getAndSet(output);
    if (oldOut != null && oldOut.getOutputStream() != out) {
      synchronized (oldOut) {
        oldOut.close();
      }
    }
  }

  final void write(Recoverable op)
  {
    if (replayMode.get()) {
      throw new IllegalStateException("Request to write while journal is replaying operations");
    }
    Integer classId = RecoverableOperation.getId(op.getClass());
    if (classId == null) {
      throw new IllegalArgumentException("Class not registered " + op.getClass());
    }
    while (true) {
      final Output out = output.get();
      if (out != null) {
        // need to atomically write id, operation and flush the output stream
        synchronized (out) {
          try {
            LOG.debug("WAL write {}", RecoverableOperation.get(classId));
            out.writeInt(classId);
            op.write(out);
            out.flush();
            break;
          }
          catch (KryoException e) {
            // check that no other threads sneaked between get() and synchronized block and set output stream to a new
            // stream or null leading to the current stream being closed
            if (output.get() == out) {
              throw e;
            }
          }
        }
      } else {
        LOG.warn("Journal output stream is null. Skipping write to the WAL.");
        break;
      }
    }
  }

  final void replay(final InputStream input)
  {
    if (replayMode.compareAndSet(false, true)) {
      Input in = new Input(input);
      try {
        LOG.debug("Start replaying WAL");
        while (!in.eof()) {
          final int opId = in.readInt();
          final RecoverableOperation recoverableOperation = RecoverableOperation.get(opId);
          if (recoverableOperation == null) {
            throw new IllegalArgumentException("No reader registered for id " + opId);
          }
          LOG.debug("Replaying {}", recoverableOperation);
          switch (recoverableOperation) {
            case OPERATOR_STATE:
            case CONTAINER_STATE:
              recoverableOperation.operation.read(scm.getPhysicalPlan(), in);
              break;
            case OPERATOR_PROPERTY:
            case PHYSICAL_OPERATOR_PROPERTY:
              recoverableOperation.operation.read(scm, in);
              break;
            default:
              throw new IllegalArgumentException("Unsupported recoverable operation " + recoverableOperation);
          }
        }
      } finally {
        LOG.debug("Done replaying WAL");
        replayMode.set(false);
      }
    }
    else {
      throw new IllegalStateException("Request to replay while journal is already replaying other operations");
    }
  }

}
