blob: c8e104367852a751732a11ce2886689c8a98e68c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.events;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Map;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.DatabaseNotFoundException;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.common.Metrics;
import org.apache.impala.common.Pair;
import org.apache.impala.common.Reference;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.ClassUtil;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.apache.hadoop.hive.common.FileUtils;
/**
* Main class which provides Metastore event objects for various event types. Also
* provides a MetastoreEventFactory to get or create the event instances for a given event
* type.
*/
public class MetastoreEvents {
/**
 * Keys of the parameters that Impala adds to Metastore entities (tables, databases) and
 * that are relevant for event processing. When the event processor is instantiated,
 * config validation makes sure these parameters are not filtered out through the
 * Metastore config EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS.
 */
public enum MetastoreEventPropertyKey {
  // key to be used for catalog version in table properties for detecting self-events
  CATALOG_VERSION("impala.events.catalogVersion"),
  // key to be used for catalog service id for detecting self-events
  CATALOG_SERVICE_ID("impala.events.catalogServiceId"),
  // flag to be set in the table/database parameters to disable event based metadata
  // sync. Note that this is a user-facing property. Any changes to this key name
  // will break backwards compatibility
  DISABLE_EVENT_HMS_SYNC("impala.disableHmsSync");

  // The parameter key as stored in the Metastore entity. Final because enum constants
  // are shared singletons and must not be mutated after construction.
  private final String key_;

  MetastoreEventPropertyKey(String key) { this.key_ = key; }

  /** Returns the parameter key as it appears in the Metastore entity parameters. */
  public String getKey() { return key_; }
}
/**
 * The Metastore notification event types that get dedicated handling. Every other event
 * type string is represented by OTHER.
 */
public enum MetastoreEventType {
  CREATE_TABLE("CREATE_TABLE"),
  DROP_TABLE("DROP_TABLE"),
  ALTER_TABLE("ALTER_TABLE"),
  CREATE_DATABASE("CREATE_DATABASE"),
  DROP_DATABASE("DROP_DATABASE"),
  ALTER_DATABASE("ALTER_DATABASE"),
  ADD_PARTITION("ADD_PARTITION"),
  ALTER_PARTITION("ALTER_PARTITION"),
  DROP_PARTITION("DROP_PARTITION"),
  INSERT("INSERT"),
  OTHER("OTHER");

  // the event type string as it appears in NotificationEvent.eventType
  private final String eventType_;

  MetastoreEventType(String msEventType) { eventType_ = msEventType; }

  @Override
  public String toString() { return eventType_; }

  /**
   * Maps the eventType string of a Metastore NotificationEvent to its constant. The
   * comparison ignores case. A string that matches no constant (including null) maps
   * to OTHER.
   *
   * @param eventType EventType value from the <code>NotificationEvent</code>
   */
  public static MetastoreEventType from(String eventType) {
    MetastoreEventType[] candidates = values();
    for (int idx = 0; idx < candidates.length; idx++) {
      MetastoreEventType candidate = candidates[idx];
      if (candidate.eventType_.equalsIgnoreCase(eventType)) return candidate;
    }
    return OTHER;
  }
}
/**
 * Turns raw Metastore NotificationEvents into the MetastoreEvent instances that know how
 * to apply them to the catalog.
 */
public static class MetastoreEventFactory {
  private static final Logger LOG =
      LoggerFactory.getLogger(MetastoreEventFactory.class);
  // catalog that every created event acts upon
  private final CatalogServiceCatalog catalog_;
  // metrics registry handed to every created event so it can publish metrics
  private final Metrics metrics_;

  public MetastoreEventFactory(CatalogServiceCatalog catalog, Metrics metrics) {
    catalog_ = Preconditions.checkNotNull(catalog);
    metrics_ = Preconditions.checkNotNull(metrics);
  }

  /**
   * Creates the MetastoreEvent subclass that handles the type of the given
   * notification. Types without a dedicated handler are wrapped in an IgnoredEvent.
   */
  private MetastoreEvent get(NotificationEvent event)
      throws MetastoreNotificationException {
    Preconditions.checkNotNull(event.getEventType());
    MetastoreEventType type = MetastoreEventType.from(event.getEventType());
    switch (type) {
      // table-level DDL
      case CREATE_TABLE:
        return new CreateTableEvent(catalog_, metrics_, event);
      case DROP_TABLE:
        return new DropTableEvent(catalog_, metrics_, event);
      case ALTER_TABLE:
        return new AlterTableEvent(catalog_, metrics_, event);
      // database-level DDL
      case CREATE_DATABASE:
        return new CreateDatabaseEvent(catalog_, metrics_, event);
      case DROP_DATABASE:
        return new DropDatabaseEvent(catalog_, metrics_, event);
      case ALTER_DATABASE:
        return new AlterDatabaseEvent(catalog_, metrics_, event);
      // partition-level DDL
      case ADD_PARTITION:
        return new AddPartitionEvent(catalog_, metrics_, event);
      case DROP_PARTITION:
        return new DropPartitionEvent(catalog_, metrics_, event);
      case ALTER_PARTITION:
        return new AlterPartitionEvent(catalog_, metrics_, event);
      // data changes
      case INSERT:
        return new InsertEvent(catalog_, metrics_, event);
      default:
        // anything unrecognized is wrapped so that it can be skipped uniformly
        return new IgnoredEvent(catalog_, metrics_, event);
    }
  }

  /**
   * Converts the given notification events into MetastoreEvents and drops the ones that
   * do not need processing. An event is dropped when:
   * <ul>
   *   <li>it is on a blacklisted database or table, or</li>
   *   <li>its <code>isRemovedAfter</code> reports that the object is removed (or
   *   renamed) by a later event in the same batch, which avoids briefly re-adding an
   *   object that is about to disappear (the replay problem).</li>
   * </ul>
   * Drop events themselves are never dropped by the look-ahead check. The number of
   * dropped events is added to the EVENTS_SKIPPED_METRIC counter.
   *
   * @param events NotificationEvents fetched from metastore
   * @return the MetastoreEvents that remain after filtering, in their original order
   * @throws MetastoreNotificationException if a NotificationEvent could not be
   * parsed into MetastoreEvent
   */
  List<MetastoreEvent> getFilteredEvents(List<NotificationEvent> events)
      throws MetastoreNotificationException {
    Preconditions.checkNotNull(events);
    if (events.isEmpty()) return Collections.emptyList();
    // Wrap every notification up-front so that the look-ahead can inspect later events.
    List<MetastoreEvent> candidates = new ArrayList<>(events.size());
    for (NotificationEvent notification : events) candidates.add(get(notification));
    int total = candidates.size();
    List<MetastoreEvent> retained = new ArrayList<>(total);
    for (int idx = 0; idx < total; idx++) {
      MetastoreEvent candidate = candidates.get(idx);
      String db = candidate.getDbName();
      String tbl = candidate.getTableName();
      boolean onBlacklist = catalog_.isBlacklistedDb(db)
          || (tbl != null && catalog_.isBlacklistedTable(db, tbl));
      if (onBlacklist) {
        String blacklistedObject = tbl == null ? db : new TableName(db, tbl).toString();
        LOG.info(candidate.debugString("Filtering out this event since it is on a "
            + "blacklisted database or table %s", blacklistedObject));
        continue;
      }
      // Filtering only ever drops the event under inspection. Every event after idx is
      // therefore still a candidate, and the tail of the full list is the look-ahead
      // window.
      if (candidate.isRemovedAfter(candidates.subList(idx + 1, total))) {
        LOG.info(candidate.debugString("Filtering out this event since the object is "
            + "either removed or renamed later in the event stream"));
        continue;
      }
      retained.add(candidate);
    }
    int numFiltered = total - retained.size();
    LOG.info(String.format("Total number of events received: %d Total number of events "
        + "filtered out: %d", total, numFiltered));
    metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
        .inc(numFiltered);
    return retained;
  }
}
/**
 * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
 * process a NotificationEvent received from metastore. It is self-contained with all
 * the information needed to take action on the catalog based on the given
 * NotificationEvent.
 */
public static abstract class MetastoreEvent {
  // String.format compatible string to prepend event id and type
  private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s ";
  // logger format compatible string to prepend to a log formatted message
  private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} ";
  // CatalogServiceCatalog instance on which the event needs to be acted upon
  protected final CatalogServiceCatalog catalog_;
  // the notification received from metastore which is processed by this event
  protected final NotificationEvent event_;
  // Logger available for all the sub-classes, named after the concrete sub-class
  protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
  // dbName from the event. May be null: certain event types in Hive-3 like
  // COMMIT_TXN do not set it
  protected final String dbName_;
  // tblName from the event; may be null
  protected final String tblName_;
  // eventId of the event. Used instead of calling getter on event_ every time
  protected final long eventId_;
  // eventType from the NotificationEvent
  protected final MetastoreEventType eventType_;
  // Actual notificationEvent object received from Metastore (same object as event_)
  protected final NotificationEvent metastoreNotificationEvent_;
  // metrics registry so that events can add metrics
  protected final Metrics metrics_;
  // version number from the event object parameters used for self-event detection;
  // -1 means it is not available
  protected long versionNumberFromEvent_ = -1;
  // service id from the event object parameters used for self-event detection
  protected String serviceIdFromEvent_ = null;
  // the list of versions which this catalog expects to see in the
  // self-generated events in order. Anything which is seen out of order of this list
  // will be used to determine that this is not a self-event and table will be
  // invalidated. See <code>isSelfEvent</code> for more details.
  // Typed emptyList() instead of the raw EMPTY_LIST to avoid an unchecked assignment.
  protected List<Long> pendingVersionNumbersFromCatalog_ = Collections.emptyList();

