/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.tests.e2e.hive;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.HashSet;
import com.google.common.collect.Sets;
import org.apache.hive.hcatalog.listener.DbNotificationListener;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
import org.apache.sentry.api.common.ApiConstants.ClientConfig;
import org.apache.sentry.tests.e2e.hive.fs.TestFSContants;
import org.fest.reflect.core.Reflection;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl;
import org.apache.sentry.core.model.db.DBModelAction;
import org.apache.sentry.core.model.db.DBModelAuthorizable;
import org.apache.sentry.core.model.db.DBModelAuthorizables;
import org.apache.sentry.provider.db.SimpleDBProviderBackend;
import org.apache.sentry.api.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.provider.file.PolicyFile;
import org.apache.sentry.service.thrift.KerberosConfiguration;
import org.apache.sentry.service.thrift.SentryServiceClientFactory;
import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
import org.apache.sentry.tests.e2e.hive.fs.DFS;
import org.apache.sentry.tests.e2e.hive.fs.DFSFactory;
import org.apache.sentry.tests.e2e.hive.hiveserver.HiveServer;
import org.apache.sentry.tests.e2e.hive.hiveserver.HiveServerFactory;
import org.apache.sentry.tests.e2e.minisentry.SentrySrvFactory;
import org.apache.sentry.tests.e2e.minisentry.SentrySrvFactory.SentrySrvType;
import org.apache.sentry.tests.e2e.minisentry.SentrySrv;
import org.apache.tools.ant.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.LoginContext;
import static org.apache.sentry.core.common.utils.SentryConstants.AUTHORIZABLE_SPLITTER;
import static org.apache.sentry.core.common.utils.SentryConstants.PRIVILEGE_PREFIX;
import static org.apache.sentry.core.common.utils.SentryConstants.ROLE_SPLITTER;
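/**
 * Base class for Sentry + Hive end-to-end tests. It manages static fixtures
 * shared by all test methods in a test class: a HiveServer2 instance, a DFS
 * (MiniDFS by default), a policy file, and optionally an in-process Sentry
 * service. Subclasses tune the environment by flipping the protected static
 * flags (for example {@code useSentryService} or {@code policyOnHdfs}) before
 * the static setup runs. A minimal sketch (the subclass name is hypothetical):
 * <pre>{@code
 * public class TestMyFeature extends AbstractTestWithStaticConfiguration {
 *   @BeforeClass
 *   public static void setupTestStaticConfiguration() throws Exception {
 *     useSentryService = true;
 *     AbstractTestWithStaticConfiguration.setupTestStaticConfiguration();
 *   }
 * }
 * }</pre>
 */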
public abstract class AbstractTestWithStaticConfiguration extends RulesForE2ETest {
private static final Logger LOGGER = LoggerFactory
.getLogger(AbstractTestWithStaticConfiguration.class);
protected static final String SINGLE_TYPE_DATA_FILE_NAME = "kv1.dat";
protected static final String ALL_DB1 = "server=server1->db=db_1",
ALL_DB2 = "server=server1->db=db_2",
SELECT_DB1_TBL1 = "server=server1->db=db_1->table=tb_1->action=select",
SELECT_DB1_TBL2 = "server=server1->db=db_1->table=tb_2->action=select",
SELECT_DB1_NONTABLE = "server=server1->db=db_1->table=blahblah->action=select",
INSERT_DB1_TBL1 = "server=server1->db=db_1->table=tb_1->action=insert",
SELECT_DB2_TBL2 = "server=server1->db=db_2->table=tb_2->action=select",
INSERT_DB2_TBL1 = "server=server1->db=db_2->table=tb_1->action=insert",
SELECT_DB1_VIEW1 = "server=server1->db=db_1->table=view_1->action=select",
ADMIN1 = StaticUserGroup.ADMIN1,
ADMINGROUP = StaticUserGroup.ADMINGROUP,
USER1_1 = StaticUserGroup.USER1_1,
USER1_2 = StaticUserGroup.USER1_2,
USER2_1 = StaticUserGroup.USER2_1,
USER3_1 = StaticUserGroup.USER3_1,
USER4_1 = StaticUserGroup.USER4_1,
USER5_1 = StaticUserGroup.USER5_1,
USERGROUP1 = StaticUserGroup.USERGROUP1,
USERGROUP2 = StaticUserGroup.USERGROUP2,
USERGROUP3 = StaticUserGroup.USERGROUP3,
USERGROUP4 = StaticUserGroup.USERGROUP4,
USERGROUP5 = StaticUserGroup.USERGROUP5,
GROUP1_ROLE = "group1_role",
DB1 = "db_1",
DB2 = "db_2",
DB3 = "db_3",
TBL1 = "tb_1",
TBL2 = "tb_2",
TBL3 = "tb_3",
VIEW1 = "view_1",
VIEW2 = "view_2",
VIEW3 = "view_3",
INDEX1 = "index_1",
DEFAULT = "default";
protected static final String SERVER_HOST = "localhost";
private static final String EXTERNAL_SENTRY_SERVICE = "sentry.e2etest.external.sentry";
protected static final String EXTERNAL_HIVE_LIB = "sentry.e2etest.hive.lib";
protected static boolean policyOnHdfs = false;
protected static boolean defaultFSOnHdfs = false;
protected static boolean useSentryService = false;
protected static boolean setMetastoreListener = true;
protected static String testServerType = null;
protected static boolean enableHiveConcurrency = false;
protected static boolean enableAuthorizingObjectStore = true;
protected static boolean enableAuthorizeReadMetaData = false;
protected static boolean enableFilter = false;
protected static int hmsFollowerIntervalInMilliseconds = 10000;
// indicates whether the databases need to be cleared for every test case in a test class
protected static boolean clearDbPerTest = true;
protected static boolean showDbOnSelectOnly = false;
protected static boolean showTableOnSelectOnly = false;
protected static boolean restrictDefaultDatabase = false;
protected static String cacheClassName = HiveAuthzConf.AuthzConfVars.AUTHZ_PRIVILEGE_CACHE.getDefault();
protected static File baseDir;
protected static File logDir;
protected static File confDir;
protected static File dataDir;
protected static File policyFileLocation;
protected static HiveServer hiveServer;
protected static FileSystem fileSystem;
protected static HiveServerFactory.HiveServer2Type hiveServer2Type;
protected static DFS dfs;
protected static Map<String, String> properties;
protected static SentrySrv sentryServer;
protected static Configuration sentryConf;
protected static boolean enableNotificationLog = true;
protected static Context context;
protected final String semanticException = "SemanticException No valid privileges";
protected static boolean clientKerberos = false;
protected static String REALM = System.getProperty("sentry.service.realm", "EXAMPLE.COM");
protected static final String SERVER_KERBEROS_NAME = "sentry/" + SERVER_HOST + "@" + REALM;
protected static final String SERVER_KEY_TAB = System.getProperty("sentry.service.server.keytab");
private static LoginContext clientLoginContext;
protected static SentryPolicyServiceClient client;
private static boolean startSentry = Boolean.getBoolean(EXTERNAL_SENTRY_SERVICE);
protected static boolean enableHDFSAcls = false;
protected static String dfsType;
/**
 * Create an authenticated {@link Subject} for the given client, carrying its
 * security-related attributes (for example, the Kerberos principal and keytab).
 * @param clientShortName principal short name of the client
 * @param clientKeyTabDir path to the client's keytab file
 * @return the client's authenticated Subject
 */
public static Subject getClientSubject(String clientShortName, String clientKeyTabDir) {
String clientKerberosPrincipal = clientShortName + "@" + REALM;
File clientKeyTabFile = new File(clientKeyTabDir);
Subject clientSubject = new Subject(false, Sets.newHashSet(
new KerberosPrincipal(clientKerberosPrincipal)), new HashSet<Object>(),
new HashSet<Object>());
try {
clientLoginContext = new LoginContext("", clientSubject, null,
KerberosConfiguration.createClientConfig(clientKerberosPrincipal, clientKeyTabFile));
clientLoginContext.login();
} catch (Exception ex) {
  LOGGER.error("Login failed for principal " + clientKerberosPrincipal, ex);
}
clientSubject = clientLoginContext.getSubject();
return clientSubject;
}
public static void createContext() throws Exception {
context = new Context(hiveServer, fileSystem,
baseDir, dataDir, policyFileLocation);
}
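/**
 * Drops the given databases (with CASCADE) as the given user.
 */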
protected void dropDb(String user, String...dbs) throws Exception {
Connection connection = context.createConnection(user);
Statement statement = connection.createStatement();
for(String db : dbs) {
exec(statement, "DROP DATABASE IF EXISTS " + db + " CASCADE");
}
statement.close();
connection.close();
}
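/**
 * Creates the given databases as the given user. Each database must be one of
 * the known test databases (DB1, DB2, DB3) so that it is cleaned up after the test.
 */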
protected void createDb(String user, String...dbs) throws Exception {
Connection connection = context.createConnection(user);
Statement statement = connection.createStatement();
ArrayList<String> allowedDBs = new ArrayList<String>(Arrays.asList(DB1, DB2, DB3));
for(String db : dbs) {
Assert.assertTrue(db + " is not part of known test dbs which will be cleaned up after the test", allowedDBs.contains(db));
exec(statement, "CREATE DATABASE " + db);
}
statement.close();
connection.close();
}
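/**
 * Creates the given tables in the given database as the given user, dropping
 * any existing tables of the same names. When dataFile is non-null, loads it
 * into each table and asserts that the table has data.
 */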
protected void createTable(String user, String db, File dataFile, String...tables)
throws Exception {
Connection connection = context.createConnection(user);
Statement statement = connection.createStatement();
exec(statement, "USE " + db);
for(String table : tables) {
exec(statement, "DROP TABLE IF EXISTS " + table);
exec(statement, "create table " + table
+ " (under_col int comment 'the under column', value string)");
if(dataFile != null) {
exec(statement, "load data local inpath '" + dataFile.getPath()
+ "' into table " + table);
ResultSet res = statement.executeQuery("select * from " + table);
Assert.assertTrue("Table should have data after load", res.next());
res.close();
}
}
statement.close();
connection.close();
}
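/**
 * Ensures the given directory exists, asserting that mkdirs succeeds.
 */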
protected static File assertCreateDir(File dir) {
if(!dir.isDirectory()) {
Assert.assertTrue("Failed creating " + dir, dir.mkdirs());
}
return dir;
}
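/**
 * One-time setup shared by all tests in a class: creates the base, log, conf
 * and data directories, brings up the DFS, writes the initial policy file,
 * assembles the metastore/HiveServer2 properties, optionally configures and
 * starts the Sentry service, then starts HiveServer2 and creates the shared
 * {@link Context}.
 */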
@BeforeClass
public static void setupTestStaticConfiguration() throws Exception {
LOGGER.info("AbstractTestWithStaticConfiguration setupTestStaticConfiguration");
properties = Maps.newHashMap();
if(!policyOnHdfs) {
policyOnHdfs = Boolean.valueOf(System.getProperty("sentry.e2etest.policyonhdfs", "false"));
}
if (testServerType != null) {
properties.put("sentry.e2etest.hiveServer2Type", testServerType);
}
baseDir = Files.createTempDir();
LOGGER.info("BaseDir = " + baseDir);
logDir = assertCreateDir(new File(baseDir, "log"));
confDir = assertCreateDir(new File(baseDir, "etc"));
dataDir = assertCreateDir(new File(baseDir, "data"));
policyFileLocation = new File(confDir, HiveServerFactory.AUTHZ_PROVIDER_FILENAME);
dfsType = System.getProperty(TestFSContants.SENTRY_E2E_TEST_DFS_TYPE,
DFSFactory.DFSType.MiniDFS.toString());
dfs = DFSFactory.create(dfsType, baseDir, testServerType, enableHDFSAcls);
fileSystem = dfs.getFileSystem();
PolicyFile policyFile = PolicyFile.setAdminOnServer1(ADMIN1)
.setUserGroupMapping(StaticUserGroup.getStaticMapping());
policyFile.write(policyFileLocation);
String policyURI;
if (policyOnHdfs) {
String dfsUri = FileSystem.getDefaultUri(fileSystem.getConf()).toString();
LOGGER.info("dfsUri " + dfsUri);
policyURI = dfsUri + System.getProperty("sentry.e2etest.hive.policy.location",
"/user/hive/sentry");
policyURI += "/" + HiveServerFactory.AUTHZ_PROVIDER_FILENAME;
} else {
policyURI = policyFileLocation.getPath();
}
if (enableAuthorizingObjectStore) {
properties.put(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname,
"org.apache.sentry.binding.metastore.AuthorizingObjectStore");
} else {
properties.put(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname, ConfVars.METASTORE_RAW_STORE_IMPL.defaultStrVal);
}
if (enableFilter) {
properties.put(ConfVars.METASTORE_FILTER_HOOK.varname, "org.apache.sentry.binding.metastore.SentryMetaStoreFilterHook");
} else {
properties.put(ConfVars.METASTORE_FILTER_HOOK.varname, ConfVars.METASTORE_FILTER_HOOK.defaultStrVal);
}
if (enableAuthorizeReadMetaData) {
properties.put("senry.metastore.read.authorization.enabled", "true");
} else {
properties.put("senry.metastore.read.authorization.enabled", "false");
}
if (enableHiveConcurrency) {
properties.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
properties.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
"org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
properties.put(HiveConf.ConfVars.HIVE_LOCK_MANAGER.varname,
"org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager");
}
properties.put(AuthzConfVars.SHOWDATABASES_ON_SELECT_ONLY.getVar(), String.valueOf(showDbOnSelectOnly));
properties.put(AuthzConfVars.SHOWTABLES_ON_SELECT_ONLY.getVar(), String.valueOf(showTableOnSelectOnly));
properties.put(AuthzConfVars.AUTHZ_RESTRICT_DEFAULT_DB.getVar(), String.valueOf(restrictDefaultDatabase));
properties.put(AuthzConfVars.AUTHZ_PRIVILEGE_CACHE.getVar(), cacheClassName);
if (useSentryService && (!startSentry)) {
configureHiveAndMetastoreForSentry();
setupSentryService();
}
if (defaultFSOnHdfs) {
properties.put("fs.defaultFS", fileSystem.getUri().toString());
}
hiveServer = create(properties, baseDir, confDir, logDir, policyURI, fileSystem);
hiveServer.start();
createContext();
// Create tmp as scratch dir if it doesn't exist
Path tmpPath = new Path("/tmp");
if (!fileSystem.exists(tmpPath)) {
fileSystem.mkdirs(tmpPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
}
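/**
 * Creates the HiveServer2 under test. The server type is resolved from the
 * properties map first, then from the {@code HiveServerFactory.HIVESERVER2_TYPE}
 * system property, and defaults to {@code InternalHiveServer2}.
 */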
public static HiveServer create(Map<String, String> properties,
File baseDir, File confDir, File logDir, String policyFile,
FileSystem fileSystem) throws Exception {
String type = properties.get(HiveServerFactory.HIVESERVER2_TYPE);
if(type == null) {
type = System.getProperty(HiveServerFactory.HIVESERVER2_TYPE);
}
if(type == null) {
type = HiveServerFactory.HiveServer2Type.InternalHiveServer2.name();
}
hiveServer2Type = HiveServerFactory.HiveServer2Type.valueOf(type.trim());
return HiveServerFactory.create(hiveServer2Type, properties,
baseDir, confDir, logDir, policyFile, fileSystem);
}
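/**
 * Writes the policy file to the local test location; additionally copies it
 * to HDFS when policyOnHdfs is set, or replays it as Sentry grants when the
 * Sentry service is in use.
 */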
protected static void writePolicyFile(PolicyFile policyFile) throws Exception {
policyFile.write(context.getPolicyFile());
if(policyOnHdfs) {
LOGGER.info("use policy file on HDFS");
dfs.writePolicyFile(context.getPolicyFile());
} else if(useSentryService) {
LOGGER.info("use sentry service, granting permissions");
grantPermissions(policyFile);
}
}
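/**
 * Replays the given policy file against the Sentry service: drops all
 * existing non-admin roles, recreates each role with its privileges via
 * GRANT statements (see {@link #addPrivilege}), and grants the roles to
 * their groups.
 */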
private static void grantPermissions(PolicyFile policyFile) throws Exception {
Connection connection = context.createConnection(ADMIN1);
Statement statement = context.createStatement(connection);
// remove existing metadata
ResultSet resultSet = statement.executeQuery("SHOW ROLES");
while (resultSet.next()) {
  String roleName = resultSet.getString(1).trim();
  if (!roleName.equalsIgnoreCase("admin_role")) {
    LOGGER.info("Dropping role: " + roleName);
    try (Statement statement1 = context.createStatement(connection)) {
      statement1.execute("DROP ROLE " + roleName);
    }
  }
}
// create roles and add privileges
for (Map.Entry<String, Collection<String>> roleEntry : policyFile.getRolesToPermissions()
.asMap().entrySet()) {
String roleName = roleEntry.getKey();
if(!roleEntry.getKey().equalsIgnoreCase("admin_role")){
LOGGER.info("Creating role : " + roleName);
exec(statement, "CREATE ROLE " + roleName);
for (String privilege : roleEntry.getValue()) {
addPrivilege(roleEntry.getKey(), privilege, statement);
}
}
}
// grant roles to groups
for (Map.Entry<String, Collection<String>> groupEntry : policyFile.getGroupsToRoles().asMap()
.entrySet()) {
for (String roleNames : groupEntry.getValue()) {
for (String roleName : roleNames.split(",")) {
String sql = "GRANT ROLE " + roleName + " TO GROUP " + groupEntry.getKey();
LOGGER.info("Granting role to group: " + sql);
exec(statement, sql);
}
}
}
}
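/**
 * Translates one policy-file privilege string into the corresponding SQL
 * GRANT for the role, using the most specific scope present: column, then
 * table, then database, then URI, then server. For illustration, the
 * privilege string
 * <pre>{@code
 * server=server1->db=db_1->table=tb_1->action=select
 * }</pre>
 * granted to role {@code group1_role} is executed as
 * <pre>{@code
 * CREATE DATABASE IF NOT EXISTS db_1;
 * USE db_1;
 * GRANT SELECT ON TABLE tb_1 TO ROLE group1_role;
 * }</pre>
 */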
private static void addPrivilege(String roleName, String privileges, Statement statement)
throws Exception {
String serverName = null, dbName = null, tableName = null, uriPath = null, columnName = null;
String action = "ALL";//AccessConstants.ALL;
for (String privilege : ROLE_SPLITTER.split(privileges)) {
for(String section : AUTHORIZABLE_SPLITTER.split(privilege)) {
// the action is not an authorizable
if(!section.toLowerCase().startsWith(PRIVILEGE_PREFIX)) {
DBModelAuthorizable dbAuthorizable = DBModelAuthorizables.from(section);
if(dbAuthorizable == null) {
throw new IOException("Unknown Auth type " + section);
}
if (DBModelAuthorizable.AuthorizableType.Server.equals(dbAuthorizable.getAuthzType())) {
serverName = dbAuthorizable.getName();
} else if (DBModelAuthorizable.AuthorizableType.Db.equals(dbAuthorizable.getAuthzType())) {
dbName = dbAuthorizable.getName();
} else if (DBModelAuthorizable.AuthorizableType.Table.equals(dbAuthorizable.getAuthzType())) {
tableName = dbAuthorizable.getName();
} else if (DBModelAuthorizable.AuthorizableType.Column.equals(dbAuthorizable.getAuthzType())) {
columnName = dbAuthorizable.getName();
} else if (DBModelAuthorizable.AuthorizableType.URI.equals(dbAuthorizable.getAuthzType())) {
uriPath = dbAuthorizable.getName();
} else {
throw new IOException("Unsupported auth type " + dbAuthorizable.getName()
+ " : " + dbAuthorizable.getTypeName());
}
} else {
action = DBModelAction.valueOf(
StringUtils.removePrefix(section, PRIVILEGE_PREFIX).toUpperCase())
.toString();
}
}
LOGGER.info("addPrivilege");
if (columnName != null) {
exec(statement, "CREATE DATABASE IF NOT EXISTS " + dbName);
exec(statement, "USE " + dbName);
String sql = "GRANT " + action + " ( " + columnName + " ) ON TABLE " + tableName + " TO ROLE " + roleName;
LOGGER.info("Granting column level privilege: database = " + dbName + ", sql = " + sql);
exec(statement, sql);
} else if (tableName != null) {
exec(statement, "CREATE DATABASE IF NOT EXISTS " + dbName);
exec(statement, "USE " + dbName);
String sql = "GRANT " + action + " ON TABLE " + tableName + " TO ROLE " + roleName;
LOGGER.info("Granting table level privilege: database = " + dbName + ", sql = " + sql);
exec(statement, sql);
} else if (dbName != null) {
String sql = "GRANT " + action + " ON DATABASE " + dbName + " TO ROLE " + roleName;
LOGGER.info("Granting db level privilege: " + sql);
exec(statement, sql);
} else if (uriPath != null) {
String sql = "GRANT " + action + " ON URI '" + uriPath + "' TO ROLE " + roleName;
LOGGER.info("Granting uri level privilege: " + sql);
exec(statement, sql); // ALL?
} else if (serverName != null) {
String sql = "GRANT ALL ON SERVER " + serverName + " TO ROLE " + roleName;
LOGGER.info("Granting server level privilege: " + sql);
exec(statement, sql);
}
}
}
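/**
 * Finds a free port by binding and immediately closing a server socket.
 * The port is only probably free: another process may claim it between the
 * close and its reuse.
 */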
private static int findPort() throws IOException {
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
socket.close();
return port;
}
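/**
 * Builds a HiveConf for a Sentry-enabled metastore, writes it out as
 * etc/hive-site.xml under the test base directory, and points HiveConf's
 * static hiveSiteURL at that file via reflection so that HiveConf instances
 * created later pick up these settings.
 */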
private static HiveConf configureHiveAndMetastoreForSentry() throws IOException, InterruptedException {
HiveConf hiveConf = new HiveConf();
int hmsPort = findPort();
LOGGER.info("\n\n HMS port : " + hmsPort + "\n\n");
// Set hive.metastore.authorization.storage.checks to true, so that
// operations such as drop-partition are disallowed when the user in
// question doesn't have permission to delete the corresponding directory
// on the storage.
hiveConf.set("hive.metastore.authorization.storage.checks", "true");
hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort);
hiveConf.set("sentry.metastore.service.users", "hive");// queries made by hive user (beeline) skip meta store check
hiveConf.set("datanucleus.schema.autoCreateTables", "true");
File confDir = assertCreateDir(new File(baseDir, "etc"));
File hiveSite = new File(confDir, "hive-site.xml");
hiveConf.set("hive.server2.enable.doAs", "false");
OutputStream out = new FileOutputStream(hiveSite);
hiveConf.writeXml(out);
out.close();
Reflection.staticField("hiveSiteURL")
.ofType(URL.class)
.in(HiveConf.class)
.set(hiveSite.toURI().toURL());
return hiveConf;
}
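/**
 * Configures and starts an in-process Sentry service backed by a local Derby
 * store. The server binds to an ephemeral port (RPC_PORT = 0); the actual
 * bound address and port are read from the created server and copied into
 * the client-side configuration.
 */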
private static void setupSentryService() throws Exception {
sentryConf = new Configuration(true);
// HMS is not started in this class, and tests based on this class do not use HMS.
// Set the interval at which the HMS client contacts HMS, to reduce connection exceptions in the log.
properties.put(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, String.valueOf(hmsFollowerIntervalInMilliseconds));
properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND,
SimpleDBProviderBackend.class.getName());
properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname,
SentryHiveAuthorizationTaskFactoryImpl.class.getName());
properties
.put(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS.varname, "2");
properties.put(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE);
properties.put(ServerConfig.ADMIN_GROUPS, ADMINGROUP);
properties.put(ServerConfig.RPC_ADDRESS, SERVER_HOST);
properties.put(ServerConfig.RPC_PORT, String.valueOf(0));
properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
properties.put(ServerConfig.SENTRY_STORE_JDBC_URL,
"jdbc:derby:;databaseName=" + baseDir.getPath()
+ "/sentrystore_db;create=true");
properties.put(ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy");
properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
properties.put(ServerConfig.RPC_MIN_THREADS, "3");
for (Map.Entry<String, String> entry : properties.entrySet()) {
sentryConf.set(entry.getKey(), entry.getValue());
}
sentryServer = SentrySrvFactory.create(
SentrySrvType.INTERNAL_SERVER, sentryConf, 1);
properties.put(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.get(0)
.getAddress()
.getHostName());
sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.get(0)
.getAddress()
.getHostName());
properties.put(ClientConfig.SERVER_RPC_PORT,
String.valueOf(sentryServer.get(0).getAddress().getPort()));
sentryConf.set(ClientConfig.SERVER_RPC_PORT,
String.valueOf(sentryServer.get(0).getAddress().getPort()));
startSentryService();
if (setMetastoreListener) {
LOGGER.info("setMetastoreListener is enabled");
properties.put(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS.varname,
DbNotificationListener.class.getName());
properties.put(ConfVars.METASTORE_EVENT_MESSAGE_FACTORY.varname,
SentryJSONMessageFactory.class.getName());
}
}
private static void startSentryService() throws Exception {
sentryServer.startAll();
}
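/**
 * Returns a policy client bound to the first in-process Sentry server.
 * @throws IllegalAccessException if the Sentry service has not been initialized
 */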
public static SentryPolicyServiceClient getSentryClient() throws Exception {
if (sentryServer == null) {
throw new IllegalAccessException("Sentry service not initialized");
}
return SentryServiceClientFactory.create(sentryServer.get(0).getConf());
}
/**
 * Get an authorized Sentry client to communicate with the Sentry server.
 * The client can target a minicluster or a real distributed cluster, and
 * the Sentry server may be backed by a policy file or run as a service.
 * @param clientShortName principal prefix string
 * @param clientKeyTabDir authorization key path
 * @return a Sentry client that talks to the Sentry server
 * @throws Exception
 */
public static SentryPolicyServiceClient getSentryClient(String clientShortName,
String clientKeyTabDir) throws Exception {
if (!startSentry) {
LOGGER.info("Running on a minicluser env.");
return getSentryClient();
}
if (clientKerberos) {
if (sentryConf == null ) {
sentryConf = new Configuration(true);
}
final String SENTRY_HOST = System.getProperty("sentry.host", SERVER_HOST);
final String SERVER_KERBEROS_PRINCIPAL = "sentry/" + SENTRY_HOST + "@" + REALM;
sentryConf.set(ServerConfig.PRINCIPAL, SERVER_KERBEROS_PRINCIPAL);
sentryConf.set(ServerConfig.KEY_TAB, SERVER_KEY_TAB);
sentryConf.set(ServerConfig.ALLOW_CONNECT, "hive");
sentryConf.set(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "false");
sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS,
System.getProperty("sentry.service.server.rpc.address"));
sentryConf.set(ClientConfig.SERVER_RPC_PORT,
System.getProperty("sentry.service.server.rpc.port", "8038"));
sentryConf.set(ClientConfig.SERVER_RPC_CONN_TIMEOUT, "720000"); //millis
Subject clientSubject = getClientSubject(clientShortName, clientKeyTabDir);
client = Subject.doAs(clientSubject,
new PrivilegedExceptionAction<SentryPolicyServiceClient>() {
@Override
public SentryPolicyServiceClient run() throws Exception {
return SentryServiceClientFactory.create(sentryConf);
}
});
} else {
client = getSentryClient();
}
return client;
}
@Before
public void setup() throws Exception{
LOGGER.info("AbstractTestStaticConfiguration setup");
dfs.createBaseDir();
if (clearDbPerTest) {
LOGGER.info("Before per test run clean up");
clearAll(true);
}
}
@After
public void clearAfterPerTest() throws Exception {
LOGGER.info("AbstractTestStaticConfiguration clearAfterPerTest");
if (clearDbPerTest) {
LOGGER.info("After per test run clean up");
clearAll(true);
}
}
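/**
 * Cleans up state left behind by tests: when clearDb is true, drops every
 * database except 'default' (with CASCADE) and every table in 'default';
 * when the Sentry service is in use, also drops every role whose name does
 * not contain 'admin'.
 */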
protected static void clearAll(boolean clearDb) throws Exception {
LOGGER.info("About to run clearAll");
ResultSet resultSet;
Connection connection = context.createConnection(ADMIN1);
Statement statement = context.createStatement(connection);
if (clearDb) {
LOGGER.info("About to clear all databases and default database tables");
resultSet = execQuery(statement, "SHOW DATABASES");
while (resultSet.next()) {
String db = resultSet.getString(1);
if (!db.equalsIgnoreCase("default")) {
try (Statement statement1 = context.createStatement(connection)) {
exec(statement1, "DROP DATABASE IF EXISTS " + db + " CASCADE");
} catch (Exception ex) {
  // Dropping databases and tables managed by processes other than
  // Sentry might run into an exception.
  LOGGER.error("Failed to drop database " + db, ex);
}
}
}
if (resultSet != null) { resultSet.close(); }
exec(statement, "USE default");
resultSet = execQuery(statement, "SHOW TABLES");
while (resultSet.next()) {
try (Statement statement1 = context.createStatement(connection)) {
exec(statement1, "DROP TABLE IF EXISTS " + resultSet.getString(1));
} catch (Exception ex) {
  // Dropping a table managed by a process other than Sentry
  // might run into an exception.
  LOGGER.error("Failed to drop table", ex);
}
}
if (resultSet != null) { resultSet.close(); }
}
if(useSentryService) {
LOGGER.info("About to clear all roles");
resultSet = execQuery(statement, "SHOW ROLES");
while (resultSet.next()) {
try (Statement statement1 = context.createStatement(connection)) {
String role = resultSet.getString(1);
if (!role.toLowerCase().contains("admin")) {
exec(statement1, "DROP ROLE " + role);
}
}
}
if (resultSet != null) { resultSet.close(); }
}
if (statement != null) { statement.close(); }
if (connection != null) { connection.close(); }
}
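/**
 * When the Sentry service is in use, creates admin_role (ignoring failures
 * if it already exists), grants it ALL on the default authorization server,
 * and grants the role to the admin group.
 */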
protected static void setupAdmin() throws Exception {
if(useSentryService) {
LOGGER.info("setupAdmin to create admin_role");
Connection connection = context.createConnection(ADMIN1);
Statement statement = connection.createStatement();
try {
exec(statement, "CREATE ROLE admin_role");
} catch (Exception e) {
  // It is ok if admin_role already exists
}
exec(statement, "GRANT ALL ON SERVER "
+ HiveServerFactory.DEFAULT_AUTHZ_SERVER_NAME + " TO ROLE admin_role");
exec(statement, "GRANT ROLE admin_role TO GROUP " + ADMINGROUP);
statement.close();
connection.close();
}
}
protected PolicyFile setupPolicy() throws Exception {
LOGGER.info("Pre create policy file with admin group mapping");
PolicyFile policyFile = PolicyFile.setAdminOnServer1(ADMINGROUP);
policyFile.setUserGroupMapping(StaticUserGroup.getStaticMapping());
writePolicyFile(policyFile);
return policyFile;
}
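/**
 * Shuts down the shared fixtures: HiveServer2, the Sentry server, the base
 * directory (unless KEEP_BASEDIR is set), the DFS, and the Context.
 */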
@AfterClass
public static void tearDownTestStaticConfiguration() throws Exception {
if(hiveServer != null) {
hiveServer.shutdown();
hiveServer = null;
}
if (sentryServer != null) {
sentryServer.close();
sentryServer = null;
}
if(baseDir != null) {
if(System.getProperty(HiveServerFactory.KEEP_BASEDIR) == null) {
FileUtils.deleteQuietly(baseDir);
}
baseDir = null;
}
if(dfs != null) {
try {
dfs.tearDown();
} catch (Exception e) {
LOGGER.info("Exception shutting down dfs", e);
}
}
if (context != null) {
context.close();
}
}
public static SentrySrv getSentrySrv() {
return sentryServer;
}
/**
 * A convenience method to validate that the expected and returned lists are
 * equivalent: first check that each expected item is in the returned list,
 * then check that each returned item is in the expected list.
 */
protected void validateReturnedResult(List<String> expected, List<String> returned) {
for (String obj : expected) {
Assert.assertTrue("expected " + obj + " not found in the returned list: " + returned.toString(),
returned.contains(obj));
}
for (String obj : returned) {
Assert.assertTrue("returned " + obj + " not found in the expected list: " + expected.toString(),
expected.contains(obj));
}
}
/**
 * A convenience method to run a sequence of SQL commands as the given user.
 * @param user
 * @param sqls
 * @throws Exception
 */
protected static void execBatch(String user, List<String> sqls) throws Exception {
Connection conn = context.createConnection(user);
Statement stmt = context.createStatement(conn);
for (String sql : sqls) {
exec(stmt, sql);
}
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
}
/**
 * A convenience method to run one SQL statement, logging it first.
 * @param stmt
 * @param sql
 * @throws Exception
 */
protected static void exec(Statement stmt, String sql) throws Exception {
if (stmt == null) {
LOGGER.error("Statement is null");
return;
}
LOGGER.info("Running [" + sql + "]");
stmt.execute(sql);
}
/**
 * A convenience method to execute a query, logging it first, and return its ResultSet.
 * @param stmt
 * @param sql
 * @return ResultSet
 * @throws Exception
 */
protected static ResultSet execQuery(Statement stmt, String sql) throws Exception {
LOGGER.info("Running [" + sql + "]");
return stmt.executeQuery(sql);
}
}