/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.authentication;

import com.google.common.collect.ImmutableSet;

import java.io.File;
import java.io.FileWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.security.auth.login.Configuration;

import lombok.Cleanup;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
	private static final Logger log = LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);

	public static File kdcDir;
	public static File kerberosWorkDir;
	public static File brokerSecretKeyFile;
	public static File proxySecretKeyFile;

	private static MiniKdc kdc;
	private static Properties properties;

	private static String localHostname = "localhost";

	@BeforeClass
	public static void startMiniKdc() throws Exception {
		kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
		kerberosWorkDir = Files.createTempDirectory("test-kerberos-work-dir").toFile();

		properties = MiniKdc.createConf();
		kdc = new MiniKdc(properties, kdcDir);
		kdc.start();

		String principalBrokerNoRealm = "broker/" + localHostname;
		String principalBroker = "broker/" + localHostname + "@" + kdc.getRealm();
		log.info("principalBroker: " + principalBroker);

		String principalClientNoRealm = "client/" + localHostname;
		String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
		log.info("principalClient: " + principalClient);

		String principalProxyNoRealm = "proxy/" + localHostname;
		String principalProxy = principalProxyNoRealm + "@" + kdc.getRealm();
		log.info("principalProxy: " + principalProxy);

		File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
		kdc.createPrincipal(keytabClient, principalClientNoRealm);

		File keytabBroker = new File(kerberosWorkDir, "pulsarbroker.keytab");
		kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);

		File keytabProxy = new File(kerberosWorkDir, "pulsarproxy.keytab");
		kdc.createPrincipal(keytabProxy, principalProxyNoRealm);

		File jaasFile = new File(kerberosWorkDir, "jaas.conf");
		try (FileWriter writer = new FileWriter(jaasFile)) {
			writer.write("\n"
				+ "PulsarBroker {\n"
				+ "  com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
				+ "  useKeyTab=true\n"
				+ "  keyTab=\"" + keytabBroker.getAbsolutePath() + "\n"
				+ "  storeKey=true\n"
				+ "  useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
				+ "  principal=\"" + principalBroker + "\";\n"
				+ "};\n"
				+ "\n"
				+ "\n"
				+ "\n"
				+ "PulsarProxy{\n"
				+ "  com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
				+ "  useKeyTab=true\n"
				+ "  keyTab=\"" + keytabProxy.getAbsolutePath() + "\n"
				+ "  storeKey=true\n"
				+ "  useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
				+ "  principal=\"" + principalProxy + "\";\n"
				+ "};\n"
				+ "\n"
				+ "\n"
				+ "\n"
				+ "PulsarClient {\n"
				+ "  com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
				+ "  useKeyTab=true\n"
				+ "  keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
				+ "  storeKey=true\n"
				+ "  useTicketCache=false\n"
				+ "  principal=\"" + principalClient + "\";\n"
				+ "};\n"
			);
		}

		File krb5file = new File(kerberosWorkDir, "krb5.conf");
		try (FileWriter writer = new FileWriter(krb5file)) {
			String conf = "[libdefaults]\n"
				+ " default_realm = " + kdc.getRealm() + "\n"
				+ " udp_preference_limit = 1\n" // force use TCP
				+ "\n"
				+ "\n"
				+ "[realms]\n"
				+ " " + kdc.getRealm() + "  = {\n"
				+ "  kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
				+ " }";
			writer.write(conf);
			log.info("krb5.conf:\n" + conf);
		}

		System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
		System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
		Configuration.getConfiguration().refresh();

		// Client config

		log.info("created AuthenticationSasl");
	}

	@AfterClass(alwaysRun = true)
	public static void stopMiniKdc() {
		System.clearProperty("java.security.auth.login.config");
		System.clearProperty("java.security.krb5.conf");
		if (kdc != null) {
			kdc.stop();
		}
		FileUtils.deleteQuietly(kdcDir);
		FileUtils.deleteQuietly(kerberosWorkDir);
		Assert.assertFalse(kdcDir.exists());
		Assert.assertFalse(kerberosWorkDir.exists());
	}

	@BeforeMethod
	@Override
	protected void setup() throws Exception {
		log.info("-- {} --, start at host: {}", methodName, localHostname);
		isTcpLookup = true;
		conf.setAdvertisedAddress(localHostname);
		conf.setAuthenticationEnabled(true);
		conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
		conf.setSaslJaasServerSectionName("PulsarBroker");
		brokerSecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret", ".key");
		Files.write(Paths.get(brokerSecretKeyFile.toString()), "PulsarSecret".getBytes());
		conf.setSaslJaasServerRoleTokenSignerSecretPath(brokerSecretKeyFile.toString());
		Set<String> providers = new HashSet<>();
		providers.add(AuthenticationProviderSasl.class.getName());
		conf.setAuthenticationProviders(providers);
		conf.setClusterName("test");
		conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));
		// set admin auth, to verify admin web resources
		Map<String, String> clientSaslConfig = new HashMap<>();
		clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
		clientSaslConfig.put("serverType", "broker");
		conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
		conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
				.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));

		super.init();

		lookupUrl = new URI(pulsar.getBrokerServiceUrl());
		log.info("set client jaas section name: PulsarClient");
		admin = PulsarAdmin.builder()
			.serviceHttpUrl(brokerUrl.toString())
			.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
			.build();
		super.producerBaseSetup();
		log.info("-- {} --, end.", methodName);
	}

	@Override
	@AfterMethod(alwaysRun = true)
	protected void cleanup() throws Exception {
		FileUtils.deleteQuietly(brokerSecretKeyFile);
		Assert.assertFalse(brokerSecretKeyFile.exists());
		FileUtils.deleteQuietly(proxySecretKeyFile);
		Assert.assertFalse(proxySecretKeyFile.exists());
		super.internalCleanup();
	}

	@Test
	void testAuthentication() throws Exception {
		log.info("-- Starting {} test --", methodName);

		// Step 1: Create Admin Client

		// create a client which connects to proxy and pass authData
		String topicName = "persistent://my-property/my-ns/my-topic1";

		ProxyConfiguration proxyConfig = new ProxyConfiguration();
		proxyConfig.setAuthenticationEnabled(true);
		proxyConfig.setServicePort(Optional.of(0));
		proxyConfig.setBrokerProxyAllowedTargetPorts("*");
		proxyConfig.setWebServicePort(Optional.of(0));
		proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
		proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
		proxyConfig.setSaslJaasServerSectionName("PulsarProxy");

		// proxy connect to broker
		proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
		proxyConfig.setBrokerClientAuthenticationParameters(
			"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
				"\"serverType\": " + "\"broker\"}");
		proxySecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret", ".key");
		Files.write(Paths.get(proxySecretKeyFile.toString()), "PulsarSecret".getBytes());
		proxyConfig.setSaslJaasServerRoleTokenSignerSecretPath(proxySecretKeyFile.toString());
		// proxy as a server, it will use sasl to authn
		Set<String> providers = new HashSet<>();
		providers.add(AuthenticationProviderSasl.class.getName());
		proxyConfig.setAuthenticationProviders(providers);

		proxyConfig.setForwardAuthorizationCredentials(true);
		AuthenticationService authenticationService = new AuthenticationService(
                        PulsarConfigurationLoader.convertFrom(proxyConfig));
		ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);

		proxyService.start();
		final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get();
		log.info("1 proxy service started {}", proxyService);

		// Step 3: Pass correct client params
		@Cleanup
		PulsarClient proxyClient = createProxyClient(proxyServiceUrl, 1);
		log.info("2 create proxy client {}, {}", proxyServiceUrl, proxyClient);

		Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
		log.info("3 created producer.");

		Consumer<byte[]> consumer = proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
		log.info("4 created consumer.");

		for (int i = 0; i < 10; i++) {
			String message = "my-message-" + i;
			producer.send(message.getBytes());
			log.info("Produced message: [{}]", message);
		}

		Message<byte[]> msg = null;
		Set<String> messageSet = new HashSet<>();
		for (int i = 0; i < 10; i++) {
			msg = consumer.receive(5, TimeUnit.SECONDS);
			String receivedMessage = new String(msg.getData());
			log.info("Received message: [{}]", receivedMessage);
			String expectedMessage = "my-message-" + i;
			testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
		}
		// Acknowledge the consumption of all messages at once
		consumer.acknowledgeCumulative(msg);
		consumer.close();

		proxyService.close();
	}

	private PulsarClient createProxyClient(String proxyServiceUrl, int numberOfConnections) throws PulsarClientException {
		Map<String, String> clientSaslConfig = new HashMap<>();
		clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
		clientSaslConfig.put("serverType", "proxy");
		log.info("set client jaas section name: PulsarClient, serverType: proxy");
		Authentication authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);

		return PulsarClient.builder().serviceUrl(proxyServiceUrl)
				.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
	}
}