  MetastoreEvent(CatalogServiceCatalog catalogServiceCatalog, Metrics metrics,
      NotificationEvent event) {
    this.catalog_ = catalogServiceCatalog;
    this.event_ = event;
    this.eventId_ = event_.getEventId();
    this.eventType_ = MetastoreEventType.from(event.getEventType());
    // certain event types in Hive-3 like COMMIT_TXN may not have dbName set
    this.dbName_ = event.getDbName();
    this.tblName_ = event.getTableName();
    this.metastoreNotificationEvent_ = event;
    this.metrics_ = metrics;
  }

  /** Returns the database name carried by the event; may be null. */
  public String getDbName() { return dbName_; }

  /** Returns the table name carried by the event; may be null. */
  public String getTableName() { return tblName_; }

  /**
   * Processes this event unless <code>isEventProcessingDisabled</code> returns true. In
   * that case the event is logged as skipped and the EVENTS_SKIPPED_METRIC counter is
   * incremented.
   *
   * @throws CatalogException If Catalog operations fail
   * @throws MetastoreNotificationException If NotificationEvent parsing fails
   */
  public void processIfEnabled()
      throws CatalogException, MetastoreNotificationException {
    if (isEventProcessingDisabled()) {
      LOG.info(debugString("Skipping this event because of flag evaluation"));
      metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
      return;
    }
    process();
  }

  /**
   * Process the information available in the NotificationEvent to take appropriate
   * action on Catalog
   *
   * @throws MetastoreNotificationException in case of event parsing errors out
   * @throws CatalogException in case catalog operations could not be performed
   */
  protected abstract void process()
      throws MetastoreNotificationException, CatalogException;

  /**
   * Helper method to get debug string with the event id and type prepended to the
   * message. This can be used to generate helpful exception messages. Note that this
   * uses <code>String.format()</code>, so placeholders must be <code>%s</code>-style
   * (not the logger-style <code>{}</code>).
   *
   * @param msgFormatString String value to be used in String.format() for the given
   * message
   * @param args args to the <code>String.format()</code> for the given
   * msgFormatString
   */
  protected String debugString(String msgFormatString, Object... args) {
    String formatString =
        new StringBuilder(STR_FORMAT_EVENT_ID_TYPE).append(msgFormatString).toString();
    Object[] formatArgs = getLogFormatArgs(args);
    return String.format(formatString, formatArgs);
  }

  /**
   * Helper method to generate the format args after prepending the event id and type
   */
  private Object[] getLogFormatArgs(Object[] args) {
    Object[] formatArgs = new Object[args.length + 2];
    formatArgs[0] = eventId_;
    formatArgs[1] = eventType_;
    System.arraycopy(args, 0, formatArgs, 2, args.length);
    return formatArgs;
  }

  /**
   * Logs at info level the given log formatted string and its args. The log formatted
   * string should have {} pair at the appropriate location in the string for each arg
   * value provided. This method prepends the event id and event type before logging the
   * message. No-op if the log level is not at INFO
   */
  protected void infoLog(String logFormattedStr, Object... args) {
    if (!LOG.isInfoEnabled()) return;
    String formatString =
        new StringBuilder(LOG_FORMAT_EVENT_ID_TYPE).append(logFormattedStr).toString();
    Object[] formatArgs = getLogFormatArgs(args);
    LOG.info(formatString, formatArgs);
  }

  /**
   * Similar to infoLog except that it logs at debug level. No-op if the log level is
   * not at DEBUG
   */
  protected void debugLog(String logFormattedStr, Object... args) {
    if (!LOG.isDebugEnabled()) return;
    String formatString =
        new StringBuilder(LOG_FORMAT_EVENT_ID_TYPE).append(logFormattedStr).toString();
    Object[] formatArgs = getLogFormatArgs(args);
    LOG.debug(formatString, formatArgs);
  }

  /**
   * Searches the given events for an inverse event of this event (for example
   * drop_table is an inverse event for create_table). The given list is expected to
   * contain the events that follow this one in the event stream. This is useful for
   * skipping certain events from processing. The base implementation never finds one;
   * subclasses override it where an inverse event exists.
   *
   * @param events List of MetastoreEvents to be searched
   * @return true if the object is removed after this event, else false
   */
  protected boolean isRemovedAfter(List<MetastoreEvent> events) {
    return false;
  }

  /**
   * Returns true if event based sync is disabled for this table/database associated
   * with this event
   */
  protected abstract boolean isEventProcessingDisabled();

  /**
   * This method detects if this event is self-generated or not (see class
   * documentation of <code>MetastoreEventProcessor</code> to understand what a
   * self-event is).
   *
   * In order to determine this, it compares the value of catalogVersion from the
   * event with the list of pending version numbers stored in the catalog
   * database/table. The event could be generated by another instance of CatalogService
   * which can potentially have the same versionNumber. In order to resolve such
   * conflict, it compares the CatalogService's serviceId before comparing the version
   * number. If it is determined that this is indeed a self-event, this method also
   * clears the version number from the catalog database/table's list of pending
   * versions for in-flight events. This is needed so that a subsequent event with the
   * same service id or version number is not incorrectly determined as a self-event. A
   * subsequent event with the same serviceId and versionNumber is most likely generated
   * by a non-Impala system because it cached the table object having those values of
   * serviceId and version. More details on complete flow of self-event handling
   * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
   *
   * @return True if this event is a self-generated event. If the returned value is
   * true, this method also clears the version number from the catalog database/table.
   * Returns false if the version numbers or service id don't match
   * @throws CatalogException in case of exceptions while removing the version number
   * from the database/table or when reading the values of version list from catalog
   * database/table
   */
  protected boolean isSelfEvent() throws CatalogException {
    initSelfEventIdentifiersFromEvent();
    if (versionNumberFromEvent_ == -1 || pendingVersionNumbersFromCatalog_.isEmpty()) {
      return false;
    }
    // first check if service id is a match, then check if the event version is what we
    // expect in the list
    if (catalog_.getCatalogServiceId().equals(serviceIdFromEvent_)
        && pendingVersionNumbersFromCatalog_.get(0).equals(versionNumberFromEvent_)) {
      // we see this version for the first time. This is a self-event
      // remove this version number from the catalog so that next time we see this
      // version we don't determine it wrongly as self-event
      // TODO we should improve by atomically doing this check and invalidating the
      // table to avoid any races. Currently, it is possible that some other thread
      // in CatalogService changes the pendingVersionNumbersFromCatalog after we do
      // the check here. However, there are only two possible operations that can
      // happen from outside with respect to this version list. Either some thread
      // adds a new version to the list after we looked at it, or the table is
      // invalidated. In both the cases, it is OK since the new version added is
      // guaranteed to be greater than versionNumberFromEvent and if the table is
      // invalidated, this operation (this whole event) becomes a no-op
      catalog_.removeFromInFlightVersionsForEvents(
          dbName_, tblName_, versionNumberFromEvent_);
      metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
      return true;
    }
    return false;
  }

  /**
   * This method should be implemented by subclasses to initialize the values of
   * self-event identifiers by parsing the event data. These identifiers are later
   * used to determine if this event needs to be processed or not. See
   * <code>isSelfEvent</code> method for details. The base implementation throws
   * UnsupportedOperationException.
   */
  protected void initSelfEventIdentifiersFromEvent() {
    throw new UnsupportedOperationException(
        String.format("%s is not supported", ClassUtil.getMethodName()));
  }

