/**
 * Copyright 2016 Yahoo Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.yahoo.pulsar.broker.namespace;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import java.util.Map;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

/**
 * This class provides a cache service for all the service unit ownership among the brokers. It provide a cache service
 * as well as ZooKeeper read/write functions for a) lookup of a service unit ownership to a broker; b) take ownership of
 * a service unit by the local broker
 *
 *
 */
public class OwnershipCache {

    private static final Logger LOG = LoggerFactory.getLogger(OwnershipCache.class);

    /**
     * The local broker URL that this <code>OwnershipCache</code> will set as owner
     */
    private final String ownerBrokerUrl;

    /**
     * The local broker URL that this <code>OwnershipCache</code> will set as owner
     */
    private final String ownerBrokerUrlTls;

    /**
     * The NamespaceEphemeralData objects that can be associated with the current owner
     */
    private final NamespaceEphemeralData[] selfOwnerInfos;

    /**
     * Service unit ownership cache of <code>ZooKeeper</code> data of ephemeral nodes showing all known ownership of
     * service unit to active brokers
     */
    private final ZooKeeperDataCache<NamespaceEphemeralData> ownershipReadOnlyCache;

    /**
     * The loading cache of locally owned <code>ServiceUnit</code> objects
     */
    private final LoadingCache<String, OwnedServiceUnit> ownedServiceUnitsCache;

    /**
     * The <code>ObjectMapper</code> to deserialize/serialize JSON objects
     */
    private final ObjectMapper jsonMapper = ObjectMapperFactory.create();

    /**
     * The <code>ZooKeeperCache</code> connecting to the local ZooKeeper
     */
    private final ZooKeeperCache localZkCache;

    /**
     * The <code>NamespaceBundleFactory</code> to construct <code>NamespaceBundles</code>
     */
    private final NamespaceBundleFactory bundleFactory;

    /**
     * The max number of retries to acquire the ownership in <code>ZooKeeper</code>
     */
    private static final int MAX_RETRIES_CREATE_ZNODE = 5; // tentatively decide to retry 5 times in acquiring the
                                                           // zookeeper node

    private class OwnedServiceUnitCacheLoader extends CacheLoader<String, OwnedServiceUnit> {
        @Override
        public OwnedServiceUnit load(String key) throws Exception {
            LOG.info("Acquiring zk lock on namespace {}", key);

            // Under the cache sync lock, acquiring the ZNode
            // If succeeded, we guaranteed that the cache entry is setup w/ ZNode acquired
            // Only enters this load function if ownedServiceUnitsCache does not have the key
            // invalidate the read-only cache to ensure loading the up-to-date znode
            int numTries = 0;
            while (numTries != MAX_RETRIES_CREATE_ZNODE) {
                try {
                    checkArgument(ServiceUnitZkUtils
                            .acquireNameSpace(localZkCache.getZooKeeper(), key, selfOwnerInfos[0]).getNativeUrl()
                            .equals(ownerBrokerUrl)
                            || ServiceUnitZkUtils.acquireNameSpace(localZkCache.getZooKeeper(), key, selfOwnerInfos[0])
                                    .getNativeUrlTls().equals(ownerBrokerUrlTls));
                    ownershipReadOnlyCache.invalidate(key);
                    LOG.info("Acquired zk lock on namespace {}", key);
                    return new OwnedServiceUnit(ServiceUnitZkUtils.suIdFromPath(key, bundleFactory));
                } catch (Exception e) {
                    // Failed to acquire the namespace, try to load the read-only cache since some other broker may have
                    // won the race
                    LOG.warn(String.format("Failed in acquiring the namespace ownership. key=%s", key));
                    try {
                        ownershipReadOnlyCache.invalidate(key);
                        ownershipReadOnlyCache.get(key);
                        // if successful, the znode has been created by someone. break out the loop
                        break;
                    } catch (NoNodeException nne) {
                        // load read-only cache failed due to no node exists, hence, we should try to acquire the
                        // namespace
                        // again
                        numTries++;
                    } catch (Exception moreErr) {
                        // Other unexpected failure
                        LOG.error(String.format(
                                "Unexpected exception while loading the read-only ZK cache for namespace. key=%s", key),
                                moreErr);
                        throw moreErr;
                    }
                }
            }
            checkArgument(numTries < MAX_RETRIES_CREATE_ZNODE,
                    "Maximum retries exceeded to acquire the namespace. key=" + key);
            // load the read-only cache w/ update-to-date entry
            checkArgument(ownershipReadOnlyCache.get(key).getNativeUrl().equals(ownerBrokerUrl),
                    "Some other broker acquired the namespace. key=" + key);
            return new OwnedServiceUnit(ServiceUnitZkUtils.suIdFromPath(key, bundleFactory));
        }
    }

