blob: 291f1c7fbe2a3d422d3a100c311872ecf3f2a91b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.HadoopConfigUtils;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.FlinkGateWay;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkGateWayService;
import org.apache.streampark.console.core.service.SqlWorkBenchService;
import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum;
import org.apache.streampark.flink.kubernetes.ingress.IngressController;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.factories.FactoryUtil;
import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
import org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory;
import org.apache.streampark.gateway.results.Column;
import org.apache.streampark.gateway.results.GatewayInfo;
import org.apache.streampark.gateway.results.OperationInfo;
import org.apache.streampark.gateway.results.ResultQueryCondition;
import org.apache.streampark.gateway.results.ResultSet;
import org.apache.streampark.gateway.service.SqlGatewayService;
import org.apache.streampark.gateway.session.SessionEnvironment;
import org.apache.streampark.gateway.session.SessionHandle;
import org.apache.flink.client.program.ClusterClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
/**
 * {@link SqlWorkBenchService} implementation that forwards every call to a {@link
 * SqlGatewayService} built from the stored {@link FlinkGateWay} configuration.
 *
 * <p>A new {@link SqlGatewayService} is resolved for each call via {@link
 * #getSqlGateWayService(Long)}; no instance is cached by this class.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class SqlWorkBenchServiceImpl implements SqlWorkBenchService {

  /**
   * Execution timeout handed to the gateway for every submitted statement.
   *
   * <p>NOTE(review): presumably milliseconds; confirm against {@link
   * SqlGatewayService#executeStatement}.
   */
  private static final long EXECUTE_STATEMENT_TIMEOUT = 10000L;

  private final FlinkClusterService flinkClusterService;
  private final FlinkGateWayService flinkGateWayService;
  private final FlinkEnvService flinkEnvService;

  /**
   * Get SqlGatewayService instance by flinkGatewayId.
   *
   * @param flinkGatewayId id of the stored {@link FlinkGateWay} record
   * @return the first service produced by the factory for that gateway's type and address
   * @throws IllegalArgumentException if no gateway record exists for the id, or the factory
   *     produced no service for its configuration
   */
  private SqlGatewayService getSqlGateWayService(Long flinkGatewayId) {
    FlinkGateWay flinkGateWay = flinkGateWayService.getById(flinkGatewayId);
    if (flinkGateWay == null) {
      throw new IllegalArgumentException(
          "flinkGateWay is not exist, please check your config, id: " + flinkGatewayId);
    }
    Map<String, String> config = new HashMap<>(2);
    config.put(
        FactoryUtil.SQL_GATEWAY_SERVICE_TYPE.getKey(),
        flinkGateWay.getGatewayType().getIdentifier());
    config.put(FlinkSqlGatewayServiceFactory.BASE_URI.getKey(), flinkGateWay.getAddress());
    List<SqlGatewayService> actual = SqlGatewayServiceFactoryUtils.createSqlGatewayService(config);
    // Fail with a descriptive error instead of an IndexOutOfBoundsException from get(0).
    if (actual == null || actual.isEmpty()) {
      throw new IllegalArgumentException(
          "No SqlGatewayService instance could be created, please check your config, id: "
              + flinkGatewayId);
    }
    if (actual.size() > 1) {
      log.warn("There are more than one SqlGatewayService instance, please check your config");
    }
    return actual.get(0);
  }

  /**
   * Returns the gateway info reported by the gateway identified by {@code flinkGatewayId}.
   *
   * @throws IllegalArgumentException if the gateway cannot be resolved
   */
  @Override
  public GatewayInfo getGatewayInfo(Long flinkGatewayId) {
    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
    return sqlGateWayService.getGatewayInfo();
  }

  /**
   * Opens a gateway session whose configuration targets the given Flink cluster.
   *
   * <p>The session configuration always carries {@code execution.target}; the remaining keys
   * depend on the cluster's execution mode (see {@link #renderConfByFlinkExecutionMode}).
   *
   * @param flinkGatewayId id of the gateway to open the session on
   * @param flinkClusterId id of the Flink cluster the session should run against
   * @return the handle of the newly opened session
   * @throws IllegalArgumentException if the gateway or cluster cannot be found, or the cluster's
   *     execution mode is unknown or unsupported
   */
  @Override
  public SessionHandle openSession(Long flinkGatewayId, Long flinkClusterId) {
    Map<String, String> streamParkConf = new HashMap<>();
    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
    FlinkCluster flinkCluster = flinkClusterService.getById(flinkClusterId);
    // Same guard as check(): report a missing cluster instead of failing with an NPE below.
    if (flinkCluster == null) {
      throw new IllegalArgumentException("FlinkCluster not found");
    }

    // Host/port are only consumed by the REMOTE branch, so a missing URI must not break the
    // other modes. NOTE(review): assumes getRemoteURI() may return null; confirm in FlinkCluster.
    URI remoteURI = flinkCluster.getRemoteURI();
    String host = remoteURI == null ? null : remoteURI.getHost();
    String port = remoteURI == null ? null : String.valueOf(remoteURI.getPort());
    String clusterId = flinkCluster.getClusterId();

    FlinkExecutionMode executionModeEnum = FlinkExecutionMode.of(flinkCluster.getExecutionMode());
    if (executionModeEnum == null) {
      throw new IllegalArgumentException(
          "Unsupported execution mode: " + flinkCluster.getExecutionMode());
    }
    streamParkConf.put("execution.target", executionModeEnum.getName());
    renderConfByFlinkExecutionMode(
        executionModeEnum, streamParkConf, host, port, clusterId, flinkCluster);

    // Join the ids as text: "Long + Long" would add them numerically, so different
    // (gateway, cluster) pairs could yield the same name prefix.
    String sessionName = flinkGatewayId + "-" + flinkClusterId + "-" + UUID.randomUUID();
    return sqlGateWayService.openSession(new SessionEnvironment(sessionName, null, streamParkConf));
  }

  /**
   * Adds the mode-specific connection settings to {@code streamParkConf}.
   *
   * @param executionModeEnum execution mode of the cluster; must not be null
   * @param streamParkConf session configuration to populate (mutated in place)
   * @param host REST host of the cluster; only used for {@code REMOTE}
   * @param port REST port of the cluster; only used for {@code REMOTE}
   * @param clusterId cluster id; used as the YARN application id or the Kubernetes cluster id
   * @param flinkCluster cluster entity supplying the Kubernetes namespace and service account
   * @throws IllegalArgumentException if the mode is unsupported, the remote address is missing,
   *     or the Kubernetes REST address cannot be resolved
   */
  private void renderConfByFlinkExecutionMode(
      FlinkExecutionMode executionModeEnum,
      Map<String, String> streamParkConf,
      String host,
      String port,
      String clusterId,
      FlinkCluster flinkCluster) {
    switch (Objects.requireNonNull(executionModeEnum)) {
      case REMOTE:
        if (host == null || port == null) {
          throw new IllegalArgumentException(
              "Remote address of flink cluster is not available, clusterId: " + clusterId);
        }
        streamParkConf.put("rest.address", host);
        streamParkConf.put("rest.port", port);
        break;
      case YARN_SESSION:
        streamParkConf.put("yarn.application.id", clusterId);
        // Forward the system Hadoop configuration under the "flink.hadoop." prefix.
        HadoopConfigUtils.readSystemHadoopConf()
            .forEach((k, v) -> streamParkConf.put("flink.hadoop." + k, v));
        break;
      case KUBERNETES_NATIVE_SESSION:
        String k8sNamespace = flinkCluster.getK8sNamespace();
        String restAddress;
        // The cluster client is only needed to resolve the ingress URL; close it right after.
        try (ClusterClient<?> clusterClient =
            (ClusterClient<?>)
                KubernetesRetriever.newFinkClusterClient(
                    clusterId, k8sNamespace, FlinkK8sExecuteModeEnum.of(executionModeEnum))) {
          restAddress = IngressController.ingressUrlAddress(k8sNamespace, clusterId, clusterClient);
        } catch (Exception e) {
          throw new IllegalArgumentException("get k8s rest address error", e);
        }
        streamParkConf.put("kubernetes.cluster-id", clusterId);
        streamParkConf.put(
            "kubernetes.jobmanager.service-account", flinkCluster.getServiceAccount());
        streamParkConf.put("kubernetes.namespace", k8sNamespace);
        streamParkConf.put("rest.address", restAddress);
        break;
      default:
        throw new IllegalArgumentException("Unsupported execution mode: " + executionModeEnum);
    }
  }

  /** Closes the session identified by {@code sessionHandleUUIDStr} on the given gateway. */
  @Override
  public void closeSession(Long flinkGatewayId, String sessionHandleUUIDStr) {
    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
    sqlGateWayService.closeSession(new SessionHandle(sessionHandleUUIDStr));
  }

  /** Cancels the operation {@code operationId} of the given session on the given gateway. */
  @Override
  public void cancelOperation(
      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
    getSqlGateWayService(flinkGatewayId)
        .cancelOperation(new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
  }

  /** Closes the operation {@code operationId} of the given session on the given gateway. */
  @Override
  public void closeOperation(Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
    getSqlGateWayService(flinkGatewayId)
        .closeOperation(new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
  }

  /** Returns the gateway's {@link OperationInfo} for the given session and operation. */
  @Override
  public OperationInfo getOperationInfo(
      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
    return getSqlGateWayService(flinkGatewayId)
        .getOperationInfo(
            new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
  }

  /** Returns the result schema the gateway reports for the given session and operation. */
  @Override
  public Column getOperationResultSchema(
      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
    return getSqlGateWayService(flinkGatewayId)
        .getOperationResultSchema(
            new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
  }

  /**
   * Submits {@code statement} in the given session using {@link #EXECUTE_STATEMENT_TIMEOUT} and no
   * extra execution configuration.
   *
   * @return the handle of the operation created for the statement
   */
  @Override
  public OperationHandle executeStatement(
      Long flinkGatewayId, String sessionHandleUUIDStr, String statement) {
    return getSqlGateWayService(flinkGatewayId)
        .executeStatement(
            new SessionHandle(sessionHandleUUIDStr), statement, EXECUTE_STATEMENT_TIMEOUT, null);
  }

  /** Fetches results of the given operation according to {@code resultQueryCondition}. */
  @Override
  public ResultSet fetchResults(
      Long flinkGatewayId,
      String sessionHandleUUIDStr,
      String operationId,
      ResultQueryCondition resultQueryCondition) {
    return getSqlGateWayService(flinkGatewayId)
        .fetchResults(
            new SessionHandle(sessionHandleUUIDStr),
            new OperationHandle(operationId),
            resultQueryCondition);
  }

  /** Sends a heartbeat for the given session to the given gateway. */
  @Override
  public void heartbeat(Long flinkGatewayId, String sessionHandle) {
    getSqlGateWayService(flinkGatewayId).heartbeat(new SessionHandle(sessionHandle));
  }

  /**
   * Asks the gateway to check itself against the major Flink version of the cluster's Flink env.
   *
   * @return the result of the gateway's own check
   * @throws IllegalArgumentException if the cluster, its Flink env, or the gateway is not found
   */
  @Override
  public boolean check(Long flinkGatewayId, Long flinkClusterId) {
    FlinkCluster flinkCluster = flinkClusterService.getById(flinkClusterId);
    if (flinkCluster == null) {
      throw new IllegalArgumentException("FlinkCluster not found");
    }
    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
    if (flinkEnv == null) {
      throw new IllegalArgumentException("FlinkEnv not found");
    }
    return getSqlGateWayService(flinkGatewayId).check(flinkEnv.getFlinkVersion().majorVersion());
  }
}