SENTRY-1014: Add end-to-end tests for Kafka (Ashish K Singh, Reviewed by: Hao Hao, Anne Yu and Dapeng Sun)

Change-Id: I4398811da2f80e56ab9fa6ae7a9967e4c22d0558
diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
index 129191a..85e7d21 100644
--- a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
+++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
@@ -118,7 +118,7 @@
         }
     }
 
-    public String getBootstrapServers() {
-        return "localhost:" + kafkaPort;
+    public String getBootstrapServers() throws UnknownHostException {
+        return InetAddress.getLocalHost().getHostAddress() + ":" + kafkaPort;
     }
 }
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
new file mode 100644
index 0000000..a5cd3da
--- /dev/null
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.kafka;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
+import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+public class TestAuthorize extends AbstractKafkaSentryTestBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestAuthorize.class);
+
+  @Test
+  public void testProduceConsumeForSuperuser() {
+    try {
+      final String SuperuserName = "test";
+      testProduce(SuperuserName);
+      testConsume(SuperuserName);
+    } catch (Exception ex) {
+      Assert.fail("Superuser must have been allowed to perform any and all actions. \nException: \n" + ex);
+    }
+  }
+
+  @Test
+  public void testProduceConsumeCycle() throws Exception {
+    final String localhost = InetAddress.getLocalHost().getHostAddress();
+
+    // START TESTING PRODUCER
+    try {
+      testProduce("user1");
+      Assert.fail("user1 must not have been authorized to create topic t1.");
+    } catch (ExecutionException ex) {
+      assertCausedMessage(ex, "Not authorized to access topics: [t1]");
+    }
+
+    final String role = StaticUserGroupRole.ROLE_1;
+    final String group = StaticUserGroupRole.GROUP_1;
+
+    // Allow HOST=localhost->Topic=t1->action=describe
+    ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
+    Host host = new Host(localhost);
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    Topic topic = new Topic("t1");
+    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
+    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
+    try {
+      testProduce("user1");
+      Assert.fail("user1 must not have been authorized to create topic t1.");
+    } catch (ExecutionException ex) {
+      assertCausedMessage(ex, "Not authorized to access topics: [t1]");
+    }
+
+    // Allow HOST=localhost->Cluster=kafka-cluster->action=create
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    Cluster cluster = new Cluster();
+    authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName()));
+    addPermissions(role, group, KafkaActionConstant.CREATE, authorizables);
+    try {
+      testProduce("user1");
+      Assert.fail("user1 must not have been authorized to create topic t1.");
+    } catch (ExecutionException ex) {
+      assertCausedMessage(ex, "Not authorized to access topics: [t1]");
+    }
+
+    // Allow HOST=localhost->Topic=t1->action=write
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
+    addPermissions(role, group, KafkaActionConstant.WRITE, authorizables);
+    try {
+      testProduce("user1");
+    } catch (Exception ex) {
+      Assert.fail("user1 should have been able to successfully produce to topic t1. \n Exception: " + ex);
+    }
+
+    // START TESTING CONSUMER
+    try {
+      testConsume("user1");
+      Assert.fail("user1 must not have been authorized to describe consumer group sentrykafkaconsumer.");
+    } catch (Exception ex) {
+      assertCausedMessage(ex, "Not authorized to access group: sentrykafkaconsumer");
+    }
+
+    // Allow HOST=localhost->ConsumerGroup=sentrykafkaconsumer->action=describe
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    ConsumerGroup consumerGroup = new ConsumerGroup("sentrykafkaconsumer");
+    authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), consumerGroup.getName()));
+    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
+    try {
+      testConsume("user1");
+      Assert.fail("user1 must not have been authorized to read consumer group sentrykafkaconsumer.");
+    } catch (Exception ex) {
+      assertCausedMessage(ex, "Not authorized to access group: sentrykafkaconsumer");
+    }
+
+    // Allow HOST=localhost->ConsumerGroup=sentrykafkaconsumer->action=read
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), consumerGroup.getName()));
+    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
+    try {
+      testConsume("user1");
+      Assert.fail("user1 must not have been authorized to read from topic t1.");
+    } catch (Exception ex) {
+      assertCausedMessage(ex, "Not authorized to access topics: [t1]");
+    }
+
+    // Allow HOST=localhost->Topic=t1->action=read
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
+    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
+    testConsume("user1");
+  }
+
+  private void addPermissions(String role, String group, String action, ArrayList<TAuthorizable> authorizables) throws Exception {
+    SentryGenericServiceClient sentryClient = getSentryClient();
+    try {
+      sentryClient.createRoleIfNotExist(ADMIN_USER, role, COMPONENT);
+      sentryClient.addRoleToGroups(ADMIN_USER, role, COMPONENT, Sets.newHashSet(group));
+
+      sentryClient.grantPrivilege(ADMIN_USER, role, COMPONENT,
+          new TSentryPrivilege(COMPONENT, "kafka", authorizables,
+              action));
+    } finally {
+      if (sentryClient != null) {
+        sentryClient.close();
+        sentryClient = null;
+      }
+    }
+  }
+
+  private void testProduce(String producerUser) throws Exception {
+    final KafkaProducer<String, String> kafkaProducer = createKafkaProducer(producerUser);
+    try {
+      final String topic = "t1";
+      final String msg = "message1";
+      ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, msg);
+      kafkaProducer.send(producerRecord).get();
+      LOGGER.debug("Sent message: " + producerRecord);
+    } finally {
+      kafkaProducer.close();
+    }
+  }
+
+  private void testConsume(String consumerUser) throws Exception {
+    final KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer(consumerUser);
+    try {
+      final String topic = "t1";
+      final String msg = "message1";
+      kafkaConsumer.subscribe(Collections.singletonList(topic), new CustomRebalanceListener(kafkaConsumer));
+      waitTillTrue("Did not receive expected message.", 60, 2, new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
+          if (records.isEmpty())
+            LOGGER.debug("No record received from consumer.");
+          for (ConsumerRecord<String, String> record : records) {
+            if (record.value().equals(msg)) {
+              LOGGER.debug("Received message: " + record);
+              return true;
+            }
+          }
+          return false;
+        }
+      });
+    } finally {
+      kafkaConsumer.close();
+    }
+  }
+
+  private KafkaProducer<String, String> createKafkaProducer(String user) {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "SentryKafkaProducer");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath());
+    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd");
+    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd");
+    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath());
+    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd");
+
+    return new KafkaProducer<String, String>(props);
+  }
+
+  private KafkaConsumer<String, String> createKafkaConsumer(String user) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sentrykafkaconsumer");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath());
+    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd");
+    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd");
+    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath());
+    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd");
+
+    return new KafkaConsumer<String, String>(props);
+  }
+
+  /**
+   * Wait for a condition to succeed up to specified time.
+   *
+   * @param failureMessage Message to be displayed on failure.
+   * @param maxWaitTime    Max waiting time for success in seconds.
+   * @param loopInterval   Wait time between checks in seconds.
+   * @param testFunc       Check to be performed for success, should return boolean.
+   * @throws Exception
+   */
+  private void waitTillTrue(
+      String failureMessage, long maxWaitTime, long loopInterval, Callable<Boolean> testFunc)
+      throws Exception {
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime <= maxWaitTime * 1000L) {
+      if (testFunc.call()) {
+        return; // Success
+      }
+      Thread.sleep(loopInterval * 1000L);
+    }
+
+    Assert.fail(failureMessage);
+  }
+
+  class CustomRebalanceListener implements ConsumerRebalanceListener {
+
+    KafkaConsumer consumer = null;
+
+    CustomRebalanceListener(KafkaConsumer kafkaConsumer) {
+      consumer = kafkaConsumer;
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
+      for (TopicPartition tp : collection) {
+        consumer.seekToBeginning(tp);
+      }
+    }
+  }
+}
\ No newline at end of file