| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog.events; |
| |
| import com.codahale.metrics.Gauge; |
| import com.codahale.metrics.Snapshot; |
| import com.codahale.metrics.Timer; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import java.time.LocalDateTime; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.annotation.Nullable; |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; |
| import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; |
| import org.apache.hadoop.hive.metastore.api.NotificationEvent; |
| import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; |
| import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; |
| import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; |
| import org.apache.impala.catalog.CatalogException; |
| import org.apache.impala.catalog.CatalogServiceCatalog; |
| import org.apache.impala.catalog.Db; |
| import org.apache.impala.catalog.HdfsTable; |
| import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; |
| import org.apache.impala.catalog.MetastoreClientInstantiationException; |
| import org.apache.impala.catalog.Table; |
| import org.apache.impala.catalog.IncompleteTable; |
| import org.apache.impala.catalog.events.ConfigValidator.ValidationResult; |
| import org.apache.impala.catalog.events.MetastoreEvents.DropDatabaseEvent; |
| import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; |
| import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory; |
| import org.apache.impala.common.Metrics; |
| import org.apache.impala.common.PrintUtils; |
| import org.apache.impala.compat.MetastoreShim; |
| import org.apache.impala.service.CatalogOpExecutor; |
| import org.apache.impala.thrift.TEventProcessorMetrics; |
| import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse; |
| import org.apache.impala.util.MetaStoreUtil; |
| import org.apache.impala.util.ThreadNameAnnotator; |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A metastore event is an instance of the class |
| * <code>org.apache.hadoop.hive.metastore.api.NotificationEvent</code>. Metastore can be |
| * configured to work with Listeners which are called on various DDL operations like |
| * create/alter/drop operations on database, table, partition etc. Each event has a unique |
| * incremental id and the generated events can be fetched from Metastore to get |
| * incremental updates to the metadata stored in Hive metastore using the public API |
| * <code>get_next_notification</code>. These events could be generated by external |
| * Metastore clients like Apache Hive or Apache Spark as well as other Impala clusters |
| * configured to talk with the same metastore. |
| * |
| * This class is used to poll metastore for such events at a given frequency. By observing |
| * such events, we can take appropriate action on the catalogD |
| * (refresh/invalidate/add/remove) so that catalog represents the latest information |
| * available in metastore. We keep track of the last synced event id in each polling |
| * iteration so the next batch can be requested appropriately. The current batch size is |
| * constant and set to EVENTS_BATCH_SIZE_PER_RPC. |
| * |
| * <pre> |
| * +---------------+ +----------------+ +--------------+ |
| * |Catalog state | |Catalog | | | |
| * |stale | |State up-to-date| |Catalog state | |
| * |(apply event) | |(ignore) | |is newer than | |
| * | | | | |event | |
| * | | | | |(ignore) | |
| * +------+--------+ +-----+----------+ +-+------------+ |
| * | | | |
| * | | | |
| * | | | |
| * | | | |
| * | | | |
| * +-----------V------------------V---------------------V-----------> Event Timeline |
| * ^ |
| * | |
| * | |
| * | |
| * | |
| * E |
| * |
| * </pre> |
| * Consistency model: Events could be seen as DDL operations from the past, either done |
| * from this same cluster or some other external system. For example, in the events |
| * timeline given above, consider an Event E at any given time. The catalog state for the |
| * corresponding object of the event could either be stale, exactly-same or at a version |
| * which is higher than one provided by event. Catalog state should only be updated when |
| * it is stale with respect to the event. In order to determine if the catalog object is |
| * stale, we rely on a combination of create EventId and object version. |
| * In case of create/drop events on database/table/partitions we use the |
| * <code>createEventId</code> field of the corresponding object in the catalogd |
| * to determine if the event needs to be processed or ignored. E.g. if Impala creates |
| * a table, {@link CatalogOpExecutor} will create the table and assign the createEventId |
| * of the table by fetching the CREATE_TABLE event from HMS. Later on when the event |
| * is fetched by events processor, it uses the createEventId of the Catalogd's table to |
| * ignore the event. Similar approach is used for databases and partition create events. |
| * |
| * In case of Drop events for database/table/partition, the events processor looks at |
| * the {@link DeleteEventLog} in the CatalogOpExecutor to determine if the table has been |
| * dropped already from catalogd. |
| * |
| * In case of ALTER/INSERT events, the events processor relies on the object version in |
| * the properties of the table to determine if this is a self-event or not. |
| * |
| * Following table shows the actions to be taken when the given event type is received. |
| * |
| * <pre> |
| * +------------------------------------------------+ --------------------+ |
| * | Catalog object state | |
| * +--------------------------------------+-----------------------+---------------------+ |
| * | Event type | Loaded | Incomplete | Not present | |
| * | | | | | |
| * +--------------------------------------------------------------+---------------------+ |
| * | | | | | |
| * | CREATE EVENT| Ignore | Ignore | addIfNotRemovedLater| |
| * | | | | | |
| * | | | | | |
| * | ALTER EVENT | Refresh | Ignore | Ignore | |
| * | | | | | |
| * | | | | | |
| * | DROP EVENT | removeIfNotAddedLater | removeIfNotAddedLater | Ignore | |
| * | | | | | |
| * | | | | | |
| * | INSERT EVENT| Refresh | Ignore | Ignore | |
| * | | | | | |
| * +-------------+------------------------+-----------------------+---------------------+ |
| * </pre> |
| * |
| * Currently event handlers rely on createEventId on Database, Table and Partition to |
| * uniquely determine if the object from event is same as object in the catalog. This |
| * information is used to make sure that we are deleting the right incarnation of the |
| * object when compared to Metastore. |
| * |
| * Self-events: |
| * Events could be generated by this Catalog's operations as well. Invalidating table |
| * for such events is unnecessary and inefficient. In order to detect such self-events |
| * when catalog executes a DDL operation it appends the current catalog version to the |
| * list of version numbers for the in-flight events for the table. Events processor |
| * clears this version when the corresponding version number identified by serviceId is |
| * received in the event. This is needed since it is possible that an external |
| * non-Impala system which generates the event presents the same serviceId and version |
| * number later on. The algorithm to detect such self-events is as below. |
| * |
| * 1. Add the service id and expected catalog version to database/table/partition |
| * parameters when executing the DDL operation. When the HMS operation is successful, add |
| * the version number to the list of version for in-flight events at |
| * table/database/partition level. |
| * 2. When the event is received, the first time you see the combination of serviceId |
| * and version number, event processor clears the version number from table's list and |
| * determines the event as self-generated (and hence ignored). |
| * 3. If the event data presents an unknown serviceId or if the version number is not |
| * present in the list of in-flight versions, the event is not a self-event and needs |
| * to be processed. |
| * |
| * In order to limit the total memory footprint, only 100 version numbers are stored at |
| * the catalog object. Since the event processor is expected to poll every few seconds |
| * this should be a reasonable bound which satisfies most use-cases. Otherwise, event |
| * processor may wrongly process a self-event to invalidate the table. In such a case, |
| * it's a performance penalty, not a correctness issue. |
| * |
| * All the operations which change the state of catalog cache while processing a certain |
| * event type must be atomic in nature. We rely on taking a DDL lock in CatalogOpExecutor |
| * in case of create/drop events and object (Db or table) level writeLock in case of alter |
| * events to make sure that readers are blocked while the metadata |
| * update operation is being performed. Since the events are generated post-metastore |
| * operations, such catalog updates do not need to update the state in Hive Metastore. |
| * |
| * Error Handling: The event processor could be in ACTIVE, PAUSED, ERROR states. In case |
| * of any errors while processing the events the state of event processor changes to ERROR |
| * and no subsequent events are polled. In such a case an invalidate metadata command |
| * restarts the event polling, which updates the lastSyncedEventId to the latest from |
| * metastore. |
| * |
| * TODO: |
| * 1. a global invalidate metadata command to get the events processor out of error state |
| * is too heavy weight. We should make it easier to recover from the error state. |
| * 2. The createEventId logic can be extended to track the last eventId which the table |
| * has synced to and we can then get rid of self-event logic for alter events too. |
| */ |
| public class MetastoreEventsProcessor implements ExternalEventsProcessor { |
| |
| public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY = |
| "hive.metastore.notifications.add.thrift.objects"; |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(MetastoreEventsProcessor.class); |
| |
| private static final MessageDeserializer MESSAGE_DESERIALIZER = |
| MetastoreShim.getMessageDeserializer(); |
| |
| private static MetastoreEventsProcessor instance; |
| |
| // default maximum number of events to request from metastore in each RPC |
| private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000; |
| |
| // maximum time to wait for a clean shutdown of scheduler in seconds |
| private static final int SCHEDULER_SHUTDOWN_TIMEOUT = 10; |
| |
| // time taken to fetch events during each poll |
| public static final String EVENTS_FETCH_DURATION_METRIC = "events-fetch-duration"; |
| // time taken to apply all the events received during each poll |
| public static final String EVENTS_PROCESS_DURATION_METRIC = "events-apply-duration"; |
| // rate of events received per unit time |
| public static final String EVENTS_RECEIVED_METRIC = "events-received"; |
| // total number of events which are skipped because of the flag setting or |
| // in case of [CREATE|DROP] events on [DATABASE|TABLE|PARTITION] which were ignored |
| // because the [DATABASE|TABLE|PARTITION] was already [PRESENT|ABSENT] in the catalogd. |
| public static final String EVENTS_SKIPPED_METRIC = "events-skipped"; |
| // name of the event processor status metric |
| public static final String STATUS_METRIC = "status"; |
| // last synced event id |
| public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id"; |
| // last synced event time |
| public static final String LAST_SYNCED_EVENT_TIME = "last-synced-event-time"; |
| // latest event id in Hive metastore |
| public static final String LATEST_EVENT_ID = "latest-event-id"; |
| // event time of the latest event in Hive metastore |
| public static final String LATEST_EVENT_TIME = "latest-event-time"; |
| // delay (secs) in events processing: latest event time minus last synced event time |
| public static final String EVENT_PROCESSING_DELAY = "event-processing-delay"; |
| // metric name for number of tables which are refreshed by event processor so far |
| public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed"; |
| // number of times events processor refreshed a partition |
| public static final String NUMBER_OF_PARTITION_REFRESHES = "partitions-refreshed"; |
| // number of tables which were added to the catalogd based on events. |
| public static final String NUMBER_OF_TABLES_ADDED = "tables-added"; |
| // number of tables which were removed from the catalogd based on events. |
| public static final String NUMBER_OF_TABLES_REMOVED = "tables-removed"; |
| // number of databases which were added to the catalogd based on events. |
| public static final String NUMBER_OF_DATABASES_ADDED = "databases-added"; |
| // number of databases which were removed from the catalogd based on events. |
| public static final String NUMBER_OF_DATABASES_REMOVED = "databases-removed"; |
| // number of partitions which were added to the catalogd based on events. |
| public static final String NUMBER_OF_PARTITIONS_ADDED = "partitions-added"; |
| // number of partitions which were removed from the catalogd based on events. |
| public static final String NUMBER_OF_PARTITIONS_REMOVED = "partitions-removed"; |
| // number of entries in the delete event log |
| public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size"; |
| // number of batch events generated |
| public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created"; |
| |
| // metric to measure the delay in msec between the time an event is created in |
| // metastore and the time it is consumed by the event processor |
| public static final String AVG_DELAY_IN_CONSUMING_EVENTS = "events-consuming" + |
| "-delay"; |
| |
| private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L; |
| |
| // list of event types to skip while fetching notification events from metastore |
| private static final List<String> EVENT_SKIP_LIST = Arrays.asList("OPEN_TXN"); |
| |
| /** |
|  * Convenience overload of {@link |
|  * MetastoreEventsProcessor#getNextMetastoreEventsInBatches(CatalogServiceCatalog, |
|  * long, NotificationFilter, int)} which fetches with the default batch size |
|  * {@code EVENTS_BATCH_SIZE_PER_RPC}. |
|  */ |
| public static List<NotificationEvent> getNextMetastoreEventsInBatches( |
|     CatalogServiceCatalog catalog, long eventId, NotificationFilter filter) |
|     throws MetastoreNotificationFetchException { |
|   final int defaultBatchSize = EVENTS_BATCH_SIZE_PER_RPC; |
|   List<NotificationEvent> fetchedEvents = |
|       getNextMetastoreEventsInBatches(catalog, eventId, filter, defaultBatchSize); |
|   return fetchedEvents; |
| } |
| |
| /** |
|  * Gets the next list of {@link NotificationEvent} from Hive Metastore which are |
|  * greater than the given eventId and filtered according to the provided filter. |
|  * Events are requested in batches until the notification event id that was current |
|  * when this call started is reached, or until metastore returns an empty batch. |
|  * @param catalog The CatalogServiceCatalog used to get the metastore client |
|  * @param eventId The eventId after which the events are needed. |
|  * @param filter The {@link NotificationFilter} used to filter the list of fetched |
|  *               events; if null, all fetched events are returned. Note that this |
|  *               is a client side filter not a server side filter. Unfortunately, |
|  *               HMS doesn't provide a similar mechanism to do server side filtering. |
|  * @param eventsBatchSize the batch size for fetching the events from metastore. Must |
|  *                        be greater than 0. |
|  * @return List of {@link NotificationEvent} which are all greater than eventId and |
|  * satisfy the given filter. Empty if metastore has no event newer than eventId. |
|  * @throws MetastoreNotificationFetchException in case of RPC errors to metastore. |
|  */ |
| @VisibleForTesting |
| public static List<NotificationEvent> getNextMetastoreEventsInBatches( |
|     CatalogServiceCatalog catalog, long eventId, NotificationFilter filter, |
|     int eventsBatchSize) throws MetastoreNotificationFetchException { |
|   Preconditions.checkArgument(eventsBatchSize > 0); |
|   List<NotificationEvent> result = new ArrayList<>(); |
|   try (MetaStoreClient msc = catalog.getMetaStoreClient()) { |
|     long toEventId = msc.getHiveClient().getCurrentNotificationEventId() |
|         .getEventId(); |
|     if (toEventId <= eventId) return result; |
|     long currentEventId = eventId; |
|     while (currentEventId < toEventId) { |
|       // Take the minimum as a long and only then narrow to int. Casting the gap |
|       // first would wrap around when it exceeds Integer.MAX_VALUE and could yield |
|       // a zero or negative batch size. The result never exceeds eventsBatchSize, |
|       // so the narrowing cast is lossless. |
|       int batchSize = |
|           (int) Math.min((long) eventsBatchSize, toEventId - currentEventId); |
|       // we don't call the HiveMetaStoreClient's getNextNotification() |
|       // call here because it can throw a IllegalStateException if the eventId |
|       // which we pass in is very old and metastore has already cleaned up |
|       // the events since that eventId. |
|       NotificationEventRequest eventRequest = new NotificationEventRequest(); |
|       eventRequest.setMaxEvents(batchSize); |
|       eventRequest.setLastEvent(currentEventId); |
|       NotificationEventResponse notificationEventResponse = |
|           MetastoreShim.getNextNotification(msc.getHiveClient(), eventRequest, true); |
|       if (notificationEventResponse.getEvents().isEmpty()) { |
|         // Possible to receive empty list due to event skip list in request. |
|         // Returning here also keeps the loop from spinning without progress. |
|         return result; |
|       } |
|       for (NotificationEvent event : notificationEventResponse.getEvents()) { |
|         // if no filter is provided we add all the events |
|         if (filter == null || filter.accept(event)) result.add(event); |
|         // advance even past filtered-out events so the next batch starts after them |
|         currentEventId = event.getEventId(); |
|       } |
|     } |
|     return result; |
|   } catch (MetastoreClientInstantiationException | TException e) { |
|     throw new MetastoreNotificationFetchException(String.format( |
|         CatalogOpExecutor.HMS_RPC_ERROR_FORMAT_STR, "getNextNotification"), e); |
|   } |
| } |
| |
| /** |
| * Sync table to latest event id starting from last synced |
| * event id. |
| * @param catalog |
| * @param tbl: Catalog table to be synced |
| * @param eventFactory |
| * @throws CatalogException |
| * @throws MetastoreNotificationException |
| */ |
| public static void syncToLatestEventId(CatalogServiceCatalog catalog, |
| org.apache.impala.catalog.Table tbl, EventFactory eventFactory, Metrics metrics) |
| throws CatalogException, MetastoreNotificationException { |
| Preconditions.checkArgument(tbl != null, "tbl is null"); |
| Preconditions.checkState(!(tbl instanceof IncompleteTable) && |
| tbl.isLoaded(), "table %s is either incomplete or not loaded", |
| tbl.getFullName()); |
| Preconditions.checkState(tbl.isWriteLockedByCurrentThread(), |
| String.format("Write lock is not held on table %s by current thread", |
| tbl.getFullName())); |
| long lastEventId = tbl.getLastSyncedEventId(); |
| Preconditions.checkArgument(lastEventId > 0, "lastEvent " + |
| " Id %s for table %s should be greater than 0", lastEventId, tbl.getFullName()); |
| |
| String annotation = String.format("sync table %s to latest HMS event id", |
| tbl.getFullName()); |
| try(ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) { |
| List<NotificationEvent> events = getNextMetastoreEventsInBatches(catalog, |
| lastEventId, getTableNotificationEventFilter(tbl)); |
| |
| if (events.isEmpty()) { |
| LOG.debug("table {} synced till event id {}. No new HMS events to process from " |
| + "event id: {}", tbl.getFullName(), lastEventId, lastEventId + 1); |
| return; |
| } |
| MetastoreEvents.MetastoreEvent currentEvent = null; |
| for (NotificationEvent event : events) { |
| currentEvent = eventFactory.get(event, metrics); |
| LOG.trace("for table {}, processing event {}", tbl.getFullName(), currentEvent); |
| currentEvent.processIfEnabled(); |
| if (currentEvent.isDropEvent()) { |
| // currentEvent can only be DropPartition or DropTable |
| Preconditions.checkNotNull(currentEvent.getDbName()); |
| Preconditions.checkNotNull(currentEvent.getTableName()); |
| String key = DeleteEventLog.getTblKey(currentEvent.getDbName(), |
| currentEvent.getTableName()); |
| catalog.getMetastoreEventProcessor().getDeleteEventLog() |
| .addRemovedObject(currentEvent.getEventId(), key); |
| } |
| if (currentEvent instanceof MetastoreEvents.DropTableEvent) { |
| // return after processing table drop event |
| return; |
| } |
| } |
| // setting HMS Event ID after all the events |
| // are successfully processed only if table was |
| // not dropped |
| // Certain events like alter_table, do an incremental reload which sets the event |
| // id to the current hms event id at that time. Therefore, check table's last |
| // synced event id again before setting currentEvent id |
| if (currentEvent.getEventId() > tbl.getLastSyncedEventId()) { |
| tbl.setLastSyncedEventId(currentEvent.getEventId()); |
| } |
| LOG.info("Synced table {} till HMS event: {}", tbl.getFullName(), |
| tbl.getLastSyncedEventId()); |
| } |
| } |
| |
| /** |
|  * Syncs the given database to the latest HMS event id, starting from the database's |
|  * last synced event id. Events newer than that id are fetched using the db-specific |
|  * notification filter and processed in order. If a drop event is seen it must be a |
|  * {@link DropDatabaseEvent}; it is recorded in the delete event log of the catalog's |
|  * metastore event processor and processing stops. Otherwise the database's last |
|  * synced event id is set to the id of the last processed event. |
|  * @param catalog catalog used to fetch the events and to reach the delete event log |
|  * @param db database to be synced. Must be non-null and its lock must be held by the |
|  *           current thread. |
|  * @param eventFactory factory used to wrap each fetched {@link NotificationEvent} |
|  * @param metrics metrics object passed to the event factory for every event |
|  * @throws CatalogException |
|  * @throws MetastoreNotificationException |
|  */ |
| public static void syncToLatestEventId(CatalogServiceCatalog catalog, |
|     org.apache.impala.catalog.Db db, EventFactory eventFactory, Metrics metrics) |
|     throws CatalogException, MetastoreNotificationException { |
|   Preconditions.checkArgument(db != null, "db is null"); |
|   long lastEventId = db.getLastSyncedEventId(); |
|   Preconditions.checkArgument(lastEventId > 0, "Invalid " + |
|       "last synced event ID %s for db %s ", lastEventId, db.getName()); |
|   Preconditions.checkState(db.isLockHeldByCurrentThread(), |
|       "Current thread does not hold lock on db: %s", db.getName()); |
| |
|   String annotation = String.format("sync db %s to latest HMS event id", db.getName()); |
|   try(ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) { |
|     List<NotificationEvent> events = getNextMetastoreEventsInBatches(catalog, |
|         lastEventId, getDbNotificationEventFilter(db)); |
| |
|     if (events.isEmpty()) { |
|       LOG.debug("db {} already synced till event id: {}, no new hms events from " |
|           + "event id: {}", db.getName(), lastEventId, lastEventId+1); |
|       return; |
|     } |
| |
|     // non-null after the loop because events is not empty |
|     MetastoreEvents.MetastoreEvent currentEvent = null; |
|     for (NotificationEvent event : events) { |
|       currentEvent = eventFactory.get(event, metrics); |
|       LOG.trace("for db {}, processing event: {}", db.getName(), currentEvent); |
|       currentEvent.processIfEnabled(); |
|       if (currentEvent.isDropEvent()) { |
|         // Preconditions only substitutes %s placeholders (not the SLF4J style {}), |
|         // so %s is required for the event to show up in the failure message. |
|         Preconditions.checkState(currentEvent instanceof DropDatabaseEvent, |
|             "invalid drop event %s ", currentEvent); |
|         Preconditions.checkNotNull(currentEvent.getDbName()); |
|         String key = DeleteEventLog.getDbKey(currentEvent.getDbName()); |
|         catalog.getMetastoreEventProcessor().getDeleteEventLog() |
|             .addRemovedObject(currentEvent.getEventId(), key); |
|         // return after processing drop db event |
|         return; |
|       } |
|     } |
|     // setting HMS Event Id after all the events |
|     // are successfully processed only if db was not dropped |
|     db.setLastSyncedEventId(currentEvent.getEventId()); |
|     LOG.info("Synced db {} till HMS event {}", db.getName(), |
|         currentEvent); |
|   } |
| } |
| |
| /* |
| This filter is used when syncing events for a table to the latest HMS event id. |
| It filters all events except db related ones. |
| */ |
| private static NotificationFilter getTableNotificationEventFilter(Table tbl) { |
| NotificationFilter filter = new NotificationFilter() { |
| @Override |
| public boolean accept(NotificationEvent event) { |
| if (event.getDbName() != null && event.getTableName() != null) { |
| return tbl.getDb().getName().equalsIgnoreCase(event.getDbName()) && |
| tbl.getName().equalsIgnoreCase(event.getTableName()); |
| } |
| // filter all except db events |
| return event.getDbName() == null; |
| } |
| }; |
| return filter; |
| } |
| |
| /* |
| This filter is used when syncing db to the latest HMS event id. The |
| filter accepts all events except table related ones |
| */ |
| private static NotificationFilter getDbNotificationEventFilter(Db db) { |
| NotificationFilter filter = new NotificationFilter() { |
| @Override |
| public boolean accept(NotificationEvent event) { |
| if (event.getDbName() != null && event.getTableName() == null) { |
| return db.getName().equalsIgnoreCase(event.getDbName()); |
| } |
| // filter all events except table events |
| return event.getTableName() == null; |
| } |
| }; |
| return filter; |
| } |
| |
| // possible states of the event processor |
| public enum EventProcessorStatus { |
| PAUSED, // event processor is paused because catalog is being reset concurrently |
| ACTIVE, // event processor is scheduled at a given frequency |
| ERROR, // event processor is in error state and event processing has stopped |
| NEEDS_INVALIDATE, // event processor could not resolve certain events and needs a |
| // manual invalidate command to reset the state (see AlterEvent for an example) |
| STOPPED, // event processor is shut down. No events will be processed |
| DISABLED // event processor is not configured to run |
| } |
| |
| // current status of this event processor; STOPPED until start() marks it ACTIVE |
| private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED; |
| |
| // error message when event processor comes into ERROR/NEEDS_INVALIDATE states |
| private String eventProcessorErrorMsg_ = null; |
| |
| // event factory which is used to get or create MetastoreEvents |
| private final MetastoreEventFactory metastoreEventFactory_; |
| |
| // keeps track of the current event that we are processing |
| private NotificationEvent currentEvent_; |
| |
| // the last event id which we have synced to, and the event time (secs) of that event |
| private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1); |
| private final AtomicLong lastSyncedEventTimeSecs_ = new AtomicLong(0); |
| |
| // The event id and eventTime of the latest event in HMS. Only used in metrics to show |
| // how far we are lagging behind. |
| private final AtomicLong latestEventId_ = new AtomicLong(0); |
| private final AtomicLong latestEventTimeSecs_ = new AtomicLong(0); |
| |
| // The duration in nanoseconds of the processing of the last event batch. |
| private final AtomicLong lastEventProcessDurationNs_ = new AtomicLong(0); |
| |
| // polling interval in seconds. Note this is the time we wait AFTER each fetch call |
| private final long pollingFrequencyInSec_; |
| |
| // catalog service instance to be used while processing events |
| protected final CatalogServiceCatalog catalog_; |
| |
| // scheduler daemon thread executor for processing events at given frequency |
| private final ScheduledExecutorService processEventsScheduler_ = |
| Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder() |
| .setDaemon(true) |
| .setNameFormat("MetastoreEventsProcessor-ProcessEvents") |
| .build()); |
| |
| // scheduler daemon thread executor to update the latest event id at given frequency |
| private final ScheduledExecutorService updateEventIdScheduler_ = |
| Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder() |
| .setDaemon(true) |
| .setNameFormat("MetastoreEventsProcessor-UpdateEventId") |
| .build()); |
| |
| // metrics registry to keep track of metrics related to event processing |
| //TODO create a separate singleton class which wraps around this so that we don't |
| // have to pass it around as an argument in constructor in MetastoreEvents |
| private final Metrics metrics_ = new Metrics(); |
| |
| // When events processing is ACTIVE this delete event log is used to keep track of |
| // DROP events for databases, tables and partitions so that the MetastoreEventsProcessor |
| // can ignore the drop events when they are received later. |
| private final DeleteEventLog deleteEventLog_ = new DeleteEventLog(); |
| |
| /** |
|  * Creates an events processor which starts syncing after the given event id. The |
|  * metastore configuration is validated before any other state is initialized, so |
|  * construction fails if the configuration is not usable. |
|  * @param catalogOpExecutor executor whose catalog is used by this processor; it is |
|  *                          also handed to the event factory |
|  * @param startSyncFromId event id used as the initial last synced event id |
|  * @param pollingFrequencyInSec polling interval in seconds, must be greater than 0 |
|  * @throws CatalogException propagated from the configuration validation |
|  */ |
| @VisibleForTesting |
| MetastoreEventsProcessor(CatalogOpExecutor catalogOpExecutor, long startSyncFromId, |
|     long pollingFrequencyInSec) throws CatalogException { |
|   Preconditions.checkState(pollingFrequencyInSec > 0); |
|   CatalogServiceCatalog catalog = catalogOpExecutor.getCatalog(); |
|   this.catalog_ = Preconditions.checkNotNull(catalog); |
|   validateConfigs(); |
|   this.lastSyncedEventId_.set(startSyncFromId); |
|   // the event time is looked up only after the starting event id has been recorded |
|   this.lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(startSyncFromId)); |
|   initMetrics(); |
|   this.metastoreEventFactory_ = new MetastoreEventFactory(catalogOpExecutor); |
|   this.pollingFrequencyInSec_ = pollingFrequencyInSec; |
| } |
| |
| /** |
|  * Fetches the required metastore config values and validates them against the |
|  * expected values. The configurations to validate are the ones returned by |
|  * {@link #getEventProcessorConfigsToValidate()}. All configurations are checked |
|  * before failing so that every invalid one is reported in the ERROR log. |
|  * @throws CatalogException if one or more validations fail or if metastore is not |
|  * accessible. In the latter case the underlying TException is attached as the cause. |
|  */ |
| @VisibleForTesting |
| void validateConfigs() throws CatalogException { |
|   List<ValidationResult> validationErrors = new ArrayList<>(); |
|   for (MetastoreEventProcessorConfig config : getEventProcessorConfigsToValidate()) { |
|     String configKey = config.getValidator().getConfigKey(); |
|     try { |
|       String value = getConfigValueFromMetastore(configKey, ""); |
|       ValidationResult result = config.validate(value); |
|       if (!result.isValid()) validationErrors.add(result); |
|     } catch (TException e) { |
|       String msg = String.format("Unable to get configuration %s from metastore. Check " |
|           + "if metastore is accessible", configKey); |
|       LOG.error(msg, e); |
|       // keep the TException as the cause so that callers can see why the RPC failed |
|       throw new CatalogException(msg, e); |
|     } |
|   } |
|   if (!validationErrors.isEmpty()) { |
|     LOG.error("Found {} incorrect metastore configuration(s).", |
|         validationErrors.size()); |
|     for (ValidationResult invalidConfig: validationErrors) { |
|       LOG.error(invalidConfig.getReason()); |
|     } |
|     throw new CatalogException(String.format("Found %d incorrect metastore " |
|         + "configuration(s). Events processor cannot start. See ERROR log for more " |
|         + "details.", validationErrors.size())); |
|   } |
| } |
| |
| public DeleteEventLog getDeleteEventLog() { return deleteEventLog_; } |
| |
| /** |
| * Returns the list of Metastore configurations to validate depending on the hive |
| * version |
| */ |
| public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() { |
| return Arrays.asList(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML, |
| MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME); |
| } |
| |
| /** |
|  * Registers all the timers, meters, counters and gauges of this events processor in |
|  * the metrics registry. The gauges read the live values of the processor state each |
|  * time they are sampled. |
|  */ |
| private void initMetrics() { |
|   // durations and volume of each poll |
|   metrics_.addTimer(EVENTS_FETCH_DURATION_METRIC); |
|   metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC); |
|   metrics_.addMeter(EVENTS_RECEIVED_METRIC); |
|   metrics_.addCounter(EVENTS_SKIPPED_METRIC); |
| |
|   // processor status and how far it is behind metastore |
|   Gauge<String> statusGauge = () -> getStatus().toString(); |
|   metrics_.addGauge(STATUS_METRIC, statusGauge); |
|   Gauge<Long> lastSyncedIdGauge = () -> lastSyncedEventId_.get(); |
|   metrics_.addGauge(LAST_SYNCED_ID_METRIC, lastSyncedIdGauge); |
|   Gauge<Long> lastSyncedTimeGauge = () -> lastSyncedEventTimeSecs_.get(); |
|   metrics_.addGauge(LAST_SYNCED_EVENT_TIME, lastSyncedTimeGauge); |
|   Gauge<Long> latestIdGauge = () -> latestEventId_.get(); |
|   metrics_.addGauge(LATEST_EVENT_ID, latestIdGauge); |
|   Gauge<Long> latestTimeGauge = () -> latestEventTimeSecs_.get(); |
|   metrics_.addGauge(LATEST_EVENT_TIME, latestTimeGauge); |
|   Gauge<Long> processingDelayGauge = |
|       () -> latestEventTimeSecs_.get() - lastSyncedEventTimeSecs_.get(); |
|   metrics_.addGauge(EVENT_PROCESSING_DELAY, processingDelayGauge); |
| |
|   // catalog changes applied because of events |
|   metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES); |
|   metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES); |
|   metrics_.addCounter(NUMBER_OF_TABLES_ADDED); |
|   metrics_.addCounter(NUMBER_OF_TABLES_REMOVED); |
|   metrics_.addCounter(NUMBER_OF_DATABASES_ADDED); |
|   metrics_.addCounter(NUMBER_OF_DATABASES_REMOVED); |
|   metrics_.addCounter(NUMBER_OF_PARTITIONS_ADDED); |
|   metrics_.addCounter(NUMBER_OF_PARTITIONS_REMOVED); |
|   Gauge<Integer> deleteLogSizeGauge = () -> deleteEventLog_.size(); |
|   metrics_.addGauge(DELETE_EVENT_LOG_SIZE, deleteLogSizeGauge); |
|   metrics_.addCounter(NUMBER_OF_BATCH_EVENTS); |
|   metrics_.addTimer(AVG_DELAY_IN_CONSUMING_EVENTS); |
| } |
| |
| /** |
| * Schedules the daemon thread at a given frequency. It is important to note that this |
| * method schedules with FixedDelay instead of FixedRate. The reason it is scheduled at |
| * a fixedDelay is to make sure that we don't pile up the pending tasks in case each |
| * polling operation is taking longer than the given frequency. Because of the fixed |
| * delay, the new poll operation is scheduled at the time when previousPoll operation |
| * completes + givenDelayInSec |
| */ |
| @Override |
| public synchronized void start() { |
| Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE); |
| startScheduler(); |
| updateStatus(EventProcessorStatus.ACTIVE); |
| LOG.info(String.format("Successfully started metastore event processing." |
| + " Polling interval: %d seconds.", pollingFrequencyInSec_)); |
| } |
| |
| /** |
| * Gets the current event processor status |
| */ |
| public EventProcessorStatus getStatus() { |
| return eventProcessorStatus_; |
| } |
| |
| /** |
| * Returns the value for a given config key from Hive Metastore. |
| * @param config Hive configuration name |
| * @param defaultVal Default value to return if config not present in Hive |
| */ |
| @VisibleForTesting |
| public String getConfigValueFromMetastore(String config, String defaultVal) |
| throws TException { |
| try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { |
| IMetaStoreClient iMetaStoreClient = metaStoreClient.getHiveClient(); |
| return MetaStoreUtil.getMetastoreConfigValue(iMetaStoreClient, config, defaultVal); |
| } |
| } |
| |
| /** |
| * returns the current value of LastSyncedEventId. This method is not thread-safe and |
| * only to be used for testing purposes |
| */ |
| @VisibleForTesting |
| public long getLastSyncedEventId() { |
| return lastSyncedEventId_.get(); |
| } |
| |
| /** |
| * Returns the current value of latestEventId_. This method is not thread-safe and |
| * only to be used for testing purposes |
| */ |
| @VisibleForTesting |
| public long getLatestEventId() { |
| return latestEventId_.get(); |
| } |
| |
| @VisibleForTesting |
| void startScheduler() { |
| Preconditions.checkState(pollingFrequencyInSec_ > 0); |
| LOG.info(String.format("Starting metastore event polling with interval %d seconds.", |
| pollingFrequencyInSec_)); |
| processEventsScheduler_.scheduleAtFixedRate(this ::processEvents, |
| pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS); |
| // Update latestEventId in another thread in case that the processEvents() thread is |
| // blocked by slow metadata reloading or waiting for table locks. |
| updateEventIdScheduler_.scheduleAtFixedRate(this ::updateLatestEventId, |
| pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS); |
| } |
| |
| /** |
| * Stops the event processing and changes the status of event processor to |
| * <code>EventProcessorStatus.PAUSED</code>. No new events will be processed as long |
| * the status is stopped. If this event processor is actively processing events when |
| * stop is called, this method blocks until the current processing is complete |
| */ |
| @Override |
| public synchronized void pause() { |
| // when concurrent invalidate metadata are running, it is possible the we receive |
| // a pause method call on a already paused events processor. |
| if (eventProcessorStatus_ == EventProcessorStatus.PAUSED) { |
| return; |
| } |
| updateStatus(EventProcessorStatus.PAUSED); |
| LOG.info(String.format("Event processing is paused. Last synced event id is %d", |
| lastSyncedEventId_.get())); |
| } |
| |
| /** |
| * Get the current notification event id from metastore |
| */ |
| @Override |
| public long getCurrentEventId() throws MetastoreNotificationFetchException { |
| try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { |
| return metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId(); |
| } catch (MetastoreClientInstantiationException | TException e) { |
| throw new MetastoreNotificationFetchException("Unable to fetch the current " + |
| "notification event id. Check if metastore service is accessible", e); |
| } |
| } |
| |
| public static long getCurrentEventIdNoThrow(IMetaStoreClient client) { |
| long latestEventId = -1L; |
| try { |
| latestEventId = client.getCurrentNotificationEventId().getEventId(); |
| } catch (TException exception) { |
| LOG.warn(String.format("Unable to fetch latest event id from HMS: %s", |
| exception.getMessage())); |
| } |
| return latestEventId; |
| } |
| |
| /** |
| * Fetch the event from HMS specified by 'eventId' |
| * @return null if the event has been cleaned up or any error occurs. |
| */ |
| private NotificationEvent getEventFromHMS(MetaStoreClient msClient, long eventId) { |
| NotificationEventRequest eventRequest = new NotificationEventRequest(); |
| eventRequest.setLastEvent(eventId - 1); |
| eventRequest.setMaxEvents(1); |
| try { |
| NotificationEventResponse response = MetastoreShim.getNextNotification( |
| msClient.getHiveClient(), eventRequest, false); |
| Iterator<NotificationEvent> eventIter = response.getEventsIterator(); |
| if (!eventIter.hasNext()) { |
| LOG.warn("Unable to fetch event {}. It has been cleaned up", eventId); |
| return null; |
| } |
| return eventIter.next(); |
| } catch (TException e) { |
| LOG.warn("Unable to fetch event {}", eventId, e); |
| } |
| return null; |
| } |
| |
| /** |
| * Get the event time by fetching the specified event from HMS. |
| * @return 0 if the event has been cleaned up or any error occurs. |
| */ |
| @VisibleForTesting |
| public int getEventTimeFromHMS(long eventId) { |
| try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { |
| NotificationEvent event = getEventFromHMS(msClient, eventId); |
| if (event != null) return event.getEventTime(); |
| } catch (MetastoreClientInstantiationException e) { |
| LOG.error("Failed to get event time from HMS for event {}", eventId, e); |
| } |
| return 0; |
| } |
| |
| /** |
| * Starts the event processor from a given event id |
| */ |
| @Override |
| public synchronized void start(long fromEventId) { |
| Preconditions.checkArgument(fromEventId >= 0); |
| EventProcessorStatus currentStatus = eventProcessorStatus_; |
| long prevLastSyncedEventId = lastSyncedEventId_.get(); |
| if (currentStatus == EventProcessorStatus.ACTIVE) { |
| // if events processor is already active, we should make sure that the |
| // start event id provided is not behind the lastSyncedEventId. This could happen |
| // when there are concurrent invalidate metadata calls. if we detect such a case |
| // we should return here. |
| if (prevLastSyncedEventId >= fromEventId) { |
| return; |
| } |
| } |
| lastSyncedEventId_.set(fromEventId); |
| lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId)); |
| updateStatus(EventProcessorStatus.ACTIVE); |
| LOG.info(String.format( |
| "Metastore event processing restarted. Last synced event id was updated " |
| + "from %d to %d", prevLastSyncedEventId, lastSyncedEventId_.get())); |
| } |
| |
| /** |
| * Stops the event processing and shuts down the scheduler. In case there is a batch of |
| * events which is being processed currently, the |
| * <code>processEvents</code> method releases lock after every event is processed. |
| * Hence, it is possible that at-least 1 event from the batch be |
| * processed while shutdown() waits to acquire lock on this object. |
| */ |
| @Override |
| public synchronized void shutdown() { |
| Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.STOPPED, |
| "Event processing is already stopped"); |
| shutdownAndAwaitTermination(processEventsScheduler_); |
| shutdownAndAwaitTermination(updateEventIdScheduler_); |
| updateStatus(EventProcessorStatus.STOPPED); |
| LOG.info("Metastore event processing stopped."); |
| } |
| |
| /** |
| * Attempts to cleanly shutdown the scheduler pool. If the pool does not shutdown |
| * within timeout, does a force shutdown which might interrupt currently running tasks. |
| */ |
| private synchronized void shutdownAndAwaitTermination(ScheduledExecutorService ses) { |
| Preconditions.checkNotNull(ses); |
| ses.shutdown(); // disable new tasks from being submitted |
| try { |
| // wait for 10 secs for scheduler to complete currently running tasks |
| if (!ses.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { |
| // executor couldn't terminate and timed-out, force the termination |
| LOG.info(String.format("Scheduler pool did not terminate within %d seconds. " |
| + "Attempting to stop currently running tasks", SCHEDULER_SHUTDOWN_TIMEOUT)); |
| ses.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| // current thread interrupted while pool was waiting for termination |
| // issue a shutdownNow before returning to cancel currently running tasks |
| LOG.info("Received interruptedException. Terminating currently running tasks.", e); |
| ses.shutdownNow(); |
| } |
| } |
| |
| /** |
| * Gets metastore notification events from the given eventId. The returned list of |
| * NotificationEvents are filtered using the NotificationFilter provided if it is not |
| * null. |
| * @param eventId The returned events are all after this given event id. |
| * @param currentEventId Current event id on metastore |
| * @param getAllEvents If this is true all the events since eventId are returned. |
| * Note that Hive MetaStore can limit the response to a specific |
| * maximum number of limit based on the value of configuration |
| * {@code hive.metastore.max.event.response}. |
| * If it is false, only EVENTS_BATCH_SIZE_PER_RPC events are |
| * returned, caller is expected to issue more calls to this method |
| * to fetch the remaining events. |
| * @param filter This is a nullable argument. If not null, the events are filtered |
| * and then returned using this. Otherwise, all the events are returned. |
| * @return List of NotificationEvents from metastore since eventId. |
| * @throws MetastoreNotificationFetchException In case of exceptions from HMS. |
| */ |
| public List<NotificationEvent> getNextMetastoreEvents(final long eventId, |
| final long currentEventId, final boolean getAllEvents, |
| @Nullable final NotificationFilter filter) |
| throws MetastoreNotificationFetchException { |
| // no new events since we last polled |
| if (currentEventId <= eventId) { |
| return Collections.emptyList(); |
| } |
| final Timer.Context context = metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time(); |
| try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { |
| int batchSize = getAllEvents ? -1 : EVENTS_BATCH_SIZE_PER_RPC; |
| // we use the thrift API directly instead of |
| // HiveMetastoreClient#getNextNotification because the HMS client can throw an |
| // exception when there is a gap between the eventIds returned. |
| NotificationEventRequest eventRequest = new NotificationEventRequest(); |
| eventRequest.setLastEvent(eventId); |
| eventRequest.setMaxEvents(batchSize); |
| NotificationEventResponse response = |
| MetastoreShim.getNextNotification(msClient.getHiveClient(), eventRequest, true); |
| LOG.info(String.format("Received %d events. Start event id : %d", |
| response.getEvents().size(), eventId)); |
| if (filter == null) return response.getEvents(); |
| List<NotificationEvent> filteredEvents = new ArrayList<>(); |
| for (NotificationEvent event : response.getEvents()) { |
| if (filter.accept(event)) filteredEvents.add(event); |
| } |
| return filteredEvents; |
| } catch (MetastoreClientInstantiationException | TException e) { |
| throw new MetastoreNotificationFetchException( |
| "Unable to fetch notifications from metastore. Last synced event id is " |
| + eventId, e); |
| } finally { |
| context.stop(); |
| } |
| } |
| |
| /** |
| * Fetch the next batch of NotificationEvents from metastore. The default batch size is |
| * <code>EVENTS_BATCH_SIZE_PER_RPC</code> |
| */ |
| @VisibleForTesting |
| protected List<NotificationEvent> getNextMetastoreEvents() |
| throws MetastoreNotificationFetchException { |
| return getNextMetastoreEvents(getCurrentEventId()); |
| } |
| |
| /** |
| * Fetch the next batch of NotificationEvents from metastore. The default batch size is |
| * <code>EVENTS_BATCH_SIZE_PER_RPC</code> |
| * @param currentEventId Current event id on metastore |
| * @return List of NotificationEvents from metastore since lastSyncedEventId |
| * @throws MetastoreNotificationFetchException |
| */ |
| @VisibleForTesting |
| public List<NotificationEvent> getNextMetastoreEvents(long currentEventId) |
| throws MetastoreNotificationFetchException { |
| return getNextMetastoreEvents(lastSyncedEventId_.get(), currentEventId, false, null); |
| } |
| |
| /** |
| * This method issues a request to Hive Metastore if needed, based on the current event |
| * id in metastore and the last synced event_id. Events are fetched in fixed sized |
| * batches. Each NotificationEvent received is processed by its corresponding |
| * <code>MetastoreEvent</code> |
| */ |
| @Override |
| public void processEvents() { |
| currentEvent_ = null; |
| try { |
| EventProcessorStatus currentStatus = eventProcessorStatus_; |
| if (currentStatus != EventProcessorStatus.ACTIVE) { |
| LOG.warn(String.format( |
| "Event processing is skipped since status is %s. Last synced event id is %d", |
| currentStatus, lastSyncedEventId_.get())); |
| return; |
| } |
| // fetch the current notification event id. We assume that the polling interval |
| // is small enough that most of these polling operations result in zero new |
| // events. In such a case, fetching current notification event id is much faster |
| // (and cheaper on HMS side) instead of polling for events directly |
| long currentEventId = getCurrentEventId(); |
| List<NotificationEvent> events = getNextMetastoreEvents(currentEventId); |
| processEvents(currentEventId, events); |
| } catch (MetastoreNotificationFetchException ex) { |
| // No need to change the EventProcessor state to error since we want the |
| // EventProcessor to continue getting new events after HMS is back up. |
| LOG.error("Unable to fetch the next batch of metastore events. Hive Metastore " + |
| "may be unavailable. Will retry.", ex); |
| } catch (MetastoreNotificationNeedsInvalidateException ex) { |
| updateStatus(EventProcessorStatus.NEEDS_INVALIDATE); |
| String msg = "Event processing needs a invalidate command to resolve the state"; |
| LOG.error(msg, ex); |
| eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' + |
| ExceptionUtils.getFullStackTrace(ex); |
| } catch (Exception ex) { |
| // There are lot of Preconditions which can throw RuntimeExceptions when we |
| // process events this catch all exception block is needed so that the scheduler |
| // thread does not die silently |
| updateStatus(EventProcessorStatus.ERROR); |
| String msg = "Unexpected exception received while processing event"; |
| LOG.error(msg, ex); |
| eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' + |
| ExceptionUtils.getFullStackTrace(ex); |
| dumpEventInfoToLog(currentEvent_); |
| } |
| } |
| |
| /** |
| * Update the latest event id regularly so we know how far we are lagging behind. |
| */ |
| @VisibleForTesting |
| public void updateLatestEventId() { |
| EventProcessorStatus currentStatus = eventProcessorStatus_; |
| if (currentStatus != EventProcessorStatus.ACTIVE) { |
| return; |
| } |
| try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { |
| CurrentNotificationEventId currentNotificationEventId = |
| msClient.getHiveClient().getCurrentNotificationEventId(); |
| long currentEventId = currentNotificationEventId.getEventId(); |
| // no new events since we last polled |
| if (currentEventId <= latestEventId_.get()) { |
| return; |
| } |
| // Fetch the last event to get its eventTime. |
| NotificationEvent event = getEventFromHMS(msClient, currentEventId); |
| // Events could be empty if they are just cleaned up. |
| if (event == null) return; |
| long lastSyncedEventId = lastSyncedEventId_.get(); |
| long lastSyncedEventTime = lastSyncedEventTimeSecs_.get(); |
| long currentEventTime = event.getEventTime(); |
| latestEventId_.set(currentEventId); |
| latestEventTimeSecs_.set(currentEventTime); |
| LOG.info("Latest event in HMS: id={}, time={}. Last synced event: id={}, time={}.", |
| currentEventId, currentEventTime, lastSyncedEventId, lastSyncedEventTime); |
| if (currentEventTime > lastSyncedEventTime) { |
| LOG.warn("Lag: {}. {} events pending to be processed.", |
| PrintUtils.printTimeMs((currentEventTime - lastSyncedEventTime) * 1000), |
| currentEventId - lastSyncedEventId); |
| } |
| } catch (Exception e) { |
| LOG.error("Unable to update current notification event id. Last value: {}", |
| latestEventId_, e); |
| } |
| } |
| |
| /** |
| * Gets the current event processor metrics along with its status. If the status is |
| * not active the metrics are skipped. Only the status is sent |
| */ |
| @Override |
| public TEventProcessorMetrics getEventProcessorMetrics() { |
| TEventProcessorMetrics eventProcessorMetrics = new TEventProcessorMetrics(); |
| EventProcessorStatus currentStatus = getStatus(); |
| eventProcessorMetrics.setStatus(currentStatus.toString()); |
| eventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId()); |
| if (currentStatus != EventProcessorStatus.ACTIVE) return eventProcessorMetrics; |
| // The following counters are only updated when event-processor is active. |
| eventProcessorMetrics.setLast_synced_event_time(lastSyncedEventTimeSecs_.get()); |
| eventProcessorMetrics.setLatest_event_id(latestEventId_.get()); |
| eventProcessorMetrics.setLatest_event_time(latestEventTimeSecs_.get()); |
| |
| long eventsReceived = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount(); |
| long eventsSkipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount(); |
| eventProcessorMetrics.setEvents_received(eventsReceived); |
| eventProcessorMetrics.setEvents_skipped(eventsSkipped); |
| |
| Snapshot fetchDuration = |
| metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getSnapshot(); |
| double avgFetchDuration = fetchDuration.getMean() / SECOND_IN_NANOS; |
| double p75FetchDuration = fetchDuration.get75thPercentile() / SECOND_IN_NANOS; |
| double p95FetchDuration = fetchDuration.get95thPercentile() / SECOND_IN_NANOS; |
| double p99FetchDuration = fetchDuration.get99thPercentile() / SECOND_IN_NANOS; |
| eventProcessorMetrics.setEvents_fetch_duration_mean(avgFetchDuration); |
| eventProcessorMetrics.setEvents_fetch_duration_p75(p75FetchDuration); |
| eventProcessorMetrics.setEvents_fetch_duration_p95(p95FetchDuration); |
| eventProcessorMetrics.setEvents_fetch_duration_p99(p99FetchDuration); |
| |
| Snapshot processDuration = |
| metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getSnapshot(); |
| double avgProcessDuration = processDuration.getMean() / SECOND_IN_NANOS; |
| double p75ProcessDuration = processDuration.get75thPercentile() / SECOND_IN_NANOS; |
| double p95ProcessDuration = processDuration.get95thPercentile() / SECOND_IN_NANOS; |
| double p99ProcessDuration = processDuration.get99thPercentile() / SECOND_IN_NANOS; |
| eventProcessorMetrics.setEvents_process_duration_mean(avgProcessDuration); |
| eventProcessorMetrics.setEvents_process_duration_p75(p75ProcessDuration); |
| eventProcessorMetrics.setEvents_process_duration_p95(p95ProcessDuration); |
| eventProcessorMetrics.setEvents_process_duration_p99(p99ProcessDuration); |
| |
| double lastProcessDuration = lastEventProcessDurationNs_.get() / |
| (double) SECOND_IN_NANOS; |
| eventProcessorMetrics.setLast_events_process_duration(lastProcessDuration); |
| |
| double avgNumberOfEventsReceived1Min = |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate(); |
| double avgNumberOfEventsReceived5Min = |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate(); |
| double avgNumberOfEventsReceived15Min = |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate(); |
| eventProcessorMetrics.setEvents_received_1min_rate(avgNumberOfEventsReceived1Min); |
| eventProcessorMetrics.setEvents_received_5min_rate(avgNumberOfEventsReceived5Min); |
| eventProcessorMetrics.setEvents_received_15min_rate(avgNumberOfEventsReceived15Min); |
| |
| LOG.trace("Events Received: {} Events skipped: {} Avg fetch duration: {} Avg process " |
| + "duration: {} Events received rate (1min) : {}", |
| eventsReceived, eventsSkipped, avgFetchDuration, avgProcessDuration, |
| avgNumberOfEventsReceived1Min); |
| return eventProcessorMetrics; |
| } |
| |
| @Override |
| public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() { |
| TEventProcessorMetricsSummaryResponse summaryResponse = |
| new TEventProcessorMetricsSummaryResponse(); |
| summaryResponse.setSummary(metrics_.toString()); |
| if (eventProcessorErrorMsg_ != null) { |
| summaryResponse.setError_msg(eventProcessorErrorMsg_); |
| } |
| return summaryResponse; |
| } |
| |
| @VisibleForTesting |
| Metrics getMetrics() { |
| return metrics_; |
| } |
| |
| /** |
| * Process the given list of notification events. Useful for tests which provide a list |
| * of events |
| * @param currentEventId Current event id on metastore |
| * @param events List of NotificationEvents |
| * @throws MetastoreNotificationException |
| */ |
| @VisibleForTesting |
| protected void processEvents(long currentEventId, List<NotificationEvent> events) |
| throws MetastoreNotificationException { |
| currentEvent_ = null; |
| // update the events received metric before returning |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size()); |
| if (events.isEmpty()) { |
| if (lastSyncedEventId_.get() < currentEventId) { |
| // Possible to receive empty list due to event skip list in notification event |
| // request. Update the last synced event id with current event id on metastore |
| lastSyncedEventId_.set(currentEventId); |
| lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(currentEventId)); |
| } |
| return; |
| } |
| final Timer.Context context = |
| metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time(); |
| Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>(); |
| try { |
| List<MetastoreEvent> filteredEvents = |
| metastoreEventFactory_.getFilteredEvents(events, metrics_); |
| if (filteredEvents.isEmpty()) { |
| NotificationEvent e = events.get(events.size() - 1); |
| lastSyncedEventId_.set(e.getEventId()); |
| lastSyncedEventTimeSecs_.set(e.getEventTime()); |
| return; |
| } |
| for (MetastoreEvent event : filteredEvents) { |
| // synchronizing each event processing reduces the scope of the lock so the a |
| // potential reset() during event processing is not blocked for longer than |
| // necessary |
| synchronized (this) { |
| if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) { |
| break; |
| } |
| currentEvent_ = event.metastoreNotificationEvent_; |
| String targetName = event.getTargetName(); |
| String desc = String.format("Processing %s on %s, eventId=%d", |
| event.getEventType(), targetName, event.getEventId()); |
| try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc)) { |
| long startMs = System.currentTimeMillis(); |
| event.processIfEnabled(); |
| long elapsedTimeMs = System.currentTimeMillis() - startMs; |
| eventProcessingTime.put(event, elapsedTimeMs); |
| } |
| deleteEventLog_.garbageCollect(event.getEventId()); |
| lastSyncedEventId_.set(event.getEventId()); |
| lastSyncedEventTimeSecs_.set(event.getEventTime()); |
| metrics_.getTimer(AVG_DELAY_IN_CONSUMING_EVENTS).update( |
| (System.currentTimeMillis() / 1000) - event.getEventTime(), |
| TimeUnit.SECONDS); |
| } |
| } |
| } catch (CatalogException e) { |
| throw new MetastoreNotificationException(String.format( |
| "Unable to process event %d of type %s. Event processing will be stopped.", |
| currentEvent_.getEventId(), currentEvent_.getEventType()), e); |
| } finally { |
| long elapsedNs = context.stop(); |
| lastEventProcessDurationNs_.set(elapsedNs); |
| logEventMetrics(eventProcessingTime, elapsedNs); |
| } |
| } |
| |
| private void logEventMetrics(Map<MetastoreEvent, Long> eventProcessingTime, |
| long elapsedNs) { |
| LOG.info("Time elapsed in processing event batch: {}", |
| PrintUtils.printTimeNs(elapsedNs)); |
| // Only log the metrics when the processing on this batch is slow. |
| if (elapsedNs < HdfsTable.LOADING_WARNING_TIME_NS) return; |
| // Get the top-10 expensive events |
| List<Map.Entry<MetastoreEvent, Long>> eventList = |
| new ArrayList<>(eventProcessingTime.entrySet()); |
| eventList.sort(Map.Entry.<MetastoreEvent, Long>comparingByValue().reversed()); |
| int num = Math.min(10, eventList.size()); |
| StringBuilder report = new StringBuilder("Top " + num + " expensive events: "); |
| for (Map.Entry<MetastoreEvent, Long> entry : eventList.subList(0, num)) { |
| MetastoreEvent event = entry.getKey(); |
| long durationMs = entry.getValue(); |
| report.append(String.format("(type=%s, id=%s, target=%s, duration_ms=%d) ", |
| event.getEventType(), event.getEventId(), event.getTargetName(), durationMs)); |
| } |
| // Get the top-10 expensive targets |
| Map<String, Long> durationPerTable = new HashMap<>(); |
| for (MetastoreEvent event : eventProcessingTime.keySet()) { |
| String targetName = event.getTargetName(); |
| long durationMs = durationPerTable.getOrDefault(targetName, 0L) + |
| eventProcessingTime.get(event); |
| durationPerTable.put(targetName, durationMs); |
| } |
| List<Map.Entry<String, Long>> targetList = |
| new ArrayList<>(durationPerTable.entrySet()); |
| targetList.sort(Map.Entry.<String, Long>comparingByValue().reversed()); |
| num = Math.min(10, targetList.size()); |
| report.append("\nTop ").append(num).append(" targets in event processing: "); |
| for (Map.Entry<String, Long> entry : targetList.subList(0, num)) { |
| String targetName = entry.getKey(); |
| long durationMs = entry.getValue(); |
| report.append(String.format("(target=%s, duration_ms=%d) ", |
| targetName, durationMs)); |
| } |
| LOG.warn(report.toString()); |
| } |
| |
| /** |
| * Updates the current states to the given status. |
| */ |
| private synchronized void updateStatus(EventProcessorStatus toStatus) { |
| eventProcessorStatus_ = toStatus; |
| } |
| |
| private void dumpEventInfoToLog(NotificationEvent event) { |
| if (event == null) { |
| String error = "Notification event is null"; |
| LOG.error(error); |
| eventProcessorErrorMsg_ += '\n' + error; |
| return; |
| } |
| StringBuilder msg = |
| new StringBuilder().append("Event id: ").append(event.getEventId()).append("\n") |
| .append("Event Type: ").append(event.getEventType()).append("\n") |
| .append("Event time: ").append(event.getEventTime()).append("\n") |
| .append("Database name: ").append(event.getDbName()).append("\n"); |
| if (event.getTableName() != null) { |
| msg.append("Table name: ").append(event.getTableName()).append("\n"); |
| } |
| msg.append("Event message: ").append(event.getMessage()).append("\n"); |
| String msgStr = msg.toString(); |
| LOG.error(msgStr); |
| eventProcessorErrorMsg_ += '\n' + msgStr; |
| } |
| |
| /** |
| * Create a instance of this object if it is not initialized. Currently, this object is |
| * a singleton and should only be created during catalogD initialization time, so that |
| * the start syncId matches with the catalogD startup time. |
| * |
| * @param catalogOpExecutor the CatalogOpExecutor instance to which this event |
| * processor belongs. |
| * @param startSyncFromId Start event id. Events will be polled starting from this |
| * event id |
| * @param eventPollingInterval HMS polling interval in seconds |
| * @return this object is already created, or create a new one if it is not yet |
| * instantiated |
| */ |
| public static synchronized ExternalEventsProcessor getInstance( |
| CatalogOpExecutor catalogOpExecutor, long startSyncFromId, |
| long eventPollingInterval) throws CatalogException { |
| if (instance != null) return instance; |
| instance = |
| new MetastoreEventsProcessor(catalogOpExecutor, startSyncFromId, |
| eventPollingInterval); |
| return instance; |
| } |
| |
| @Override |
| public MetastoreEventFactory getEventsFactory() { |
| return metastoreEventFactory_; |
| } |
| |
| public static MessageDeserializer getMessageDeserializer() { |
| return MESSAGE_DESERIALIZER; |
| } |
| |
| public static List<String> getEventSkipList() { return EVENT_SKIP_LIST; } |
| } |