blob: c2ae5122b8eec649ef62734f2efdfac759d53d4b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import static org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES;
public class EntityNotificationSender<T> {
private static final Logger LOG = LoggerFactory.getLogger(EntityNotificationSender.class);
private final static boolean NOTIFY_POST_COMMIT_DEFAULT = true;
private final NotificationSender<T> notificationSender;
public EntityNotificationSender(NotificationInterface notificationInterface, Configuration configuration) {
this(notificationInterface, configuration != null ? configuration.getBoolean("atlas.notification.send.postcommit", NOTIFY_POST_COMMIT_DEFAULT) : NOTIFY_POST_COMMIT_DEFAULT);
}
public EntityNotificationSender(NotificationInterface notificationInterface, boolean sendPostCommit) {
if (sendPostCommit) {
LOG.info("EntityNotificationSender: notifications will be sent after transaction commit");
this.notificationSender = new PostCommitNotificationSender(notificationInterface);
} else {
LOG.info("EntityNotificationSender: notifications will be sent inline (i.e. not waiting for transaction to commit)");
this.notificationSender = new InlineNotificationSender(notificationInterface);
}
}
public void send(List<T> notifications) throws NotificationException {
this.notificationSender.send(notifications);
}
private interface NotificationSender<T> {
void send(List<T> notifications) throws NotificationException;
}
private class InlineNotificationSender<T> implements NotificationSender<T> {
private final NotificationInterface notificationInterface;
public InlineNotificationSender(NotificationInterface notificationInterface) {
this.notificationInterface = notificationInterface;
}
@Override
public void send(List<T> notifications) throws NotificationException {
notificationInterface.send(ENTITIES, notifications);
}
}
private class PostCommitNotificationSender<T> implements NotificationSender<T> {
private final NotificationInterface notificationInterface;
private final ThreadLocal<PostCommitNotificationHook> postCommitNotificationHooks = new ThreadLocal<>();
public PostCommitNotificationSender(NotificationInterface notificationInterface) {
this.notificationInterface = notificationInterface;
}
@Override
public void send(List<T> notifications) throws NotificationException {
PostCommitNotificationHook notificationHook = postCommitNotificationHooks.get();
if (notificationHook == null) {
notificationHook = new PostCommitNotificationHook(notifications);
postCommitNotificationHooks.set(notificationHook);
} else {
notificationHook.addNotifications(notifications);
}
}
class PostCommitNotificationHook<T> extends GraphTransactionInterceptor.PostTransactionHook {
private final List<T> notifications = new ArrayList<>();
public PostCommitNotificationHook(List<T> notifications) {
this.addNotifications(notifications);
}
public void addNotifications(List<T> notifications) {
if (notifications != null) {
this.notifications.addAll(notifications);
}
}
@Override
public void onComplete(boolean isSuccess) {
postCommitNotificationHooks.remove();
if (CollectionUtils.isNotEmpty(notifications)) {
if (isSuccess) {
try {
notificationInterface.send(ENTITIES, notifications);
} catch (NotificationException excp) {
LOG.error("failed to send entity notifications", excp);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Transaction not committed. Not sending {} notifications: {}", notifications.size(), notifications);
}
}
}
}
}
}
}