blob: 139d281d7a0a75349d8e8200c4e7ef2da267a5ac [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.events;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.MetastoreClientInstantiationException;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.IncompleteTable;
import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
import org.apache.impala.catalog.events.MetastoreEvents.DropDatabaseEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.common.Metrics;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A metastore event is an instance of the class
* <code>org.apache.hadoop.hive.metastore.api.NotificationEvent</code>. Metastore can be
* configured to work with Listeners which are called on various DDL operations like
* create/alter/drop operations on database, table, partition etc. Each event has a unique
* incremental id, and the generated events can be fetched from Metastore to get
* incremental updates to the metadata stored in Hive metastore using the public API
* <code>get_next_notification</code>. These events could be generated by external
* Metastore clients like Apache Hive or Apache Spark as well as other Impala clusters
* configured to talk with the same metastore.
*
* This class is used to poll metastore for such events at a given frequency. By observing
* such events, we can take appropriate action on the catalogD
* (refresh/invalidate/add/remove) so that catalog represents the latest information
* available in metastore. We keep track of the last synced event id in each polling
* iteration so the next batch can be requested appropriately. The maximum batch size per
* RPC is constant and set to EVENTS_BATCH_SIZE_PER_RPC.
*
* <pre>
* +---------------+ +----------------+ +--------------+
* |Catalog state | |Catalog | | |
* |stale | |State up-to-date| |Catalog state |
* |(apply event) | |(ignore) | |is newer than |
* | | | | |event |
* | | | | |(ignore) |
* +------+--------+ +-----+----------+ +-+------------+
* | | |
* | | |
* | | |
* | | |
* | | |
* +-----------V------------------V---------------------V-----------> Event Timeline
* ^
* |
* |
* |
* |
* E
*
* </pre>
* Consistency model: Events could be seen as DDL operations from the past, done either
* from this same cluster or from some other external system. For example, in the events
* timeline given above, consider an event E at any given time. The catalog state for the
* corresponding object of the event could either be stale, exactly the same, or at a
* version which is higher than the one provided by the event. Catalog state should only
* be updated when it is stale with respect to the event. In order to determine if the
* catalog object is stale, we rely on a combination of create EventId and object version.
* In case of create/drop events on database/table/partitions we use the
* <code>createEventId</code> field of the corresponding object in the catalogd
* to determine if the event needs to be processed or ignored. E.g. if Impala creates
* a table, {@link CatalogOpExecutor} will create the table and assign the createEventId
* of the table by fetching the CREATE_TABLE event from HMS. Later on when the event
* is fetched by events processor, it uses the createEventId of the Catalogd's table to
* ignore the event. Similar approach is used for databases and partition create events.
*
* In case of DROP events for a database/table/partition, the events processor looks at
* the {@link DeleteEventLog} maintained by the events processor to determine if the
* object has already been dropped from catalogd.
*
* In case of ALTER/INSERT events, the events processor relies on the object version in
* the properties of the table to determine if this is a self-event or not.
*
* Following table shows the actions to be taken when the given event type is received.
*
* <pre>
* +------------------------------------------------+ --------------------+
* | Catalog object state |
* +--------------------------------------+-----------------------+---------------------+
* | Event type | Loaded | Incomplete | Not present |
* | | | | |
* +--------------------------------------------------------------+---------------------+
* | | | | |
* | CREATE EVENT| Ignore | Ignore | addIfNotRemovedLater|
* | | | | |
* | | | | |
* | ALTER EVENT | Refresh | Ignore | Ignore |
* | | | | |
* | | | | |
* | DROP EVENT | removeIfNotAddedLater | removeIfNotAddedLater | Ignore |
* | | | | |
* | | | | |
* | INSERT EVENT| Refresh | Ignore | Ignore |
* | | | | |
* +-------------+------------------------+-----------------------+---------------------+
* </pre>
*
* Currently event handlers rely on createEventId on Database, Table and Partition to
* uniquely determine if the object from event is same as object in the catalog. This
* information is used to make sure that we are deleting the right incarnation of the
* object when compared to Metastore.
*
* Self-events:
* Events could be generated by this Catalog's operations as well. Invalidating a table
* for such events is unnecessary and inefficient. In order to detect such self-events,
* when catalog executes a DDL operation it appends the current catalog version to the
* list of version numbers for the in-flight events for the table. The events processor
* clears this version when the corresponding version number identified by serviceId is
* received in the event. This is needed since it is possible that an external
* non-Impala system which generates the event presents the same serviceId and version
* number later on. The algorithm to detect such self-events is as below.
*
* 1. Add the service id and expected catalog version to database/table/partition
* parameters when executing the DDL operation. When the HMS operation is successful, add
* the version number to the list of version for in-flight events at
* table/database/partition level.
* 2. When the event is received, the first time you see the combination of serviceId
* and version number, event processor clears the version number from table's list and
* determines the event as self-generated (and hence ignored).
* 3. If the event data presents an unknown serviceId or if the version number is not
* present in the list of in-flight versions, the event is not a self-event and needs to
* be processed.
*
* In order to limit the total memory footprint, only 100 version numbers are stored at
* the catalog object. Since the event processor is expected to poll every few seconds,
* this should be a reasonable bound which satisfies most use-cases. Otherwise, the event
* processor may wrongly process a self-event and invalidate the table. In such a case,
* it's a performance penalty, not a correctness issue.
*
* All the operations which change the state of catalog cache while processing a certain
* event type must be atomic in nature. We rely on taking a DDL lock in CatalogOpExecutor
* in case of create/drop events and object (Db or table) level writeLock in case of alter
* events to make sure that readers are blocked while the metadata
* update operation is being performed. Since the events are generated post-metastore
* operations, such catalog updates do not need to update the state in Hive Metastore.
*
* Error Handling: The event processor can be in one of several states, such as ACTIVE,
* PAUSED or ERROR (see EventProcessorStatus). In case of any errors while processing the
* events, the state of the event processor changes to ERROR and no subsequent events are
* polled. In such a case an invalidate metadata command restarts the event polling,
* which updates the lastSyncedEventId to the latest from metastore.
*
* TODO:
* 1. a global invalidate metadata command to get the events processor out of error state
* is too heavy weight. We should make it easier to recover from the error state.
* 2. The createEventId logic can be extended to track the last eventId which the table
* has synced to and we can then get rid of self-event logic for alter events too.
*/
public class MetastoreEventsProcessor implements ExternalEventsProcessor {
// Hive Metastore configuration key "hive.metastore.notifications.add.thrift.objects".
public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
"hive.metastore.notifications.add.thrift.objects";
private static final Logger LOG =
LoggerFactory.getLogger(MetastoreEventsProcessor.class);
// Deserializer for HMS notification messages, obtained through MetastoreShim.
private static final MessageDeserializer MESSAGE_DESERIALIZER =
MetastoreShim.getMessageDeserializer();
// NOTE(review): presumably the process-wide singleton; it is not assigned within this
// section of the file - confirm where it is initialized.
private static MetastoreEventsProcessor instance;
// maximum number of events to poll in each RPC; used as the default batch size by
// getNextMetastoreEventsInBatches(catalog, eventId, filter)
private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000;
// maximum time to wait for a clean shutdown of scheduler in seconds
private static final int SCHEDULER_SHUTDOWN_TIMEOUT = 10;
// time taken to fetch events during each poll
public static final String EVENTS_FETCH_DURATION_METRIC = "events-fetch-duration";
// time taken to apply all the events received during each poll
public static final String EVENTS_PROCESS_DURATION_METRIC = "events-apply-duration";
// rate of events received per unit time
public static final String EVENTS_RECEIVED_METRIC = "events-received";
// total number of events which are skipped because of the flag setting or
// in case of [CREATE|DROP] events on [DATABASE|TABLE|PARTITION] which were ignored
// because the [DATABASE|TABLE|PARTITION] was already [PRESENT|ABSENT] in the catalogd.
public static final String EVENTS_SKIPPED_METRIC = "events-skipped";
// name of the event processor status metric
public static final String STATUS_METRIC = "status";
// last synced event id
public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
// last synced event time
public static final String LAST_SYNCED_EVENT_TIME = "last-synced-event-time";
// latest event id in Hive metastore
public static final String LATEST_EVENT_ID = "latest-event-id";
// event time of the latest event in Hive metastore
public static final String LATEST_EVENT_TIME = "latest-event-time";
// delay(secs) in events processing: latest HMS event time minus last synced event time
public static final String EVENT_PROCESSING_DELAY = "event-processing-delay";
// metric name for number of tables which are refreshed by event processor so far
public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
// number of times events processor refreshed a partition
public static final String NUMBER_OF_PARTITION_REFRESHES = "partitions-refreshed";
// number of tables which were added to the catalogd based on events.
public static final String NUMBER_OF_TABLES_ADDED = "tables-added";
// number of tables which were removed from the catalogd based on events.
public static final String NUMBER_OF_TABLES_REMOVED = "tables-removed";
// number of databases which were added to the catalogd based on events.
public static final String NUMBER_OF_DATABASES_ADDED = "databases-added";
// number of databases which were removed from the catalogd based on events.
public static final String NUMBER_OF_DATABASES_REMOVED = "databases-removed";
// number of partitions which were added to the catalogd based on events.
public static final String NUMBER_OF_PARTITIONS_ADDED = "partitions-added";
// number of partitions which were removed from the catalogd based on events.
public static final String NUMBER_OF_PARTITIONS_REMOVED = "partitions-removed";
// number of entries in the delete event log
public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size";
// number of batch events generated
public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
// metric to measure the delay in msec, between the event created in metastore and time
// it took to be consumed by the event processor
public static final String AVG_DELAY_IN_CONSUMING_EVENTS = "events-consuming" +
"-delay";
// One second expressed in nanoseconds.
private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L;
// List of event types to skip while fetching notification events from metastore
private static final List<String> EVENT_SKIP_LIST = Arrays.asList("OPEN_TXN");
/**
 * Fetches the HMS notification events after {@code eventId} that pass {@code filter},
 * requesting them in batches of the default size {@code EVENTS_BATCH_SIZE_PER_RPC}.
 * Delegates to {@link
 * MetastoreEventsProcessor#getNextMetastoreEventsInBatches(CatalogServiceCatalog,
 * long, NotificationFilter, int)}.
 */
public static List<NotificationEvent> getNextMetastoreEventsInBatches(
    CatalogServiceCatalog catalog, long eventId, NotificationFilter filter)
    throws MetastoreNotificationFetchException {
  final int defaultBatchSize = EVENTS_BATCH_SIZE_PER_RPC;
  return getNextMetastoreEventsInBatches(catalog, eventId, filter, defaultBatchSize);
}
/**
 * Gets the next list of {@link NotificationEvent} from Hive Metastore which are
 * greater than the given eventId and filtered according to the provided filter.
 * The current notification event id is read once at the start of the call, and
 * events are requested in batches of at most {@code eventsBatchSize} until that id
 * has been reached or HMS returns an empty batch.
 * @param catalog The CatalogServiceCatalog used to get the metastore client
 * @param eventId The eventId after which the events are needed.
 * @param filter The {@link NotificationFilter} used to filter the list of fetched
 *               events. Note that this is a client side filter not a server side
 *               filter. Unfortunately, HMS doesn't provide a similar mechanism to
 *               do server side filtering. May be null, in which case every fetched
 *               event is returned.
 * @param eventsBatchSize the batch size for fetching the events from metastore. Must
 *                        be positive.
 * @return List of {@link NotificationEvent} which are all greater than eventId and
 * satisfy the given filter. Empty if there are no newer events.
 * @throws MetastoreNotificationFetchException in case of RPC errors to metastore.
 */
@VisibleForTesting
public static List<NotificationEvent> getNextMetastoreEventsInBatches(
    CatalogServiceCatalog catalog, long eventId, NotificationFilter filter,
    int eventsBatchSize) throws MetastoreNotificationFetchException {
  Preconditions.checkArgument(eventsBatchSize > 0);
  List<NotificationEvent> result = new ArrayList<>();
  try (MetaStoreClient msc = catalog.getMetaStoreClient()) {
    long toEventId = msc.getHiveClient().getCurrentNotificationEventId()
        .getEventId();
    if (toEventId <= eventId) return result;
    long currentEventId = eventId;
    while (currentEventId < toEventId) {
      // Take the minimum in long arithmetic before narrowing to int. Casting the raw
      // difference first overflows when more than Integer.MAX_VALUE events are
      // pending, which could produce a truncated, zero or negative batch size.
      int batchSize = (int) Math.min(eventsBatchSize, toEventId - currentEventId);
      // we don't call the HiveMetaStoreClient's getNextNotification()
      // call here because it can throw a IllegalStateException if the eventId
      // which we pass in is very old and metastore has already cleaned up
      // the events since that eventId.
      NotificationEventRequest eventRequest = new NotificationEventRequest();
      eventRequest.setMaxEvents(batchSize);
      eventRequest.setLastEvent(currentEventId);
      NotificationEventResponse notificationEventResponse =
          MetastoreShim.getNextNotification(msc.getHiveClient(), eventRequest, true);
      if (notificationEventResponse.getEvents().isEmpty()) {
        // Possible to receive empty list due to event skip list in request
        return result;
      }
      for (NotificationEvent event : notificationEventResponse.getEvents()) {
        // if no filter is provided we add all the events
        if (filter == null || filter.accept(event)) result.add(event);
        currentEventId = event.getEventId();
      }
    }
    return result;
  } catch (MetastoreClientInstantiationException | TException e) {
    throw new MetastoreNotificationFetchException(String.format(
        CatalogOpExecutor.HMS_RPC_ERROR_FORMAT_STR, "getNextNotification"), e);
  }
}
/**
 * Syncs the given table to the latest HMS event id, starting from the table's last
 * synced event id.
 *
 * <p>The table must be loaded (not an {@link IncompleteTable}), and the current thread
 * must hold its write lock. Every fetched event for the table is applied in order.
 * Drop events are recorded in the delete event log. Processing stops right after a
 * DROP_TABLE event. Otherwise the table's last synced event id is advanced to the id
 * of the last processed event, but never moved backwards.
 *
 * @param catalog catalog used to fetch events and to reach the delete event log
 * @param tbl Catalog table to be synced
 * @param eventFactory factory that turns each NotificationEvent into a MetastoreEvent
 * @param metrics metrics handed to each created event
 * @throws CatalogException propagated from event creation or processing
 * @throws MetastoreNotificationException if events cannot be fetched or processed
 */
public static void syncToLatestEventId(CatalogServiceCatalog catalog,
    org.apache.impala.catalog.Table tbl, EventFactory eventFactory, Metrics metrics)
    throws CatalogException, MetastoreNotificationException {
  Preconditions.checkArgument(tbl != null, "tbl is null");
  Preconditions.checkState(!(tbl instanceof IncompleteTable) &&
      tbl.isLoaded(), "table %s is either incomplete or not loaded",
      tbl.getFullName());
  // Use Guava's %s template so the message is only built when the check fails.
  Preconditions.checkState(tbl.isWriteLockedByCurrentThread(),
      "Write lock is not held on table %s by current thread", tbl.getFullName());
  long lastEventId = tbl.getLastSyncedEventId();
  Preconditions.checkArgument(lastEventId > 0,
      "lastEvent Id %s for table %s should be greater than 0", lastEventId,
      tbl.getFullName());
  String annotation = String.format("sync table %s to latest HMS event id",
      tbl.getFullName());
  try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
    List<NotificationEvent> events = getNextMetastoreEventsInBatches(catalog,
        lastEventId, getTableNotificationEventFilter(tbl));
    if (events.isEmpty()) {
      LOG.debug("table {} synced till event id {}. No new HMS events to process from "
          + "event id: {}", tbl.getFullName(), lastEventId, lastEventId + 1);
      return;
    }
    MetastoreEvents.MetastoreEvent currentEvent = null;
    for (NotificationEvent event : events) {
      currentEvent = eventFactory.get(event, metrics);
      LOG.trace("for table {}, processing event {}", tbl.getFullName(), currentEvent);
      currentEvent.processIfEnabled();
      if (currentEvent.isDropEvent()) {
        // currentEvent can only be DropPartition or DropTable
        Preconditions.checkNotNull(currentEvent.getDbName());
        Preconditions.checkNotNull(currentEvent.getTableName());
        String key = DeleteEventLog.getTblKey(currentEvent.getDbName(),
            currentEvent.getTableName());
        catalog.getMetastoreEventProcessor().getDeleteEventLog()
            .addRemovedObject(currentEvent.getEventId(), key);
      }
      if (currentEvent instanceof MetastoreEvents.DropTableEvent) {
        // return after processing table drop event
        return;
      }
    }
    // setting HMS Event ID after all the events
    // are successfully processed only if table was
    // not dropped
    // Certain events like alter_table, do an incremental reload which sets the event
    // id to the current hms event id at that time. Therefore, check table's last
    // synced event id again before setting currentEvent id
    if (currentEvent.getEventId() > tbl.getLastSyncedEventId()) {
      tbl.setLastSyncedEventId(currentEvent.getEventId());
    }
    LOG.info("Synced table {} till HMS event: {}", tbl.getFullName(),
        tbl.getLastSyncedEventId());
  }
}
/**
 * Syncs the given database to the latest HMS event id, starting from the database's
 * last synced event id.
 *
 * <p>The current thread must hold the database's lock. Only events accepted by the
 * database filter (no table name, matching or absent db name) are applied. When a
 * drop event is seen, it is recorded in the delete event log and processing stops
 * without updating the last synced event id. Otherwise the last synced event id is
 * advanced to the id of the last processed event, but never moved backwards. This
 * matches the table variant of this method.
 *
 * @param catalog catalog used to fetch events and to reach the delete event log
 * @param db database to sync; must have a positive last synced event id
 * @param eventFactory factory that turns each NotificationEvent into a MetastoreEvent
 * @param metrics metrics handed to each created event
 * @throws CatalogException propagated from event creation or processing
 * @throws MetastoreNotificationException if events cannot be fetched or processed
 */
public static void syncToLatestEventId(CatalogServiceCatalog catalog,
    org.apache.impala.catalog.Db db, EventFactory eventFactory, Metrics metrics)
    throws CatalogException, MetastoreNotificationException {
  Preconditions.checkArgument(db != null, "db is null");
  long lastEventId = db.getLastSyncedEventId();
  Preconditions.checkArgument(lastEventId > 0, "Invalid " +
      "last synced event ID %s for db %s ", lastEventId, db.getName());
  Preconditions.checkState(db.isLockHeldByCurrentThread(),
      "Current thread does not hold lock on db: %s", db.getName());
  String annotation = String.format("sync db %s to latest HMS event id", db.getName());
  try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
    List<NotificationEvent> events = getNextMetastoreEventsInBatches(catalog,
        lastEventId, getDbNotificationEventFilter(db));
    if (events.isEmpty()) {
      LOG.debug("db {} already synced till event id: {}, no new hms events from "
          + "event id: {}", db.getName(), lastEventId, lastEventId + 1);
      return;
    }
    MetastoreEvents.MetastoreEvent currentEvent = null;
    for (NotificationEvent event : events) {
      currentEvent = eventFactory.get(event, metrics);
      LOG.trace("for db {}, processing event: {}", db.getName(), currentEvent);
      currentEvent.processIfEnabled();
      if (currentEvent.isDropEvent()) {
        // The db filter rejects every event carrying a table name, so a drop event
        // here must be a DROP_DATABASE. Guava Preconditions substitute %s, not the
        // SLF4J-style {} placeholder.
        Preconditions.checkState(currentEvent instanceof DropDatabaseEvent,
            "invalid drop event %s ", currentEvent);
        Preconditions.checkNotNull(currentEvent.getDbName());
        String key = DeleteEventLog.getDbKey(currentEvent.getDbName());
        catalog.getMetastoreEventProcessor().getDeleteEventLog()
            .addRemovedObject(currentEvent.getEventId(), key);
        // return after processing drop db event
        return;
      }
    }
    // setting HMS Event Id after all the events
    // are successfully processed only if db was not dropped. Guard against moving
    // the id backwards in case event processing already advanced it.
    if (currentEvent.getEventId() > db.getLastSyncedEventId()) {
      db.setLastSyncedEventId(currentEvent.getEventId());
    }
    LOG.info("Synced db {} till HMS event {}", db.getName(),
        db.getLastSyncedEventId());
  }
}
/**
 * Builds the client-side filter used when syncing a single table to the latest HMS
 * event id. An event is accepted when either
 * <ul>
 *   <li>it has no db name, or</li>
 *   <li>it has both a db name and a table name, and both match {@code tbl}
 *   (case-insensitively).</li>
 * </ul>
 * Database-level events (db name set, table name absent) and events for other tables
 * are rejected.
 */
