blob: ac505628a768327ba8eea070d658dd642e39203e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.events;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.common.Metrics;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A metastore event is an instance of the class
* <code>org.apache.hadoop.hive.metastore.api.NotificationEvent</code>. Metastore can be
* configured to work with Listeners which are called on various DDL operations like
* create/alter/drop operations on database, table, partition etc. Each event has a unique
* incremental id and the generated events can be fetched from Metastore to get
* incremental updates to the metadata stored in Hive metastore using the public API
* <code>get_next_notification</code>. These events could be generated by external
* Metastore clients like Apache Hive or Apache Spark as well as other Impala clusters
* configured to talk with the same metastore.
*
* This class is used to poll metastore for such events at a given frequency. By observing
* such events, we can take appropriate action on the catalogD
* (refresh/invalidate/add/remove) so that catalog represents the latest information
* available in metastore. We keep track of the last synced event id in each polling
* iteration so the next batch can be requested appropriately. The current batch size is
* constant and set to EVENTS_BATCH_SIZE_PER_RPC.
*
* <pre>
* +---------------+ +----------------+ +--------------+
* |Catalog state | |Catalog | | |
* |stale | |State up-to-date| |Catalog state |
* |(apply event) | |(ignore) | |is newer than |
* | | | | |event |
* | | | | |(ignore) |
* +------+--------+ +-----+----------+ +-+------------+
* | | |
* | | |
* | | |
* | | |
* | | |
* +-----------V------------------V---------------------V-----------> Event Timeline
* ^
* |
* |
* |
* |
* E
*
* </pre>
* Consistency model: Events could be seen as DDL operations from the past, either done
* from this same cluster or from some other external system. For example in the events
* timeline given above, consider an Event E at any given time. The catalog state for the
* corresponding object of the event could either be stale, exactly-same or at a version
* which is higher than one provided by event. Catalog state should only be updated when
* it is stale with respect to the event. In order to determine if the catalog object is
* stale, we rely on a combination of creationTime and object version. An object in
* catalog is stale if and only if its creationTime is < creationTime of the object from
* event OR its version < version from event if createTime matches.
*
* If the object has the same createTime and version when compared to event or if the
* createTime > createTime from the event, the event can be safely ignored.
*
* Following table shows the actions to be taken when the catalog state is stale.
*
* <pre>
* +----------------------------------------+
* | Catalog object state |
* +----------------------------+------------+------------+
* | Event type | Loaded | Incomplete | Not present|
* | | | | |
* +------------------------------------------------------+
* | | | | |
* | CREATE EVENT| removeAndAdd | Ignore | Add |
* | | | | |
* | | | | |
* | ALTER EVENT | Invalidate | Ignore | Ignore |
* | | | | |
* | | | | |
* | DROP EVENT | Remove | Remove | Ignore |
* | | | | |
* | | | | |
* | INSERT EVENT| Refresh | Ignore | Ignore |
* | | | | |
* +-------------+--------------+------------+------------+
* </pre>
*
* Currently event handlers rely on creation time on Database, Table and Partition to
* uniquely determine if the object from event is same as object in the catalog. This
* information is used to make sure that we are deleting the right incarnation of the
* object when compared to Metastore.
*
* Self-events:
* Events could be generated by this Catalog's operations as well. Invalidating table
* for such events is unnecessary and inefficient. In order to detect such self-events
* when catalog executes a DDL operation it appends the current catalog version to the
* list of version numbers for the in-flight events for the table. Events processor
* clears this version when the corresponding version number identified by serviceId is
* received in the event. This is needed since it is possible that an external
* non-Impala system which generates the event presents the same serviceId and version
* number later on. The algorithm to detect a self-event is as below.
*
* 1. Add the service id and expected catalog version to table/partition parameters
* when executing the DDL operation. When the HMS operation is successful, add the
* version number to the list of version for in-flight events at table level.
* 2. When the event is received, the first time you see the combination of serviceId
* and version number, event processor clears the version number from table's list and
* determines the event as self-generated (and hence ignored)
* 3. If the event data presents a unknown serviceId or if the version number is not
* present in the list of in-flight versions, event is not a self-event and needs to be
* processed.
*
* In order to limit the total memory footprint, only 10 version numbers are stored at
* the table. Since the event processor is expected to poll every few seconds this
* should be a reasonable bound which satisfies most use-cases. Otherwise, event
* processor may wrongly process a self-event to invalidate the table. In such a case,
* it is a performance penalty, not a correctness issue.
*
* All the operations which change the state of catalog cache while processing a certain
* event type must be atomic in nature. We rely on taking a write lock on version object
* in CatalogServiceCatalog to make sure that readers are blocked while the metadata
* update operation is being performed. Since the events are generated post-metastore
* operations, such catalog updates do not need to update the state in Hive Metastore.
*
* Error Handling: The event processor is always in one of the states defined by
* EventProcessorStatus (for example ACTIVE, PAUSED or ERROR). In case of an unexpected
* error while processing the events the state of event processor changes to ERROR and no
* subsequent events are polled. In such a case an invalidate metadata command restarts
* the event polling which updates the lastSyncedEventId to the latest from metastore.
*/
public class MetastoreEventsProcessor implements ExternalEventsProcessor {
// name of a Hive metastore configuration key, exposed as a public constant
public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
"hive.metastore.notifications.add.thrift.objects";
private static final Logger LOG =
LoggerFactory.getLogger(MetastoreEventsProcessor.class);
// message deserializer obtained from MetastoreShim; handed out by
// getMessageDeserializer()
private static final MessageDeserializer MESSAGE_DESERIALIZER =
MetastoreShim.getMessageDeserializer();
// singleton instance, created by the first call to getInstance()
private static MetastoreEventsProcessor instance;
// maximum number of events to poll in each RPC
private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000;
// maximum time to wait for a clean shutdown of scheduler in seconds
private static final int SCHEDULER_SHUTDOWN_TIMEOUT = 10;
// time taken to fetch events during each poll
public static final String EVENTS_FETCH_DURATION_METRIC = "events-fetch-duration";
// time taken to apply all the events received during each poll
public static final String EVENTS_PROCESS_DURATION_METRIC = "events-apply-duration";
// rate of events received per unit time
public static final String EVENTS_RECEIVED_METRIC = "events-received";
// total number of events which are skipped because of the flag setting
public static final String EVENTS_SKIPPED_METRIC = "events-skipped";
// name of the event processor status metric
public static final String STATUS_METRIC = "status";
// name of the metric which reports the last synced event id
public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
// metric name which counts the number of self-events which are skipped
public static final String NUMBER_OF_SELF_EVENTS = "self-events-skipped";
// metric name for number of tables which are invalidated by event processor so far
public static final String NUMBER_OF_TABLE_INVALIDATES = "tables-invalidated";
/**
 * Possible states of the event processor. A newly constructed processor starts out as
 * STOPPED and events are only fetched and applied while the status is ACTIVE.
 */
public enum EventProcessorStatus {
PAUSED, // event processor is paused because catalog is being reset concurrently
ACTIVE, // event processor is scheduled at a given frequency
ERROR, // event processor is in error state and event processing has stopped
NEEDS_INVALIDATE, // event processor could not resolve certain events and needs a
// manual invalidate command to reset the state (See AlterEvent for an example)
STOPPED, // event processor is shutdown. No events will be processed
DISABLED // event processor is not configured to run
}
// current status of this event processor. It is only written through the synchronized
// updateStatus() but it is read without holding the lock (getStatus(), the status gauge
// and the early check in processEvents()), hence it is volatile so that readers on
// other threads always observe the most recent value
private volatile EventProcessorStatus eventProcessorStatus_ =
    EventProcessorStatus.STOPPED;
// event factory which is used to get or create MetastoreEvents
private final MetastoreEventFactory metastoreEventFactory_;
// keeps track of the last event id which we have synced to. Holds -1 until the
// constructor sets the start event id
private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
// polling interval in seconds. Note this is a time we wait AFTER each fetch call; it is
// used both as the initial delay and as the fixed delay of the scheduler
private final long pollingFrequencyInSec_;
// catalog service instance to be used while processing events
private final CatalogServiceCatalog catalog_;
// scheduler daemon thread executor for processing events at given frequency
private final ScheduledExecutorService scheduler_ = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("MetastoreEventsProcessor").build());
// metrics registry to keep track of metrics related to event processing
//TODO create a separate singleton class which wraps around this so that we don't
// have to pass it around as an argument in constructor in MetastoreEvents
private final Metrics metrics_ = new Metrics();
/**
 * Creates an event processor bound to the given catalog. The metastore configuration
 * is validated as part of construction; the processor is not scheduled until one of
 * the start methods is called.
 *
 * @param catalog catalog instance used to reach metastore and to apply events; must
 *     not be null
 * @param startSyncFromId event id which is recorded as the last synced event id
 * @param pollingFrequencyInSec polling interval in seconds; must be positive
 * @throws CatalogException if the metastore configuration validation fails
 */
