/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.deadlock.MessageDependencyMonitor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.sequencelog.MessageLogger;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.internal.util.Breadcrumbs;

/**
 * <P>
 * A <code>DistributionMessage</code> carries some piece of information to a distribution manager.
 * </P>
 *
 * <P>
 * Messages that don't have strict ordering requirements should extend
 * {@link org.apache.geode.distributed.internal.PooledDistributionMessage}. Messages that must be
 * processed serially in the order they were received can extend
 * {@link org.apache.geode.distributed.internal.SerialDistributionMessage}. To customize the
 * sequentialness/thread requirements of a message, extend DistributionMessage and implement
 * getExecutor().
 * </P>
 */
public abstract class DistributionMessage implements DataSerializableFixedID, Cloneable {

  /**
   * WARNING: setting this to true may break dunit tests.
   * <p>
   * see org.apache.geode.cache30.ClearMultiVmCallBkDUnitTest
   */
  private static final boolean INLINE_PROCESS =
      !Boolean.getBoolean("DistributionManager.enqueueOrderedMessages");

  private static final Logger logger = LogService.getLogger();

  /**
   * Indicates that a distribution message should be sent to all other distribution managers.
   */
  @Immutable
  public static final InternalDistributedMember ALL_RECIPIENTS = null;

  // common flags used by operation messages
  /** Keep this compatible with the other GFE layer PROCESSOR_ID flags. */
  protected static final short HAS_PROCESSOR_ID = 0x1;
  /** Flag set when this message carries a transactional member in context. */
  protected static final short HAS_TX_MEMBERID = 0x2;
  /** Flag set when this message carries a transactional context. */
  protected static final short HAS_TX_ID = 0x4;
  /** Flag set when this message is a possible duplicate. */
  protected static final short POS_DUP = 0x8;
  /** Indicate time statistics capturing as part of this message processing */
  protected static final short ENABLE_TIMESTATS = 0x10;
  /** If message sender has set the processor type to be used explicitly. */
  protected static final short HAS_PROCESSOR_TYPE = 0x20;

  /** the unreserved flags start for child classes */
  protected static final short UNRESERVED_FLAGS_START = (HAS_PROCESSOR_TYPE << 1);

  //////////////////// Instance Fields ////////////////////

  /** The sender of this message */
  protected transient InternalDistributedMember sender;

  /** A set of recipients for this message, not serialized */
  private transient InternalDistributedMember[] recipients = null;

  /** A timestamp, in nanos, associated with this message. Not serialized. */
  private transient long timeStamp;

  /** The number of bytes used to read this message, for statistics only */
  private transient int bytesRead = 0;

  /** true if message should be multicast; ignores recipients */
  private transient boolean multicast = false;

  /** true if messageBeingReceived stats need decrementing when done with msg */
  private transient boolean doDecMessagesBeingReceived = false;

  /**
   * This field will be set if we can send a direct ack for this message.
   */
  private transient ReplySender acker = null;

  /**
   * True if the P2P reader that received this message is a SHARED reader.
   */
  private transient boolean sharedReceiver;

  ////////////////////// Constructors //////////////////////

  protected DistributionMessage() {
    this.timeStamp = DistributionStats.getStatTime();
  }

  ////////////////////// Static Helper Methods //////////////////////

  /**
   * Get the next bit mask position while checking that the value should not exceed maximum byte
   * value.
   */
  protected static int getNextByteMask(final int mask) {
    return getNextBitMask(mask, (Byte.MAX_VALUE) + 1);
  }

  /**
   * Get the next bit mask position while checking that the value should not exceed given maximum
   * value.
   */
  protected static int getNextBitMask(int mask, final int maxValue) {
    mask <<= 1;
    if (mask > maxValue) {
      Assert.fail("exhausted bit flags with all available bits: 0x" + Integer.toHexString(mask)
          + ", max: 0x" + Integer.toHexString(maxValue));
    }
    return mask;
  }

  public static byte getNumBits(final int maxValue) {
    byte numBits = 1;
    while ((1 << numBits) <= maxValue) {
      numBits++;
    }
    return numBits;
  }

  ////////////////////// Instance Methods //////////////////////

  public void setDoDecMessagesBeingReceived(boolean v) {
    this.doDecMessagesBeingReceived = v;
  }

  public void setReplySender(ReplySender acker) {
    this.acker = acker;
  }

  public ReplySender getReplySender(DistributionManager dm) {
    if (acker != null) {
      return acker;
    } else {
      return dm;
    }
  }

  public boolean isDirectAck() {
    return acker != null;
  }

