/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.wan.misc;

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_MANAGER;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;

import java.util.Properties;
import java.util.concurrent.atomic.LongAdder;

import org.apache.logging.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.security.AuthInitialize;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.SecurityTestUtils;
import org.apache.geode.security.TestSecurityManager;
import org.apache.geode.security.generator.CredentialGenerator;
import org.apache.geode.security.generator.DummyCredentialGenerator;
import org.apache.geode.security.templates.UserPasswordAuthInit;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.WanTest;

@Category({WanTest.class})
public class NewWanAuthenticationDUnitTest extends WANTestBase {

  private static final Logger logger = LogService.getLogger();

  private static boolean isDifferentServerInGetCredentialCall = false;

  private static final String securityJsonResource =
      "org/apache/geode/security/templates/security.json";
  private static final String senderId = "ln";
  private static final int numPuts = 10;

  private Integer lnPort;
  private Integer nyPort;
  private String regionName;
  private final VM sender = vm2;
  private final VM receiver = vm3;

  @Before
  public void setup() {
    disconnectAllFromDS();
    lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
    regionName = getTestMethodName() + "_RR";
  }

  /**
   * Authentication test for new WAN with valid credentials. Although, nothing related to
   * authentication has been changed in new WAN, this test case is added on request from QA for
   * defect 44650.
   */
  @Test
  public void testWanAuthValidCredentials() {
    final CredentialGenerator credentialGenerator = new DummyCredentialGenerator();

    final Properties extraProps = credentialGenerator.getSystemProperties();

    final Properties senderCredentials = credentialGenerator.getValidCredentials(1);
    if (extraProps != null) {
      senderCredentials.putAll(extraProps);
    }
    final Properties senderJavaProps = credentialGenerator.getJavaProperties();

    // receiver's invalid credentials
    final Properties receiverCredentials = credentialGenerator.getInvalidCredentials(1);
    if (extraProps != null) {
      receiverCredentials.putAll(extraProps);
    }
    final Properties receiverJavaProps = credentialGenerator.getJavaProperties();

    final String clientAuthenticator = credentialGenerator.getAuthenticator();
    final String clientAuthInit = credentialGenerator.getAuthInit();

    final Properties senderSecurityProps =
        buildProperties(clientAuthenticator, clientAuthInit, null, senderCredentials, null);
    // have receiver start a cache with invalid credentials
    final Properties receiverSecurityProps =
        buildProperties(clientAuthenticator, clientAuthInit, null, receiverCredentials, null);

    // ------------------------------- Set Up With Valid Credentials -------------------------------

    sender.invoke(() -> createSecuredCache(senderSecurityProps, senderJavaProps, lnPort));
    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, receiverJavaProps, nyPort));

    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));
    receiver.invoke(() -> createReceiverInSecuredCache());

    sender.invoke(
        () -> createReplicatedRegion(regionName, "ln", isOffHeap()));
    receiver.invoke(
        () -> createReplicatedRegion(regionName, null, isOffHeap()));

    // this tests verifies that even though the receiver has invalid credentials, the sender can
    // still send data to
    // the receiver because the sender has valid credentials
    sender.invoke(() -> startSender("ln"));
    sender.invoke(() -> waitForSenderRunningState("ln"));

    sender.invoke(() -> doPuts(regionName, 1));
    receiver.invoke(() -> {
      Region r = cache.getRegion(Region.SEPARATOR + regionName);
      await().untilAsserted(() -> assertTrue(r.size() > 0));
    });
  }

  @Test
  public void testWanIntegratedSecurityWithValidCredentials() {
    final Properties senderSecurityProps = buildSecurityProperties("admin", "secret");
    final Properties receiverSecurityProps = buildSecurityProperties("guest", "guest");

    // ------------------------------- Set Up With Valid Credentials -------------------------------

    sender.invoke(() -> createSecuredCache(senderSecurityProps, null, lnPort));
    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, null, nyPort));

    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));
    receiver.invoke(() -> createReceiverInSecuredCache());

    sender.invoke(
        () -> createReplicatedRegion(regionName, "ln", isOffHeap()));
    receiver.invoke(
        () -> createReplicatedRegion(regionName, null, isOffHeap()));

    sender.invoke(() -> startSender("ln"));
    sender.invoke(() -> waitForSenderRunningState("ln"));
    sender.invoke(() -> doPuts(regionName, 1));

    receiver.invoke(() -> {
      Region r = cache.getRegion(Region.SEPARATOR + regionName);
      await().untilAsserted(() -> assertTrue(r.size() > 0));
    });
  }

  /**
   * Test authentication with new WAN with invalid credentials. Although, nothing related to
   * authentication has been changed in new WAN, this test case is added on request from QA for
   * defect 44650.
   */
  @Test
  public void testWanAuthInvalidCredentials() {
    final CredentialGenerator credentialGenerator = new DummyCredentialGenerator();

    final Properties extraProps = credentialGenerator.getSystemProperties();

    final Properties senderCredentials = credentialGenerator.getInvalidCredentials(1);
    if (extraProps != null) {
      senderCredentials.putAll(extraProps);
    }
    final Properties senderJavaProperties = credentialGenerator.getJavaProperties();

    final Properties receiverCredentials = credentialGenerator.getInvalidCredentials(2);
    if (extraProps != null) {
      receiverCredentials.putAll(extraProps);
    }
    final Properties receiverJavaProps = credentialGenerator.getJavaProperties();

    final String clientAuthenticator = credentialGenerator.getAuthenticator();
    final String clientAuthInit = credentialGenerator.getAuthInit();

    final Properties senderSecurityProps =
        buildProperties(clientAuthenticator, clientAuthInit, null, senderCredentials, null);
    final Properties receiverSecurityPropsWithIncorrectSenderCreds =
        buildProperties(clientAuthenticator, clientAuthInit, null, receiverCredentials, null);

    // ------------------------------ Set Up With Invalid Credentials ------------------------------

    sender.invoke(() -> {
      createSecuredCache(senderSecurityProps, senderJavaProperties, lnPort);
      createReplicatedRegion(regionName, senderId, isOffHeap());
      createSender(senderId, 2, false, 100, 10, false, false, null, true);
    });

    receiver.invoke(() -> {
      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithIncorrectSenderCreds,
          receiverJavaProps);
    });

    sender.invoke(() -> {
      startSender(senderId);
      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, false, true, false);
    });

    receiver.invoke(() -> validateRegionSize(regionName, 0));
  }

  /**
   * Test authentication with new WAN with invalid credentials. Although, nothing related to
   * authentication has been changed in new WAN, this test case is added on request from QA for
   * defect 44650.
   */
  @Test
  public void testWanSecurityManagerWithInvalidThenValidCredentials() {
    final Properties senderSecurityProps = buildSecurityProperties("admin", "wrongPswd");

    final String securityJsonResource =
        "org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json";
    final Properties receiverSecurityPropsWithCorrectSenderCreds =
        buildSecurityProperties(securityJsonResource);
    final Properties receiverSecurityPropsWithIncorrectSenderCreds = buildSecurityProperties();

    // ------------------------------ Set Up With Invalid Credentials ------------------------------

    sender.invoke(() -> {
      createSecuredCache(senderSecurityProps, null, lnPort);
      createReplicatedRegion(regionName, senderId, isOffHeap());
      createSender(senderId, 2, false, 100, 10, false, false, null, true);
    });

    receiver.invoke(() -> {
      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithIncorrectSenderCreds,
          null);
    });

    sender.invoke(() -> {
      startSender(senderId);
      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, false, true, false);
    });

    receiver.invoke(() -> validateRegionSize(regionName, 0));

    // ------------------------------- Set Up With Valid Credentials -------------------------------

    receiver.invoke(() -> {
      closeCache();
      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithCorrectSenderCreds, null);
    });

    sender.invoke(() -> {
      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, true, false, true);
    });

    receiver.invoke(() -> validateRegionSize(regionName, numPuts));
  }

  @Test
  public void testWanSecurityManagerWithValidThenInvalidThenValidCredentials() {
    final String securityJsonResource =
        "org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json";
    final String gatewayConnectionRetryIntervalConfigParameter =
        DistributionConfig.GEMFIRE_PREFIX + "gateway-connection-retry-interval";

    final Properties senderSecurityProps = buildSecurityProperties("admin", "wrongPswd");

    final Properties receiverSecurityPropsWithCorrectSenderCreds =
        buildSecurityProperties(securityJsonResource);
    final Properties receiverSecurityPropsWithIncorrectSenderCreds = buildSecurityProperties();

    // ------------------------------- Set Up With Valid Credentials -------------------------------

    sender.invoke(() -> {
      createSecuredCache(senderSecurityProps, null, lnPort);
      createReplicatedRegion(regionName, senderId, isOffHeap());
      createSender(senderId, 2, false, 100, 10, false, false, null, true);
    });

    receiver.invoke(() -> {
      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithCorrectSenderCreds, null);
    });

    sender.invoke(() -> {
      startSender(senderId);
      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, true, false, true);
    });

    receiver.invoke(() -> validateRegionSize(regionName, numPuts));

    // ------------------------------ Set Up With Invalid Credentials ------------------------------

    receiver.invoke(() -> {
      // Simulate restarting the receiver, this time without valid credentials for the sender.
      closeCache();
      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithIncorrectSenderCreds,
          null);
    });

    sender.invoke(() -> {
      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, false, true, true);
    });

    receiver.invoke(() -> validateRegionSize(regionName, 0));

    // ------------------------------- Set Up With Valid Credentials -------------------------------

    receiver.invoke(() -> {
      closeCache();
      // Simulate restarting the receiver, and restore valid credentials for the sender.
      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithCorrectSenderCreds, null);
    });

    sender.invoke(() -> {
      /*
       * Data should be able to flow properly after valid credentials have been restored.
       * No more puts are needed because we already have numPuts queued from when credentials
       * were invalid (see above).
       */
      doPutsAndVerifyQueueSizeAfterProcessing(regionName, 0, true, false, true);
    });

    receiver.invoke(() -> validateRegionSize(regionName, numPuts));
  }

  @Test
  public void testWanAuthValidCredentialsWithServer() {
    final DummyCredentialGenerator credentialGenerator = new DummyCredentialGenerator();
    credentialGenerator.init();

    final Properties extraProps = credentialGenerator.getSystemProperties();

    final Properties senderCredentials = credentialGenerator.getValidCredentials(1);
    if (extraProps != null) {
      senderCredentials.putAll(extraProps);
    }
    final Properties senderJavaProps = credentialGenerator.getJavaProperties();

    final Properties receiverCredentials = credentialGenerator.getInvalidCredentials(2);
    if (extraProps != null) {
      receiverCredentials.putAll(extraProps);
    }
    final Properties receiverJavaProps = credentialGenerator.getJavaProperties();

    final String clientAuthenticator = credentialGenerator.getAuthenticator();
    final String clientAuthInit = UserPasswdAI.class.getName() + ".createAI";

    final Properties senderSecurityProps =
        buildProperties(clientAuthenticator, clientAuthInit, null, senderCredentials, null);
    final Properties receiverSecurityProps =
        buildProperties(clientAuthenticator, clientAuthInit, null, receiverCredentials, null);

    // ------------------------------- Set Up With Valid Credentials -------------------------------

    sender.invoke(() -> createSecuredCache(senderSecurityProps, senderJavaProps, lnPort));

    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, receiverJavaProps, nyPort));

    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));

    receiver.invoke(() -> createReceiverInSecuredCache());

    sender.invoke(() -> {
      startSender("ln");
      waitForSenderRunningState("ln");
      verifyDifferentServerInGetCredentialCall();
    });

    receiver.invoke(() -> verifyDifferentServerInGetCredentialCall());
  }

  @Test
  public void testWanSecurityManagerAuthValidCredentialsWithServer() {
    Properties senderSecurityProps = buildSecurityProperties("admin", "secret");
    Properties receiverSecurityProps = buildSecurityProperties("guest", "guest");

    // ------------------------------- Set Up With Valid Credentials -------------------------------

    sender.invoke(() -> createSecuredCache(senderSecurityProps, null, lnPort));

    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, null, nyPort));

    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));

    receiver.invoke(() -> createReceiverInSecuredCache());

    sender.invoke(() -> {
      startSender("ln");
      waitForSenderRunningState("ln");
      verifyDifferentServerInGetCredentialCall();
    });

    // this would fail for now because for integrated security, we are not sending the receiver's
    // credentials back
    // to the sender. Because in the old security implementation, even though the receiver's
    // credentials are sent back to the sender
    // the sender is not checking it.
    // sender.invoke(() -> verifyDifferentServerInGetCredentialCall());
  }

  private void doPutsAndVerifyQueueSizeAfterProcessing(
      final String regionName,
      final int numPuts,
      final boolean shouldBeConnected,
      final boolean isQueueBlocked,
      final boolean isAckThreadRunning) {
    if (isQueueBlocked) {
      // caller is assuming that queue processing will not make progress
      try {
        final LongAdder dispatchAttempts = new LongAdder();
        final LongAdder ackReadAttempts = new LongAdder();

        GatewaySenderEventRemoteDispatcher.messageProcessingAttempted = isAck -> {
          if (isAck)
            ackReadAttempts.increment();
          else
            dispatchAttempts.increment();
        };

        doPuts(regionName, numPuts);

        /*
         * The game here is to ensure that both the dispatcher thread and the ack reader thread
         * each get at least one whack at processing (a batch or an ack, respectively).
         * Note: both those conditions aren't obviously necessary from our method signature, but
         * trust us: callers rely on that guarantee! (the "Processing" in the method name
         * implies that _both_ threads tried).
         *
         * Notice the particular awfulness of the second term in the conditional below. Callers
         * have to send us a flag to tell us that the ack reader thread is not running so we know
         * not to look for its attempts.
         */
        await().until(() -> dispatchAttempts.sum() > 0 &&
            (!isAckThreadRunning || ackReadAttempts.sum() > 0));

        checkQueueSize(senderId, numPuts);
      } finally {
        GatewaySenderEventRemoteDispatcher.messageProcessingAttempted = isAck -> {
        };
      }
    } else {
      doPuts(regionName, numPuts);
      // caller is assuming queue will drain eventually
      checkQueueSize(senderId, 0);
    }
    verifyRunningWithConnectedState(senderId, shouldBeConnected);
  }

  private void createSecuredReceiver(Integer nyPort, String regionName,
      Properties receiverSecurityPropsWithCorrectSenderCreds,
      Object javaProps) {
    createSecuredCache(receiverSecurityPropsWithCorrectSenderCreds, javaProps, nyPort);
    createReplicatedRegion(regionName, null, isOffHeap());
    createReceiverInSecuredCache();
  }

  private void verifyRunningWithConnectedState(
      final String senderId,
      final boolean shouldBeConnected) {
    await().untilAsserted(() -> {
      verifySenderRunningState(senderId);
      verifySenderConnectedState(senderId, shouldBeConnected);
    });
  }

  private static Properties buildProperties(String clientauthenticator, String clientAuthInit,
      String accessor, Properties extraAuthProps, Properties extraAuthzProps) {
    Properties authProps = new Properties();
    if (clientauthenticator != null) {
      authProps.setProperty(SECURITY_CLIENT_AUTHENTICATOR, clientauthenticator);
    }
    if (accessor != null) {
      authProps.setProperty(SECURITY_CLIENT_ACCESSOR, accessor);
    }
    if (clientAuthInit != null) {
      authProps.setProperty(SECURITY_CLIENT_AUTH_INIT, clientAuthInit);
    }
    if (extraAuthProps != null) {
      authProps.putAll(extraAuthProps);
    }
    if (extraAuthzProps != null) {
      authProps.putAll(extraAuthzProps);
    }
    return authProps;
  }

  private static Properties buildSecurityProperties(final String username, final String password) {
    final Properties props = buildSecurityProperties();
    props.put("security-username", username);
    props.put("security-password", password);
    return props;
  }

  private static Properties buildSecurityProperties(
      final String securityJsonResource) {
    final Properties props = buildSecurityProperties();
    props.put("security-json", securityJsonResource);
    return props;
  }

  private static Properties buildSecurityProperties() {
    final Properties props = new Properties();
    props.put(SECURITY_MANAGER, TestSecurityManager.class.getName());
    props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName());
    props.put("security-json", securityJsonResource);
    return props;
  }

  private static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) {
    authProps.setProperty(MCAST_PORT, "0");
    authProps.setProperty(LOCATORS, "localhost[" + locPort + "]");

    logger.info("Set the server properties to: " + authProps);
    logger.info("Set the java properties to: " + javaProps);

    SecurityTestUtils tmpInstance = new SecurityTestUtils();
    DistributedSystem ds = tmpInstance.createSystem(authProps, (Properties) javaProps);
    assertNotNull(ds);
    assertTrue(ds.isConnected());
    cache = CacheFactory.create(ds);
    assertNotNull(cache);
  }

  public static class UserPasswdAI extends UserPasswordAuthInit {

    public static AuthInitialize createAI() {
      return new UserPasswdAI();
    }

    @Override
    public Properties getCredentials(Properties props, DistributedMember server, boolean isPeer)
        throws AuthenticationFailedException {
      boolean val = (CacheFactory.getAnyInstance().getDistributedSystem().getDistributedMember()
          .getProcessId() != server.getProcessId());
      Assert.assertTrue(val, "getCredentials: Server should be different");
      Properties p = super.getCredentials(props, server, isPeer);
      if (val) {
        isDifferentServerInGetCredentialCall = true;
        CacheFactory.getAnyInstance().getLogger()
            .config("setting  isDifferentServerInGetCredentialCall "
                + isDifferentServerInGetCredentialCall);
      } else {
        CacheFactory.getAnyInstance().getLogger()
            .config("setting22  isDifferentServerInGetCredentialCall "
                + isDifferentServerInGetCredentialCall);
      }
      return p;
    }
  }

  private static void verifyDifferentServerInGetCredentialCall() {
    Assert.assertTrue(isDifferentServerInGetCredentialCall,
        "verifyDifferentServerInGetCredentialCall: Server should be different");
    isDifferentServerInGetCredentialCall = false;
  }
}