@VisibleForTesting
MetastoreEventsProcessor(CatalogServiceCatalog catalog, long startSyncFromId,
    long pollingFrequencyInSec) throws CatalogException {
  Preconditions.checkState(pollingFrequencyInSec > 0);
  Preconditions.checkNotNull(catalog);
  catalog_ = catalog;
  // the validation reads config values through catalog_, so catalog_ is assigned first
  validateConfigs();
  lastSyncedEventId_.set(startSyncFromId);
  pollingFrequencyInSec_ = pollingFrequencyInSec;
  metastoreEventFactory_ = new MetastoreEventFactory(catalog_, metrics_);
  initMetrics();
}
/**
 * Fetches the required metastore config values and validates them against the
 * expected values. The configurations to validate are different for HMS-2 v/s HMS-3
 * and hence it uses MetastoreShim to get the configurations which need to be validated.
 * All configurations are checked before the method fails, so that every incorrect
 * value is logged and counted in one pass instead of only the first one.
 * @throws CatalogException if one or more validations fail or if metastore is not
 * accessible
 */
@VisibleForTesting
void validateConfigs() throws CatalogException {
  List<ValidationResult> validationErrors = new ArrayList<>();
  for (MetastoreEventProcessorConfig config : getEventProcessorConfigsToValidate()) {
    String configKey = config.getValidator().getConfigKey();
    try {
      String value = getConfigValueFromMetastore(configKey, "");
      ValidationResult result = config.validate(value);
      if (!result.isValid()) validationErrors.add(result);
    } catch (TException e) {
      // metastore could not be reached; there is no point in checking the other keys
      String msg = String.format("Unable to get configuration %s from metastore. Check "
          + "if metastore is accessible", configKey);
      LOG.error(msg, e);
      throw new CatalogException(msg, e);
    }
  }
  if (validationErrors.isEmpty()) return;
  // report all the invalid configurations together, once every key has been checked
  LOG.error("Found {} incorrect metastore configuration(s).",
      validationErrors.size());
  for (ValidationResult invalidConfig : validationErrors) {
    LOG.error(invalidConfig.getReason());
  }
  throw new CatalogException(String.format("Found %d incorrect metastore "
      + "configuration(s). Events processor cannot start. See ERROR log for more "
      + "details.", validationErrors.size()));
}
/**
 * Returns the list of Metastore configurations to validate depending on the hive
 * version. When the metastore major version reported by MetastoreShim is 2 or higher
 * only FIRE_EVENTS_FOR_DML is returned, otherwise all the values of
 * MetastoreEventProcessorConfig are returned.
 */
