/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sentry.tests.e2e.kafka;

import com.google.common.collect.Sets;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.sentry.core.model.kafka.Cluster;
import org.apache.sentry.core.model.kafka.ConsumerGroup;
import org.apache.sentry.core.model.kafka.KafkaActionConstant;
import org.apache.sentry.core.model.kafka.Host;
import org.apache.sentry.core.model.kafka.Topic;
import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public class TestAuthorize extends AbstractKafkaSentryTestBase {
  private static final Logger LOGGER = LoggerFactory.getLogger(TestAuthorize.class);
  private static final String TOPIC_NAME = "tOpIc1";

  @Test
  public void testProduceConsumeForSuperuser() {
    LOGGER.debug("testProduceConsumeForSuperuser");
    SentryGenericServiceClientFactory.factoryReset();
    try {
      final String SuperuserName = "test";
      testProduce(TOPIC_NAME, SuperuserName);
      testConsume(TOPIC_NAME, SuperuserName);
    } catch (Exception ex) {
      Assert.fail("Superuser must have been allowed to perform any and all actions. \nException: \n" + ex);
    }
  }
/*
  Here are the list of permissions needed for a role a send to produce a message on a topic using kafkaProducer.
  HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
  HOST=<hostname>->Cluster=<cluster name>->action=CREATE
  HOST=<hostname>->Topic=<topic name>->action=WRITE
 */

/*
  Here are the list of permissions needed for a role to subscribe and read the messages on a topic using kafkaConsumer.
  HOST=<hostname>->CONSUMERGROUP=<group id>sentrykafkaconsumer->action=DESCRIBE
  HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ
  HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
  HOST=<hostname>->Topic=<topic name>->action=READ
 */


  @Test
  @Ignore ("This test should be enabled after KAFKA-6091 is resolved")
  public void testProduceConsumeCycleWithNoPrivileges() throws Exception {
    // START TESTING PRODUCER
    final String TOPIC_NAME = "tOpIc1";
    LOGGER.debug("testProduceConsumeCycleWithNoPrivileges");
    try {
      testProduce(TOPIC_NAME, "user1");
      Assert.fail("user1 must not have been authorized to create topic " + TOPIC_NAME + ".");
    } catch (ExecutionException ex) {
      assertCausedMessage(ex, "Not authorized to access topics: [" + TOPIC_NAME + "]");
    }

    // START TESTING CONSUMER
    try {
      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
      Assert.fail("user1 must not have been authorized to describe consumer group sentrykafkaconsumer.");
    } catch (Exception ex) {
      assertCausedMessage(ex, "Not authorized to access group: sentrykafkaconsumer");
    }
  }

  @Test
  public void testProduceCycleWithInsufficientPrivileges() throws Exception {
    LOGGER.debug("testProduceCycleWithInsufficientPrivileges");
    final String TOPIC_NAME = "tOpIc2";
    final String localhost = InetAddress.getLocalHost().getHostAddress();
    SentryGenericServiceClientFactory.factoryReset();

    final String role = StaticUserGroupRole.ROLE_1;
    final String group = StaticUserGroupRole.GROUP_1;

    // START TESTING PRODUCER
    /*
     Permissions Added
     HOST=<hostname>->Topic=<topic name>->action=DESCRIBE

     Permissions Missing
     HOST=<hostname>->Cluster=<cluster name>->action=CREATE
     HOST=<hostname>->Topic=<topic name>->action=WRITE
    */
    ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
    Host host = new Host(localhost);
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    Topic topic = new Topic(TOPIC_NAME); // Topic name is case sensitive.
    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
    try {
      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
      Assert.fail("user1 must not have been authorized to create topic " + TOPIC_NAME + ".");
    } catch (ExecutionException ex) {
      assertCausedMessage(ex, "Not authorized to access topics: [" + TOPIC_NAME + "]");
    }

    /*
     Permissions Added
     HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
     HOST=<hostname>->Cluster=<cluster name>->action=CREATE

     Permissions Missing
     HOST=<hostname>->Topic=<topic name>->action=WRITE
    */
    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    Cluster cluster = new Cluster();
    authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName()));
    addPermissions(role, group, KafkaActionConstant.CREATE, authorizables);
    try {
      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
      Assert.fail("user1 must not have been authorized to create topic " + TOPIC_NAME + ".");
    } catch (ExecutionException ex) {
      assertCausedMessage(ex, "Not authorized to access topics: [" + TOPIC_NAME + "]");
    }
  }

  @Test
  public void testProduceConsumeSuccess() throws Exception {
    LOGGER.debug("testProduceConsumeSuccess");
    final String TOPIC_NAME = "tOpIc3";
    final String localhost = InetAddress.getLocalHost().getHostAddress();

    SentryGenericServiceClientFactory.factoryReset();

    // START PRODUCER
    ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
    Topic topic = new Topic(TOPIC_NAME); // Topic name is case sensitive.
    Host host = new Host(localhost);
    final String role = StaticUserGroupRole.ROLE_1;
    final String group = StaticUserGroupRole.GROUP_1;

  /*
    Permissions Added
    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
    HOST=<hostname>->Topic=<topic name>->action=WRITE
    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
  */
    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
    addPermissions(role, group, KafkaActionConstant.WRITE, authorizables);

    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    Cluster cluster = new Cluster();
    authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName()));
    addPermissions(role, group, KafkaActionConstant.CREATE, authorizables);
    try {
      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
    } catch (Exception ex) {
      Assert.fail("user1 should have been able to successfully produce to topic " + TOPIC_NAME + ". \n Exception: " + ex);
    }

  // START TESTING CONSUMER
  /*
    Permissions Added
    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
    HOST=<hostname>->Topic=<topic name>->action=WRITE
    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
    HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ
    HOST=<hostname>->Topic=<topic name>->action=READ
  */
    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);

    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    ConsumerGroup consumerGroup = new ConsumerGroup("sentrykafkaconsumer");
    authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), consumerGroup.getName()));
    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
    try {
      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
    } catch (Exception ex) {
      Assert.fail("user1 should have been able to successfully read from topic " + TOPIC_NAME + ". \n Exception: " + ex);

    }
  }

  @Test
  public void testConsumeCycleWithInsufficientPrivileges() throws Exception {
    LOGGER.debug("testConsumeCycleWithInsufficientPrivileges");
    final String TOPIC_NAME = "tOpIc4";
    final String localhost = InetAddress.getLocalHost().getHostAddress();
    SentryGenericServiceClientFactory.factoryReset();

    // START TESTING PRODUCER
    ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
    Topic topic = new Topic(TOPIC_NAME); // Topic name is case sensitive.
    Host host = new Host(localhost);
    final String role = StaticUserGroupRole.ROLE_1;
    final String group = StaticUserGroupRole.GROUP_1;

  /*
    Permissions Added
    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
    HOST=<hostname>->Topic=<topic name>->action=WRITE
    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
  */

    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
    addPermissions(role, group, KafkaActionConstant.WRITE, authorizables);

    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    Cluster cluster = new Cluster();
    authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName()));
    addPermissions(role, group, KafkaActionConstant.CREATE, authorizables);
    try {
      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
    } catch (Exception ex) {
      Assert.fail("user1 should have been able to successfully produce to topic " + TOPIC_NAME + ". \n Exception: " + ex);
    }
  /*
    Permissions Added
    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
    HOST=<hostname>->Topic=<topic name>->action=WRITE
    HOST=<hostname>->Cluster=<cluster name>->action=CREATE

    Permissions Missing
    HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ
    HOST=<hostname>->Topic=<topic name>->action=READ
  */
    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    ConsumerGroup consumerGroup = new ConsumerGroup("sentrykafkaconsumer");
    authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), consumerGroup.getName()));
    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
    try {
      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
      Assert.fail("user1 must not have been authorized to read consumer group sentrykafkaconsumer.");
    } catch (Exception ex) {
      assertCausedMessage(ex, "Not authorized to access topics: [" + TOPIC_NAME + "]");
    }

  /*
    Permissions Added
    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
    HOST=<hostname>->Topic=<topic name>->action=WRITE
    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
    HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ

    Missing Permissions
    HOST=<hostname>->Topic=<topic name>->action=READ
  */
    authorizables = new ArrayList<TAuthorizable>();
    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
    authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), consumerGroup.getName()));
    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
    try {
      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
      Assert.fail("user1 must not have been authorized to read from topic " + TOPIC_NAME + ".");
    } catch (Exception ex) {
      assertCausedMessage(ex, "Not authorized to access topics: [" + TOPIC_NAME + "]");
    }
  }

  private void addPermissions(String role, String group, String action, ArrayList<TAuthorizable> authorizables)
          throws Exception {
    SentryGenericServiceClient sentryClient = getSentryClient();
    try {
      sentryClient.createRoleIfNotExist(ADMIN_USER, role, COMPONENT);
      sentryClient.addRoleToGroups(ADMIN_USER, role, COMPONENT, Sets.newHashSet(group));

      sentryClient.grantPrivilege(ADMIN_USER, role, COMPONENT,
              new TSentryPrivilege(COMPONENT, "kafka", authorizables,
                      action));
    } finally {
      if (sentryClient != null) {
        sentryClient.close();
        sentryClient = null;
      }
    }
    sleepIfCachingEnabled();
  }

  private void testProduce(String topic, String producerUser) throws Exception {
    final KafkaProducer<String, String> kafkaProducer = createKafkaProducer(producerUser);
    try {
      final String msg = "message1";
      ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, msg);
      kafkaProducer.send(producerRecord).get();
      LOGGER.debug("Sent message: " + producerRecord);
    } finally {
      kafkaProducer.close();
    }
  }

  private void testConsume(String topic, String consumerUser) throws Exception {
    final KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer(consumerUser);
    try {
      final String msg = "message1";
      kafkaConsumer.subscribe(Collections.singletonList(topic), new CustomRebalanceListener(kafkaConsumer));
      waitTillTrue("Did not receive expected message.", 60, 2, new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
          if (records.isEmpty()) {
            LOGGER.debug("No record received from consumer.");
          }
          for (ConsumerRecord<String, String> record : records) {
            if (record.value().equals(msg)) {
              LOGGER.debug("Received message: " + record);
              return true;
            }
          }
          return false;
        }
      });
    } finally {
      kafkaConsumer.close();
    }
  }

  private KafkaProducer<String, String> createKafkaProducer(String user) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "SentryKafkaProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath());
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd");
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath());
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd");

    return new KafkaProducer<String, String>(props);
  }

  private KafkaConsumer<String, String> createKafkaConsumer(String user) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sentrykafkaconsumer");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath());
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd");
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath());
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd");

    return new KafkaConsumer<String, String>(props);
  }

  /**
   * Wait for a condition to succeed up to specified time.
   *
   * @param failureMessage Message to be displayed on failure.
   * @param maxWaitTime    Max waiting time for success in seconds.
   * @param loopInterval   Wait time between checks in seconds.
   * @param testFunc       Check to be performed for success, should return boolean.
   * @throws Exception
   */
  private void waitTillTrue(
          String failureMessage, long maxWaitTime, long loopInterval, Callable<Boolean> testFunc)
          throws Exception {
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime <= maxWaitTime * 1000L) {
      if (testFunc.call()) {
        return; // Success
      }
      Thread.sleep(loopInterval * 1000L);
    }

    Assert.fail(failureMessage);
  }

  private static class CustomRebalanceListener implements ConsumerRebalanceListener {

    private KafkaConsumer<String, String>  consumer = null;

    CustomRebalanceListener(KafkaConsumer<String, String>  kafkaConsumer) {
      consumer = kafkaConsumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {

    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
      consumer.seekToBeginning(collection);
    }
  }
}
