blob: 2349fc0af813a204ab9ba458ea57f713b1256f1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.ResourceFromEnum;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
import org.apache.streampark.flink.packer.maven.DependencyInfo;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@Data
@TableName("t_flink_app")
@Slf4j
public class Application implements Serializable {
@TableId(type = IdType.AUTO)
private Long id;
private Long teamId;
/** 1) custom code 2) flink SQL */
private Integer jobType;
private Long projectId;
/** creator */
private Long userId;
/** The name of the frontend and program displayed in yarn */
private String jobName;
private String appId;
private String jobId;
/** The address of the jobmanager, that is, the direct access address of the Flink web UI */
private String jobManagerUrl;
/** flink version */
private Long versionId;
/** k8s cluster id */
private String clusterId;
/** flink docker base image */
private String flinkImage;
/** The resource name of the flink job on k8s, equivalent to clusterId in application mode. */
private String k8sName;
/** k8s namespace */
private String k8sNamespace = Constant.DEFAULT;
/** The exposed type of the rest service of K8s(kubernetes.rest-service.exposed.type) */
private Integer k8sRestExposedType;
/** flink kubernetes pod template */
private String k8sPodTemplate;
private String k8sJmPodTemplate;
private String k8sTmPodTemplate;
@Getter private String ingressTemplate;
private String defaultModeIngress;
/** flink-hadoop integration on flink-k8s mode */
private Boolean k8sHadoopIntegration;
private Integer state;
/** task release status */
@TableField("`release`")
private Integer release;
/** determine if a task needs to be built */
private Boolean build;
/** max restart retries after job failed */
private Integer restartSize;
/** has restart count */
private Integer restartCount;
private Integer optionState;
/** alert id */
private Long alertId;
private String args;
/** application module */
private String module;
private String options;
private String hotParams;
private Integer resolveOrder;
private Integer executionMode;
private String dynamicProperties;
private Integer appType;
/** determine if tracking status */
private Integer tracking;
private String jar;
/**
* for upload type tasks, checkSum needs to be recorded whether it needs to be republished after
* the update and modify.
*/
private Long jarCheckSum;
private String mainClass;
private Date startTime;
private Date endTime;
private Long duration;
/** checkpoint max failure interval */
private Integer cpMaxFailureInterval;
/** checkpoint failure rate interval */
private Integer cpFailureRateInterval;
/** Actions triggered after X minutes failed Y times: 1: send alert 2: restart */
private Integer cpFailureAction;
/** overview */
@TableField("TOTAL_TM")
private Integer totalTM;
@TableField("HADOOP_USER")
private String hadoopUser;
private Integer totalSlot;
private Integer availableSlot;
private Integer jmMemory;
private Integer tmMemory;
private Integer totalTask;
/** the cluster id bound to the task in remote mode */
private Long flinkClusterId;
private String description;
private Date createTime;
private Date optionTime;
private Date modifyTime;
/** 1: cicd (build from csv) 2: upload (upload local jar job) */
private Integer resourceFrom;
private String tags;
private Boolean probing = false;
/** running job */
private transient JobsOverview.Task overview;
private transient String teamResource;
private transient String dependency;
private transient Long sqlId;
private transient String flinkSql;
private transient Integer[] stateArray;
private transient Integer[] jobTypeArray;
private transient Boolean backUp = false;
private transient Boolean restart = false;
private transient String userName;
private transient String nickName;
private transient String config;
private transient Long configId;
private transient String flinkVersion;
private transient String confPath;
private transient Integer format;
private transient String savePoint;
private transient Boolean savePointed = false;
private transient Boolean drain = false;
private transient Boolean nativeFormat = false;
private transient Long savePointTimeout = 60L;
private transient Boolean allowNonRestored = false;
private transient Integer restoreMode;
private transient String socketId;
private transient String projectName;
private transient String createTimeFrom;
private transient String createTimeTo;
private transient String backUpDescription;
private transient String yarnQueue;
/** Flink Web UI Url */
private transient String flinkRestUrl;
/** refer to {@link org.apache.streampark.flink.packer.pipeline.BuildPipeline} */
private transient Integer buildStatus;
private transient AppControl appControl;
public void setDefaultModeIngress(String defaultModeIngress) {
this.defaultModeIngress = defaultModeIngress;
}
public void setK8sNamespace(String k8sNamespace) {
this.k8sNamespace = StringUtils.isBlank(k8sNamespace) ? Constant.DEFAULT : k8sNamespace;
}
public K8sPodTemplates getK8sPodTemplates() {
return K8sPodTemplates.of(k8sPodTemplate, k8sJmPodTemplate, k8sTmPodTemplate);
}
public void setState(Integer state) {
this.state = state;
this.tracking = shouldTracking() ? 1 : 0;
}
public void setYarnQueueByHotParams() {
if (!(FlinkExecutionMode.YARN_APPLICATION == this.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_PER_JOB == this.getFlinkExecutionMode())) {
return;
}
Map<String, Object> hotParamsMap = this.getHotParamsMap();
if (MapUtils.isNotEmpty(hotParamsMap)
&& hotParamsMap.containsKey(ConfigKeys.KEY_YARN_APP_QUEUE())) {
String yarnQueue = hotParamsMap.get(ConfigKeys.KEY_YARN_APP_QUEUE()).toString();
String labelExpr =
Optional.ofNullable(hotParamsMap.get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()))
.map(Object::toString)
.orElse(null);
this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, labelExpr).toString());
}
}
/**
* Determine if a FlinkAppState requires tracking.
*
* @return 1: need to be tracked | 0: no need to be tracked.
*/
public Boolean shouldTracking() {
switch (getStateEnum()) {
case ADDED:
case CREATED:
case FINISHED:
case FAILED:
case CANCELED:
case TERMINATED:
case POS_TERMINATED:
return false;
default:
return true;
}
}
/**
* Determine whether the application can be started to prevent repeated starts.
*
* @return true: can start | false: can not start.
*/
public boolean isCanBeStart() {
switch (getStateEnum()) {
case ADDED:
case CREATED:
case FAILED:
case CANCELED:
case FINISHED:
case LOST:
case TERMINATED:
case SUCCEEDED:
case KILLED:
case POS_TERMINATED:
return true;
default:
return false;
}
}
@JsonIgnore
public ReleaseStateEnum getReleaseState() {
return ReleaseStateEnum.of(release);
}
@JsonIgnore
public FlinkDevelopmentMode getDevelopmentMode() {
return FlinkDevelopmentMode.of(jobType);
}
@JsonIgnore
public FlinkAppStateEnum getStateEnum() {
return FlinkAppStateEnum.of(state);
}
@JsonIgnore
public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
return FlinkK8sRestExposedType.of(this.k8sRestExposedType);
}
@JsonIgnore
public FlinkExecutionMode getFlinkExecutionMode() {
return FlinkExecutionMode.of(executionMode);
}
public boolean cpFailedTrigger() {
return this.cpMaxFailureInterval != null
&& this.cpFailureRateInterval != null
&& this.cpFailureAction != null;
}
public boolean eqFlinkJob(Application other) {
if (this.isFlinkSqlJob()
&& other.isFlinkSqlJob()
&& this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}
/** Local compilation and packaging working directory */
@JsonIgnore
public String getDistHome() {
String path =
String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), projectId.toString(), getModule());
log.info("local distHome:{}", path);
return path;
}
@JsonIgnore
public String getLocalAppHome() {
String path = String.format("%s/%s", Workspace.local().APP_WORKSPACE(), id.toString());
log.info("local appHome:{}", path);
return path;
}
@JsonIgnore
public String getRemoteAppHome() {
String path = String.format("%s/%s", Workspace.remote().APP_WORKSPACE(), id.toString());
log.info("remote appHome:{}", path);
return path;
}
/** Automatically identify remoteAppHome or localAppHome based on app FlinkExecutionMode */
@JsonIgnore
public String getAppHome() {
switch (this.getFlinkExecutionMode()) {
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
case YARN_PER_JOB:
case YARN_SESSION:
case REMOTE:
case LOCAL:
return getLocalAppHome();
case YARN_APPLICATION:
return getRemoteAppHome();
default:
throw new UnsupportedOperationException(
"unsupported executionMode ".concat(getFlinkExecutionMode().getName()));
}
}
@JsonIgnore
public String getAppLib() {
return getAppHome().concat("/lib");
}
@JsonIgnore
public ApplicationType getApplicationType() {
return ApplicationType.of(appType);
}
@JsonIgnore
@SneakyThrows
@SuppressWarnings("unchecked")
public Map<String, Object> getOptionMap() {
if (StringUtils.isBlank(this.options)) {
return Collections.emptyMap();
}
Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class);
optionMap.entrySet().removeIf(entry -> entry.getValue() == null);
return optionMap;
}
@JsonIgnore
public boolean isFlinkSqlJob() {
return FlinkDevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isFlinkSqlJobOrPyFlinkJob() {
return FlinkDevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType())
|| FlinkDevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isCustomCodeJob() {
return FlinkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isCustomCodeOrPyFlinkJob() {
return FlinkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType())
|| FlinkDevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isUploadJob() {
return isCustomCodeOrPyFlinkJob()
&& ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
}
@JsonIgnore
public boolean isCICDJob() {
return isCustomCodeOrPyFlinkJob()
&& ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
}
public boolean isStreamParkJob() {
return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
}
@JsonIgnore
@SneakyThrows
public Dependency getDependencyObject() {
return Dependency.toDependency(this.dependency);
}
@JsonIgnore
public DependencyInfo getDependencyInfo() {
return Dependency.toDependency(getDependency()).toJarPackDeps();
}
@JsonIgnore
public boolean isRunning() {
return FlinkAppStateEnum.RUNNING.getValue() == this.getState();
}
@JsonIgnore
public boolean isNeedRollback() {
return ReleaseStateEnum.NEED_ROLLBACK.get() == this.getRelease();
}
@JsonIgnore
public boolean isNeedRestartOnFailed() {
if (this.restartSize != null && this.restartCount != null) {
return this.restartSize > 0 && this.restartCount <= this.restartSize;
}
return false;
}
@JsonIgnore
public StorageType getStorageType() {
return getStorageType(getExecutionMode());
}
public static StorageType getStorageType(Integer execMode) {
FlinkExecutionMode executionModeEnum = FlinkExecutionMode.of(execMode);
switch (Objects.requireNonNull(executionModeEnum)) {
case YARN_APPLICATION:
return StorageType.HDFS;
case YARN_PER_JOB:
case YARN_SESSION:
case KUBERNETES_NATIVE_SESSION:
case KUBERNETES_NATIVE_APPLICATION:
case REMOTE:
return StorageType.LFS;
default:
throw new UnsupportedOperationException("Unsupported ".concat(executionModeEnum.getName()));
}
}
@JsonIgnore
public FsOperator getFsOperator() {
return FsOperator.of(getStorageType());
}
@JsonIgnore
public Workspace getWorkspace() {
return Workspace.of(getStorageType());
}
@JsonIgnore
@SneakyThrows
@SuppressWarnings("unchecked")
public Map<String, Object> getHotParamsMap() {
if (StringUtils.isNotBlank(this.hotParams)) {
Map<String, Object> hotParamsMap = JacksonUtils.read(this.hotParams, Map.class);
hotParamsMap.entrySet().removeIf(entry -> entry.getValue() == null);
return hotParamsMap;
}
return Collections.EMPTY_MAP;
}
@SneakyThrows
public void doSetHotParams() {
updateHotParams(this);
}
@SneakyThrows
public void updateHotParams(Application appParam) {
if (appParam != this) {
this.hotParams = null;
}
FlinkExecutionMode executionModeEnum = appParam.getFlinkExecutionMode();
Map<String, String> hotParams = new HashMap<>(0);
if (needFillYarnQueueLabel(executionModeEnum)) {
hotParams.putAll(YarnQueueLabelExpression.getQueueLabelMap(appParam.getYarnQueue()));
}
if (MapUtils.isNotEmpty(hotParams)) {
this.setHotParams(JacksonUtils.write(hotParams));
}
}
private boolean needFillYarnQueueLabel(FlinkExecutionMode mode) {
return FlinkExecutionMode.YARN_PER_JOB == mode || FlinkExecutionMode.YARN_APPLICATION == mode;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return id.equals(((Application) o).id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
public static class SFunc {
public static final SFunction<Application, Long> ID = Application::getId;
public static final SFunction<Application, String> JOB_ID = Application::getJobId;
public static final SFunction<Application, Date> START_TIME = Application::getStartTime;
public static final SFunction<Application, Date> END_TIME = Application::getEndTime;
public static final SFunction<Application, Long> DURATION = Application::getDuration;
public static final SFunction<Application, Integer> TOTAL_TASK = Application::getTotalTask;
public static final SFunction<Application, Integer> TOTAL_TM = Application::getTotalTM;
public static final SFunction<Application, Integer> TOTAL_SLOT = Application::getTotalSlot;
public static final SFunction<Application, Integer> JM_MEMORY = Application::getJmMemory;
public static final SFunction<Application, Integer> TM_MEMORY = Application::getTmMemory;
public static final SFunction<Application, Integer> STATE = Application::getState;
public static final SFunction<Application, String> OPTIONS = Application::getOptions;
public static final SFunction<Application, Integer> AVAILABLE_SLOT =
Application::getAvailableSlot;
public static final SFunction<Application, Integer> EXECUTION_MODE =
Application::getExecutionMode;
public static final SFunction<Application, String> JOB_MANAGER_URL =
Application::getJobManagerUrl;
}
}