private static NotificationFilter getTableNotificationEventFilter(Table tbl) {
  return event -> {
    if (event.getDbName() == null) return true;
    if (event.getTableName() == null) return false;
    return tbl.getDb().getName().equalsIgnoreCase(event.getDbName())
        && tbl.getName().equalsIgnoreCase(event.getTableName());
  };
}
/**
 * Builds the client-side filter used when syncing a database to the latest HMS event
 * id. Every event that carries a table name is rejected. Of the remaining events, those
 * with no db name are accepted, and those with a db name are accepted only when it
 * matches {@code db} (case-insensitively).
 */
private static NotificationFilter getDbNotificationEventFilter(Db db) {
  return event -> {
    if (event.getTableName() != null) return false;
    String eventDbName = event.getDbName();
    return eventDbName == null || db.getName().equalsIgnoreCase(eventDbName);
  };
}
/**
 * Possible states of the metastore events processor, as returned by getStatus() and
 * reported through the "status" metric. The declaration order of the constants is
 * left unchanged on purpose.
 */
public enum EventProcessorStatus {
PAUSED, // event processor is paused because catalog is being reset concurrently
ACTIVE, // event processor is scheduled at a given frequency
ERROR, // event processor is in error state and event processing has stopped
NEEDS_INVALIDATE, // event processor could not resolve certain events and needs a
// manual invalidate command to reset the state (See AlterEvent for an example)
STOPPED, // event processor is shut down. No events will be processed
DISABLED // event processor is not configured to run
}
// current status of this event processor; initially STOPPED.
// NOTE(review): written inside synchronized start()/pause() but read without
// synchronization by getStatus() (e.g. from the status gauge); consider making it
// volatile - confirm against updateStatus(), which is outside this view.
private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED;
// error message when event processor comes into ERROR/NEEDS_INVALIDATE states
private String eventProcessorErrorMsg_ = null;
// event factory which is used to get or create MetastoreEvents
private final MetastoreEventFactory metastoreEventFactory_;
// keeps track of the current event that we are processing
private NotificationEvent currentEvent_;
// keeps track of the last event id which we have synced to (-1 until the constructor
// sets it to startSyncFromId)
private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
// event time in seconds of the last synced event; feeds the processing-delay gauge
private final AtomicLong lastSyncedEventTimeSecs_ = new AtomicLong(0);
// The event id and eventTime of the latest event in HMS. Only used in metrics to show
// how far we are lagging behind.
private final AtomicLong latestEventId_ = new AtomicLong(0);
private final AtomicLong latestEventTimeSecs_ = new AtomicLong(0);
// The duration in nanoseconds of the processing of the last event batch.
private final AtomicLong lastEventProcessDurationNs_ = new AtomicLong(0);
// polling interval in seconds. It is intended as the time we wait AFTER each fetch
// call (fixed-delay semantics); see startScheduler() for how the tasks are scheduled.
private final long pollingFrequencyInSec_;
// catalog service instance to be used while processing events
protected final CatalogServiceCatalog catalog_;
// single-thread daemon scheduler that runs processEvents() at the polling interval
private final ScheduledExecutorService processEventsScheduler_ =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("MetastoreEventsProcessor-ProcessEvents")
.build());
// single-thread daemon scheduler that runs updateLatestEventId() at the polling
// interval, independently of processEvents()
private final ScheduledExecutorService updateEventIdScheduler_ =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("MetastoreEventsProcessor-UpdateEventId")
.build());
// metrics registry to keep track of metrics related to event processing
//TODO create a separate singleton class which wraps around this so that we don't
// have to pass it around as a argument in constructor in MetastoreEvents
private final Metrics metrics_ = new Metrics();
// When events processing is ACTIVE this delete event log is used to keep track of
// DROP events for databases, tables and partitions so that the MetastoreEventsProcessor
// can ignore the drop events when they are received later.
private final DeleteEventLog deleteEventLog_ = new DeleteEventLog();
/**
 * Creates an events processor that treats {@code startSyncFromId} as the last synced
 * event id. The metastore configuration is validated during construction, so this
 * fails with a {@link CatalogException} if HMS cannot be reached or a required
 * configuration has an unexpected value.
 *
 * @param catalogOpExecutor supplies the catalog and backs the MetastoreEventFactory
 * @param startSyncFromId event id to start syncing from
 * @param pollingFrequencyInSec polling interval in seconds; must be positive
 * @throws CatalogException if metastore configuration validation fails
 */
