blob: c37f3706092f2239e45e1cef2f31cb8ab1c886f5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.binding.metastore;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.login.LoginException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
// import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; TODO: Enable once HIVE-18031 is available
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.api.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
import org.apache.sentry.service.thrift.SentryServiceClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This HMS post-event listener is used only to synchronize with HMS notifications on the Sentry server
* whenever a DDL event happens on the Hive metastore.
*/
public class SentrySyncHMSNotificationsPostEventListener extends MetaStoreEventListener {
private static final Logger LOGGER = LoggerFactory
.getLogger(SentrySyncHMSNotificationsPostEventListener.class);
private final HiveAuthzConf authzConf;
private final String serverName;
/*
* Latest processed ID by the Sentry server. May only increase.
*
* This listener will track the latest event ID processed by the Sentry server so that it avoids calling
* the sync request in case a late thread attempts to synchronize again an already processed ID.
*
* The variable is shared across threads, so the AtomicLong variable guarantees that is increased
* monotonically.
*/
private final AtomicLong latestProcessedId = new AtomicLong(0);
/*
* A client used for testing purposes only. I
*
* It may be set by unit-tests as a mock object and used to verify that the client methods
* were called correctly (see TestSentrySyncHMSNotificationsPostEventListener)
*/
private SentryPolicyServiceClient serviceClient;
public SentrySyncHMSNotificationsPostEventListener(Configuration config) {
super(config);
if (!(config instanceof HiveConf)) {
String error = "Could not initialize Plugin - Configuration is not an instanceof HiveConf";
LOGGER.error(error);
throw new RuntimeException(error);
}
authzConf = HiveAuthzConf.getAuthzConf((HiveConf) config);
serverName = getServerName();
}
/**
* Notify sentry server when new table is created
*
* @param tableEvent Create table Event
* @throws MetaException
*/
@Override
public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
// Failure event, Need not be notified.
if (failedEvent(tableEvent, EventType.CREATE_TABLE)) {
return;
}
SentryHmsEvent event = new SentryHmsEvent(serverName, tableEvent);
notifyHmsEvent(event);
}
/**
* Notify sentry server when table is dropped
*
* @param tableEvent Drop table event
* @throws MetaException
*/
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
// Failure event, Need not be notified.
if (failedEvent(tableEvent, EventType.DROP_TABLE)) {
return;
}
SentryHmsEvent event = new SentryHmsEvent(serverName, tableEvent);
notifyHmsEvent(event);
}
/**
* Notify sentry server when when table is altered.
* Owner information is updated in the request only when there is owner change.
* Sentry is not notified when neither rename happened nor owner is changed
*
* @param tableEvent Alter table event
* @throws MetaException When both the owner change and rename is seen.
*/
@Override
public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
if (tableEvent == null) {
return;
}
Table oldTable = tableEvent.getOldTable();
Table newTable = tableEvent.getNewTable();
if (oldTable == null) {
return;
}
if (newTable == null) {
return;
}
// Failure event, Need not be notified.
if (failedEvent(tableEvent, EventType.ALTER_TABLE)) {
return;
}
if(StringUtils.equals(oldTable.getOwner(), newTable.getOwner()) &&
StringUtils.equalsIgnoreCase(oldTable.getDbName(), newTable.getDbName()) &&
StringUtils.equalsIgnoreCase(oldTable.getTableName(), newTable.getTableName())) {
// nothing to notify, neither rename happened nor owner is changed
return;
}
SentryHmsEvent event = new SentryHmsEvent(serverName, tableEvent);
notifyHmsEvent(event);
}
@Override
public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
// no-op
}
@Override
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
// no-op
}
@Override
public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
// no-op
}
/**
* Notify sentry server when new database is created
*
* @param dbEvent Create database event
* @throws MetaException
*/
@Override
public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
// Failure event, Need not be notified.
if (failedEvent(dbEvent, EventType.CREATE_DATABASE)) {
return;
}
SentryHmsEvent event = new SentryHmsEvent(serverName, dbEvent);
notifyHmsEvent(event);
}
/**
* Notify sentry server when database is dropped
*
* @param dbEvent Drop database event.
* @throws MetaException
*/
@Override
public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
// Failure event, Need not be notified.
if (failedEvent(dbEvent, EventType.DROP_DATABASE)) {
return;
}
SentryHmsEvent event = new SentryHmsEvent(serverName, dbEvent);
notifyHmsEvent(event);
}
/**
* Notify sentry server when database is altered
*
* @param dbEvent Alter database event
* @throws MetaException
*/
/* TODO: Enable once HIVE-18031 is available
@Override
public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
// Failure event, Need not be notified.
if (failedEvent(dbEvent, EventType.ALTER_DATABASE)) {
return;
}
SentryHmsEvent event = new SentryHmsEvent(serverName, dbEvent);
notifyHmsEvent(event);
}
*/
/**
* Notifies sentry server about the HMS Event and related metadata.
*
* @param event Sentry HMS event.
*/
private void notifyHmsEvent(SentryHmsEvent event) {
/* If the HMS is running in an active transaction, then we do not want to sync with Sentry
* because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
* forever or until a read time-out happens.
* */
if(event.isMetastoreTransactionActive()) {
return;
}
if (!shouldSyncEvent(event)) {
event.setEventId(0L);
}
try (SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
LOGGER.debug("Notifying sentry about Notification for {} (id: {})", event.getEventType(),
event.getEventId());
long sentryLatestProcessedId = sentryClient.notifyHmsEvent(
getUserName(),
event.getEventId(),
event.getEventType().toString(),
event.getOwnerType(),
event.getOwnerName(),
event.getAuthorizable());
LOGGER.debug("Finished Notifying sentry about Notification for {} (id: {})", event.getEventType(),
event.getEventId());
LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);
updateProcessedId(sentryLatestProcessedId);
} catch (Exception e) {
// This error is only logged. There is no need to throw an error to Hive because HMS sync is called
// after the notification is already generated by Hive (as post-event).
LOGGER.error("Encountered failure while notifying notification for {} (id: {})",
event.getEventType(), event.getEventId(), e);
}
}
/**
* Updates the latest processed ID, if and only if eventId is bigger. This keeps the contract that
* {@link #latestProcessedId} may only increase.
*
* @param eventId The value to be set on the {@link #latestProcessedId}
*/
private void updateProcessedId(long eventId) {
long oldVal = latestProcessedId.get();
if (eventId > oldVal) {
// It is fine for the compareAndSet to fail
latestProcessedId.compareAndSet(oldVal, eventId);
}
}
/**
* Sets the sentry client object (for testing purposes only)
* <p>
* It may be set by unit-tests as a mock object and used to verify that the client methods
* were called correctly (see TestSentrySyncHMSNotificationsPostEventListener).
*/
@VisibleForTesting
void setSentryServiceClient(SentryPolicyServiceClient serviceClient) {
this.serviceClient = serviceClient;
}
private SentryPolicyServiceClient getSentryServiceClient() throws MetaException {
// Return the sentry client in case was set by the unit tests.
if (serviceClient != null) {
return serviceClient;
}
try {
return SentryServiceClientFactory.create(authzConf);
} catch (Exception e) {
throw new MetaException("Failed to connect to Sentry service " + e.getMessage());
}
}
private boolean failedEvent(ListenerEvent event, EventType eventType) {
if (!event.getStatus()) {
LOGGER.debug("Skip HMS synchronization request with the Sentry server for {} " +
"{} since the operation failed. \n", eventType.toString(), event);
return true;
}
return false;
}
/**
* Performs checks to make sure if the event should be synced.
*
* @param event SentryHmsEvent
* @return False: if Event should not be synced, True otherwise.
*/
private boolean shouldSyncEvent(SentryHmsEvent event) {
// Sync need not be performed, Event id is not updated in the event.
if(event.getEventId() < 0) {
return false;
}
// This check is only for performance reasons to avoid calling the sync thrift call if the Sentry
// server already processed the requested eventId.
if (event.getEventId() <= latestProcessedId.get()) {
return false;
}
return true;
}
private String getServerName() {
String serverName = authzConf.get(AuthzConfVars.AUTHZ_SERVER_NAME.getVar());
if (!StringUtils.isEmpty(serverName)) {
return serverName;
}
return authzConf.get(AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getVar(),
AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getDefault());
}
private String getUserName() throws MetaException {
try {
return Utils.getUGI().getShortUserName();
} catch (LoginException e) {
throw new MetaException("Failed to get username " + e.getMessage());
} catch (IOException e) {
throw new MetaException("Failed to get username " + e.getMessage());
}
}
}