public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() {
  final boolean validateDmlConfigOnly = MetastoreShim.getMajorVersion() >= 2;
  if (!validateDmlConfigOnly) {
    return Arrays.asList(MetastoreEventProcessorConfig.values());
  }
  return Arrays.asList(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML);
}
/**
 * Registers all the metrics of this event processor: the fetch and apply timers, the
 * events received meter, the skipped/self-event/invalidate counters and the gauges
 * which expose the current status and the last synced event id.
 */
private void initMetrics() {
  metrics_.addTimer(EVENTS_FETCH_DURATION_METRIC);
  metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC);
  metrics_.addMeter(EVENTS_RECEIVED_METRIC);
  metrics_.addCounter(EVENTS_SKIPPED_METRIC);
  // gauges are evaluated lazily, every time the metric value is read
  final Gauge<String> statusGauge = () -> getStatus().toString();
  metrics_.addGauge(STATUS_METRIC, statusGauge);
  final Gauge<Long> lastSyncedIdGauge = lastSyncedEventId_::get;
  metrics_.addGauge(LAST_SYNCED_ID_METRIC, lastSyncedIdGauge);
  metrics_.addCounter(NUMBER_OF_SELF_EVENTS);
  metrics_.addCounter(NUMBER_OF_TABLE_INVALIDATES);
}
/**
 * Schedules the polling task and marks the event processor as ACTIVE. The task is
 * scheduled with a fixed delay rather than at a fixed rate, so that pending tasks do
 * not pile up when a polling operation takes longer than the polling interval: the
 * next poll starts only after the previous poll has completed and the configured delay
 * has elapsed. Must not be called while the event processor is already ACTIVE.
 */
