blob: 412eabef4320c6eeecee01ac5b5c60cd74da6073 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.streampark.console.core.controller;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.domain.ApiDocConstant;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.ApiAccess;
import org.apache.streampark.console.core.annotation.AppUpdated;
import org.apache.streampark.console.core.annotation.PermissionAction;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
import org.apache.streampark.console.core.enums.PermissionTypeEnum;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

import com.baomidou.mybatisplus.core.metadata.IPage;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.Map;
/**
 * Web controller for application operations. The handlers in this class use the services
 * injected below and wrap their results in {@link RestResponse}.
 *
 * <p>NOTE(review): no class-level annotations (controller stereotype, request mapping, logging)
 * are visible in this copy even though the corresponding types are imported — confirm against
 * the original file.
 */
public class ApplicationController {
// Application lookup, creation, mapping and removal.
@Autowired private ApplicationManageService applicationManageService;
// Application start-up.
@Autowired private ApplicationActionService applicationActionService;
// Read-only application information and pre-flight checks.
@Autowired private ApplicationInfoService applicationInfoService;
// Application backup records.
@Autowired private ApplicationBackUpService backUpService;
// Application operation-log records.
@Autowired private ApplicationLogService applicationLogService;
// Uploaded resources (used for jar upload).
@Autowired private ResourceService resourceService;
/**
 * Fetches a single application.
 *
 * @param app application argument forwarded to {@code applicationManageService.getApp}
 * @return success response carrying the {@link Application} returned by the service
 */
@Operation(summary = "Get application")
public RestResponse get(Application app) {
Application application = applicationManageService.getApp(app);
return RestResponse.success(application);
/**
 * Creates an application through {@code applicationManageService.create}.
 *
 * @param app application to create; its {@code teamId} is referenced by the permission annotation
 * @return success response carrying the boolean result of the service call
 * @throws IOException declared by the signature; presumably propagated from the service — confirm
 */
@Operation(summary = "Create application")
// TEAM-type permission annotation whose id expression is "#app.teamId".
@PermissionAction(id = "#app.teamId", type = PermissionTypeEnum.TEAM)
public RestResponse create(Application app) throws IOException {
boolean saved = applicationManageService.create(app);
return RestResponse.success(saved);
/**
 * Copy-application endpoint, mapped to POST {@code copy}.
 *
 * <p>The API documentation fragments below describe three query parameters: {@code id} (required,
 * the app to copy), {@code jobName} (name for the new application) and {@code args}.
 *
 * <p>NOTE(review): the opening lines of the surrounding annotations and any service call before
 * the return are not visible in this copy; as shown, the method only returns an empty success
 * response.
 *
 * @param app application argument (hidden from the generated API documentation)
 * @return an empty success response
 * @throws IOException declared by the signature; no throwing call is visible here
 */
summary = "Copy application",
tags = {ApiDocConstant.FLINK_APP_OP_TAG})
name = "id",
description = "copied target app id",
in = ParameterIn.QUERY,
required = true,
example = "100000"),
name = "jobName",
description = "new application name",
in = ParameterIn.QUERY,
example = "copy-app"),
@Parameter(name = "args", description = "new application args", in = ParameterIn.QUERY)
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
@PostMapping(value = "copy")
public RestResponse copy(@Parameter(hidden = true) Application app) throws IOException {
return RestResponse.success();
/**
 * Update-application endpoint.
 *
 * <p>NOTE(review): no service call is visible before the return in this copy; as shown, the
 * method unconditionally returns a success response with payload {@code true}.
 *
 * @param app application argument
 * @return success response with payload {@code true}
 */
@Operation(summary = "Update application")
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
public RestResponse update(Application app) {
return RestResponse.success(true);
/**
 * Returns the dashboard data map for a team.
 *
 * @param teamId team identifier forwarded to {@code applicationInfoService.getDashboardDataMap}
 * @return success response carrying the map returned by the service
 */
@Operation(summary = "Get applications dashboard data")
public RestResponse dashboard(Long teamId) {
Map<String, Serializable> dashboardMap = applicationInfoService.getDashboardDataMap(teamId);
return RestResponse.success(dashboardMap);
/**
 * Lists applications as a page.
 *
 * @param app application argument
 * @param request request argument (also passed as the last argument of the paging call below)
 * @return success response carrying the resulting {@link IPage} of applications
 */
@Operation(summary = "List applications")
public RestResponse list(Application app, RestRequest request) {
// NOTE(review): the right-hand side of this assignment is truncated in this copy; the callee
// and its first argument are not visible.
IPage<Application> applicationList =, request);
return RestResponse.success(applicationList);
/**
 * Maps an application through {@code applicationManageService.mapping}.
 *
 * @param app application argument forwarded to the service
 * @return success response carrying the boolean result of the service call
 */
@Operation(summary = "Mapping application")
public RestResponse mapping(Application app) {
boolean flag = applicationManageService.mapping(app);
return RestResponse.success(flag);
/**
 * Revoke-application endpoint.
 *
 * <p>NOTE(review): no service call is visible before the return in this copy; as shown, the
 * method only returns an empty success response.
 *
 * @param app application argument
 * @return an empty success response
 */
@Operation(summary = "Revoke application")
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
public RestResponse revoke(Application app) {
return RestResponse.success();
/**
 * Runs the start pre-check for an application, mapped to POST {@code check_start}.
 *
 * @param app application argument forwarded to {@code applicationInfoService.checkStart}
 * @return success response carrying {@code get()} of the returned {@link AppExistsStateEnum}
 */
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
@PostMapping(value = "check_start")
public RestResponse checkStart(Application app) {
AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app);
// The response carries the enum's get() value rather than the enum constant itself.
return RestResponse.success(stateEnum.get());
/**
 * Starts an application, mapped to POST {@code start}.
 *
 * <p>The API documentation fragments below describe the query parameters {@code id} (required),
 * {@code savePointed} (required), {@code savePoint} and {@code allowNonRestored} (optional).
 *
 * <p>Failures are reported in-band: any exception from the service is caught and turned into a
 * success response with payload {@code false} and the exception message; nothing is rethrown.
 *
 * <p>NOTE(review): the opening lines of the surrounding annotations are not visible in this copy.
 *
 * @param app application argument (hidden from the generated API documentation)
 * @return success response with payload {@code true} when the service call returns normally,
 *     otherwise payload {@code false} plus the exception message
 */
summary = "Start application",
tags = {ApiDocConstant.FLINK_APP_OP_TAG})
name = "id",
description = "start app id",
in = ParameterIn.QUERY,
required = true,
example = "100000",
schema = @Schema(implementation = Long.class)),
name = "savePointed",
description = "restored app from the savepoint or latest checkpoint",
in = ParameterIn.QUERY,
required = true,
example = "false",
schema = @Schema(implementation = boolean.class, defaultValue = "false")),
name = "savePoint",
description = "savepoint or checkpoint path",
in = ParameterIn.QUERY,
schema = @Schema(implementation = String.class)),
name = "allowNonRestored",
description = "ignore savepoint if cannot be restored",
in = ParameterIn.QUERY,
required = false,
schema = @Schema(implementation = boolean.class, defaultValue = "false"))
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
@PostMapping(value = "start")
public RestResponse start(@Parameter(hidden = true) Application app) {
try {
// NOTE(review): the meaning of the literal false is defined by ApplicationActionService.start,
// which is not visible here.
applicationActionService.start(app, false);
return RestResponse.success(true);
} catch (Exception e) {
// Only the message is surfaced to the caller; the exception is neither logged nor rethrown here.
return RestResponse.success(false).message(e.getMessage());
/**
 * Cancel-application endpoint, mapped to POST {@code cancel}.
 *
 * <p>The API documentation fragments below describe the query parameters {@code id},
 * {@code savePointed}, {@code drain} and {@code nativeFormat} (all required) and
 * {@code savePoint}.
 *
 * <p>NOTE(review): the opening lines of the surrounding annotations and any service call before
 * the return are not visible in this copy; as shown, the method only returns an empty success
 * response.
 *
 * @param app application argument (hidden from the generated API documentation)
 * @return an empty success response
 * @throws Exception declared by the signature; no throwing call is visible here
 */
summary = "Cancel application",
tags = {ApiDocConstant.FLINK_APP_OP_TAG})
name = "id",
description = "cancel app id",
in = ParameterIn.QUERY,
required = true,
example = "100000",
schema = @Schema(implementation = Long.class)),
name = "savePointed",
description = "trigger savepoint before taking stopping",
in = ParameterIn.QUERY,
required = true,
schema = @Schema(implementation = boolean.class, defaultValue = "false")),
name = "savePoint",
description = "savepoint path",
in = ParameterIn.QUERY,
example = "hdfs:///savepoint/100000",
schema = @Schema(implementation = String.class)),
name = "drain",
description = "send max watermark before canceling",
in = ParameterIn.QUERY,
required = true,
example = "false",
schema = @Schema(implementation = boolean.class, defaultValue = "false")),
name = "nativeFormat",
description = "use savepoint native format",
in = ParameterIn.QUERY,
required = true,
example = "false",
schema = @Schema(implementation = boolean.class, defaultValue = "false"))
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
@PostMapping(value = "cancel")
public RestResponse cancel(@Parameter(hidden = true) Application app) throws Exception {
return RestResponse.success();
/**
 * Clean-application endpoint.
 *
 * <p>NOTE(review): no service call is visible before the return in this copy; as shown, the
 * method unconditionally returns a success response with payload {@code true}.
 *
 * @param app application argument
 * @return success response with payload {@code true}
 */
@Operation(summary = "Clean application")
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
public RestResponse clean(Application app) {
return RestResponse.success(true);
/**
 * Force-stop endpoint: per the original note, stops an application that started normally or
 * whose start is still in progress.
 *
 * <p>NOTE(review): no service call is visible before the return in this copy; as shown, the
 * method only returns an empty success response.
 *
 * @param app application argument
 * @return an empty success response
 */
@Operation(summary = "Force stop application")
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
public RestResponse forcedStop(Application app) {
return RestResponse.success();
/**
 * Returns the value of {@code YarnUtils.getRMWebAppProxyURL()}.
 *
 * @return success response carrying that value
 */
@Operation(summary = "Get application on yarn proxy address")
public RestResponse yarn() {
return RestResponse.success(YarnUtils.getRMWebAppProxyURL());
/**
 * Returns the YARN name of an application.
 *
 * @param app application argument forwarded to {@code applicationInfoService.getYarnName}
 * @return success response carrying the name returned by the service
 */
@Operation(summary = "Get application on yarn name")
public RestResponse yarnName(Application app) {
String yarnName = applicationInfoService.getYarnName(app);
return RestResponse.success(yarnName);
/**
 * Checks the existence state of an application.
 *
 * @param app application argument forwarded to {@code applicationInfoService.checkExists}
 * @return success response carrying {@code get()} of the returned {@link AppExistsStateEnum}
 */
@Operation(summary = "Check the application exist status")
public RestResponse checkName(Application app) {
AppExistsStateEnum exists = applicationInfoService.checkExists(app);
// The response carries the enum's get() value rather than the enum constant itself.
return RestResponse.success(exists.get());
/**
 * Reads the configuration of an application.
 *
 * @param app application argument forwarded to {@code applicationInfoService.readConf}
 * @return success response carrying the configuration string returned by the service
 * @throws IOException declared by the signature; presumably propagated from the service — confirm
 */
@Operation(summary = "Get application conf")
public RestResponse readConf(Application app) throws IOException {
String config = applicationInfoService.readConf(app);
return RestResponse.success(config);
/**
 * Returns the main class of an application.
 *
 * @param application application argument forwarded to {@code applicationInfoService.getMain}
 * @return success response carrying the main-class string returned by the service
 */
@Operation(summary = "Get application main-class")
public RestResponse getMain(Application application) {
String mainClass = applicationInfoService.getMain(application);
return RestResponse.success(mainClass);
/**
 * Lists application backups as a page.
 *
 * @param backUp backup argument forwarded to {@code backUpService.getPage}
 * @param request request argument forwarded to the same call
 * @return success response carrying the {@link IPage} of {@link ApplicationBackUp} records
 */
@Operation(summary = "List application backups")
public RestResponse backups(ApplicationBackUp backUp, RestRequest request) {
IPage<ApplicationBackUp> backups = backUpService.getPage(backUp, request);
return RestResponse.success(backups);
/**
 * Lists application operation logs as a page.
 *
 * @param applicationLog log argument forwarded to {@code applicationLogService.getPage}
 * @param request request argument forwarded to the same call
 * @return success response carrying the {@link IPage} of {@link ApplicationLog} records
 */
@Operation(summary = "List application operation logs")
public RestResponse optionlog(ApplicationLog applicationLog, RestRequest request) {
IPage<ApplicationLog> applicationList = applicationLogService.getPage(applicationLog, request);
return RestResponse.success(applicationList);
/**
 * Deletes one operation-log record by id.
 *
 * @param id identifier passed to {@code applicationLogService.removeById}
 * @return success response carrying the Boolean result of the removal
 */
@Operation(summary = "Delete application operation log")
// NOTE(review): the id expression "#applicationLog.appId" names no parameter of this method (the
// only parameter is id) — confirm that the permission check can resolve it.
@PermissionAction(id = "#applicationLog.appId", type = PermissionTypeEnum.APP)
public RestResponse deleteOperationLog(Long id) {
Boolean deleted = applicationLogService.removeById(id);
return RestResponse.success(deleted);
/**
 * Deletes an application through {@code applicationManageService.remove}.
 *
 * @param app application argument forwarded to the service
 * @return success response carrying the Boolean result of the removal
 * @throws InternalException declared by the signature; presumably raised by the service — confirm
 */
@Operation(summary = "Delete application")
// NOTE(review): the permission id expression is empty in this copy — confirm the intended value.
@PermissionAction(id = "", type = PermissionTypeEnum.APP)
public RestResponse delete(Application app) throws InternalException {
Boolean deleted = applicationManageService.remove(app);
return RestResponse.success(deleted);
/**
 * Deletes one application backup record, identified by {@code backUp.getId()}.
 *
 * <p>NOTE(review): the operation summary below reads "Backup application when deleted", while
 * the body removes a backup record — confirm the wording.
 *
 * @param backUp backup whose id is passed to {@code backUpService.removeById}; its {@code appId}
 *     is referenced by the permission annotation
 * @return success response carrying the Boolean result of the removal
 * @throws InternalException declared by the signature; no throwing call is evident here
 */
@Operation(summary = "Backup application when deleted")
@PermissionAction(id = "#backUp.appId", type = PermissionTypeEnum.APP)
public RestResponse deleteBak(ApplicationBackUp backUp) throws InternalException {
Boolean deleted = backUpService.removeById(backUp.getId());
return RestResponse.success(deleted);
/**
 * Check-jar endpoint: builds a {@link File} from the given path and reports whether the check
 * passed.
 *
 * @param jar path string used to construct the {@link File}
 * @return success response with payload {@code true} on the normal path; on
 *     {@link IOException}, a success response carrying the {@link File} plus the localized
 *     exception message
 */
@Operation(summary = "Check the application jar")
public RestResponse checkjar(String jar) {
File file = new File(jar);
try {
// NOTE(review): nothing visible in this try body can throw IOException — the validating call
// appears to be missing from this copy.
return RestResponse.success(true);
} catch (IOException e) {
// NOTE(review): the failure payload is the File object, whereas other handlers in this class
// report failure as success(false).message(...) — confirm this is intended.
return RestResponse.success(file).message(e.getLocalizedMessage());
/**
 * Uploads a jar file through {@code resourceService.upload}.
 *
 * @param file multipart file forwarded to the service
 * @return success response carrying the path string returned by the service
 * @throws Exception declared by the signature; presumably propagated from the service — confirm
 */
@Operation(summary = "Upload the application jar")
public RestResponse upload(MultipartFile file) throws Exception {
String uploadPath = resourceService.upload(file);
return RestResponse.success(uploadPath);
public RestResponse verifySchema(String path) {
final URI uri = URI.create(path);
final String scheme = uri.getScheme();
final String pathPart = uri.getPath();
RestResponse restResponse = RestResponse.success(true);
String error = null;
if (scheme == null) {
error =
"The scheme (hdfs://, file://, etc) is null. Please specify the file system scheme explicitly in the URI.";
} else if (pathPart == null) {
error =
"The path to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
} else if (pathPart.isEmpty() || "/".equals(pathPart)) {
error = "Cannot use the root directory for checkpoints.";
if (error != null) {
restResponse = RestResponse.success(false).message(error);
return restResponse;
/**
 * Checks the savepoint path of an application.
 *
 * <p>The service returns an error string, or {@code null} when there is nothing to report.
 *
 * @param app application argument forwarded to {@code applicationInfoService.checkSavepointPath}
 * @return success response with payload {@code true} when the service returns {@code null},
 *     otherwise payload {@code false} plus the returned error as the message
 * @throws Exception declared by the signature; presumably propagated from the service — confirm
 */
@Operation(summary = "Check the application savepoint path")
public RestResponse checkSavepointPath(Application app) throws Exception {
String error = applicationInfoService.checkSavepointPath(app);
// A null error string is treated as "path is fine".
if (error == null) {
return RestResponse.success(true);
return RestResponse.success(false).message(error);
/**
 * Returns a slice of an application's Kubernetes deploy log, mapped to POST
 * {@code k8sStartLog}.
 *
 * <p>NOTE(review): the opening lines of the parameter annotations below are not visible in this
 * copy.
 *
 * @param id app id (documented below as required)
 * @param offset number of log lines to offset (documented below as required)
 * @param limit number of log lines loaded at once (documented below as required)
 * @return success response carrying the string returned by
 *     {@code applicationInfoService.k8sStartLog}
 * @throws Exception declared by the signature; presumably propagated from the service — confirm
 */
@Operation(summary = "Get application on k8s deploy logs")
name = "id",
description = "app id",
required = true,
example = "100000",
schema = @Schema(implementation = Long.class)),
name = "offset",
description = "number of log lines offset",
required = true,
example = "0",
schema = @Schema(implementation = int.class)),
name = "limit",
description = "number of log lines loaded at once",
required = true,
example = "100",
schema = @Schema(implementation = int.class)),
@PostMapping(value = "k8sStartLog")
public RestResponse k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
String resp = applicationInfoService.k8sStartLog(id, offset, limit);
return RestResponse.success(resp);