[Improve] flink job on yarn exists check improvement (#3424)
Co-authored-by: benjobs <benjobx@gmail.com>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 6059143..412eabe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -171,6 +171,14 @@
return RestResponse.success();
}
+ @PermissionAction(id = "#app.id", type = PermissionTypeEnum.APP)
+ @PostMapping(value = "check_start")
+ @RequiresPermissions("app:start")
+ public RestResponse checkStart(Application app) {
+ AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app);
+ return RestResponse.success(stateEnum.get());
+ }
+
@Operation(
summary = "Start application",
tags = {ApiDocConstant.FLINK_APP_OP_TAG})
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
index c86ce3c..8d1e0b1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
@@ -21,6 +21,8 @@
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
import com.baomidou.mybatisplus.extension.service.IService;
import java.io.IOException;
@@ -220,4 +222,18 @@
* @return A list of strings representing the names of the uploaded jars.
*/
List<String> listHistoryUploadJars();
+
+ /**
+ * check application before start
+ *
+ * @param appParam
+ * @return org.apache.streampark.console.core.enums.AppExistsStateEnum
+ */
+ AppExistsStateEnum checkStart(Application appParam);
+
+ /**
+ * @param appName
+ * @return running,submitted, accepted job list in YARN
+ */
+ List<ApplicationReport> getYarnAppReport(String appName);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 8f22ca5..85188dc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -19,7 +19,6 @@
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigKeys;
-import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -27,7 +26,6 @@
import org.apache.streampark.common.enums.FlinkRestoreMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.fs.FsOperator;
-import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
@@ -65,6 +63,7 @@
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.application.ApplicationActionService;
+import org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -144,6 +143,8 @@
@Autowired private ApplicationManageService applicationManageService;
+ @Autowired private ApplicationInfoService applicationInfoService;
+
@Autowired private ApplicationConfigService configService;
@Autowired private ApplicationLogService applicationLogService;
@@ -229,7 +230,7 @@
cancelFuture.cancel(true);
}
if (startFuture == null && cancelFuture == null) {
- this.updateToStopped(appParam);
+ this.doStopped(appParam);
}
}
@@ -325,60 +326,60 @@
cancelFutureMap.put(application.getId(), cancelFuture);
- CompletableFutureUtils.runTimeout(
- cancelFuture,
- 10L,
- TimeUnit.MINUTES,
- cancelResponse -> {
- applicationLog.setSuccess(true);
- if (cancelResponse != null && cancelResponse.savePointDir() != null) {
- String savePointDir = cancelResponse.savePointDir();
- log.info("savePoint path: {}", savePointDir);
- SavePoint savePoint = new SavePoint();
- savePoint.setPath(savePointDir);
- savePoint.setAppId(application.getId());
- savePoint.setLatest(true);
- savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
- savePoint.setCreateTime(new Date());
- savePoint.setTriggerTime(triggerTime);
- savePointService.save(savePoint);
+ cancelFuture.whenComplete(
+ (cancelResponse, throwable) -> {
+ cancelFutureMap.remove(application.getId());
+
+ if (throwable != null) {
+ String exception = ExceptionUtils.stringifyException(throwable);
+ applicationLog.setException(exception);
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+
+ if (throwable instanceof CancellationException) {
+ doStopped(application);
+ } else {
+ log.error("stop flink job failed.", throwable);
+ application.setOptionState(OptionStateEnum.NONE.getValue());
+ application.setState(FlinkAppStateEnum.FAILED.getValue());
+ updateById(application);
+
+ if (appParam.getSavePointed()) {
+ savePointService.expire(application.getId());
}
+ // re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(application));
- }
- },
- e -> {
- if (e.getCause() instanceof CancellationException) {
- updateToStopped(application);
+ TrackId id = toTrackId(application);
+ k8SFlinkTrackMonitor.unWatching(id);
+ k8SFlinkTrackMonitor.doWatching(id);
} else {
- log.error("stop flink job fail.", e);
- application.setOptionState(OptionStateEnum.NONE.getValue());
- application.setState(FlinkAppStateEnum.FAILED.getValue());
- updateById(application);
-
- if (appParam.getSavePointed()) {
- savePointService.expire(application.getId());
- }
-
- // re-tracking flink job on kubernetes and logging exception
- if (isKubernetesApp(application)) {
- TrackId id = toTrackId(application);
- k8SFlinkTrackMonitor.unWatching(id);
- k8SFlinkTrackMonitor.doWatching(id);
- } else {
- FlinkAppHttpWatcher.unWatching(application.getId());
- }
-
- String exception = ExceptionUtils.stringifyException(e);
- applicationLog.setException(exception);
- applicationLog.setSuccess(false);
+ FlinkAppHttpWatcher.unWatching(application.getId());
}
- })
- .whenComplete(
- (t, e) -> {
- cancelFutureMap.remove(application.getId());
- applicationLogService.save(applicationLog);
- });
+ }
+ return;
+ }
+
+ applicationLog.setSuccess(true);
+ // save log...
+ applicationLogService.save(applicationLog);
+
+ if (cancelResponse != null && cancelResponse.savePointDir() != null) {
+ String savePointDir = cancelResponse.savePointDir();
+ log.info("savePoint path: {}", savePointDir);
+ SavePoint savePoint = new SavePoint();
+ savePoint.setPath(savePointDir);
+ savePoint.setAppId(application.getId());
+ savePoint.setLatest(true);
+ savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
+ savePoint.setCreateTime(new Date());
+ savePoint.setTriggerTime(triggerTime);
+ savePointService.save(savePoint);
+ }
+
+ if (isKubernetesApp(application)) {
+ k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+ }
+ });
}
@Override
@@ -392,7 +393,7 @@
if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
ApiAlertException.throwIfTrue(
- checkAppRepeatInYarn(application.getJobName()),
+ !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
"[StreamPark] The same task name is already running in the yarn queue");
}
@@ -486,98 +487,87 @@
startFutureMap.put(application.getId(), future);
- CompletableFutureUtils.runTimeout(
- future,
- 2L,
- TimeUnit.MINUTES,
- submitResponse -> {
- if (submitResponse.flinkConfig() != null) {
- String jmMemory =
- submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
- if (jmMemory != null) {
- application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
- }
- String tmMemory =
- submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
- if (tmMemory != null) {
- application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
- }
- }
- application.setAppId(submitResponse.clusterId());
- if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
- application.setJobId(submitResponse.jobId());
- }
+ future.whenComplete(
+ (response, throwable) -> {
+ // 1) remove Future
+ startFutureMap.remove(application.getId());
- if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
- application.setJobManagerUrl(submitResponse.jobManagerUrl());
- applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
- }
- applicationLog.setYarnAppId(submitResponse.clusterId());
- application.setStartTime(new Date());
- application.setEndTime(null);
- if (isKubernetesApp(application)) {
- application.setRelease(ReleaseStateEnum.DONE.get());
- }
- updateById(application);
-
- // if start completed, will be added task to tracking queue
- if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ // 2) exception
+ if (throwable != null) {
+ String exception = ExceptionUtils.stringifyException(throwable);
+ applicationLog.setException(exception);
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ if (throwable instanceof CancellationException) {
+ doStopped(application);
+ } else {
+ Application app = getById(appParam.getId());
+ app.setState(FlinkAppStateEnum.FAILED.getValue());
+ app.setOptionState(OptionStateEnum.NONE.getValue());
+ updateById(app);
+ if (isKubernetesApp(app)) {
+ k8SFlinkTrackMonitor.unWatching(toTrackId(app));
} else {
- FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
- FlinkAppHttpWatcher.doWatching(application);
+ FlinkAppHttpWatcher.unWatching(appParam.getId());
}
+ }
+ return;
+ }
- applicationLog.setSuccess(true);
- // set savepoint to expire
- savePointService.expire(application.getId());
- },
- e -> {
- if (e.getCause() instanceof CancellationException) {
- updateToStopped(application);
- } else {
- String exception = ExceptionUtils.stringifyException(e);
- applicationLog.setException(exception);
- applicationLog.setSuccess(false);
- Application app = getById(appParam.getId());
- app.setState(FlinkAppStateEnum.FAILED.getValue());
- app.setOptionState(OptionStateEnum.NONE.getValue());
- updateById(app);
- if (isKubernetesApp(app)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(app));
- } else {
- FlinkAppHttpWatcher.unWatching(appParam.getId());
+ // 3) success
+ applicationLog.setSuccess(true);
+ if (response.flinkConfig() != null) {
+ String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
+ if (jmMemory != null) {
+ application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+ }
+ String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
+ if (tmMemory != null) {
+ application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+ }
+ }
+ application.setAppId(response.clusterId());
+ if (StringUtils.isNoneEmpty(response.jobId())) {
+ application.setJobId(response.jobId());
+ }
+
+ if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+ application.setJobManagerUrl(response.jobManagerUrl());
+ applicationLog.setJobManagerUrl(response.jobManagerUrl());
+ }
+ applicationLog.setYarnAppId(response.clusterId());
+ application.setStartTime(new Date());
+ application.setEndTime(null);
+
+ // if start completed, will be added task to tracking queue
+ if (isKubernetesApp(application)) {
+ application.setRelease(ReleaseStateEnum.DONE.get());
+ k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+ String domainName = settingService.getIngressModeDefault();
+ if (StringUtils.isNotBlank(domainName)) {
+ try {
+ IngressController.configureIngress(
+ domainName, application.getClusterId(), application.getK8sNamespace());
+ } catch (KubernetesClientException e) {
+ log.info("Failed to create ingress, stack info:{}", e.getMessage());
+ applicationLog.setException(e.getMessage());
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ application.setState(FlinkAppStateEnum.FAILED.getValue());
+ application.setOptionState(OptionStateEnum.NONE.getValue());
}
}
- })
- .whenComplete(
- (t, e) -> {
- if (!K8sFlinkConfig.isV2Enabled()
- && FlinkExecutionMode.isKubernetesApplicationMode(
- application.getExecutionMode())) {
- String domainName = settingService.getIngressModeDefault();
- if (StringUtils.isNotBlank(domainName)) {
- try {
- IngressController.configureIngress(
- domainName, application.getClusterId(), application.getK8sNamespace());
- } catch (KubernetesClientException kubernetesClientException) {
- log.info(
- "Failed to create ingress, stack info:{}",
- kubernetesClientException.getMessage());
- applicationLog.setException(e.getMessage());
- applicationLog.setSuccess(false);
- applicationLogService.save(applicationLog);
- application.setState(FlinkAppStateEnum.FAILED.getValue());
- application.setOptionState(OptionStateEnum.NONE.getValue());
- updateById(application);
- return;
- }
- }
- }
-
- applicationLogService.save(applicationLog);
- startFutureMap.remove(application.getId());
- });
+ }
+ } else {
+ FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
+ FlinkAppHttpWatcher.doWatching(application);
+ }
+ // update app
+ updateById(application);
+ // save log
+ applicationLogService.save(applicationLog);
+ });
}
/**
@@ -781,8 +771,8 @@
return properties;
}
- private void updateToStopped(Application app) {
- Application application = getById(app);
+ private void doStopped(Application appParam) {
+ Application application = getById(appParam);
application.setOptionState(OptionStateEnum.NONE.getValue());
application.setState(FlinkAppStateEnum.CANCELED.getValue());
application.setOptionTime(new Date());
@@ -796,6 +786,18 @@
} else {
FlinkAppHttpWatcher.unWatching(application.getId());
}
+ // kill application
+ if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
+ try {
+ List<ApplicationReport> applications =
+ applicationInfoService.getYarnAppReport(application.getJobName());
+ if (!applications.isEmpty()) {
+ YarnClient yarnClient = HadoopUtils.yarnClient();
+ yarnClient.killApplication(applications.get(0).getApplicationId());
+ }
+ } catch (Exception ignored) {
+ }
+ }
}
private String getSavePointed(Application appParam) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 11732cb..95883a0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -20,9 +20,11 @@
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -51,9 +53,13 @@
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -65,9 +71,11 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.jar.Manifest;
@@ -318,6 +326,45 @@
}
@Override
+ public AppExistsStateEnum checkStart(Application appParam) {
+ Application application = getById(appParam.getId());
+ if (application == null) {
+ return AppExistsStateEnum.INVALID;
+ }
+ if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
+ boolean exists = !getYarnAppReport(application.getJobName()).isEmpty();
+ return exists ? AppExistsStateEnum.IN_YARN : AppExistsStateEnum.NO;
+ }
+ // todo on k8s check...
+ return AppExistsStateEnum.NO;
+ }
+
+ @Override
+ public List<ApplicationReport> getYarnAppReport(String appName) {
+ try {
+ YarnClient yarnClient = HadoopUtils.yarnClient();
+ Set<String> types =
+ Sets.newHashSet(
+ ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
+ EnumSet<YarnApplicationState> states =
+ EnumSet.of(
+ YarnApplicationState.NEW,
+ YarnApplicationState.NEW_SAVING,
+ YarnApplicationState.SUBMITTED,
+ YarnApplicationState.ACCEPTED,
+ YarnApplicationState.RUNNING);
+ Set<String> yarnTag = Sets.newHashSet("streampark");
+ List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag);
+ return applications.stream()
+ .filter(report -> report.getName().equals(appName))
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "getYarnAppReport failed. Ensure that yarn is running properly. ", e);
+ }
+ }
+
+ @Override
public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
Application application = getById(id);
ApiAlertException.throwIfNull(
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.ts b/streampark-console/streampark-console-webapp/src/api/flink/app.ts
index 13110db..f32dbda 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.ts
@@ -40,6 +40,7 @@
DELETE = '/flink/app/delete',
DELETE_BAK = '/flink/app/deletebak',
CREATE = '/flink/app/create',
+ CHECK_START = '/flink/app/check_start',
START = '/flink/app/start',
CLEAN = '/flink/app/clean',
BACKUPS = '/flink/app/backups',
@@ -228,3 +229,7 @@
export function fetchName(data: { config: string }) {
return defHttp.post({ url: APP_API.NAME, data });
}
+
+export function fetchCheckStart(data): Promise<AxiosResponse<number>> {
+ return defHttp.post({ url: APP_API.CHECK_START, data }, { isReturnNativeResponse: true });
+}
diff --git a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
index d4e0ce1..19ba3b1 100644
--- a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
@@ -63,9 +63,7 @@
height: 18px;
background-color: #fff;
border-radius: 50%;
- transition:
- transform 0.5s,
- background-color 0.5s;
+ transition: transform 0.5s, background-color 0.5s;
will-change: transform;
}
diff --git a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
index 78cac5c..e08c25f 100644
--- a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
+++ b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
@@ -179,9 +179,7 @@
background-color: @component-background;
border: 1px solid rgb(0 0 0 / 8%);
border-radius: 0.25rem;
- box-shadow:
- 0 2px 2px 0 rgb(0 0 0 / 14%),
- 0 3px 1px -2px rgb(0 0 0 / 10%),
+ box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
0 1px 5px 0 rgb(0 0 0 / 6%);
background-clip: padding-box;
user-select: none;
diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
index e5a9dac..1cd7e38 100644
--- a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
@@ -113,7 +113,7 @@
});
const getBindValue = computed(
- () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
+ () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
);
const getSchema = computed((): FormSchema[] => {
diff --git a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
index 8fdbc8f..e89a6ce 100644
--- a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
@@ -39,9 +39,7 @@
line-height: 44px;
background-color: @component-background;
border-top: 1px solid @border-color-base;
- box-shadow:
- 0 -6px 16px -8px rgb(0 0 0 / 8%),
- 0 -9px 28px 0 rgb(0 0 0 / 5%),
+ box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%),
0 -12px 48px 16px rgb(0 0 0 / 3%);
transition: width 0.2s;
diff --git a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
index 36ab854..35c0802 100644
--- a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
@@ -22,7 +22,7 @@
props: {
column: {
type: Object as PropType<BasicColumn>,
- default: () => ({}) as BasicColumn,
+ default: () => ({} as BasicColumn),
},
},
setup(props) {
diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index ef46340..b0b2f83 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -226,3 +226,11 @@
CLAIM = 2,
LEGACY = 3,
}
+
+export enum AppExistsEnum {
+ NO = 0,
+ IN_DB = 1,
+ IN_YARN = 2,
+ IN_KUBERNETES = 3,
+ INVALID = 4,
+}
diff --git a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
index 9a66074..c543be9 100644
--- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
+++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
@@ -32,12 +32,9 @@
}
clear();
- timeId = setTimeout(
- () => {
- lockPage();
- },
- lockTime * 60 * 1000,
- );
+ timeId = setTimeout(() => {
+ lockPage();
+ }, lockTime * 60 * 1000);
}
function lockPage(): void {
diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts b/streampark-console/streampark-console-webapp/src/utils/props.ts
index 368f490..4a15ec4 100644
--- a/streampark-console/streampark-console-webapp/src/utils/props.ts
+++ b/streampark-console/streampark-console-webapp/src/utils/props.ts
@@ -175,7 +175,7 @@
: never;
};
-export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as PropWrapper<T>;
+export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as PropWrapper<T>);
export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as Array<keyof T>;
export const mutable = <T extends readonly any[] | Record<string, unknown>>(val: T) =>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index 359f20a..98a0f01 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -31,8 +31,9 @@
import { BasicModal, useModalInner } from '/@/components/Modal';
import { useMessage } from '/@/hooks/web/useMessage';
import { useRouter } from 'vue-router';
- import { fetchStart } from '/@/api/flink/app';
- import { RestoreModeEnum } from '/@/enums/flinkEnum';
+ import { fetchCheckStart, fetchForcedStop, fetchStart } from '/@/api/flink/app';
+
+ import { AppExistsEnum, RestoreModeEnum } from '/@/enums/flinkEnum';
import { fetchFlinkEnv } from '/@/api/flink/flinkEnv';
import { renderFlinkAppRestoreMode } from '/@/views/flink/app/hooks/useFlinkRender';
@@ -121,8 +122,21 @@
baseColProps: { span: 24 },
});
- /* submit */
async function handleSubmit() {
+ // when then app is building, show forced starting modal
+ const resp = await fetchCheckStart({
+ id: receiveData.application.id,
+ });
+ if (resp.data.data === AppExistsEnum.IN_YARN) {
+ await fetchForcedStop({
+ id: receiveData.application.id,
+ });
+ }
+ await handleDoSubmit();
+ }
+
+ /* submit */
+ async function handleDoSubmit() {
try {
const formValue = (await validate()) as Recordable;
const savePointed = formValue.startSavePointed;