  /**
   * Returns the value mapped to key in params, or defaultVal when params is null or
   * does not contain the key.
   */
  protected static String getStringProperty(
      Map<String, String> params, String key, String defaultVal) {
    if (params == null) return defaultVal;
    return params.getOrDefault(key, defaultVal);
  }
}
/**
 * Base class for all the table events
 */
public static abstract class MetastoreTableEvent extends MetastoreEvent {
  // NOTE: the table name lives in the inherited MetastoreEvent.tblName_ field. It is
  // deliberately not re-declared here, because a second field with the same name would
  // hide the inherited one.

  // tbl object from the Notification event, corresponds to the before tableObj in
  // case of alter events
  protected org.apache.hadoop.hive.metastore.api.Table msTbl_;

  private MetastoreTableEvent(CatalogServiceCatalog catalogServiceCatalog,
      Metrics metrics, NotificationEvent event) {
    super(catalogServiceCatalog, metrics, event);
    Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
    // unlike the base class, table events require a table name
    Preconditions.checkNotNull(tblName_, debugString("Table name cannot be null"));
    debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
        getFullyQualifiedTblName());
  }

  /**
   * Util method to return the fully qualified table name which is of the format
   * dbName.tblName for this event
   */
  protected String getFullyQualifiedTblName() {
    return new TableName(dbName_, tblName_).toString();
  }

  /**
   * Util method to issue invalidate on a given table on the catalog. This method
   * atomically invalidates the table if it exists in the catalog. No-op if the table
   * does not exist. Increments NUMBER_OF_TABLE_INVALIDATES when a table was invalidated.
   *
   * @return true if a table was invalidated, false otherwise
   */
  protected boolean invalidateCatalogTable() {
    boolean tableInvalidated =
        catalog_.invalidateTableIfExists(dbName_, tblName_) != null;
    if (tableInvalidated) {
      metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_INVALIDATES).inc();
    }
    return tableInvalidated;
  }

  /**
   * Checks if the table level property is set in the parameters of the table from the
   * event. If it is available, it takes precedence over database level flag for this
   * table. If the table level property is not set, returns the value from the database
   * level property.
   *
   * @return Boolean value of the table property with the key
   * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code>. Else,
   * returns the database property which is associated with this table. Returns
   * false if neither of the properties are set.
   */
  @Override
  protected boolean isEventProcessingDisabled() {
    Preconditions.checkNotNull(msTbl_);
    Boolean tblProperty = getHmsSyncProperty(msTbl_);
    if (tblProperty != null) {
      infoLog("Found table level flag {} is set to {} for table {}",
          MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
          tblProperty.toString(),
          getFullyQualifiedTblName());
      return tblProperty;
    }
    // if the tbl property is not set check at db level
    String dbFlagVal = catalog_.getDbProperty(dbName_,
        MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey());
    if (dbFlagVal != null) {
      // no need to spew unnecessary logs. Most tables/databases are expected to not
      // have this flag set when event based HMS polling is enabled
      debugLog("Table level flag is not set. Db level flag {} is {} for "
          + "database {}",
          MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(),
          dbFlagVal, dbName_);
    }
    // flag value of null also returns false
    return Boolean.valueOf(dbFlagVal);
  }

  /**
   * Gets the value of the parameter with the key
   * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code> from the given
   * table
   *
   * @return the Boolean value of the property with the key
   * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code> if it is
   * available and non-empty, else returns null
   */
  public static Boolean getHmsSyncProperty(
      org.apache.hadoop.hive.metastore.api.Table tbl) {
    if (!tbl.isSetParameters()) return null;
    String val =
        tbl.getParameters()
            .get(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey());
    if (val == null || val.isEmpty()) return null;
    return Boolean.valueOf(val);
  }

  /**
   * Util method to create the partition key-value list from an HMS Partition object.
   * The i-th partition key name of msTbl is paired with the i-th value of the
   * partition, so the partition must have exactly one value per partition key.
   */
  protected static List<TPartitionKeyValue> getTPartitionSpecFromHmsPartition(
      org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition) {
    List<TPartitionKeyValue> tPartSpec = new ArrayList<>();
    List<org.apache.hadoop.hive.metastore.api.FieldSchema> fsList =
        msTbl.getPartitionKeys();
    List<String> partVals = partition.getValues();
    Preconditions.checkNotNull(partVals);
    Preconditions.checkState(fsList.size() == partVals.size());
    for (int i = 0; i < fsList.size(); i++) {
      tPartSpec.add(new TPartitionKeyValue(fsList.get(i).getName(), partVals.get(i)));
    }
    return tPartSpec;
  }

  /**
   * Util method to create a partition spec string out of TPartitionKeyValue objects,
   * using Hive's <code>FileUtils.makePartName</code>.
   */
  protected static String constructPartitionStringFromTPartitionSpec(
      List<TPartitionKeyValue> tPartSpec) {
    List<String> partitionCols = new ArrayList<>();
    List<String> partitionVals = new ArrayList<>();
    for (TPartitionKeyValue kv: tPartSpec) {
      partitionCols.add(kv.getName());
      partitionVals.add(kv.getValue());
    }
    String partString = FileUtils.makePartName(partitionCols, partitionVals);
    return partString;
  }

  /**
   * Helper function to initiate a table reload on Catalog. If the table is not present
   * the event is only logged at debug level. Re-throws the exception if the catalog
   * operation throws.
   *
   * @param operation short description of the triggering operation, used in the reload
   * reason and log messages
   */
  protected void reloadTableFromCatalog(String operation) throws CatalogException {
    if (!catalog_.reloadTableIfExists(dbName_, tblName_,
        "Processing " + operation + " event from HMS")) {
      debugLog("Automatic refresh on table {} failed as the table is not "
          + "present either in catalog or metastore.", getFullyQualifiedTblName());
    } else {
      infoLog("Table {} has been refreshed after " + operation +".",
          getFullyQualifiedTblName());
    }
  }
}
/**
 * Common parent of every event whose target is a database as a whole rather than one of
 * its tables.
 */
public static abstract class MetastoreDatabaseEvent extends MetastoreEvent {
  MetastoreDatabaseEvent(CatalogServiceCatalog catalogServiceCatalog, Metrics metrics,
      NotificationEvent event) {
    super(catalogServiceCatalog, metrics, event);
    // a database event without a database name cannot be acted upon
    final String nullDbMessage = debugString("Database name cannot be null");
    final String database = Preconditions.checkNotNull(dbName_, nullDbMessage);
    debugLog("Creating event {} of type {} on database {}", eventId_, eventType_,
        database);
  }

  /**
   * Database-level events are never skipped. A database may carry the
   * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code> property, but that
   * property only governs events on the tables inside the database, not DDL on the
   * database itself.
   *
   * @return always false
   */
  @Override
  protected boolean isEventProcessingDisabled() {
    final boolean disabled = false;
    return disabled;
  }
}
/**
 * MetastoreEvent for CREATE_TABLE event type
 */