@VisibleForTesting
MetastoreEventsProcessor(CatalogOpExecutor catalogOpExecutor, long startSyncFromId,
long pollingFrequencyInSec) throws CatalogException {
Preconditions.checkState(pollingFrequencyInSec > 0);
// catalog_ must be assigned before validateConfigs(), which reaches HMS through it.
this.catalog_ = Preconditions.checkNotNull(catalogOpExecutor.getCatalog());
// NOTE(review): validateConfigs() is package-private and overridable, and runs before
// the remaining final fields are assigned.
validateConfigs();
lastSyncedEventId_.set(startSyncFromId);
// NOTE(review): getEventTimeFromHMS() is defined outside this view; presumably it
// returns the HMS event time of startSyncFromId in seconds - confirm.
lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(startSyncFromId));
initMetrics();
metastoreEventFactory_ = new MetastoreEventFactory(catalogOpExecutor);
pollingFrequencyInSec_ = pollingFrequencyInSec;
}
/**
 * Fetches the value of every configuration returned by
 * {@link #getEventProcessorConfigsToValidate()} from the metastore and validates it
 * against its expected value. All invalid configurations are logged before failing.
 *
 * @throws CatalogException if one or more validations fail, or if a configuration
 *     cannot be read from the metastore (in that case the underlying
 *     {@link TException} is attached as the cause)
 */