@Override
public synchronized void start() {
  final boolean alreadyActive = eventProcessorStatus_ == EventProcessorStatus.ACTIVE;
  Preconditions.checkState(!alreadyActive);
  startScheduler();
  updateStatus(EventProcessorStatus.ACTIVE);
  final String startedMsg = String.format(
      "Successfully started metastore event processing."
          + " Polling interval: %d seconds.", pollingFrequencyInSec_);
  LOG.info(startedMsg);
}
/**
 * Returns the status which the event processor currently has. The status is read
 * without acquiring the lock of this object.
 */
@VisibleForTesting
EventProcessorStatus getStatus() {
  return this.eventProcessorStatus_;
}
/**
 * Looks up the value of a configuration key in Hive Metastore using a client taken
 * from the catalog. The client is released again once the lookup is done.
 * @param config Hive configuration name
 * @param defaultVal Default value to return if config not present in Hive
 * @throws TException if the value could not be fetched from metastore
 */
@VisibleForTesting
public String getConfigValueFromMetastore(String config, String defaultVal)
    throws TException {
  try (MetaStoreClient pooledClient = catalog_.getMetaStoreClient()) {
    return MetaStoreUtil.getMetastoreConfigValue(
        pooledClient.getHiveClient(), config, defaultVal);
  }
}
/**
 * Returns the id of the last event which has been synced so far. The value is a
 * snapshot: it can already be outdated when the caller uses it, since it is advanced
 * while events are being processed. Only to be used for testing purposes.
 */
@VisibleForTesting
public long getLastSyncedEventId() {
  final long lastSyncedId = lastSyncedEventId_.get();
  return lastSyncedId;
}
/**
 * Schedules <code>processEvents()</code> on the scheduler thread. The polling interval
 * is used both as the initial delay and as the delay between the end of one run and
 * the start of the next one.
 */
@VisibleForTesting
void startScheduler() {
  Preconditions.checkState(pollingFrequencyInSec_ > 0);
  final String pollingMsg = String.format(
      "Starting metastore event polling with interval %d seconds.",
      pollingFrequencyInSec_);
  LOG.info(pollingMsg);
  final long initialDelay = pollingFrequencyInSec_;
  final long delayBetweenPolls = pollingFrequencyInSec_;
  scheduler_.scheduleWithFixedDelay(
      this::processEvents, initialDelay, delayBetweenPolls, TimeUnit.SECONDS);
}
/**
 * Pauses the event processing by changing the status of event processor to
 * <code>EventProcessorStatus.PAUSED</code>. No new events are processed as long as the
 * status is paused. The scheduler keeps running; each poll is skipped while the status
 * is not ACTIVE. If an event is being processed when pause is called, this method
 * blocks until the processing of that event is complete, since both acquire the lock
 * of this object. Must not be called when the event processor is already paused.
 */