public static class CreateTableEvent extends MetastoreTableEvent {
  /**
   * Private so that instances are only created through MetastoreEventFactory.
   *
   * @throws MetastoreNotificationException if the event message cannot be deserialized
   * into a table object
   */
  private CreateTableEvent(CatalogServiceCatalog catalog, Metrics metrics,
      NotificationEvent event) throws MetastoreNotificationException {
    super(catalog, metrics, event);
    Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(eventType_));
    Preconditions
        .checkNotNull(event.getMessage(), debugString("Event message is null"));
    try {
      // The deserializer call is inside the try as well: it can presumably throw an
      // unchecked exception on a malformed payload. Wrapping it gives the caller a
      // MetastoreNotificationException that carries the event id and type.
      CreateTableMessage createTableMessage =
          MetastoreEventsProcessor.getMessageDeserializer()
              .getCreateTableMessage(event.getMessage());
      msTbl_ = createTableMessage.getTableObj();
    } catch (Exception e) {
      throw new MetastoreNotificationException(
          debugString("Unable to deserialize the event message"), e);
    }
  }

  /**
   * Adds the table to the catalog if it is not already present. If a table with the
   * same name already exists in the catalog, the event is ignored. This can happen in
   * corner cases where the table was dropped and recreated with the same name, or when
   * this event is a self-event.
   *
   * @throws MetastoreNotificationException if the catalog fails to add the table
   * (presumably because the database is missing from the catalog cache)
   */
  @Override
  public void process() throws MetastoreNotificationException {
    // check if the table exists already. This could happen in corner cases of the
    // table being dropped and recreated with the same name or in case this event is
    // a self-event (see description of self-event in the class documentation of
    // MetastoreEventsProcessor)
    try {
      if (!catalog_.addTableIfNotExists(dbName_, tblName_)) {
        debugLog(
            "Not adding the table {} since it already exists in catalog", tblName_);
        return;
      }
    } catch (CatalogException e) {
      // if an exception is thrown, it could be due to the fact that the db did not
      // exist in the catalog cache. This could only happen if the previous
      // create_database event for this table errored out
      throw new MetastoreNotificationException(debugString(
          "Unable to add table while processing for table %s because the "
          + "database doesn't exist. This could be due to a previous error while "
          + "processing CREATE_DATABASE event for the database %s",
          getFullyQualifiedTblName(), dbName_), e);
    }
    debugLog("Added a table {}", getFullyQualifiedTblName());
  }

  /**
   * Returns true if one of the given later events drops this table, or renames it away
   * from this name. Database and table names are compared case-insensitively.
   */
  @Override
  public boolean isRemovedAfter(List<MetastoreEvent> events) {
    Preconditions.checkNotNull(events);
    for (MetastoreEvent event : events) {
      if (event.eventType_.equals(MetastoreEventType.DROP_TABLE)) {
        DropTableEvent dropTableEvent = (DropTableEvent) event;
        if (dbName_.equalsIgnoreCase(dropTableEvent.dbName_) && tblName_
            .equalsIgnoreCase(dropTableEvent.tblName_)) {
          infoLog("Found table {} is removed later in event {} type {}", tblName_,
              dropTableEvent.eventId_, dropTableEvent.eventType_);
          return true;
        }
      } else if (event.eventType_.equals(MetastoreEventType.ALTER_TABLE)) {
        // renames are implemented as an atomic (drop+create) so rename events can
        // also be treated as an inverse event of the create_table event. Consider a
        // DDL op sequence like create table, alter table rename from impala. Since
        // the rename operation is internally implemented as drop+add, processing a
        // create table event on this cluster will show up the table for small window
        // of time, until the actual rename event is processed. If however, we ignore
        // the create table event, the alter rename event just becomes a addIfNotExists
        // event which is valid for both a self-event and external event cases
        AlterTableEvent alterTableEvent = (AlterTableEvent) event;
        // msTbl_ of an alter event is the table before the alter, i.e. the old name
        if (alterTableEvent.isRename_
            && dbName_.equalsIgnoreCase(alterTableEvent.msTbl_.getDbName())
            && tblName_.equalsIgnoreCase(alterTableEvent.msTbl_.getTableName())) {
          infoLog("Found table {} is renamed later in event {} type {}", tblName_,
              alterTableEvent.eventId_, alterTableEvent.eventType_);
          return true;
        }
      }
    }
    return false;
  }
}
/**
* Metastore event handler for INSERT events. Handles insert events at both table
* and partition scopes.
*/
public static class InsertEvent extends MetastoreTableEvent {
// Represents the partition for this insert. Null if the table is unpartitioned.
private Partition insertPartition_;
/**
 * Package-private and {@code @VisibleForTesting} so that tests can construct it
 * directly. Production code should use MetastoreEventFactory instead.
 *
 * @throws MetastoreNotificationException if the event message cannot be parsed
 */
@VisibleForTesting
InsertEvent(CatalogServiceCatalog catalog, Metrics metrics,
    NotificationEvent event) throws MetastoreNotificationException {
  super(catalog, metrics, event);
  Preconditions.checkArgument(MetastoreEventType.INSERT.equals(eventType_));
  Preconditions
      .checkNotNull(event.getMessage(), debugString("Event message is null"));
  try {
    // Deserialize inside the try so that a malformed payload is reported as a
    // MetastoreNotificationException with the event id and type, like the other
    // parsing failures below.
    InsertMessage insertMessage =
        MetastoreEventsProcessor.getMessageDeserializer()
            .getInsertMessage(event.getMessage());
    msTbl_ = Preconditions.checkNotNull(insertMessage.getTableObj());
    // stays null when the table is unpartitioned
    insertPartition_ = insertMessage.getPtnObj();
  } catch (Exception e) {
    throw new MetastoreNotificationException(debugString("Unable to "
        + "parse insert message"), e);
  }
}
/**
 * Refreshes the metadata affected by this INSERT. Transactional tables are always
 * reloaded as a whole, so the partition carried by the event is discarded for them. For
 * other tables, a partition-scoped insert refreshes only that partition, and an
 * unpartitioned insert reloads the whole table.
 *
 * Self-events are not detected for inserts. Firing an insert event does not allow table
 * parameters in HMS to be modified, so the CatalogService identifiers needed by the
 * self-event logic are not available in Insert events.
 * TODO: Handle self-events for insert case.
 */
@Override
public void process() throws MetastoreNotificationException {
  boolean isTransactional = AcidUtils.isTransactionalTable(msTbl_.getParameters());
  if (isTransactional) insertPartition_ = null;
  if (insertPartition_ == null) {
    processTableInserts();
    return;
  }
  processPartitionInserts();
}
/**
 * Refreshes only the partition this insert was written to. The event is ignored (with
 * a debug log) when the table or database is not in the catalog. Any other catalog
 * failure is fatal for event processing.
 *
 * @throws MetastoreNotificationException (a MetastoreNotificationNeedsInvalidateException)
 * if the partition refresh fails
 */
private void processPartitionInserts() throws MetastoreNotificationException {
  // For partitioned table, refresh the partition only.
  Preconditions.checkNotNull(insertPartition_);
  List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
      insertPartition_);
  try {
    // Ignore event if table or database is not in catalog. Throw exception if
    // refresh fails. If the partition does not exist in metastore the reload
    // method below removes it from the catalog
    if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
        "processing partition-level INSERT event from HMS")) {
      // log the partition in the same "k=v/..." form as the other messages here
      debugLog("Refresh of table {} partition {} after insert "
          + "event failed as the table is not present in the catalog.",
          getFullyQualifiedTblName(),
          constructPartitionStringFromTPartitionSpec(tPartSpec));
    } else {
      infoLog("Table {} partition {} has been refreshed after insert.",
          getFullyQualifiedTblName(),
          constructPartitionStringFromTPartitionSpec(tPartSpec));
    }
  } catch (DatabaseNotFoundException e) {
    debugLog("Refresh of table {} partition {} for insert "
        + "event failed as the database is not present in the catalog.",
        getFullyQualifiedTblName(),
        constructPartitionStringFromTPartitionSpec(tPartSpec));
  } catch (CatalogException e) {
    // debugString() is String.format-based, so the placeholders must be %s. Logger-style
    // {} placeholders would leave the table and partition out of the message.
    throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
        + "partition on table %s partition %s failed. Event processing cannot "
        + "continue. Issue an invalidate command to reset the event processor "
        + "state.", getFullyQualifiedTblName(),
        constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
  }
}
/**
* Process unpartitioned table inserts
*/
private void processTableInserts() throws MetastoreNotificationException {
// For non-partitioned tables, refresh the whole table.
Preconditions.checkArgument(insertPartition_ == null);
try {
// Ignore event if table or database is not in the catalog. Throw exception if
// refresh fails.
reloadTableFromCatalog("table-level INSERT");
} catch (DatabaseNotFoundException e) {
debugLog("Automatic refresh of table {} insert failed as the "
+ "database is not present in the catalog.", getFullyQualifiedTblName());
} catch (CatalogException e) {
if (e instanceof TableLoadingException &&
e.getCause() instanceof NoSuchObjectException) {
LOG.warn(
"Ignoring the refresh of the table since the table does"
+ " not exist in metastore anymore");
} else {
throw new MetastoreNotificationNeedsInvalidateException(
debugString("Refresh table {} failed. Event processing "
+ "cannot continue. Issue an invalidate metadata command to reset "
+ "the event processor state.", getFullyQualifiedTblName()), e);
}
}
}
}
/**
* MetastoreEvent for ALTER_TABLE event type
*/
public static class AlterTableEvent extends TableInvalidatingEvent {
  // the table object before alter operation, as parsed from the NotificationEvent
  protected org.apache.hadoop.hive.metastore.api.Table tableBefore_;
  // the table object after alter operation, as parsed from the NotificationEvent
  protected org.apache.hadoop.hive.metastore.api.Table tableAfter_;
  // true if this alter event was due to a rename operation
  private final boolean isRename_;
  // value of event sync flag for this table before the alter operation
  private final Boolean eventSyncBeforeFlag_;
  // value of the event sync flag if available at this table after the alter operation
  private final Boolean eventSyncAfterFlag_;
  // value of the db flag at the time of event creation (false if the db property is
  // absent)
  private final boolean dbFlagVal;

