| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.dolphinscheduler.server.log; |
| |
| import org.apache.dolphinscheduler.common.utils.JSONUtils; |
| import org.apache.dolphinscheduler.common.utils.LoggerUtils; |
| import org.apache.dolphinscheduler.remote.command.Command; |
| import org.apache.dolphinscheduler.remote.command.CommandType; |
| import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; |
| import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; |
| import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand; |
| import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand; |
| import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand; |
| import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; |
| import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; |
| import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; |
| import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
| import org.apache.dolphinscheduler.remote.utils.Constants; |
| import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; |
| |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.stereotype.Component; |
| |
| import io.netty.channel.Channel; |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * logger request process logic |
| */ |
| @Component |
| public class LoggerRequestProcessor implements NettyRequestProcessor { |
| |
| private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class); |
| |
| private final ExecutorService executor; |
| |
| public LoggerRequestProcessor() { |
| this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1, |
| new NamedThreadFactory("Log-Request-Process-Thread")); |
| } |
| |
| @Override |
| public void process(Channel channel, Command command) { |
| logger.info("received command : {}", command); |
| |
| //request task log command type |
| final CommandType commandType = command.getType(); |
| switch (commandType) { |
| case GET_LOG_BYTES_REQUEST: |
| GetLogBytesRequestCommand getLogRequest = JSONUtils.parseObject( |
| command.getBody(), GetLogBytesRequestCommand.class); |
| String path = getLogRequest.getPath(); |
| if (!checkPathSecurity(path)) { |
| throw new IllegalArgumentException("Illegal path: " + path); |
| } |
| byte[] bytes = getFileContentBytes(path); |
| GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); |
| channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); |
| break; |
| case VIEW_WHOLE_LOG_REQUEST: |
| ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject( |
| command.getBody(), ViewLogRequestCommand.class); |
| String viewLogPath = viewLogRequest.getPath(); |
| if (!checkPathSecurity(viewLogPath)) { |
| throw new IllegalArgumentException("Illegal path: " + viewLogPath); |
| } |
| String msg = LoggerUtils.readWholeFileContent(viewLogPath); |
| ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); |
| channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque())); |
| break; |
| case ROLL_VIEW_LOG_REQUEST: |
| RollViewLogRequestCommand rollViewLogRequest = JSONUtils.parseObject( |
| command.getBody(), RollViewLogRequestCommand.class); |
| |
| String rollViewLogPath = rollViewLogRequest.getPath(); |
| if (!checkPathSecurity(rollViewLogPath)) { |
| throw new IllegalArgumentException("Illegal path: " + rollViewLogPath); |
| } |
| |
| List<String> lines = readPartFileContent(rollViewLogPath, |
| rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit()); |
| StringBuilder builder = new StringBuilder(); |
| final int MaxResponseLogSize = 65535; |
| int totalLogByteSize = 0; |
| for (String line : lines) { |
| //If a single line of log is exceed max response size, cut off the line |
| final int lineByteSize = line.getBytes(StandardCharsets.UTF_8).length; |
| if (lineByteSize >= MaxResponseLogSize) { |
| builder.append(line, 0, MaxResponseLogSize) |
| .append(" [this line's size ").append(lineByteSize).append(" bytes is exceed ") |
| .append(MaxResponseLogSize).append(" bytes, so only ") |
| .append(MaxResponseLogSize).append(" characters are reserved for performance reasons.]") |
| .append("\r\n"); |
| } else { |
| builder.append(line).append("\r\n"); |
| } |
| totalLogByteSize += lineByteSize; |
| if (totalLogByteSize >= MaxResponseLogSize) { |
| break; |
| } |
| } |
| RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString()); |
| channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque())); |
| break; |
| case REMOVE_TAK_LOG_REQUEST: |
| RemoveTaskLogRequestCommand removeTaskLogRequest = JSONUtils.parseObject( |
| command.getBody(), RemoveTaskLogRequestCommand.class); |
| |
| String taskLogPath = removeTaskLogRequest.getPath(); |
| if (!checkPathSecurity(taskLogPath)) { |
| throw new IllegalArgumentException("Illegal path: " + taskLogPath); |
| } |
| File taskLogFile = new File(taskLogPath); |
| boolean status = true; |
| try { |
| if (taskLogFile.exists()) { |
| status = taskLogFile.delete(); |
| } |
| } catch (Exception e) { |
| status = false; |
| } |
| |
| RemoveTaskLogResponseCommand removeTaskLogResponse = new RemoveTaskLogResponseCommand(status); |
| channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque())); |
| break; |
| default: |
| throw new IllegalArgumentException("unknown commandType: " + commandType); |
| } |
| } |
| |
| /** |
| * LogServer only can read the logs dir. |
| * @param path |
| * @return |
| */ |
| private boolean checkPathSecurity(String path) { |
| String dsHome = System.getProperty("DOLPHINSCHEDULER_WORKER_HOME"); |
| if (StringUtils.isBlank(dsHome)) { |
| dsHome = System.getProperty("user.dir"); |
| } |
| if (StringUtils.isBlank(path)) { |
| logger.warn("path is null"); |
| return false; |
| } else { |
| return path.startsWith(dsHome) && !path.contains("../") && path.endsWith(".log"); |
| } |
| } |
| |
| public ExecutorService getExecutor() { |
| return this.executor; |
| } |
| |
| /** |
| * get files content bytes for download file |
| * |
| * @param filePath file path |
| * @return byte array of file |
| */ |
| private byte[] getFileContentBytes(String filePath) { |
| try (InputStream in = new FileInputStream(filePath); |
| ByteArrayOutputStream bos = new ByteArrayOutputStream()) { |
| byte[] buf = new byte[1024]; |
| int len; |
| while ((len = in.read(buf)) != -1) { |
| bos.write(buf, 0, len); |
| } |
| return bos.toByteArray(); |
| } catch (IOException e) { |
| logger.error("get file bytes error", e); |
| } |
| return new byte[0]; |
| } |
| |
| /** |
| * read part file content,can skip any line and read some lines |
| * |
| * @param filePath file path |
| * @param skipLine skip line |
| * @param limit read lines limit |
| * @return part file content |
| */ |
| private List<String> readPartFileContent(String filePath, |
| int skipLine, |
| int limit) { |
| File file = new File(filePath); |
| if (file.exists() && file.isFile()) { |
| try (Stream<String> stream = Files.lines(Paths.get(filePath))) { |
| return stream.skip(skipLine).limit(limit).collect(Collectors.toList()); |
| } catch (IOException e) { |
| logger.error("read file error", e); |
| } |
| } else { |
| logger.info("file path: {} not exists", filePath); |
| } |
| return Collections.emptyList(); |
| } |
| |
| } |