@Override
public synchronized void pause() {
  final boolean alreadyPaused = eventProcessorStatus_ == EventProcessorStatus.PAUSED;
  Preconditions.checkState(!alreadyPaused);
  updateStatus(EventProcessorStatus.PAUSED);
  final long syncedEventId = lastSyncedEventId_.get();
  LOG.info(String.format(
      "Event processing is paused. Last synced event id is %d", syncedEventId));
}
/**
 * Get the current notification event id from metastore
 * @return the event id which metastore reports as its current notification event id
 * @throws CatalogException if the id could not be fetched from metastore. The
 * underlying metastore exception is attached as the cause
 */
@Override
public long getCurrentEventId() throws CatalogException {
  try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
    return metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
  } catch (TException e) {
    // keep the original exception as the cause, otherwise the reason of the failure
    // is lost since it is not logged here
    throw new CatalogException("Unable to fetch the current notification event id. "
        + "Check if metastore service is accessible", e);
  }
}
/**
 * Resumes the event processing from a given event id. The last synced event id is
 * replaced by the given id and the status is changed to ACTIVE. This method does not
 * schedule the polling task itself. Must not be called while the event processor is
 * already ACTIVE.
 *
 * @param fromEventId event id to record as the last synced event id; must not be
 *     negative
 */
@Override
public synchronized void start(long fromEventId) {
  Preconditions.checkArgument(fromEventId >= 0);
  final boolean alreadyActive = eventProcessorStatus_ == EventProcessorStatus.ACTIVE;
  Preconditions.checkState(!alreadyActive,
      "Event processing start called when it is already active");
  final long previousSyncedId = lastSyncedEventId_.get();
  lastSyncedEventId_.set(fromEventId);
  updateStatus(EventProcessorStatus.ACTIVE);
  final long currentSyncedId = lastSyncedEventId_.get();
  LOG.info(String.format(
      "Metastore event processing restarted. Last synced event id was updated "
          + "from %d to %d", previousSyncedId, currentSyncedId));
}
/**
 * Shuts down the scheduler and changes the status to STOPPED. When a batch of events
 * is being processed at the same time, the lock of this object is released by
 * <code>processEvents</code> after every single event. Hence it is possible that at
 * least one more event of the batch is processed while shutdown() waits to acquire
 * the lock. Must not be called when the event processor is already STOPPED, which is
 * also the status of an event processor which was never started.
 */
@Override
public synchronized void shutdown() {
  Preconditions.checkNotNull(scheduler_);
  final boolean notYetStopped = eventProcessorStatus_ != EventProcessorStatus.STOPPED;
  Preconditions.checkState(notYetStopped, "Event processing is already stopped");
  shutdownAndAwaitTermination();
  updateStatus(EventProcessorStatus.STOPPED);
  LOG.info("Metastore event processing stopped.");
}
/**
 * Attempts to cleanly shutdown the scheduler pool. If the pool does not shutdown
 * within timeout, does a force shutdown which might interrupt currently running tasks.
 * If the calling thread is interrupted while waiting, the running tasks are cancelled
 * and the interrupt status of the calling thread is restored so that callers further
 * up the stack can still observe the interruption.
 */
