/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.tcp;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.geode.CancelCriterion;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.LogWriter;
import org.apache.geode.SystemFailure;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.VersionedDataInputStream;

/**
 * <p>
 * MsgDestreamer supports destreaming a streamed message from a tcp Connection that arrives in
 * chunks. This allows us to receive a message without needing to read it completely into a buffer
 * before we can start deserializing it.
 *
 * @since GemFire 5.0.2
 *
 */

public class MsgDestreamer {
  /**
   * If an exception occurs during deserialization of the message it will be recorded here.
   */
  private Throwable failure;
  /**
   * Used to store the deserialized message on success.
   */
  private DistributionMessage result;
  /**
   * The current failed messages reply processor id if it has one
   */
  private int RPid;
  /**
   * The thread that will be doing the deserialization of the message.
   */
  private final DestreamerThread t;

  private int size;

  final CancelCriterion stopper;

  final KnownVersion version;

  public MsgDestreamer(DMStats stats, CancelCriterion stopper, KnownVersion v) {
    this.stopper = stopper;
    this.t = new DestreamerThread(stats, stopper);
    this.version = v;
    init();
  }

  private void init() {
    this.t.start();
  }

  public void close() {
    reset();
    this.t.close();
  }

  public void reset() {
    synchronized (this) {
      this.failure = null;
      this.result = null;
    }
    this.size = 0;
    this.t.setName("IDLE p2pDestreamer");
  }

  public void setName(String name) {
    this.t.setName("p2pDestreamer for " + name);
  }

  private void waitUntilDone() throws InterruptedException {
    if (this.t.isClosed() || Thread.interrupted())
      throw new InterruptedException();
    synchronized (this) {
      while (this.failure == null && this.result == null) {
        if (this.t.isClosed() || Thread.interrupted())
          throw new InterruptedException();
        this.wait(); // spurious wakeup ok
      }
    }
  }

  // private final String me = "MsgDestreamer<" + System.identityHashCode(this) + ">";

  // public String toString() {
  // return this.me;
  // }
  // private void logit(String s) {
  // LogWriterI18n l = getLogger();
  // if (l != null) {
  // l.fine(this + ": " + s);
  // }
  // }
  // private LogWriterI18n getLogger() {
  // LogWriterI18n result = null;
  // DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
  // if (ds != null) {
  // result = ds.getLogWriter();
  // }
  // return result;
  // }

  /**
   * Adds a chunk to be deserialized
   *
   * @param bb contains the bytes of the chunk
   * @param length the number of bytes in bb that are this chunk
   */
  public void addChunk(ByteBuffer bb, int length) throws IOException {
    // if this destreamer has failed or this chunk is empty just return
    if (this.failure == null && length > 0) {
      // logit("addChunk bb length=" + length);
      this.t.addChunk(bb, length);
      this.size += length;
    }
  }

  /**
   * Returns the number of bytes added to this destreamer.
   */
  public int size() {
    return this.size;
  }

  /**
   * Waits for the deserialization to complete and returns the deserialized message.
   *
   * @throws IOException A problem occurred while deserializing the message.
   * @throws ClassNotFoundException The class of an object read from <code>in</code> could not be
   *         found
   */
  public DistributionMessage getMessage()
      throws InterruptedException, IOException, ClassNotFoundException {
    // if (Thread.interrupted()) throw new InterruptedException(); not necessary done in
    // waitUntilDone
    // this.t.join();
    waitUntilDone();
    if (this.failure != null) {
      // logit("failed with" + this.failure);
      if (this.failure instanceof ClassNotFoundException) {
        throw (ClassNotFoundException) this.failure;
      } else if (this.failure instanceof IOException) {
        throw (IOException) this.failure;
      } else {
        IOException io =
            new IOException("failure during message deserialization");
        io.initCause(this.failure);
        throw io;
      }
    } else {
      // logit("result =" + this.result);
      return this.result;
    }
  }

  /**
   * Returns the reply processor id for the current failed message. Returns 0 if it does not have
   * one. Note this method should only be called after getMessage has thrown an exception.
   */
  public int getRPid() {
    return this.RPid;
  }

  protected void setFailure(Throwable ex, int RPid) {
    synchronized (this) {
      this.failure = ex;
      this.RPid = RPid;
      this.notify();
    }
  }

