/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.server.ClientSubscriptionConfig;
import org.apache.geode.cache.server.ServerLoadProbe;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.admin.ClientMembershipMessage;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.management.membership.ClientMembership;
import org.apache.geode.management.membership.ClientMembershipEvent;
import org.apache.geode.management.membership.ClientMembershipListener;
import org.apache.geode.util.internal.GeodeGlossary;

/**
 * Abstract class that contains common code that all true implementations of
 * {@link InternalCacheServer}
 * can use.
 *
 * @since GemFire 5.7
 */
public abstract class AbstractCacheServer implements InternalCacheServer {

  public static final String TEST_OVERRIDE_DEFAULT_PORT_PROPERTY =
      GeodeGlossary.GEMFIRE_PREFIX + "test.CacheServer.OVERRIDE_DEFAULT_PORT";

  /** The cache that is served by this cache server */
  protected final InternalCache cache;

  /** The port that the cache server was configured to run on */
  protected int port;

  /** The maximum number of connections that the BridgeServer will accept */
  protected int maxConnections;

  /** The maximum number of threads that the BridgeServer will create */
  protected int maxThreads;

  /** Whether the cache server notifies by subscription */
  protected boolean notifyBySubscription = true;

  /**
   * The buffer size in bytes of the socket for this <code>BridgeServer</code>
   */
  protected int socketBufferSize;

  /**
   * The tcpNoDelay setting for outgoing sockets
   */
  protected boolean tcpNoDelay;

  /**
   * The maximum amount of time between client pings. This value is used by the
   * <code>ClientHealthMonitor</code> to determine the health of this <code>BridgeServer</code>'s
   * clients.
   */
  protected int maximumTimeBetweenPings;

  /** the maximum number of messages that can be enqueued in a client-queue. */
  protected int maximumMessageCount;

  /**
   * the time (in seconds) after which a message in the client queue will expire.
   */
  protected int messageTimeToLive;
  /**
   * The groups this server belongs to. Use <code>getGroups</code> to read.
   *
   * @since GemFire 5.7
   */
  protected String[] groups;

  protected ServerLoadProbe loadProbe;

  /**
   * The ip address or host name that this server is to listen on.
   *
   * @since GemFire 5.7
   */
  protected String bindAddress;
  /**
   * The ip address or host name that will be given to clients so they can connect to this server
   *
   * @since GemFire 5.7
   */
  protected String hostnameForClients;

  /**
   * How frequency to poll the load on this server.
   */
  protected long loadPollInterval;

  protected ClientSubscriptionConfig clientSubscriptionConfig;

  /**
   * Listens to client membership events and notifies any admin members as clients of this server
   * leave/crash.
   */
  protected final ClientMembershipListener listener;

  ////////////////////// Constructors //////////////////////

  /**
   * Creates a new <code>BridgeServer</code> with the default configuration.
   *
   * @param cache The cache being served
   */
  public AbstractCacheServer(InternalCache cache) {
    this(cache, true);
  }

  public AbstractCacheServer(InternalCache cache, boolean attachListener) {
    this.cache = cache;
    this.port = Integer.getInteger(TEST_OVERRIDE_DEFAULT_PORT_PROPERTY, CacheServer.DEFAULT_PORT);
    this.maxConnections = CacheServer.DEFAULT_MAX_CONNECTIONS;
    this.maxThreads = CacheServer.DEFAULT_MAX_THREADS;
    this.socketBufferSize = CacheServer.DEFAULT_SOCKET_BUFFER_SIZE;
    this.tcpNoDelay = CacheServer.DEFAULT_TCP_NO_DELAY;
    this.maximumTimeBetweenPings = CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
    this.maximumMessageCount = CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT;
    this.messageTimeToLive = CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE;
    this.groups = CacheServer.DEFAULT_GROUPS;
    this.bindAddress = CacheServer.DEFAULT_BIND_ADDRESS;
    this.hostnameForClients = CacheServer.DEFAULT_HOSTNAME_FOR_CLIENTS;
    this.loadProbe = CacheServer.DEFAULT_LOAD_PROBE;
    this.loadPollInterval = CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
    this.clientSubscriptionConfig = new ClientSubscriptionConfigImpl();

    if (!attachListener) {
      this.listener = null;
      return;
    }
    listener = new ClientMembershipListener() {

      @Override
      public void memberJoined(ClientMembershipEvent event) {
        if (event.isClient()) {
          createAndSendMessage(event, ClientMembershipMessage.JOINED);
        }
      }

      @Override
      public void memberLeft(ClientMembershipEvent event) {
        if (event.isClient()) {
          createAndSendMessage(event, ClientMembershipMessage.LEFT);
        }
      }

      @Override
      public void memberCrashed(ClientMembershipEvent event) {
        if (event.isClient()) {
          createAndSendMessage(event, ClientMembershipMessage.CRASHED);
        }
      }

      /**
       * Method to create & send the ClientMembershipMessage to admin members. The message is sent
       * only if there are any admin members in the distribution system.
       *
       * @param event describes a change in client membership
       * @param type type of event - one of ClientMembershipMessage.JOINED,
       *        ClientMembershipMessage.LEFT, ClientMembershipMessage.CRASHED
       */
      private void createAndSendMessage(ClientMembershipEvent event, int type) {
        InternalDistributedSystem ds = null;
        Cache cacheInstance = AbstractCacheServer.this.cache;
        if (cacheInstance != null && !(cacheInstance instanceof CacheCreation)) {
          ds = (InternalDistributedSystem) cacheInstance.getDistributedSystem();
        } else {
          ds = InternalDistributedSystem.getAnyInstance();
        }

        // ds could be null
        if (ds != null && ds.isConnected()) {
          DistributionManager dm = ds.getDistributionManager();
          Set adminMemberSet = dm.getAdminMemberSet();

          /* check if there are any admin members at all */
          if (!adminMemberSet.isEmpty()) {
            DistributedMember member = event.getMember();

            ClientMembershipMessage msg = new ClientMembershipMessage(event.getMemberId(),
                member == null ? null : member.getHost(), type);

            msg.setRecipients(adminMemberSet);
            dm.putOutgoing(msg);
          }
        }
      }
    };

    ClientMembership.registerClientMembershipListener(listener);
  }

