blob: ab4213fe5eb18502586edc2f5010ee41fb4893b9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.daemon.worker.hearbeat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
/**
 * Periodic callback that exchanges heartbeat files with a peer process.
 *
 * <p>On every {@link #run()} it
 * <ol>
 * <li>touches a file named after the current epoch second in {@code writeDir}
 * and prunes the oldest files there so that at most {@code reserverNum} remain;</li>
 * <li>looks at the lexicographically greatest file name in {@code readDir},
 * parses it as an epoch second and throws a {@link RuntimeException} when that
 * heartbeat is older than {@code timeoutSeconds}.</li>
 * </ol>
 * Either directory may be blank, in which case the corresponding step is skipped.
 *
 * <p>NOTE(review): instances are handed to {@code AsyncLoopThread}; the value
 * returned by {@link #getResult()} is presumably used as the loop's sleep
 * interval and a negative value as a stop signal -- confirm against
 * {@code AsyncLoopThread}.
 */
public class SyncContainerHb extends RunnableCallback {
    private static final Logger LOG = LoggerFactory.getLogger(SyncContainerHb.class);

    private static final int SECOND_MILLISCOND = 1000;
    /** Number of consecutive missed heartbeats tolerated before giving up. */
    private static final int MAX_NO_CONTAINER_HB_TIMES = 30;

    /** Directory whose newest heartbeat file is checked; blank disables reading. */
    private String readDir;
    /** Directory this process writes its own heartbeat files to; blank disables writing. */
    private String writeDir;
    /** Maximum accepted age, in seconds, of the newest heartbeat in {@code readDir}. */
    private int timeoutSeconds = 60;
    /** Value reported by {@link #getResult()}; set to -1 by {@link #shutdown()}. */
    private int frequence = 10;
    /** Number of newest heartbeat files kept in {@code writeDir}. */
    private int reserverNum = 10;
    /** Consecutive cycles in which no fresh heartbeat was seen. */
    private int noContainerHbTimes = 0;
    /** True until the first fresh heartbeat has been read. */
    private boolean isFirstRead = true;

    /**
     * Deletes the leading entries of {@code fileList} so that only the last
     * {@code reserverNum} entries remain on disk.
     *
     * @param fileList file names inside {@code dir}, sorted oldest first
     * @param dir directory containing the files
     */
    public void removeOld(List<String> fileList, String dir) {
        if (fileList == null || fileList.size() <= reserverNum) {
            // nothing to prune
            return;
        }

        int removeNum = fileList.size() - reserverNum;
        for (String fileName : fileList.subList(0, removeNum)) {
            String filePath = dir + File.separator + fileName;
            try {
                PathUtils.rmpath(filePath);
                // Only report the removal when it actually happened.
                LOG.info("Remove hearbeat file " + filePath);
            } catch (Exception e) {
                LOG.error("Failed to delete " + filePath, e);
            }
        }
    }

    /**
     * Records one more cycle without a fresh heartbeat.
     *
     * @throws RuntimeException once {@code MAX_NO_CONTAINER_HB_TIMES} consecutive
     *         cycles have been recorded
     */
    public void checkNoContainerHbTimes() {
        noContainerHbTimes++;
        if (noContainerHbTimes >= MAX_NO_CONTAINER_HB_TIMES) {
            String msg = "It's long time no container hearbeat";
            LOG.error(msg);
            throw new RuntimeException(msg);
        }
    }

    /**
     * Checks the newest heartbeat file in {@code readDir}.
     *
     * <p>A missing or empty directory, an unparsable newest file name (which is
     * removed), or a stale heartbeat seen before any fresh one has been read all
     * count as a missed cycle via {@link #checkNoContainerHbTimes()}. The missed
     * cycle counter is cleared only when a fresh heartbeat is received.
     *
     * @throws RuntimeException if {@code readDir} is not a directory, if the
     *         newest heartbeat is older than {@code timeoutSeconds} after a fresh
     *         one had been seen earlier, or if too many cycles were missed
     */
    public void handlReadDir() {
        if (StringUtils.isBlank(readDir)) {
            return;
        }

        File dir = new File(readDir);
        if (!dir.exists()) {
            LOG.info(readDir + " doesn't exist right now");
            checkNoContainerHbTimes();
            return;
        }
        if (!dir.isDirectory()) {
            String msg = readDir + " isn't dir";
            LOG.error(msg);
            throw new RuntimeException(msg);
        }

        List<String> fileList = listSortedNames(dir);
        if (fileList.isEmpty()) {
            LOG.info(readDir + " doesn't contain hearbeat files right now");
            checkNoContainerHbTimes();
            return;
        }

        String biggest = fileList.get(fileList.size() - 1);
        long hb;
        try {
            hb = Long.parseLong(biggest);
        } catch (NumberFormatException e) {
            // A stray file must not be mistaken for a heartbeat written at epoch 0;
            // drop it and treat this cycle as missed so the next cycle sees the
            // real newest heartbeat.
            LOG.info("Heartbeat file " + biggest + " isn't a valid file, remove it");
            String path = readDir + File.separator + biggest;
            try {
                PathUtils.rmpath(path);
            } catch (Exception e1) {
                LOG.error("Failed to delete " + path, e1);
            }
            checkNoContainerHbTimes();
            return;
        }

        long now = System.currentTimeMillis() / SECOND_MILLISCOND;
        if (now - hb > timeoutSeconds) {
            if (isFirstRead) {
                // Only stale files so far: keep waiting, but not forever.
                checkNoContainerHbTimes();
                return;
            }

            StringBuilder sb = new StringBuilder();
            sb.append("It's long time no container's hearbeat, ");
            sb.append("ContainerDir:").append(readDir);
            sb.append(",last hearbeat:").append(biggest);
            LOG.error(sb.toString());
            throw new RuntimeException(sb.toString());
        }

        noContainerHbTimes = 0;
        isFirstRead = false;
        LOG.info("Receive container hearbeat " + biggest);
    }

