| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog.events; |
| |
| |
| import com.codahale.metrics.Gauge; |
| import com.codahale.metrics.Timer; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; |
| import org.apache.hadoop.hive.metastore.api.NotificationEvent; |
| import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; |
| import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; |
| import org.apache.impala.catalog.CatalogException; |
| import org.apache.impala.catalog.CatalogServiceCatalog; |
| import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; |
| import org.apache.impala.catalog.events.ConfigValidator.ValidationResult; |
| import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; |
| import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory; |
| import org.apache.impala.common.Metrics; |
| import org.apache.impala.compat.MetastoreShim; |
| import org.apache.impala.thrift.TEventProcessorMetrics; |
| import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse; |
| import org.apache.impala.util.MetaStoreUtil; |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A metastore event is an instance of the class |
| * <code>org.apache.hadoop.hive.metastore.api.NotificationEvent</code>. Metastore can be |
| * configured, to work with Listeners which are called on various DDL operations like |
| * create/alter/drop operations on database, table, partition etc. Each event has a unique |
| * incremental id and the generated events can be fetched from Metastore to get |
| * incremental updates to the metadata stored in Hive metastore using the public API |
| * <code>get_next_notification</code>. These events could be generated by external |
| * Metastore clients like Apache Hive or Apache Spark as well as other Impala clusters |
| * configured to talk with the same metastore. |
| * |
| * This class is used to poll metastore for such events at a given frequency. By observing |
| * such events, we can take appropriate action on the catalogD |
| * (refresh/invalidate/add/remove) so that catalog represents the latest information |
| * available in metastore. We keep track of the last synced event id in each polling |
| * iteration so the next batch can be requested appropriately. The current batch size is |
| * constant and set to EVENTS_BATCH_SIZE_PER_RPC. |
| * |
| * <pre> |
| * +---------------+ +----------------+ +--------------+ |
| * |Catalog state | |Catalog | | | |
| * |stale | |State up-to-date| |Catalog state | |
| * |(apply event) | |(ignore) | |is newer than | |
| * | | | | |event | |
| * | | | | |(ignore) | |
| * +------+--------+ +-----+----------+ +-+------------+ |
| * | | | |
| * | | | |
| * | | | |
| * | | | |
| * | | | |
| * +-----------V------------------V---------------------V-----------> Event Timeline |
| * ^ |
| * | |
| * | |
| * | |
| * | |
| * E |
| * |
| * </pre> |
| * Consistency model: Events could be seen as DDL operations from the past, done either from |
| * this same cluster or from some other external system. For example, in the events timeline |
| * given above, consider an event E at any given time. The catalog state for the |
| * corresponding object of the event could either be stale, exactly-same or at a version |
| * which is higher than one provided by event. Catalog state should only be updated when |
| * it is stale with respect to the event. In order to determine if the catalog object is |
| * stale, we rely on a combination of creationTime and object version. An object in catalog |
| * is stale if and only if its creationTime is < creationTime of the object from event OR |
| * its version < version from event if createTime matches. |
| * |
| * If the object has the same createTime and version when compared to event or if the |
| * createTime > createTime from the event, the event can be safely ignored. |
| * |
| * Following table shows the actions to be taken when the catalog state is stale. |
| * |
| * <pre> |
| * +----------------------------------------+ |
| * | Catalog object state | |
| * +----------------------------+------------+------------+ |
| * | Event type | Loaded | Incomplete | Not present| |
| * | | | | | |
| * +------------------------------------------------------+ |
| * | | | | | |
| * | CREATE EVENT| removeAndAdd | Ignore | Add | |
| * | | | | | |
| * | | | | | |
| * | ALTER EVENT | Invalidate | Ignore | Ignore | |
| * | | | | | |
| * | | | | | |
| * | DROP EVENT | Remove | Remove | Ignore | |
| * | | | | | |
| * | | | | | |
| * | INSERT EVENT| Refresh | Ignore | Ignore | |
| * | | | | | |
| * +-------------+--------------+------------+------------+ |
| * </pre> |
| * |
| * Currently event handlers rely on creation time on Database, Table and Partition to |
| * uniquely determine if the object from event is same as object in the catalog. This |
| * information is used to make sure that we are deleting the right incarnation of the |
| * object when compared to Metastore. |
| * |
| * Self-events: |
| * Events could be generated by this Catalog's operations as well. Invalidating table |
| * for such events is unnecessary and inefficient. In order to detect such self-events |
| * when catalog executes a DDL operation it appends the current catalog version to the |
| * list of version numbers for the in-flight events for the table. Events processor |
| * clears this version when the corresponding version number identified by serviceId is |
| * received in the event. This is needed since it is possible that an external |
| * non-Impala system which generates the event presents the same serviceId and version |
| * number later on. The algorithm to detect a self-event is as below. |
| * |
| * 1. Add the service id and expected catalog version to table/partition parameters |
| * when executing the DDL operation. When the HMS operation is successful, add the |
| * version number to the list of versions for in-flight events at the table level. |
| * 2. When the event is received, the first time you see the combination of serviceId |
| * and version number, event processor clears the version number from table's list and |
| * determines the event as self-generated (and hence ignored) |
| * 3. If the event data presents an unknown serviceId or if the version number is not |
| * present in the list of in-flight versions, event is not a self-event and needs to be |
| * processed. |
| * |
| * In order to limit the total memory footprint, only 10 version numbers are stored at |
| * the table. Since the event processor is expected to poll every few seconds this |
| * should be a reasonable bound which satisfies most use-cases. Otherwise, event |
| * processor may wrongly process a self-event to invalidate the table. In such a case, |
| * it's a performance penalty, not a correctness issue. |
| * |
| * All the operations which change the state of catalog cache while processing a certain |
| * event type must be atomic in nature. We rely on taking a write lock on version object |
| * in CatalogServiceCatalog to make sure that readers are blocked while the metadata |
| * update operation is being performed. Since the events are generated post-metastore |
| * operations, such catalog updates do not need to update the state in Hive Metastore. |
| * |
| * Error Handling: The event processor could be in ACTIVE, PAUSED, ERROR states. In case |
| * of any errors while processing the events the state of event processor changes to ERROR |
| * and no subsequent events are polled. In such a case an invalidate metadata command |
| * restarts the event polling which updates the lastSyncedEventId to the latest from |
| * metastore. |
| */ |
| public class MetastoreEventsProcessor implements ExternalEventsProcessor { |
| |
| public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY = |
| "hive.metastore.notifications.add.thrift.objects"; |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(MetastoreEventsProcessor.class); |
| |
| private static final MessageDeserializer MESSAGE_DESERIALIZER = |
| MetastoreShim.getMessageDeserializer(); |
| |
| private static MetastoreEventsProcessor instance; |
| |
| // maximum number of events to poll in each RPC |
| private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000; |
| |
| // maximum time to wait for a clean shutdown of scheduler in seconds |
| private static final int SCHEDULER_SHUTDOWN_TIMEOUT = 10; |
| |
| // time taken to fetch events during each poll |
| public static final String EVENTS_FETCH_DURATION_METRIC = "events-fetch-duration"; |
| // time taken to apply all the events received duration each poll |
| public static final String EVENTS_PROCESS_DURATION_METRIC = "events-apply-duration"; |
| // rate of events received per unit time |
| public static final String EVENTS_RECEIVED_METRIC = "events-received"; |
| // total number of events which are skipped because of the flag setting |
| public static final String EVENTS_SKIPPED_METRIC = "events-skipped"; |
| // name of the event processor status metric |
| public static final String STATUS_METRIC = "status"; |
| // last synced event id |
| public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id"; |
| // metric name which counts the number of self-events which are skipped |
| public static final String NUMBER_OF_SELF_EVENTS = "self-events-skipped"; |
| // metric name for number of tables which are refreshed by event processor so far |
| public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed"; |
| // number of times events processor refreshed a partition |
| public static final String NUMBER_OF_PARTITION_REFRESHES = "partitions-refreshed"; |
| |
| // possible status of event processor |
| public enum EventProcessorStatus { |
| PAUSED, // event processor is paused because catalog is being reset concurrently |
| ACTIVE, // event processor is scheduled at a given frequency |
| ERROR, // event processor is in error state and event processing has stopped |
| NEEDS_INVALIDATE, // event processor could not resolve certain events and needs a |
| // manual invalidate command to reset the state (See AlterEvent for a example) |
| STOPPED, // event processor is shutdown. No events will be processed |
| DISABLED // event processor is not configured to run |
| } |
| |
| // current status of this event processor |
| private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED; |
| |
| // event factory which is used to get or create MetastoreEvents |
| private final MetastoreEventFactory metastoreEventFactory_; |
| |
| // keeps track of the last event id which we have synced to |
| private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1); |
| |
| // polling interval in seconds. Note this is a time we wait AFTER each fetch call |
| private final long pollingFrequencyInSec_; |
| |
| // catalog service instance to be used while processing events |
| private final CatalogServiceCatalog catalog_; |
| |
| // scheduler daemon thread executor for processing events at given frequency |
| private final ScheduledExecutorService scheduler_ = Executors |
| .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) |
| .setNameFormat("MetastoreEventsProcessor").build()); |
| |
| // metrics registry to keep track of metrics related to event processing |
| //TODO create a separate singleton class which wraps around this so that we don't |
| // have to pass it around as a argument in constructor in MetastoreEvents |
| private final Metrics metrics_ = new Metrics(); |
| |
| @VisibleForTesting |
| MetastoreEventsProcessor(CatalogServiceCatalog catalog, long startSyncFromId, |
| long pollingFrequencyInSec) throws CatalogException { |
| Preconditions.checkState(pollingFrequencyInSec > 0); |
| this.catalog_ = Preconditions.checkNotNull(catalog); |
| validateConfigs(); |
| lastSyncedEventId_.set(startSyncFromId); |
| metastoreEventFactory_ = new MetastoreEventFactory(catalog_, metrics_); |
| pollingFrequencyInSec_ = pollingFrequencyInSec; |
| initMetrics(); |
| } |
| |
| /** |
| * Fetches the required metastore config values and validates them against the |
| * expected values. The configurations to validate are different for HMS-2 v/s HMS-3 |
| * and hence it uses MetastoreShim to get the configurations which need to be validated. |
| * @throws CatalogException if one or more validations fail or if metastore is not |
| * accessible |
| */ |
| @VisibleForTesting |
| void validateConfigs() throws CatalogException { |
| List<ValidationResult> validationErrors = new ArrayList<>(); |
| for (MetastoreEventProcessorConfig config : getEventProcessorConfigsToValidate()) { |
| String configKey = config.getValidator().getConfigKey(); |
| try { |
| String value = getConfigValueFromMetastore(configKey, ""); |
| ValidationResult result = config.validate(value); |
| if (!result.isValid()) validationErrors.add(result); |
| } catch (TException e) { |
| String msg = String.format("Unable to get configuration %s from metastore. Check " |
| + "if metastore is accessible", configKey); |
| LOG.error(msg, e); |
| throw new CatalogException(msg); |
| } |
| if (!validationErrors.isEmpty()) { |
| LOG.error("Found {} incorrect metastore configuration(s).", |
| validationErrors.size()); |
| for (ValidationResult invalidConfig: validationErrors) { |
| LOG.error(invalidConfig.getReason()); |
| } |
| throw new CatalogException(String.format("Found %d incorrect metastore " |
| + "configuration(s). Events processor cannot start. See ERROR log for more " |
| + "details.", validationErrors.size())); |
| } |
| } |
| } |
| |
| /** |
| * Returns the list of Metastore configurations to validate depending on the hive |
| * version |
| */ |
| public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() { |
| if (MetastoreShim.getMajorVersion() >= 2) { |
| return Arrays.asList(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML); |
| } |
| return Arrays.asList(MetastoreEventProcessorConfig.values()); |
| } |
| |
| private void initMetrics() { |
| metrics_.addTimer(EVENTS_FETCH_DURATION_METRIC); |
| metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC); |
| metrics_.addMeter(EVENTS_RECEIVED_METRIC); |
| metrics_.addCounter(EVENTS_SKIPPED_METRIC); |
| metrics_.addGauge(STATUS_METRIC, |
| (Gauge<String>) () -> getStatus().toString()); |
| metrics_.addGauge(LAST_SYNCED_ID_METRIC, |
| (Gauge<Long>) () -> lastSyncedEventId_.get()); |
| metrics_.addCounter(NUMBER_OF_SELF_EVENTS); |
| metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES); |
| metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES); |
| } |
| |
| /** |
| * Schedules the daemon thread at a given frequency. It is important to note that this |
| * method schedules with FixedDelay instead of FixedRate. The reason it is scheduled at |
| * a fixedDelay is to make sure that we don't pile up the pending tasks in case each |
| * polling operation is taking longer than the given frequency. Because of the fixed |
| * delay, the new poll operation is scheduled at the time when previousPoll operation |
| * completes + givenDelayInSec |
| */ |
| @Override |
| public synchronized void start() { |
| Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE); |
| startScheduler(); |
| updateStatus(EventProcessorStatus.ACTIVE); |
| LOG.info(String.format("Successfully started metastore event processing." |
| + " Polling interval: %d seconds.", pollingFrequencyInSec_)); |
| } |
| |
| /** |
| * Gets the current event processor status |
| */ |
| public EventProcessorStatus getStatus() { |
| return eventProcessorStatus_; |
| } |
| |
| /** |
| * Returns the value for a given config key from Hive Metastore. |
| * @param config Hive configuration name |
| * @param defaultVal Default value to return if config not present in Hive |
| */ |
| @VisibleForTesting |
| public String getConfigValueFromMetastore(String config, String defaultVal) |
| throws TException { |
| try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { |
| IMetaStoreClient iMetaStoreClient = metaStoreClient.getHiveClient(); |
| return MetaStoreUtil.getMetastoreConfigValue(iMetaStoreClient, config, defaultVal); |
| } |
| } |
| /** |
| * returns the current value of LastSyncedEventId. This method is not thread-safe and |
| * only to be used for testing purposes |
| */ |
| @VisibleForTesting |
| public long getLastSyncedEventId() { |
| return lastSyncedEventId_.get(); |
| } |
| |
| @VisibleForTesting |
| void startScheduler() { |
| Preconditions.checkState(pollingFrequencyInSec_ > 0); |
| LOG.info(String.format("Starting metastore event polling with interval %d seconds.", |
| pollingFrequencyInSec_)); |
| scheduler_.scheduleWithFixedDelay(this::processEvents, pollingFrequencyInSec_, |
| pollingFrequencyInSec_, TimeUnit.SECONDS); |
| } |
| |
| /** |
| * Stops the event processing and changes the status of event processor to |
| * <code>EventProcessorStatus.PAUSED</code>. No new events will be processed as long |
| * the status is stopped. If this event processor is actively processing events when |
| * stop is called, this method blocks until the current processing is complete |
| */ |
| @Override |
| public synchronized void pause() { |
| Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.PAUSED); |
| updateStatus(EventProcessorStatus.PAUSED); |
| LOG.info(String.format("Event processing is paused. Last synced event id is %d", |
| lastSyncedEventId_.get())); |
| } |
| |
| /** |
| * Get the current notification event id from metastore |
| */ |
| @Override |
| public long getCurrentEventId() throws CatalogException { |
| try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { |
| return metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId(); |
| } catch (TException e) { |
| throw new CatalogException("Unable to fetch the current notification event id. " |
| + "Check if metastore service is accessible"); |
| } |
| } |
| |
| /** |
| * Starts the event processor from a given event id |
| */ |
| @Override |
| public synchronized void start(long fromEventId) { |
| Preconditions.checkArgument(fromEventId >= 0); |
| Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE, |
| "Event processing start called when it is already active"); |
| long prevLastSyncedEventId = lastSyncedEventId_.get(); |
| lastSyncedEventId_.set(fromEventId); |
| updateStatus(EventProcessorStatus.ACTIVE); |
| LOG.info(String.format( |
| "Metastore event processing restarted. Last synced event id was updated " |
| + "from %d to %d", prevLastSyncedEventId, lastSyncedEventId_.get())); |
| } |
| |
| /** |
| * Stops the event processing and shuts down the scheduler. In case there is a batch of |
| * events which is being processed currently, the |
| * <code>processEvents</code> method releases lock after every event is processed. |
| * Hence, it is possible that at-least 1 event from the batch be |
| * processed while shutdown() waits to acquire lock on this object. |
| */ |
| @Override |
| public synchronized void shutdown() { |
| Preconditions.checkNotNull(scheduler_); |
| Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.STOPPED, |
| "Event processing is already stopped"); |
| shutdownAndAwaitTermination(); |
| updateStatus(EventProcessorStatus.STOPPED); |
| LOG.info("Metastore event processing stopped."); |
| } |
| |
| /** |
| * Attempts to cleanly shutdown the scheduler pool. If the pool does not shutdown |
| * within timeout, does a force shutdown which might interrupt currently running tasks. |
| */ |
| private synchronized void shutdownAndAwaitTermination() { |
| scheduler_.shutdown(); // disable new tasks from being submitted |
| try { |
| // wait for 10 secs for scheduler to complete currently running tasks |
| if (!scheduler_.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { |
| // executor couldn't terminate and timed-out, force the termination |
| LOG.info(String.format("Scheduler pool did not terminate within %d seconds. " |
| + "Attempting to stop currently running tasks", SCHEDULER_SHUTDOWN_TIMEOUT)); |
| scheduler_.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| // current thread interrupted while pool was waiting for termination |
| // issue a shutdownNow before returning to cancel currently running tasks |
| LOG.info("Received interruptedException. Terminating currently running tasks.", e); |
| scheduler_.shutdownNow(); |
| } |
| } |
| |
| /** |
| * Fetch the next batch of NotificationEvents from metastore. The default batch size if |
| * <code>EVENTS_BATCH_SIZE_PER_RPC</code> |
| */ |
| @VisibleForTesting |
| protected List<NotificationEvent> getNextMetastoreEvents() |
| throws MetastoreNotificationFetchException { |
| final Timer.Context context = metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time(); |
| long lastSyncedEventId = lastSyncedEventId_.get(); |
| try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { |
| // fetch the current notification event id. We assume that the polling interval |
| // is small enough that most of these polling operations result in zero new |
| // events. In such a case, fetching current notification event id is much faster |
| // (and cheaper on HMS side) instead of polling for events directly |
| CurrentNotificationEventId currentNotificationEventId = |
| msClient.getHiveClient().getCurrentNotificationEventId(); |
| long currentEventId = currentNotificationEventId.getEventId(); |
| |
| // no new events since we last polled |
| if (currentEventId <= lastSyncedEventId) { |
| return Collections.emptyList(); |
| } |
| |
| NotificationEventResponse response = msClient.getHiveClient() |
| .getNextNotification(lastSyncedEventId, EVENTS_BATCH_SIZE_PER_RPC, null); |
| LOG.info(String.format("Received %d events. Start event id : %d", |
| response.getEvents().size(), lastSyncedEventId)); |
| return response.getEvents(); |
| } catch (TException e) { |
| throw new MetastoreNotificationFetchException( |
| "Unable to fetch notifications from metastore. Last synced event id is " |
| + lastSyncedEventId, e); |
| } finally { |
| context.stop(); |
| } |
| } |
| |
| /** |
| * This method issues a request to Hive Metastore if needed, based on the current event |
| * id in metastore and the last synced event_id. Events are fetched in fixed sized |
| * batches. Each NotificationEvent received is processed by its corresponding |
| * <code>MetastoreEvent</code> |
| */ |
| @Override |
| public void processEvents() { |
| NotificationEvent lastProcessedEvent = null; |
| try { |
| EventProcessorStatus currentStatus = eventProcessorStatus_; |
| if (currentStatus != EventProcessorStatus.ACTIVE) { |
| LOG.warn(String.format( |
| "Event processing is skipped since status is %s. Last synced event id is %d", |
| currentStatus, lastSyncedEventId_.get())); |
| return; |
| } |
| |
| List<NotificationEvent> events = getNextMetastoreEvents(); |
| processEvents(events); |
| } catch (MetastoreNotificationFetchException ex) { |
| // No need to change the EventProcessor state to error since we want the |
| // EventProcessor to continue getting new events after HMS is back up. |
| LOG.error("Unable to fetch the next batch of metastore events. Hive Metastore " + |
| "may be unavailable. Will retry.", ex); |
| } catch(MetastoreNotificationNeedsInvalidateException ex) { |
| updateStatus(EventProcessorStatus.NEEDS_INVALIDATE); |
| LOG.error("Event processing needs a invalidate command to resolve the state", ex); |
| } catch (Exception ex) { |
| // There are lot of Preconditions which can throw RuntimeExceptions when we |
| // process events this catch all exception block is needed so that the scheduler |
| // thread does not die silently |
| updateStatus(EventProcessorStatus.ERROR); |
| LOG.error("Unexpected exception received while processing event", ex); |
| dumpEventInfoToLog(lastProcessedEvent); |
| } |
| } |
| |
| /** |
| * Gets the current event processor metrics along with its status. If the status is |
| * not active the metrics are skipped. Only the status is sent |
| */ |
| @Override |
| public TEventProcessorMetrics getEventProcessorMetrics() { |
| TEventProcessorMetrics eventProcessorMetrics = new TEventProcessorMetrics(); |
| EventProcessorStatus currentStatus = getStatus(); |
| eventProcessorMetrics.setStatus(currentStatus.toString()); |
| eventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId()); |
| if (currentStatus != EventProcessorStatus.ACTIVE) return eventProcessorMetrics; |
| |
| long eventsReceived = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount(); |
| long eventsSkipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount(); |
| double avgFetchDuration = |
| metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getMeanRate(); |
| double avgProcessDuration = |
| metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getMeanRate(); |
| double avgNumberOfEventsReceived1Min = |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate(); |
| double avgNumberOfEventsReceived5Min = |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate(); |
| double avgNumberOfEventsReceived15Min = |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate(); |
| |
| |
| eventProcessorMetrics.setEvents_received(eventsReceived); |
| eventProcessorMetrics.setEvents_skipped(eventsSkipped); |
| eventProcessorMetrics.setEvents_fetch_duration_mean(avgFetchDuration); |
| eventProcessorMetrics.setEvents_process_duration_mean(avgProcessDuration); |
| eventProcessorMetrics.setEvents_received_1min_rate(avgNumberOfEventsReceived1Min); |
| eventProcessorMetrics.setEvents_received_5min_rate(avgNumberOfEventsReceived5Min); |
| eventProcessorMetrics.setEvents_received_15min_rate(avgNumberOfEventsReceived15Min); |
| |
| LOG.trace("Events Received: {} Events skipped: {} Avg fetch duration: {} Avg process " |
| + "duration: {} Events received rate (1min) : {}", |
| eventsReceived, eventsSkipped, avgFetchDuration, avgProcessDuration, |
| avgNumberOfEventsReceived1Min); |
| return eventProcessorMetrics; |
| } |
| |
| @Override |
| public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() { |
| TEventProcessorMetricsSummaryResponse summaryResponse = |
| new TEventProcessorMetricsSummaryResponse(); |
| summaryResponse.setSummary(metrics_.toString()); |
| return summaryResponse; |
| } |
| |
| @VisibleForTesting |
| Metrics getMetrics() { |
| return metrics_; |
| } |
| |
| /** |
| * Process the given list of notification events. Useful for tests which provide a list |
| * of events |
| */ |
| @VisibleForTesting |
| protected void processEvents(List<NotificationEvent> events) |
| throws MetastoreNotificationException { |
| NotificationEvent lastProcessedEvent = null; |
| // update the events received metric before returning |
| metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size()); |
| if (events.isEmpty()) return; |
| final Timer.Context context = |
| metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time(); |
| try { |
| List<MetastoreEvent> filteredEvents = |
| metastoreEventFactory_.getFilteredEvents(events); |
| if (filteredEvents.isEmpty()) { |
| lastSyncedEventId_.set(events.get(events.size() - 1).getEventId()); |
| return; |
| } |
| for (MetastoreEvent event : filteredEvents) { |
| // synchronizing each event processing reduces the scope of the lock so the a |
| // potential reset() during event processing is not blocked for longer than |
| // necessary |
| synchronized (this) { |
| if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) { |
| break; |
| } |
| lastProcessedEvent = event.metastoreNotificationEvent_; |
| event.processIfEnabled(); |
| lastSyncedEventId_.set(event.eventId_); |
| } |
| } |
| } catch (CatalogException e) { |
| throw new MetastoreNotificationException(String.format( |
| "Unable to process event %d of type %s. Event processing will be stopped.", |
| lastProcessedEvent.getEventId(), lastProcessedEvent.getEventType()), e); |
| } finally { |
| context.stop(); |
| } |
| } |
| |
| /** |
| * Updates the current states to the given status. |
| */ |
| private synchronized void updateStatus(EventProcessorStatus toStatus) { |
| eventProcessorStatus_ = toStatus; |
| } |
| |
| private void dumpEventInfoToLog(NotificationEvent event) { |
| if (event == null) { |
| LOG.error("Notification event is null"); |
| return; |
| } |
| StringBuilder msg = |
| new StringBuilder().append("Event id: ").append(event.getEventId()).append("\n") |
| .append("Event Type: ").append(event.getEventType()).append("\n") |
| .append("Event time: ").append(event.getEventTime()).append("\n") |
| .append("Database name: ").append(event.getDbName()).append("\n"); |
| if (event.getTableName() != null) { |
| msg.append("Table name: ").append(event.getTableName()).append("\n"); |
| } |
| msg.append("Event message: ").append(event.getMessage()).append("\n"); |
| LOG.error(msg.toString()); |
| } |
| |
| /** |
| * Create a instance of this object if it is not initialized. Currently, this object is |
| * a singleton and should only be created during catalogD initialization time, so that |
| * the start syncId matches with the catalogD startup time. |
| * |
| * @param catalog the CatalogServiceCatalog instance to which this event processing |
| * belongs |
| * @param startSyncFromId Start event id. Events will be polled starting from this |
| * event id |
| * @param eventPollingInterval HMS polling interval in seconds |
| * @return this object is already created, or create a new one if it is not yet |
| * instantiated |
| */ |
| public static synchronized ExternalEventsProcessor getInstance( |
| CatalogServiceCatalog catalog, long startSyncFromId, long eventPollingInterval) |
| throws CatalogException { |
| if (instance != null) { |
| return instance; |
| } |
| |
| instance = |
| new MetastoreEventsProcessor(catalog, startSyncFromId, eventPollingInterval); |
| return instance; |
| } |
| |
| @VisibleForTesting |
| public MetastoreEventFactory getMetastoreEventFactory() { |
| return metastoreEventFactory_; |
| } |
| |
| public static MessageDeserializer getMessageDeserializer() { |
| return MESSAGE_DESERIALIZER; |
| } |
| } |