@VisibleForTesting
void validateConfigs() throws CatalogException {
  List<ValidationResult> validationErrors = new ArrayList<>();
  for (MetastoreEventProcessorConfig config : getEventProcessorConfigsToValidate()) {
    String configKey = config.getValidator().getConfigKey();
    try {
      String value = getConfigValueFromMetastore(configKey, "");
      ValidationResult result = config.validate(value);
      if (!result.isValid()) validationErrors.add(result);
    } catch (TException e) {
      String msg = String.format("Unable to get configuration %s from metastore. Check "
          + "if metastore is accessible", configKey);
      LOG.error(msg, e);
      // Preserve the RPC failure as the cause so callers can see why HMS was
      // unreachable instead of only the summary message.
      throw new CatalogException(msg, e);
    }
  }
  if (!validationErrors.isEmpty()) {
    LOG.error("Found {} incorrect metastore configuration(s).",
        validationErrors.size());
    for (ValidationResult invalidConfig : validationErrors) {
      LOG.error(invalidConfig.getReason());
    }
    throw new CatalogException(String.format("Found %d incorrect metastore "
        + "configuration(s). Events processor cannot start. See ERROR log for more "
        + "details.", validationErrors.size()));
  }
}
/**
 * Returns the log of DROP events for databases, tables and partitions that this
 * processor uses to ignore drop events received later.
 */
public DeleteEventLog getDeleteEventLog() {
  return deleteEventLog_;
}
/**
 * Returns the metastore configurations that must be validated before event processing
 * can start: {@code FIRE_EVENTS_FOR_DML} and {@code METASTORE_DEFAULT_CATALOG_NAME},
 * in that order. The result is a fixed-size list backed by {@link Arrays#asList}.
 */
public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() {
  List<MetastoreEventProcessorConfig> configsToValidate = Arrays.asList(
      MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML,
      MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME);
  return configsToValidate;
}
/**
 * Registers every metric exposed by this events processor with {@code metrics_}:
 * <ul>
 *   <li>timers for fetch/apply durations and event consumption delay;</li>
 *   <li>a meter for received events;</li>
 *   <li>counters for skipped events, batch events, and catalog objects refreshed,
 *   added or removed;</li>
 *   <li>gauges for status, synced/latest event ids and times, processing delay and
 *   delete event log size.</li>
 * </ul>
 * Metrics are registered in the same order as before.
 */
private void initMetrics() {
  // Per-poll timings and incoming event rate.
  metrics_.addTimer(EVENTS_FETCH_DURATION_METRIC);
  metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC);
  metrics_.addMeter(EVENTS_RECEIVED_METRIC);
  metrics_.addCounter(EVENTS_SKIPPED_METRIC);

  // Each gauge lambda reads its backing value whenever the gauge is sampled.
  metrics_.addGauge(STATUS_METRIC, (Gauge<String>) () -> getStatus().toString());
  metrics_.addGauge(LAST_SYNCED_ID_METRIC, (Gauge<Long>) () -> lastSyncedEventId_.get());
  metrics_.addGauge(LAST_SYNCED_EVENT_TIME,
      (Gauge<Long>) () -> lastSyncedEventTimeSecs_.get());
  metrics_.addGauge(LATEST_EVENT_ID, (Gauge<Long>) () -> latestEventId_.get());
  metrics_.addGauge(LATEST_EVENT_TIME, (Gauge<Long>) () -> latestEventTimeSecs_.get());
  metrics_.addGauge(EVENT_PROCESSING_DELAY, (Gauge<Long>) () -> {
    long latestSecs = latestEventTimeSecs_.get();
    long syncedSecs = lastSyncedEventTimeSecs_.get();
    return latestSecs - syncedSecs;
  });

  // Counters for catalog objects changed because of events.
  String[] catalogChangeCounters = {
      NUMBER_OF_TABLE_REFRESHES, NUMBER_OF_PARTITION_REFRESHES,
      NUMBER_OF_TABLES_ADDED, NUMBER_OF_TABLES_REMOVED,
      NUMBER_OF_DATABASES_ADDED, NUMBER_OF_DATABASES_REMOVED,
      NUMBER_OF_PARTITIONS_ADDED, NUMBER_OF_PARTITIONS_REMOVED};
  for (String counterName : catalogChangeCounters) {
    metrics_.addCounter(counterName);
  }

  metrics_.addGauge(DELETE_EVENT_LOG_SIZE,
      (Gauge<Integer>) () -> deleteEventLog_.size());
  metrics_.addCounter(NUMBER_OF_BATCH_EVENTS);
  metrics_.addTimer(AVG_DELAY_IN_CONSUMING_EVENTS);
}
/**
 * Starts periodic metastore event processing and moves this processor to
 * {@link EventProcessorStatus#ACTIVE}. Must not be called while already ACTIVE.
 *
 * <p>The background polling is scheduled by {@link #startScheduler()}. The intent is
 * that each poll begins {@code pollingFrequencyInSec_} seconds after the previous one
 * completes, so that slow polls do not pile up.
 */
