SENTRY-2307: Avoid HMS event synchronization while sentry is fetching full snapshot (Kalyan Kumar Kalvagadda reviewed by Lina li)
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
index 36b635a..008a48e 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryPolicyStoreProcessor.java
@@ -63,6 +63,8 @@
import org.apache.sentry.service.common.ServiceConstants.SentryPrincipalType;
import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
import org.apache.sentry.api.common.Status;
+import org.apache.sentry.service.thrift.FullUpdateInitializerState;
+import org.apache.sentry.service.thrift.SentryStateBank;
import org.apache.sentry.service.thrift.TSentryResponseStatus;
import org.apache.thrift.TException;
import org.apache.log4j.Logger;
@@ -1631,7 +1633,14 @@
*/
long syncEventId(long eventId) {
try {
- return sentryStore.getCounterWait().waitFor(eventId);
+ if (!SentryStateBank.isEnabled(FullUpdateInitializerState.COMPONENT,
+ FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS)) {
+ return sentryStore.getCounterWait().waitFor(eventId);
+ } else {
+ LOGGER.info("HMS event synchronization is disabled temporarily as sentry is in the process of " +
+ "fetching full snapshot. No action needed");
+ return eventId;
+ }
} catch (InterruptedException e) {
String msg = String.format("wait request for id %d is interrupted",
eventId);
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializerState.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializerState.java
new file mode 100644
index 0000000..8f423a4
--- /dev/null
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializerState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+/**
+ * States for the FullUpdateInitializer
+ */
+public enum FullUpdateInitializerState implements SentryState {
+ /**
+ * If the FullUpdateInitializer is in the process of taking full snapshot
+ */
+ FULL_SNAPSHOT_INPROGRESS;
+
+ /**
+ * The component name this state is for.
+ */
+ public static final String COMPONENT = "FullUpdateInitializer";
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getValue() {
+ return 1 << this.ordinal();
+ }
+}
\ No newline at end of file
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
index cfb0d78..4baeb67 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -250,6 +250,7 @@
try (FullUpdateInitializer updateInitializer =
new FullUpdateInitializer(hiveConnectionFactory, conf);
Context context = updateTimer.time()) {
+ SentryStateBank.enableState(FullUpdateInitializerState.COMPONENT,FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS);
Map<String, Collection<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
logMessage = "Obtained full HMS snapshot";
LOGGER.info(logMessage);
@@ -259,6 +260,8 @@
failedSnapshotsCount.inc();
LOGGER.error("Snapshot created failed ", exception);
throw exception;
+ } finally {
+ SentryStateBank.disableState(FullUpdateInitializerState.COMPONENT,FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS);
}
}
}
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryStateBank.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryStateBank.java
index 2c05d49..2afe919 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryStateBank.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryStateBank.java
@@ -67,6 +67,7 @@
private static final Logger LOGGER = LoggerFactory.getLogger(SentryStateBank.class);
private static final AtomicLongMap<String> states = AtomicLongMap.create();
+ private static final AtomicLongMap<String> allStates = AtomicLongMap.create();
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
protected SentryStateBank() {
@@ -75,12 +76,14 @@
@VisibleForTesting
static void clearAllStates() {
states.clear();
+ allStates.clear();
LOGGER.debug("All states have been cleared.");
}
@VisibleForTesting
static void resetComponentState(String component) {
states.remove(component);
+ allStates.remove(component);
LOGGER.debug("All states have been cleared for component {}", component);
}
@@ -94,6 +97,7 @@
lock.writeLock().lock();
try {
states.put(component, states.get(component) | state.getValue());
+ allStates.put(component, states.get(component) | state.getValue());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} entered state {}", component, state.toString());
}
@@ -156,4 +160,24 @@
lock.readLock().unlock();
}
}
+
+ /**
+ * Checks if all of the states passed in were enabled
+ *
+ * @param component The component for the states
+ * @param passedStates the SentryStates to check
+ */
+ public static boolean wereStatesEnabled(String component, Set<SentryState> passedStates) {
+ lock.readLock().lock();
+ try {
+ long value = 0L;
+
+ for (SentryState state : passedStates) {
+ value += state.getValue();
+ }
+ return (allStates.get(component) & value) == value;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
}
diff --git a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/api/service/thrift/TestSentryPolicyStoreProcessor.java b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/api/service/thrift/TestSentryPolicyStoreProcessor.java
index 2de6253..c23b385 100644
--- a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/api/service/thrift/TestSentryPolicyStoreProcessor.java
+++ b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/api/service/thrift/TestSentryPolicyStoreProcessor.java
@@ -40,6 +40,8 @@
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.service.common.ServiceConstants.SentryPrincipalType;
import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+import org.apache.sentry.service.thrift.FullUpdateInitializerState;
+import org.apache.sentry.service.thrift.SentryStateBank;
import org.junit.After;
import org.junit.Assert;
@@ -419,6 +421,47 @@
}
@Test
+ public void testNotificationSync() throws Exception {
+
+ SentryPolicyStoreProcessor sentryServiceHandler =
+ new SentryPolicyStoreProcessor(ApiConstants.SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME,
+ conf, sentryStore);
+ TSentryAuthorizable authorizable = new TSentryAuthorizable();
+ authorizable.setDb(DBNAME);
+
+ TSentryHmsEventNotification notification = new TSentryHmsEventNotification();
+ notification.setId(1L);
+ notification.setOwnerType(TSentryPrincipalType.ROLE);
+ notification.setOwnerName(OWNER);
+ notification.setAuthorizable(authorizable);
+ notification.setEventType(EventType.CREATE_DATABASE.toString());
+
+ sentryServiceHandler.sentry_notify_hms_event(notification);
+
+ // Verify that synchronization is attempted
+ Mockito.verify(
+ sentryStore, Mockito.times(1)
+ ).getCounterWait();
+
+ Mockito.verify(counterWait, Mockito.times(1)).waitFor(1L);
+
+ SentryStateBank.enableState(FullUpdateInitializerState.COMPONENT,
+ FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS);
+
+ sentryServiceHandler.sentry_notify_hms_event(notification);
+
+ // Verify that synchronization is not attempted because
+ // full snapshot is in progress
+ Mockito.reset(sentryStore);
+ Mockito.reset(counterWait);
+ Mockito.verify(
+ sentryStore, Mockito.times(0)
+ ).getCounterWait();
+ Mockito.verify(counterWait, Mockito.times(0)).waitFor(1L);
+
+ }
+
+ @Test
public void testAlterTableEventProcessing() throws Exception {
conf.set(SENTRY_DB_POLICY_STORE_OWNER_AS_PRIVILEGE, SentryOwnerPrivilegeType.ALL.toString());
diff --git a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
index 38668ca..04e562f 100644
--- a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
+++ b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
@@ -25,6 +25,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.HashSet;
+import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -36,6 +38,7 @@
import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.thrift.TException;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -107,6 +110,11 @@
client = new SentryHMSClient(conf, (HiveConnectionFactory)hiveConnectionFactory);
}
+ @Before
+ public void setUp() {
+ SentryStateBank.clearAllStates();
+ }
+
/**
* Creating snapshot when SentryHMSClient is not connected to HMS
*/
@@ -116,6 +124,10 @@
Assert.assertFalse(client.isConnected());
PathsImage snapshotInfo = client.getFullSnapshot();
Assert.assertTrue(snapshotInfo.getPathImage().isEmpty());
+ Assert.assertFalse("FullUpdateInitializer is not expected to be in progress",
+ SentryStateBank.isEnabled(FullUpdateInitializerState.COMPONENT, FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS));
+ Assert.assertFalse(SentryStateBank.wereStatesEnabled(FullUpdateInitializerState.COMPONENT, new HashSet<SentryState>(
+ Arrays.asList(FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS))));
}
/**
@@ -129,6 +141,10 @@
Assert.assertTrue(client.isConnected());
PathsImage snapshotInfo = client.getFullSnapshot();
Assert.assertTrue(snapshotInfo.getPathImage().isEmpty());
+ Assert.assertFalse("FullUpdateInitializer is not expected to be in progress",
+ SentryStateBank.isEnabled(FullUpdateInitializerState.COMPONENT, FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS));
+ Assert.assertTrue(SentryStateBank.wereStatesEnabled(FullUpdateInitializerState.COMPONENT, new HashSet<SentryState>(
+ Arrays.asList(FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS))));
}
/**
@@ -164,6 +180,10 @@
snapshotInfo = client.getFullSnapshot();
Assert.assertTrue(snapshotInfo.getPathImage().isEmpty());
+ Assert.assertFalse("FullUpdateInitializer is not expected to be in progress",
+ SentryStateBank.isEnabled(FullUpdateInitializerState.COMPONENT, FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS));
+ Assert.assertTrue(SentryStateBank.wereStatesEnabled(FullUpdateInitializerState.COMPONENT, new HashSet<SentryState>(
+ Arrays.asList(FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS))));
}
/**
@@ -194,7 +214,10 @@
snapshotInfo.getPathImage().get("db2.tab21"));
Assert.assertEquals(Sets.newHashSet("db3/tab31"), snapshotInfo.getPathImage().get("db3.tab31"));
Assert.assertEquals(snapshotInfo.getId(), mockClient.eventId);
-
+ Assert.assertFalse("FullUpdateInitializer is not expected to be in progress",
+ SentryStateBank.isEnabled(FullUpdateInitializerState.COMPONENT, FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS));
+ Assert.assertTrue(SentryStateBank.wereStatesEnabled(FullUpdateInitializerState.COMPONENT, new HashSet<SentryState>(
+ Arrays.asList(FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS))));
}
/**