| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.cluster; |
| |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.storm.Config; |
| import org.apache.storm.assignments.ILocalAssignmentsBackend; |
| import org.apache.storm.assignments.LocalAssignmentsBackendFactory; |
| import org.apache.storm.generated.ClusterWorkerHeartbeat; |
| import org.apache.storm.generated.ExecutorInfo; |
| import org.apache.storm.generated.ExecutorStats; |
| import org.apache.storm.generated.ProfileAction; |
| import org.apache.storm.generated.WorkerTokenServiceType; |
| import org.apache.storm.shade.org.apache.zookeeper.ZooDefs; |
| import org.apache.storm.shade.org.apache.zookeeper.data.ACL; |
| import org.apache.storm.shade.org.apache.zookeeper.data.Id; |
| import org.apache.storm.shade.org.apache.zookeeper.server.auth.DigestAuthenticationProvider; |
| import org.apache.storm.utils.Utils; |
| |
| public class ClusterUtils { |
| |
| public static final String ZK_SEPERATOR = "/"; |
| |
| public static final String ASSIGNMENTS_ROOT = "assignments"; |
| public static final String STORMS_ROOT = "storms"; |
| public static final String SUPERVISORS_ROOT = "supervisors"; |
| public static final String WORKERBEATS_ROOT = "workerbeats"; |
| public static final String BACKPRESSURE_ROOT = "backpressure"; |
| public static final String LEADERINFO_ROOT = "leader-info"; |
| public static final String ERRORS_ROOT = "errors"; |
| public static final String BLOBSTORE_ROOT = "blobstore"; |
| public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber"; |
| public static final String NIMBUSES_ROOT = "nimbuses"; |
| public static final String CREDENTIALS_ROOT = "credentials"; |
| public static final String LOGCONFIG_ROOT = "logconfigs"; |
| public static final String PROFILERCONFIG_ROOT = "profilerconfigs"; |
| public static final String SECRET_KEYS_ROOT = "secretkeys"; |
| |
| public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT; |
| public static final String STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT; |
| public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT; |
| public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT; |
| public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT; |
| public static final String LEADERINFO_SUBTREE = ZK_SEPERATOR + LEADERINFO_ROOT; |
| public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT; |
| public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT; |
| public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT; |
| public static final String NIMBUSES_SUBTREE = ZK_SEPERATOR + NIMBUSES_ROOT; |
| public static final String CREDENTIALS_SUBTREE = ZK_SEPERATOR + CREDENTIALS_ROOT; |
| public static final String LOGCONFIG_SUBTREE = ZK_SEPERATOR + LOGCONFIG_ROOT; |
| public static final String PROFILERCONFIG_SUBTREE = ZK_SEPERATOR + PROFILERCONFIG_ROOT; |
| public static final String SECRET_KEYS_SUBTREE = ZK_SEPERATOR + SECRET_KEYS_ROOT; |
| |
| // A singleton instance allows us to mock delegated static methods in our |
| // tests by subclassing. |
| private static final ClusterUtils INSTANCE = new ClusterUtils(); |
| private static ClusterUtils _instance = INSTANCE; |
| |
| /** |
| * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that |
| * overrides the implementation of the delegated method. |
| * |
| * @param u a Cluster instance |
| */ |
| public static void setInstance(ClusterUtils u) { |
| _instance = u; |
| } |
| |
| /** |
| * Resets the singleton instance to the default. This is helpful to reset the class to its original functionality when mocking is no |
| * longer desired. |
| */ |
| public static void resetInstance() { |
| _instance = INSTANCE; |
| } |
| |
| /** |
| * Get ZK ACLs for a topology to have read/write access. |
| * |
| * @param topoConf the topology config. |
| * @return the ACLs. |
| */ |
| public static List<ACL> mkTopoReadWriteAcls(Map<String, Object> topoConf) { |
| return mkTopoAcls(topoConf, ZooDefs.Perms.ALL); |
| } |
| |
| /** |
| * Get ZK ACLs for a topology to have read only access. |
| * |
| * @param topoConf the topology config. |
| * @return the ACLs. |
| */ |
| public static List<ACL> mkTopoReadOnlyAcls(Map<String, Object> topoConf) { |
| return mkTopoAcls(topoConf, ZooDefs.Perms.READ); |
| } |
| |
| private static List<ACL> mkTopoAcls(Map<String, Object> topoConf, int perms) { |
| List<ACL> aclList = null; |
| String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD); |
| if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) { |
| aclList = new ArrayList<>(); |
| ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0); |
| aclList.add(acl1); |
| try { |
| ACL acl2 = new ACL(perms, new Id("digest", DigestAuthenticationProvider.generateDigest(payload))); |
| aclList.add(acl2); |
| } catch (NoSuchAlgorithmException e) { |
| //Should only happen on a badly configured system |
| throw new RuntimeException(e); |
| } |
| } |
| return aclList; |
| } |
| |
| public static String supervisorPath(String id) { |
| return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id; |
| } |
| |
| public static String assignmentPath(String id) { |
| return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id; |
| } |
| |
| public static String blobstorePath(String key) { |
| return BLOBSTORE_SUBTREE + ZK_SEPERATOR + key; |
| } |
| |
| public static String blobstoreMaxKeySequenceNumberPath(String key) { |
| return BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE + ZK_SEPERATOR + key; |
| } |
| |
| public static String nimbusPath(String id) { |
| return NIMBUSES_SUBTREE + ZK_SEPERATOR + id; |
| } |
| |
| public static String stormPath(String id) { |
| return STORMS_SUBTREE + ZK_SEPERATOR + id; |
| } |
| |
| public static String workerbeatStormRoot(String stormId) { |
| return WORKERBEATS_SUBTREE + ZK_SEPERATOR + stormId; |
| } |
| |
| public static String workerbeatPath(String stormId, String node, Long port) { |
| return workerbeatStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port; |
| } |
| |
| public static String backpressureStormRoot(String stormId) { |
| return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + stormId; |
| } |
| |
| public static String backpressurePath(String stormId, String node, Long port) { |
| return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port; |
| } |
| |
| /** |
| * Get the backpressure znode full path. |
| * |
| * @param stormId The topology id |
| * @param shortPath A string in the form of "node-port" |
| * @return The backpressure znode path |
| */ |
| public static String backpressurePath(String stormId, String shortPath) { |
| return backpressureStormRoot(stormId) + ZK_SEPERATOR + shortPath; |
| } |
| |
| public static String errorStormRoot(String stormId) { |
| return ERRORS_SUBTREE + ZK_SEPERATOR + stormId; |
| } |
| |
| public static String errorPath(String stormId, String componentId) { |
| return errorStormRoot(stormId) + ZK_SEPERATOR + Utils.urlEncodeUtf8(componentId); |
| } |
| |
| public static String lastErrorPath(String stormId, String componentId) { |
| return errorPath(stormId, componentId) + "-last-error"; |
| } |
| |
| public static String credentialsPath(String stormId) { |
| return CREDENTIALS_SUBTREE + ZK_SEPERATOR + stormId; |
| } |
| |
| /** |
| * Get the path to the log config for a topology. |
| * |
| * @param stormId the topology id. |
| * @return the path to the config. |
| */ |
| public static String logConfigPath(String stormId) { |
| return LOGCONFIG_SUBTREE + ZK_SEPERATOR + stormId; |
| } |
| |
| public static String profilerConfigPath(String stormId) { |
| return PROFILERCONFIG_SUBTREE + ZK_SEPERATOR + stormId; |
| } |
| |
| public static String profilerConfigPath(String stormId, String host, Long port, ProfileAction requestType) { |
| return profilerConfigPath(stormId) + ZK_SEPERATOR + host + "_" + port + "_" + requestType; |
| } |
| |
| /** |
| * Get the base path where secret keys are stored for a given service. |
| * |
| * @param type the service we are interested in. |
| * @return the path to that service root. |
| */ |
| public static String secretKeysPath(WorkerTokenServiceType type) { |
| return SECRET_KEYS_SUBTREE + ZK_SEPERATOR + type.name(); |
| } |
| |
| /** |
| * Get the path to secret keys for a specific topology. |
| * |
| * @param type the service the secret is for. |
| * @param topologyId the topology the secret is for. |
| * @return the path to the list of secret keys. |
| */ |
| public static String secretKeysPath(WorkerTokenServiceType type, String topologyId) { |
| return secretKeysPath(type) + ZK_SEPERATOR + topologyId; |
| } |
| |
| /** |
| * Get the path to a specific secret key. |
| * |
| * @param type the service the secret is for. |
| * @param topologyId the topology the secret is for. |
| * @param version the version the secret is for. |
| * @return the path to the secret. |
| */ |
| public static String secretKeysPath(WorkerTokenServiceType type, String topologyId, long version) { |
| return secretKeysPath(type, topologyId) + ZK_SEPERATOR + version; |
| } |
| |
| public static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz) { |
| if (serialized != null) { |
| return Utils.deserialize(serialized, clazz); |
| } |
| return null; |
| } |
| |
| /** |
| * Ensures that we only return heartbeats for executors assigned to this worker. |
| */ |
| public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<ExecutorInfo> executors, |
| ClusterWorkerHeartbeat workerHeartbeat) { |
| Map<ExecutorInfo, ExecutorBeat> executorWhb = new HashMap<>(); |
| Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats(); |
| for (ExecutorInfo executor : executors) { |
| if (executorStatsMap.containsKey(executor)) { |
| int time = workerHeartbeat.get_time_secs(); |
| int uptime = workerHeartbeat.get_uptime_secs(); |
| ExecutorStats executorStats = workerHeartbeat.get_executor_stats().get(executor); |
| ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats); |
| executorWhb.put(executor, executorBeat); |
| } |
| } |
| return executorWhb; |
| } |
| |
| public static IStateStorage mkStateStorage(Map<String, Object> config, Map<String, Object> authConf, |
| ClusterStateContext context) throws Exception { |
| return _instance.mkStateStorageImpl(config, authConf, context); |
| } |
| |
| public static IStormClusterState mkStormClusterState(Object stateStorage, ILocalAssignmentsBackend backend, |
| ClusterStateContext context) throws Exception { |
| return _instance.mkStormClusterStateImpl(stateStorage, backend, context); |
| } |
| |
| public static IStormClusterState mkStormClusterState(Object stateStorage, ClusterStateContext context) throws Exception { |
| return _instance.mkStormClusterStateImpl(stateStorage, LocalAssignmentsBackendFactory.getDefault(), context); |
| } |
| |
| public static String stringifyError(Throwable error) { |
| StringWriter result = new StringWriter(); |
| PrintWriter printWriter = new PrintWriter(result); |
| error.printStackTrace(printWriter); |
| return result.toString(); |
| } |
| |
| public IStormClusterState mkStormClusterStateImpl(Object stateStorage, ILocalAssignmentsBackend backend, |
| ClusterStateContext context) throws Exception { |
| if (stateStorage instanceof IStateStorage) { |
| return new StormClusterStateImpl((IStateStorage) stateStorage, backend, context, false); |
| } else { |
| IStateStorage storage = _instance.mkStateStorageImpl((Map<String, Object>) stateStorage, |
| (Map<String, Object>) stateStorage, context); |
| return new StormClusterStateImpl(storage, backend, context, true); |
| } |
| } |
| |
| public IStateStorage mkStateStorageImpl(Map<String, Object> config, Map<String, Object> authConf, ClusterStateContext context) throws |
| Exception { |
| String className = null; |
| IStateStorage stateStorage = null; |
| if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) { |
| className = (String) config.get(Config.STORM_CLUSTER_STATE_STORE); |
| } else { |
| className = "org.apache.storm.cluster.ZKStateStorageFactory"; |
| } |
| Class clazz = Class.forName(className); |
| StateStorageFactory storageFactory = (StateStorageFactory) clazz.newInstance(); |
| stateStorage = storageFactory.mkStore(config, authConf, context); |
| return stateStorage; |
| } |
| } |