@Override
public synchronized void start() {
  boolean isAlreadyActive = eventProcessorStatus_ == EventProcessorStatus.ACTIVE;
  Preconditions.checkState(!isAlreadyActive);
  startScheduler();
  updateStatus(EventProcessorStatus.ACTIVE);
  String startedMsg = String.format("Successfully started metastore event processing."
      + " Polling interval: %d seconds.", pollingFrequencyInSec_);
  LOG.info(startedMsg);
}
/**
 * Returns the current status of this events processor.
 * NOTE(review): the status is read without synchronization, while start() and pause()
 * update it under the instance lock.
 */
public EventProcessorStatus getStatus() {
  final EventProcessorStatus current = eventProcessorStatus_;
  return current;
}
/**
 * Looks up a configuration value from Hive Metastore, using a pooled metastore
 * client that is released when the lookup finishes.
 * @param config Hive configuration name
 * @param defaultVal value returned if the configuration is not present in Hive
 * @return the configured value, or {@code defaultVal}
 * @throws TException if the metastore RPC fails
 */
@VisibleForTesting
public String getConfigValueFromMetastore(String config, String defaultVal)
    throws TException {
  try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
    return MetaStoreUtil.getMetastoreConfigValue(client.getHiveClient(), config,
        defaultVal);
  }
}
/**
 * Returns the id of the last HMS event this processor has synced to. This is a single
 * atomic read; it is exposed for tests.
 */
@VisibleForTesting
public long getLastSyncedEventId() { return lastSyncedEventId_.get(); }
/**
 * Returns the most recently observed latest event id in HMS, which is used for lag
 * metrics. This is a single atomic read; it is exposed for tests.
 */
@VisibleForTesting
public long getLatestEventId() { return latestEventId_.get(); }
/**
 * Schedules the two background tasks of this processor, both starting after an
 * initial delay of {@code pollingFrequencyInSec_} seconds:
 * <ul>
 *   <li>{@code processEvents()}, which fetches and applies new HMS events, and</li>
 *   <li>{@code updateLatestEventId()}, which tracks the latest HMS event id. It runs on
 *   its own scheduler so that the lag stays observable while processEvents() is
 *   blocked.</li>
 * </ul>
 * Both tasks are scheduled with a fixed delay rather than a fixed rate. The next run
 * starts {@code pollingFrequencyInSec_} seconds after the previous run completes, so a
 * run that is slower than the polling interval does not trigger back-to-back catch-up
 * runs.
 *
 * @throws IllegalStateException if the polling frequency is not positive
 */
@VisibleForTesting
void startScheduler() {
  Preconditions.checkState(pollingFrequencyInSec_ > 0);
  LOG.info(String.format("Starting metastore event polling with interval %d seconds.",
      pollingFrequencyInSec_));
  processEventsScheduler_.scheduleWithFixedDelay(this::processEvents,
      pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS);
  // Update latestEventId in another thread in case that the processEvents() thread is
  // blocked by slow metadata reloading or waiting for table locks.
  updateEventIdScheduler_.scheduleWithFixedDelay(this::updateLatestEventId,
      pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS);
}
/**
 * Moves this processor to <code>EventProcessorStatus.PAUSED</code>. No new events are
 * processed while it stays paused. The method is synchronized on this object, and the
 * event loop holds the same lock while processing each event. A call made during event
 * processing therefore waits until the current event is done. Pausing an
 * already-paused processor is a no-op; this can happen when concurrent invalidate
 * metadata calls are running.
 */
@Override
public synchronized void pause() {
  if (eventProcessorStatus_ != EventProcessorStatus.PAUSED) {
    updateStatus(EventProcessorStatus.PAUSED);
    LOG.info(String.format("Event processing is paused. Last synced event id is %d",
        lastSyncedEventId_.get()));
  }
}
/**
 * Fetches the id of the most recent notification event from the Hive Metastore.
 *
 * @return the current notification event id in HMS
 * @throws MetastoreNotificationFetchException if a metastore client cannot be created
 *     or the HMS call fails
 */
@Override
public long getCurrentEventId() throws MetastoreNotificationFetchException {
  try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
    CurrentNotificationEventId current =
        client.getHiveClient().getCurrentNotificationEventId();
    return current.getEventId();
  } catch (MetastoreClientInstantiationException | TException e) {
    throw new MetastoreNotificationFetchException("Unable to fetch the current " +
        "notification event id. Check if metastore service is accessible", e);
  }
}
/**
 * Returns the current notification event id from HMS using the given client.
 * A {@code TException} is logged as a warning and reported as {@code -1}; it is not
 * propagated.
 *
 * @param client the metastore client to query
 * @return the current event id, or -1 if it could not be fetched
 */
public static long getCurrentEventIdNoThrow(IMetaStoreClient client) {
  try {
    return client.getCurrentNotificationEventId().getEventId();
  } catch (TException exception) {
    LOG.warn(String.format("Unable to fetch latest event id from HMS: %s",
        exception.getMessage()));
    return -1L;
  }
}
/**
 * Fetches a single event from HMS by requesting at most one event after
 * {@code eventId - 1}.
 *
 * @param msClient client used for the request
 * @param eventId id of the event to fetch
 * @return the first event returned by HMS, or null if none was returned (e.g. the
 *     event has been cleaned up) or the request failed with a TException
 */
private NotificationEvent getEventFromHMS(MetaStoreClient msClient, long eventId) {
  NotificationEventRequest request = new NotificationEventRequest();
  request.setLastEvent(eventId - 1);
  request.setMaxEvents(1);
  NotificationEventResponse response;
  try {
    response = MetastoreShim.getNextNotification(
        msClient.getHiveClient(), request, false);
  } catch (TException e) {
    LOG.warn("Unable to fetch event {}", eventId, e);
    return null;
  }
  Iterator<NotificationEvent> it = response.getEventsIterator();
  if (it.hasNext()) {
    return it.next();
  }
  LOG.warn("Unable to fetch event {}. It has been cleaned up", eventId);
  return null;
}
/**
 * Looks up the event time of the given event by fetching the event from HMS.
 *
 * @param eventId id of the event whose time is wanted
 * @return the event time, or 0 if the event could not be fetched (cleaned up, HMS
 *     error, or a metastore client could not be created)
 */
@VisibleForTesting
public int getEventTimeFromHMS(long eventId) {
  try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
    NotificationEvent event = getEventFromHMS(client, eventId);
    return event == null ? 0 : event.getEventTime();
  } catch (MetastoreClientInstantiationException e) {
    LOG.error("Failed to get event time from HMS for event {}", eventId, e);
    return 0;
  }
}
/**
 * (Re)starts event processing from the given event id. The call is ignored when the
 * processor is already ACTIVE and has already synced up to or past
 * {@code fromEventId*}. This guards against stale ids coming from concurrent invalidate
 * metadata calls. Otherwise the last synced event id and time are reset to
 * {@code fromEventId} and the status becomes ACTIVE.
 *
 * @param fromEventId event id to resume from; must be non-negative
 * @throws IllegalArgumentException if {@code fromEventId} is negative
 */
