/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hcatalog.common;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A thread safe, time expired cache for {@link HiveMetaStoreClient}.
 *
 * <p>Clients are keyed by metastore URIs, UserGroupInformation and a per-thread id, so a cached client is
 * never handed out to more than one thread by {@link #get(HiveConf)}. Each returned client keeps a count of its
 * users and is only torn down once it has expired from the cache and nobody is using it any more; callers
 * therefore *MUST* call {@code close()} on every client obtained from {@link #get(HiveConf)}.
 */
class HiveClientCache {
    private final Cache<HiveClientCacheKey, CacheableHiveMetaStoreClient> hiveCache;
    private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class);
    // Cache expiry, in seconds; also handed to every client created so it can compute its own expiry time.
    private final int timeout;
    // This lock makes sure the removalListener won't close a client that get() is about to return.
    private final Object CACHE_TEARDOWN_LOCK = new Object();

    // Source of the unique per-thread ids handed out by the threadId thread local below.
    private static final AtomicInteger nextId = new AtomicInteger(0);

    // Since HiveMetaStoreClient is not threadsafe, hive clients are not shared across threads.
    // This thread local holds each thread's unique id, which is used as one component of the cache key,
    // causing each thread to get a different client even if the hiveConf is the same.
    private static final ThreadLocal<Integer> threadId =
            new ThreadLocal<Integer>() {
                @Override
                protected Integer initialValue() {
                    return nextId.getAndIncrement();
                }
            };

    /**
     * @return the unique id assigned to the calling thread
     */
    private int getThreadId() {
        return threadId.get();
    }

    /**
     * Creates the cache and registers a JVM shutdown hook that closes whatever clients are still cached.
     *
     * @param timeout the length of time in seconds after a client is created that it should be automatically removed
     */
    public HiveClientCache(final int timeout) {
        this.timeout = timeout;
        RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient> removalListener =
            new RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient>() {
                @Override
                public void onRemoval(RemovalNotification<HiveClientCacheKey, CacheableHiveMetaStoreClient> notification) {
                    CacheableHiveMetaStoreClient hiveMetaStoreClient = notification.getValue();
                    if (hiveMetaStoreClient != null) {
                        // Same lock as get(): a client that get() has just looked up must be acquired
                        // before we decide whether it is unused and can be torn down.
                        synchronized (CACHE_TEARDOWN_LOCK) {
                            hiveMetaStoreClient.setExpiredFromCache();
                            hiveMetaStoreClient.tearDownIfUnused();
                        }
                    }
                }
            };
        hiveCache = CacheBuilder.newBuilder()
                .expireAfterWrite(timeout, TimeUnit.SECONDS)
                .removalListener(removalListener)
                .build();

        // Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up.
        // This is a best effort approach; any error while doing so is ignored. Most clients get cleaned up via
        // either the removalListener or the close() call. Only the active clients that are still in the cache,
        // or that have expired but are being used by other threads, won't have been cleaned. The hook only
        // cleans the ones still in the cache. The ones expired from the cache but still held by other threads
        // are at the mercy of finalize() being called.
        Thread cleanupHiveClientShutdownThread = new Thread() {
            @Override
            public void run() {
                LOG.info("Cleaning up hive client cache in ShutDown hook");
                closeAllClientsQuietly();
            }
        };
        Runtime.getRuntime().addShutdownHook(cleanupHiveClientShutdownThread);
    }

    /**
     * Tears down every client currently in the cache, logging and ignoring any failure.
     * Note: This doesn't check if they are being used or not, meant only to be called during shutdown etc.
     */
    void closeAllClientsQuietly() {
        try {
            ConcurrentMap<HiveClientCacheKey, CacheableHiveMetaStoreClient> elements = hiveCache.asMap();
            for (CacheableHiveMetaStoreClient cacheableHiveMetaStoreClient : elements.values()) {
                cacheableHiveMetaStoreClient.tearDown();
            }
        } catch (Exception e) {
            LOG.warn("Clean up of hive clients in the cache failed. Ignored", e);
        }
    }

    /**
     * Asks the underlying cache to perform its pending maintenance (delegates to {@code Cache.cleanUp()}).
     */
    public void cleanup() {
        hiveCache.cleanUp();
    }

    /**
     * Returns a cached client if exists or else creates one, caches and returns it. It also checks that the client is
     * healthy and can be reused; an unhealthy client is evicted, closed and replaced by a fresh one.
     * The returned client has been acquired on behalf of the caller, who must call {@code close()} on it when done.
     *
     * @param hiveConf configuration from which the cache key and, if needed, a new client are derived
     * @return the hive client
     * @throws MetaException
     * @throws IOException
     * @throws LoginException
     */
    public HiveMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOException, LoginException {
        final HiveClientCacheKey cacheKey = HiveClientCacheKey.fromHiveConf(hiveConf, getThreadId());
        CacheableHiveMetaStoreClient hiveMetaStoreClient = null;
        // The client is not shared across threads, so the only way it could get closed while we are doing the
        // health check is if the removalListener closes it. Acquiring it under the lock prevents that.
        synchronized (CACHE_TEARDOWN_LOCK) {
            hiveMetaStoreClient = getOrCreate(cacheKey);
            hiveMetaStoreClient.acquire();
        }
        if (!hiveMetaStoreClient.isOpen()) {
            synchronized (CACHE_TEARDOWN_LOCK) {
                // Drop the broken client from the cache, give up our hold on it, and start over with a new one.
                hiveCache.invalidate(cacheKey);
                hiveMetaStoreClient.close();
                hiveMetaStoreClient = getOrCreate(cacheKey);
                hiveMetaStoreClient.acquire();
            }
        }
        return hiveMetaStoreClient;
    }

    /**
     * Return from cache if exists else create/cache and return.
     *
     * @param cacheKey key identifying the client; its HiveConf is used when a new client has to be created
     * @return the cached or newly created client
     * @throws IOException if creation failed with an IOException, or with any cause other than the ones below
     * @throws MetaException
     * @throws LoginException
     */
    private CacheableHiveMetaStoreClient getOrCreate(final HiveClientCacheKey cacheKey) throws IOException, MetaException, LoginException {
        try {
            return hiveCache.get(cacheKey, new Callable<CacheableHiveMetaStoreClient>() {
                @Override
                public CacheableHiveMetaStoreClient call() throws MetaException {
                    return new CacheableHiveMetaStoreClient(cacheKey.getHiveConf(), timeout);
                }
            });
        } catch (ExecutionException e) {
            // Unwrap the loader failure so callers see the declared exception types.
            Throwable t = e.getCause();
            if (t instanceof IOException) {
                throw (IOException) t;
            } else if (t instanceof MetaException) {
                throw (MetaException) t;
            } else if (t instanceof LoginException) {
                throw (LoginException) t;
            } else {
                throw new IOException("Error creating hiveMetaStoreClient", t);
            }
        }
    }

    /**
     * A class to wrap HiveConf and expose equality based only on UserGroupInformation, the metaStoreURIs and
     * the id of the requesting thread. This becomes the key for the cache and this way the same
     * HiveMetaStoreClient would be returned if UserGroupInformation, metaStoreURIs and thread id are same.
     * This class can evolve to express the cases when HiveConf is different but the same
     * hiveMetaStoreClient can be used.
     */
    public static class HiveClientCacheKey {
        private final String metaStoreURIs;
        private final UserGroupInformation ugi;
        // Carried along to build a client on a cache miss; deliberately not part of equals()/hashCode().
        private final HiveConf hiveConf;
        private final int threadId;

        private HiveClientCacheKey(HiveConf hiveConf, final int threadId) throws IOException, LoginException {
            this.metaStoreURIs = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
            ugi = ShimLoader.getHadoopShims().getUGIForConf(hiveConf);
            this.hiveConf = hiveConf;
            this.threadId = threadId;
        }

        /**
         * Builds the cache key for the given configuration and thread id.
         *
         * @param hiveConf configuration supplying the metastore URIs and the UserGroupInformation
         * @param threadId id of the thread the client is meant for
         * @return the cache key
         * @throws IOException
         * @throws LoginException
         */
        public static HiveClientCacheKey fromHiveConf(HiveConf hiveConf, final int threadId) throws IOException, LoginException {
            return new HiveClientCacheKey(hiveConf, threadId);
        }

        /**
         * @return the HiveConf this key was built from
         */
        public HiveConf getHiveConf() {
            return hiveConf;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            HiveClientCacheKey that = (HiveClientCacheKey) o;
            return new EqualsBuilder().
                    append(this.metaStoreURIs,
                            that.metaStoreURIs).
                    append(this.ugi, that.ugi).
                    append(this.threadId, that.threadId).isEquals();
        }

        @Override
        public int hashCode() {
            return new HashCodeBuilder().
                    append(metaStoreURIs).
                    append(ugi).
                    append(threadId).toHashCode();
        }
    }

    /**
     * Add # of current users on HiveMetaStoreClient, so that the client can be cleaned when no one is using it.
     */
    public static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient {
        private final AtomicInteger users = new AtomicInteger(0);
        private volatile boolean expiredFromCache = false;
        // Written under the tearDown() monitor but read without it by isClosed(), hence volatile.
        private volatile boolean isClosed = false;
        private final long expiryTime;
        private static final int EXPIRY_TIME_EXTENSION_IN_MILLIS = 60*1000;

        /**
         * @param conf    configuration passed to the HiveMetaStoreClient constructor
         * @param timeout the cache expiry in seconds, used to compute this client's own expiry time
         * @throws MetaException
         */
        public CacheableHiveMetaStoreClient(final HiveConf conf, final int timeout) throws MetaException {
            super(conf);
            // Extend the expiry time with some extra time on top of guava expiry time to make sure
            // that items closed() are for sure expired and would never be returned by guava.
            // The seconds-to-millis conversion is done in long arithmetic: an int multiplication
            // (timeout*1000) overflows for timeouts above Integer.MAX_VALUE/1000 seconds (about 24.8 days)
            // and could yield an expiry time in the past.
            this.expiryTime = System.currentTimeMillis()
                    + TimeUnit.SECONDS.toMillis(timeout)
                    + EXPIRY_TIME_EXTENSION_IN_MILLIS;
        }

        /** Registers one more user of this client. */
        private void acquire() {
            users.incrementAndGet();
        }

        /** Unregisters one user of this client. */
        private void release() {
            users.decrementAndGet();
        }

        /** Marks this client as no longer being in the cache, making it eligible for teardown once unused. */
        public void setExpiredFromCache() {
            expiredFromCache = true;
        }

        /**
         * @return true once the underlying client has been closed by {@link #tearDown()}
         */
        public boolean isClosed() {
            return isClosed;
        }

        /**
         * Make a call to hive meta store and see if the client is still usable. Some calls where the user provides
         * invalid data renders the client unusable for future use (example: create a table with very long table name)
         * @return true if the health check call succeeded or reported that the database does not exist,
         *         false if it failed with a MetaException or TException
         */
        protected boolean isOpen() {
            try {
                // Look for an unlikely database name and see if either MetaException or TException is thrown
                this.getDatabase("NonExistentDatabaseUsedForHealthCheck");
            } catch (NoSuchObjectException e) {
                return true; // It is okay if the database doesn't exist
            } catch (MetaException e) {
                return false;
            } catch (TException e) {
                return false;
            }
            return true;
        }

        /**
         * Decrement the user count and piggyback this to set expiry flag as well, then teardown(), if conditions are met.
         * This *MUST* be called by anyone who uses this client.
         */
        @Override
        public void close() {
            release();
            if (System.currentTimeMillis() >= expiryTime) {
                setExpiredFromCache();
            }
            tearDownIfUnused();
        }

        /**
         * Tear down only if
         *  1. There are no active user
         *  2. It has expired from the cache
         */
        private void tearDownIfUnused() {
            if (users.get() == 0 && expiredFromCache) {
                this.tearDown();
            }
        }

        /**
         * Close if not closed already. Failures are logged and ignored; in that case the client is not
         * marked closed, so a later call will attempt the close again.
         */
        protected synchronized void tearDown() {
            try {
                if (!isClosed) {
                    super.close();
                }
                isClosed = true;
            } catch (Exception e) {
                LOG.warn("Error closing hive metastore client. Ignored.", e);
            }
        }

        /**
         * Last effort to clean up, may not even get called.
         * @throws Throwable
         */
        @Override
        protected void finalize() throws Throwable {
            try {
                this.tearDown();
            } finally {
                super.finalize();
            }
        }
    }
}
