blob: 2cd18ea9d2ca40b5f46aaafde532fb3bf9769c07 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.sentry.service.thrift;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.hive.hcatalog.messaging.MessageDeserializer;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Apply newer events to the full update.
 *
 * <p>The process of obtaining a full snapshot from HMS is not atomic.
 * While we read information from HMS it may change - some new objects can be created,
 * or some can be removed or modified. This class is used to reconcile such changes with
 * the full snapshot.
 *
 * <p>The snapshot ("image") maps a name to a set of paths. Databases are stored under the
 * lower-cased {@code dbName}, tables under the lower-cased {@code dbName.tableName}.
 * The paths are whatever {@code FullUpdateInitializer.pathFromURI()} returns for the
 * location carried by the event.
 */
final class FullUpdateModifier {
  private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateModifier.class);

  // Prevent creation of class instances
  private FullUpdateModifier() {
  }

  /**
   * Take a full snapshot and apply an HMS event to it.
   *
   * <p>The deserializer is passed in (rather than using a built-in one) to simplify
   * testing: tests can use mock deserializers and do not have to construct proper events.
   *
   * @param image Full snapshot; modified in place
   * @param event HMS notification event
   * @param deserializer Message deserializer -
   *                     should produce Sentry JSON serializer type messages.
   * @throws IllegalArgumentException if the event type string does not name an
   *         {@code HCatEventMessage.EventType} constant (thrown by {@code valueOf})
   */
  static void applyEvent(Map<String, Set<String>> image, NotificationEvent event,
                         MessageDeserializer deserializer) {
    HCatEventMessage.EventType eventType =
        HCatEventMessage.EventType.valueOf(event.getEventType());

    switch (eventType) {
      case CREATE_DATABASE:
        createDatabase(image, event, deserializer);
        break;
      case DROP_DATABASE:
        dropDatabase(image, event, deserializer);
        break;
      case CREATE_TABLE:
        createTable(image, event, deserializer);
        break;
      case DROP_TABLE:
        dropTable(image, event, deserializer);
        break;
      case ALTER_TABLE:
        alterTable(image, event, deserializer);
        break;
      case ADD_PARTITION:
        addPartition(image, event, deserializer);
        break;
      case DROP_PARTITION:
        dropPartition(image, event, deserializer);
        break;
      case ALTER_PARTITION:
        alterPartition(image, event, deserializer);
        break;
      default:
        // Known enum constant that this class does not handle
        LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
            event.getEventType());
        break;
    }
  }

  /**
   * Add mapping from the new database name to location {dbname: {location}}.
   *
   * <p>Events with a missing name or location, or with a location that cannot be
   * converted to a path, are ignored. An already existing database is left untouched.
   */
  private static void createDatabase(Map<String, Set<String>> image, NotificationEvent event,
                                     MessageDeserializer deserializer) {
    SentryJSONCreateDatabaseMessage message =
        (SentryJSONCreateDatabaseMessage) deserializer
            .getCreateDatabaseMessage(event.getMessage());

    String dbName = message.getDB();
    if (isNullOrEmpty(dbName)) {
      LOGGER.error("Create database event is missing database name");
      return;
    }
    dbName = dbName.toLowerCase();

    String location = message.getLocation();
    if (isNullOrEmpty(location)) {
      LOGGER.error("Create database event is missing database location");
      return;
    }

    String path = FullUpdateInitializer.pathFromURI(location);
    if (path == null) {
      return;
    }

    // Add new database if it doesn't exist yet
    if (!image.containsKey(dbName)) {
      LOGGER.debug("create database {} with location {}", dbName, location);
      image.put(dbName.intern(), Collections.singleton(path));
    } else {
      // Sanity check the information and print warnings if database exists but
      // with a different location
      Set<String> oldLocations = image.get(dbName);
      LOGGER.debug("database {} already exists, ignored", dbName);
      // The image stores converted paths, so compare the path and not the raw location
      if (!oldLocations.contains(path)) {
        LOGGER.warn("database {} exists but location is different from {}", dbName, location);
      }
    }
  }

  /**
   * Remove a mapping from database name and remove all mappings which look like
   * dbName.tableName where dbName matches database name.
   *
   * <p>Nothing is removed when the database is unknown or when its recorded paths do
   * not contain the path from the event.
   */
  private static void dropDatabase(Map<String, Set<String>> image, NotificationEvent event,
                                   MessageDeserializer deserializer) {
    SentryJSONDropDatabaseMessage message =
        (SentryJSONDropDatabaseMessage) deserializer.getDropDatabaseMessage(event.getMessage());

    String dbName = message.getDB();
    if (isNullOrEmpty(dbName)) {
      LOGGER.error("Drop database event is missing database name");
      return;
    }
    dbName = dbName.toLowerCase();

    String location = message.getLocation();
    if (isNullOrEmpty(location)) {
      LOGGER.error("Drop database event is missing database location");
      return;
    }

    String path = FullUpdateInitializer.pathFromURI(location);
    if (path == null) {
      return;
    }

    // If the database is already deleted, we have nothing to do
    Set<String> locations = image.get(dbName);
    if (locations == null) {
      LOGGER.debug("database {} is already deleted", dbName);
      return;
    }

    if (!locations.contains(path)) {
      LOGGER.warn("Database {} location does not match {}", dbName, path);
      return;
    }

    LOGGER.debug("drop database {} with location {}", dbName, location);

    // Drop information about the database
    image.remove(dbName);

    String dbPrefix = dbName + ".";

    // Remove all objects for this database; Iterator.remove() is the only safe way
    // to delete entries while walking the map
    for (Iterator<Map.Entry<String, Set<String>>> it = image.entrySet().iterator();
         it.hasNext(); ) {
      Map.Entry<String, Set<String>> entry = it.next();
      String key = entry.getKey();
      if (key.startsWith(dbPrefix)) {
        LOGGER.debug("Removing {}", key);
        it.remove();
      }
    }
  }

  /**
   * Add mapping for dbName.tableName.
   *
   * <p>Events with a missing database name, table name or location, or with a location
   * that cannot be converted to a path, are ignored. An already existing table is left
   * untouched.
   */
  private static void createTable(Map<String, Set<String>> image, NotificationEvent event,
                                  MessageDeserializer deserializer) {
    SentryJSONCreateTableMessage message = (SentryJSONCreateTableMessage) deserializer
        .getCreateTableMessage(event.getMessage());

    String dbName = message.getDB();
    if (isNullOrEmpty(dbName)) {
      LOGGER.error("Create table event is missing database name");
      return;
    }

    String tableName = message.getTable();
    if (isNullOrEmpty(tableName)) {
      LOGGER.error("Create table event is missing table name");
      return;
    }

    String location = message.getLocation();
    if (isNullOrEmpty(location)) {
      LOGGER.error("Create table event is missing table location");
      return;
    }

    String path = FullUpdateInitializer.pathFromURI(location);
    if (path == null) {
      return;
    }

    String authName = toAuthName(dbName, tableName);

    // Add new table if it doesn't exist yet
    if (!image.containsKey(authName)) {
      LOGGER.debug("create table {} with location {}", authName, location);
      // Mutable set: partition events add to and remove from it later
      Set<String> locations = new HashSet<>(1);
      locations.add(path);
      image.put(authName.intern(), locations);
    } else {
      // Sanity check the information and print warnings if table exists but
      // with a different location
      Set<String> oldLocations = image.get(authName);
      LOGGER.debug("Table {} already exists, ignored", authName);
      // The image stores converted paths, so compare the path and not the raw location
      if (!oldLocations.contains(path)) {
        LOGGER.warn("Table {} exists but location is different from {}", authName, location);
      }
    }
  }

  /**
   * Drop mapping from dbName.tableName.
   *
   * <p>The mapping is only removed when the recorded paths contain the path from the event.
   */
  private static void dropTable(Map<String, Set<String>> image, NotificationEvent event,
                                MessageDeserializer deserializer) {
    SentryJSONDropTableMessage message = (SentryJSONDropTableMessage) deserializer
        .getDropTableMessage(event.getMessage());

    String dbName = message.getDB();
    if (isNullOrEmpty(dbName)) {
      LOGGER.error("Drop table event is missing database name");
      return;
    }

    String tableName = message.getTable();
    if (isNullOrEmpty(tableName)) {
      LOGGER.error("Drop table event is missing table name");
      return;
    }

    String location = message.getLocation();
    if (isNullOrEmpty(location)) {
      LOGGER.error("Drop table event is missing table location");
      return;
    }

    String path = FullUpdateInitializer.pathFromURI(location);
    if (path == null) {
      return;
    }

    String authName = toAuthName(dbName, tableName);
    Set<String> locations = image.get(authName);
    if (locations != null && locations.contains(path)) {
      LOGGER.debug("Removing {}", authName);
      image.remove(authName);
    } else {
      LOGGER.warn("can't find matching table {} with location {}", authName, location);
    }
  }

  /**
   * ALTER TABLE is a complicated function that can alter multiple things.
   *
   * <p>We take care of the following cases:
   * <ul>
   *   <li>Change database name. This is the most complicated one.
   *   We need to change the actual database name and change all mappings
   *   that look like "dbName.tableName" to the new dbName</li>
   *   <li>Change table name</li>
   *   <li>Change location</li>
   * </ul>
   *
   * <p>The previous names and both locations come from the deserialized message; the
   * new database and table names come from the notification event itself. The three
   * cases are processed in the order listed, each against the result of the previous one.
   */
  private static void alterTable(Map<String, Set<String>> image, NotificationEvent event,
                                 MessageDeserializer deserializer) {
    SentryJSONAlterTableMessage message =
        (SentryJSONAlterTableMessage) deserializer.getAlterTableMessage(event.getMessage());

    String prevDbName = message.getDB();
    if (isNullOrEmpty(prevDbName)) {
      LOGGER.error("Alter table event is missing old database name");
      return;
    }
    prevDbName = prevDbName.toLowerCase();

    String prevTableName = message.getTable();
    if (isNullOrEmpty(prevTableName)) {
      LOGGER.error("Alter table event is missing old table name");
      return;
    }
    prevTableName = prevTableName.toLowerCase();

    String newDbName = event.getDbName();
    if (isNullOrEmpty(newDbName)) {
      LOGGER.error("Alter table event is missing new database name");
      return;
    }
    newDbName = newDbName.toLowerCase();

    String newTableName = event.getTableName();
    if (isNullOrEmpty(newTableName)) {
      LOGGER.error("Alter table event is missing new table name");
      return;
    }
    newTableName = newTableName.toLowerCase();

    String prevLocation = message.getOldLocation();
    if (isNullOrEmpty(prevLocation)) {
      LOGGER.error("Alter table event is missing old location");
      return;
    }

    String prevPath = FullUpdateInitializer.pathFromURI(prevLocation);
    if (prevPath == null) {
      return;
    }

    String newLocation = message.getNewLocation();
    if (isNullOrEmpty(newLocation)) {
      LOGGER.error("Alter table event is missing new location");
      return;
    }

    String newPath = FullUpdateInitializer.pathFromURI(newLocation);
    if (newPath == null) {
      return;
    }

    String prevAuthName = prevDbName + "." + prevTableName;
    String newAuthName = newDbName + "." + newTableName;

    if (!prevDbName.equals(newDbName)) {
      // Database name change
      LOGGER.debug("Changing database name: {} -> {}", prevDbName, newDbName);
      Set<String> locations = image.get(prevDbName);
      if (locations != null) {
        // Rename database if it is not renamed yet
        if (!image.containsKey(newDbName)) {
          image.put(newDbName, locations);
          image.remove(prevDbName);
          // Walk through all tables and rename DB part of the AUTH name
          // AUTH name is "dbName.TableName" so we need to replace dbName with the new name
          String prevDbPrefix = prevDbName + ".";
          String newDbPrefix = newDbName + ".";
          renamePrefixKeys(image, prevDbPrefix, newDbPrefix);
        } else {
          LOGGER.warn("database {} rename: found existing database {}", prevDbName, newDbName);
        }
      } else {
        LOGGER.debug("database {} not found", prevDbName);
      }
    }

    if (!prevAuthName.equals(newAuthName)) {
      // Either the database name or table name changed, rename objects.
      // NOTE(review): if the database rename above already moved this table to
      // "newDb.prevTable", the lookup of prevAuthName finds nothing, so a table-name
      // change arriving in the same event appears not to be applied - confirm whether
      // that combination can occur.
      Set<String> locations = image.get(prevAuthName);
      if (locations != null) {
        // Rename if it is not renamed yet
        if (!image.containsKey(newAuthName)) {
          LOGGER.debug("rename {} -> {}", prevAuthName, newAuthName);
          image.put(newAuthName, locations);
          image.remove(prevAuthName);
        } else {
          LOGGER.warn("auth {} rename: found existing object {}", prevAuthName, newAuthName);
        }
      } else {
        LOGGER.debug("auth {} not found", prevAuthName);
      }
    }

    if (!prevPath.equals(newPath)) {
      LOGGER.debug("Location change: {} -> {}", prevPath, newPath);
      // Location change: only swap when the old path is recorded and the new one is not
      Set<String> locations = image.get(newAuthName);
      if (locations != null && locations.contains(prevPath) && !locations.contains(newPath)) {
        locations.remove(prevPath);
        locations.add(newPath);
      } else {
        LOGGER.warn("can not process location change for {}", newAuthName);
        LOGGER.warn("old locatio = {}, new location = {}", prevPath, newPath);
      }
    }
  }

  /**
   * Add partition just adds a new location to the existing table.
   *
   * <p>Locations that cannot be converted to a path are skipped. Nothing is added when
   * the table is not present in the image.
   */
  private static void addPartition(Map<String, Set<String>> image, NotificationEvent event,
                                   MessageDeserializer deserializer) {
    SentryJSONAddPartitionMessage message =
        (SentryJSONAddPartitionMessage) deserializer.getAddPartitionMessage(event.getMessage());

    String dbName = message.getDB();
    if (isNullOrEmpty(dbName)) {
      LOGGER.error("Add partition event is missing database name");
      return;
    }

    String tableName = message.getTable();
    if (isNullOrEmpty(tableName)) {
      LOGGER.error("Add partition event for {} is missing table name", dbName);
      return;
    }

    String authName = toAuthName(dbName, tableName);

    List<String> locations = message.getLocations();
    if (locations == null || locations.isEmpty()) {
      LOGGER.error("Add partition event for {} is missing partition locations", authName);
      return;
    }

    Set<String> oldLocations = image.get(authName);
    if (oldLocations == null) {
      LOGGER.warn("Add partition for {}: missing table locations",authName);
      return;
    }

    // Add each partition
    for (String location: locations) {
      String path = FullUpdateInitializer.pathFromURI(location);
      if (path != null) {
        LOGGER.debug("Adding partition {}:{}", authName, path);
        oldLocations.add(path);
      }
    }
  }

  /**
   * Drop partition removes location from the existing table.
   *
   * <p>Locations that cannot be converted to a path are skipped. Nothing is removed when
   * the table is not present in the image.
   */
  private static void dropPartition(Map<String, Set<String>> image, NotificationEvent event,
                                    MessageDeserializer deserializer) {
    SentryJSONDropPartitionMessage message =
        (SentryJSONDropPartitionMessage) deserializer
            .getDropPartitionMessage(event.getMessage());

    String dbName = message.getDB();
    if (isNullOrEmpty(dbName)) {
      LOGGER.error("Drop partition event is missing database name");
      return;
    }

    String tableName = message.getTable();
    if (isNullOrEmpty(tableName)) {
      LOGGER.error("Drop partition event for {} is missing table name", dbName);
      return;
    }

    String authName = toAuthName(dbName, tableName);

    List<String> locations = message.getLocations();
    if (locations == null || locations.isEmpty()) {
      LOGGER.error("Drop partition event for {} is missing partition locations", authName);
      return;
    }

    Set<String> oldLocations = image.get(authName);
    if (oldLocations == null) {
      // Message names the operation actually being processed (was "Add partition")
      LOGGER.warn("Drop partition for {}: missing table locations", authName);
      return;
    }

    // Drop each partition
    for (String location: locations) {
      String path = FullUpdateInitializer.pathFromURI(location);
      if (path != null) {
        oldLocations.remove(path);
      }
    }
  }

  /**
   * Alter partition replaces the old partition path of an existing table with the new one.
   *
   * <p>The event is ignored when any name or location is missing, when a location cannot
   * be converted to a path, when both paths are equal, or when the table is not present
   * in the image. The new path is only added if the old path was actually recorded.
   */
  private static void alterPartition(Map<String, Set<String>> image, NotificationEvent event,
                                     MessageDeserializer deserializer) {
    SentryJSONAlterPartitionMessage message =
        (SentryJSONAlterPartitionMessage) deserializer
            .getAlterPartitionMessage(event.getMessage());

    String dbName = message.getDB();
    if (isNullOrEmpty(dbName)) {
      LOGGER.error("Alter partition event is missing database name");
      return;
    }

    String tableName = message.getTable();
    if (isNullOrEmpty(tableName)) {
      LOGGER.error("Alter partition event for {} is missing table name", dbName);
      return;
    }

    String authName = toAuthName(dbName, tableName);

    String prevLocation = message.getOldLocation();
    if (isNullOrEmpty(prevLocation)) {
      LOGGER.error("Alter partition event for {} is missing old location", authName);
      // Stop here instead of handing a null/empty location to pathFromURI()
      return;
    }

    String prevPath = FullUpdateInitializer.pathFromURI(prevLocation);
    if (prevPath == null) {
      return;
    }

    String newLocation = message.getNewLocation();
    if (isNullOrEmpty(newLocation)) {
      LOGGER.error("Alter partition event for {} is missing new location", authName);
      // Stop here instead of handing a null/empty location to pathFromURI()
      return;
    }

    String newPath = FullUpdateInitializer.pathFromURI(newLocation);
    if (newPath == null) {
      return;
    }

    if (prevPath.equals(newPath)) {
      LOGGER.warn("Alter partition event for {} has the same old and new path {}",
          authName, prevPath);
      return;
    }

    Set<String> locations = image.get(authName);
    if (locations == null) {
      LOGGER.warn("Missing partition locations for {}", authName);
      return;
    }

    // Rename partition
    if (locations.remove(prevPath)) {
      LOGGER.debug("Renaming {} to {}", prevPath, newPath);
      locations.add(newPath);
    }
  }

  /**
   * Walk through the map and, for every key that starts with oldKey, replace that
   * leading oldKey with newKey.
   *
   * <p>Both arguments are treated as literal text. A key whose renamed form is already
   * present in the map is left unchanged and a warning is logged.
   *
   * @param image map to modify in place
   * @param oldKey prefix to look for
   * @param newKey prefix to substitute
   */
  @VisibleForTesting
  protected static void renamePrefixKeys(Map<String, Set<String>> image,
                                         String oldKey, String newKey) {
    // The trick is that we can't just iterate through the map, remove old values and
    // insert new values. While we can remove old values with iterators,
    // we can't insert new ones while we walk. So we collect the keys to be added in
    // a new map and merge them in the end.
    Map<String, Set<String>> replacement = new HashMap<>();

    for (Iterator<Map.Entry<String, Set<String>>> it = image.entrySet().iterator();
         it.hasNext(); ) {
      Map.Entry<String, Set<String>> entry = it.next();
      String key = entry.getKey();
      if (key.startsWith(oldKey)) {
        // Literal prefix swap. A regex built from the keys would misbehave on
        // metacharacters in oldKey (e.g. '+', '[') or on '$' / '\' in newKey.
        String updatedKey = newKey + key.substring(oldKey.length());
        if (!image.containsKey(updatedKey)) {
          LOGGER.debug("Rename {} to {}", key, updatedKey);
          replacement.put(updatedKey, entry.getValue());
          it.remove();
        } else {
          LOGGER.warn("skipping key {} - already present", updatedKey);
        }
      }
    }

    mergeMaps(image, replacement);
  }

  /**
   * Merge replacement values into the original map but only if they are not
   * already there.
   *
   * @param m1 source map; receives the missing entries
   * @param m2 map with replacement values; not modified
   */
  private static void mergeMaps(Map<String, Set<String>> m1, Map<String, Set<String>> m2) {
    for (Map.Entry<String, Set<String>> entry : m2.entrySet()) {
      if (!m1.containsKey(entry.getKey())) {
        m1.put(entry.getKey(), entry.getValue());
      }
    }
  }

  /**
   * Tell whether a string carries no usable value.
   *
   * @return true if the value is null or has zero length
   */
  private static boolean isNullOrEmpty(String value) {
    return (value == null) || value.isEmpty();
  }

  /**
   * Build the key under which a table is stored in the image:
   * lower-cased database name, a dot, lower-cased table name.
   */
  private static String toAuthName(String dbName, String tableName) {
    return dbName.toLowerCase() + "." + tableName.toLowerCase();
  }
}