| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.streampark.console.core.entity; |
| |
| import org.apache.streampark.common.conf.ConfigKeys; |
| import org.apache.streampark.common.enums.ClusterState; |
| import org.apache.streampark.common.enums.FlinkExecutionMode; |
| import org.apache.streampark.common.enums.FlinkK8sRestExposedType; |
| import org.apache.streampark.common.enums.ResolveOrder; |
| import org.apache.streampark.common.util.HttpClientUtils; |
| import org.apache.streampark.common.util.PropertiesUtils; |
| import org.apache.streampark.console.base.util.JacksonUtils; |
| import org.apache.streampark.console.core.utils.YarnQueueLabelExpression; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.flink.configuration.CoreOptions; |
| import org.apache.hc.client5.http.config.RequestConfig; |
| |
| import com.baomidou.mybatisplus.annotation.IdType; |
| import com.baomidou.mybatisplus.annotation.TableId; |
| import com.baomidou.mybatisplus.annotation.TableName; |
| import com.baomidou.mybatisplus.core.toolkit.support.SFunction; |
| import com.fasterxml.jackson.annotation.JsonIgnore; |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import lombok.Data; |
| import lombok.SneakyThrows; |
| |
| import java.io.Serializable; |
| import java.net.URI; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| @Data |
| @TableName("t_flink_cluster") |
| public class FlinkCluster implements Serializable { |
| |
| @TableId(type = IdType.AUTO) |
| private Long id; |
| |
| private String address; |
| |
| private String jobManagerUrl; |
| |
| private String clusterId; |
| |
| private String clusterName; |
| |
| private Integer executionMode; |
| |
| /** flink version */ |
| private Long versionId; |
| |
| private String k8sNamespace; |
| |
| private String serviceAccount; |
| |
| private String description; |
| |
| private Long userId; |
| |
| private String flinkImage; |
| |
| private String options; |
| |
| private String yarnQueue; |
| |
| private Boolean k8sHadoopIntegration; |
| |
| private String dynamicProperties; |
| |
| private Integer k8sRestExposedType; |
| |
| private String k8sConf; |
| |
| private Integer resolveOrder; |
| |
| private String exception; |
| |
| private Integer clusterState; |
| |
| private Date createTime; |
| |
| private Date startTime; |
| |
| private Date endTime; |
| |
| private Long alertId; |
| |
| private transient Integer allJobs = 0; |
| |
| private transient Integer affectedJobs = 0; |
| |
| @JsonIgnore |
| public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() { |
| return FlinkK8sRestExposedType.of(this.k8sRestExposedType); |
| } |
| |
| public FlinkExecutionMode getFlinkExecutionModeEnum() { |
| return FlinkExecutionMode.of(this.executionMode); |
| } |
| |
| public ClusterState getClusterStateEnum() { |
| return ClusterState.of(this.clusterState); |
| } |
| |
| @JsonIgnore |
| @SneakyThrows |
| public Map<String, Object> getOptionMap() { |
| if (StringUtils.isBlank(this.options)) { |
| return Collections.emptyMap(); |
| } |
| Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class); |
| if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) { |
| optionMap.put(ConfigKeys.KEY_YARN_APP_NAME(), this.clusterName); |
| optionMap.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue)); |
| } |
| optionMap.entrySet().removeIf(entry -> entry.getValue() == null); |
| return optionMap; |
| } |
| |
| @JsonIgnore |
| public URI getRemoteURI() { |
| try { |
| HttpClientUtils.httpGetRequest( |
| this.address, |
| RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build()); |
| return new URI(address); |
| } catch (Exception ignored) { |
| // |
| } |
| return null; |
| } |
| |
| @JsonIgnore |
| public Map<String, String> getFlinkConfig() throws JsonProcessingException { |
| String restUrl = this.address + "/jobmanager/config"; |
| String json = |
| HttpClientUtils.httpGetRequest( |
| restUrl, RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build()); |
| if (StringUtils.isBlank(json)) { |
| return Collections.emptyMap(); |
| } |
| List<Map<String, String>> confList = |
| JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>() {}); |
| Map<String, String> config = new HashMap<>(0); |
| confList.forEach(k -> config.put(k.get("key"), k.get("value"))); |
| return config; |
| } |
| |
| @JsonIgnore |
| public Map<String, Object> getProperties() { |
| Map<String, Object> propertyMap = new HashMap<>(); |
| Map<String, String> dynamicPropertyMap = |
| PropertiesUtils.extractDynamicPropertiesAsJava(this.getDynamicProperties()); |
| propertyMap.putAll(this.getOptionMap()); |
| propertyMap.putAll(dynamicPropertyMap); |
| ResolveOrder resolveOrder = ResolveOrder.of(this.getResolveOrder()); |
| if (resolveOrder != null) { |
| propertyMap.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), resolveOrder.getName()); |
| } |
| return propertyMap; |
| } |
| |
| public static class SFunc { |
| public static final SFunction<FlinkCluster, Long> ID = FlinkCluster::getId; |
| public static final SFunction<FlinkCluster, String> ADDRESS = FlinkCluster::getAddress; |
| public static final SFunction<FlinkCluster, String> JOB_MANAGER_URL = |
| FlinkCluster::getJobManagerUrl; |
| public static final SFunction<FlinkCluster, Integer> CLUSTER_STATE = |
| FlinkCluster::getClusterState; |
| public static final SFunction<FlinkCluster, Integer> EXECUTION_MODE = |
| FlinkCluster::getExecutionMode; |
| public static final SFunction<FlinkCluster, String> EXCEPTION = FlinkCluster::getException; |
| } |
| } |