    /**
     * Touches a heartbeat file named after the current epoch second in
     * {@code writeDir} and prunes old heartbeat files there.
     *
     * @throws RuntimeException if the heartbeat file cannot be touched
     */
    public void handleWriteDir() {
        if (StringUtils.isBlank(writeDir)) {
            return;
        }

        String seconds = String.valueOf(System.currentTimeMillis() / SECOND_MILLISCOND);
        String path = writeDir + File.separator + seconds;
        try {
            PathUtils.touch(path);
            LOG.info("Successfully touch " + path);
        } catch (IOException e) {
            LOG.error("Failed to touch " + path, e);
            throw new RuntimeException("Failed to touch " + path, e);
        }

        List<String> fileList = listSortedNames(new File(writeDir));
        if (fileList.isEmpty()) {
            LOG.info(writeDir + " doesn't contain hearbeat files right now");
            return;
        }
        removeOld(fileList, writeDir);
    }

    /** Writes this process's heartbeat, then checks the peer's heartbeat. */
    @Override
    public void run() {
        handleWriteDir();
        handlReadDir();
    }

    /** Sets {@code frequence} to -1, the value subsequently reported by {@link #getResult()}. */
    @Override
    public void shutdown() {
        frequence = -1;
        LOG.info("Shutdown sync container thread");
    }

    /** @return the current {@code frequence} value (boxed) */
    public Object getResult() {
        return frequence;
    }

    public String getReadDir() {
        return readDir;
    }

    /**
     * Removes every entry found in {@code readDir}. Does nothing when
     * {@code readDir} is blank or does not exist yet; individual removal
     * failures are logged and do not abort the loop.
     *
     * @throws RuntimeException if {@code readDir} exists but is not a directory
     */
    public void resetReadHeatbeats() {
        if (StringUtils.isBlank(readDir)) {
            LOG.info("ReadDir is empty, no read heatbeats to reset");
            return;
        }

        File dir = new File(readDir);
        if (!dir.exists()) {
            LOG.info("Read hearbeat directory hasn't been created " + readDir);
            return;
        }
        if (!dir.isDirectory()) {
            LOG.error(readDir + " isn't a directory ");
            throw new RuntimeException(readDir + " isn't a directory ");
        }

        // File.list() yields null on an I/O error or if the directory vanished.
        String[] files = dir.list();
        if (files == null) {
            LOG.warn("Failed to list " + readDir);
            return;
        }

        boolean allRemoved = true;
        for (String fileName : files) {
            String path = readDir + File.separator + fileName;
            try {
                PathUtils.rmr(path);
            } catch (IOException e) {
                allRemoved = false;
                LOG.error("Failed to remove " + path, e);
            }
        }

        if (allRemoved) {
            LOG.info("Successfully reset read heatbeats " + readDir);
        }
    }

    public void setReadDir(String readDir) {
        this.readDir = readDir;
        if (StringUtils.isBlank(readDir)) {
            LOG.warn("ReadDir is empty");
        } else {
            LOG.info("ReadDir is " + readDir);
        }
    }

    public int getTimeoutSeconds() {
        return timeoutSeconds;
    }