@Override
public synchronized void start(long fromEventId) {
  Preconditions.checkArgument(fromEventId >= 0);
  final EventProcessorStatus statusNow = eventProcessorStatus_;
  final long prevLastSyncedEventId = lastSyncedEventId_.get();
  boolean alreadyAhead = statusNow == EventProcessorStatus.ACTIVE
      && prevLastSyncedEventId >= fromEventId;
  if (alreadyAhead) return;
  lastSyncedEventId_.set(fromEventId);
  lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId));
  updateStatus(EventProcessorStatus.ACTIVE);
  LOG.info(String.format(
      "Metastore event processing restarted. Last synced event id was updated "
      + "from %d to %d", prevLastSyncedEventId, lastSyncedEventId_.get()));
}
/**
 * Stops event processing permanently. Shuts down the event-processing scheduler and
 * then the latest-event-id scheduler, and finally sets the status to STOPPED.
 * processEvents() releases the lock on this object after every event of a batch, so
 * at least one more event of an in-flight batch may be processed while this method
 * waits for the lock.
 *
 * @throws IllegalStateException if event processing is already stopped
 */
@Override
public synchronized void shutdown() {
  Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.STOPPED,
      "Event processing is already stopped");
  for (ScheduledExecutorService scheduler :
      Arrays.asList(processEventsScheduler_, updateEventIdScheduler_)) {
    shutdownAndAwaitTermination(scheduler);
  }
  updateStatus(EventProcessorStatus.STOPPED);
  LOG.info("Metastore event processing stopped.");
}
/**
 * Attempts to cleanly shut down the given scheduler pool. New submissions are rejected
 * right away, and running tasks get {@code SCHEDULER_SHUTDOWN_TIMEOUT} seconds to
 * complete. A forced shutdown, which may interrupt running tasks, is issued in either
 * of these cases:
 * <ul>
 *   <li>the pool does not terminate within that time, or</li>
 *   <li>the calling thread is interrupted while waiting.</li>
 * </ul>
 * In the interrupted case, the thread's interrupt status is restored before
 * returning.
 *
 * @param ses the scheduler to shut down; must not be null
 * @throws NullPointerException if {@code ses} is null
 */
private synchronized void shutdownAndAwaitTermination(ScheduledExecutorService ses) {
  Preconditions.checkNotNull(ses);
  ses.shutdown(); // disable new tasks from being submitted
  try {
    // give the currently running tasks a bounded amount of time to complete
    if (!ses.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
      // executor couldn't terminate and timed-out, force the termination
      LOG.info(String.format("Scheduler pool did not terminate within %d seconds. "
          + "Attempting to stop currently running tasks", SCHEDULER_SHUTDOWN_TIMEOUT));
      ses.shutdownNow();
    }
  } catch (InterruptedException e) {
    // current thread interrupted while pool was waiting for termination
    // issue a shutdownNow before returning to cancel currently running tasks
    LOG.info("Received interruptedException. Terminating currently running tasks.", e);
    ses.shutdownNow();
    // Preserve the interrupt so that callers up the stack can still observe it.
    Thread.currentThread().interrupt();
  }
}
/**
 * Reads the notification events that follow {@code eventId} from the Hive Metastore,
 * optionally filtering them.
 *
 * @param eventId only events after this id are returned
 * @param currentEventId current event id on metastore; when it is not greater than
 *     {@code eventId}, an empty list is returned without contacting HMS
 * @param getAllEvents when true, all events since {@code eventId} are requested. HMS
 *     may still cap the response according to
 *     {@code hive.metastore.max.event.response}. When false, at most
 *     EVENTS_BATCH_SIZE_PER_RPC events are requested and the caller is expected to call
 *     again for the rest.
 * @param filter optional filter; when non-null only events it accepts are returned,
 *     otherwise all received events are returned
 * @return the (possibly filtered) events received since {@code eventId}
 * @throws MetastoreNotificationFetchException if a metastore client cannot be created
 *     or the HMS call fails
 */
public List<NotificationEvent> getNextMetastoreEvents(final long eventId,
    final long currentEventId, final boolean getAllEvents,
    @Nullable final NotificationFilter filter)
    throws MetastoreNotificationFetchException {
  // Nothing new in HMS since the last poll, so skip the RPC entirely.
  if (eventId >= currentEventId) return Collections.emptyList();
  final Timer.Context fetchTimer = metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time();
  try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
    // The thrift API is used directly rather than HiveMetastoreClient#getNextNotification.
    // The HMS client can throw when the returned event ids have gaps.
    NotificationEventRequest request = new NotificationEventRequest();
    request.setLastEvent(eventId);
    request.setMaxEvents(getAllEvents ? -1 : EVENTS_BATCH_SIZE_PER_RPC);
    NotificationEventResponse response =
        MetastoreShim.getNextNotification(msClient.getHiveClient(), request, true);
    List<NotificationEvent> received = response.getEvents();
    LOG.info(String.format("Received %d events. Start event id : %d",
        received.size(), eventId));
    if (filter == null) return received;
    List<NotificationEvent> accepted = new ArrayList<>();
    for (NotificationEvent candidate : received) {
      if (filter.accept(candidate)) accepted.add(candidate);
    }
    return accepted;
  } catch (MetastoreClientInstantiationException | TException e) {
    throw new MetastoreNotificationFetchException(
        "Unable to fetch notifications from metastore. Last synced event id is "
        + eventId, e);
  } finally {
    fetchTimer.stop();
  }
}
/**
 * Fetches the next batch of at most <code>EVENTS_BATCH_SIZE_PER_RPC</code> events after
 * the last synced event. The current HMS event id is used as the upper bound.
 *
 * @throws MetastoreNotificationFetchException if HMS cannot be reached
 */
@VisibleForTesting
protected List<NotificationEvent> getNextMetastoreEvents()
    throws MetastoreNotificationFetchException {
  final long currentEventId = getCurrentEventId();
  return getNextMetastoreEvents(currentEventId);
}
/**
 * Fetches the next batch of at most <code>EVENTS_BATCH_SIZE_PER_RPC</code> unfiltered
 * events that follow the last synced event id.
 *
 * @param currentEventId Current event id on metastore
 * @return List of NotificationEvents from metastore since lastSyncedEventId
 * @throws MetastoreNotificationFetchException if HMS cannot be reached
 */
@VisibleForTesting
public List<NotificationEvent> getNextMetastoreEvents(long currentEventId)
    throws MetastoreNotificationFetchException {
  final long fromEventId = lastSyncedEventId_.get();
  return getNextMetastoreEvents(fromEventId, currentEventId, false, null);
}
/**
 * Entry point of the polling task. It does nothing except log when the processor is
 * not ACTIVE. Otherwise it:
 * <ol>
 *   <li>reads the current HMS event id; this is cheaper than polling for events, and
 *   most polls find nothing new;</li>
 *   <li>fetches the next batch since the last synced event;</li>
 *   <li>hands the batch to processEvents(long, List).</li>
 * </ol>
 * Every exception is caught so the scheduler thread does not die silently:
 * <ul>
 *   <li>fetch failures are logged and retried on the next poll, and the status is
 *   unchanged;</li>
 *   <li>MetastoreNotificationNeedsInvalidateException sets NEEDS_INVALIDATE;</li>
 *   <li>anything else sets ERROR and dumps the event being processed to the log.</li>
 * </ul>
 */
