/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.tcp;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.DistributionImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.SerialAckedMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.DistributedTestCase;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;

@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
public class TCPConduitDUnitTest extends DistributedTestCase {
  public static final String TEST_REGION = "testRegion";
  private final Properties properties;

  @Parameterized.Parameters
  public static Collection<Properties> data() {
    Properties nonSSL = new Properties();
    nonSSL.setProperty(ConfigurationProperties.SOCKET_LEASE_TIME, "1000");
    nonSSL.setProperty(ConfigurationProperties.DISABLE_AUTO_RECONNECT, "true");

    Properties SSL = new Properties();
    SSL.putAll(nonSSL);

    final String keystorePath =
        createTempFileFromResource(TCPConduitDUnitTest.class,
            "/org/apache/geode/cache/client/internal/default.keystore").getAbsolutePath();

    SSL.setProperty(ConfigurationProperties.SSL_ENABLED_COMPONENTS, "cluster");
    SSL.setProperty(ConfigurationProperties.SSL_KEYSTORE, keystorePath);
    SSL.setProperty(ConfigurationProperties.SSL_KEYSTORE_PASSWORD, "password");
    SSL.setProperty(ConfigurationProperties.SSL_TRUSTSTORE, keystorePath);
    SSL.setProperty(ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD, "password");

    return Arrays.asList(nonSSL, SSL);
  }

  public TCPConduitDUnitTest(Properties properties) {
    this.properties = properties;
  }

  /**
   * This was a bug where SSL connections were being configured with a timeout that was non-zero,
   * but didn't handle socket timeouts. This resulted in a permanently hung P2P socket.
   *
   * We set the lease time to 1000, which in that case would cause the socket to break in 1 second.
   */
  @Test
  public void basicAcceptConnection() throws Exception {
    final VM vm1 = VM.getVM(1);
    final VM vm2 = VM.getVM(2);
    final VM vm3 = VM.getVM(3);

    disconnectAllFromDS();

    int port = startLocator();
    properties.put(ConfigurationProperties.LOCATORS, "localhost[" + port + "]");

    vm1.invoke(() -> startServer(properties));
    vm2.invoke(() -> startServer(properties));
    vm3.invoke(() -> startServer(properties));

    await().untilAsserted(() -> {
      assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
    });

    // ensure that the closing of a shared/unordered connection to another node does not
    // remove all connections for that node
    InternalDistributedMember otherMember =
        (InternalDistributedMember) system.getAllOtherMembers().iterator().next();
    DistributionImpl distribution =
        (DistributionImpl) system.getDistributionManager().getDistribution();
    final ConnectionTable connectionTable =
        distribution.getDirectChannel().getConduit().getConTable();

    assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();

    Connection sharedUnordered = connectionTable.get(otherMember, false,
        System.currentTimeMillis(), 15000, 0);
    sharedUnordered.requestClose("for testing");
    // the sender connection has been closed so we should only have 2 senders now
    assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
    // there should still be receivers for the other member - endpoint not removed!
    assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();

    try {
      await("for message to be sent").until(() -> {
        final SerialAckedMessage serialAckedMessage = new SerialAckedMessage();
        serialAckedMessage.send(system.getAllOtherMembers(), false);
        return true;
      });
    } finally {
      // DUnit won't clean up properly if the sockets are hung; we have to crash the systems.
      IgnoredException.addIgnoredException("ForcedDisconnectException|loss of quorum");
      for (VM vm : Arrays.asList(vm1, vm2, vm3)) {
        vm.invoke("crash system in case it's hung", () -> {
          if (system != null && system.isConnected()) {
            DistributedTestUtils.crashDistributedSystem(system);
          }
        });
      }
      DistributedTestUtils.crashDistributedSystem(system);
    }
  }

  private int startLocator() throws Exception {
    Locator locator = Locator.startLocatorAndDS(0, new File(""), properties);
    system = (InternalDistributedSystem) locator.getDistributedSystem();
    return locator.getPort();
  }

  private void startServer(Properties properties) throws IOException {
    system = (InternalDistributedSystem) DistributedSystem.connect(properties);
  }
}