  protected void setResult(DistributionMessage msg) {
    synchronized (this) {
      this.result = msg;
      this.RPid = 0;
      this.notify();
    }
  }

  /**
   * Thread used to deserialize chunks into a message.
   */
  private class DestreamerThread extends Thread {
    private volatile boolean closed = false;
    final DestreamerIS is;
    final DMStats stats;

    public DestreamerThread(DMStats stats, CancelCriterion stopper) {
      setDaemon(true);
      super.setName("IDLE p2pDestreamer");
      this.is = new DestreamerIS(this, stopper);
      this.stats = stats;
    }
    // private final String me = "DestreamerThread<" + System.identityHashCode(this) + ">";
    // public String toString() {
    // return this.me;
    // }

    public void addChunk(ByteBuffer chunk, int bbLength) throws IOException {
      ByteBuffer bb = chunk.slice();
      bb.limit(bbLength);
      this.is.addChunk(bb);
    }

    @Override
    public void run() {
      for (;;) {
        if (isClosed()) {
          return;
        }
        try {
          ReplyProcessor21.initMessageRPId();
          final KnownVersion v = version;
          DataInputStream dis =
              v == null ? new DataInputStream(this.is)
                  : new VersionedDataInputStream(this.is, v);
          long startSer = this.stats.startMsgDeserialization();
          setResult((DistributionMessage) InternalDataSerializer.readDSFID(dis));
          this.stats.endMsgDeserialization(startSer);
        } catch (VirtualMachineError err) {
          SystemFailure.initiateFailure(err);
          // If this ever returns, rethrow the error. We're poisoned
          // now, so don't let this thread continue.
          throw err;
        } catch (Throwable ex) {
          // Whenever you catch Error or Throwable, you must also
          // catch VirtualMachineError (see above). However, there is
          // _still_ a possibility that you are dealing with a cascading
          // error condition, so you also need to check to see if the JVM
          // is still usable:
          SystemFailure.checkFailure();
          setFailure(ex, ReplyProcessor21.getMessageRPId());
        } finally {
          this.is.close();
          ReplyProcessor21.clearMessageRPId();
        }
      }
    }

    public void close() {
      this.closed = true;
      interrupt();
    }

    public boolean isClosed() {
      return this.closed;
    }
  }
  /**
   * This input stream waits for data to be available. Once it is provided, by a call to addChunk,
   * it will stream the data in from that chunk, signal that is has completed, and then wait for
   * another chunk.
   */
  private static class DestreamerIS extends InputStream {
    final Object dataMon = new Object();
    final Object doneMon = new Object();
    ByteBuffer data;
    final DestreamerThread owner;
    final CancelCriterion stopper;

    private class Stopper extends CancelCriterion {
      private final CancelCriterion stopper;

      Stopper(CancelCriterion stopper) {
        this.stopper = stopper;
      }

      /*
       * (non-Javadoc)
       *
       * @see org.apache.geode.CancelCriterion#cancelInProgress()
       */
      @Override
      public String cancelInProgress() {
        String reason = stopper.cancelInProgress();
        if (reason != null) {
          return reason;
        }
        if (owner.isClosed()) {
          return "owner is closed";
        }
        return null;
      }

      /*
       * (non-Javadoc)
       *
       * @see org.apache.geode.CancelCriterion#generateCancelledException(java.lang.Throwable)
       */
      @Override
      public RuntimeException generateCancelledException(Throwable e) {
        String reason = cancelInProgress();
        if (reason == null) {
          return null;
        }
        RuntimeException result = stopper.generateCancelledException(e);
        if (result != null) {
          return result;
        }
        return new DistributedSystemDisconnectedException("owner is closed");
      }
    }

    public DestreamerIS(DestreamerThread t, CancelCriterion stopper) {
      this.owner = t;
      this.data = null;
      this.stopper = new Stopper(stopper);
    }

    // public String toString() {
    // return this.owner.me;
    // }
    // private void logit(String s) {
    // LogWriterI18n l = getLogger();
    // if (l != null) {
    // l.fine(this + ": " + s);
    // }
    // }

    // private LogWriterI18n getLogger() {
    // LogWriterI18n result = null;
    // DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
    // if (ds != null) {
    // result = ds.getLogWriter();
    // }
    // return result;
    // }

    private boolean isClosed() {
      return this.owner.isClosed();
    }

