/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hcatalog.listener;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hcatalog.common.HCatConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of
 * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
 * publishes metastore events on a JMS message bus. Messages are sent on three
 * kinds of topics:
 * <ul>
 * <li>Create/drop database events go to the topic named by the configured
 * topic prefix ({@link HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX}, falling back
 * to {@link HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX}).</li>
 * <li>Create/drop table events go to the topic
 * <code>&lt;prefix&gt;.&lt;dbname&gt;</code> (database name lower-cased).</li>
 * <li>Add/drop partition and partition-done events go to the topic stored in
 * the table parameter {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME}. When a
 * table is created this listener sets that parameter to
 * <code>&lt;prefix&gt;.&lt;dbname&gt;.&lt;tablename&gt;</code>.</li>
 * </ul>
 * Every message carries a string property named by
 * {@link HCatConstants#HCAT_EVENT} whose value identifies the event type, so
 * subscribers can use it in a message selector.
 * <p>
 * Message delivery is best effort: failures to reach the message bus are
 * logged and never propagated to the metastore operation.
 * <p>
 * NOTE(review): a single JMS {@link Session} is shared by all calls on this
 * instance and no synchronization is performed here. JMS sessions are
 * specified as single-threaded objects, so confirm that the metastore does not
 * invoke one listener instance from several threads concurrently.
 */
public class NotificationListener extends MetaStoreEventListener {

    private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);

    /** Transacted session used to create topics, producers and messages; null until connected. */
    protected Session session;

    /** Connection backing {@link #session}; null until connected. */
    protected Connection conn;

    /**
     * Create message bus connection and session in constructor. A failure to
     * connect is only logged; {@link #send} retries the connection lazily.
     *
     * @param conf configuration handed to the parent listener
     */
    public NotificationListener(final Configuration conf) {
        super(conf);
        createConnection();
    }

    /**
     * Looks up the topic on which events for the partition's table are published.
     *
     * @param partition partition whose owning table is looked up
     * @param partitionEvent event providing the metastore handler
     * @return the value of the table parameter
     *         {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME}, or null when the
     *         table has no parameters or the parameter is not set
     * @throws MetaException if the table cannot be retrieved; a
     *         {@link NoSuchObjectException} is attached as the cause
     */
    private static String getTopicName(Partition partition,
                                       ListenerEvent partitionEvent) throws MetaException {
        try {
            Map<String, String> tableParams = partitionEvent.getHandler()
                .get_table(partition.getDbName(), partition.getTableName())
                .getParameters();
            return tableParams == null
                ? null
                : tableParams.get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
        } catch (NoSuchObjectException e) {
            throw toMetaException(e);
        }
    }

    /** Wraps an exception in a MetaException, keeping the original as the cause. */
    private static MetaException toMetaException(Exception cause) {
        MetaException me = new MetaException(cause.toString());
        me.initCause(cause);
        return me;
    }

    /** Returns true when a topic name is present, i.e. neither null nor empty. */
    private static boolean isTopicNameSet(String topicName) {
        return topicName != null && !topicName.equals("");
    }

    /** Logs that a notification is suppressed because the table has no topic name configured. */
    private static void logMissingTopic(String dbName, String tableName) {
        LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
            + dbName
            + "."
            + tableName
            + " To enable notifications for this table, please do alter table set properties ("
            + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
            + "=<dbname>.<tablename>) or whatever you want topic name to be.");
    }

    /**
     * Replaces the collection-valued fields of a storage descriptor with fresh
     * empty collections. Per the original workaround note: Datanucleus throws
     * NPE when we try to serialize an object retrieved from the metastore, so
     * these fields are reset before the object is sent.
     */
    private static void resetUnserializableFields(StorageDescriptor sd) {
        if (sd == null) {
            return;
        }
        sd.setBucketCols(new ArrayList<String>());
        sd.setSortCols(new ArrayList<Order>());
        sd.setParameters(new HashMap<String, String>());
        if (sd.getSerdeInfo() != null) {
            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
        }
    }

    /**
     * Publishes the added partition on the table's topic with the event
     * property set to {@link HCatConstants#HCAT_ADD_PARTITION_EVENT}. Nothing is
     * sent when the operation failed or the table has no topic name.
     *
     * @throws MetaException if the owning table cannot be looked up
     */
    @Override
    public void onAddPartition(AddPartitionEvent partitionEvent)
        throws MetaException {
        if (!partitionEvent.getStatus()) {
            return;
        }
        Partition partition = partitionEvent.getPartition();
        String topicName = getTopicName(partition, partitionEvent);
        if (isTopicNameSet(topicName)) {
            send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
        } else {
            logMissingTopic(partition.getDbName(), partition.getTableName());
        }
    }

    /**
     * Publishes the dropped partition on the table's topic with the event
     * property set to {@link HCatConstants#HCAT_DROP_PARTITION_EVENT}. Nothing
     * is sent when the operation failed or the table has no topic name.
     *
     * @throws MetaException if the owning table cannot be looked up
     */
    @Override
    public void onDropPartition(DropPartitionEvent partitionEvent)
        throws MetaException {
        if (!partitionEvent.getStatus()) {
            return;
        }
        Partition partition = partitionEvent.getPartition();
        // The partition came from the metastore; reset the fields that break
        // serialization before it is put into an object message.
        resetUnserializableFields(partition.getSd());
        String topicName = getTopicName(partition, partitionEvent);
        if (isTopicNameSet(topicName)) {
            send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
        } else {
            logMissingTopic(partition.getDbName(), partition.getTableName());
        }
    }

    /**
     * Publishes the created database on the topic named by the topic prefix,
     * with the event property set to
     * {@link HCatConstants#HCAT_ADD_DATABASE_EVENT}.
     */
    @Override
    public void onCreateDatabase(CreateDatabaseEvent dbEvent)
        throws MetaException {
        if (dbEvent.getStatus()) {
            send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
                .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
        }
    }

    /**
     * Publishes the dropped database on the topic named by the topic prefix,
     * with the event property set to
     * {@link HCatConstants#HCAT_DROP_DATABASE_EVENT}.
     */
    @Override
    public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
        if (dbEvent.getStatus()) {
            send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
                .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
        }
    }

    /**
     * Records the table's notification topic in its parameters and publishes
     * the created table. The table is re-read from the metastore, the
     * parameter {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME} is set to
     * <code>&lt;prefix&gt;.&lt;dbname&gt;.&lt;tablename&gt;</code> (names lower-cased) and the
     * table is altered accordingly. The updated copy is then sent on topic
     * <code>&lt;prefix&gt;.&lt;dbname&gt;</code> with the event property set to
     * {@link HCatConstants#HCAT_ADD_TABLE_EVENT}.
     *
     * @throws MetaException if the table cannot be read back or altered
     */
    @Override
    public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
        if (!tableEvent.getStatus()) {
            return;
        }
        Table tbl = tableEvent.getTable();
        HMSHandler handler = tableEvent.getHandler();
        HiveConf conf = handler.getHiveConf();
        Table newTbl;
        try {
            // Work on a deep copy so the object owned by the event is not modified.
            newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
                .deepCopy();
            newTbl.getParameters().put(
                HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
                getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
                    + newTbl.getTableName().toLowerCase());
            handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
        } catch (InvalidOperationException e) {
            throw toMetaException(e);
        } catch (NoSuchObjectException e) {
            throw toMetaException(e);
        }
        send(newTbl, getTopicPrefix(conf) + "."
            + newTbl.getDbName().toLowerCase(),
            HCatConstants.HCAT_ADD_TABLE_EVENT);
    }

    /**
     * Returns the configured topic prefix
     * ({@link HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX}), or
     * {@link HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX} when none is configured.
     */
    private String getTopicPrefix(HiveConf conf) {
        return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
            HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
    }

    /**
     * Publishes the dropped table on topic <code>&lt;prefix&gt;.&lt;dbname&gt;</code>
     * with the event property set to
     * {@link HCatConstants#HCAT_DROP_TABLE_EVENT}.
     */
    @Override
    public void onDropTable(DropTableEvent tableEvent) throws MetaException {
        if (!tableEvent.getStatus()) {
            return;
        }
        Table table = tableEvent.getTable();
        // The table came from the metastore; reset the fields that break
        // serialization before it is put into an object message.
        resetUnserializableFields(table.getSd());
        send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
            + table.getDbName().toLowerCase(),
            HCatConstants.HCAT_DROP_TABLE_EVENT);
    }

    /**
     * Sends one message and commits it. Delivery is best effort: every failure
     * is logged and swallowed so that the metastore operation is unaffected.
     *
     * @param msgBody
     *          is the metastore object. It is sent in full such that if
     *          subscriber is really interested in details, it can reconstruct it
     *          fully. A {@link Map} body is sent as a {@link MapMessage} of
     *          string entries (used for the partition specification of a
     *          partition-done event); anything else must be
     *          {@link Serializable} and is sent as an object message.
     * @param topicName
     *          is the name on message broker on which message is sent.
     * @param event
     *          is the value of HCAT_EVENT property in message. It can be used to
     *          select messages in client side.
     */
    protected void send(Object msgBody, String topicName, String event) {

        MessageProducer producer = null;
        try {
            if (null == session) {
                // this will happen, if we never able to establish a connection.
                createConnection();
                if (null == session) {
                    // Still not successful, return from here.
                    LOG.error("Invalid session. Failed to send message on topic: "
                        + topicName + " event: " + event);
                    return;
                }
            }

            Destination topic;
            try {
                // Topics are created on demand. If it doesn't exist on broker it will
                // be created when broker receives this message.
                topic = session.createTopic(topicName);
            } catch (IllegalStateException ise) {
                // this will happen if we were able to establish connection once, but
                // its no longer valid,
                // ise is thrown, catch it and retry.
                LOG.error("Seems like connection is lost. Retrying", ise);
                createConnection();
                topic = session.createTopic(topicName);
            }
            if (null == topic) {
                // Still not successful, return from here.
                LOG.error("Invalid session. Failed to send message on topic: "
                    + topicName + " event: " + event);
                return;
            }

            producer = session.createProducer(topic);
            Message msg;
            if (msgBody instanceof Map) {
                MapMessage mapMsg = session.createMapMessage();
                // Callers in this class only pass String-to-String maps; a map of
                // any other shape fails with ClassCastException below and is
                // handled by the best-effort catch.
                @SuppressWarnings("unchecked")
                Map<String, String> incomingMap = (Map<String, String>) msgBody;
                for (Entry<String, String> partCol : incomingMap.entrySet()) {
                    mapMsg.setString(partCol.getKey(), partCol.getValue());
                }
                msg = mapMsg;
            } else {
                msg = session.createObjectMessage((Serializable) msgBody);
            }

            msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
            producer.send(msg);
            // Message must be transacted before we return.
            session.commit();
        } catch (Exception e) {
            // Gobble up the exception. Message delivery is best effort.
            LOG.error("Failed to send message on topic: " + topicName + " event: "
                + event, e);
            // Discard whatever is pending in the transaction so that it cannot be
            // committed together with a later, unrelated message.
            rollbackQuietly();
        } finally {
            // A producer is created per message; release it so producers do not
            // accumulate on the session.
            closeQuietly(producer);
        }
    }

    /**
     * Looks up the JNDI entry "ConnectionFactory", opens and starts a
     * connection and creates a transacted session on it. On success the
     * {@link #conn} and {@link #session} fields are replaced and the previously
     * held connection and session, if any, are closed. On failure the error is
     * logged, the fields keep their previous values and a partially set up
     * connection is closed.
     */
    protected void createConnection() {

        Connection newConn = null;
        boolean installed = false;
        try {
            Context jndiCntxt = new InitialContext();
            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
                .lookup("ConnectionFactory");
            newConn = connFac.createConnection();
            newConn.start();
            newConn.setExceptionListener(new ExceptionListener() {
                @Override
                public void onException(JMSException jmse) {
                    LOG.error(jmse.toString());
                }
            });
            // We want message to be sent when session commits, thus we run in
            // transacted mode.
            Session newSession = newConn.createSession(true, Session.SESSION_TRANSACTED);

            // Publish the new pair through the fields (not a shadowing local) so
            // that the connection can be closed later, then release the old pair.
            Session oldSession = session;
            Connection oldConn = conn;
            conn = newConn;
            session = newSession;
            installed = true;
            closeQuietly(oldSession);
            closeQuietly(oldConn);
        } catch (NamingException e) {
            LOG.error("JNDI error while setting up Message Bus connection. "
                + "Please make sure file named 'jndi.properties' is in "
                + "classpath and contains appropriate key-value pairs.", e);
        } catch (JMSException e) {
            LOG.error("Failed to initialize connection to message bus", e);
        } catch (Throwable t) {
            LOG.error("Unable to connect to JMS provider", t);
        } finally {
            if (!installed) {
                // Setup failed half way; do not leak the connection that was opened.
                closeQuietly(newConn);
            }
        }
    }

    /** Rolls back the current session transaction, logging instead of propagating any failure. */
    private void rollbackQuietly() {
        if (null == session) {
            return;
        }
        try {
            session.rollback();
        } catch (Exception e) {
            LOG.debug("Failed to roll back message bus session.", e);
        }
    }

    /** Closes the producer if non-null, logging instead of propagating any failure. */
    private static void closeQuietly(MessageProducer producer) {
        if (null == producer) {
            return;
        }
        try {
            producer.close();
        } catch (Exception e) {
            LOG.info("Failed to close message producer.", e);
        }
    }

    /** Closes the session if non-null, logging instead of propagating any failure. */
    private static void closeQuietly(Session sessionToClose) {
        if (null == sessionToClose) {
            return;
        }
        try {
            sessionToClose.close();
        } catch (Exception e) {
            LOG.info("Failed to close message bus connection.", e);
        }
    }

    /** Closes the connection if non-null, logging instead of propagating any failure. */
    private static void closeQuietly(Connection connectionToClose) {
        if (null == connectionToClose) {
            return;
        }
        try {
            connectionToClose.close();
        } catch (Exception e) {
            LOG.info("Failed to close message bus connection.", e);
        }
    }

    /**
     * Closes the session and the connection before the object is reclaimed.
     * Each is closed independently so a failure on one does not skip the other.
     */
    @Override
    protected void finalize() throws Throwable {
        // Close the connection before dying.
        try {
            closeQuietly(session);
            closeQuietly(conn);
        } finally {
            super.finalize();
        }
    }

    /**
     * Publishes the partition specification of a completed partition load on
     * the table's topic, with the event property set to
     * {@link HCatConstants#HCAT_PARTITION_DONE_EVENT}. Nothing is sent when the
     * operation failed or the table has no topic name.
     */
    @Override
    public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
        throws MetaException {
        if (!lpde.getStatus()) {
            return;
        }
        Table table = lpde.getTable();
        Map<String, String> tableParams = table.getParameters();
        String topicName = tableParams == null
            ? null
            : tableParams.get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
        if (isTopicNameSet(topicName)) {
            send(lpde.getPartitionName(), topicName,
                HCatConstants.HCAT_PARTITION_DONE_EVENT);
        } else {
            logMissingTopic(table.getDbName(), table.getTableName());
        }
    }

    /** Alter-partition events are not published. */
    @Override
    public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
        // no-op
    }

    /** Alter-table events are not published. */
    @Override
    public void onAlterTable(AlterTableEvent ate) throws MetaException {
        // no-op
    }
}
