/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.daemon.worker;

import backtype.storm.Config;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.daemon.worker.hearbeat.WorkerHeartbeatRunable;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.*;

/**
* worker entrance
*
* @author yannian/Longda
*/
public class Worker {

    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);

    /**
     * workerData is kept in a single shared object because it is accessed by
     * multiple worker threads (thread contention).
     */
    private WorkerData workerData;
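    /**
     * Creates the worker's shared state (WorkerData) from the supervisor-provided
     * configuration and identifiers; the actual startup happens in execute().
     */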
@SuppressWarnings({ "rawtypes", "unchecked" })
public Worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception {
workerData = new WorkerData(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
}
    /**
     * Get the set of downstream (output) task ids for every task in this worker.
     */
    public static Set<Integer> worker_output_tasks(WorkerData workerData) {
        ContextMaker context_maker = workerData.getContextMaker();
        Set<Integer> task_ids = workerData.getTaskids();
        StormTopology topology = workerData.getSysTopology();

        Set<Integer> rtn = new HashSet<Integer>();

        for (Integer taskid : task_ids) {
            TopologyContext context = context_maker.makeTopologyContext(topology, taskid, null);

            // targets is keyed as <StreamId, <ComponentId, Grouping>>
            Map<String, Map<String, Grouping>> targets = context.getThisTargets();
            for (Map<String, Grouping> e : targets.values()) {
                for (String componentId : e.keySet()) {
                    List<Integer> tasks = context.getComponentTasks(componentId);
                    rtn.addAll(tasks);
                }
            }
        }

        return rtn;
    }
    private RefreshConnections makeRefreshConnections() {
        // collect the outbound task set of every task in this worker
        Set<Integer> outboundTasks = worker_output_tasks(workerData);

        workerData.initOutboundTaskStatus(outboundTasks);
        workerData.setOutboundTasks(outboundTasks);

        return new RefreshConnections(workerData);
    }
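    /**
     * Creates and starts one thread per task assigned to this worker, waits for
     * these creation threads to finish, then collects each task's shutdown daemon.
     */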
    private List<TaskShutdownDameon> createTasks() throws Exception {
        List<TaskShutdownDameon> shutdowntasks = new ArrayList<TaskShutdownDameon>();

        Set<Integer> taskids = workerData.getTaskids();
        Set<Thread> threads = new HashSet<Thread>();
        List<Task> taskArrayList = new ArrayList<Task>();
        for (int taskid : taskids) {
            Task task = new Task(workerData, taskid);
            Thread thread = new Thread(task);
            threads.add(thread);
            taskArrayList.add(task);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        for (Task t : taskArrayList) {
            shutdowntasks.add(t.getTaskShutdownDameon());
        }

        return shutdowntasks;
    }
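    /**
     * Builds the (now unused) dispatch disruptor queue. Kept for reference only:
     * dispatching has moved into the Netty server, see startDispatchThread().
     */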
    @Deprecated
    private DisruptorQueue startDispatchDisruptor() {
        Map stormConf = workerData.getStormConf();

        int queue_size = Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), 1024);
        WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI, queue_size, waitStrategy);
        // mark the consumer as started
        recvQueue.consumerStarted();

        return recvQueue;
    }
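    /**
     * Binds the server-side connection for this worker's port. Despite the name,
     * no separate dispatch thread is started any more: received tuples are pushed
     * straight into the deserialize queues by the Netty server.
     */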
    private void startDispatchThread() {
        // the dispatch thread has been removed; tuples are sent directly from the Netty server
        // startDispatchDisruptor();

        IContext context = workerData.getContext();
        String topologyId = workerData.getTopologyId();

        IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues());
        workerData.setRecvConnection(recvConnection);
    }
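    /**
     * Wires up the worker: binds the receive connection, starts the background
     * refresh/heartbeat threads, creates all tasks, and returns a WorkerShutdown
     * handle that the caller can join on.
     */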
    public WorkerShutdown execute() throws Exception {
        List<AsyncLoopThread> threads = new ArrayList<AsyncLoopThread>();

        startDispatchThread();

        // create the client connections before creating the tasks
        RefreshConnections refreshConn = makeRefreshConnections();
        AsyncLoopThread refreshconn = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true);
        threads.add(refreshconn);

        // refresh ZK active status
        RefreshActive refreshZkActive = new RefreshActive(workerData);
        AsyncLoopThread refreshzk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true);
        threads.add(refreshzk);

        // sync heartbeat to the Apsara container
        AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
        if (syncContainerHbThread != null) {
            threads.add(syncContainerHbThread);
        }

        JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData);
        metricReporter.init();
        workerData.setMetricsReporter(metricReporter);

        // refresh heartbeat to the local dir
        RunnableCallback heartbeat_fn = new WorkerHeartbeatRunable(workerData);
        AsyncLoopThread hb = new AsyncLoopThread(heartbeat_fn, false, null, Thread.NORM_PRIORITY, true);
        threads.add(hb);

        // collect the shutdown callbacks of all tasks
        List<TaskShutdownDameon> shutdowntasks = createTasks();
        workerData.setShutdownTasks(shutdowntasks);

        return new WorkerShutdown(workerData, threads);
    }
    /**
     * Create a worker instance and run it.
     *
     * @param conf storm configuration
     * @param context messaging context (may be null)
     * @param topology_id id of the topology this worker belongs to
     * @param supervisor_id id of the supervisor that launched this worker
     * @param port port this worker listens on
     * @param worker_id id of this worker
     * @param jar_path path to the topology jar
     * @return a WorkerShutdown handle for the running worker
     * @throws Exception if the worker fails to start
     */
@SuppressWarnings("rawtypes")
public static WorkerShutdown mk_worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path)
throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("topologyId:" + topology_id + ", ");
sb.append("port:" + port + ", ");
sb.append("workerId:" + worker_id + ", ");
sb.append("jarPath:" + jar_path + "\n");
LOG.info("Begin to run worker:" + sb.toString());
Worker w = new Worker(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
w.redirectOutput();
return w.execute();
}
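    /**
     * Redirects stdout/stderr when the REDIRECT environment variable is "true".
     * The target is the configured redirect file if it is writable, otherwise
     * the worker log file name with an ".out" suffix, falling back to /dev/null.
     */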
    public void redirectOutput() {
        if (System.getenv("REDIRECT") == null || !System.getenv("REDIRECT").equals("true")) {
            return;
        }

        String DEFAULT_OUT_TARGET_FILE = JStormUtils.getLogFileName();
        if (DEFAULT_OUT_TARGET_FILE == null) {
            DEFAULT_OUT_TARGET_FILE = "/dev/null";
        } else {
            DEFAULT_OUT_TARGET_FILE += ".out";
        }

        String outputFile = ConfigExtension.getWorkerRedirectOutputFile(workerData.getStormConf());
        if (outputFile == null) {
            outputFile = DEFAULT_OUT_TARGET_FILE;
        } else {
            try {
                File file = new File(outputFile);
                if (!file.exists()) {
                    PathUtils.touch(outputFile);
                } else if (file.isDirectory() || !file.canWrite()) {
                    LOG.warn("Failed to write " + outputFile);
                    outputFile = DEFAULT_OUT_TARGET_FILE;
                }
            } catch (Exception e) {
                LOG.warn("Failed to touch " + outputFile, e);
                outputFile = DEFAULT_OUT_TARGET_FILE;
            }
        }

        try {
            JStormUtils.redirectOutput(outputFile);
        } catch (Exception e) {
            LOG.warn("Failed to redirect to " + outputFile, e);
        }
    }
    /**
     * Finds pids of old workers bound to the given port by scanning ps output.
     *
     * Known limitation: if the worker's start command line is longer than 4096
     * characters, "ps -ef | grep com.alibaba.jstorm.daemon.worker.Worker" cannot
     * find the worker.
     *
     * @param port the worker port to look for
     * @return pids of matching worker processes, excluding the current process
     */
    public static List<Integer> getOldPortPids(String port) {
        String currPid = JStormUtils.process_pid();

        List<Integer> ret = new ArrayList<Integer>();

        StringBuilder sb = new StringBuilder();
        sb.append("ps -Af ");

        try {
            LOG.info("Begin to execute " + sb.toString());
            Process process = JStormUtils.launch_process(sb.toString(), new HashMap<String, String>(), false);

            InputStream stdin = process.getInputStream();
            BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));

            JStormUtils.sleepMs(1000);

            String str;
            while ((str = reader.readLine()) != null) {
                if (StringUtils.isBlank(str)) {
                    continue;
                }

                // keep only lines that mention both the Worker class and the port
                if (!str.contains(Worker.class.getName())) {
                    continue;
                } else if (!str.contains(port)) {
                    continue;
                }

                LOG.info("Find :" + str);

                String[] fields = StringUtils.split(str);

                boolean find = false;
                for (int i = 0; i < fields.length; i++) {
                    String field = fields[i];
                    LOG.debug("Field, " + i + ":" + field);

                    if (field.contains(Worker.class.getName())) {
                        // the port is the third argument after the class name
                        if (i + 3 >= fields.length) {
                            LOG.info("Failed to find port ");
                        } else if (fields[i + 3].equals(String.valueOf(port))) {
                            find = true;
                        }
                        break;
                    }
                }

                if (!find) {
                    LOG.info("No old port worker");
                    continue;
                }

                if (fields.length >= 2) {
                    try {
                        if (currPid.equals(fields[1])) {
                            LOG.info("Skip kill myself");
                            continue;
                        }

                        Integer pid = Integer.valueOf(fields[1]);

                        LOG.info("Find one process :" + pid.toString());
                        ret.add(pid);
                    } catch (Exception e) {
                        LOG.error(e.getMessage(), e);
                        continue;
                    }
                }
            }

            return ret;
        } catch (IOException e) {
            LOG.info("Failed to execute " + sb.toString());
            return ret;
        } catch (Exception e) {
            LOG.info(e.getMessage(), e);
            return ret;
        }
    }
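    /**
     * Kills any leftover worker processes that are still bound to the given port.
     */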
    public static void killOldWorker(String port) {
        List<Integer> oldPids = getOldPortPids(port);

        for (Integer pid : oldPids) {
            JStormUtils.kill(pid);
        }
    }
    /**
     * Worker entrance: args are topology_id, supervisor_id, port, worker_id, jar_path.
     *
     * @param args command line arguments passed by the supervisor
     */
@SuppressWarnings("rawtypes")
public static void main(String[] args) {
if (args.length < 5) {
StringBuilder sb = new StringBuilder();
sb.append("The length of args is less than 5 ");
for (String arg : args) {
sb.append(arg + " ");
}
LOG.error(sb.toString());
System.exit(-1);
}
StringBuilder sb = new StringBuilder();
try {
String topology_id = args[0];
String supervisor_id = args[1];
String port_str = args[2];
String worker_id = args[3];
String jar_path = args[4];
killOldWorker(port_str);
Map conf = Utils.readStormConfig();
StormConfig.validate_distributed_mode(conf);
JStormServerUtils.startTaobaoJvmMonitor();
sb.append("topologyId:" + topology_id + ", ");
sb.append("port:" + port_str + ", ");
sb.append("workerId:" + worker_id + ", ");
sb.append("jar_path:" + jar_path + "\n");
WorkerShutdown sd = mk_worker(conf, null, topology_id, supervisor_id, Integer.parseInt(port_str), worker_id, jar_path);
sd.join();
LOG.info("Successfully shutdown worker " + sb.toString());
} catch (Throwable e) {
String errMsg = "Failed to create worker, " + sb.toString();
LOG.error(errMsg, e);
JStormUtils.halt_process(-1, errMsg);
}
}
}