  /**
   * Prevent instantiation from outside should use MetastoreEventFactory instead
   *
   * @throws MetastoreNotificationException if the before/after table objects cannot
   *     be parsed from the event message
   */
  @VisibleForTesting
  AlterTableEvent(CatalogServiceCatalog catalog, Metrics metrics,
      NotificationEvent event) throws MetastoreNotificationException {
    super(catalog, metrics, event);
    Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(eventType_));
    JSONAlterTableMessage alterTableMessage =
        (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
            .getAlterTableMessage(event.getMessage());
    try {
      msTbl_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
      tableAfter_ = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
      tableBefore_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
    } catch (Exception e) {
      throw new MetastoreNotificationException(
          debugString("Unable to parse the alter table message"), e);
    }
    // this is a rename event if either dbName or tblName of before and after object
    // changed
    isRename_ = !msTbl_.getDbName().equalsIgnoreCase(tableAfter_.getDbName())
        || !msTbl_.getTableName().equalsIgnoreCase(tableAfter_.getTableName());
    eventSyncBeforeFlag_ = getHmsSyncProperty(msTbl_);
    eventSyncAfterFlag_ = getHmsSyncProperty(tableAfter_);
    dbFlagVal =
        Boolean.valueOf(catalog_.getDbProperty(dbName_,
            MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey()));
  }

