/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.Config;
import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.scheduler.INimbus;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormZkClusterState;
import com.alibaba.jstorm.metric.JStormMetricCache;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * All nimbus data: the shared, in-memory state of the nimbus daemon, i.e. the cluster
 * state handles, caches, file upload/download sessions, task heartbeat caches,
 * scheduling helpers and metric components.
 */
public class NimbusData {
    private static final Logger LOG = LoggerFactory.getLogger(NimbusData.class);

    private Map<Object, Object> conf;
    private StormClusterState stormClusterState;

    // Map<topologyId, Map<taskId, TkHbCacheTime>>
    private ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache;

    // values are of two kinds: Channel or BufferFileInputStream
    private TimeCacheMap<Object, Object> downloaders;
    private TimeCacheMap<Object, Object> uploaders;

    // caches thrift responses to avoid scanning ZK too frequently
    private NimbusCache nimbusCache;

    private int startTime;
    private final ScheduledExecutorService scheduExec;
    private AtomicInteger submittedCount;
    private StatusTransition statusTransition;

    private static final int SCHEDULE_THREAD_NUM = 8;

    private final INimbus inimubs;
    private final boolean localMode;
    private volatile boolean isLeader;
    private AtomicBoolean isShutdown = new AtomicBoolean(false);

    private TopologyMetricsRunnable metricRunnable;
    private AsyncLoopThread metricLoopThread;

    // topologies that have been submitted but whose assignment is not yet finished
    private TimeCacheMap<String, Object> pendingSubmitTopologys;

    private Map<String, Integer> topologyTaskTimeout;

    // Map<topologyId, TopologyTaskHbInfo>
    private Map<String, TopologyTaskHbInfo> tasksHeartbeat;

    private final JStormMetricCache metricCache;
    private final String clusterName;
    private JStormMetricsReporter metricsReporter;
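
    /**
     * Wires up all of the shared nimbus state from the cluster configuration: the file
     * upload/download session maps, the ZK-backed cluster state, the caches, the scheduler
     * thread pool and the metric components. Metric threads are started only in
     * distributed (non-local) mode.
     *
     * <p>Illustrative sketch, not code from this file, of how the nimbus server side might
     * construct this object (the config helper and the INimbus implementation used here are
     * assumptions):
     * <pre>{@code
     * Map conf = Utils.readStormConfig();     // assumed helper from backtype.storm.utils.Utils
     * INimbus inimbus = new DefaultInimbus(); // assumed/hypothetical INimbus implementation
     * NimbusData data = new NimbusData(conf, inimbus);
     * }</pre>
     */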
    @SuppressWarnings({"unchecked", "rawtypes"})
    public NimbusData(Map conf, INimbus inimbus) throws Exception {
        this.conf = conf;

        createFileHandler();

        this.submittedCount = new AtomicInteger(0);
        this.stormClusterState = Cluster.mk_storm_cluster_state(conf);

        createCache();

        this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
        this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);
        this.statusTransition = new StatusTransition(this);
        this.startTime = TimeUtils.current_time_secs();
        this.inimubs = inimbus;

        localMode = StormConfig.local_mode(conf);

        this.metricCache = new JStormMetricCache(conf, this.stormClusterState);
        this.clusterName = ConfigExtension.getClusterName(conf);

        this.metricRunnable = new TopologyMetricsRunnable(this);
        this.metricRunnable.init();

        pendingSubmitTopologys = new TimeCacheMap<String, Object>(JStormUtils.MIN_30);
        topologyTaskTimeout = new ConcurrentHashMap<String, Integer>();
        tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>();

        if (!localMode) {
            startMetricThreads();
        }
    }

    public void startMetricThreads() {
        this.metricRunnable.start();

        // init nimbus metric reporter
        this.metricsReporter = new JStormMetricsReporter(this);
        this.metricsReporter.init();
    }
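
    /**
     * Creates the expiring {@code uploaders}/{@code downloaders} maps that track in-flight
     * file transfer sessions. Entries not touched within
     * {@code nimbus.file.copy.expiration.secs} (default 30 seconds) are expired, and the
     * expiration callback closes the underlying Channel or BufferFileInputStream so that
     * abandoned transfers do not leak file handles.
     *
     * <p>Illustrative sketch, not code from this file, of how a download handler could use
     * these maps (the session-id scheme, jar path and buffer size are assumptions):
     * <pre>{@code
     * String sessionId = UUID.randomUUID().toString();
     * String jarPath = "/tmp/stormjar.jar"; // made-up path
     * data.getDownloaders().put(sessionId, new BufferFileInputStream(jarPath, 1024 * 1024));
     * // later, on each chunk-download call:
     * BufferFileInputStream is = (BufferFileInputStream) data.getDownloaders().get(sessionId);
     * byte[] chunk = is.read();
     * }</pre>
     */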
    public void createFileHandler() {
        TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback = new TimeCacheMap.ExpiredCallback<Object, Object>() {
            @Override
            public void expire(Object key, Object val) {
                try {
                    LOG.info("Close file " + String.valueOf(key));
                    if (val != null) {
                        if (val instanceof Channel) {
                            Channel channel = (Channel) val;
                            channel.close();
                        } else if (val instanceof BufferFileInputStream) {
                            BufferFileInputStream is = (BufferFileInputStream) val;
                            is.close();
                        }
                    }
                } catch (IOException e) {
                    LOG.error(e.getMessage(), e);
                }
            }
        };

        int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);
        uploaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
        downloaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
    }

    public void createCache() throws IOException {
        nimbusCache = new NimbusCache(conf, stormClusterState);
        ((StormZkClusterState) stormClusterState).setCache(nimbusCache.getMemCache());
    }

    public String getClusterName() {
        return clusterName;
    }

    public int uptime() {
        return (TimeUtils.current_time_secs() - startTime);
    }

    public Map<Object, Object> getConf() {
        return conf;
    }

    public void setConf(Map<Object, Object> conf) {
        this.conf = conf;
    }

    public StormClusterState getStormClusterState() {
        return stormClusterState;
    }

    public void setStormClusterState(StormClusterState stormClusterState) {
        this.stormClusterState = stormClusterState;
    }

    public ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> getTaskHeartbeatsCache() {
        return taskHeartbeatsCache;
    }
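
    /**
     * Returns the per-task heartbeat cache of a topology, optionally creating an empty
     * cache when none exists yet.
     *
     * <p>Illustrative usage sketch (the topology id and task id below are made up):
     * <pre>{@code
     * Map<Integer, TkHbCacheTime> hbCache = data.getTaskHeartbeatsCache("SequenceTest-1-1", true);
     * TkHbCacheTime hb = hbCache.get(1); // null until task 1 has reported a heartbeat
     * }</pre>
     */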
    public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(String topologyId, boolean createIfNotExist) {
        Map<Integer, TkHbCacheTime> ret = taskHeartbeatsCache.get(topologyId);
        if (ret == null && createIfNotExist) {
            // putIfAbsent keeps the map created by a concurrent caller instead of overwriting it
            Map<Integer, TkHbCacheTime> newCache = new ConcurrentHashMap<Integer, TkHbCacheTime>();
            ret = taskHeartbeatsCache.putIfAbsent(topologyId, newCache);
            if (ret == null) {
                ret = newCache;
            }
        }
        return ret;
    }

    public void setTaskHeartbeatsCache(ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) {
        this.taskHeartbeatsCache = taskHeartbeatsCache;
    }

    public TimeCacheMap<Object, Object> getDownloaders() {
        return downloaders;
    }

    public void setDownloaders(TimeCacheMap<Object, Object> downloaders) {
        this.downloaders = downloaders;
    }

    public TimeCacheMap<Object, Object> getUploaders() {
        return uploaders;
    }

    public void setUploaders(TimeCacheMap<Object, Object> uploaders) {
        this.uploaders = uploaders;
    }

    public int getStartTime() {
        return startTime;
    }

    public void setStartTime(int startTime) {
        this.startTime = startTime;
    }

    public AtomicInteger getSubmittedCount() {
        return submittedCount;
    }

    public ScheduledExecutorService getScheduExec() {
        return scheduExec;
    }

    public StatusTransition getStatusTransition() {
        return statusTransition;
    }
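
    /**
     * Releases nimbus-side resources on shutdown: the caches, the ZK cluster-state
     * connection, the scheduler thread pool and the expiring upload/download session maps.
     */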
    public void cleanup() {
        nimbusCache.cleanup();
        LOG.info("Successfully shutdown Cache");

        try {
            stormClusterState.disconnect();
            LOG.info("Successfully shutdown ZK Cluster Instance");
        } catch (Exception e) {
            LOG.error("Failed to disconnect ZK cluster state", e);
        }

        try {
            scheduExec.shutdown();
            LOG.info("Successfully shutdown threadpool");
        } catch (Exception e) {
            LOG.error("Failed to shutdown scheduler threadpool", e);
        }

        uploaders.cleanup();
        downloaders.cleanup();
    }

    public INimbus getInimubs() {
        return inimubs;
    }

    public boolean isLocalMode() {
        return localMode;
    }

    public boolean isLeader() {
        return isLeader;
    }

    public void setLeader(boolean isLeader) {
        this.isLeader = isLeader;
    }

    public AtomicBoolean getIsShutdown() {
        return isShutdown;
    }

    public JStormCache getMemCache() {
        return nimbusCache.getMemCache();
    }

    public JStormCache getDbCache() {
        return nimbusCache.getDbCache();
    }

    public NimbusCache getNimbusCache() {
        return nimbusCache;
    }

    public JStormMetricCache getMetricCache() {
        return metricCache;
    }

    public final TopologyMetricsRunnable getMetricRunnable() {
        return metricRunnable;
    }
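
    /**
     * Topologies that have been submitted but whose assignment has not finished yet; entries
     * expire automatically after {@code JStormUtils.MIN_30}.
     *
     * <p>Illustrative sketch, not code from this file, of how a submit path might use this
     * map to reject duplicate submissions (the exception choice is an assumption):
     * <pre>{@code
     * if (data.getPendingSubmitTopoloygs().containsKey(topologyName)) {
     *     throw new AlreadyAliveException(topologyName + " is pending to be scheduled");
     * }
     * data.getPendingSubmitTopoloygs().put(topologyName, null);
     * }</pre>
     */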
    public TimeCacheMap<String, Object> getPendingSubmitTopoloygs() {
        return pendingSubmitTopologys;
    }

    public Map<String, Integer> getTopologyTaskTimeout() {
        return topologyTaskTimeout;
    }

    public Map<String, TopologyTaskHbInfo> getTasksHeartbeat() {
        return tasksHeartbeat;
    }
}