blob: 310cf6a6fbdf86ff41003b2da5c5e2be9703b004 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.provider.db.service.persistent;
import static org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_HMS_FETCH_SIZE;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
import org.apache.sentry.core.common.utils.PubSub;
import org.apache.sentry.core.common.utils.SentryConstants;
import org.apache.sentry.hdfs.UniquePathsUpdate;
import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
import org.apache.sentry.service.thrift.SentryHMSClient;
import org.apache.sentry.service.thrift.HiveConnectionFactory;
import org.apache.sentry.service.thrift.HiveSimpleConnectionFactory;
import org.apache.sentry.service.common.ServiceConstants;
import org.apache.sentry.service.thrift.HMSClient;
import org.apache.sentry.api.service.thrift.TSentryAuthorizable;
import static org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import javax.security.auth.login.LoginException;
public class TestHMSFollower {
private final static String hiveInstance = "server2";
private final static Configuration configuration = new Configuration();
private final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
private final SentryStore sentryStore = Mockito.mock(SentryStore.class);
private static HiveSimpleConnectionFactory hiveConnectionFactory;
private final static HiveConnectionFactory hmsConnectionMock
= Mockito.mock(HiveConnectionFactory.class);
private final static HiveMetaStoreClient hmsClientMock
= Mockito.mock(HiveMetaStoreClient.class);
@BeforeClass
public static void setup() throws IOException, LoginException {
configuration.set(ServerConfig.PRINCIPAL, "sentry/_HOST@TEST.COM");
hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf());
hiveConnectionFactory.init();
configuration.set("sentry.hive.sync.create", "true");
configuration.set(SENTRY_SERVICE_FULL_UPDATE_PUBSUB, "true");
enableHdfsSyncInSentry(configuration);
}
@Before
public void setupMocks() throws Exception {
reset(hmsConnectionMock, hmsClientMock, sentryStore);
when(hmsConnectionMock.connect()).thenReturn(new HMSClient(hmsClientMock));
}
@After
public void resetConfig() throws Exception {
enableHdfsSyncInSentry(configuration);
}
private static void enableHdfsSyncInSentry(Configuration conf) {
// enable HDFS sync, so perm and path changes will be saved into DB
conf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES,
"org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
conf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS,
"org.apache.sentry.hdfs.SentryPlugin");
}
private static void disableHdfsSyncInSentry(Configuration conf) {
// enable HDFS sync, so perm and path changes will be saved into DB
conf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "");
conf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "");
}
@Test
public void testPersistAFullSnapshotWhenNoSnapshotAreProcessedYet() throws Exception {
/*
* TEST CASE
*
* Simulates (by using mocks) that Sentry has not processed any notifications, so this
* should trigger a new full HMS snapshot request with the eventId = 1
*/
final long SENTRY_PROCESSED_EVENT_ID = SentryConstants.EMPTY_NOTIFICATION_ID;
final long HMS_PROCESSED_EVENT_ID = 1L;
// Mock that returns a full snapshot
Map<String, Collection<String>> snapshotObjects = new HashMap<>();
snapshotObjects.put("db", Sets.newHashSet("/db"));
snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
// Mock that returns the current HMS notification ID
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should get a full snapshot because AuthzPathsMapping is empty
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
hmsFollower.run();
verify(sentryStore, times(1)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
// Saving notificationID is in the same transaction of saving full snapshot
verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
reset(sentryStore);
// 2nd run should not get a snapshot because is already processed
when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
@Test
public void testPersistAFullSnapshotWhenFullSnapshotTrigger() throws Exception {
/*
* TEST CASE
*
* Simulates (by using mocks) the following:
*
* HMS client always returns the paths image with the eventId == 1.
*
* On the 1st run: Sentry has not processed any notifications, so this
* should trigger a new full HMS snapshot request with the eventId = 1
*
* On the 2nd run: Sentry store returns the latest eventId == 1,
* which matches the eventId returned by HMS client. Because of the match,
* no full update is triggered.
*
* On the 3d run: before the run, full update flag in HMSFollower is set via
* publish-subscribe mechanism.
* Sentry store still returns the latest eventId == 1,
* which matches the eventId returned by HMS client. Because of the match,
* no full update should be triggered. However, because of the trigger set,
* a new full HMS snapshot will be triggered.
*
* On the 4th run: Sentry store returns the latest eventId == 1,
* which matches the eventId returned by HMS client. Because of the match,
* no full update is triggered. This is to check that forced trigger set
* for run 3 only works once.
*
*/
final long SENTRY_PROCESSED_EVENT_ID = SentryConstants.EMPTY_NOTIFICATION_ID;
final long HMS_PROCESSED_EVENT_ID = 1L;
// Mock that returns a full snapshot
Map<String, Collection<String>> snapshotObjects = new HashMap<>();
snapshotObjects.put("db", Sets.newHashSet("/db"));
snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
// Mock that returns the current HMS notification ID
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should get a full snapshot because AuthzPathsMapping is empty
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
hmsFollower.run();
verify(sentryStore, times(1)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
// Saving notificationID is in the same transaction of saving full snapshot
verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
reset(sentryStore);
// 2nd run should not get a snapshot because is already processed
when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
// 3d run should not get a snapshot because is already processed,
// but because of full update trigger it will, as in the first run
PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message");
when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(1)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
reset(sentryStore);
// 4th run should not get a snapshot because is already processed and publish-subscribe
// trigger is only supposed to work once. This is exactly as 2nd run.
when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
@Test
public void testPersistAFullSnapshotWhenAuthzsnapshotIsEmptyAndHDFSSyncIsEnabled() throws Exception {
/*
* TEST CASE
*
* Simulates (by using mocks) the following:
*
* Disable HDFSSync before triggering a full snapshot
*
* HMS client always returns the paths image with the eventId == 1.
*
* On the 1st run: Hdfs sync is disabled in sentry server
* Sentry notification table is empty, so this
* should not trigger a new full HMS snapshot request but should
* fetch all the HMS notifications and persist them.
*
* On the 2nd run: Just enable hdfs sync and a full snapshot should be triggered
* because MAuthzPathsMapping table is empty
*
*/
disableHdfsSyncInSentry(configuration);
final long SENTRY_PROCESSED_EVENT_ID = SentryConstants.EMPTY_NOTIFICATION_ID;
final long HMS_PROCESSED_EVENT_ID = 1L;
// Mock that returns a full snapshot
Map<String, Collection<String>> snapshotObjects = new HashMap<>();
snapshotObjects.put("db", Sets.newHashSet("/db"));
snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should get a full snapshot because hms notificaions is empty
// but it should never be persisted because HDFS sync is disabled
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
hmsFollower.run();
// Since HDFS sync is disabled, fullsnapshot should not be fetched from HMS
verify(sentryStore, times(0)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
verify(sentryStore, times(1)).getLastProcessedNotificationID();
// Making sure that HMS client is invoked to get all the notifications
// starting from event-id 0
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
reset(sentryStore);
//Re-enable HDFS Sync and simply start the HMS follower thread, full snap shot
// should be triggered because MAuthzPathsMapping table is empty
enableHdfsSyncInSentry(configuration);
//Create a new hmsFollower instance since configuration is changing
hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
//Set last processed notification Id to match the full new value 1L
final long LATEST_EVENT_ID = 1L;
when(sentryStore.getLastProcessedNotificationID()).thenReturn(LATEST_EVENT_ID);
//Mock that sets isHmsNotificationEmpty to false
when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
// Mock that sets the current HMS notification ID. Set it to match
// last processed notification Id so that doesn't trigger a full snapshot
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(LATEST_EVENT_ID));
//Mock that sets getting next notification eve
when(hmsClientMock.getNextNotification(Mockito.eq(HMS_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE),
(NotificationFilter) Mockito.notNull()))
.thenReturn(new NotificationEventResponse(
Arrays.<NotificationEvent>asList(
new NotificationEvent(LATEST_EVENT_ID, 0, "", "")
)
));
//Mock that sets isAuthzPathsSnapshotEmpty to true so trigger this particular test
when(sentryStore.isAuthzPathsSnapshotEmpty()).thenReturn(true);
hmsFollower.run();
verify(sentryStore, times(1)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
verify(sentryStore, times(0)).setLastProcessedNotificationID(fullSnapshot.getId());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
}
@Test
public void testDisablingAndEnablingHDFSSync() throws Exception {
/*
* TEST CASE
*
* Simulates (by using mocks) the following:
*
* Disable HDFSSync before and enable it later.
*
* HMS client always returns the paths image with the eventId == 1.
*
* On the 1st run: Hdfs sync is disabled in sentry server.
* This should not trigger a new full HMS snapshot request but should
* fetch all the HMS notifications and persist them.
*
* On the 2nd run: Hdfs sync is enabled in sentry server
* Full snapshot should be fetched and persisted because MAuthzPathsMapping table is empty
*
* On 3rd run: Hdfs sync is disabled in sentry server.
* Sentry should remove the HMS path information(MAuthzPathsMapping and MSentryPathChange)
* but continue to process the notifications based on the information persisted in MSentryHmsNotification.
*
* on 4th run: Hdfs sync is enabled in sentry server
* Full snapshot should be fetched and persisted because MAuthzPathsMapping table is empty
*
*/
disableHdfsSyncInSentry(configuration);
final long SENTRY_PROCESSED_EVENT_ID = SentryConstants.EMPTY_NOTIFICATION_ID;
final long HMS_PROCESSED_EVENT_ID = 1L;
// Mock that returns a full snapshot
Map<String, Collection<String>> snapshotObjects = new HashMap<>();
snapshotObjects.put("db", Sets.newHashSet("/db"));
snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
NotificationEventResponse response = new NotificationEventResponse();
response.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", ""));
response.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", ""));
response.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", ""));
when(hmsClientMock.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
Mockito.anyObject())).thenReturn(response);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should not fetch full snapshot but should fetch notifications from 0
// and persists them
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
verify(sentryStore, times(1)).clearHmsPathInformation();
// Making sure that HMS client is invoked to get all the notifications
// starting from event-id 0
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
verify(sentryStore, times(1)).persistLastProcessedNotificationID(1L);
verify(sentryStore, times(1)).persistLastProcessedNotificationID(2L);
verify(sentryStore, times(1)).persistLastProcessedNotificationID(3L);
reset(sentryStore, hmsClientMock);
//Enable HDFS sync to make sure that Full snapshot is fetched from HMS and persisted.
enableHdfsSyncInSentry(configuration);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsSnapshotEmpty()).thenReturn(true);
// Mock that sets the current HMS notification ID. Set it to match
// last processed notification Id so that doesn't trigger a full snapshot
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(HMS_PROCESSED_EVENT_ID));
hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 2nd run get a full snapshot because there was no snapshot persisted before.
hmsFollower.run();
verify(sentryStore, times(0)).clearHmsPathInformation();
verify(sentryStore, times(1)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
verify(sentryStore, times(0)).setLastProcessedNotificationID(fullSnapshot.getId());
reset(sentryStore, hmsClientMock);
disableHdfsSyncInSentry(configuration);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsSnapshotEmpty()).thenReturn(true);
// Mock that sets the current HMS notification ID. Set it to match
// last processed notification Id so that doesn't trigger a full snapshot
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(HMS_PROCESSED_EVENT_ID));
hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 3rd run
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
verify(sentryStore, times(1)).clearHmsPathInformation();
verify(sentryStore, times(0)).setLastProcessedNotificationID(Mockito.anyLong());
//Make sure that HMSFollower continues to fetch notifications based on persisted notifications.
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(fullSnapshot.getId()-1),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
reset(sentryStore, hmsClientMock);
enableHdfsSyncInSentry(configuration);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsSnapshotEmpty()).thenReturn(true);
hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 4th run
hmsFollower.run();
verify(sentryStore, times(0)).clearHmsPathInformation();
verify(sentryStore, times(1)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
verify(sentryStore, times(0)).setLastProcessedNotificationID(fullSnapshot.getId());
reset(sentryStore, hmsClientMock);
}
@Test
public void testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed()
throws Exception {
/*
* TEST CASE
*
* Simulates (by using mocks) that Sentry already processed (and persisted) a notification
* with Id = 5, but the latest notification processed by the HMS is eventId = 1. So, an
* out-of-sync issue is happening on Sentry and HMS. This out-of-sync issue should trigger
* a new full HMS snapshot request with the same eventId = 1;
*/
final long SENTRY_PROCESSED_EVENT_ID = 5L;
final long HMS_PROCESSED_EVENT_ID = 1L;
// Mock that returns a full snapshot
Map<String, Collection<String>> snapshotObjects = new HashMap<>();
snapshotObjects.put("db", Sets.newHashSet("/db"));
snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
// Mock that returns the current HMS notification ID
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should get a full snapshot
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
// 2nd run should not get a snapshot because is already processed
when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
@Test
public void testPersistAFullSnapshotWhenNextExpectedEventIsNotAvailable() throws Exception {
/*
* TEST CASE
*
* Simulates (by using mocks) that Sentry already processed (and persisted) a notification
* with Id = 1, and the latest notification processed by the HMS is eventId = 5. So, new
* notifications should be fetched.
*
* The number of expected notifications should be 4, but we simulate that we fetch only one
* notification with eventId = 5 causing an out-of-sync because the expected notificatoin
* should be 2. This out-of-sync should trigger a new full HMS snapshot request with the
* same eventId = 5.
*/
final long SENTRY_PROCESSED_EVENT_ID = 1L;
final long HMS_PROCESSED_EVENT_ID = 5L;
// Mock that returns a full snapshot
Map<String, Collection<String>> snapshotObjects = new HashMap<>();
snapshotObjects.put("db", Sets.newHashSet("/db"));
snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
// Mock that returns the current HMS notification ID
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
when(hmsClientMock.getNextNotification(Mockito.eq(SENTRY_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE),
(NotificationFilter) Mockito.notNull()))
.thenReturn(new NotificationEventResponse(
Arrays.<NotificationEvent>asList(
new NotificationEvent(fullSnapshot.getId(), 0, "", "")
)
));
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should get a full snapshot
when(sentryStore.getLastProcessedNotificationID())
.thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
// 2nd run should not get a snapshot because is already processed
when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
/**
* Test that HMSFollower uses the input authentication server name when it is not null
*/
@Test
public void testInputConfigurationGetInputAuthServerName() {
Configuration sentryConfiguration = new Configuration();
HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
String authServerName = hmsFollower.getAuthServerName();
Assert.assertEquals(true, authServerName.equals(hiveInstance));
}
/**
* Test that HMSFollower uses the default authentication server name when its constructor input
* value is null and the configuration does not configure AUTHZ_SERVER_NAME nor
* AUTHZ_SERVER_NAME_DEPRECATED
*/
@Test
public void testNoConfigurationGetDefaultAuthServerName() {
Configuration sentryConfiguration = new Configuration();
HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null,
hiveConnectionFactory, null);
String authServerName = hmsFollower.getAuthServerName();
Assert.assertEquals(true, authServerName.equals(AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getDefault()));
}
/**
* Test that HMSFollower uses the configured authentication server name when its constructor input
* value is null and the configuration contains configuration for AUTHZ_SERVER_NAME
*/
@Test
public void testNewNameConfigurationGetAuthServerName() {
String serverName = "newServer";
Configuration sentryConfiguration = new Configuration();
sentryConfiguration.set(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar(), serverName);
HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null,
hiveConnectionFactory, null);
String authServerName = hmsFollower.getAuthServerName();
Assert.assertEquals(true, authServerName.equals(serverName));
}
/**
* Test that HMSFollower uses the configured deprecated authentication server name when its constructor input
* value is null and the configuration contains configuration for AUTHZ_SERVER_NAME_DEPRECATED
*/
@Test
public void testOldNameConfigurationGetAuthServerName() {
String serverName = "oldServer";
Configuration sentryConfiguration = new Configuration();
sentryConfiguration.set(AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getVar(), serverName);
HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null,
hiveConnectionFactory, null);
String authServerName = hmsFollower.getAuthServerName();
Assert.assertEquals(true, authServerName.equals(serverName));
}
/**
* Constructs create database event and makes sure that appropriate sentry store API's
* are invoke when the event is processed by hms follower.
*
* @throws Exception
*/
@Test
public void testCreateDatabase() throws Exception {
String dbName = "db1";
// Create notification events
NotificationEvent notificationEvent = new NotificationEvent(1, 0,
EventMessage.EventType.CREATE_DATABASE.toString(),
messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null))
.toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
verify(sentryStore, times(1))
.dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
/**
* Constructs drop database event and makes sure that appropriate sentry store API's
* are invoke when the event is processed by hms follower.
*
* @throws Exception
*/
@Test
public void testDropDatabase() throws Exception {
String dbName = "db1";
// Create notification events
NotificationEvent notificationEvent = new NotificationEvent(1, 0,
EventMessage.EventType.DROP_DATABASE.toString(),
messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null))
.toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
verify(sentryStore, times(1))
.dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
/**
* Constructs create table event and makes sure that appropriate sentry store API's
* are invoke when the event is processed by hms follower.
*
* @throws Exception
*/
@Test
public void testCreateTable() throws Exception {
String dbName = "db1";
String tableName = "table1";
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
NotificationEvent notificationEvent = new NotificationEvent(1, 0,
EventMessage.EventType.CREATE_TABLE.toString(),
messageFactory.buildCreateTableMessage(
new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
Collections.emptyIterator()).toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
authorizable.setTable(tableName);
verify(sentryStore, times(1))
.dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
/**
* Constructs drop table event and makes sure that appropriate sentry store API's
* are invoke when the event is processed by hms follower.
*
* @throws Exception
*/
@Test
public void testDropTable() throws Exception {
String dbName = "db1";
String tableName = "table1";
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
NotificationEvent notificationEvent = new NotificationEvent(1, 0,
EventMessage.EventType.DROP_TABLE.toString(),
messageFactory.buildDropTableMessage(
new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
.toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
authorizable.setTable(tableName);
verify(sentryStore, times(1))
.dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
/**
* Constructs rename table event and makes sure that appropriate sentry store API's
* are invoke when the event is processed by hms follower.
*
* @throws Exception
*/
@Test
public void testRenameTable() throws Exception {
String dbName = "db1";
String tableName = "table1";
String newDbName = "db1";
String newTableName = "table2";
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
NotificationEvent notificationEvent = new NotificationEvent(1, 0,
EventMessage.EventType.ALTER_TABLE.toString(),
messageFactory.buildAlterTableMessage(
new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null))
.toString());
notificationEvent.setDbName(newDbName);
notificationEvent.setTableName(newTableName);
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb(dbName);
authorizable.setTable(tableName);
TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
newAuthorizable.setDb(newDbName);
newAuthorizable.setTable(newTableName);
verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable,
NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable));
}
@Ignore
/**
* Constructs a bunch of events and passed to processor of hms follower. One of those is alter
* partition event with out actually changing anything(invalid event). Idea is to make sure that
* hms follower calls appropriate sentry store API's for the events processed by hms follower
* after processing the invalid alter partition event.
*
* @throws Exception
*/
@Test
public void testAlterPartitionWithInvalidEvent() throws Exception {
String dbName = "db1";
String tableName1 = "table1";
String tableName2 = "table2";
long inputEventId = 1;
List<NotificationEvent> events = new ArrayList<>();
NotificationEvent notificationEvent;
List<FieldSchema> partCols;
StorageDescriptor sd;
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
//noinspection unchecked
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
Configuration configuration = new Configuration();
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table1");
partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
null);
notificationEvent = new NotificationEvent(inputEventId, 0,
EventMessage.EventType.CREATE_TABLE.toString(),
messageFactory.buildCreateTableMessage(table, Collections.emptyIterator()).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
//noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
// Create a partition
List<Partition> partitions = new ArrayList<>();
StorageDescriptor invalidSd = new StorageDescriptor();
invalidSd.setLocation(null);
Partition partition = new Partition(Collections.singletonList("today"), dbName, tableName1,
0, 0, sd, null);
partitions.add(partition);
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ADD_PARTITION.toString(),
messageFactory.buildAddPartitionMessage(table, partitions.iterator(), Collections.emptyIterator()).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
//Process the notification
hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification
// and persistLastProcessedNotificationID was not invoked.
//noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
// Create a alter notification with out actually changing anything.
// This is an invalid event and should be processed by sentry store.
// Event Id should be explicitly persisted using persistLastProcessedNotificationID
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
messageFactory.buildAlterPartitionMessage(table, partition, partition).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
hmsFollower.processNotifications(events);
// Make sure that persistLastProcessedNotificationID is invoked explicitly.
verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
reset(sentryStore);
events.clear();
// Create a alter notification with some actual change.
sd = new StorageDescriptor();
sd.setLocation("hdfs://user/hive/warehouse/db1.db/table1");
Partition updatedPartition = new Partition(partition);
updatedPartition.setSd(sd);
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
messageFactory.buildAlterPartitionMessage(table, partition, updatedPartition).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
hmsFollower.processNotifications(events);
// Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION
// notification and persistLastProcessedNotificationID was not invoked.
verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString(), Mockito.any(UniquePathsUpdate.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(inputEventId - 1);
reset(sentryStore);
events.clear();
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table2");
partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
null);
notificationEvent = new NotificationEvent(inputEventId, 0,
EventMessage.EventType.CREATE_TABLE.toString(),
messageFactory.buildCreateTableMessage(table1, Collections.emptyIterator()).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName2);
events.add(notificationEvent);
// Process the notification
hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
//noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
/**
* Constructs a bunch of events and passed to processor of hms follower. One of those is alter
* table event with out actually changing anything(invalid event). Idea is to make sure that
* hms follower calls appropriate sentry store API's for the events processed by hms follower
* after processing the invalid alter table event.
*
* @throws Exception
*/
@Test
public void testAlterTableWithInvalidEvent() throws Exception {
String dbName = "db1";
String tableName1 = "table1";
String tableName2 = "table2";
long inputEventId = 1;
List<NotificationEvent> events = new ArrayList<>();
NotificationEvent notificationEvent;
List<FieldSchema> partCols;
StorageDescriptor sd;
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
//noinspection unchecked
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
Configuration configuration = new Configuration();
// enable HDFS sync, so perm and path changes will be saved into DB
enableHdfsSyncInSentry(configuration);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table1");
partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
null);
notificationEvent = new NotificationEvent(inputEventId, 0,
EventMessage.EventType.CREATE_TABLE.toString(),
messageFactory.buildCreateTableMessage(table, Collections.emptyIterator()).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
//noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
// Create alter table notification with out actually changing anything.
// This notification should not be processed by sentry server
// Notification should be persisted explicitly
notificationEvent = new NotificationEvent(1, 0,
EventMessage.EventType.ALTER_TABLE.toString(),
messageFactory.buildAlterTableMessage(
new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
.toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events = new ArrayList<>();
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
hmsFollower.processNotifications(events);
// Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked
// to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID is explicitly invoked
verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(),
Mockito.any(UniquePathsUpdate.class));
//noinspection unchecked
verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table2");
partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
null);
notificationEvent = new NotificationEvent(inputEventId, 0,
EventMessage.EventType.CREATE_TABLE.toString(),
messageFactory.buildCreateTableMessage(table1, Collections.emptyIterator()).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName2);
events.add(notificationEvent);
// Process the notification
hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
//noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
/**
* Constructs a two events and passed to processor of hms follower. First one being create table
* event with location information(Invalid Event). Idea is to make sure that hms follower calls
* appropriate sentry store API's for the event processed by hms follower after processing the
* invalid create table event.
*
* @throws Exception
*/
@Test
public void testCreateTableAfterInvalidEvent() throws Exception {
String dbName = "db1";
String tableName = "table1";
long inputEventId = 1;
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
//noinspection unchecked
Mockito.doNothing().when(sentryStore)
.addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(),
Mockito.any(UniquePathsUpdate.class));
// Create invalid notification event. The location of the storage descriptor is null, which is invalid for creating table
StorageDescriptor invalidSd = new StorageDescriptor();
invalidSd.setLocation(null);
NotificationEvent invalidNotificationEvent = new NotificationEvent(inputEventId, 0,
EventMessage.EventType.CREATE_TABLE.toString(),
messageFactory.buildCreateTableMessage(
new Table(tableName, dbName, null, 0, 0, 0, invalidSd, null, null, null, null, null),
Collections.emptyIterator()).toString());
// Create valid notification event
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table1");
inputEventId += 1;
NotificationEvent notificationEvent = new NotificationEvent(inputEventId, 0,
EventMessage.EventType.CREATE_TABLE.toString(),
messageFactory.buildCreateTableMessage(
new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
Collections.emptyIterator()).toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(invalidNotificationEvent);
events.add(notificationEvent);
Configuration configuration = new Configuration();
enableHdfsSyncInSentry(configuration);
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
hmsFollower.processNotifications(events);
// invalid event updates notification ID directly
verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
// next valid event update path, which updates notification ID
//noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(),
Mockito.any(UniquePathsUpdate.class));
}
@Test
public void testNoHdfsNoPersistAFullSnapshot() throws Exception {
// TEST CASE
//
// Simulates (by using mocks) that Sentry has not processed any notifications.
// Test makes sure that this does not trigger a full snapshot and also makes sure that
// HMSFollower tries to fetch all notifications from HMS.
final long SENTRY_PROCESSED_EVENT_ID = SentryConstants.EMPTY_NOTIFICATION_ID;
final long HMS_PROCESSED_EVENT_ID = 1L;
NotificationEventResponse response = new NotificationEventResponse();
response.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", ""));
response.addToEvents(new NotificationEvent(1L, 0, "CREATE_TABLE", ""));
response.addToEvents(new NotificationEvent(2L, 0, "ALTER_TABLE", ""));
// Mock that returns the current HMS notification ID
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(HMS_PROCESSED_EVENT_ID));
when(hmsClientMock.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
Mockito.anyObject())).thenReturn(response);
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
// when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
Configuration configuration = new Configuration();
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should get a full snapshot because AuthzPathsMapping is empty
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).isAuthzPathsMappingEmpty();
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
verify(sentryStore, times(3)).persistLastProcessedNotificationID(Mockito.anyLong());
}
/**
* Tests the out-of-sync scenario when HDFS sync is disabled to make sure that
* HMSFollower starting fetching notifications from beginning after out-of-sync
* is detected.
* @throws Exception
*/
@Test
public void testNoHdfsOutofSync() throws Exception {
// TEST CASE
//
// Simulates (by using mocks) that Sentry is out-of-sync with sentry
final long SENTRY_PROCESSED_EVENT_ID = SentryConstants.EMPTY_NOTIFICATION_ID;
NotificationEventResponse response = new NotificationEventResponse();
response.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", ""));
response.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", ""));
response.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", ""));
// Mock that returns the current HMS notification ID
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(3L));
when(hmsClientMock.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
Mockito.anyObject())).thenReturn(response);
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
Configuration configuration = new Configuration();
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should not fetch t he full snapshot but should fetch all the notifications
// from HMS.
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
verify(sentryStore, times(0)).isAuthzPathsMappingEmpty();
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
verify(sentryStore, times(3)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore, hmsClientMock);
//Update the mock so that it returns the max(event-id) that was fetched in previous run
when(sentryStore.getLastProcessedNotificationID()).thenReturn(3L);
// Mock HMSClient so that it returns appropriate event-id
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(3L));
//2nd run
hmsFollower.run();
// Verify that HMSFollower starting fetching the notifications beyond what it already processed.
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(3L-1),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
// Mock that returns the current HMS notification ID which is less than what
// sentry already processed.
when(hmsClientMock.getCurrentNotificationEventId())
.thenReturn(new CurrentNotificationEventId(1L));
//Update the mock so that it returns the max(event-id) that was fetched in previous run
when(sentryStore.getLastProcessedNotificationID()).thenReturn(0L);
// 3rd run
hmsFollower.syncupWithHms(3L);
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
}
@Test
public void testNoHdfsSyncAlterTableIsPersisted() throws Exception {
String dbName = "db1";
String tableName = "table1";
String newDbName = "db1";
String newTableName = "table2";
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
NotificationEvent notificationEvent = new NotificationEvent(1, 0,
EventMessage.EventType.ALTER_TABLE.toString(),
messageFactory.buildAlterTableMessage(
new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null))
.toString());
notificationEvent.setDbName(newDbName);
notificationEvent.setTableName(newTableName);
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
Configuration configuration = new Configuration();
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hiveConnectionFactory, hiveInstance);
hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb(dbName);
authorizable.setTable(tableName);
TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
newAuthorizable.setDb(newDbName);
newAuthorizable.setTable(newTableName);
verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable,
NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable));
}
/**
* Constructs events and fetch a portion of them. Make sure that appropriate sentry store API's
* are invoke when the event is processed by hms follower.
*
* @throws Exception
*/
@Test
public void testPartialHMSFetch() throws Exception {
final long SENTRY_PROCESSED_EVENT_ID = 1L;
final long HMS_PROCESSED_EVENT_ID = 1L;
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
// Mock that returns a full snapshot
Map<String, Collection<String>> snapshotObjects = new HashMap<>();
snapshotObjects.put("db", Sets.newHashSet("/db"));
snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
//Just fetch 3 notificaitons at a time
int sentryHMSFetchSize = 3;
configuration.setInt(SENTRY_HMS_FETCH_SIZE, sentryHMSFetchSize);
NotificationEventResponse response = new NotificationEventResponse();
NotificationEventResponse partialResponse = new NotificationEventResponse();
response.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", ""));
partialResponse.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", ""));
response.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", ""));
partialResponse.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", ""));
response.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", ""));
partialResponse.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", ""));
response.addToEvents(new NotificationEvent(4L, 0, "ALTER_TABLE", ""));
response.addToEvents(new NotificationEvent(5L, 0, "ALTER_TABLE", ""));
when(hmsClientMock.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
Mockito.anyObject())).thenReturn(response);
when(hmsClientMock.getNextNotification(Mockito.eq(0L), Mockito.eq(sentryHMSFetchSize),
Mockito.anyObject())).thenReturn(partialResponse);
when(hmsClientMock.getCurrentNotificationEventId()).thenReturn(new CurrentNotificationEventId(SENTRY_PROCESSED_EVENT_ID));
HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
hmsConnectionMock, hiveInstance);
hmsFollower.setSentryHmsClient(sentryHmsClient);
// 1st run should not fetch full snapshot but should fetch notifications from 0
// and persists them
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
hmsFollower.run();
verify(sentryStore, times(0)).persistFullPathsImage(
fullSnapshot.getPathImage(), fullSnapshot.getId());
// Making sure that HMS client is invoked to get all the notifications
// starting from event-id 0
verify(hmsClientMock, times(0)).getNextNotification(Mockito.eq(0L),
Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L),
Mockito.eq(sentryHMSFetchSize), Mockito.anyObject());
verify(sentryStore, times(1)).persistLastProcessedNotificationID(1L);
verify(sentryStore, times(1)).persistLastProcessedNotificationID(2L);
verify(sentryStore, times(1)).persistLastProcessedNotificationID(3L);
verify(sentryStore, times(0)).persistLastProcessedNotificationID(4L);
verify(sentryStore, times(0)).persistLastProcessedNotificationID(5L);
reset(sentryStore, hmsClientMock);
}
}