/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.listener;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hcatalog.common.HCatConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of
* {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener}. It sends
* messages on two types of topics. The first has a name of the form
* dbName.tblName; on it, two kinds of messages are sent: add/drop partition
* and finalize_partition messages. The second topic is named "HCAT", and the
* messages sent on it are: add/drop database and add/drop table. Every message
* also has a property named "HCAT_EVENT" set on it, whose value can be used to
* configure a message selector on the subscriber side.
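*
* For example, a subscriber interested only in add-partition events could
* attach a JMS message selector when creating its consumer. A minimal sketch
* (the topic name and the subscriberSession variable below are illustrative,
* not part of this class):
* <pre>
* Topic topic = subscriberSession.createTopic("mydb.mytable");
* MessageConsumer consumer =
* subscriberSession.createConsumer(topic, "HCAT_EVENT = 'HCAT_ADD_PARTITION'");
* </pre>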
*/
public class NotificationListener extends MetaStoreEventListener {
private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
protected Session session;
protected Connection conn;
/**
* Create message bus connection and session in constructor.
*/
public NotificationListener(final Configuration conf) {
super(conf);
createConnection();
}
private static String getTopicName(Partition partition,
ListenerEvent partitionEvent) throws MetaException {
try {
return partitionEvent.getHandler()
.get_table(partition.getDbName(), partition.getTableName())
.getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
} catch (NoSuchObjectException e) {
throw new MetaException(e.toString());
}
}
@Override
public void onAddPartition(AddPartitionEvent partitionEvent)
throws MetaException {
// Subscribers can get notified of a newly added partition in a
// particular table by listening on a topic named "dbName.tableName"
// with a message selector such as "HCAT_EVENT = HCAT_ADD_PARTITION".
if (partitionEvent.getStatus()) {
Partition partition = partitionEvent.getPartition();
String topicName = getTopicName(partition, partitionEvent);
if (topicName != null && !topicName.equals("")) {
send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
} else {
LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+ partition.getDbName()
+ "."
+ partition.getTableName()
+ " To enable notifications for this table, please do alter table set properties ("
+ HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+ "=<dbname>.<tablename>) or whatever you want topic name to be.");
}
}
}
@Override
public void onDropPartition(DropPartitionEvent partitionEvent)
throws MetaException {
// Subscribers can get notified of a dropped partition in a
// particular table by listening on a topic named "dbName.tableName"
// with a message selector such as "HCAT_EVENT = HCAT_DROP_PARTITION".
// DataNucleus throws an NPE when we try to serialize a partition object
// retrieved from the metastore. To work around that, we reset the
// following fields before sending.
if (partitionEvent.getStatus()) {
Partition partition = partitionEvent.getPartition();
StorageDescriptor sd = partition.getSd();
sd.setBucketCols(new ArrayList<String>());
sd.setSortCols(new ArrayList<Order>());
sd.setParameters(new HashMap<String, String>());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
String topicName = getTopicName(partition, partitionEvent);
if (topicName != null && !topicName.equals("")) {
send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
} else {
LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+ partition.getDbName()
+ "."
+ partition.getTableName()
+ " To enable notifications for this table, please do alter table set properties ("
+ HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+ "=<dbname>.<tablename>) or whatever you want topic name to be.");
}
}
}
@Override
public void onCreateDatabase(CreateDatabaseEvent dbEvent)
throws MetaException {
// Subscribers can get notified of the addition of a database in HCAT
// by listening on a topic named "HCAT" with a message selector
// such as "HCAT_EVENT = HCAT_ADD_DATABASE".
if (dbEvent.getStatus())
send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
.getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
}
@Override
public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
// Subscribers can get notified of the drop of a database in HCAT
// by listening on a topic named "HCAT" with a message selector
// such as "HCAT_EVENT = HCAT_DROP_DATABASE".
if (dbEvent.getStatus())
send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
.getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
}
@Override
public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
// Subscribers can get notified of the addition of a table in HCAT
// by listening on a topic named "HCAT" with a message selector
// such as "HCAT_EVENT = HCAT_ADD_TABLE".
if (tableEvent.getStatus()) {
Table tbl = tableEvent.getTable();
HMSHandler handler = tableEvent.getHandler();
HiveConf conf = handler.getHiveConf();
Table newTbl;
try {
newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
.deepCopy();
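// Stamp the topic name onto the table so that later partition events can
// look it up. For illustration, if the configured prefix were "hcat", a
// table mydb.mytable would get the topic name "hcat.mydb.mytable".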
newTbl.getParameters().put(
HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
+ newTbl.getTableName().toLowerCase());
handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
} catch (InvalidOperationException e) {
MetaException me = new MetaException(e.toString());
me.initCause(e);
throw me;
} catch (NoSuchObjectException e) {
MetaException me = new MetaException(e.toString());
me.initCause(e);
throw me;
}
send(newTbl, getTopicPrefix(conf) + "."
+ newTbl.getDbName().toLowerCase(),
HCatConstants.HCAT_ADD_TABLE_EVENT);
}
}
private String getTopicPrefix(HiveConf conf) {
return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
}
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
// Subscribers can get notified of the drop of a table in HCAT
// by listening on a topic named "HCAT" with a message selector
// such as "HCAT_EVENT = HCAT_DROP_TABLE".
// DataNucleus throws an NPE when we try to serialize a table object
// retrieved from the metastore. To work around that, we reset the
// following fields before sending.
if (tableEvent.getStatus()) {
Table table = tableEvent.getTable();
StorageDescriptor sd = table.getSd();
sd.setBucketCols(new ArrayList<String>());
sd.setSortCols(new ArrayList<Order>());
sd.setParameters(new HashMap<String, String>());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
+ table.getDbName().toLowerCase(),
HCatConstants.HCAT_DROP_TABLE_EVENT);
}
}
/**
* @param msgBody
* the metastore object. It is sent in full so that a subscriber
* interested in the details can reconstruct it fully. In the case of a
* finalize_partition message, this is the string specification of the
* partition.
* @param topicName
* the name of the topic on the message broker to which the message is
* sent.
* @param event
* the value of the HCAT_EVENT property on the message. It can be used
* to select messages on the client side.
*/
protected void send(Object msgBody, String topicName, String event) {
try {
Destination topic = null;
if (null == session) {
// This will happen if we were never able to establish a connection.
createConnection();
if (null == session) {
// Still not successful, return from here.
LOG.error("Invalid session. Failed to send message on topic: "
+ topicName + " event: " + event);
return;
}
}
try {
// Topics are created on demand. If the topic doesn't exist on the broker,
// it will be created when the broker receives this message.
topic = session.createTopic(topicName);
} catch (IllegalStateException ise) {
// This happens if we established a connection once but it is no longer
// valid: an IllegalStateException is thrown, so re-create the connection
// and retry.
LOG.error("Seems like connection is lost. Retrying", ise);
createConnection();
topic = session.createTopic(topicName);
}
if (null == topic) {
// Still not successful, return from here.
LOG.error("Invalid session. Failed to send message on topic: "
+ topicName + " event: " + event);
return;
}
MessageProducer producer = session.createProducer(topic);
Message msg;
if (msgBody instanceof Map) {
MapMessage mapMsg = session.createMapMessage();
Map<String, String> incomingMap = (Map<String, String>) msgBody;
for (Entry<String, String> partCol : incomingMap.entrySet()) {
mapMsg.setString(partCol.getKey(), partCol.getValue());
}
msg = mapMsg;
} else {
msg = session.createObjectMessage((Serializable) msgBody);
}
msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
producer.send(msg);
// The message is actually delivered only when the transacted session
// commits.
session.commit();
producer.close();
} catch (Exception e) {
// Gobble up the exception. Message delivery is best effort.
LOG.error("Failed to send message on topic: " + topicName + " event: "
+ event, e);
}
}
protected void createConnection() {
Context jndiCntxt;
try {
jndiCntxt = new InitialContext();
ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
.lookup("ConnectionFactory");
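// The lookup above relies on a jndi.properties file on the classpath. A
// minimal sketch for an ActiveMQ broker (the factory class and broker URL
// below are illustrative assumptions, not requirements of this listener):
//   java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
//   java.naming.provider.url = tcp://localhost:61616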
// Keep the reference on the instance field so finalize() can close it.
conn = connFac.createConnection();
conn.start();
conn.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException jmse) {
LOG.error(jmse.toString());
}
});
// We want the message to be sent only when the session commits, so we
// run the session in transacted mode.
session = conn.createSession(true, Session.SESSION_TRANSACTED);
} catch (NamingException e) {
LOG.error("JNDI error while setting up Message Bus connection. "
+ "Please make sure file named 'jndi.properties' is in "
+ "classpath and contains appropriate key-value pairs.", e);
} catch (JMSException e) {
LOG.error("Failed to initialize connection to message bus", e);
} catch (Throwable t) {
LOG.error("Unable to connect to JMS provider", t);
}
}
@Override
protected void finalize() throws Throwable {
// Close the connection before dying.
try {
if (null != session)
session.close();
if (conn != null) {
conn.close();
}
} catch (Exception ignore) {
LOG.info("Failed to close message bus connection.", ignore);
}
}
@Override
public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
throws MetaException {
if (lpde.getStatus())
send(
lpde.getPartitionName(),
lpde.getTable().getParameters()
.get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
HCatConstants.HCAT_PARTITION_DONE_EVENT);
}
@Override
public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
// no-op
}
@Override
public void onAlterTable(AlterTableEvent ate) throws MetaException {
// no-op
}
}