blob: eeb8f80a1d1f2e95649823abd321819b14ce4d76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.core.common.utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is a simple publish-subscribe implementation for internal
* communication between Sentry components. It's a singleton class.
* <p>
* For the initial set of use cases, publish events are expected to be
* extremely rare, so the data structures have been selected with no
* consideration for high concurrency.
*/
public final class PubSub {

  private static final Logger LOGGER = LoggerFactory.getLogger(PubSub.class);

  // Topic -> subscribers currently registered for it. Only accessed from
  // synchronized instance methods, so a plain HashMap is sufficient.
  private final Map<Topic, Set<Subscriber>> subscriberMap = new HashMap<>();

  // Lazily created singleton; only touched inside synchronized getInstance().
  private static PubSub instance;

  /**
   * Subscriber callback interface.
   * The same subscriber can subscribe to multiple topics,
   * so callback API includes both topic and message.
   */
  public interface Subscriber {
    /**
     * Invoked by {@link PubSub#publish(Topic, String)} for every message
     * published on a topic this subscriber is registered for.
     *
     * @param topic the topic the message was published on
     * @param message the published message; may be null, as it is optional
     */
    void onMessage(Topic topic, String message);
  }

  /**
   * Enumerated topics one can subscribe to.
   * To be expanded as needed.
   */
  public enum Topic {
    HDFS_SYNC_HMS("hdfs-sync-hms"), // upcoming feature, triggering HDFS sync between HMS and Sentry
    HDFS_SYNC_NN("hdfs-sync-nn"); // upcoming feature, triggering HDFS sync between Sentry and NameNode

    // Lower-cased external name of the topic.
    private final String name;

    // Reverse lookup: lower-cased external name -> enum constant.
    private static final Map<String, Topic> map = new HashMap<>();

    static {
      for (Topic t : Topic.values()) {
        map.put(t.name, t);
      }
    }

    /**
     * Look up a topic by its external name. Leading/trailing whitespace
     * and letter case are ignored.
     *
     * @param name external topic name, e.g. "hdfs-sync-hms"
     * @return the matching topic
     * @throws NullPointerException if name is null
     * @throws NoSuchElementException if no topic has that name
     */
    public static Topic fromString(String name) {
      // Reference first, message second: the reversed order never fails,
      // because it null-checks the message literal instead of the argument.
      Preconditions.checkNotNull(name, "Enum name cannot be null");
      // Locale.ROOT keeps the normalization independent of the JVM default locale.
      String key = name.trim().toLowerCase(Locale.ROOT);
      Topic topic = map.get(key);
      if (topic == null) {
        throw new NoSuchElementException(key + " not found");
      }
      return topic;
    }

    Topic(String name) {
      this.name = name.toLowerCase(Locale.ROOT);
    }

    /**
     * @return the lower-cased external name of this topic
     */
    public String getName() {
      return name;
    }
  }

  /**
   * Public factory method to guarantee singleton.
   *
   * @return the single PubSub instance, created on first call
   */
  public static synchronized PubSub getInstance() {
    if (instance != null) {
      LOGGER.info("{} requested", instance);
    } else {
      instance = new PubSub();
      LOGGER.info("{} created", instance);
    }
    return instance;
  }

  // declare private to prevent multiple class instantiation
  private PubSub() {
  }

  /**
   * Publish message on given topic. Message is optional.
   * <p>
   * Callbacks are invoked on the calling thread while this object's monitor
   * is held. Delivery goes to the subscribers registered at the moment
   * publish() is called: a subscriber that is removed by an earlier callback
   * during the same publish() is still called, and one that is added is not.
   *
   * @param topic topic to publish on
   * @param message message passed to every subscriber; may be null
   * @throws NullPointerException if topic is null
   * @throws IllegalArgumentException if the topic has no subscribers
   */
  public synchronized void publish(Topic topic, String message) {
    Preconditions.checkNotNull(topic, "Topic cannot be null");
    Set<Subscriber> subscribers = subscriberMap.get(topic);
    if (subscribers == null) {
      throw new IllegalArgumentException("cannot publish to unknown topic " + topic
          + ", existing topics " + subscriberMap.keySet());
    }

    // Iterate over a snapshot: the monitor is reentrant, so a callback may
    // call subscribe()/unsubscribe() on this topic, which would otherwise
    // break the iteration with a ConcurrentModificationException.
    Subscriber[] snapshot = subscribers.toArray(new Subscriber[0]);
    for (Subscriber subscriber : snapshot) {
      // Failure of one subscriber to process message delivery should not affect
      // message delivery to other subscribers, therefore using try-catch.
      try {
        subscriber.onMessage(topic, message);
      } catch (Exception e) {
        LOGGER.error("Topic " + topic + ", message " + message + ", delivery error", e);
      }
    }
    LOGGER.info("Topic {}, message {}: {} subscribers called", topic, message, snapshot.length);
  }

  /**
   * Subscribe to given topic. The topic is created on first subscription.
   * Subscribers are kept in a set, so adding one that is already registered
   * for the topic (per its equals/hashCode) has no effect.
   *
   * @param topic topic to subscribe to
   * @param subscriber callback to register
   * @throws NullPointerException if topic or subscriber is null
   */
  public synchronized void subscribe(Topic topic, Subscriber subscriber) {
    Preconditions.checkNotNull(topic, "Topic cannot be null");
    Preconditions.checkNotNull(subscriber, "Topic %s: Subscriber cannot be null", topic);
    Set<Subscriber> subscribers = subscriberMap.get(topic);
    if (subscribers == null) {
      LOGGER.info("new topic {}", topic);
      subscribers = new HashSet<>();
      subscriberMap.put(topic, subscribers);
    }
    subscribers.add(subscriber);
    LOGGER.info("Topic {}, added subscriber {}, total topic subscribers: {}",
        topic, subscriber, subscribers.size());
  }

  /**
   * Unsubscribe from given topic. If the last subscriber, remove the topic.
   *
   * @param topic topic to unsubscribe from
   * @param subscriber callback to remove
   * @throws NullPointerException if topic or subscriber is null
   * @throws IllegalArgumentException if the topic has no subscribers, or
   *         the subscriber is not registered for it
   */
  public synchronized void unsubscribe(Topic topic, Subscriber subscriber) {
    Preconditions.checkNotNull(topic, "Topic cannot be null");
    Preconditions.checkNotNull(subscriber, "Topic %s: Subscriber cannot be null", topic);
    Set<Subscriber> subscribers = subscriberMap.get(topic);
    if (subscribers == null) {
      throw new IllegalArgumentException("cannot unsubscribe from unknown topic " + topic);
    }
    if (!subscribers.remove(subscriber)) {
      throw new IllegalArgumentException("cannot unsubscribe from topic " + topic + ", unknown subscriber");
    }
    LOGGER.info("Topic {}, unsubscribing subscriber {}, total topic subscribers: {}",
        topic, subscriber, subscribers.size());
    if (subscribers.isEmpty()) {
      subscriberMap.remove(topic);
    }
  }

  /**
   * Get all existing topics, i.e. those with at least one subscriber.
   *
   * @return a snapshot of the current topics; it is a copy, so changes to it
   *         do not affect this PubSub and it is safe to iterate while other
   *         threads subscribe or unsubscribe
   */
  public synchronized Set<Topic> getTopics() {
    // Copy instead of handing out the live key-set view of the internal map.
    return new HashSet<>(subscriberMap.keySet());
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + ":" + Integer.toHexString(hashCode());
  }
}