private synchronized void shutdownAndAwaitTermination() {
  scheduler_.shutdown(); // disable new tasks from being submitted
  try {
    // wait for 10 secs for scheduler to complete currently running tasks
    if (!scheduler_.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
      // executor couldn't terminate and timed-out, force the termination
      LOG.info(String.format("Scheduler pool did not terminate within %d seconds. "
          + "Attempting to stop currently running tasks", SCHEDULER_SHUTDOWN_TIMEOUT));
      scheduler_.shutdownNow();
    }
  } catch (InterruptedException e) {
    // current thread interrupted while pool was waiting for termination
    // issue a shutdownNow before returning to cancel currently running tasks
    LOG.info("Received interruptedException. Terminating currently running tasks.", e);
    scheduler_.shutdownNow();
    // catching InterruptedException clears the interrupt flag; set it again instead
    // of silently swallowing the interruption
    Thread.currentThread().interrupt();
  }
}
/**
 * Fetches the next batch of NotificationEvents from metastore, starting after the last
 * synced event id. At most <code>EVENTS_BATCH_SIZE_PER_RPC</code> events are requested.
 * The time taken is recorded in the fetch duration timer whether or not the fetch
 * succeeds.
 *
 * @return the fetched events, or an empty list when metastore has no event newer than
 *     the last synced event id
 * @throws MetastoreNotificationFetchException if the metastore calls fail
 */
@VisibleForTesting
protected List<NotificationEvent> getNextMetastoreEvents()
    throws MetastoreNotificationFetchException {
  final Timer.Context fetchTimer =
      metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time();
  final long syncedUpToId = lastSyncedEventId_.get();
  try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
    // Ask for the current notification event id first. We assume that the polling
    // interval is small enough that most polls find no new events, and in that case
    // this call is much faster (and cheaper on HMS side) than polling for the events
    // directly
    CurrentNotificationEventId latestId =
        client.getHiveClient().getCurrentNotificationEventId();
    final boolean hasNewEvents = latestId.getEventId() > syncedUpToId;
    if (hasNewEvents) {
      NotificationEventResponse fetched = client.getHiveClient()
          .getNextNotification(syncedUpToId, EVENTS_BATCH_SIZE_PER_RPC, null);
      LOG.info(String.format("Received %d events. Start event id : %d",
          fetched.getEvents().size(), syncedUpToId));
      return fetched.getEvents();
    }
    // no new events since we last polled
    return Collections.emptyList();
  } catch (TException e) {
    throw new MetastoreNotificationFetchException(
        "Unable to fetch notifications from metastore. Last synced event id is "
            + syncedUpToId, e);
  } finally {
    fetchTimer.stop();
  }
}
/**
 * This method issues a request to Hive Metastore if needed, based on the current event
 * id in metastore and the last synced event_id. Events are fetched in fixed sized
 * batches. Each NotificationEvent received is processed by its corresponding
 * <code>MetastoreEvent</code>. Nothing is done unless the status is ACTIVE.
 *
 * No exception escapes from this method, since it runs on the scheduler thread:
 * a failure to fetch events leaves the status unchanged so that the next poll retries,
 * an event which cannot be resolved changes the status to NEEDS_INVALIDATE and any
 * other exception changes the status to ERROR.
 */
@Override
public void processEvents() {
  try {
    EventProcessorStatus currentStatus = eventProcessorStatus_;
    if (currentStatus != EventProcessorStatus.ACTIVE) {
      LOG.warn(String.format(
          "Event processing is skipped since status is %s. Last synced event id is %d",
          currentStatus, lastSyncedEventId_.get()));
      return;
    }
    List<NotificationEvent> events = getNextMetastoreEvents();
    processEvents(events);
  } catch (MetastoreNotificationFetchException ex) {
    // No need to change the EventProcessor state to error since we want the
    // EventProcessor to continue getting new events after HMS is back up.
    LOG.error("Unable to fetch the next batch of metastore events. Hive Metastore " +
        "may be unavailable. Will retry.", ex);
  } catch (MetastoreNotificationNeedsInvalidateException ex) {
    updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
    LOG.error("Event processing needs a invalidate command to resolve the state", ex);
  } catch (Exception ex) {
    // There are lot of Preconditions which can throw RuntimeExceptions when we
    // process events this catch all exception block is needed so that the scheduler
    // thread does not die silently
    updateStatus(EventProcessorStatus.ERROR);
    LOG.error("Unexpected exception received while processing event", ex);
    // The event which failed is not known at this level, so no event details are
    // logged here; logging a null event would only produce a misleading message
  }
}
/**
 * Gets the current event processor metrics along with its status. The status and the
 * last synced event id are always set. If the status is not active all the other
 * metrics are skipped.
 *
 * The mean fetch and process durations are taken from the snapshots of the
 * corresponding timers and are reported in seconds.
 * NOTE(review): seconds is assumed to be the unit expected by the consumers of
 * TEventProcessorMetrics - confirm against the thrift definition.
 */