    private ByteBuffer waitForData() throws InterruptedException {
      if (isClosed() || Thread.interrupted())
        throw new InterruptedException();
      synchronized (this.dataMon) {
        ByteBuffer result = this.data;
        while (result == null) {
          if (isClosed() || Thread.interrupted())
            throw new InterruptedException();
          // logit("about to dataMon wait");
          this.dataMon.wait(); // spurious wakeup ok
          // logit("after dataMon wait");
          if (isClosed() || Thread.interrupted())
            throw new InterruptedException();
          result = this.data;
        }
        return result;
      }
    }

    private void provideData(ByteBuffer bb) {
      synchronized (this.dataMon) {
        // if (bb != null) {
        // logit("MDIS: providing bb with " +
        // bb.remaining() + " bytes");
        // }
        this.data = bb;
        // logit("dataMon notify bb=" + bb);
        this.dataMon.notify();
      }
    }

    private void waitUntilDone() throws InterruptedException {
      if (isClosed() || Thread.interrupted())
        throw new InterruptedException();
      synchronized (this.doneMon) {
        while (this.data != null) {
          if (isClosed() || Thread.interrupted())
            throw new InterruptedException();
          // logit("about to doneMon wait");
          this.doneMon.wait(); // spurious wakeup ok
          // logit("after doneMon wait");
          if (isClosed() || Thread.interrupted())
            throw new InterruptedException();
        }
      }
    }

    private void signalDone() {
      synchronized (this.doneMon) {
        this.data = null;
        // logit("doneMon notify");
        this.doneMon.notify();
      }
    }

    public void addChunk(ByteBuffer bb) throws IOException {
      provideData(bb);
      for (;;) {
        stopper.checkCancelInProgress(null);
        boolean interrupted = Thread.interrupted();
        try {
          waitUntilDone();
          break;
        } catch (InterruptedException e) {
          interrupted = true;
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }

    private ByteBuffer waitForAvailableData() throws IOException {
      boolean available = false;
      ByteBuffer myData;
      do {
        // only the thread that sets data to null ever does this check
        // so I believe it is ok to do this check outside of sync.
        myData = this.data;
        if (myData == null) {
          for (;;) {
            if (isClosed()) {
              throw new IOException("owner closed"); // TODO
            }
            stopper.checkCancelInProgress(null);
            boolean interrupted = Thread.interrupted();
            try {
              myData = waitForData();
              break;
            } catch (InterruptedException e) {
              interrupted = true;
            } finally {
              if (interrupted) {
                Thread.currentThread().interrupt();
              }
            }
          }

          if (myData == null) {
            // someone must have called close so tell our caller
            // that we were interrupted. This fixes bug 37230.
            stopper.checkCancelInProgress(null);
            throw new InternalGemFireError("bug 37230, please report to support");
          }
          // logit("received new bb with " +
          // myData.remaining() + " bytes");
        }
        int remaining = myData.remaining();
        if (remaining <= 0) {
          signalDone();
        } else {
          available = true;
        }
      } while (!available);
      return myData;
    }

    @Override
    public void close() {
      signalDone();
    }

    /**
     * See the InputStream read method for javadocs. Note that if an attempt to read past the end of
     * the wrapped ByteBuffer is done this method throws BufferUnderflowException
     */
    @Override
    public int read() throws IOException {
      ByteBuffer bb = waitForAvailableData();
      // logit("read result=" + result);
      return (bb.get() & 0xff);
    }


    /*
     * this method is not thread safe See the InputStream read method for javadocs. Note that if an
     * attempt to read past the end of the wrapped ByteBuffer is done this method throws
     * BufferUnderflowException
     */
    @Override
    public int read(byte b[], int off, int len) throws IOException {
      ByteBuffer bb = waitForAvailableData();
      int remaining = bb.remaining();
      int bytesToRead = len;
      if (remaining < len) {
        bytesToRead = remaining;
      }
      bb.get(b, off, bytesToRead);
      // logit("read[] read=" + bytesToRead);
      return bytesToRead;
    }

    @Override
    public int available() throws IOException {
      ByteBuffer bb = this.data;
      if (bb == null) {
        return 0;
      } else {
        return bb.remaining();
      }
    }

  }

  private static LogWriter getLogger() {
    LogWriter result = null;
    InternalDistributedSystem ids = InternalDistributedSystem.unsafeGetConnectedInstance();
    if (ids != null) {
      result = ids.getLogWriter();
    }
    return result;
  }
}
