/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal.membership.gms.messenger;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.logging.log4j.Logger;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;

import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.logging.internal.log4j.api.LogService;

public class GMSQuorumChecker {
  private static final Logger logger = LogService.getLogger();
  private boolean isInfoEnabled = false;
  private Map<SocketAddress, MemberIdentifier> addressConversionMap;
  private GMSPingPonger pingPonger;

  private Set<MemberIdentifier> receivedAcks;

  private final GMSMembershipView lastView;

  // guarded by this
  private boolean quorumAchieved = false;
  private final JChannel channel;
  private JGAddress myAddress;
  private final long partitionThreshold;
  private Set<MemberIdentifier> oldMemberIdentifiers;
  private ConcurrentLinkedQueue<Message> messageQueue = new ConcurrentLinkedQueue<>();

  public GMSQuorumChecker(GMSMembershipView jgView, int partitionThreshold, JChannel channel,
      Set<MemberIdentifier> oldMemberIdentifiers) {
    this.lastView = jgView;
    this.partitionThreshold = partitionThreshold;
    this.channel = channel;
    this.oldMemberIdentifiers = oldMemberIdentifiers;
  }

  public void initialize() {
    receivedAcks = new ConcurrentHashSet<>();

    pingPonger = new GMSPingPonger();
    myAddress = (JGAddress) channel.down(new Event(Event.GET_LOCAL_ADDRESS));

    addressConversionMap = new ConcurrentHashMap<>(this.lastView.size());
    List<MemberIdentifier> members = this.lastView.getMembers();
    for (MemberIdentifier addr : members) {
      SocketAddress sockaddr =
          new InetSocketAddress(addr.getInetAddress(), addr.getMembershipPort());
      addressConversionMap.put(sockaddr, addr);
    }

    isInfoEnabled = logger.isInfoEnabled();
    resume();
  }


  public synchronized boolean checkForQuorum(long timeout) throws InterruptedException {
    if (quorumAchieved) {
      return true;
    }

    if (isInfoEnabled) {
      logger.info("beginning quorum check with {}", this);
    }
    sendPingMessages();
    quorumAchieved = waitForResponses(lastView.getMembers().size(), timeout);
    // If we did not achieve full quorum, calculate if we achieved quorum
    if (!quorumAchieved) {
      quorumAchieved = calculateQuorum();
    }
    return quorumAchieved;
  }


  public void suspend() {
    // NO-OP for this implementation
  }


  public void close() {
    if (channel != null && !channel.isClosed()) {
      channel.close();
    }
  }


  public void resume() {
    JGroupsMessenger.setChannelReceiver(channel, new QuorumCheckerReceiver());
  }


  public MembershipInformation getMembershipInfo() {
    return new MembershipInformation(channel, oldMemberIdentifiers, messageQueue);
  }

  private boolean calculateQuorum() {
    // quorum check
    int weight = getWeight(this.lastView.getMembers(), this.lastView.getLeadMember());
    int ackedWeight = getWeight(receivedAcks, this.lastView.getLeadMember());
    int lossThreshold = (int) Math.round((weight * this.partitionThreshold) / 100.0);
    if (isInfoEnabled) {
      logger.info(
          "quorum check: contacted {} processes with {} member weight units.  Threshold for a quorum is {}",
          receivedAcks.size(), ackedWeight, lossThreshold);
    }
    return (ackedWeight >= lossThreshold);
  }

  private boolean waitForResponses(int numMembers, long timeout) throws InterruptedException {
    long endTime = System.currentTimeMillis() + timeout;
    for (;;) {
      long time = System.currentTimeMillis();
      long remaining = (endTime - time);
      if (remaining <= 0) {
        if (isInfoEnabled) {
          logger.info("quorum check: timeout waiting for responses.  {} responses received",
              receivedAcks.size());
        }
        break;
      }
      if (isInfoEnabled) {
        logger.info("quorum check: waiting up to {}ms to receive a quorum of responses",
            remaining);
      }
      Thread.sleep(500);
      if (receivedAcks.size() == numMembers) {
        // we've heard from everyone now so we've got a quorum
        if (isInfoEnabled) {
          logger.info(
              "quorum check: received responses from all members that were in the old distributed system");
        }
        return true;
      }
    }
    return false;
  }

  private int getWeight(Collection<MemberIdentifier> idms,
      MemberIdentifier leader) {
    int weight = 0;
    for (MemberIdentifier mbr : idms) {
      int thisWeight = mbr.getMemberWeight();
      if (mbr.getVmKind() == 10 /* NORMAL_DM_KIND */) {
        thisWeight += 10;
        if (leader != null && mbr.equals(leader)) {
          thisWeight += 5;
        }
      } else if (mbr.preferredForCoordinator()) {
        thisWeight += 3;
      }
      weight += thisWeight;
    }
    return weight;
  }

  private void sendPingMessages() {
    // send a ping message to each member in the last view seen
    List<MemberIdentifier> members = this.lastView.getMembers();
    for (MemberIdentifier addr : members) {
      if (!receivedAcks.contains(addr)) {
        JGAddress dest = new JGAddress(addr);
        if (isInfoEnabled) {
          logger.info("quorum check: sending request to {}", addr);
        }
        try {
          pingPonger.sendPingMessage(channel, myAddress, dest);
        } catch (Exception e) {
          logger.info("Failed sending Ping message to " + dest);
        }
      }
    }
  }

  private class QuorumCheckerReceiver implements Receiver {

    @Override
    public void receive(Message msg) {
      byte[] msgBytes = msg.getBuffer();
      if (pingPonger.isPingMessage(msgBytes)) {
        try {
          pingPonger.sendPongMessage(channel, myAddress, msg.getSrc());
        } catch (Exception e) {
          logger.info("Failed sending Pong message to " + msg.getSrc());
        }
      } else if (pingPonger.isPongMessage(msgBytes)) {
        pongReceived(msg.getSrc());
      } else {
        queueMessage(msg);
      }
    }

    private void queueMessage(Message msg) {
      messageQueue.add(msg);
    }

    @Override
    public void getState(OutputStream output) throws Exception {}

    @Override
    public void setState(InputStream input) throws Exception {}

    @Override
    public void viewAccepted(View new_view) {}

    @Override
    public void suspect(Address suspected_mbr) {}

    @Override
    public void block() {}

    @Override
    public void unblock() {}

    public void pongReceived(Address sender) {
      logger.info("received ping-pong response from {}", sender);
      JGAddress jgSender = (JGAddress) sender;
      SocketAddress sockaddr = new InetSocketAddress(jgSender.getInetAddress(), jgSender.getPort());
      MemberIdentifier memberAddr = addressConversionMap.get(sockaddr);

      if (memberAddr != null) {
        logger.info("quorum check: mapped address to member ID {}", memberAddr);
        receivedAcks.add(memberAddr);
      }
    }
  }

  public String toString() {
    return getClass().getSimpleName() + " on view " + this.lastView;
  }

}
