[INLONG-8163][Manager][DataProxy] Make DataProxy config interface compatible with old versions (#8164)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 1b4d273..1800c55 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -66,7 +66,7 @@
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
- public static final String KEY_TENANT = "pulsarTenant";
+ public static final String KEY_TENANT = "tenant";
public static final String KEY_NAMESPACE = "namespace";
public static final String KEY_SERVICE_URL = "serviceUrl";
@@ -92,7 +92,7 @@
private String clusterName;
private MessageQueueZoneSinkContext sinkContext;
- private String pulsarTenant;
+ private String tenant;
private String namespace;
private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
@@ -113,7 +113,7 @@
this.config = config;
this.clusterName = config.getClusterName();
this.sinkContext = sinkContext;
- this.pulsarTenant = config.getParams().get(KEY_TENANT);
+ this.tenant = config.getParams().get(KEY_TENANT);
this.namespace = config.getParams().get(KEY_NAMESPACE);
}
@@ -214,7 +214,7 @@
return false;
}
// topic
- String producerTopic = idConfig.getPulsarTopicName(pulsarTenant, namespace);
+ String producerTopic = idConfig.getPulsarTopicName(tenant, namespace);
if (producerTopic == null) {
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
sinkContext.addSendResultMetric(event, clusterName, event.getUid(), false, 0);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index e7c2583..3191b1e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -416,11 +416,24 @@
/**
* Get data proxy cluster list by the given cluster name
*
+ * This method was deprecated since version 1.8.0,
+ * new method please see {@link InlongClusterService#getMetaConfig(String, String)}
+ *
* @return data proxy config
*/
+ @Deprecated
String getAllConfig(String clusterName, String md5);
/**
+ * Get data proxy cluster list by the given cluster name.
+ *
+ * since version 1.8.0
+ *
+ * @return data proxy config
+ */
+ String getMetaConfig(String clusterName, String md5);
+
+ /**
* Get the MQ info by cluster tag for Audit
*
* @param clusterTag cluster tag
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index f3e3e30..36012fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -66,6 +66,7 @@
import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperator;
import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperatorFactory;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
+import org.apache.inlong.manager.service.repository.DataProxyConfigRepositoryV2;
import org.apache.inlong.manager.service.user.UserService;
import com.github.pagehelper.Page;
@@ -120,10 +121,16 @@
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
+
@Lazy
@Autowired
+ @Deprecated
private DataProxyConfigRepository proxyRepository;
+ @Lazy
+ @Autowired
+ private DataProxyConfigRepositoryV2 proxyRepositoryV2;
+
@Override
public Integer saveTag(ClusterTagRequest request, String operator) {
LOGGER.debug("begin to save cluster tag {}", request);
@@ -1337,6 +1344,7 @@
}
@Override
+ @Deprecated
public String getAllConfig(String clusterName, String md5) {
DataProxyConfigResponse response = new DataProxyConfigResponse();
String configMd5 = proxyRepository.getProxyMd5(clusterName);
@@ -1366,6 +1374,35 @@
}
@Override
+ public String getMetaConfig(String clusterName, String md5) {
+ DataProxyConfigResponse response = new DataProxyConfigResponse();
+ String configMd5 = proxyRepositoryV2.getProxyMd5(clusterName);
+ if (configMd5 == null) {
+ response.setResult(false);
+ response.setErrCode(DataProxyConfigResponse.REQ_PARAMS_ERROR);
+ return GSON.toJson(response);
+ }
+
+ // same config
+ if (configMd5.equals(md5)) {
+ response.setResult(true);
+ response.setErrCode(DataProxyConfigResponse.NOUPDATE);
+ response.setMd5(configMd5);
+ response.setData(new DataProxyCluster());
+ return GSON.toJson(response);
+ }
+
+ String configJson = proxyRepositoryV2.getProxyConfigJson(clusterName);
+ if (configJson == null) {
+ response.setResult(false);
+ response.setErrCode(DataProxyConfigResponse.REQ_PARAMS_ERROR);
+ return GSON.toJson(response);
+ }
+
+ return configJson;
+ }
+
+ @Override
public AuditConfig getAuditConfig(String clusterTag) {
AuditConfig auditConfig = new AuditConfig();
ClusterPageRequest request = ClusterPageRequest.builder()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index bbd974e..263b91c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -77,14 +77,18 @@
/**
* DataProxyConfigRepository
+ * This repository was deprecated since version 1.8.0
*/
@Lazy
+@Deprecated
@Repository(value = "dataProxyConfigRepository")
public class DataProxyConfigRepository implements IRepository {
public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
public static final String KEY_NAMESPACE = "namespace";
+ public static final String KEY_NEW_TENANT_KEY = "pulsarTenant";
+ public static final String KEY_OLD_TENANT_KEY = "tenant";
public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
public static final String KEY_BACKUP_TOPIC = "backup_topic";
public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -317,6 +321,12 @@
} catch (Exception e) {
LOGGER.error("parse json string to map error", e);
}
+
+ // to be compatible with multi-tenancy #7914
+ String tenant = mapObj.get(KEY_NEW_TENANT_KEY);
+ mapObj.remove(KEY_NEW_TENANT_KEY);
+ mapObj.put(KEY_OLD_TENANT_KEY, tenant);
+
return mapObj;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
new file mode 100644
index 0000000..a49dbdd
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.repository;
+
+import org.apache.inlong.common.constant.ClusterSwitch;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
+import org.apache.inlong.common.pojo.dataproxy.IRepository;
+import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
+import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
+import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
+import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.service.core.SortConfigLoader;
+
+import com.google.common.base.Splitter;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.PostConstruct;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * DataProxyConfigRepositoryV2
+ * Since version 1.8.0
+ */
+@Lazy
+@Repository(value = "dataProxyConfigRepositoryV2")
+public class DataProxyConfigRepositoryV2 implements IRepository {
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepositoryV2.class);
+
+ public static final String KEY_NAMESPACE = "namespace";
+ public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
+ public static final String KEY_BACKUP_TOPIC = "backup_topic";
+ public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
+ public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
+ public static final String KEY_SORT_CONSUMER_GROUP = "defaultSortConsumerGroup";
+ public static final String KEY_SINK_NAME = "defaultSinkName";
+
+ public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(SEPARATOR).trimResults()
+ .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
+ public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
+ public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
+ private static final Gson GSON = new Gson();
+
+ // key: proxyClusterName, value: jsonString
+ private Map<String, String> proxyConfigJson = new ConcurrentHashMap<>();
+ // key: proxyClusterName, value: md5
+ private Map<String, String> proxyMd5Map = new ConcurrentHashMap<>();
+
+ private long reloadInterval;
+
+ @Autowired
+ private ClusterSetMapper clusterSetMapper;
+ @Autowired
+ private InlongClusterEntityMapper clusterMapper;
+ @Autowired
+ private InlongGroupEntityMapper inlongGroupMapper;
+ @Autowired
+ private StreamSinkEntityMapper streamSinkMapper;
+ @Autowired
+ private SortConfigLoader sortConfigLoader;
+
+ @PostConstruct
+ public void initialize() {
+ LOGGER.info("create repository for " + DataProxyConfigRepository.class.getSimpleName());
+ try {
+ this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
+ reload();
+ setReloadTimer();
+ } catch (Throwable t) {
+ LOGGER.error("Initialize DataProxyConfigRepository error", t);
+ }
+ }
+
+ /**
+ * get clusterSetMapper
+ *
+ * @return the clusterSetMapper
+ */
+ public ClusterSetMapper getClusterSetMapper() {
+ return clusterSetMapper;
+ }
+
+ /**
+ * set clusterSetMapper
+ *
+ * @param clusterSetMapper the clusterSetMapper to set
+ */
+ public void setClusterSetMapper(ClusterSetMapper clusterSetMapper) {
+ this.clusterSetMapper = clusterSetMapper;
+ }
+
+ /**
+ * get clusterMapper
+ *
+ * @return the clusterMapper
+ */
+ public InlongClusterEntityMapper getClusterMapper() {
+ return clusterMapper;
+ }
+
+ /**
+ * set clusterMapper
+ *
+ * @param clusterMapper the clusterMapper to set
+ */
+ public void setClusterMapper(InlongClusterEntityMapper clusterMapper) {
+ this.clusterMapper = clusterMapper;
+ }
+
+ /**
+ * get inlongGroupMapper
+ *
+ * @return the inlongGroupMapper
+ */
+ public InlongGroupEntityMapper getInlongGroupMapper() {
+ return inlongGroupMapper;
+ }
+
+ /**
+ * set inlongGroupMapper
+ *
+ * @param inlongGroupMapper the inlongGroupMapper to set
+ */
+ public void setInlongGroupMapper(InlongGroupEntityMapper inlongGroupMapper) {
+ this.inlongGroupMapper = inlongGroupMapper;
+ }
+
+ /**
+ * get streamSinkMapper
+ *
+ * @return the streamSinkMapper
+ */
+ public StreamSinkEntityMapper getStreamSinkMapper() {
+ return streamSinkMapper;
+ }
+
+ /**
+ * set streamSinkMapper
+ *
+ * @param streamSinkMapper the streamSinkMapper to set
+ */
+ public void setStreamSinkMapper(StreamSinkEntityMapper streamSinkMapper) {
+ this.streamSinkMapper = streamSinkMapper;
+ }
+
+ /**
+ * reload
+ */
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public void reload() {
+ LOGGER.info("start to reload config:" + this.getClass().getSimpleName());
+ // reload proxy cluster
+ Map<String, DataProxyCluster> proxyClusterMap = new HashMap<>();
+ this.reloadProxyCluster(proxyClusterMap);
+ if (proxyClusterMap.size() == 0) {
+ return;
+ }
+ // reload cache cluster
+ this.reloadCacheCluster(proxyClusterMap);
+ // reload inlong group id and inlong stream id
+ this.reloadInlongId(proxyClusterMap);
+
+ // generateClusterJson
+ this.generateClusterJson(proxyClusterMap);
+
+ LOGGER.info("end to reload config:" + this.getClass().getSimpleName());
+ }
+
+ /**
+ * reloadProxyCluster
+ */
+ private void reloadProxyCluster(Map<String, DataProxyCluster> proxyClusterMap) {
+ for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) {
+ ProxyClusterObject obj = new ProxyClusterObject();
+ obj.setName(proxyCluster.getClusterName());
+ obj.setSetName(proxyCluster.getClusterTag());
+ obj.setZone(proxyCluster.getExtTag());
+ DataProxyCluster clusterObj = new DataProxyCluster();
+ clusterObj.setProxyCluster(obj);
+ proxyClusterMap.put(obj.getName(), clusterObj);
+ }
+ }
+
+ /**
+ * reloadCacheCluster
+ */
+ private void reloadCacheCluster(Map<String, DataProxyCluster> proxyClusterMap) {
+ // reload cache cluster
+ Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<>();
+ for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
+ if (StringUtils.isEmpty(cacheCluster.getExtTag())) {
+ continue;
+ }
+ Map<String, String> tagMap = MAP_SPLITTER.split(cacheCluster.getExtTag());
+ String producerTag = tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
+ if (StringUtils.equalsIgnoreCase(producerTag, Boolean.TRUE.toString())) {
+ cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTags(), k -> new HashMap<>())
+ .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
+ }
+ }
+ // mark cache cluster to proxy cluster
+ Map<String, Map<String, String>> tagCache = new HashMap<>();
+ for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+ DataProxyCluster clusterObj = entry.getValue();
+ ProxyClusterObject proxyObj = clusterObj.getProxyCluster();
+ // cache
+ String clusterTag = proxyObj.getSetName();
+ String extTag = proxyObj.getZone();
+ if (StringUtils.isEmpty(extTag)) {
+ continue;
+ }
+ Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
+ if (cacheClusterZoneMap != null) {
+ Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
+ for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
+ if (cacheEntry.getValue().size() == 0) {
+ continue;
+ }
+ Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
+ k -> MAP_SPLITTER.split(cacheEntry.getKey()));
+ if (isSubTag(wholeTagMap, subTagMap)) {
+ CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
+ cacheSet.setSetName(clusterTag);
+ List<CacheCluster> cacheClusterList = cacheEntry.getValue();
+ cacheSet.setType(cacheClusterList.get(0).getType());
+ List<CacheClusterObject> cacheClusters = cacheSet.getCacheClusters();
+ for (CacheCluster cacheCluster : cacheClusterList) {
+ CacheClusterObject obj = new CacheClusterObject();
+ obj.setName(cacheCluster.getClusterName());
+ obj.setZone(cacheCluster.getExtTag());
+ obj.setToken(cacheCluster.getToken());
+ obj.setParams(fromJsonToMap(cacheCluster.getExtParams()));
+ cacheClusters.add(obj);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Parse Json string to Java Map
+ */
+ private Map<String, String> fromJsonToMap(String jsonString) {
+ Map<String, String> mapObj = new HashMap<>();
+ if (StringUtils.isBlank(jsonString)) {
+ return mapObj;
+ }
+ try {
+ JsonObject obj = GSON.fromJson(jsonString, JsonObject.class);
+ for (String key : obj.keySet()) {
+ JsonElement child = obj.get(key);
+ if (child.isJsonPrimitive()) {
+ mapObj.put(key, child.getAsString());
+ } else {
+ mapObj.put(key, child.toString());
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("parse json string to map error", e);
+ }
+ return mapObj;
+ }
+
+ /**
+ * Parse Json string to JsonObject
+ */
+ private JsonObject fromJsonToJson(String jsonString) {
+ if (StringUtils.isBlank(jsonString)) {
+ return new JsonObject();
+ }
+ try {
+ return GSON.fromJson(jsonString, JsonObject.class);
+ } catch (Exception e) {
+ LOGGER.error("parse json string to json object error", e);
+ return new JsonObject();
+ }
+ }
+
+ /**
+ * reloadInlongId
+ */
+ private void reloadInlongId(Map<String, DataProxyCluster> proxyClusterMap) {
+ // reload inlong group id
+ Map<String, InlongGroupId> groupIdMap = new HashMap<>();
+ clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
+ // reload inlong group ext params
+ Map<String, Map<String, String>> groupParams = new HashMap<>();
+ groupIdMap.forEach((k, v) -> groupParams.put(k, fromJsonToMap(v.getExtParams())));
+ // reload inlong group ext
+ List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
+ .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
+ groupExtCursor.forEach(v -> groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
+ .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
+ // reload inlong stream id
+ Map<String, InlongStreamId> streamIdMap = new HashMap<>();
+ clusterSetMapper.selectInlongStreamId()
+ .forEach(v -> streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
+ // reload inlong stream ext params
+ Map<String, Map<String, String>> streamParams = new HashMap<>();
+ streamIdMap.forEach((k, v) -> streamParams.put(k, fromJsonToMap(v.getExtParams())));
+ // reload inlong stream ext
+ List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
+ .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
+ streamExtCursor.forEach(v -> streamParams
+ .computeIfAbsent(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), k -> new HashMap<>())
+ .put(ClusterSwitch.BACKUP_MQ_RESOURCE, v.getKeyValue()));
+
+ // build Map<clusterTag, List<InlongIdObject>>
+ Map<String, List<InLongIdObject>> inlongIdMap = this.parseInlongId(groupIdMap, groupParams, streamIdMap,
+ streamParams);
+ // mark inlong id to proxy cluster
+ for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+ String clusterTag = entry.getValue().getProxyCluster().getSetName();
+ List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
+ if (inlongIds != null) {
+ entry.getValue().getProxyCluster().getInlongIds().addAll(inlongIds);
+ }
+ }
+ }
+
+ /**
+ * parseInlongId
+ */
+ private Map<String, List<InLongIdObject>> parseInlongId(Map<String, InlongGroupId> groupIdMap,
+ Map<String, Map<String, String>> groupParams, Map<String, InlongStreamId> streamIdMap,
+ Map<String, Map<String, String>> streamParams) {
+ Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
+ for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
+ InlongStreamId streamIdObj = entry.getValue();
+ String groupId = streamIdObj.getInlongGroupId();
+ InlongGroupId groupIdObj = groupIdMap.get(groupId);
+ if (groupId == null || groupIdObj == null) {
+ LOGGER.debug("groupId {} or groupIdObj {} is null, ignored", groupId, groupIdObj);
+ continue;
+ }
+ // master
+ InLongIdObject obj = new InLongIdObject();
+ String inlongId = entry.getKey();
+ obj.setInlongId(inlongId);
+ Optional.ofNullable(groupParams.get(groupId)).ifPresent(v -> obj.getParams().putAll(v));
+ Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v -> obj.getParams().putAll(v));
+ if (StringUtils.isBlank(streamIdObj.getTopic())) {
+ obj.setTopic(groupIdObj.getTopic());
+ } else {
+ obj.setTopic(streamIdObj.getTopic());
+ obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
+ }
+ inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
+ // backup
+ InLongIdObject backupObj = new InLongIdObject();
+ backupObj.setInlongId(inlongId);
+ backupObj.getParams().putAll(obj.getParams());
+ Map<String, String> groupParam = groupParams.get(groupId);
+ if (groupParam != null && groupParam.containsKey(ClusterSwitch.BACKUP_CLUSTER_TAG)
+ && groupParam.containsKey(ClusterSwitch.BACKUP_MQ_RESOURCE)) {
+ String clusterTag = groupParam.get(ClusterSwitch.BACKUP_CLUSTER_TAG);
+ String groupMqResource = groupParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE);
+
+ Map<String, String> streamParam = streamParams.get(inlongId);
+ if (streamParam != null && !StringUtils.isBlank(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE))) {
+ backupObj.setTopic(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE));
+ backupObj.getParams().put(KEY_NAMESPACE, groupMqResource);
+ } else {
+ backupObj.setTopic(groupMqResource);
+ }
+ inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(backupObj);
+ }
+ }
+ return inlongIdMap;
+ }
+
+ /**
+ * getInlongId
+ */
+ private String getInlongId(String inlongGroupId, String inlongStreamId) {
+ return inlongGroupId + "." + inlongStreamId;
+ }
+
+ /**
+ * setReloadTimer
+ */
+ private void setReloadTimer() {
+ Timer reloadTimer = new Timer(true);
+ TimerTask task = new RepositoryTimerTask<DataProxyConfigRepositoryV2>(this);
+ reloadTimer.scheduleAtFixedRate(task, reloadInterval, reloadInterval);
+ }
+
+ /**
+ * generateClusterJson
+ */
+ private void generateClusterJson(Map<String, DataProxyCluster> proxyClusterMap) {
+ Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>();
+ Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>();
+ for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+ DataProxyCluster proxyObj = entry.getValue();
+ // json
+ String jsonDataProxyCluster = GSON.toJson(proxyObj);
+ String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
+ DataProxyConfigResponse response = new DataProxyConfigResponse();
+ response.setResult(true);
+ response.setErrCode(DataProxyConfigResponse.SUCC);
+ response.setMd5(md5);
+ response.setData(proxyObj);
+ String jsonResponse = GSON.toJson(response);
+ newProxyConfigJson.put(entry.getKey(), jsonResponse);
+ newProxyMd5Map.put(entry.getKey(), md5);
+ }
+
+ // replace
+ this.proxyConfigJson = newProxyConfigJson;
+ this.proxyMd5Map = newProxyMd5Map;
+ }
+
+ /**
+ * isSubTag
+ */
+ private boolean isSubTag(Map<String, String> wholeTagMap, Map<String, String> subTagMap) {
+ for (Entry<String, String> entry : subTagMap.entrySet()) {
+ String value = wholeTagMap.get(entry.getKey());
+ if (value == null || !value.equals(entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * getProxyMd5
+ */
+ public String getProxyMd5(String clusterName) {
+ return this.proxyMd5Map.get(clusterName);
+ }
+
+ /**
+ * getProxyConfigJson
+ */
+ public String getProxyConfigJson(String clusterName) {
+ return this.proxyConfigJson.get(clusterName);
+ }
+
+ /**
+ * changeClusterTag
+ */
+ public String changeClusterTag(String inlongGroupId, String clusterTag,
+ String topic) {
+ try {
+ // select
+ InlongGroupEntity oldGroup = inlongGroupMapper.selectByGroupId(inlongGroupId);
+ if (oldGroup == null) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+ String oldClusterTag = oldGroup.getInlongClusterTag();
+ if (StringUtils.equals(oldClusterTag, clusterTag)) {
+ return "Cluster tag is same.";
+ }
+ // prepare group
+ final InlongGroupEntity newGroup = this.prepareClusterTagGroup(oldGroup, clusterTag, topic);
+ // load cluster
+ Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+ ClusterPageRequest clusterRequest = new ClusterPageRequest();
+ List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest);
+ clusters.forEach((v) -> {
+ clusterMap.put(v.getName(), v);
+ });
+ // prepare stream sink
+ SinkPageRequest request = new SinkPageRequest();
+ request.setInlongGroupId(inlongGroupId);
+ List<StreamSinkEntity> streamSinks = streamSinkMapper.selectByCondition(request);
+ List<StreamSinkEntity> newStreamSinks = new ArrayList<>();
+ for (StreamSinkEntity streamSink : streamSinks) {
+ String clusterName = streamSink.getInlongClusterName();
+ InlongClusterEntity cluster = clusterMap.get(clusterName);
+ if (cluster == null) {
+ continue;
+ }
+ if (!StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
+ continue;
+ }
+ String clusterType = cluster.getType();
+ // find the cluster of same cluster tag and sink type, and add new stream sink
+ StreamSinkEntity newStreamSink = this.createNewStreamSink(clusters, clusterType, clusterTag,
+ streamSink);
+ if (newStreamSink != null) {
+ newStreamSinks.add(newStreamSink);
+ }
+ }
+ // update
+ newStreamSinks.forEach((v) -> {
+ streamSinkMapper.insert(v);
+ });
+ int rowCount = inlongGroupMapper.updateByIdentifierSelective(newGroup);
+ if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
+ newGroup.getInlongGroupId(), newGroup.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ return inlongGroupId;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ return e.getMessage();
+ }
+ }
+
+ /**
+ * createNewStreamSink
+ */
+ private StreamSinkEntity createNewStreamSink(List<InlongClusterEntity> clusters, String clusterType,
+ String clusterTag, StreamSinkEntity srcStreamSink) {
+ for (InlongClusterEntity v : clusters) {
+ if (StringUtils.equals(clusterType, v.getType())
+ && StringUtils.equals(clusterTag, v.getClusterTags())) {
+ String newExtParams = v.getExtParams();
+ JsonObject extParams = fromJsonToJson(newExtParams);
+ if (extParams.has(KEY_SINK_NAME) && extParams.has(KEY_SORT_TASK_NAME)
+ && extParams.has(KEY_DATA_NODE_NAME) && extParams.has(KEY_SORT_CONSUMER_GROUP)) {
+ final String sinkName = extParams.get(KEY_SINK_NAME).getAsString();
+ final String sortTaskName = extParams.get(KEY_SORT_TASK_NAME).getAsString();
+ final String dataNodeName = extParams.get(KEY_DATA_NODE_NAME).getAsString();
+ final String sortConsumerGroup = extParams.get(KEY_SORT_CONSUMER_GROUP).getAsString();
+ StreamSinkEntity newStreamSink = copyStreamSink(srcStreamSink);
+ newStreamSink.setInlongClusterName(v.getName());
+ newStreamSink.setSinkName(sinkName);
+ newStreamSink.setSortTaskName(sortTaskName);
+ newStreamSink.setDataNodeName(dataNodeName);
+ newStreamSink.setSortConsumerGroup(sortConsumerGroup);
+ return newStreamSink;
+ }
+ return null;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * copyStreamSink
+ */
+ private StreamSinkEntity copyStreamSink(StreamSinkEntity streamSink) {
+ StreamSinkEntity streamSinkDest = new StreamSinkEntity();
+ CommonBeanUtils.copyProperties(streamSink, streamSinkDest);
+ streamSinkDest.setId(null);
+ streamSinkDest.setModifyTime(new Date());
+ return streamSinkDest;
+ }
+
+ /**
+ * prepareClusterTagGroup
+ */
+ private InlongGroupEntity prepareClusterTagGroup(InlongGroupEntity oldGroup, String clusterTag, String topic)
+ throws IllegalAccessException, InvocationTargetException {
+ // parse ext_params
+ String extParams = oldGroup.getExtParams();
+ if (StringUtils.isEmpty(extParams)) {
+ extParams = "{}";
+ }
+ // parse json
+ JsonObject extParamsObj = fromJsonToJson(extParams);
+ // change cluster tag
+ extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG, oldGroup.getInlongClusterTag());
+ extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
+ // copy properties
+ InlongGroupEntity newGroup = new InlongGroupEntity();
+ BeanUtils.copyProperties(newGroup, oldGroup);
+ newGroup.setId(null);
+ // change properties
+ newGroup.setInlongClusterTag(clusterTag);
+ newGroup.setMqResource(topic);
+ String newExtParams = extParamsObj.toString();
+ newGroup.setExtParams(newExtParams);
+ return newGroup;
+ }
+
+ /**
+ * removeBackupClusterTag
+ */
+ public String removeBackupClusterTag(String inlongGroupId) {
+ // select
+ InlongGroupEntity oldGroup = inlongGroupMapper.selectByGroupId(inlongGroupId);
+ if (oldGroup == null) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+ // parse ext_params
+ String extParams = oldGroup.getExtParams();
+ if (StringUtils.isEmpty(extParams)) {
+ return inlongGroupId;
+ }
+ // parse json
+ JsonObject extParamsObj = fromJsonToJson(extParams);
+ if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
+ return inlongGroupId;
+ }
+ final String oldClusterTag = extParamsObj.get(KEY_BACKUP_CLUSTER_TAG).getAsString();
+ extParamsObj.remove(KEY_BACKUP_CLUSTER_TAG);
+ extParamsObj.remove(KEY_BACKUP_TOPIC);
+ String newExtParams = extParamsObj.toString();
+ oldGroup.setExtParams(newExtParams);
+ // update group
+ int rowCount = inlongGroupMapper.updateByIdentifierSelective(oldGroup);
+ if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
+ oldGroup.getInlongGroupId(), oldGroup.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ // load cluster
+ Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+ ClusterPageRequest clusterRequest = new ClusterPageRequest();
+ List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest);
+ clusters.forEach((v) -> {
+ clusterMap.put(v.getName(), v);
+ });
+ // prepare stream sink
+ SinkPageRequest request = new SinkPageRequest();
+ request.setInlongGroupId(inlongGroupId);
+ List<StreamSinkEntity> streamSinks = streamSinkMapper.selectByCondition(request);
+ List<StreamSinkEntity> deleteStreamSinks = new ArrayList<>();
+ for (StreamSinkEntity streamSink : streamSinks) {
+ String clusterName = streamSink.getInlongClusterName();
+ InlongClusterEntity cluster = clusterMap.get(clusterName);
+ if (cluster == null) {
+ continue;
+ }
+ if (StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
+ deleteStreamSinks.add(streamSink);
+ }
+ }
+ // delete old stream sink
+ deleteStreamSinks.forEach((v) -> {
+ streamSinkMapper.deleteById(v.getId());
+ });
+ return inlongGroupId;
+ }
+}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 17de1fc..cea54c8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -70,11 +70,20 @@
}
@PostMapping("/dataproxy/getAllConfig")
- @ApiOperation(value = "Get all proxy config")
+ @ApiOperation(value = "Get all proxy config. " +
+ "This method was deprecated since version 1.8.0. " +
+ "Please use new method /dataproxy/getMetaConfig")
+ @Deprecated
public String getAllConfig(@RequestBody DataProxyConfigRequest request) {
return clusterService.getAllConfig(request.getClusterName(), request.getMd5());
}
+ @PostMapping("/dataproxy/getMetaConfig")
+ @ApiOperation(value = "Get all DataProxy meta config")
+ public String getMetaConfig(@RequestBody DataProxyConfigRequest request) {
+ return clusterService.getMetaConfig(request.getClusterName(), request.getMd5());
+ }
+
@RequestMapping(value = "/changeClusterTag", method = RequestMethod.PUT)
@ApiOperation(value = "Change cluster tag and topic for inlong group id")
public Response<String> changeClusterTag(@RequestParam String inlongGroupId, @RequestParam String clusterTag,