  /**
   * Reads the catalog version and service id from the post-alter table parameters and
   * fetches the in-flight versions for the affected table. For a rename, the
   * in-flight versions are looked up under the new table name.
   */
  @Override
  protected void initSelfEventIdentifiersFromEvent() {
    versionNumberFromEvent_ = Long.parseLong(
        getStringProperty(tableAfter_.getParameters(),
            MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
    serviceIdFromEvent_ =
        getStringProperty(tableAfter_.getParameters(),
            MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
    try {
      if (isRename_) {
        //if this is rename event then identifiers will be in tableAfter
        pendingVersionNumbersFromCatalog_ = catalog_
            .getInFlightVersionsForEvents(tableAfter_.getDbName(),
                tableAfter_.getTableName());
      } else {
        pendingVersionNumbersFromCatalog_ = catalog_
            .getInFlightVersionsForEvents(msTbl_.getDbName(), msTbl_.getTableName());
      }
    } catch (TableNotFoundException | DatabaseNotFoundException e) {
      debugLog("Received exception {}. Ignoring self-event evaluation",
          e.getMessage());
    }
  }

  /**
   * If the ALTER_TABLE event is due a table rename, this method removes the old table
   * and creates a new table with the new name. Else, this just issues a invalidate
   * table on the tblName from the event
   *
   * @throws MetastoreNotificationNeedsInvalidateException if event sync was turned on
   *     for a table that does not exist in the catalog
   */
  @Override
  public void process() throws MetastoreNotificationException, CatalogException {
    // Determine whether this is an event which we have already seen or if it is a new
    // event
    if (isSelfEvent()) {
      infoLog("Not processing the event as it is a self-event or "
          + "update is already present in catalog.");
      return;
    }
    // Ignore the event if this is a trivial event. See javadoc for
    // canBeSkipped() for examples.
    if (canBeSkipped()) {
      infoLog("Not processing this event as it only modifies some table parameters "
          + "which can be ignored.");
      return;
    }
    // in case of table level alters from external systems it is better to do a full
    // invalidate eg. this could be due to as simple as adding a new parameter or a
    // full blown adding or changing column type
    // detect the special case where a table is renamed
    if (!isRename_) {
      // table is not renamed, need to invalidate
      if (!invalidateCatalogTable()) {
        if (wasEventSyncTurnedOn()) {
          // We received this alter table event on a non-existing table and detected
          // that event sync was turned on in this event. This may mean that the table
          // creation was skipped earlier because event sync was turned off. We don't
          // know how many events were skipped while sync was disabled, so we err on
          // the side of caution: stop event processing and let the user issue an
          // invalidate metadata to reset the state.
          throw new MetastoreNotificationNeedsInvalidateException(debugString(
              "Detected that event sync was turned on for the table %s "
                  + "and the table does not exist. Event processing cannot be "
                  + "continued further. Issue a invalidate metadata command to reset "
                  + "the event processing state", getFullyQualifiedTblName()));
        }
        debugLog("Table {} does not need to be "
                + "invalidated since it does not exist anymore",
            getFullyQualifiedTblName());
      } else {
        infoLog("Table {} is invalidated", getFullyQualifiedTblName());
      }
      return;
    }
    // table was renamed, remove the old table
    infoLog("Found that {} table was renamed. Renaming it by "
            + "remove and adding a new table",
        new TableName(msTbl_.getDbName(), msTbl_.getTableName()));
    TTableName oldTTableName =
        new TTableName(msTbl_.getDbName(), msTbl_.getTableName());
    TTableName newTTableName =
        new TTableName(tableAfter_.getDbName(), tableAfter_.getTableName());
    // Table is renamed if old db and table exist in catalog. If the rename is to a
    // different database, we check if this other database exists in catalog. If
    // either the old table, old database or new database are not in catalog, we skip
    // this event.
    if (!catalog_.renameTableIfExists(oldTTableName, newTTableName)) {
      debugLog("Did not remove old table to rename table {} to {} since "
          + "it does not exist anymore or either the old database or the new "
          + "database don't exist anymore.", qualify(oldTTableName),
          qualify(newTTableName));
    } else {
      infoLog("Renamed old table {} to new table {}.", qualify(oldTTableName),
          qualify(newTTableName));
    }
  }

  /**
   * Detects whether event sync was turned on in this event, i.e. the effective value
   * of the DISABLE_EVENT_HMS_SYNC flag after the alter is false while the table-level
   * flag changed.
   */
  private boolean wasEventSyncTurnedOn() {
    // the eventsync flag was not changed
    if (Objects.equals(eventSyncBeforeFlag_, eventSyncAfterFlag_)) return false;
    // The table-level flag was removed: the effective value now comes from the
    // database-level flag. Handled separately so that a null Boolean is never
    // auto-unboxed (which would throw a NullPointerException when dbFlagVal is true).
    if (eventSyncAfterFlag_ == null) return !dbFlagVal;
    // The flag is explicitly set on the table; sync is on iff it is set to false.
    return !eventSyncAfterFlag_;
  }

  @Override
  protected boolean canBeSkipped() {
    // Certain alter events just modify some parameters such as
    // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
    // along with insert events. Check if the alter table event is such a trivial
    // event by setting those parameters equal before and after the event and
    // comparing the objects.
    // Avoid modifying the object from event.
    org.apache.hadoop.hive.metastore.api.Table tblAfter = tableAfter_.deepCopy();
    setTrivialParameters(tableBefore_.getParameters(), tblAfter.getParameters());
    return tblAfter.equals(tableBefore_);
  }

  /** Returns the "db.table" string form of the given thrift table name. */
  private String qualify(TTableName tTableName) {
    return new TableName(tTableName.db_name, tTableName.table_name).toString();
  }

  /**
   * In case of alter table events, it is possible that the alter event is generated
   * because user changed the value of the parameter
   * <code>MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC</code>. If the
   * parameter is unchanged, it doesn't
   * matter if you use the before or after table object here since the eventual action
   * is going be invalidate or rename. If however, the parameter is changed, couple of
   * things could happen. The flag changes from unset/false to true or it changes from
   * true to false/unset. In the first case, we want to process the event (and ignore
   * subsequent events on this table). In the second case, we should process the event
   * (as well as all the subsequent events on the table). So we always process this
   * event when the value of the flag is changed.
   *
   * @return true, if this event needs to be skipped. false if this event needs to be
   * processed.
   */
  @Override
  protected boolean isEventProcessingDisabled() {
    // if the event sync flag was changed then we always process this event
    if (!Objects.equals(eventSyncBeforeFlag_, eventSyncAfterFlag_)) {
      infoLog("Before flag value {} after flag value {} changed for table {}",
          eventSyncBeforeFlag_, eventSyncAfterFlag_, getFullyQualifiedTblName());
      return false;
    }
    // flag is unchanged, use the default impl from base class
    return super.isEventProcessingDisabled();
  }
}
/**
* MetastoreEvent for the DROP_TABLE event type
*/
public static class DropTableEvent extends MetastoreTableEvent {
  /**
   * Builds a DROP_TABLE event from a raw notification. Callers are expected to go
   * through MetastoreEventFactory instead of constructing this directly.
   *
   * @throws MetastoreNotificationException if the dropped table object cannot be read
   *     from the event message
   */
  private DropTableEvent(CatalogServiceCatalog catalog, Metrics metrics,
      NotificationEvent event) throws MetastoreNotificationException {
    super(catalog, metrics, event);
    Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(eventType_));
    JSONDropTableMessage message = (JSONDropTableMessage) MetastoreEventsProcessor
        .getMessageDeserializer().getDropTableMessage(event.getMessage());
    try {
      msTbl_ = Preconditions.checkNotNull(message.getTableObj());
    } catch (Exception parseFailure) {
      String hint = MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY;
      throw new MetastoreNotificationException(debugString(
          "Could not parse event message. "
              + "Check if %s is set to true in metastore configuration", hint),
          parseFailure);
    }
  }

  /**
   * Removes the table named in the event from the catalog, but only when its
   * createTime matches the createTime of the table carried by the event (incomplete
   * catalog tables are removed as well). Tables absent from the catalog are ignored.
   * HMS creation times only have seconds granularity, so a create/drop/create of the
   * same name within one second can be misjudged here; the later create event will
   * re-add the table, but it may be briefly missing from the catalog.
   * TODO : Once HIVE-21595 is available we should rely on table_id for determining a
   * newer incarnation of a previous table.
   */
  @Override
  public void process() {
    Reference<Boolean> found = new Reference<>();
    Reference<Boolean> createTimeMatched = new Reference<>();
    Table dropped = catalog_.removeTableIfExists(msTbl_, found, createTimeMatched);
    if (dropped != null) {
      infoLog("Removed table {} ", getFullyQualifiedTblName());
      return;
    }
    if (!found.getRef()) {
      debugLog("Table {} was not removed since it did not exist in catalog.",
          tblName_);
      return;
    }
    if (!createTimeMatched.getRef()) {
      // A newer incarnation of the table is in the catalog; count this as skipped.
      infoLog(debugString("Table %s was not removed from catalog since the "
          + "creation time of the table did not match", tblName_));
      metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
    }
  }
}
/**
* MetastoreEvent for CREATE_DATABASE event type
*/
public static class CreateDatabaseEvent extends MetastoreDatabaseEvent {
  // Database object carried by the CREATE_DATABASE notification.
  private final Database createdDatabase_;

  /**
   * Builds a CREATE_DATABASE event from a raw notification. Use
   * MetastoreEventFactory rather than calling this directly.
   *
   * @throws MetastoreNotificationException if the database object cannot be read from
   *     the event message
   */
  private CreateDatabaseEvent(CatalogServiceCatalog catalog, Metrics metrics,
      NotificationEvent event) throws MetastoreNotificationException {
    super(catalog, metrics, event);
    Preconditions.checkArgument(MetastoreEventType.CREATE_DATABASE.equals(eventType_));
    JSONCreateDatabaseMessage message =
        (JSONCreateDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
            .getCreateDatabaseMessage(event.getMessage());
    try {
      createdDatabase_ = Preconditions.checkNotNull(message.getDatabaseObject());
    } catch (Exception parseFailure) {
      String hint = MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY;
      throw new MetastoreNotificationException(debugString(
          "Database object is null in the event. "
              + "This could be a metastore configuration problem. "
              + "Check if %s is set to true in metastore configuration", hint),
          parseFailure);
    }
  }

  /**
   * Adds the database from the event to the catalog unless a database with the same
   * name is already present. An existing entry must be a later incarnation (HMS
   * refuses to create a database that already exists), so it is left untouched.
   */
  @Override
  public void process() {
    boolean added = catalog_.addDbIfNotExists(dbName_, createdDatabase_);
    infoLog(added ? "Successfully added database {}" : "Database {} already exists",
        dbName_);
  }

  /**
   * Returns true if a later DROP_DATABASE event in {@code events} targets this
   * database (case-insensitive name match).
   */
  @Override
  public boolean isRemovedAfter(List<MetastoreEvent> events) {
    Preconditions.checkNotNull(events);
    for (MetastoreEvent candidate : events) {
      if (!candidate.eventType_.equals(MetastoreEventType.DROP_DATABASE)) continue;
      DropDatabaseEvent drop = (DropDatabaseEvent) candidate;
      if (!dbName_.equalsIgnoreCase(drop.dbName_)) continue;
      infoLog("Found database {} is removed later in event {} of type {} ", dbName_,
          drop.eventId_, drop.eventType_);
      return true;
    }
    return false;
  }
}
/**
* MetastoreEvent for ALTER_DATABASE event type
*/
public static class AlterDatabaseEvent extends MetastoreDatabaseEvent {
  // Database object as it looks after the ALTER, taken from the notification.
  private final Database alteredDatabase_;

  /**
   * Builds an ALTER_DATABASE event from a raw notification. Use
   * MetastoreEventFactory rather than calling this directly.
   *
   * @throws MetastoreNotificationException if the altered database object cannot be
   *     read from the event message
   */
  private AlterDatabaseEvent(CatalogServiceCatalog catalog, Metrics metrics,
      NotificationEvent event) throws MetastoreNotificationException {
    super(catalog, metrics, event);
    Preconditions.checkArgument(MetastoreEventType.ALTER_DATABASE.equals(eventType_));
    JSONAlterDatabaseMessage message =
        (JSONAlterDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
            .getAlterDatabaseMessage(event.getMessage());
    try {
      alteredDatabase_ = Preconditions.checkNotNull(message.getDbObjAfter());
    } catch (Exception parseFailure) {
      throw new MetastoreNotificationException(
          debugString("Unable to parse the alter database message"), parseFailure);
    }
  }

  /**
   * Replaces the cached catalog Db object with the one carried by the event, unless
   * the event was generated by this catalog itself. A database missing from the
   * catalog is simply skipped.
   */
  @Override
  public void process() throws CatalogException, MetastoreNotificationException {
    if (isSelfEvent()) {
      infoLog("Not processing the event as it is a self-event");
      return;
    }
    Preconditions.checkNotNull(alteredDatabase_);
    boolean updated = catalog_.updateDbIfExists(alteredDatabase_);
    if (updated) {
      infoLog("Database {} updated after alter database event.",
          alteredDatabase_.getName());
    } else {
      // Not an error: the events processor keeps going.
      debugLog("Update database {} failed as the database is not present in the "
          + "catalog.", alteredDatabase_.getName());
    }
  }

  /**
   * Pulls the catalog version and service id out of the altered database's
   * parameters and looks up the in-flight versions recorded for this database.
   */
  @Override
  protected void initSelfEventIdentifiersFromEvent() {
    Map<String, String> params = alteredDatabase_.getParameters();
    String version = getStringProperty(
        params, MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1");
    versionNumberFromEvent_ = Long.parseLong(version);
    serviceIdFromEvent_ = getStringProperty(
        params, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
    try {
      pendingVersionNumbersFromCatalog_ =
          catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
    } catch (TableNotFoundException | DatabaseNotFoundException e) {
      // Safe to ignore: if the db does not exist the event is ignored anyway.
      debugLog("Received exception {}. Ignoring self-event evaluation", e.getMessage());
    }
  }
}
/**
* MetastoreEvent for the DROP_DATABASE event
*/
public static class DropDatabaseEvent extends MetastoreDatabaseEvent {
  // Database object carried by the DROP_DATABASE notification.
  private final Database droppedDatabase_;

  /**
   * Builds a DROP_DATABASE event from a raw notification. Use MetastoreEventFactory
   * rather than calling this directly.
   *
   * @throws MetastoreNotificationException if the database object cannot be read from
   *     the event message
   */
  private DropDatabaseEvent(
      CatalogServiceCatalog catalog, Metrics metrics, NotificationEvent event)
      throws MetastoreNotificationException {
    super(catalog, metrics, event);
    Preconditions.checkArgument(MetastoreEventType.DROP_DATABASE.equals(eventType_));
    JSONDropDatabaseMessage message =
        (JSONDropDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
            .getDropDatabaseMessage(event.getMessage());
    try {
      droppedDatabase_ = Preconditions.checkNotNull(message.getDatabaseObject());
    } catch (Exception parseFailure) {
      String hint = MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY;
      throw new MetastoreNotificationException(debugString(
          "Database object is null in the event. "
              + "This could be a metastore configuration problem. "
              + "Check if %s is set to true in metastore configuration", hint),
          parseFailure);
    }
  }

  /**
   * Removes the database from the catalog, delegating the decision to
   * catalog_.removeDbIfExists() with the database object from the event. The
   * intended rule is that the database is dropped only when its CREATION_TIME does
   * not exceed the one in the event; otherwise the catalog holds a later incarnation
   * (e.g. create, drop, create of the same name) and the event is skipped.
   * NOTE(review): the creation-time comparison happens inside removeDbIfExists, which
   * is not visible here — confirm the <= semantics there.
   * HMS creation times only have seconds granularity, so a create/drop/create within
   * one second can be misjudged; the later create event re-adds the database, but it
   * may be briefly missing from the catalog.
   * TODO : Once HIVE-21595 is available we should rely on database_id for determining a
   * newer incarnation of a previous database.
   */
  @Override
  public void process() {
    Reference<Boolean> found = new Reference<>();
    Reference<Boolean> createTimeMatched = new Reference<>();
    Db dropped = catalog_.removeDbIfExists(droppedDatabase_, found, createTimeMatched);
    if (dropped != null) {
      infoLog("Removed Database {} ", dbName_);
      return;
    }
    if (!found.getRef()) {
      debugLog("Database {} was not removed since it did not exist in catalog.",
          dbName_);
      return;
    }
    if (!createTimeMatched.getRef()) {
      // A newer incarnation of the database is cached; count this as skipped.
      infoLog(debugString("Database %s was not removed from catalog since "
          + "the creation time of the Database did not match", dbName_));
      metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
    }
  }
}
/**
* MetastoreEvent which issues an invalidate on the table from the event
*/
public static abstract class TableInvalidatingEvent extends MetastoreTableEvent {
  /**
   * Prevent instantiation from outside should use MetastoreEventFactory instead
   */
  private TableInvalidatingEvent(
      CatalogServiceCatalog catalog, Metrics metrics, NotificationEvent event) {
    super(catalog, metrics, event);
  }

  /**
   * Issues a invalidate table on the catalog on the table from the event. This
   * invalidate does not fetch information from metastore unlike the invalidate metadata
   * command since event is triggered post-metastore activity. This handler invalidates
   * by atomically removing existing loaded table and replacing it with a
   * IncompleteTable. If the table doesn't exist in catalog this operation is a no-op
   */
  @Override
  public void process() throws MetastoreNotificationException, CatalogException {
    // skip event processing in case it's a self-event
    if (isSelfEvent()) {
      infoLog("Not processing the event as it is a self-event");
      return;
    }
    // Skip if it's only a change in parameters by Hive, which can be ignored.
    if (canBeSkipped()) {
      infoLog("Not processing this event as it only modifies some "
          + "parameters which can be ignored.");
      return;
    }
    if (invalidateCatalogTable()) {
      infoLog("Table {} is invalidated", getFullyQualifiedTblName());
    } else {
      debugLog("Table {} does not need to be invalidated since "
          + "it does not exist anymore", getFullyQualifiedTblName());
    }
  }

  /**
   * Returns the value mapped to {@code key} in {@code params}, or {@code defaultVal}
   * when {@code params} is null or does not contain the key.
   */
  protected static String getStringProperty(
      Map<String, String> params, String key, String defaultVal) {
    if (params == null) return defaultVal;
    return params.getOrDefault(key, defaultVal);
  }

  /**
   * Returns a list of parameters that are set by Hive for tables/partitions that can be
   * ignored to determine if the alter table/partition event is a trivial one.
   */
  static final List<String> parametersToIgnore = new ImmutableList.Builder<String>()
      .add("transient_lastDdlTime")
      .add("totalSize")
      .add("numFilesErasureCoded")
      .add("numFiles")
      .build();

  /**
   * Util method that makes the ignorable parameters in {@code parametersAfter} equal
   * to those in {@code parametersBefore}: each ignorable key is copied over from the
   * before-map, or removed from the after-map if the before-map does not have it.
   * Only {@code parametersAfter} is modified.
   *
   * Either map may be null (thrift Table/Partition parameters are optional). A null
   * before-map is treated as containing none of the ignorable keys; a null after-map
   * is left as is since there is nothing to normalize.
   */
  static void setTrivialParameters(Map<String, String> parametersBefore,
      Map<String, String> parametersAfter) {
    if (parametersAfter == null) return;
    for (String parameter: parametersToIgnore) {
      String val = parametersBefore == null ? null : parametersBefore.get(parameter);
      if (val == null) {
        parametersAfter.remove(parameter);
      } else {
        parametersAfter.put(parameter, val);
      }
    }
  }

  /**
   * Hive generates certain trivial alter events for eg: change only
   * "transient_lastDdlTime". This method returns true if the alter table/partition
   * event is such a trivial event and can therefore be skipped.
   */
  protected abstract boolean canBeSkipped();
}
public static class AddPartitionEvent extends TableInvalidatingEvent {
  // Last partition of addedPartitions_; its parameters supply the self-event
  // identifiers. Stays null when the event carries no partitions.
  private Partition lastAddedPartition_;
  // Partitions added by this event, as parsed from the NotificationEvent. May be
  // empty (see IMPALA-8847).
  private final List<Partition> addedPartitions_;

  /**
   * Prevent instantiation from outside should use MetastoreEventFactory instead
   *
   * @throws IllegalStateException if the event has no message
   * @throws MetastoreNotificationException if the add partition message cannot be
   *     parsed
   */
  private AddPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
      NotificationEvent event) throws MetastoreNotificationException {
    super(catalog, metrics, event);
    Preconditions.checkState(eventType_.equals(MetastoreEventType.ADD_PARTITION));
    if (event.getMessage() == null) {
      throw new IllegalStateException(debugString("Event message is null"));
    }
    try {
      AddPartitionMessage addPartitionMessage =
          MetastoreEventsProcessor.getMessageDeserializer()
              .getAddPartitionMessage(event.getMessage());
      addedPartitions_ =
          Lists.newArrayList(addPartitionMessage.getPartitionObjs());
      // it is possible that the added partitions is empty in certain cases. See
      // IMPALA-8847 for example
      if (!addedPartitions_.isEmpty()) {
        // when multiple partitions are added in HMS they are all added as one
        // transaction Hence all the partitions which are present in the message must
        // have the same serviceId and version if it is set. hence it is fine to just
        // look at the last added partition in the list and use it for the self-event
        // ids
        lastAddedPartition_ = addedPartitions_.get(addedPartitions_.size() - 1);
      }
      msTbl_ = addPartitionMessage.getTableObj();
    } catch (Exception ex) {
      // Attach event context instead of only wrapping the raw cause.
      throw new MetastoreNotificationException(
          debugString("Unable to parse the add partition message"), ex);
    }
  }

  /**
   * Refreshes every partition added by the event (or reloads the whole table if it is
   * transactional). Empty partition lists and self-events are ignored, as is the event
   * when the table or database is missing from the catalog.
   *
   * @throws MetastoreNotificationNeedsInvalidateException if a refresh fails with any
   *     other catalog error
   */
  @Override
  public void process() throws MetastoreNotificationException, CatalogException {
    // bail out early if there are not partitions to process
    if (addedPartitions_.isEmpty()) {
      infoLog("Partition list is empty. Ignoring this event.");
      return;
    }
    if (isSelfEvent()) {
      infoLog("Not processing the event as it is a self-event");
      return;
    }
    try {
      // Reload the whole table if it's a transactional table.
      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
        reloadTableFromCatalog("ADD_PARTITION");
      } else {
        boolean success = true;
        // HMS adds partitions in a transactional way. This means there may be multiple
        // HMS partition objects in an add_partition event. We try to do the same here
        // by refreshing all those partitions in a loop. If any partition refresh fails,
        // we throw MetastoreNotificationNeedsInvalidateException exception. We skip
        // refresh of the partitions if the table is not present in the catalog.
        infoLog("Trying to refresh {} partitions added to table {} in the event",
            addedPartitions_.size(), getFullyQualifiedTblName());
        for (Partition partition : addedPartitions_) {
          List<TPartitionKeyValue> tPartSpec =
              getTPartitionSpecFromHmsPartition(msTbl_, partition);
          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
              "processing ADD_PARTITION event from HMS")) {
            debugLog("Refresh partitions on table {} failed as table was not present " +
                "in the catalog.", getFullyQualifiedTblName());
            success = false;
            break;
          }
        }
        if (success) {
          infoLog("Refreshed {} partitions of table {}", addedPartitions_.size(),
              getFullyQualifiedTblName());
        }
      }
    } catch (DatabaseNotFoundException e) {
      debugLog("Refresh partitions on table {} after add_partitions event failed as "
          + "the database was not present in the catalog.",
          getFullyQualifiedTblName());
    } catch (CatalogException e) {
      // debugString() takes %s-style placeholders; {} would never be substituted.
      throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
          + "refresh newly added partitions of table %s. Event processing cannot "
          + "continue. Issue an invalidate command to reset event processor.",
          getFullyQualifiedTblName()), e);
    }
  }

  /**
   * Reads the catalog version and service id from the last added partition and looks
   * up the in-flight versions for the table. Assumes the partition list is non-empty
   * (process() returns before self-event checks when it is empty).
   */
  @Override
  protected void initSelfEventIdentifiersFromEvent() {
    versionNumberFromEvent_ = Long.parseLong(getStringProperty(
        lastAddedPartition_.getParameters(),
        MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
    serviceIdFromEvent_ = getStringProperty(
        lastAddedPartition_.getParameters(),
        MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
    try {
      pendingVersionNumbersFromCatalog_ =
          catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
    } catch (DatabaseNotFoundException | TableNotFoundException e) {
      // ok to ignore this exception, since if the db doesn't exist, this event needs
      // to be ignored anyways
      debugLog("Received exception {}. Ignoring self-event evaluation",
          e.getMessage());
    }
  }

  /** Add partition events are never treated as trivial. */
  @Override
  protected boolean canBeSkipped() { return false; }
}
public static class AlterPartitionEvent extends TableInvalidatingEvent {
// the Partition object before alter operation, as parsed from the NotificationEvent
private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore_;
// the Partition object after alter operation, as parsed from the NotificationEvent
private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter_;
/**
* Prevent instantiation from outside should use MetastoreEventFactory instead
*/
private AlterPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
NotificationEvent event) throws MetastoreNotificationException {
super(catalog, metrics, event);
Preconditions.checkState(eventType_.equals(MetastoreEventType.ALTER_PARTITION));
Preconditions.checkNotNull(event.getMessage());
AlterPartitionMessage alterPartitionMessage =
MetastoreEventsProcessor.getMessageDeserializer()
.getAlterPartitionMessage(event.getMessage());
try {
partitionBefore_ =
Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
partitionAfter_ =
Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
msTbl_ = alterPartitionMessage.getTableObj();
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter partition message"), e);
}
}
@Override
public void process() throws MetastoreNotificationException, CatalogException {
if (isSelfEvent()) {
infoLog("Not processing the event as it is a self-event");
return;
}
// Ignore the event if this is a trivial event. See javadoc for
// isTrivialAlterPartitionEvent() for examples.
if (canBeSkipped()) {
infoLog("Not processing this event as it only modifies some partition "
+ "parameters which can be ignored.");
return;
}
// Reload the whole table if it's a transactional table.
if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
reloadTableFromCatalog("ALTER_PARTITION");
} else {
// Refresh the partition that was altered.
Preconditions.checkNotNull(partitionAfter_);
List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
partitionAfter_);
try {
// Ignore event if table or database is not in catalog. Throw exception if
// refresh fails.
if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
"processing ALTER_PARTITION event from HMS")) {
debugLog("Refresh of table {} partition {} failed as the table "
+ "is not present in the catalog.", getFullyQualifiedTblName(),
constructPartitionStringFromTPartitionSpec(tPartSpec));
} else {
infoLog("Table {} partition {} has been refreshed",
getFullyQualifiedTblName(),
constructPartitionStringFromTPartitionSpec(tPartSpec));
}
} catch (DatabaseNotFoundException e) {
debugLog("Refresh of table {} partition {} "
+ "event failed as the database is not present in the catalog.",
getFullyQualifiedTblName(),
constructPartitionStringFromTPartitionSpec(tPartSpec));
} catch (CatalogException e) {
throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
+ "partition on table {} partition {} failed. Event processing cannot "
+ "continue. Issue and invalidate command to reset the event processor "
+ "state.", getFullyQualifiedTblName(),
constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
}
}
}
@Override
protected boolean canBeSkipped() {
// Certain alter events just modify some parameters such as
// "transient_lastDdlTime" in Hive. For eg: the alter table event generated
// along with insert events. Check if the alter table event is such a trivial
// event by setting those parameters equal before and after the event and
// comparing the objects.
// Avoid modifying the object from event.
Partition afterPartition = partitionAfter_.deepCopy();
setTrivialParameters(partitionBefore_.getParameters(),
afterPartition.getParameters());
return afterPartition.equals(partitionBefore_);
}
/**
 * Extracts the self-event identifiers from the altered partition's parameters.
 * It reads the catalog version (default "-1") and the catalog service id
 * (default ""), then fetches the in-flight versions the catalog holds for this
 * table. If the database or table is missing from the catalog, the in-flight
 * lookup is skipped and only a debug message is logged.
 */
