/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.loadbalance;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * LoadManager runs through set of load reports collected from different brokers and generates a recommendation of
 * namespace/ServiceUnit placement on machines/ResourceUnit. Each Concrete Load Manager will use different algorithms to
 * generate this mapping.
 * <p>
 * Concrete Load Manager is also return the least loaded broker that should own the new namespace.
 */
public interface LoadManager {
    Logger LOG = LoggerFactory.getLogger(LoadManager.class);

    String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";

    void start() throws PulsarServerException;

    /**
     * Is centralized decision making to assign a new bundle.
     */
    boolean isCentralized();

    /**
     * Returns the Least Loaded Resource Unit decided by some algorithm or criteria which is implementation specific.
     */
    Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

    default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
        throw new UnsupportedOperationException();
    }

    /**
     * Generate the load report.
     */
    LoadManagerReport generateLoadReport() throws Exception;

    /**
     * Set flag to force load report update.
     */
    void setLoadReportForceUpdateFlag();

    /**
     * Publish the current load report on ZK.
     */
    void writeLoadReportOnZookeeper() throws Exception;

    /**
     * Publish the current load report on ZK, forced or not.
     * By default, rely on method writeLoadReportOnZookeeper().
     */
    default void writeLoadReportOnZookeeper(boolean force) throws Exception {
        writeLoadReportOnZookeeper();
    }

    /**
     * Update namespace bundle resource quota on ZK.
     */
    void writeResourceQuotasToZooKeeper() throws Exception;

    /**
     * Generate load balancing stats metrics.
     */
    List<Metrics> getLoadBalancingMetrics();

    /**
     * Unload a candidate service unit to balance the load.
     */
    void doLoadShedding();

    /**
     * Namespace bundle split.
     */
    void doNamespaceBundleSplit() throws Exception;

    /**
     * Removes visibility of current broker from loadbalancer list so, other brokers can't redirect any request to this
     * broker and this broker won't accept new connection requests.
     *
     * @throws Exception if there is any error while disabling broker
     */
    void disableBroker() throws Exception;

    /**
     * Get list of available brokers in cluster.
     *
     * @return the list of available brokers
     * @throws Exception if there is any error while getting available brokers
     */
    Set<String> getAvailableBrokers() throws Exception;

    CompletableFuture<Set<String>> getAvailableBrokersAsync();

    String setNamespaceBundleAffinity(String bundle, String broker);

    void stop() throws PulsarServerException;

    /**
     * Initialize this LoadManager.
     *
     * @param pulsar
     *            The service to initialize this with.
     */
    void initialize(PulsarService pulsar);

    static LoadManager create(final PulsarService pulsar) {
        try {
            final ServiceConfiguration conf = pulsar.getConfiguration();
            // Assume there is a constructor with one argument of PulsarService.
            final Object loadManagerInstance = Reflections.createInstance(conf.getLoadManagerClassName(),
                    Thread.currentThread().getContextClassLoader());
            if (loadManagerInstance instanceof LoadManager casted) {
                casted.initialize(pulsar);
                return casted;
            } else if (loadManagerInstance instanceof ModularLoadManager modularLoadManager) {
                final LoadManager casted = new ModularLoadManagerWrapper(modularLoadManager);
                casted.initialize(pulsar);
                return casted;
            } else if (loadManagerInstance instanceof ExtensibleLoadManager) {
                final LoadManager casted =
                        new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance);
                casted.initialize(pulsar);
                return casted;
            }
        } catch (Exception e) {
            LOG.warn("Error when trying to create load manager: ", e);
        }
        // If we failed to create a load manager, default to SimpleLoadManagerImpl.
        return new SimpleLoadManagerImpl(pulsar);
    }

}
