blob: 1150b9102b54f8a28e89ec19ecb10586163f8e54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor;
import java.io.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class WorkerRpcServer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class);
@Autowired
private TaskDispatchProcessor taskDispatchProcessor;
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
private TaskRejectAckProcessor taskRejectAckProcessor;
@Autowired
private TaskSavePointProcessor taskSavePointProcessor;
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
@Autowired
private WorkerConfig workerConfig;
private NettyRemotingServer nettyRemotingServer;
public void start() {
LOGGER.info("Worker rpc server starting");
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_SAVEPOINT_REQUEST, taskSavePointProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
LOGGER.info("Worker rpc server started");
}
@Override
public void close() {
LOGGER.info("Worker rpc server closing");
this.nettyRemotingServer.close();
LOGGER.info("Worker rpc server closed");
}
}