@Override
public TEventProcessorMetrics getEventProcessorMetrics() {
  TEventProcessorMetrics eventProcessorMetrics = new TEventProcessorMetrics();
  EventProcessorStatus currentStatus = getStatus();
  eventProcessorMetrics.setStatus(currentStatus.toString());
  eventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId());
  if (currentStatus != EventProcessorStatus.ACTIVE) return eventProcessorMetrics;
  long eventsReceived = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
  long eventsSkipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
  // Timer.getMeanRate() is the rate at which the timer is updated, not a duration.
  // The mean duration comes from the timer snapshot, which is in nanoseconds
  final double nanosPerSecond = TimeUnit.SECONDS.toNanos(1);
  double avgFetchDuration =
      metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getSnapshot().getMean()
          / nanosPerSecond;
  double avgProcessDuration =
      metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getSnapshot().getMean()
          / nanosPerSecond;
  double avgNumberOfEventsReceived1Min =
      metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate();
  double avgNumberOfEventsReceived5Min =
      metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate();
  double avgNumberOfEventsReceived15Min =
      metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate();
  eventProcessorMetrics.setEvents_received(eventsReceived);
  eventProcessorMetrics.setEvents_skipped(eventsSkipped);
  eventProcessorMetrics.setEvents_fetch_duration_mean(avgFetchDuration);
  eventProcessorMetrics.setEvents_process_duration_mean(avgProcessDuration);
  eventProcessorMetrics.setEvents_received_1min_rate(avgNumberOfEventsReceived1Min);
  eventProcessorMetrics.setEvents_received_5min_rate(avgNumberOfEventsReceived5Min);
  eventProcessorMetrics.setEvents_received_15min_rate(avgNumberOfEventsReceived15Min);
  LOG.trace("Events Received: {} Events skipped: {} Avg fetch duration: {} Avg process "
      + "duration: {} Events received rate (1min) : {}",
      eventsReceived, eventsSkipped, avgFetchDuration, avgProcessDuration,
      avgNumberOfEventsReceived1Min);
  return eventProcessorMetrics;
}
/**
 * Returns a summary response whose summary text is the string representation of the
 * metrics registry of this event processor.
 */
@Override
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
  final TEventProcessorMetricsSummaryResponse response =
      new TEventProcessorMetricsSummaryResponse();
  final String metricsText = metrics_.toString();
  response.setSummary(metricsText);
  return response;
}
/**
 * Returns the metrics registry which this event processor updates.
 */
@VisibleForTesting
Metrics getMetrics() {
  return this.metrics_;
}
/**
 * Process the given list of notification events. Useful for tests which provide a list
 * of events. The events are first filtered by the event factory. If nothing remains
 * after filtering, the last synced event id is advanced to the last event of the
 * given list. Otherwise the remaining events are processed one by one and the last
 * synced event id is advanced after each event. Processing stops early when the status
 * is no longer ACTIVE.
 *
 * @param events the notification events to process
 * @throws MetastoreNotificationException if an event could not be processed because of
 * a CatalogException
 */
