HCAT-548 Move topic creation in NotificationListener to a separate method
git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/trunk@1408429 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f567804..22d7da2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,8 @@
HCAT-427 Document storage-based authorization (lefty via gates)
IMPROVEMENTS
+ HCAT-548 Move topic creation in NotificationListener to a separate method (amalakar via avandana)
+
HCAT-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
HCAT-532 HiveClientCache shutdown hook should log at debug level (traviscrawford)
diff --git a/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java b/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
index 5c6b8ad..cd7530b 100644
--- a/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
+++ b/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
@@ -34,6 +34,7 @@
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -266,7 +267,6 @@
* select messages in client side.
*/
protected void send(Object msgBody, String topicName, String event) {
-
try {
Destination topic = null;
@@ -280,18 +280,7 @@
return;
}
}
- try {
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- topic = session.createTopic(topicName);
- } catch (IllegalStateException ise) {
- // this will happen if we were able to establish connection once, but
- // its no longer valid,
- // ise is thrown, catch it and retry.
- LOG.error("Seems like connection is lost. Retrying", ise);
- createConnection();
- topic = session.createTopic(topicName);
- }
+ topic = getTopic(topicName);
if (null == topic) {
// Still not successful, return from here.
LOG.error("Invalid session. Failed to send message on topic: "
@@ -318,10 +307,34 @@
} catch (Exception e) {
// Gobble up the exception. Message delivery is best effort.
LOG.error("Failed to send message on topic: " + topicName + " event: "
- + event, e);
+ + event, e);
}
}
+ /**
+ * Get the topic object for the topicName, it also tries to reconnect
+ * if the connection appears to be broken.
+ *
+ * @param topicName
+ * @return
+ * @throws JMSException
+ */
+ protected Topic getTopic(final String topicName) throws JMSException {
+ Topic topic;
+ try {
+ // Topics are created on demand. If it doesn't exist on broker it will
+ // be created when broker receives this message.
+ topic = session.createTopic(topicName);
+ } catch (IllegalStateException ise) {
+ // this will happen if we were able to establish connection once, but its no longer valid,
+ // ise is thrown, catch it and retry.
+ LOG.error("Seems like connection is lost. Retrying", ise);
+ createConnection();
+ topic = session.createTopic(topicName);
+ }
+ return topic;
+ }
+
protected void createConnection() {
Context jndiCntxt;