blob: 778b1a03b6d8fed3532117e864fd002a58e0a048 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.GZipUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.Project;
import org.apache.streampark.console.core.enums.BuildStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.mapper.ProjectMapper;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.ProjectBuildTask;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.apache.flink.configuration.MemorySize;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
implements ProjectService {
@Autowired private ApplicationManageService applicationManageService;
@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@Qualifier("streamparkBuildExecutor")
@Autowired
private Executor executorService;
@Override
public RestResponse create(Project project) {
LambdaQueryWrapper<Project> queryWrapper =
new LambdaQueryWrapper<Project>().eq(Project::getName, project.getName());
long count = count(queryWrapper);
RestResponse response = RestResponse.success();
ApiAlertException.throwIfTrue(count > 0, "project name already exists, add project failed");
Date date = new Date();
project.setCreateTime(date);
project.setModifyTime(date);
boolean status = save(project);
if (status) {
return response.message("Add project successfully").data(true);
}
return response.message("Add project failed").data(false);
}
@Override
public boolean update(Project projectParam) {
Project project = getById(projectParam.getId());
AssertUtils.notNull(project);
ApiAlertException.throwIfFalse(
project.getTeamId().equals(projectParam.getTeamId()),
"TeamId can't be changed, update project failed.");
ApiAlertException.throwIfFalse(
!project.getBuildState().equals(BuildStateEnum.BUILDING.get()),
"The project is being built, update project failed.");
updateInternal(projectParam, project);
if (project.isSshRepositoryUrl()) {
project.setUserName(null);
} else {
project.setPrvkeyPath(null);
}
if (projectParam.getBuildState() != null) {
project.setBuildState(projectParam.getBuildState());
if (BuildStateEnum.NEED_REBUILD == BuildStateEnum.of(projectParam.getBuildState())) {
List<Application> applications = listApps(project);
// Update deployment status
applications.forEach(
(app) -> {
log.info(
"update deploy by project: {}, appName:{}", project.getName(), app.getJobName());
app.setRelease(ReleaseStateEnum.NEED_CHECK.get());
applicationManageService.updateRelease(app);
});
}
}
baseMapper.updateById(project);
return true;
}
private static void updateInternal(Project projectParam, Project project) {
project.setName(projectParam.getName());
project.setUrl(projectParam.getUrl());
project.setBranches(projectParam.getBranches());
project.setPrvkeyPath(projectParam.getPrvkeyPath());
project.setUserName(projectParam.getUserName());
project.setPassword(projectParam.getPassword());
project.setPom(projectParam.getPom());
project.setDescription(projectParam.getDescription());
project.setBuildArgs(projectParam.getBuildArgs());
}
@Override
public boolean removeById(Long id) {
Project project = getById(id);
AssertUtils.notNull(project);
LambdaQueryWrapper<Application> queryWrapper =
new LambdaQueryWrapper<Application>().eq(Application::getProjectId, id);
long count = applicationManageService.count(queryWrapper);
if (count > 0) {
return false;
}
try {
project.delete();
super.removeById(id);
return true;
} catch (IOException e) {
return false;
}
}
@Override
public IPage<Project> getPage(Project project, RestRequest request) {
Page<Project> page = MybatisPager.getPage(request);
return this.baseMapper.selectPage(page, project);
}
@Override
public Boolean existsByTeamId(Long teamId) {
return this.baseMapper.existsByTeamId(teamId);
}
@Override
public List<Project> listByTeamId(Long teamId) {
return this.baseMapper.selectProjectsByTeamId(teamId);
}
@Override
public void build(Long id) throws Exception {
Project project = getById(id);
this.baseMapper.updateBuildState(project.getId(), BuildStateEnum.BUILDING.get());
String logPath = getBuildLogPath(id);
ProjectBuildTask projectBuildTask =
new ProjectBuildTask(
logPath,
project,
buildStateEnum -> {
baseMapper.updateBuildState(id, buildStateEnum.get());
if (buildStateEnum == BuildStateEnum.SUCCESSFUL) {
baseMapper.updateBuildTime(id);
}
flinkAppHttpWatcher.init();
},
fileLogger -> {
List<Application> applications =
this.applicationManageService.listByProjectId(project.getId());
applications.forEach(
(app) -> {
fileLogger.info(
"update deploy by project: {}, appName:{}",
project.getName(),
app.getJobName());
app.setRelease(ReleaseStateEnum.NEED_RELEASE.get());
app.setBuild(true);
this.applicationManageService.updateRelease(app);
});
flinkAppHttpWatcher.init();
});
CompletableFuture<Void> buildTask =
CompletableFuture.runAsync(projectBuildTask, executorService);
// TODO May need to define parameters to set the build timeout in the future.
CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES);
}
@Override
public List<String> listModules(Long id) {
Project project = getById(id);
AssertUtils.notNull(project);
if (BuildStateEnum.SUCCESSFUL != BuildStateEnum.of(project.getBuildState())
|| !project.getDistHome().exists()) {
return Collections.emptyList();
}
File[] files = project.getDistHome().listFiles();
return files == null
? Collections.emptyList()
: Stream.of(files).map(File::getName).collect(Collectors.toList());
}
@Override
public List<String> listJars(Project project) {
List<String> jarList = new ArrayList<>(0);
ApiAlertException.throwIfNull(
project.getModule(), "Project module can't be null, please check.");
File apps = new File(project.getDistHome(), project.getModule());
for (File file : Objects.requireNonNull(apps.listFiles())) {
if (file.getName().endsWith(Constant.JAR_SUFFIX)) {
jarList.add(file.getName());
}
}
return jarList;
}
@Override
public String getAppConfPath(Long id, String module) {
Project project = getById(id);
File appHome = project.getDistHome();
Optional<File> fileOptional =
Arrays.stream(Objects.requireNonNull(appHome.listFiles()))
.filter((x) -> x.getName().equals(module))
.findFirst();
return fileOptional.map(File::getAbsolutePath).orElse(null);
}
@Override
public List<Application> listApps(Project project) {
return this.applicationManageService.listByProjectId(project.getId());
}
@Override
public boolean exists(Project project) {
if (project.getId() != null) {
Project proj = getById(project.getId());
if (proj.getName().equals(project.getName())) {
return false;
}
}
LambdaQueryWrapper<Project> queryWrapper =
new LambdaQueryWrapper<Project>().eq(Project::getName, project.getName());
return this.baseMapper.selectCount(queryWrapper) > 0;
}
@Override
public List<Map<String, Object>> listConf(Project project) {
try {
File file = new File(project.getDistHome(), project.getModule());
File unzipFile = new File(file.getAbsolutePath().replaceAll(".tar.gz", ""));
if (!unzipFile.exists()) {
GZipUtils.deCompress(file.getAbsolutePath(), file.getParentFile().getAbsolutePath());
}
List<Map<String, Object>> confList = new ArrayList<>();
File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
AssertUtils.notNull(files);
for (File item : files) {
eachFile(item, confList, true);
}
return confList;
} catch (Exception e) {
log.info(e.getMessage());
}
return null;
}
private void eachFile(File file, List<Map<String, Object>> list, Boolean isRoot) {
if (file != null && file.exists() && file.listFiles() != null) {
if (isRoot) {
Map<String, Object> fileMap = new HashMap<>(0);
fileMap.put("key", file.getName());
fileMap.put("title", file.getName());
fileMap.put("value", file.getAbsolutePath());
List<Map<String, Object>> children = new ArrayList<>();
eachFile(file, children, false);
if (!children.isEmpty()) {
fileMap.put("children", children);
}
list.add(fileMap);
} else {
for (File item : Objects.requireNonNull(file.listFiles())) {
String title = item.getName();
String value = item.getAbsolutePath();
Map<String, Object> fileMap = new HashMap<>(0);
fileMap.put("key", title);
fileMap.put("title", title);
fileMap.put("value", value);
List<Map<String, Object>> children = new ArrayList<>();
eachFile(item, children, false);
if (!children.isEmpty()) {
fileMap.put("children", children);
}
list.add(fileMap);
}
}
}
}
@Override
public RestResponse getBuildLog(Long id, Long startOffset) {
File logFile = Paths.get(getBuildLogPath(id)).toFile();
if (!logFile.exists()) {
String errorMsg =
String.format("Build log file(fileName=%s) not found, please build first.", logFile);
log.warn(errorMsg);
return RestResponse.success().data(errorMsg);
}
boolean isBuilding = this.getById(id).getBuildState() == 0;
byte[] fileContent;
long endOffset = 0L;
boolean readFinished = true;
// Read log from earliest when project is building
if (startOffset == null && isBuilding) {
startOffset = 0L;
}
try {
long maxSize =
MemorySize.parse(InternalConfigHolder.get(CommonConfig.READ_LOG_MAX_SIZE())).getBytes();
if (startOffset == null) {
fileContent = FileUtils.readEndOfFile(logFile, maxSize);
} else {
fileContent = FileUtils.readFileFromOffset(logFile, startOffset, maxSize);
endOffset = startOffset + fileContent.length;
readFinished = logFile.length() == endOffset && !isBuilding;
}
return RestResponse.success()
.data(new String(fileContent, StandardCharsets.UTF_8))
.put("offset", endOffset)
.put("readFinished", readFinished);
} catch (IOException e) {
String error =
String.format("Read build log file(fileName=%s) caused an exception: ", logFile);
log.error(error, e);
return RestResponse.fail(ResponseCode.CODE_FAIL, error + e.getMessage());
}
}
private String getBuildLogPath(Long projectId) {
return String.format("%s/%s/build.log", Workspace.PROJECT_BUILD_LOG_PATH(), projectId);
}
}