@VisibleForTesting
protected void processEvents(List<NotificationEvent> events)
    throws MetastoreNotificationException {
  NotificationEvent lastProcessedEvent = null;
  // update the events received metric before returning
  metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
  if (events.isEmpty()) return;
  final Timer.Context context =
      metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
  try {
    List<MetastoreEvent> filteredEvents =
        metastoreEventFactory_.getFilteredEvents(events);
    if (filteredEvents.isEmpty()) {
      lastSyncedEventId_.set(events.get(events.size() - 1).getEventId());
      return;
    }
    for (MetastoreEvent event : filteredEvents) {
      // synchronizing each event processing reduces the scope of the lock so that a
      // potential reset() during event processing is not blocked for longer than
      // necessary
      synchronized (this) {
        if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) {
          break;
        }
        lastProcessedEvent = event.metastoreNotificationEvent_;
        event.processIfEnabled();
        lastSyncedEventId_.set(event.eventId_);
      }
    }
  } catch (CatalogException e) {
    // the failure may have happened before the first event was picked up, in which
    // case there is no event to describe in the message
    if (lastProcessedEvent == null) {
      throw new MetastoreNotificationException(
          "Unable to process the received events. Event processing will be stopped.",
          e);
    }
    throw new MetastoreNotificationException(String.format(
        "Unable to process event %d of type %s. Event processing will be stopped.",
        lastProcessedEvent.getEventId(), lastProcessedEvent.getEventType()), e);
  } catch (RuntimeException e) {
    // the event which was being processed is only known here, so log its details
    // before the exception is handed over to the caller
    if (lastProcessedEvent != null) dumpEventInfoToLog(lastProcessedEvent);
    throw e;
  } finally {
    context.stop();
  }
}
/**
 * Replaces the current status of the event processor with the given status while
 * holding the lock of this object.
 */
private synchronized void updateStatus(EventProcessorStatus toStatus) {
  this.eventProcessorStatus_ = toStatus;
}
/**
 * Logs the details of the given notification event at ERROR level: id, type, time,
 * database name, table name (only when it is set) and the event message. Logs a fixed
 * message instead when the event is null.
 */
private void dumpEventInfoToLog(NotificationEvent event) {
  if (event == null) {
    LOG.error("Notification event is null");
    return;
  }
  String details = "Event id: " + event.getEventId() + "\n"
      + "Event Type: " + event.getEventType() + "\n"
      + "Event time: " + event.getEventTime() + "\n"
      + "Database name: " + event.getDbName() + "\n";
  final String tableName = event.getTableName();
  if (tableName != null) {
    details += "Table name: " + tableName + "\n";
  }
  details += "Event message: " + event.getMessage() + "\n";
  LOG.error(details);
}
/**
 * Returns the singleton instance of the event processor, creating it on the first
 * call. The instance should only be created during catalogD initialization time, so
 * that the start syncId matches with the catalogD startup time. Once the instance
 * exists, the arguments of later calls are ignored.
 *
 * @param catalog the CatalogServiceCatalog instance to which this event processing
 * belongs
 * @param startSyncFromId Start event id. Events will be polled starting from this
 * event id
 * @param eventPollingInterval HMS polling interval in seconds
 * @return the existing instance if it is already created, otherwise a newly created
 * one
 * @throws CatalogException if the instance could not be created
 */
public static synchronized ExternalEventsProcessor getInstance(
    CatalogServiceCatalog catalog, long startSyncFromId, long eventPollingInterval)
    throws CatalogException {
  if (instance == null) {
    instance =
        new MetastoreEventsProcessor(catalog, startSyncFromId, eventPollingInterval);
  }
  return instance;
}
/**
 * Returns the event factory which this event processor uses to filter the received
 * notification events.
 */
@VisibleForTesting
public MetastoreEventFactory getMetastoreEventFactory() {
  return this.metastoreEventFactory_;
}
/**
 * Returns the shared message deserializer which was obtained from MetastoreShim when
 * this class was initialized.
 */
public static MessageDeserializer getMessageDeserializer() {
  return MetastoreEventsProcessor.MESSAGE_DESERIALIZER;
}
}