blob: 62269a40895cd0f3ea69ff80992e379bb4e07e40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.stats;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.apache.storm.utils.Time;
/**
* Stats calculations needed by storm client code.
*/
public class ClientStatsUtil {
public static final String SPOUT = "spout";
public static final String BOLT = "bolt";
static final String EXECUTOR_STATS = "executor-stats";
static final String UPTIME = "uptime";
public static final String TIME_SECS = "time-secs";
public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
public static final IdentityTransformer IDENTITY = new IdentityTransformer();
/**
* Convert a List<Long> executor to java List<Integer>.
*/
public static List<Integer> convertExecutor(List<Long> executor) {
return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
}
/**
* Make an map of executors to empty stats, in preparation for doing a heartbeat.
* @param executors the executors as keys of the map
* @return and empty map of executors to stats
*/
public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
for (Object executor : executors) {
List startEnd = (List) executor;
ret.put(convertExecutor(startEnd), null);
}
return ret;
}
/**
* Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps.
*/
public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
ret.put(convertExecutor(entry.getKey()), entry.getValue());
}
return ret;
}
/**
* Create a new worker heartbeat for zookeeper.
* @param topoId the topology id
* @param executorStats the stats for the executors
* @param uptime the uptime for the worker
* @return the heartbeat map
*/
public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
Map<String, Object> ret = new HashMap<>();
ret.put("storm-id", topoId);
ret.put(EXECUTOR_STATS, executorStats);
ret.put(UPTIME, uptime);
ret.put(TIME_SECS, Time.currentTimeSecs());
return ret;
}
private static Number getByKeyOr0(Map<String, Object> m, String k) {
if (m == null) {
return 0;
}
Number n = (Number) m.get(k);
if (n == null) {
return 0;
}
return n;
}
/**
* Get a sub-map by a given key.
* @param map the original map
* @param key the key to get it from
* @return the map stored under key
*/
public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
if (map == null) {
return null;
}
return (Map<K, V>) map.get(key);
}
public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
ret.set_storm_id((String) heartbeat.get("storm-id"));
ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS);
if (executorStats != null) {
for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) {
List<Integer> executor = entry.getKey();
ExecutorStats stats = entry.getValue();
if (null != stats) {
convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats);
}
}
}
ret.set_executor_stats(convertedStats);
return ret;
}
/**
* Converts stats to be over given windows of time.
* @param stats the stats
* @param secKeyFunc transform the sub-key
* @param firstKeyFunc transform the main key
*/
public static <K1, K2> Map windowSetConverter(
Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
Map ret = new HashMap();
for (Object o : stats.entrySet()) {
Map.Entry entry = (Map.Entry) o;
K1 key1 = firstKeyFunc.transform(entry.getKey());
Map subRetMap = (Map) ret.get(key1);
if (subRetMap == null) {
subRetMap = new HashMap();
}
ret.put(key1, subRetMap);
Map value = (Map) entry.getValue();
for (Object oo : value.entrySet()) {
Map.Entry subEntry = (Map.Entry) oo;
K2 key2 = secKeyFunc.transform(subEntry.getKey());
subRetMap.put(key2, subEntry.getValue());
}
}
return ret;
}
// =====================================================================================
// key transformers
// =====================================================================================
/**
* Provides a way to transform one key into another.
*/
interface KeyTransformer<T> {
T transform(Object key);
}
static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> {
@Override
public GlobalStreamId transform(Object key) {
if (key instanceof List) {
List l = (List) key;
if (l.size() > 1) {
return new GlobalStreamId((String) l.get(0), (String) l.get(1));
}
}
return new GlobalStreamId("", key.toString());
}
}
static class IdentityTransformer implements KeyTransformer<Object> {
@Override
public Object transform(Object key) {
return key;
}
}
}