    public void setTimeoutSeconds(int timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    public int getFrequence() {
        return frequence;
    }

    public void setFrequence(int frequence) {
        this.frequence = frequence;
    }

    public String getWriteDir() {
        return writeDir;
    }

    /**
     * Sets the directory heartbeats are written to, creating it when it does
     * not exist yet.
     *
     * @throws RuntimeException if {@code writeDir} exists but is not a directory
     */
    public void setWriteDir(String writeDir) {
        this.writeDir = writeDir;
        if (StringUtils.isBlank(writeDir)) {
            LOG.warn("writeDir is empty");
            return;
        }
        LOG.info("writeDir is " + writeDir);

        File dir = new File(writeDir);
        if (!dir.exists()) {
            // mkdirs() also returns false when someone else created the directory
            // concurrently, hence the isDirectory() re-check.
            if (dir.mkdirs() || dir.isDirectory()) {
                LOG.info("Create Directory " + writeDir);
            } else {
                LOG.error("Failed to create directory " + writeDir);
            }
            return;
        }
        if (!dir.isDirectory()) {
            LOG.error(writeDir + " isn't a directory ");
            throw new RuntimeException(writeDir + " isn't a directory ");
        }
    }

    public int getReserverNum() {
        return reserverNum;
    }

    public void setReserverNum(int reserverNum) {
        this.reserverNum = reserverNum;
    }

    /**
     * Builds a configured {@code SyncContainerHb} and wraps it in an
     * {@link AsyncLoopThread}.
     *
     * @param containerHbDir directory to read heartbeats from, may be null
     * @param hbDir directory to write heartbeats to, may be null
     * @param timeout maximum accepted heartbeat age in seconds
     * @param frequence value stored via {@link #setFrequence(int)}
     * @return the thread wrapping the new callback
     */
    public static AsyncLoopThread mkInstance(String containerHbDir, String hbDir, int timeout, int frequence) {
        SyncContainerHb syncContainerHbThread = new SyncContainerHb();
        syncContainerHbThread.setReadDir(containerHbDir);
        syncContainerHbThread.setWriteDir(hbDir);
        syncContainerHbThread.setTimeoutSeconds(timeout);
        syncContainerHbThread.setFrequence(frequence);

        StringBuilder sb = new StringBuilder();
        sb.append("Run process under Apsara/Yarn container");
        sb.append(", ContainerDir:").append(containerHbDir);
        sb.append(", MyDir:").append(hbDir);
        sb.append(", timeout:").append(timeout);
        sb.append(", frequence:").append(frequence);
        LOG.info(sb.toString());

        return new AsyncLoopThread(syncContainerHbThread, true, Thread.NORM_PRIORITY, true);
    }

    /**
     * @return the heartbeat thread for nimbus, or null when
     *         {@code ConfigExtension.isEnableContainerNimbus()} is false
     */
    public static AsyncLoopThread mkNimbusInstance(Map conf) throws IOException {
        if (!ConfigExtension.isEnableContainerNimbus()) {
            LOG.info("Run nimbus without Apsara/Yarn container");
            return null;
        }

        String containerHbDir = ConfigExtension.getContainerNimbusHearbeat();
        String hbDir = StormConfig.masterHearbeatForContainer(conf);
        int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
        int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
        return mkInstance(containerHbDir, hbDir, timeout, frequence);
    }

    /**
     * Creates the supervisor side heartbeat thread. With the container enabled
     * it both reads the container heartbeat and writes its own; with only
     * "worker stops without supervisor" enabled it just writes (read dir null).
     *
     * @return the heartbeat thread, or null when neither option is enabled
     */
    public static AsyncLoopThread mkSupervisorInstance(Map conf) throws IOException {
        boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor();
        if (!isEnableContainer && !ConfigExtension.isWorkerStopWithoutSupervisor(conf)) {
            LOG.info("Run Supervisor without Apsara/Yarn container");
            return null;
        }

        String containerHbDir = isEnableContainer ? ConfigExtension.getContainerSupervisorHearbeat() : null;
        String hbDir = StormConfig.supervisorHearbeatForContainer(conf);
        int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
        int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
        return mkInstance(containerHbDir, hbDir, timeout, frequence);
    }

    /**
     * Creates the worker side heartbeat thread, which only reads the directory
     * returned by {@code StormConfig.supervisorHearbeatForContainer(conf)}.
     *
     * @return the heartbeat thread, or null when neither the supervisor
     *         container nor "worker stops without supervisor" is enabled
     */
    public static AsyncLoopThread mkWorkerInstance(Map conf) throws IOException {
        boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor();
        boolean isWorkerAutomaticStop = ConfigExtension.isWorkerStopWithoutSupervisor(conf);
        if (!isEnableContainer && !isWorkerAutomaticStop) {
            LOG.info("Run worker without Apsara/Yarn container");
            return null;
        }

        String containerHbDir = StormConfig.supervisorHearbeatForContainer(conf);
        String hbDir = null;
        int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
        int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
        return mkInstance(containerHbDir, hbDir, timeout, frequence);
    }

    /**
     * Lists the entries of {@code dir} sorted by name in ascending order.
     * Returns an empty list when the directory is empty or cannot be listed
     * ({@link File#list()} returned null).
     */
    private static List<String> listSortedNames(File dir) {
        String[] names = dir.list();
        if (names == null || names.length == 0) {
            return Collections.emptyList();
        }
        List<String> sorted = JStormUtils.mk_list(names);
        Collections.sort(sorted);
        return sorted;
    }
}