/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.admin.remote;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.geode.DataSerializer;
import org.apache.geode.admin.AlertLevel;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.AdminMessageType;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.admin.Alert;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.management.internal.AlertDetails;

/**
 * A message that is sent to an admin member or manager to notify it of an alert.
 *
 * <p>
 * Alerts are sent from remote members to the manager via {@link AlertListenerMessage} which is
 * no-ack (asynchronous). This means you cannot log a warning in one VM and immediately verify that
 * it arrives in the manager VM. You have to use {@code Mockito#timeout(long)} or
 * {@code Awaitility}.
 */
public class AlertListenerMessage extends PooledDistributionMessage implements AdminMessageType {
  @MakeNotStatic
  private static final AtomicReference<Listener> listenerRef = new AtomicReference<>();

  private int alertLevel;
  private Date date;
  private String connectionName;
  private String threadName;
  private long threadId;
  private String message;
  private String exceptionText;

  public static AlertListenerMessage create(DistributedMember recipient, int alertLevel,
      Instant timestamp,
      String connectionName, String threadName, long threadId, String message,
      String exceptionText) {
    AlertListenerMessage alertListenerMessage = new AlertListenerMessage();
    alertListenerMessage.setRecipient((InternalDistributedMember) recipient);
    alertListenerMessage.alertLevel = alertLevel;
    alertListenerMessage.date = new Date(timestamp.toEpochMilli());
    alertListenerMessage.connectionName = connectionName;
    if (alertListenerMessage.connectionName == null) {
      alertListenerMessage.connectionName = "";
    }
    alertListenerMessage.threadName = threadName;
    if (alertListenerMessage.threadName == null) {
      alertListenerMessage.threadName = "";
    }
    alertListenerMessage.threadId = threadId;
    alertListenerMessage.message = message;
    if (alertListenerMessage.message == null) {
      alertListenerMessage.message = "";
    }
    alertListenerMessage.exceptionText = exceptionText;
    if (alertListenerMessage.exceptionText == null) {
      alertListenerMessage.exceptionText = "";
    }
    return alertListenerMessage;
  }

  @Override
  public void process(ClusterDistributionManager dm) {
    Listener listener = getListener();
    if (listener != null) {
      listener.received(this);
    }

    RemoteGfManagerAgent agent = dm.getAgent();
    if (agent != null) {
      RemoteGemFireVM manager = agent.getMemberById(getSender());
      if (manager == null) {
        return;
      }
      Alert alert = new RemoteAlert(manager, alertLevel, date, connectionName, threadName, threadId,
          message, exceptionText, getSender());

      if (listener != null) {
        listener.created(alert);
      }

      agent.callAlertListener(alert);
    } else {
      /*
       * The other recipient type is a JMX Manager which needs AlertDetails so that it can send out
       * JMX notifications for the alert.
       */
      AlertDetails alertDetail = new AlertDetails(alertLevel, date, connectionName, threadName,
          threadId, message, exceptionText, getSender());

      if (listener != null) {
        listener.created(alertDetail);
      }

      dm.getSystem().handleResourceEvent(ResourceEvent.SYSTEM_ALERT, alertDetail);
    }
  }

  @Override
  public boolean sendViaUDP() {
    return true;
  }

  @Override
  public int getDSFID() {
    return ALERT_LISTENER_MESSAGE;
  }

  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    super.toData(out, context);
    out.writeInt(alertLevel);
    DataSerializer.writeObject(date, out);
    DataSerializer.writeString(connectionName, out);
    DataSerializer.writeString(threadName, out);
    out.writeLong(threadId);
    DataSerializer.writeString(message, out);
    DataSerializer.writeString(exceptionText, out);
  }

  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    super.fromData(in, context);
    alertLevel = in.readInt();
    date = DataSerializer.readObject(in);
    connectionName = DataSerializer.readString(in);
    threadName = DataSerializer.readString(in);
    threadId = in.readLong();
    message = DataSerializer.readString(in);
    exceptionText = DataSerializer.readString(in);
  }

  @Override
  public String toString() {
    return "Alert \"" + message + "\" level " + AlertLevel.forSeverity(alertLevel);
  }

  @VisibleForTesting
  public String getMessage() {
    return message;
  }

  @VisibleForTesting
  public static void addListener(Listener listener) {
    listenerRef.compareAndSet(null, listener);
  }

  @VisibleForTesting
  public static void removeListener(Listener listener) {
    listenerRef.compareAndSet(listener, null);
  }

  @VisibleForTesting
  public static Listener getListener() {
    return listenerRef.get();
  }

  @VisibleForTesting
  public interface Listener {

    void received(AlertListenerMessage message);

    void created(Alert alert);

    void created(AlertDetails alertDetails);
  }
}
