blob: f702ad0042d1cd81b8143ced12bc6fa19cfea418 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.bean.Pom;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.enums.ResourceType;
import org.apache.streampark.console.core.mapper.ResourceMapper;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class ResourceServiceImpl extends ServiceImpl<ResourceMapper, Resource>
implements ResourceService {
@Autowired private ApplicationService applicationService;
@Autowired private CommonService commonService;
@Autowired private FlinkSqlService flinkSqlService;
@Override
public IPage<Resource> page(Resource resource, RestRequest restRequest) {
if (resource.getTeamId() == null) {
return null;
}
Page<Resource> page = new MybatisPager<Resource>().getDefaultPage(restRequest);
return this.baseMapper.page(page, resource);
}
/**
* check resource exists by user id
*
* @param userId user id
* @return true if exists
*/
@Override
public boolean existsByUserId(Long userId) {
return this.baseMapper.existsByUserId(userId);
}
@Override
public void addResource(Resource resource) {
String resourceStr = resource.getResource();
ApiAlertException.throwIfNull(resourceStr, "Please add pom or jar resource.");
if (resource.getResourceType() == ResourceType.GROUP) {
ApiAlertException.throwIfNull(
resource.getResourceName(), "The name of resource group is required.");
} else {
Dependency dependency = Dependency.toDependency(resourceStr);
List<String> jars = dependency.getJar();
List<Pom> poms = dependency.getPom();
ApiAlertException.throwIfTrue(
jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
ApiAlertException.throwIfTrue(
jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");
ApiAlertException.throwIfTrue(
resource.getResourceType() == ResourceType.FLINK_APP && jars.isEmpty(),
"Please upload jar for Flink_App resource");
Long teamId = resource.getTeamId();
String resourceName = null;
if (poms.isEmpty()) {
resourceName = jars.get(0);
ApiAlertException.throwIfTrue(
this.findByResourceName(teamId, resourceName) != null,
String.format("Sorry, the resource %s already exists.", resourceName));
// copy jar to team upload directory
transferTeamResource(teamId, resourceName);
} else {
Pom pom = poms.get(0);
resourceName =
String.format("%s:%s:%s", pom.getGroupId(), pom.getArtifactId(), pom.getVersion());
if (StringUtils.isNotBlank(pom.getClassifier())) {
resourceName = resourceName + ":" + pom.getClassifier();
}
ApiAlertException.throwIfTrue(
this.findByResourceName(teamId, resourceName) != null,
String.format("Sorry, the resource %s already exists.", resourceName));
}
resource.setResourceName(resourceName);
}
resource.setCreatorId(commonService.getUserId());
this.save(resource);
}
@Override
public Resource findByResourceName(Long teamId, String name) {
LambdaQueryWrapper<Resource> queryWrapper =
new LambdaQueryWrapper<Resource>()
.eq(Resource::getResourceName, name)
.eq(Resource::getTeamId, teamId);
return baseMapper.selectOne(queryWrapper);
}
@Override
public void updateResource(Resource resource) {
Resource findResource = getById(resource.getId());
checkOrElseAlert(findResource);
String resourceName = resource.getResourceName();
if (resourceName != null) {
ApiAlertException.throwIfFalse(
resourceName.equals(findResource.getResourceName()),
"Please make sure the resource name is not changed.");
transferTeamResource(findResource.getTeamId(), resourceName);
}
findResource.setDescription(resource.getDescription());
baseMapper.updateById(findResource);
}
@Override
public void deleteResource(Resource resource) {
Resource findResource = getById(resource.getId());
checkOrElseAlert(findResource);
FsOperator.lfs()
.delete(
String.format(
"%s/%d/%s",
Workspace.local().APP_UPLOADS(),
findResource.getTeamId(),
findResource.getResourceName()));
this.removeById(resource);
}
public List<Resource> findByTeamId(Long teamId) {
LambdaQueryWrapper<Resource> queryWrapper =
new LambdaQueryWrapper<Resource>().eq(Resource::getTeamId, teamId);
return baseMapper.selectList(queryWrapper);
}
/**
* change resource owner
*
* @param userId original user id
* @param targetUserId target user id
*/
@Override
public void changeOwnership(Long userId, Long targetUserId) {
LambdaUpdateWrapper<Resource> updateWrapper =
new LambdaUpdateWrapper<Resource>()
.eq(Resource::getCreatorId, userId)
.set(Resource::getCreatorId, targetUserId);
this.baseMapper.update(null, updateWrapper);
}
private void transferTeamResource(Long teamId, String resourceName) {
String teamUploads = String.format("%s/%d", Workspace.local().APP_UPLOADS(), teamId);
if (!FsOperator.lfs().exists(teamUploads)) {
FsOperator.lfs().mkdirs(teamUploads);
}
File localJar = new File(WebUtils.getAppTempDir(), resourceName);
File teamUploadJar = new File(teamUploads, resourceName);
ApiAlertException.throwIfFalse(
localJar.exists(), "Missing file: " + resourceName + ", please upload again");
FsOperator.lfs()
.upload(localJar.getAbsolutePath(), teamUploadJar.getAbsolutePath(), false, true);
}
private void checkOrElseAlert(Resource resource) {
ApiAlertException.throwIfNull(resource, "The resource does not exist.");
ApiAlertException.throwIfTrue(
isDependByApplications(resource),
"Sorry, the resource is still in use, cannot be removed.");
}
private boolean isDependByApplications(Resource resource) {
return CollectionUtils.isNotEmpty(getResourceApplicationsById(resource));
}
private List<Application> getResourceApplicationsById(Resource resource) {
List<Application> dependApplications = new ArrayList<>();
List<Application> applications = applicationService.getByTeamId(resource.getTeamId());
Map<Long, Application> applicationMap =
applications.stream()
.collect(Collectors.toMap(Application::getId, application -> application));
// Get the application that depends on this resource
List<FlinkSql> flinkSqls = flinkSqlService.getByTeamId(resource.getTeamId());
for (FlinkSql flinkSql : flinkSqls) {
String sqlTeamResource = flinkSql.getTeamResource();
if (sqlTeamResource != null
&& sqlTeamResource.contains(String.valueOf(resource.getTeamId()))) {
Application app = applicationMap.get(flinkSql.getAppId());
if (!dependApplications.contains(app)) {
dependApplications.add(applicationMap.get(flinkSql.getAppId()));
}
}
}
return dependApplications;
}
}