blob: ed34f96819a818752050581b76ff095a70322156 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
<p>
http://www.apache.org/licenses/LICENSE-2.0
<p>
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.sentry.service.thrift;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.sentry.core.common.utils.SentryConstants;
import org.apache.sentry.hdfs.UniquePathsUpdate;
import org.apache.sentry.provider.db.service.persistent.SentryStoreInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helper class used to fetch Hive MetaStore (HMS) notifications.
 *
 * <p>The HMS client connection is created lazily on first use. Whenever an HMS call fails, the
 * connection is closed and discarded, so the next call opens a fresh one.
 *
 * <p>Instances hold mutable state (the client, a filter cache and the last filtered ID) without
 * any synchronization. NOTE(review): presumably used from a single thread — confirm with callers.
 */
public final class HiveNotificationFetcher implements AutoCloseable {
  private static final Logger LOGGER = LoggerFactory.getLogger(HiveNotificationFetcher.class);

  private final SentryStoreInterface sentryStore;
  private final HiveConnectionFactory hmsConnectionFactory;
  private HiveMetaStoreClient hmsClient;

  /* The following cache and last filtered ID help us make fewer calls to the DB. */
  private long lastIdFiltered = 0;
  private final Set<String> cache = new HashSet<>();

  /**
   * Creates a fetcher. No HMS connection is opened until the first request.
   *
   * @param sentryStore store used to check whether a notification was already processed
   * @param hmsConnectionFactory factory used to open the HMS client connection
   */
  public HiveNotificationFetcher(SentryStoreInterface sentryStore, HiveConnectionFactory hmsConnectionFactory) {
    this.sentryStore = sentryStore;
    this.hmsConnectionFactory = hmsConnectionFactory;
  }

  /**
   * Fetch new HMS notifications that appeared since a specified event ID. The returned list may
   * include notifications with the same specified ID if they were not seen by Sentry.
   *
   * @param lastEventId The event ID to use to request notifications.
   * @return A list of newer notifications unseen by Sentry (never {@code null}).
   * @throws Exception If an error occurs on the HMS communication.
   */
  public List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception {
    return fetchNotifications(lastEventId, Integer.MAX_VALUE);
  }

  /**
   * Fetch new HMS notifications that appeared since a specified event ID. The returned list may
   * include notifications with the same specified ID if they were not seen by Sentry.
   *
   * <p>If the HMS call fails, the current HMS connection is closed before the exception is
   * rethrown.
   *
   * @param lastEventId The event ID to use to request notifications.
   * @param maxEvents The maximum number of events to fetch.
   * @return A list of newer notifications unseen by Sentry (never {@code null}).
   * @throws Exception If an error occurs on the HMS communication.
   */
  public List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception {
    NotificationFilter filter = null;
    long requestFromId = lastEventId;

    /*
     * HMS may bring duplicated events that were committed later than the previous request. To bring
     * those newer duplicated events, we request new notifications from the last seen ID - 1.
     *
     * A current problem is that we could miss duplicates committed much later, but because
     * HMS does not guarantee the order of those, it is safer to avoid processing them.
     *
     * TODO: We can avoid doing this once HIVE-16886 is fixed.
     */
    if (lastEventId > 0) {
      // The filter drops already-processed events that carry exactly lastEventId.
      filter = createNotificationFilterFor(lastEventId);
      requestFromId = lastEventId - 1;
    }

    LOGGER.debug("Requesting HMS notifications since ID = {}", requestFromId);

    NotificationEventResponse response;
    try {
      response = getHmsClient().getNextNotification(requestFromId, maxEvents, filter);
    } catch (Exception e) {
      // Drop the (possibly broken) connection so the next call reconnects.
      close();
      throw e;
    }

    if (response != null && response.isSetEvents()) {
      LOGGER.debug("Fetched {} new HMS notification(s)", response.getEventsSize());
      return response.getEvents();
    }

    return Collections.emptyList();
  }

  /**
   * Returns an HMS notification filter for a specific notification ID. HMS notifications may
   * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification.
   *
   * <p>Events whose ID differs from {@code id} are always accepted. Events with that ID are
   * rejected if their hash is cached or reported as processed by the Sentry store. They are also
   * rejected if that check fails.
   *
   * @param id the notification ID to filter
   * @return the HMS notification filter
   */
  private NotificationFilter createNotificationFilterFor(final long id) {
    /*
     * A SHA-1 hex value that keeps unique notifications processed is persisted on the Sentry DB.
     * To avoid unnecessary calls to the DB, we use a cache that keeps seen hashes of the
     * specified ID. If a new filter ID is used, then we clean up the cache.
     */
    if (lastIdFiltered != id) {
      lastIdFiltered = id;
      cache.clear();
    }

    return new NotificationFilter() {
      @Override
      public boolean accept(NotificationEvent notificationEvent) {
        if (notificationEvent.getEventId() == id) {
          String hash = UniquePathsUpdate.sha1(notificationEvent);

          try {
            if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) {
              // Remember processed hashes so repeated requests for this ID skip the DB.
              cache.add(hash);
              LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id);
              return false;
            }
          } catch (Exception e) {
            // Pass the exception as the trailing argument so SLF4J logs the stack trace too.
            LOGGER.error("An error occurred while checking if notification {} is already "
                + "processed: {}", id, e.getMessage(), e);
            // We cannot throw an exception on this filter, so we return false assuming this
            // notification is already processed. Note that this drops the notification
            // even if it was not actually processed.
            return false;
          }
        }

        return true;
      }
    };
  }

  /**
   * Gets the HMS client connection object.
   * It will create a new connection if no connection object exists.
   *
   * @return The HMS client used to communicate with the Hive MetaStore.
   * @throws Exception If it cannot connect to the HMS service.
   */
  private HiveMetaStoreClient getHmsClient() throws Exception {
    if (hmsClient == null) {
      try {
        hmsClient = hmsConnectionFactory.connect().getClient();
      } catch (Exception e) {
        LOGGER.error("Fail to connect to the HMS service: {}", e.getMessage());
        throw e;
      }
    }

    return hmsClient;
  }

  /**
   * Returns the latest notification ID logged by the HMS.
   *
   * <p>If the HMS call fails, the current HMS connection is closed before the exception is
   * rethrown.
   *
   * @return the latest notification ID logged by the HMS, or
   *     {@link SentryConstants#EMPTY_NOTIFICATION_ID} if the HMS reports none
   * @throws Exception when an error occurs when talking to the HMS client
   */
  public long getCurrentNotificationId() throws Exception {
    CurrentNotificationEventId eventId;
    try {
      eventId = getHmsClient().getCurrentNotificationEventId();
    } catch (Exception e) {
      // Drop the (possibly broken) connection so the next call reconnects.
      close();
      throw e;
    }

    if (eventId != null && eventId.isSetEventId()) {
      return eventId.getEventId();
    }

    return SentryConstants.EMPTY_NOTIFICATION_ID;
  }

  /* AutoCloseable implementations */

  /**
   * Closes the HMS client (if any) and clears the filter cache.
   *
   * <p>The client reference is cleared and the cache is emptied even if closing the client throws,
   * so a later call starts from a clean state and opens a new connection.
   */
  @Override
  public void close() {
    try {
      if (hmsClient != null) {
        hmsClient.close();
      }
    } finally {
      hmsClient = null;
      cache.clear();
    }
  }
}