  ///////////////////// Instance Methods /////////////////////

  @Override
  public int getPort() {
    return this.port;
  }

  @Override
  public void setPort(int port) {
    this.port = port;
  }

  @Override
  public String getBindAddress() {
    return this.bindAddress;
  }

  @Override
  public void setBindAddress(String address) {
    this.bindAddress = address;
  }

  @Override
  public String getHostnameForClients() {
    return this.hostnameForClients;
  }

  @Override
  public void setHostnameForClients(String name) {
    this.hostnameForClients = name;
  }

  @Override
  public int getMaxConnections() {
    return this.maxConnections;
  }

  @Override
  public void setMaxConnections(int maxCon) {
    this.maxConnections = maxCon;
  }

  @Override
  public int getMaxThreads() {
    return this.maxThreads;
  }

  @Override
  public void setMaxThreads(int maxThreads) {
    this.maxThreads = maxThreads;
  }

  @Override
  public void start() throws IOException {
    // This method is invoked during testing, but it is not necessary
    // to do anything.
  }

  @Override
  public void setNotifyBySubscription(boolean b) {
    // this.notifyBySubscription = true;
  }

  @Override
  public boolean getNotifyBySubscription() {
    return this.notifyBySubscription;
  }

  @Override
  public void setSocketBufferSize(int socketBufferSize) {
    this.socketBufferSize = socketBufferSize;
  }

  @Override
  public int getSocketBufferSize() {
    return this.socketBufferSize;
  }

  @Override
  public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
    this.maximumTimeBetweenPings = maximumTimeBetweenPings;
  }

  @Override
  public int getMaximumTimeBetweenPings() {
    return this.maximumTimeBetweenPings;
  }

  @Override
  public int getMaximumMessageCount() {
    return this.maximumMessageCount;
  }

  @Override
  public void setMaximumMessageCount(int maximumMessageCount) {
    this.maximumMessageCount = maximumMessageCount;
  }

  @Override
  public int getMessageTimeToLive() {
    return this.messageTimeToLive;
  }

  @Override
  public void setMessageTimeToLive(int messageTimeToLive) {
    this.messageTimeToLive = messageTimeToLive;
  }

  @Override
  public void setGroups(String[] groups) {
    if (groups == null) {
      this.groups = CacheServer.DEFAULT_GROUPS;
    } else if (groups.length > 0) {
      // copy it for isolation
      String[] copy = new String[groups.length];
      System.arraycopy(groups, 0, copy, 0, groups.length);
      this.groups = copy;
    } else {
      this.groups = CacheServer.DEFAULT_GROUPS; // keep findbugs happy
    }
  }

  @Override
  public String[] getGroups() {
    String[] result = this.groups;
    if (result.length > 0) {
      // copy it for isolation
      String[] copy = new String[result.length];
      System.arraycopy(result, 0, copy, 0, result.length);
      result = copy;
    }
    return result;
  }

  @Override
  public ServerLoadProbe getLoadProbe() {
    return loadProbe;
  }

  @Override
  public void setLoadProbe(ServerLoadProbe loadProbe) {
    this.loadProbe = loadProbe;
  }

  @Override
  public long getLoadPollInterval() {
    return loadPollInterval;
  }

  @Override
  public void setLoadPollInterval(long loadPollInterval) {
    this.loadPollInterval = loadPollInterval;
  }

  @Override
  public void setTcpNoDelay(boolean setting) {
    this.tcpNoDelay = setting;
  }

  @Override
  public boolean getTcpNoDelay() {
    return this.tcpNoDelay;
  }

  @Override
  public InternalCache getCache() {
    return this.cache;
  }

  private static boolean eq(String s1, String s2) {
    if (s1 == null) {
      return s2 == null;
    } else {
      return s1.equals(s2);
    }
  }

  /**
   * Returns whether or not this cache server has the same configuration as another cache server.
   */
  public boolean sameAs(CacheServer other) {
    return getPort() == other.getPort() && eq(getBindAddress(), other.getBindAddress())
        && getSocketBufferSize() == other.getSocketBufferSize()
        && getMaximumTimeBetweenPings() == other.getMaximumTimeBetweenPings()
        && getNotifyBySubscription() == other.getNotifyBySubscription()
        && getMaxConnections() == other.getMaxConnections()
        && getMaxThreads() == other.getMaxThreads()
        && getMaximumMessageCount() == other.getMaximumMessageCount()
        && getMessageTimeToLive() == other.getMessageTimeToLive()
        && Arrays.equals(getGroups(), other.getGroups())
        && getLoadProbe().equals(other.getLoadProbe())
        && getLoadPollInterval() == other.getLoadPollInterval()
        && getTcpNoDelay() == other.getTcpNoDelay();
  }
}