@Override
public void processEvents() {
  currentEvent_ = null;
  try {
    final EventProcessorStatus status = eventProcessorStatus_;
    if (status == EventProcessorStatus.ACTIVE) {
      long currentEventId = getCurrentEventId();
      List<NotificationEvent> batch = getNextMetastoreEvents(currentEventId);
      processEvents(currentEventId, batch);
    } else {
      LOG.warn(String.format(
          "Event processing is skipped since status is %s. Last synced event id is %d",
          status, lastSyncedEventId_.get()));
    }
  } catch (MetastoreNotificationFetchException ex) {
    // HMS may be temporarily down: keep the status so polling resumes once it is back.
    LOG.error("Unable to fetch the next batch of metastore events. Hive Metastore " +
        "may be unavailable. Will retry.", ex);
  } catch (MetastoreNotificationNeedsInvalidateException ex) {
    updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
    final String msg = "Event processing needs a invalidate command to resolve the state";
    LOG.error(msg, ex);
    eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' +
        ExceptionUtils.getFullStackTrace(ex);
  } catch (Exception ex) {
    // Catch-all: Preconditions and other RuntimeExceptions raised during processing
    // must not kill the scheduler thread.
    updateStatus(EventProcessorStatus.ERROR);
    final String msg = "Unexpected exception received while processing event";
    LOG.error(msg, ex);
    eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' +
        ExceptionUtils.getFullStackTrace(ex);
    dumpEventInfoToLog(currentEvent_);
  }
}
/**
 * Refreshes latestEventId_ and latestEventTimeSecs_ from HMS so that it is known how
 * far this processor lags behind. It logs a warning when the latest HMS event is newer
 * than the last synced one. It does nothing unless the processor is ACTIVE, and any
 * exception is logged and swallowed.
 */
@VisibleForTesting
public void updateLatestEventId() {
  if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) return;
  try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
    final long currentEventId =
        msClient.getHiveClient().getCurrentNotificationEventId().getEventId();
    // Nothing newer than what is already recorded.
    if (currentEventId <= latestEventId_.get()) return;
    // The event time is only available on the event itself, so fetch that event.
    NotificationEvent latestEvent = getEventFromHMS(msClient, currentEventId);
    // The event may just have been cleaned up.
    if (latestEvent == null) return;
    final long syncedId = lastSyncedEventId_.get();
    final long syncedTime = lastSyncedEventTimeSecs_.get();
    final long latestTime = latestEvent.getEventTime();
    latestEventId_.set(currentEventId);
    latestEventTimeSecs_.set(latestTime);
    LOG.info("Latest event in HMS: id={}, time={}. Last synced event: id={}, time={}.",
        currentEventId, latestTime, syncedId, syncedTime);
    if (latestTime > syncedTime) {
      LOG.warn("Lag: {}. {} events pending to be processed.",
          PrintUtils.printTimeMs((latestTime - syncedTime) * 1000),
          currentEventId - syncedId);
    }
  } catch (Exception e) {
    LOG.error("Unable to update current notification event id. Last value: {}",
        latestEventId_, e);
  }
}
/**
 * Builds a metrics snapshot for this event processor. Status and last synced event id
 * are always filled in. The remaining counters, durations and rates are added only
 * while the processor is ACTIVE, because they are only updated in that state.
 * Durations are reported in seconds.
 */
@Override
public TEventProcessorMetrics getEventProcessorMetrics() {
  TEventProcessorMetrics result = new TEventProcessorMetrics();
  final EventProcessorStatus status = getStatus();
  result.setStatus(status.toString());
  result.setLast_synced_event_id(getLastSyncedEventId());
  if (status == EventProcessorStatus.ACTIVE) {
    result.setLast_synced_event_time(lastSyncedEventTimeSecs_.get());
    result.setLatest_event_id(latestEventId_.get());
    result.setLatest_event_time(latestEventTimeSecs_.get());

    final long received = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
    final long skipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
    result.setEvents_received(received);
    result.setEvents_skipped(skipped);

    // Timer snapshots are in nanoseconds; convert to seconds.
    Snapshot fetch = metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getSnapshot();
    final double fetchMean = fetch.getMean() / SECOND_IN_NANOS;
    result.setEvents_fetch_duration_mean(fetchMean);
    result.setEvents_fetch_duration_p75(fetch.get75thPercentile() / SECOND_IN_NANOS);
    result.setEvents_fetch_duration_p95(fetch.get95thPercentile() / SECOND_IN_NANOS);
    result.setEvents_fetch_duration_p99(fetch.get99thPercentile() / SECOND_IN_NANOS);

    Snapshot process = metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getSnapshot();
    final double processMean = process.getMean() / SECOND_IN_NANOS;
    result.setEvents_process_duration_mean(processMean);
    result.setEvents_process_duration_p75(process.get75thPercentile() / SECOND_IN_NANOS);
    result.setEvents_process_duration_p95(process.get95thPercentile() / SECOND_IN_NANOS);
    result.setEvents_process_duration_p99(process.get99thPercentile() / SECOND_IN_NANOS);
    result.setLast_events_process_duration(
        lastEventProcessDurationNs_.get() / (double) SECOND_IN_NANOS);

    final double rate1Min = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate();
    final double rate5Min = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate();
    final double rate15Min =
        metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate();
    result.setEvents_received_1min_rate(rate1Min);
    result.setEvents_received_5min_rate(rate5Min);
    result.setEvents_received_15min_rate(rate15Min);

    LOG.trace("Events Received: {} Events skipped: {} Avg fetch duration: {} Avg process "
        + "duration: {} Events received rate (1min) : {}",
        received, skipped, fetchMean, processMean, rate1Min);
  }
  return result;
}
/**
 * Returns a summary response with the string form of the metrics. The last recorded
 * error message is included when there is one.
 */
@Override
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
  TEventProcessorMetricsSummaryResponse response =
      new TEventProcessorMetricsSummaryResponse();
  response.setSummary(metrics_.toString());
  if (eventProcessorErrorMsg_ != null) response.setError_msg(eventProcessorErrorMsg_);
  return response;
}
/**
 * Exposes the metrics registry of this processor to tests.
 */
@VisibleForTesting
Metrics getMetrics() {
  final Metrics registry = metrics_;
  return registry;
}
/**
 * Processes the given list of notification events. Useful for tests which provide a
 * list of events directly.
 * <p>
 * The outcome depends on the batch:
 * <ul>
 *   <li>Empty list: the last synced event id is advanced to {@code currentEventId}.
 *   An empty response is possible because of the event skip list in the notification
 *   request.</li>
 *   <li>Nothing left after {@code metastoreEventFactory_.getFilteredEvents}: the last
 *   synced event id and time are advanced to the last received event.</li>
 *   <li>Otherwise each remaining event is processed while holding the lock on this
 *   object, one event at a time. The loop stops early as soon as the status is no
 *   longer ACTIVE.</li>
 * </ul>
 *
 * @param currentEventId Current event id on metastore
 * @param events List of NotificationEvents
 * @throws MetastoreNotificationException if processing fails with a CatalogException;
 *     the CatalogException is attached as the cause
 */