    /**
     * Constructor of <code>OwnershipCache</code>
     *
     * @param ownerUrl
     *            the local broker URL that will be set as owner for the <code>ServiceUnit</code>
     */
    public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
        ServiceConfiguration conf = pulsar.getConfiguration();
        this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
        this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
        this.selfOwnerInfos = new NamespaceEphemeralData[] {
                new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, pulsar.getWebServiceAddress(),
                        pulsar.getWebServiceAddressTls(), false),
                new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, pulsar.getWebServiceAddress(),
                        pulsar.getWebServiceAddressTls(), true) };
        this.bundleFactory = bundleFactory;
        this.localZkCache = pulsar.getLocalZkCache();
        this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
        // ownedServiceUnitsCache contains all namespaces that are owned by the local broker
        this.ownedServiceUnitsCache = CacheBuilder.newBuilder().build(new OwnedServiceUnitCacheLoader());
    }

    /**
     * Method to get the current owner of the <code>ServiceUnit</code>
     *
     * @param suId
     *            identifier of the <code>ServiceUnit</code>
     * @return The ephemeral node data showing the current ownership info in <code>ZooKeeper</code>
     * @throws Exception
     *             throws exception if no ownership info is found
     */
    public NamespaceEphemeralData getOwner(ServiceUnitId suname) throws Exception {
        return this.ownershipReadOnlyCache.get(ServiceUnitZkUtils.path(suname));
    }

    /**
     * Method to get the current owner of the <code>ServiceUnit</code> or set the local broker as the owner if absent
     *
     * @param suId
     *            identifier of the <code>ServiceUnit</code>
     * @return The ephemeral node data showing the current ownership info in <code>ZooKeeper</code>
     * @throws Exception
     */
    public NamespaceEphemeralData getOrSetOwner(ServiceUnitId suname) throws Exception {
        String path = ServiceUnitZkUtils.path(suname);

        // If the node has been deleted between two checks, we need to try again
        while (true) {
            try {
                // Trying to load the ownedServiceUnitCache by acquiring the ZNode via the loader
                this.ownedServiceUnitsCache.get(path);
            } catch (Exception e) {
                LOG.info(String.format("Failed in acquiring the ownership of service unit %s", suname), e);
            }

            try {
                return this.ownershipReadOnlyCache.get(path);
            } catch (KeeperException.NoNodeException e) {
                LOG.warn("Failed in getting service unit from read-only cache {}", suname);
            }
        }
    }

    /**
     * Method to remove the ownership of local broker on the <code>ServiceUnit</code>, if owned
     *
     * @param suId
     *            identifier of the <code>ServiceUnit</code>
     * @throws Exception 
     */
    public void removeOwnership(ServiceUnitId suname) throws Exception {
        // Under the cache sync lock, removing the ZNode
        // If succeeded, we guaranteed that the cache entry is removed together w/ ZNode
        String key = ServiceUnitZkUtils.path(suname);
        try {
            localZkCache.getZooKeeper().delete(key, -1);
            this.ownedServiceUnitsCache.invalidate(key);
            this.ownershipReadOnlyCache.invalidate(key);
            LOG.info("Removed zk lock for service unit: {}", key);
        } catch (KeeperException.NoNodeException e) {
            LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key, e.getMessage());
        }

    }

    /**
     * Method to remove ownership of all owned bundles
     *
     * @param bundles
     *            <code>NamespaceBundles</code> to remove from ownership cache
     */
    public void removeOwnership(NamespaceBundles bundles) {
        boolean hasError = false;
        for (NamespaceBundle bundle : bundles.getBundles()) {
            if (getOwnedServiceUnit(bundle) == null) {
                // continue
                continue;
            }
            try {
                this.removeOwnership(bundle);
            } catch (Exception e) {
                LOG.warn(String.format("Failed to remove ownership of a service unit: %s", bundle), e);
                hasError = true;
            }
        }
        checkState(!hasError, "Not able to remove all owned bundles");
    }

    /**
     * Method to access the map of all <code>ServiceUnit</code> objects owned by the local broker
     *
     * @return a map of owned <code>ServiceUnit</code> objects
     */
    public Map<String, OwnedServiceUnit> getOwnedServiceUnits() {
        return this.ownedServiceUnitsCache.asMap();
    }

    public OwnedServiceUnit getOwnedServiceUnit(ServiceUnitId suname) {
        return this.ownedServiceUnitsCache.getIfPresent(ServiceUnitZkUtils.path(suname));
    }

    public void disableOwnership(ServiceUnitId suName) throws Exception {
        String path = ServiceUnitZkUtils.path(suName);
        localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfos[1]), -1);
        this.ownershipReadOnlyCache.invalidate(path);
    }
    
    /**
     * Update bundle state in a local cache
     * 
     * @param bundle
     * @throws Exception
     */
    public void updateBundleState(ServiceUnitId bundle, boolean isActive) throws Exception {
        // Disable owned instance in local cache
        OwnedServiceUnit ownedServiceUnit = getOwnedServiceUnit(bundle);
        if (ownedServiceUnit != null) {
            ownedServiceUnit.setActive(isActive);
        }
    }
}