  /**
   * If true then this message most be sent on an ordered channel. If false then it can be
   * unordered.
   *
   * @since GemFire 5.5
   */
  public boolean orderedDelivery() {
    final int processorType = getProcessorType();
    switch (processorType) {
      case ClusterDistributionManager.SERIAL_EXECUTOR:
        // no need to use orderedDelivery for PR ops particularly when thread
        // does not own resources
        // case DistributionManager.PARTITIONED_REGION_EXECUTOR:
        return true;
      case ClusterDistributionManager.REGION_FUNCTION_EXECUTION_EXECUTOR:
        // allow nested distributed functions to be executed from within the
        // execution of a function
        return false;
      default:
        InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
        return (ids != null && ids.threadOwnsResources());
    }
  }

  /**
   * Sets the intended recipient of the message. If recipient is {@link #ALL_RECIPIENTS} then the
   * message will be sent to all distribution managers.
   */
  public void setRecipient(InternalDistributedMember recipient) {
    if (this.recipients != null) {
      throw new IllegalStateException(
          "Recipients can only be set once");
    }
    this.recipients = new InternalDistributedMember[] {recipient};
  }

  /**
   * Causes this message to be send using multicast if v is true.
   *
   * @since GemFire 5.0
   */
  public void setMulticast(boolean v) {
    this.multicast = v;
  }

  /**
   * Return true if this message should be sent using multicast.
   *
   * @since GemFire 5.0
   */
  public boolean getMulticast() {
    return this.multicast;
  }

  /**
   * Return true of this message should be sent via UDP instead of the direct-channel. This is
   * typically only done for messages that are broadcast to the full membership set.
   */
  public boolean sendViaUDP() {
    return false;
  }

  /**
   * Sets the intended recipient of the message. If recipient set contains {@link #ALL_RECIPIENTS}
   * then the message will be sent to all distribution managers.
   */
  public void setRecipients(Collection<? extends DistributedMember> recipients) {
    if (this.recipients != null) {
      throw new IllegalStateException(
          "Recipients can only be set once");
    }
    this.recipients = recipients
        .toArray(new InternalDistributedMember[recipients.size()]);
  }

  public void resetRecipients() {
    this.recipients = null;
    this.multicast = false;
  }

  /**
   * Returns the intended recipient(s) of this message. If the message is intended to delivered to
   * all distribution managers, then the array will contain ALL_RECIPIENTS. If the recipients have
   * not been set null is returned.
   */
  public InternalDistributedMember[] getRecipients() {
    if (this.multicast) {
      return new InternalDistributedMember[] {ALL_RECIPIENTS};
    } else if (this.recipients != null) {
      return this.recipients;
    } else {
      return new InternalDistributedMember[] {ALL_RECIPIENTS};
    }
  }

  /**
   * Returns true if message will be sent to everyone.
   */
  public boolean forAll() {
    return (this.recipients == null) || (this.multicast)
        || ((this.recipients.length > 0) && (this.recipients[0] == ALL_RECIPIENTS));
  }

  public String getRecipientsDescription() {
    if (this.recipients == null) {
      return "recipients: ALL";
    } else if (this.multicast) {
      return "recipients: multicast";
    } else if (this.recipients.length > 0 && this.recipients[0] == ALL_RECIPIENTS) {
      return "recipients: ALL";
    } else {
      StringBuffer sb = new StringBuffer(100);
      sb.append("recipients: <");
      for (int i = 0; i < this.recipients.length; i++) {
        if (i != 0) {
          sb.append(", ");
        }
        sb.append(this.recipients[i]);
      }
      sb.append(">");
      return sb.toString();
    }
  }

  /**
   * Returns the sender of this message. Note that this value is not set until this message is
   * received by a distribution manager.
   */
  public InternalDistributedMember getSender() {
    return this.sender;
  }

  /**
   * Sets the sender of this message. This method is only invoked when the message is
   * <B>received</B> by a <code>DistributionManager</code>.
   */
  public void setSender(InternalDistributedMember _sender) {
    this.sender = _sender;
  }

  /**
   * Return the Executor in which to process this message.
   */
  protected Executor getExecutor(ClusterDistributionManager dm) {
    return dm.getExecutor(getProcessorType(), sender);
  }

  public abstract int getProcessorType();

  /**
   * Processes this message. This method is invoked by the receiver of the message.
   *
   * @param dm the distribution manager that is processing the message.
   */
  protected abstract void process(ClusterDistributionManager dm);

