blob: c586dd1a2556efddd002be12fcf8eacd8841ecb3 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.storm.cluster;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.PrivateWorkerKey;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.generated.WorkerTokenServiceType;
import org.apache.storm.nimbus.NimbusInfo;
public interface IStormClusterState {
List<String> assignments(Runnable callback);
* Get the assignment based on storm id from local backend.
* @param stormId topology id
* @param callback callback function
* @return {@link Assignment}
Assignment assignmentInfo(String stormId, Runnable callback);
* Get the assignment based on storm id from remote state store, eg: ZK.
* @param stormId topology id
* @param callback callback function
* @return {@link Assignment}
Assignment remoteAssignmentInfo(String stormId, Runnable callback);
* Get all the topologies assignments mapping stormId -> Assignment from local backend.
* @return stormId -> Assignment mapping
Map<String, Assignment> assignmentsInfo();
* Sync the remote state store assignments to local backend, used when master gains leadership, see {@link LeaderListenerCallback}.
* @param remote assigned assignments for a specific {@link IStormClusterState} instance, usually a supervisor/node.
void syncRemoteAssignments(Map<String, byte[]> remote);
* Flag to indicate if the assignments synced successfully, see {@link #syncRemoteAssignments(Map)}.
* @return true if is synced successfully
boolean isAssignmentsBackendSynchronized();
* Mark the assignments as synced successfully, see {@link #isAssignmentsBackendSynchronized()}.
void setAssignmentsBackendSynchronized();
VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
List<String> blobstoreInfo(String blobKey);
List<NimbusSummary> nimbuses();
void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
List<String> activeStorms();
* Get a storm base for a topology.
* @param stormId the id of the topology
* @param callback something to call if the data changes (best effort)
* @return the StormBase or null if it is not alive.
StormBase stormBase(String stormId, Runnable callback);
* Get storm id from passed name, null if the name doesn't exist on cluster.
* @param stormName storm name
* @return storm id
String stormId(String stormName);
* Sync all the active storm ids of the cluster, used now when master gains leadership.
* @param ids stormName -> stormId mapping
void syncRemoteIds(Map<String, String> ids);
ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
List<ProfileRequest> getTopologyProfileRequests(String stormId);
void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
List<String> supervisors(Runnable callback);
SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
void setupHeatbeats(String stormId, Map<String, Object> topoConf);
void teardownHeartbeats(String stormId);
void teardownTopologyErrors(String stormId);
List<String> heartbeatStorms();
List<String> errorTopologies();
* @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
List<String> backpressureTopologies();
* Get leader info from state store, which was written when a master gains leadership.
* <p>Caution: it can not be used for fencing and is only for informational purposes because we use ZK as our
* backend now, which could have a overdue info of nodes.
* @param callback callback func
* @return {@link NimbusInfo}
NimbusInfo getLeader(Runnable callback);
void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf);
LogConfig topologyLogConfig(String stormId, Runnable cb);
void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
void removeWorkerHeartbeat(String stormId, String node, Long port);
void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
* @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
* @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
void setupBackpressure(String stormId, Map<String, Object> topoConf);
* @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
void removeBackpressure(String stormId);
* @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
void removeWorkerBackpressure(String stormId, String node, Long port);
void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf);
void updateStorm(String stormId, StormBase newElems);
void removeStormBase(String stormId);
void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf);
void setupBlob(String key, NimbusInfo nimbusInfo, Integer versionInfo);
List<String> activeKeys();
List<String> blobstore(Runnable callback);
void removeStorm(String stormId);
void removeBlobstoreKey(String blobKey);
void removeKeyVersion(String blobKey);
void reportError(String stormId, String componentId, String node, Long port, Throwable error);
void setupErrors(String stormId, Map<String, Object> topoConf);
List<ErrorInfo> errors(String stormId, String componentId);
ErrorInfo lastError(String stormId, String componentId);
void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf);
Credentials credentials(String stormId, Runnable callback);
void disconnect();
* Get a private key used to validate a token is correct. This is expected to be called from a privileged daemon, and the ACLs should be
* set up to only allow nimbus and these privileged daemons access to these private keys.
* @param type the type of service the key is for.
* @param topologyId the topology id the key is for.
* @param keyVersion the version of the key this is for.
* @return the private key or null if it could not be found.
PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion);
* Store a new version of a private key. This is expected to only ever be called from nimbus. All ACLs however need to be setup to
* allow the given services access to the stored information.
* @param type the type of service this key is for.
* @param topologyId the topology this key is for
* @param keyVersion the version of the key this is for.
* @param key the key to store.
void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key);
* Get the next key version number that should be used for this topology id. This is expected to only ever be called from nimbus, but it
* is acceptable if the ACLs are setup so that it can work from a privileged daemon for the given service.
* @param type the type of service this is for.
* @param topologyId the topology id this is for.
* @return the next version number. It should be 0 for a new topology id/service combination.
long getNextPrivateWorkerKeyVersion(WorkerTokenServiceType type, String topologyId);
* Remove all keys for the given topology that have expired. The number of keys should be small enough that doing an exhaustive scan of
* them all is acceptable as there is no guarantee that expiration time and version number are related. This should be for all service
* types. This is expected to only ever be called from nimbus and some ACLs may be setup so being called from other daemons will cause
* it to fail.
* @param topologyId the id of the topology to scan.
void removeExpiredPrivateWorkerKeys(String topologyId);
* Remove all of the worker keys for a given topology. Used to clean up after a topology finishes. This is expected to only ever be
* called from nimbus and ideally should only ever work from nimbus.
* @param topologyId the topology to clean up after.
void removeAllPrivateWorkerKeys(String topologyId);
* Get a list of all topologyIds that currently have private worker keys stored, of any kind. This is expected to only ever be called
* from nimbus.
* @return the list of topology ids with any kind of private worker key stored.
Set<String> idsOfTopologiesWithPrivateWorkerKeys();
* Get all of the supervisors with the ID as the key.
default Map<String, SupervisorInfo> allSupervisorInfo() {
return allSupervisorInfo(null);
* @param callback be alerted if the list of supervisors change
* @return All of the supervisors with the ID as the key
default Map<String, SupervisorInfo> allSupervisorInfo(Runnable callback) {
Map<String, SupervisorInfo> ret = new HashMap<>();
for (String id : supervisors(callback)) {
SupervisorInfo supervisorInfo = supervisorInfo(id);
if (supervisorInfo != null) {
ret.put(id, supervisorInfo);
return ret;
* Get a topology ID from the name of a topology.
* @param topologyName the name of the topology to look for
* @return the id of the topology or null if it is not alive.
default Optional<String> getTopoId(final String topologyName) {
return Optional.ofNullable(stormId(topologyName));
default Map<String, StormBase> topologyBases() {
Map<String, StormBase> stormBases = new HashMap<>();
for (String topologyId : activeStorms()) {
StormBase base = stormBase(topologyId, null);
if (base != null) { //race condition with delete
stormBases.put(topologyId, base);
return stormBases;