| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sentry.tests.e2e.kafka; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.sentry.core.model.kafka.Cluster; |
| import org.apache.sentry.core.model.kafka.KafkaActionConstant; |
| import org.apache.sentry.core.model.kafka.Host; |
| import org.apache.sentry.kafka.conf.KafkaAuthConf; |
| import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; |
| import org.apache.sentry.provider.db.generic.UpdatableCache; |
| import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; |
| import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; |
| import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; |
| import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; |
| import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider; |
| import org.apache.sentry.provider.file.PolicyFile; |
| import org.apache.sentry.service.thrift.SentryService; |
| import org.apache.sentry.service.thrift.SentryServiceFactory; |
| import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; |
| import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeoutException; |
| |
| import static org.junit.Assert.assertTrue; |
| |
| |
| /** |
| * This class used to test the Kafka integration with Sentry. |
| */ |
| public class AbstractKafkaSentryTestBase { |
| |
| protected static final String COMPONENT = "kafka"; |
| protected static final String ADMIN_USER = "kafka"; |
| protected static final String ADMIN_GROUP = "group_kafka"; |
| protected static final String ADMIN_ROLE = "role_kafka"; |
| |
| protected static SentryService sentryServer; |
| protected static File sentrySitePath; |
| |
| protected static File baseDir; |
| protected static File dbDir; |
| protected static File policyFilePath; |
| |
| protected static PolicyFile policyFile; |
| |
| protected static String bootstrapServers = null; |
| protected static KafkaTestServer kafkaServer = null; |
| |
| private static final int CACHE_TTL_MS = 1; |
| private static final int SAFETY_FACTOR = 2; // Sleep for specified times of expected time for an operation to complete. |
| |
| @BeforeClass |
| public static void beforeTestEndToEnd() throws Exception { |
| // Stop background update thread |
| UpdatableCache.disable(); |
| setupConf(); |
| startSentryServer(); |
| // We started a new server, invalidate all connections to the old one |
| SentryGenericServiceClientFactory.factoryReset(); |
| setUserGroups(); |
| setAdminPrivilege(); |
| startKafkaServer(); |
| } |
| |
| @AfterClass |
| public static void afterTestEndToEnd() throws Exception { |
| // Stop background update thread |
| UpdatableCache.disable(); |
| stopKafkaServer(); |
| stopSentryServer(); |
| } |
| |
| private static void stopKafkaServer() { |
| if (kafkaServer != null) { |
| kafkaServer.shutdown(); |
| kafkaServer = null; |
| } |
| } |
| |
| private static void stopSentryServer() throws Exception { |
| if (sentryServer != null) { |
| sentryServer.stop(); |
| sentryServer = null; |
| } |
| |
| FileUtils.deleteDirectory(baseDir); |
| } |
| |
| public static void setupConf() throws Exception { |
| baseDir = createTempDir(); |
| sentrySitePath = new File(baseDir, "sentry-site.xml"); |
| dbDir = new File(baseDir, "sentry_policy_db"); |
| policyFilePath = new File(baseDir, "local_policy_file.ini"); |
| policyFile = new PolicyFile(); |
| |
| /** set the configuration for Sentry Service */ |
| Configuration conf = new Configuration(); |
| |
| conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE); |
| conf.set(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); |
| conf.set(ServerConfig.ADMIN_GROUPS, Joiner.on(",").join(ADMIN_GROUP, |
| UserGroupInformation.getLoginUser().getPrimaryGroupName())); |
| conf.set(ServerConfig.RPC_PORT, String.valueOf(TestUtils.getFreePort())); |
| conf.set(ServerConfig.RPC_ADDRESS, NetUtils.createSocketAddr( |
| InetAddress.getLocalHost().getHostAddress() + ":" + conf.get(ServerConfig.RPC_PORT)) |
| .getAddress().getCanonicalHostName()); |
| conf.set(ServerConfig.SENTRY_STORE_JDBC_URL, |
| "jdbc:derby:;databaseName=" + dbDir.getPath() + ";create=true"); |
| conf.set(ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy"); |
| conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING, |
| ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); |
| conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, |
| policyFilePath.getPath()); |
| sentryServer = SentryServiceFactory.create(conf); |
| } |
| |
| public static File createTempDir() { |
| File baseDir = new File(System.getProperty("java.io.tmpdir")); |
| String baseName = "kafka-e2e-"; |
| File tempDir = new File(baseDir, baseName + UUID.randomUUID().toString()); |
| if (tempDir.mkdir()) { |
| return tempDir; |
| } |
| throw new IllegalStateException("Failed to create temp directory"); |
| } |
| |
| public static void startSentryServer() throws Exception { |
| sentryServer.start(); |
| final long start = System.currentTimeMillis(); |
| while(!sentryServer.isRunning()) { |
| Thread.sleep(1000); |
| if(System.currentTimeMillis() - start > 60000L) { |
| throw new TimeoutException("Server did not start after 60 seconds"); |
| } |
| } |
| } |
| |
| public static void setUserGroups() throws Exception { |
| for (String user : StaticUserGroupRole.getUsers()) { |
| Set<String> groups = StaticUserGroupRole.getGroups(user); |
| policyFile.addGroupsToUser(user, |
| groups.toArray(new String[groups.size()])); |
| } |
| UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); |
| policyFile.addGroupsToUser(loginUser.getShortUserName(), loginUser.getGroupNames()); |
| |
| policyFile.write(policyFilePath); |
| } |
| |
| public static void setAdminPrivilege() throws Exception { |
| try (SentryGenericServiceClient sentryClient = getSentryClient()){ |
| // grant all privilege to admin user |
| sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT); |
| sentryClient.grantRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP)); |
| final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>(); |
| Host host = new Host(InetAddress.getLocalHost().getHostName()); |
| authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); |
| Cluster cluster = new Cluster(); |
| authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName())); |
| sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT, |
| new TSentryPrivilege(COMPONENT, "kafka", authorizables, |
| KafkaActionConstant.ALL)); |
| } |
| } |
| |
| static SentryGenericServiceClient getSentryClient() throws Exception { |
| return SentryGenericServiceClientFactory.create(getClientConfig()); |
| } |
| |
| public static void assertCausedMessage(Exception e, String message) { |
| if (e.getCause() != null) { |
| assertTrue("Expected message: " + message + ", but got: " + e.getCause().getMessage(), e.getCause().getMessage().contains(message)); |
| } else { |
| assertTrue("Expected message: " + message + ", but got: " + e.getMessage(), e.getMessage().contains(message)); |
| } |
| } |
| |
| public static void assertCausedMessages(Exception e, String message1, String message2) { |
| if (e.getCause() != null) { |
| assertTrue("Expected message: " + message1 + " OR " + message2 , |
| (e.getCause().getMessage().contains(message1) || |
| e.getCause().getMessage().contains(message2))); |
| } else { |
| assertTrue("Expected message: " + message1 + " OR " + message2 + ", but got: " + e.getMessage(), |
| (e.getMessage().contains(message1) || |
| e.getMessage().contains(message2))); |
| } |
| } |
| |
| private static Configuration getClientConfig() { |
| Configuration conf = new Configuration(); |
| /** set the Sentry client configuration for Kafka Service integration */ |
| conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE); |
| conf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.getAddress().getHostName()); |
| conf.setInt(ClientConfig.SERVER_RPC_PORT, sentryServer.getAddress().getPort()); |
| |
| conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER.getVar(), |
| LocalGroupResourceAuthorizationProvider.class.getName()); |
| conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(), |
| SentryGenericProviderBackend.class.getName()); |
| conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(), policyFilePath.getPath()); |
| return conf; |
| } |
| |
| private static void startKafkaServer() throws Exception { |
| // Workaround for SentryKafkaAuthorizer to be added to classpath |
| Class.forName("org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer"); |
| getClientConfig().writeXml(new FileOutputStream(sentrySitePath)); |
| |
| kafkaServer = new KafkaTestServer(sentrySitePath); |
| kafkaServer.start(); |
| bootstrapServers = kafkaServer.getBootstrapServers(); |
| } |
| |
| static void sleepIfCachingEnabled() throws InterruptedException { |
| if (getClientConfig().getBoolean(ClientConfig.ENABLE_CACHING, false)) { |
| Thread.sleep(CACHE_TTL_MS * SAFETY_FACTOR); |
| } |
| } |
| } |