/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */

package org.apache.storm.cluster;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.PrivateWorkerKey;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.generated.WorkerTokenServiceType;
import org.apache.storm.nimbus.NimbusInfo;

public interface IStormClusterState {
    List<String> assignments(Runnable callback);

    /**
     * Get the assignment based on storm id from local backend.
     *
     * @param stormId  topology id
     * @param callback callback function
     * @return {@link Assignment}
     */
    Assignment assignmentInfo(String stormId, Runnable callback);

    /**
     * Get the assignment based on storm id from remote state store, eg: ZK.
     *
     * @param stormId  topology id
     * @param callback callback function
     * @return {@link Assignment}
     */
    Assignment remoteAssignmentInfo(String stormId, Runnable callback);

    /**
     * Get all the topologies assignments mapping stormId -> Assignment from local backend.
     *
     * @return stormId -> Assignment mapping
     */
    Map<String, Assignment> assignmentsInfo();

    /**
     * Sync the remote state store assignments to local backend, used when master gains leadership, see {@link LeaderListenerCallback}.
     *
     * @param remote assigned assignments for a specific {@link IStormClusterState} instance, usually a supervisor/node.
     */
    void syncRemoteAssignments(Map<String, byte[]> remote);

    /**
     * Flag to indicate if the assignments synced successfully, see {@link #syncRemoteAssignments(Map)}.
     *
     * @return true if is synced successfully
     */
    boolean isAssignmentsBackendSynchronized();

    /**
     * Mark the assignments as synced successfully, see {@link #isAssignmentsBackendSynchronized()}.
     */
    void setAssignmentsBackendSynchronized();

    VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);

    Integer assignmentVersion(String stormId, Runnable callback) throws Exception;

    List<String> blobstoreInfo(String blobKey);

    List<NimbusSummary> nimbuses();

    void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);

    List<String> activeStorms();

    /**
     * Get a storm base for a topology.
     *
     * @param stormId  the id of the topology
     * @param callback something to call if the data changes (best effort)
     * @return the StormBase or null if it is not alive.
     */
    StormBase stormBase(String stormId, Runnable callback);

    /**
     * Get storm id from passed name, null if the name doesn't exist on cluster.
     *
     * @param stormName storm name
     * @return storm id
     */
    String stormId(String stormName);

    /**
     * Sync all the active storm ids of the cluster, used now when master gains leadership.
     *
     * @param ids stormName -> stormId mapping
     */
    void syncRemoteIds(Map<String, String> ids);

    ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);

    List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);

    List<ProfileRequest> getTopologyProfileRequests(String stormId);

    void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);

    void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);

    Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);

    List<String> supervisors(Runnable callback);

    SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist

    void setupHeatbeats(String stormId, Map<String, Object> topoConf);

    void teardownHeartbeats(String stormId);

    void teardownTopologyErrors(String stormId);

    List<String> heartbeatStorms();

    List<String> errorTopologies();

    /**
     * Get backpressure topologies.
     * @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
     */
    @Deprecated
    List<String> backpressureTopologies();

    /**
     * Get leader info from state store, which was written when a master gains leadership.
     *
     * <p>Caution: it can not be used for fencing and is only for informational purposes because we use ZK as our
     * backend now, which could have a overdue info of nodes.
     *
     * @param callback callback func
     * @return {@link NimbusInfo}
     */
    NimbusInfo getLeader(Runnable callback);

    void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf);

    LogConfig topologyLogConfig(String stormId, Runnable cb);

    void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);

    void removeWorkerHeartbeat(String stormId, String node, Long port);

    void supervisorHeartbeat(String supervisorId, SupervisorInfo info);

    /**
     * Get topoloy backpressure.
     * @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
     */
    @Deprecated
    boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);

    /**
     * Setup backpressure.
     * @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
     */
    @Deprecated
    void setupBackpressure(String stormId, Map<String, Object> topoConf);

    /**
     * Remove backpressure.
     * @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
     */
    @Deprecated
    void removeBackpressure(String stormId);

    /**
     * Remove worker backpressure.
     * @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
     */
    @Deprecated
    void removeWorkerBackpressure(String stormId, String node, Long port);

    void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf);

    void updateStorm(String stormId, StormBase newElems);

    void removeStormBase(String stormId);

    void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf);

    void setupBlob(String key, NimbusInfo nimbusInfo, Integer versionInfo);

    List<String> activeKeys();

    List<String> blobstore(Runnable callback);

    void removeStorm(String stormId);

    void removeBlobstoreKey(String blobKey);

    void removeKeyVersion(String blobKey);

    void reportError(String stormId, String componentId, String node, Long port, Throwable error);

    void setupErrors(String stormId, Map<String, Object> topoConf);

    List<ErrorInfo> errors(String stormId, String componentId);

    ErrorInfo lastError(String stormId, String componentId);

    void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf);

    Credentials credentials(String stormId, Runnable callback);

    void disconnect();

    /**
     * Get a private key used to validate a token is correct. This is expected to be called from a privileged daemon, and the ACLs should be
     * set up to only allow nimbus and these privileged daemons access to these private keys.
     *
     * @param type       the type of service the key is for.
     * @param topologyId the topology id the key is for.
     * @param keyVersion the version of the key this is for.
     * @return the private key or null if it could not be found.
     */
    PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion);

    /**
     * Store a new version of a private key. This is expected to only ever be called from nimbus.  All ACLs however need to be setup to
     * allow the given services access to the stored information.
     *
     * @param type       the type of service this key is for.
     * @param topologyId the topology this key is for
     * @param keyVersion the version of the key this is for.
     * @param key        the key to store.
     */
    void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key);

    /**
     * Get the next key version number that should be used for this topology id. This is expected to only ever be called from nimbus, but it
     * is acceptable if the ACLs are setup so that it can work from a privileged daemon for the given service.
     *
     * @param type       the type of service this is for.
     * @param topologyId the topology id this is for.
     * @return the next version number.  It should be 0 for a new topology id/service combination.
     */
    long getNextPrivateWorkerKeyVersion(WorkerTokenServiceType type, String topologyId);

    /**
     * Remove all keys for the given topology that have expired. The number of keys should be small enough that doing an exhaustive scan of
     * them all is acceptable as there is no guarantee that expiration time and version number are related.  This should be for all service
     * types. This is expected to only ever be called from nimbus and some ACLs may be setup so being called from other daemons will cause
     * it to fail.
     *
     * @param topologyId the id of the topology to scan.
     */
    void removeExpiredPrivateWorkerKeys(String topologyId);

    /**
     * Remove all of the worker keys for a given topology.  Used to clean up after a topology finishes. This is expected to only ever be
     * called from nimbus and ideally should only ever work from nimbus.
     *
     * @param topologyId the topology to clean up after.
     */
    void removeAllPrivateWorkerKeys(String topologyId);

    /**
     * Get a list of all topologyIds that currently have private worker keys stored, of any kind. This is expected to only ever be called
     * from nimbus.
     *
     * @return the list of topology ids with any kind of private worker key stored.
     */
    Set<String> idsOfTopologiesWithPrivateWorkerKeys();

    /**
     * Get all of the supervisors with the ID as the key.
     */
    default Map<String, SupervisorInfo> allSupervisorInfo() {
        return allSupervisorInfo(null);
    }

    /**
     * Get all supervisor info.
     * @param callback be alerted if the list of supervisors change
     * @return All of the supervisors with the ID as the key
     */
    default Map<String, SupervisorInfo> allSupervisorInfo(Runnable callback) {
        Map<String, SupervisorInfo> ret = new HashMap<>();
        for (String id : supervisors(callback)) {
            SupervisorInfo supervisorInfo = supervisorInfo(id);
            if (supervisorInfo != null) {
                ret.put(id, supervisorInfo);
            }
        }
        return ret;
    }

    /**
     * Get a topology ID from the name of a topology.
     *
     * @param topologyName the name of the topology to look for
     * @return the id of the topology or null if it is not alive.
     */
    default Optional<String> getTopoId(final String topologyName) {
        return Optional.ofNullable(stormId(topologyName));
    }

    default Map<String, StormBase> topologyBases() {
        Map<String, StormBase> stormBases = new HashMap<>();
        for (String topologyId : activeStorms()) {
            StormBase base = stormBase(topologyId, null);
            if (base != null) { //race condition with delete
                stormBases.put(topologyId, base);
            }
        }
        return stormBases;
    }
}
