blob: 53a3fa401bf024ffc33cb160a1a0a5d7189dd940 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.sentry.service.thrift;
import com.google.common.annotations.VisibleForTesting;
import java.net.SocketException;
import java.util.Collection;
import java.util.List;
import javax.jdo.JDODataStoreException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;
import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry.
* It gets the full update and notification logs from HMS and applies them to
* update permissions stored in Sentry using SentryStore, and also updates the
* &lt;obj, path&gt; state stored for HDFS-Sentry sync.
*/
public class HMSFollower implements Runnable, AutoCloseable {

  private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);

  // Set by the follower thread once a connect succeeds and read through isConnectedToHms(),
  // possibly from a different thread, hence volatile. This class never resets it to false.
  private static volatile boolean connectedToHms = false;

  private SentryHMSClient client;
  private final Configuration authzConf;
  private final SentryStore sentryStore;
  private final NotificationProcessor notificationProcessor;
  private final boolean hdfsSyncEnabled;
  private final LeaderStatusMonitor leaderMonitor;

  /**
   * Configuring Hms Follower thread.
   *
   * <p>The authorization server name is read from the configuration.
   *
   * @param conf sentry configuration
   * @param store sentry store
   * @param leaderMonitor singleton instance of LeaderStatusMonitor; when {@code null} this
   *     follower always behaves as the leader
   * @param hiveConnectionFactory factory handed to the {@link SentryHMSClient} created here
   */
  HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
      HiveSimpleConnectionFactory hiveConnectionFactory) {
    this(conf, store, leaderMonitor, hiveConnectionFactory, null);
  }

  /**
   * Constructor should be used only for testing purposes.
   *
   * @param conf sentry configuration
   * @param store sentry store
   * @param leaderMonitor leader status monitor; when {@code null} this follower always behaves
   *     as the leader
   * @param hiveConnectionFactory factory handed to the {@link SentryHMSClient} created here
   * @param authServerName Server that sentry is Authorizing; when {@code null} the name is
   *     looked up in {@code conf}, falling back to the deprecated key and then its default
   */
  @VisibleForTesting
  public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
      HiveSimpleConnectionFactory hiveConnectionFactory, String authServerName) {
    LOGGER.info("HMSFollower is being initialized");
    authzConf = conf;
    this.leaderMonitor = leaderMonitor;
    sentryStore = store;

    String serverName = authServerName;
    if (serverName == null) {
      serverName = conf.get(AUTHZ_SERVER_NAME.getVar(),
          conf.get(AUTHZ_SERVER_NAME_DEPRECATED.getVar(),
              AUTHZ_SERVER_NAME_DEPRECATED.getDefault()));
    }

    notificationProcessor = new NotificationProcessor(sentryStore, serverName, authzConf);
    client = new SentryHMSClient(authzConf, hiveConnectionFactory);
    // no cache to test different settings for hdfs sync
    hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf);
  }

  /**
   * Reports whether any HMSFollower has successfully connected to the HMS.
   *
   * @return {@code true} once a connect attempt has succeeded; the flag is not cleared afterwards
   */
  @VisibleForTesting
  public static boolean isConnectedToHms() {
    return connectedToHms;
  }

  /**
   * Replaces the HMS client used by this follower.
   *
   * @param client the client to use from now on
   */
  @VisibleForTesting
  void setSentryHmsClient(SentryHMSClient client) {
    this.client = client;
  }

  /**
   * Disconnects the HMS client, if there is one. Failures are logged and not propagated.
   */
  @Override
  public void close() {
    if (client != null) {
      // Close any outstanding connections to HMS
      try {
        client.disconnect();
      } catch (Exception failure) {
        LOGGER.error("Failed to close the Sentry Hms Client", failure);
      }
    }
  }

  /**
   * Runs one synchronization pass: reads the last processed notification ID from the store,
   * wakes up clients waiting on it and, if this instance is the leader, syncs with the HMS.
   * A non-leader closes its HMS connection and returns.
   */
  @Override
  public void run() {
    long lastProcessedNotificationId;
    try {
      // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
      lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
    } catch (Exception e) {
      LOGGER.error("Failed to get the last processed notification id from sentry store, "
          + "Skipping the processing", e);
      return;
    }

    // Wake any clients connected to this service waiting for HMS already processed notifications.
    wakeUpWaitingClientsForSync(lastProcessedNotificationId);

    // Only the leader should listen to HMS updates
    if (!isLeader()) {
      // Close any outstanding connections to HMS
      close();
      return;
    }

    syncupWithHms(lastProcessedNotificationId);
  }

  /**
   * Tells whether this instance should act as the leader.
   *
   * @return {@code true} when no leader monitor was supplied or the monitor reports leadership
   */
  private boolean isLeader() {
    return (leaderMonitor == null) || leaderMonitor.isLeader();
  }

  /**
   * Returns the authorization server name held by the notification processor.
   *
   * @return the authorization server name
   */
  @VisibleForTesting
  String getAuthServerName() {
    return notificationProcessor.getAuthServerName();
  }

  /**
   * Processes new Hive Metastore notifications.
   *
   * <p>If a full snapshot is required (see {@link #isFullSnapshotRequired(long)}) or the fetched
   * notifications are out-of-sync with Sentry, a full snapshot of the Hive Metastore is taken
   * and this pass ends. Otherwise the fetched notifications are processed.
   *
   * <p>Clients connections waiting for an event notification will be
   * woken up afterwards.
   *
   * <p>No exception leaves this method: connection and processing failures are logged.
   *
   * @param notificationId the latest notification ID already processed by Sentry
   */
  private void syncupWithHms(long notificationId) {
    try {
      client.connect();
      connectedToHms = true;
    } catch (Throwable e) {
      LOGGER.error("HMSFollower cannot connect to HMS!!", e);
      return;
    }

    try {
      /* Before getting notifications, it checks if a full HMS snapshot is required. */
      if (isFullSnapshotRequired(notificationId)) {
        createFullSnapshot();
        return;
      }

      Collection<NotificationEvent> notifications = client.getNotifications(notificationId);

      // After getting notifications, it checks if the HMS did some clean-up and notifications
      // are out-of-sync with Sentry.
      if (areNotificationsOutOfSync(notifications, notificationId)) {
        createFullSnapshot();
        return;
      }

      // Continue with processing new notifications if no snapshots are done.
      processNotifications(notifications);
    } catch (TException e) {
      // If the underlying exception is around socket exception,
      // it is better to retry connection to HMS
      if (e.getCause() instanceof SocketException) {
        LOGGER.error("Encountered Socket Exception during fetching Notification entries,"
            + " will attempt to reconnect to HMS after configured interval", e);
        close();
      } else {
        LOGGER.error("ThriftException occurred communicating with HMS", e);
      }
    } catch (Throwable t) {
      // catching errors to prevent the executor to halt.
      LOGGER.error("Exception in HMSFollower! Caused by: " + t.getMessage(),
          t);
    }
  }

  /**
   * Checks if a new full HMS snapshot request is needed by checking if:
   * <ul>
   *   <li>No snapshots has been persisted yet.</li>
   *   <li>The current notification Id on the HMS is less than the
   *   latest processed by Sentry.</li>
   * </ul>
   *
   * @param latestSentryNotificationId The notification Id to check against the HMS
   * @return True if a full snapshot is required; False otherwise.
   * @throws Exception If an error occurs while checking the SentryStore or the HMS client.
   */
  private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception {
    if (sentryStore.isAuthzPathsMappingEmpty()) {
      return true;
    }

    long currentHmsNotificationId = client.getCurrentNotificationId();
    if (currentHmsNotificationId < latestSentryNotificationId) {
      LOGGER.info("The latest notification ID on HMS is less than the latest notification ID "
          + "processed by Sentry. Need to request a full HMS snapshot.");
      return true;
    }

    return false;
  }

  /**
   * Checks if the HMS and Sentry processed notifications are out-of-sync.
   * This could happen because the HMS did some clean-up of old notifications
   * and Sentry was not requesting notifications during that time.
   *
   * <p>Only the first fetched event decides the result: it must carry the ID that directly
   * follows {@code latestProcessedId}. Missing IDs further into the collection are logged
   * but are not treated as an out-of-sync condition.
   *
   * @param events All new notifications to check for an out-of-sync, in the order returned by
   *     the collection's iterator.
   * @param latestProcessedId The latest notification processed by Sentry to check against the
   *     list of notifications events.
   * @return True if an out-of-sync is found; False otherwise.
   */
  private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events,
      long latestProcessedId) {
    if (events.isEmpty()) {
      return false;
    }

    // Walk the collection instead of casting it to List, so any Collection type is accepted.
    long firstNotificationId = 0L;
    long lastNotificationId = 0L;
    boolean isFirst = true;
    for (NotificationEvent event : events) {
      if (isFirst) {
        firstNotificationId = event.getEventId();
        isFirst = false;
      }
      lastNotificationId = event.getEventId();
    }

    //
    // If the next expected notification is not available, then an out-of-sync might
    // have happened due to the following issue:
    //
    // - HDFS sync was disabled or Sentry was shutdown for a time period longer than
    //   the HMS notification clean-up thread causing old notifications to be deleted.
    //
    if ((latestProcessedId + 1) != firstNotificationId) {
      LOGGER.info("Current HMS notifications are out-of-sync with latest Sentry processed"
          + " notifications. Need to request a full HMS snapshot.");
      return true;
    }

    // With contiguous IDs the number of fetched events equals (last ID - latest processed ID);
    // fewer fetched events than that means some IDs are missing in between.
    long expectedSize = lastNotificationId - latestProcessedId;
    int fetchedSize = events.size();
    if (fetchedSize < expectedSize) {
      LOGGER.info("The HMS notifications fetched has some gaps in the # of events received. These"
          + " should not cause an out-of-sync issue. (expected = {}, fetched = {})",
          expectedSize, fetchedSize);
    }

    return false;
  }

  /**
   * Requests a full snapshot from the HMS and persists it, then wakes up any waiting clients.
   *
   * <p>Nothing is persisted when the snapshot's path image is empty or when this instance is
   * no longer the leader. The paths image itself is persisted only when HDFS sync is enabled;
   * the snapshot's notification ID is persisted in either case.
   *
   * @return the notification ID of the snapshot, or {@link SentryStore#EMPTY_NOTIFICATION_ID}
   *     when leadership was lost before persisting.
   * @throws Exception if there are failures
   */
  private long createFullSnapshot() throws Exception {
    LOGGER.debug("Attempting to take full HMS snapshot");

    PathsImage snapshotInfo = client.getFullSnapshot();
    if (snapshotInfo.getPathImage().isEmpty()) {
      return snapshotInfo.getId();
    }

    // Check we're still the leader before persisting the new snapshot
    if (!isLeader()) {
      return SentryStore.EMPTY_NOTIFICATION_ID;
    }

    try {
      LOGGER.debug("Persisting HMS path full snapshot");

      if (hdfsSyncEnabled) {
        sentryStore.persistFullPathsImage(snapshotInfo.getPathImage());
      }

      // We need to persist latest notificationID for next poll
      sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId());
    } catch (Exception failure) {
      // The failure is rethrown, so the caller is the one that logs its details.
      LOGGER.error("Received exception while persisting HMS path full snapshot ");
      throw failure;
    }

    // Wake up any HMS waiters that could have been put on hold before getting the
    // eventIDBefore value.
    wakeUpWaitingClientsForSync(snapshotInfo.getId());

    // HMSFollower connected to HMS and it finished the full snapshot that was required.
    LOGGER.info("Sentry HMS support is ready");
    return snapshotInfo.getId();
  }

  /**
   * Process the collection of notifications and wake up any waiting clients.
   * Also, persists the notification ID regardless of processing result.
   *
   * <p>Processing stops early when this instance is no longer the leader, or when a JDO
   * storage failure occurs for an event whose ID is not greater than the one already persisted.
   *
   * @param events list of event to be processed
   * @throws Exception if the notification ID of an unprocessed event cannot be persisted, or
   *     if reading the last processed ID from the store fails
   */
  public void processNotifications(Collection<NotificationEvent> events) throws Exception {
    if (events.isEmpty()) {
      return;
    }

    for (NotificationEvent event : events) {
      boolean isNotificationProcessed = false;
      try {
        // Only the leader should process the notifications
        if (!isLeader()) {
          return;
        }
        isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
      } catch (Exception e) {
        if (e.getCause() instanceof JDODataStoreException) {
          LOGGER.info("Received JDO Storage Exception, Could be because of processing "
              + "duplicate notification");
          if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
            // Rest of the notifications need not be processed.
            LOGGER.error("Received event with Id: {} which is smaller then the ID "
                + "persisted in store", event.getEventId());
            break;
          }
        } else {
          // The exception is passed as the trailing argument without a placeholder so that
          // SLF4J logs it together with its stack trace.
          LOGGER.error("Processing the notification with ID:{} failed with exception",
              event.getEventId(), e);
        }
      }

      if (!isNotificationProcessed) {
        try {
          // Update the notification ID in the persistent store even when the notification is
          // not processed as the content in the notification is not valid.
          // Continue processing the next notification.
          LOGGER.debug("Explicitly Persisting Notification ID:{}", event.getEventId());
          sentryStore.persistLastProcessedNotificationID(event.getEventId());
        } catch (Exception failure) {
          LOGGER.error("Received exception while persisting the notification ID {}",
              event.getEventId());
          throw failure;
        }
      }

      // Wake up any HMS waiters that are waiting for this ID.
      wakeUpWaitingClientsForSync(event.getEventId());
    }
  }

  /**
   * Wakes up HMS waiters waiting for a specific event notification.
   *
   * @param eventId Id of a notification
   */
  private void wakeUpWaitingClientsForSync(long eventId) {
    CounterWait counterWait = sentryStore.getCounterWait();

    // Wake up any HMS waiters that are waiting for this ID.
    // counterWait should never be null, but tests mock SentryStore and a mocked one
    // doesn't have it.
    if (counterWait != null) {
      counterWait.update(eventId);
    }
  }
}