@Override
protected void initSelfEventIdentifiersFromEvent() {
  Map<String, String> afterParams = partitionAfter_.getParameters();
  String versionText = getStringProperty(afterParams,
      MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1");
  versionNumberFromEvent_ = Long.parseLong(versionText);
  serviceIdFromEvent_ = getStringProperty(afterParams,
      MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
  try {
    pendingVersionNumbersFromCatalog_ =
        catalog_.getInFlightVersionsForEvents(dbName_, tblName_);
  } catch (DatabaseNotFoundException | TableNotFoundException ex) {
    // Safe to ignore: an event on a db/table unknown to the catalog is ignored
    // later anyway.
    debugLog("Received exception {}. Ignoring self-event evaluation",
        ex.getMessage());
  }
}
}
public static class DropPartitionEvent extends TableInvalidatingEvent {
private final List<Map<String, String>> droppedPartitions_;
/**
* Prevent instantiation from outside should use MetastoreEventFactory instead
*/
private DropPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
NotificationEvent event) throws MetastoreNotificationException {
super(catalog, metrics, event);
Preconditions.checkState(eventType_.equals(MetastoreEventType.DROP_PARTITION));
Preconditions.checkNotNull(event.getMessage());
DropPartitionMessage dropPartitionMessage =
MetastoreEventsProcessor.getMessageDeserializer()
.getDropPartitionMessage(event.getMessage());
try {
msTbl_ = Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
droppedPartitions_ = dropPartitionMessage.getPartitions();
Preconditions.checkNotNull(droppedPartitions_);
} catch (Exception ex) {
throw new MetastoreNotificationException(
debugString("Could not parse event message. "
+ "Check if %s is set to true in metastore configuration",
MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY),
ex);
}
}
@Override
public void process() throws MetastoreNotificationException {
// we have seen cases where a add_partition event is generated with empty
// partition list (see IMPALA-8547 for details. Make sure that droppedPartitions
// list is not empty
if (droppedPartitions_.isEmpty()) {
infoLog("Partition list is empty. Ignoring this event.");
}
// We do not need self event as dropPartition() call is a no-op if the directory
// doesn't exist.
try {
// Reload the whole table if it's a transactional table.
if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
reloadTableFromCatalog("DROP_PARTITION");
} else {
boolean success = true;
// We refresh all the partitions that were dropped from HMS. If a refresh
// fails, we throw a MetastoreNotificationNeedsInvalidateException
infoLog("{} partitions dropped from table {}. Trying "
+ "to refresh.", droppedPartitions_.size(), getFullyQualifiedTblName());
for (Map<String, String> partSpec : droppedPartitions_) {
List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
for (Map.Entry<String, String> entry : partSpec.entrySet()) {
tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
}
if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
"processing DROP_PARTITION event from HMS")) {
debugLog("Could not refresh partition {} of table {} as table "
+ "was not present in the catalog.",
getFullyQualifiedTblName());
success = false;
break;
}
}
if (success) {
infoLog("Refreshed {} partitions of table {}", droppedPartitions_.size(),
getFullyQualifiedTblName());
}
}
} catch (DatabaseNotFoundException e) {
debugLog("Could not refresh partitions of table {}"
+ "as database was not present in the catalog.", getFullyQualifiedTblName());
} catch (CatalogException e) {
throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
+ "drop some partitions from table {} after a drop partitions event. Event "
+ "processing cannot continue. Issue an invalidate command to reset "
+ "event processor state.", getFullyQualifiedTblName()), e);
}
}
@Override
protected void initSelfEventIdentifiersFromEvent() {
// no-op
}
@Override
protected boolean canBeSkipped() { return false; }
}
/**
 * Catch-all event for metastore event types that are not handled (for example
 * unsupported types). Processing it does nothing except write a debug log line.
 */
public static class IgnoredEvent extends MetastoreEvent {

  /**
   * Private so that instances are only created through MetastoreEventFactory.
   */
  private IgnoredEvent(CatalogServiceCatalog catalog, Metrics metrics,
      NotificationEvent event) {
    super(catalog, metrics, event);
  }

  /** Never reports event processing as disabled for an ignored event. */
  @Override
  protected boolean isEventProcessingDisabled() {
    return false;
  }

  /** Records that the event was skipped; no catalog state is touched. */
  @Override
  public void process() {
    debugLog("Ignored");
  }
}
}