@VisibleForTesting
protected void processEvents(long currentEventId, List<NotificationEvent> events)
    throws MetastoreNotificationException {
  currentEvent_ = null;
  // update the events received metric before returning
  metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
  if (events.isEmpty()) {
    if (lastSyncedEventId_.get() < currentEventId) {
      // Possible to receive empty list due to event skip list in notification event
      // request. Update the last synced event id with current event id on metastore
      lastSyncedEventId_.set(currentEventId);
      lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(currentEventId));
    }
    return;
  }
  final Timer.Context context =
      metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
  Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>();
  try {
    List<MetastoreEvent> filteredEvents =
        metastoreEventFactory_.getFilteredEvents(events, metrics_);
    if (filteredEvents.isEmpty()) {
      // Everything was filtered out; still move past the whole received batch.
      NotificationEvent e = events.get(events.size() - 1);
      lastSyncedEventId_.set(e.getEventId());
      lastSyncedEventTimeSecs_.set(e.getEventTime());
      return;
    }
    for (MetastoreEvent event : filteredEvents) {
      // synchronizing each event processing reduces the scope of the lock so that a
      // potential reset() during event processing is not blocked for longer than
      // necessary
      synchronized (this) {
        if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) {
          break;
        }
        currentEvent_ = event.metastoreNotificationEvent_;
        String targetName = event.getTargetName();
        String desc = String.format("Processing %s on %s, eventId=%d",
            event.getEventType(), targetName, event.getEventId());
        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc)) {
          long startMs = System.currentTimeMillis();
          event.processIfEnabled();
          long elapsedTimeMs = System.currentTimeMillis() - startMs;
          eventProcessingTime.put(event, elapsedTimeMs);
        }
        deleteEventLog_.garbageCollect(event.getEventId());
        lastSyncedEventId_.set(event.getEventId());
        lastSyncedEventTimeSecs_.set(event.getEventTime());
        metrics_.getTimer(AVG_DELAY_IN_CONSUMING_EVENTS).update(
            (System.currentTimeMillis() / 1000) - event.getEventTime(),
            TimeUnit.SECONDS);
      }
    }
  } catch (CatalogException e) {
    // currentEvent_ is reset to null at the top of this method and only assigned inside
    // the loop. Guard against null so that building the message cannot throw a
    // NullPointerException that would hide the original CatalogException.
    NotificationEvent failedEvent = currentEvent_;
    String errMsg = failedEvent == null
        ? "Unable to process metastore events. Event processing will be stopped."
        : String.format(
            "Unable to process event %d of type %s. Event processing will be stopped.",
            failedEvent.getEventId(), failedEvent.getEventType());
    throw new MetastoreNotificationException(errMsg, e);
  } finally {
    long elapsedNs = context.stop();
    lastEventProcessDurationNs_.set(elapsedNs);
    logEventMetrics(eventProcessingTime, elapsedNs);
  }
}
/**
 * Logs the total time spent on an event batch. When the batch took at least
 * {@code HdfsTable.LOADING_WARNING_TIME_NS}, it also logs a warning listing:
 * <ul>
 *   <li>the (up to) 10 slowest events, and</li>
 *   <li>the (up to) 10 targets with the highest accumulated processing time.</li>
 * </ul>
 *
 * @param eventProcessingTime per-event processing time in milliseconds
 * @param elapsedNs total time spent on the batch, in nanoseconds
 */
private void logEventMetrics(Map<MetastoreEvent, Long> eventProcessingTime,
    long elapsedNs) {
  LOG.info("Time elapsed in processing event batch: {}",
      PrintUtils.printTimeNs(elapsedNs));
  // Detailed breakdown is only worth logging for slow batches.
  if (elapsedNs < HdfsTable.LOADING_WARNING_TIME_NS) return;

  // Slowest individual events first.
  List<Map.Entry<MetastoreEvent, Long>> slowestEvents =
      new ArrayList<>(eventProcessingTime.entrySet());
  slowestEvents.sort(Map.Entry.<MetastoreEvent, Long>comparingByValue().reversed());
  int topEvents = Math.min(10, slowestEvents.size());
  StringBuilder report = new StringBuilder();
  report.append("Top ").append(topEvents).append(" expensive events: ");
  for (Map.Entry<MetastoreEvent, Long> entry : slowestEvents.subList(0, topEvents)) {
    MetastoreEvent event = entry.getKey();
    report.append(String.format("(type=%s, id=%s, target=%s, duration_ms=%d) ",
        event.getEventType(), event.getEventId(), event.getTargetName(),
        entry.getValue()));
  }

  // Accumulate time per target, then list the most expensive targets.
  Map<String, Long> durationPerTarget = new HashMap<>();
  for (Map.Entry<MetastoreEvent, Long> entry : eventProcessingTime.entrySet()) {
    String target = entry.getKey().getTargetName();
    durationPerTarget.put(target,
        durationPerTarget.getOrDefault(target, 0L) + entry.getValue());
  }
  List<Map.Entry<String, Long>> slowestTargets =
      new ArrayList<>(durationPerTarget.entrySet());
  slowestTargets.sort(Map.Entry.<String, Long>comparingByValue().reversed());
  int topTargets = Math.min(10, slowestTargets.size());
  report.append("\nTop ").append(topTargets).append(" targets in event processing: ");
  for (Map.Entry<String, Long> entry : slowestTargets.subList(0, topTargets)) {
    report.append(String.format("(target=%s, duration_ms=%d) ",
        entry.getKey(), entry.getValue()));
  }
  LOG.warn(report.toString());
}
/**
 * Sets the processor status to {@code toStatus} while holding the lock on this object.
 */
private synchronized void updateStatus(EventProcessorStatus toStatus) {
  this.eventProcessorStatus_ = toStatus;
}
/**
 * Logs the details of the given notification event at ERROR level and appends them,
 * preceded by a newline, to eventProcessorErrorMsg_. The details are id, type, time,
 * database, table (when set) and message. When the event is null, a short note saying
 * so is logged and appended instead.
 */
private void dumpEventInfoToLog(NotificationEvent event) {
  final String details;
  if (event == null) {
    details = "Notification event is null";
  } else {
    StringBuilder sb = new StringBuilder()
        .append("Event id: ").append(event.getEventId()).append("\n")
        .append("Event Type: ").append(event.getEventType()).append("\n")
        .append("Event time: ").append(event.getEventTime()).append("\n")
        .append("Database name: ").append(event.getDbName()).append("\n");
    if (event.getTableName() != null) {
      sb.append("Table name: ").append(event.getTableName()).append("\n");
    }
    sb.append("Event message: ").append(event.getMessage()).append("\n");
    details = sb.toString();
  }
  LOG.error(details);
  eventProcessorErrorMsg_ += '\n' + details;
}
/**
 * Returns the singleton event processor, creating it on the first call. It should only
 * be created during catalogD initialization, so that the start sync id matches the
 * catalogD startup time. Arguments passed after the instance exists are ignored.
 *
 * @param catalogOpExecutor the CatalogOpExecutor instance to which this event
 *     processor belongs
 * @param startSyncFromId start event id; events are polled starting from this id
 * @param eventPollingInterval HMS polling interval in seconds
 * @return the existing instance, or a newly created one if none existed yet
 * @throws CatalogException if creating the instance fails
 */
public static synchronized ExternalEventsProcessor getInstance(
    CatalogOpExecutor catalogOpExecutor, long startSyncFromId,
    long eventPollingInterval) throws CatalogException {
  if (instance == null) {
    instance = new MetastoreEventsProcessor(catalogOpExecutor, startSyncFromId,
        eventPollingInterval);
  }
  return instance;
}
/**
 * Returns the factory used to create and filter metastore events.
 */
@Override
public MetastoreEventFactory getEventsFactory() {
  final MetastoreEventFactory factory = metastoreEventFactory_;
  return factory;
}
/**
 * Returns the shared MESSAGE_DESERIALIZER.
 */
public static MessageDeserializer getMessageDeserializer() {
  final MessageDeserializer deserializer = MESSAGE_DESERIALIZER;
  return deserializer;
}
/**
 * Returns EVENT_SKIP_LIST.
 */
public static List<String> getEventSkipList() {
  final List<String> skipList = EVENT_SKIP_LIST;
  return skipList;
}
}