  /**
   * Scheduled action to take when on this message when we are ready to process it.
   */
  protected void scheduleAction(final ClusterDistributionManager dm) {
    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE, "Processing '{}'", this);
    }
    String reason = dm.getCancelCriterion().cancelInProgress();
    if (reason != null) {
      // throw new ShutdownException(reason);
      if (logger.isDebugEnabled()) {
        logger.debug("ScheduleAction: cancel in progress ({}); skipping<{}>", reason, this);
      }
      return;
    }
    if (MessageLogger.isEnabled()) {
      MessageLogger.logMessage(this, getSender(), dm.getDistributionManagerId());
    }
    MessageDependencyMonitor.processingMessage(this);
    long time = 0;
    if (DistributionStats.enableClockStats) {
      time = DistributionStats.getStatTime();
      dm.getStats().incMessageProcessingScheduleTime(time - getTimestamp());
    }
    setBreadcrumbsInReceiver();
    try {

      DistributionMessageObserver observer = DistributionMessageObserver.getInstance();
      if (observer != null) {
        observer.beforeProcessMessage(dm, this);
      }
      process(dm);
      if (observer != null) {
        observer.afterProcessMessage(dm, this);
      }
    } catch (CancelException e) {
      if (logger.isDebugEnabled()) {
        logger.debug("Cancelled caught processing {}: {}", this, e.getMessage(), e);
      }
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable t) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      logger.fatal(String.format("Uncaught exception processing %s", this), t);
    } finally {
      if (doDecMessagesBeingReceived) {
        dm.getStats().decMessagesBeingReceived(this.bytesRead);
      }
      dm.getStats().incProcessedMessages(1L);
      if (DistributionStats.enableClockStats) {
        dm.getStats().incProcessedMessagesTime(time);
      }
      Breadcrumbs.clearBreadcrumb();
      MessageDependencyMonitor.doneProcessing(this);
    }
  }

  /**
   * Schedule this message's process() method in a thread determined by getExecutor()
   */
  protected void schedule(final ClusterDistributionManager dm) {
    boolean inlineProcess = INLINE_PROCESS
        && getProcessorType() == ClusterDistributionManager.SERIAL_EXECUTOR && !isPreciousThread();

    boolean forceInline = this.acker != null || getInlineProcess() || Connection.isDominoThread();

    if (inlineProcess && !forceInline && isSharedReceiver()) {
      // If processing this message notify a serial gateway sender then don't do it inline.
      if (mayNotifySerialGatewaySender(dm)) {
        inlineProcess = false;
      }
    }

    inlineProcess |= forceInline;

    if (inlineProcess) {
      dm.getStats().incNumSerialThreads(1);
      try {
        scheduleAction(dm);
      } finally {
        dm.getStats().incNumSerialThreads(-1);
      }
    } else { // not inline
      try {
        getExecutor(dm).execute(new SizeableRunnable(this.getBytesRead()) {
          @Override
          public void run() {
            scheduleAction(dm);
          }

          @Override
          public String toString() {
            return "Processing {" + DistributionMessage.this.toString() + "}";
          }
        });
      } catch (RejectedExecutionException ex) {
        if (!dm.shutdownInProgress()) { // fix for bug 32395
          logger.warn(String.format("%s schedule() rejected", this.toString()), ex);
        }
      } catch (VirtualMachineError err) {
        SystemFailure.initiateFailure(err);
        // If this ever returns, rethrow the error. We're poisoned
        // now, so don't let this thread continue.
        throw err;
      } catch (Throwable t) {
        // Whenever you catch Error or Throwable, you must also
        // catch VirtualMachineError (see above). However, there is
        // _still_ a possibility that you are dealing with a cascading
        // error condition, so you also need to check to see if the JVM
        // is still usable:
        SystemFailure.checkFailure();
        logger.fatal(String.format("Uncaught exception processing %s", this), t);
        // I don't believe this ever happens (DJP May 2007)
        throw new InternalGemFireException(
            "Unexpected error scheduling message",
            t);
      }
    } // not inline
  }

  protected boolean mayNotifySerialGatewaySender(ClusterDistributionManager dm) {
    // subclasses should override this method if processing them may notify a serial gateway sender.
    return false;
  }

  /**
   * returns true if the current thread should not be used for inline processing. i.e., it is a
   * "precious" resource
   */
  public static boolean isPreciousThread() {
    String thrname = Thread.currentThread().getName();
    // return thrname.startsWith("Geode UDP");
    return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver");
  }


  /** most messages should not force in-line processing */
  public boolean getInlineProcess() {
    return false;
  }

  /**
   * sets the breadcrumbs for this message into the current thread's name
   */
  public void setBreadcrumbsInReceiver() {
    if (Breadcrumbs.ENABLED) {
      String sender = null;
      String procId = "";
      long pid = getProcessorId();
      if (pid != 0) {
        procId = " processorId=" + pid;
      }
      if (Thread.currentThread().getName().startsWith(Connection.THREAD_KIND_IDENTIFIER)) {
        sender = procId;
      } else {
        sender = "sender=" + getSender() + procId;
      }
      if (sender.length() > 0) {
        Breadcrumbs.setReceiveSide(sender);
      }
      Object evID = getEventID();
      if (evID != null) {
        Breadcrumbs.setEventId(evID);
      }
    }
  }

  /**
   * sets breadcrumbs in a thread that is sending a message to another member
   */
  public void setBreadcrumbsInSender() {
    if (Breadcrumbs.ENABLED) {
      String procId = "";
      long pid = getProcessorId();
      if (pid != 0) {
        procId = "processorId=" + pid;
      }
      if (this.recipients != null && this.recipients.length <= 10) { // set a limit on recipients
        Breadcrumbs.setSendSide(procId + " recipients=" + Arrays.toString(this.recipients));
      } else {
        if (procId.length() > 0) {
          Breadcrumbs.setSendSide(procId);
        }
      }
      Object evID = getEventID();
      if (evID != null) {
        Breadcrumbs.setEventId(evID);
      }
    }
  }

  public EventID getEventID() {
    return null;
  }

  /**
   * This method resets the state of this message, usually releasing objects and resources it was
   * using. It is invoked after the message has been sent. Note that classes that override this
   * method should always invoke the inherited method (<code>super.reset()</code>).
   */
  public void reset() {
    resetRecipients();
    this.sender = null;
  }

  /**
   * Writes the contents of this <code>DistributionMessage</code> to the given output. Note that
   * classes that override this method should always invoke the inherited method
   * (<code>super.toData()</code>).
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    // DataSerializer.writeObject(this.recipients, out); // no need to serialize; filled in later
    // ((IpAddress)this.sender).toData(out); // no need to serialize; filled in later
    // out.writeLong(this.timeStamp);
  }

  /**
   * Reads the contents of this <code>DistributionMessage</code> from the given input. Note that
   * classes that override this method should always invoke the inherited method
   * (<code>super.fromData()</code>).
   */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {

    // this.recipients = (Set)DataSerializer.readObject(in); // no to deserialize; filled in later
    // this.sender = DataSerializer.readIpAddress(in); // no to deserialize; filled in later
    // this.timeStamp = (long)in.readLong();
  }

  /**
   * Returns a timestamp, in nanos, associated with this message.
   */
  public long getTimestamp() {
    return timeStamp;
  }

  /**
   * Sets the timestamp of this message to the current time (in nanos).
   *
   * @return the number of elapsed nanos since this message's last timestamp
   */
  public long resetTimestamp() {
    if (DistributionStats.enableClockStats) {
      long now = DistributionStats.getStatTime();
      long result = now - this.timeStamp;
      this.timeStamp = now;
      return result;
    } else {
      return 0;
    }
  }

  public void setBytesRead(int bytesRead) {
    this.bytesRead = bytesRead;
  }

  public int getBytesRead() {
    return bytesRead;
  }

  public void setSharedReceiver(boolean v) {
    this.sharedReceiver = v;
  }

  public boolean isSharedReceiver() {
    return this.sharedReceiver;
  }

  /**
   *
   * @return null if message is not conflatable. Otherwise return a key that can be used to identify
   *         the entry to conflate.
   * @since GemFire 4.2.2
   */
  public ConflationKey getConflationKey() {
    return null; // by default conflate nothing; override in subclasses
  }

  /**
   * @return the ID of the reply processor for this message, or zero if none
   * @since GemFire 5.7
   */
  public int getProcessorId() {
    return 0;
  }

  /**
   * Severe alert processing enables suspect processing at the ack-wait-threshold and issuing of a
   * severe alert at the end of the ack-severe-alert-threshold. Some messages should not support
   * this type of processing (e.g., GII, or DLockRequests)
   *
   * @return whether severe-alert processing may be performed on behalf of this message
   */
  public boolean isSevereAlertCompatible() {
    return false;
  }

  /**
   * Returns true if the message is for internal-use such as a meta-data region.
   *
   * @return true if the message is for internal-use such as a meta-data region
   * @since GemFire 7.0
   */
  public boolean isInternal() {
    return false;
  }

  /**
   * does this message carry state that will alter the content of one or more cache regions? This is
   * used to track the flight of content changes through communication channels
   */
  public boolean containsRegionContentChange() {
    return false;
  }

  /** returns the class name w/o package information. useful in logging */
  public String getShortClassName() {
    String cname = getClass().getName();
    return cname.substring(getClass().getPackage().getName().length() + 1);
  }

  @Override
  public String toString() {
    String cname = getShortClassName();
    final StringBuilder sb = new StringBuilder(cname);
    sb.append('@').append(Integer.toHexString(System.identityHashCode(this)));
    sb.append(" processorId=").append(getProcessorId());
    sb.append(" sender=").append(getSender());
    return sb.toString();
  }

  @Override
  public Version[] getSerializationVersions() {
    return null;
  }
}
