blob: f29f93eed33a38eb2edb08c1e159e3fce61f227d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.PostConstruct;
import java.util.Collection;
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan(basePackages = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.service.process.*",
"org.apache.dolphinscheduler.service.queue.*",
})
})
public class WorkerServer implements IStoppable {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* spring application context
* only use it for initialization
*/
@Autowired
private SpringApplicationContext springApplicationContext;
/**
* alert model netty remote server
*/
@Autowired
private AlertClientService alertClientService;
@Autowired
private WorkerManagerThread workerManagerThread;
/**
* worker registry
*/
@Autowired
private WorkerRegistryClient workerRegistryClient;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private WorkerRpcServer workerRpcServer;
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private MessageRetryRunner messageRetryRunner;
/**
* worker server startup, not use web service
*
* @param args arguments
*/
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
SpringApplication.run(WorkerServer.class);
}
@PostConstruct
public void run() {
this.workerRpcServer.start();
this.workerRpcClient.start();
this.taskPluginManager.loadPlugin();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();
this.workerManagerThread.start();
this.messageRetryRunner.start();
/*
* registry hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!ServerLifeCycleManager.isStopped()) {
close("WorkerServer shutdown hook");
}
}));
}
public void close(String cause) {
if (!ServerLifeCycleManager.toStopped()) {
logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (
WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
WorkerRegistryClient closedRegistryClient = workerRegistryClient;
AlertClientService closedAlertClientService = alertClientService;
SpringApplicationContext closedSpringContext = springApplicationContext;) {
logger.info("Worker server is stopping, current cause : {}", cause);
// kill running tasks
this.killAllRunningTasks();
} catch (Exception e) {
logger.error("Worker server stop failed, current cause: {}", cause, e);
return;
}
logger.info("Worker server stopped, current cause: {}", cause);
}
@Override
public void stop(String cause) {
close(cause);
}
/**
* kill all tasks which are running
*/
public void killAllRunningTasks() {
Collection<TaskExecutionContext> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
if (CollectionUtils.isEmpty(taskRequests)) {
return;
}
logger.info("Worker begin to kill all cache task, task size: {}", taskRequests.size());
int killNumber = 0;
for (TaskExecutionContext taskRequest : taskRequests) {
// kill task when it's not finished yet
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
taskRequest.getTaskInstanceId());
if (ProcessUtils.kill(taskRequest)) {
killNumber++;
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(),
killNumber);
}
}