[INLONG-8163][Manager][DataProxy] Make DataProxy config interface compatible with old versions (#8164)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 1b4d273..1800c55 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -66,7 +66,7 @@
     // log print count
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
 
-    public static final String KEY_TENANT = "pulsarTenant";
+    public static final String KEY_TENANT = "tenant";
     public static final String KEY_NAMESPACE = "namespace";
 
     public static final String KEY_SERVICE_URL = "serviceUrl";
@@ -92,7 +92,7 @@
     private String clusterName;
     private MessageQueueZoneSinkContext sinkContext;
 
-    private String pulsarTenant;
+    private String tenant;
     private String namespace;
     private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
 
@@ -113,7 +113,7 @@
         this.config = config;
         this.clusterName = config.getClusterName();
         this.sinkContext = sinkContext;
-        this.pulsarTenant = config.getParams().get(KEY_TENANT);
+        this.tenant = config.getParams().get(KEY_TENANT);
         this.namespace = config.getParams().get(KEY_NAMESPACE);
     }
 
@@ -214,7 +214,7 @@
                 return false;
             }
             // topic
-            String producerTopic = idConfig.getPulsarTopicName(pulsarTenant, namespace);
+            String producerTopic = idConfig.getPulsarTopicName(tenant, namespace);
             if (producerTopic == null) {
                 sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
                 sinkContext.addSendResultMetric(event, clusterName, event.getUid(), false, 0);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index e7c2583..3191b1e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -416,11 +416,24 @@
     /**
      * Get data proxy cluster list by the given cluster name
      *
+     * This method was deprecated since version 1.8.0,
+     * new method please see {@link InlongClusterService#getMetaConfig(String, String)}
+     *
      * @return data proxy config
      */
+    @Deprecated
     String getAllConfig(String clusterName, String md5);
 
     /**
+     * Get data proxy cluster list by the given cluster name.
+     *
+     * since version 1.8.0
+     *
+     * @return data proxy config
+     */
+    String getMetaConfig(String clusterName, String md5);
+
+    /**
      * Get the MQ info by cluster tag for Audit
      *
      * @param clusterTag cluster tag
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index f3e3e30..36012fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -66,6 +66,7 @@
 import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperator;
 import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperatorFactory;
 import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
+import org.apache.inlong.manager.service.repository.DataProxyConfigRepositoryV2;
 import org.apache.inlong.manager.service.user.UserService;
 
 import com.github.pagehelper.Page;
@@ -120,10 +121,16 @@
     private InlongClusterEntityMapper clusterMapper;
     @Autowired
     private InlongClusterNodeEntityMapper clusterNodeMapper;
+
     @Lazy
     @Autowired
+    @Deprecated
     private DataProxyConfigRepository proxyRepository;
 
+    @Lazy
+    @Autowired
+    private DataProxyConfigRepositoryV2 proxyRepositoryV2;
+
     @Override
     public Integer saveTag(ClusterTagRequest request, String operator) {
         LOGGER.debug("begin to save cluster tag {}", request);
@@ -1337,6 +1344,7 @@
     }
 
     @Override
+    @Deprecated
     public String getAllConfig(String clusterName, String md5) {
         DataProxyConfigResponse response = new DataProxyConfigResponse();
         String configMd5 = proxyRepository.getProxyMd5(clusterName);
@@ -1366,6 +1374,35 @@
     }
 
     @Override
+    public String getMetaConfig(String clusterName, String md5) {
+        DataProxyConfigResponse response = new DataProxyConfigResponse();
+        String configMd5 = proxyRepositoryV2.getProxyMd5(clusterName);
+        if (configMd5 == null) {
+            response.setResult(false);
+            response.setErrCode(DataProxyConfigResponse.REQ_PARAMS_ERROR);
+            return GSON.toJson(response);
+        }
+
+        // same config
+        if (configMd5.equals(md5)) {
+            response.setResult(true);
+            response.setErrCode(DataProxyConfigResponse.NOUPDATE);
+            response.setMd5(configMd5);
+            response.setData(new DataProxyCluster());
+            return GSON.toJson(response);
+        }
+
+        String configJson = proxyRepositoryV2.getProxyConfigJson(clusterName);
+        if (configJson == null) {
+            response.setResult(false);
+            response.setErrCode(DataProxyConfigResponse.REQ_PARAMS_ERROR);
+            return GSON.toJson(response);
+        }
+
+        return configJson;
+    }
+
+    @Override
     public AuditConfig getAuditConfig(String clusterTag) {
         AuditConfig auditConfig = new AuditConfig();
         ClusterPageRequest request = ClusterPageRequest.builder()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index bbd974e..263b91c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -77,14 +77,18 @@
 
 /**
  * DataProxyConfigRepository
+ * This repository was deprecated since version 1.8.0
  */
 @Lazy
+@Deprecated
 @Repository(value = "dataProxyConfigRepository")
 public class DataProxyConfigRepository implements IRepository {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
 
     public static final String KEY_NAMESPACE = "namespace";
+    public static final String KEY_NEW_TENANT_KEY = "pulsarTenant";
+    public static final String KEY_OLD_TENANT_KEY = "tenant";
     public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
     public static final String KEY_BACKUP_TOPIC = "backup_topic";
     public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -317,6 +321,12 @@
         } catch (Exception e) {
             LOGGER.error("parse json string to map error", e);
         }
+
+        // to be compatible with multi-tenancy #7914
+        String tenant = mapObj.get(KEY_NEW_TENANT_KEY);
+        mapObj.remove(KEY_NEW_TENANT_KEY);
+        mapObj.put(KEY_OLD_TENANT_KEY, tenant);
+
         return mapObj;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
new file mode 100644
index 0000000..a49dbdd
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.repository;
+
+import org.apache.inlong.common.constant.ClusterSwitch;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
+import org.apache.inlong.common.pojo.dataproxy.IRepository;
+import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
+import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
+import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
+import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
+import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
+import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
+import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
+import org.apache.inlong.manager.service.core.SortConfigLoader;
+
+import com.google.common.base.Splitter;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.PostConstruct;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * DataProxyConfigRepositoryV2
+ * Since version 1.8.0
+ */
+@Lazy
+@Repository(value = "dataProxyConfigRepositoryV2")
+public class DataProxyConfigRepositoryV2 implements IRepository {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepositoryV2.class);
+
+    public static final String KEY_NAMESPACE = "namespace";
+    public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
+    public static final String KEY_BACKUP_TOPIC = "backup_topic";
+    public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
+    public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
+    public static final String KEY_SORT_CONSUMER_GROUP = "defaultSortConsumerGroup";
+    public static final String KEY_SINK_NAME = "defaultSinkName";
+
+    public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(SEPARATOR).trimResults()
+            .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
+    public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
+    public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
+    private static final Gson GSON = new Gson();
+
+    // key: proxyClusterName, value: jsonString
+    private Map<String, String> proxyConfigJson = new ConcurrentHashMap<>();
+    // key: proxyClusterName, value: md5
+    private Map<String, String> proxyMd5Map = new ConcurrentHashMap<>();
+
+    private long reloadInterval;
+
+    @Autowired
+    private ClusterSetMapper clusterSetMapper;
+    @Autowired
+    private InlongClusterEntityMapper clusterMapper;
+    @Autowired
+    private InlongGroupEntityMapper inlongGroupMapper;
+    @Autowired
+    private StreamSinkEntityMapper streamSinkMapper;
+    @Autowired
+    private SortConfigLoader sortConfigLoader;
+
+    @PostConstruct
+    public void initialize() {
+        LOGGER.info("create repository for " + DataProxyConfigRepository.class.getSimpleName());
+        try {
+            this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
+            reload();
+            setReloadTimer();
+        } catch (Throwable t) {
+            LOGGER.error("Initialize DataProxyConfigRepository error", t);
+        }
+    }
+
+    /**
+     * get clusterSetMapper
+     *
+     * @return the clusterSetMapper
+     */
+    public ClusterSetMapper getClusterSetMapper() {
+        return clusterSetMapper;
+    }
+
+    /**
+     * set clusterSetMapper
+     *
+     * @param clusterSetMapper the clusterSetMapper to set
+     */
+    public void setClusterSetMapper(ClusterSetMapper clusterSetMapper) {
+        this.clusterSetMapper = clusterSetMapper;
+    }
+
+    /**
+     * get clusterMapper
+     *
+     * @return the clusterMapper
+     */
+    public InlongClusterEntityMapper getClusterMapper() {
+        return clusterMapper;
+    }
+
+    /**
+     * set clusterMapper
+     *
+     * @param clusterMapper the clusterMapper to set
+     */
+    public void setClusterMapper(InlongClusterEntityMapper clusterMapper) {
+        this.clusterMapper = clusterMapper;
+    }
+
+    /**
+     * get inlongGroupMapper
+     *
+     * @return the inlongGroupMapper
+     */
+    public InlongGroupEntityMapper getInlongGroupMapper() {
+        return inlongGroupMapper;
+    }
+
+    /**
+     * set inlongGroupMapper
+     *
+     * @param inlongGroupMapper the inlongGroupMapper to set
+     */
+    public void setInlongGroupMapper(InlongGroupEntityMapper inlongGroupMapper) {
+        this.inlongGroupMapper = inlongGroupMapper;
+    }
+
+    /**
+     * get streamSinkMapper
+     *
+     * @return the streamSinkMapper
+     */
+    public StreamSinkEntityMapper getStreamSinkMapper() {
+        return streamSinkMapper;
+    }
+
+    /**
+     * set streamSinkMapper
+     *
+     * @param streamSinkMapper the streamSinkMapper to set
+     */
+    public void setStreamSinkMapper(StreamSinkEntityMapper streamSinkMapper) {
+        this.streamSinkMapper = streamSinkMapper;
+    }
+
+    /**
+     * reload
+     */
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void reload() {
+        LOGGER.info("start to reload config:" + this.getClass().getSimpleName());
+        // reload proxy cluster
+        Map<String, DataProxyCluster> proxyClusterMap = new HashMap<>();
+        this.reloadProxyCluster(proxyClusterMap);
+        if (proxyClusterMap.size() == 0) {
+            return;
+        }
+        // reload cache cluster
+        this.reloadCacheCluster(proxyClusterMap);
+        // reload inlong group id and inlong stream id
+        this.reloadInlongId(proxyClusterMap);
+
+        // generateClusterJson
+        this.generateClusterJson(proxyClusterMap);
+
+        LOGGER.info("end to reload config:" + this.getClass().getSimpleName());
+    }
+
+    /**
+     * reloadProxyCluster
+     */
+    private void reloadProxyCluster(Map<String, DataProxyCluster> proxyClusterMap) {
+        for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) {
+            ProxyClusterObject obj = new ProxyClusterObject();
+            obj.setName(proxyCluster.getClusterName());
+            obj.setSetName(proxyCluster.getClusterTag());
+            obj.setZone(proxyCluster.getExtTag());
+            DataProxyCluster clusterObj = new DataProxyCluster();
+            clusterObj.setProxyCluster(obj);
+            proxyClusterMap.put(obj.getName(), clusterObj);
+        }
+    }
+
+    /**
+     * reloadCacheCluster
+     */
+    private void reloadCacheCluster(Map<String, DataProxyCluster> proxyClusterMap) {
+        // reload cache cluster
+        Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<>();
+        for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
+            if (StringUtils.isEmpty(cacheCluster.getExtTag())) {
+                continue;
+            }
+            Map<String, String> tagMap = MAP_SPLITTER.split(cacheCluster.getExtTag());
+            String producerTag = tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
+            if (StringUtils.equalsIgnoreCase(producerTag, Boolean.TRUE.toString())) {
+                cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTags(), k -> new HashMap<>())
+                        .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
+            }
+        }
+        // mark cache cluster to proxy cluster
+        Map<String, Map<String, String>> tagCache = new HashMap<>();
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            DataProxyCluster clusterObj = entry.getValue();
+            ProxyClusterObject proxyObj = clusterObj.getProxyCluster();
+            // cache
+            String clusterTag = proxyObj.getSetName();
+            String extTag = proxyObj.getZone();
+            if (StringUtils.isEmpty(extTag)) {
+                continue;
+            }
+            Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
+            if (cacheClusterZoneMap != null) {
+                Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
+                for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
+                    if (cacheEntry.getValue().size() == 0) {
+                        continue;
+                    }
+                    Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
+                            k -> MAP_SPLITTER.split(cacheEntry.getKey()));
+                    if (isSubTag(wholeTagMap, subTagMap)) {
+                        CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
+                        cacheSet.setSetName(clusterTag);
+                        List<CacheCluster> cacheClusterList = cacheEntry.getValue();
+                        cacheSet.setType(cacheClusterList.get(0).getType());
+                        List<CacheClusterObject> cacheClusters = cacheSet.getCacheClusters();
+                        for (CacheCluster cacheCluster : cacheClusterList) {
+                            CacheClusterObject obj = new CacheClusterObject();
+                            obj.setName(cacheCluster.getClusterName());
+                            obj.setZone(cacheCluster.getExtTag());
+                            obj.setToken(cacheCluster.getToken());
+                            obj.setParams(fromJsonToMap(cacheCluster.getExtParams()));
+                            cacheClusters.add(obj);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Parse Json string to Java Map
+     */
+    private Map<String, String> fromJsonToMap(String jsonString) {
+        Map<String, String> mapObj = new HashMap<>();
+        if (StringUtils.isBlank(jsonString)) {
+            return mapObj;
+        }
+        try {
+            JsonObject obj = GSON.fromJson(jsonString, JsonObject.class);
+            for (String key : obj.keySet()) {
+                JsonElement child = obj.get(key);
+                if (child.isJsonPrimitive()) {
+                    mapObj.put(key, child.getAsString());
+                } else {
+                    mapObj.put(key, child.toString());
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("parse json string to map error", e);
+        }
+        return mapObj;
+    }
+
+    /**
+     * Parse Json string to JsonObject
+     */
+    private JsonObject fromJsonToJson(String jsonString) {
+        if (StringUtils.isBlank(jsonString)) {
+            return new JsonObject();
+        }
+        try {
+            return GSON.fromJson(jsonString, JsonObject.class);
+        } catch (Exception e) {
+            LOGGER.error("parse json string to json object error", e);
+            return new JsonObject();
+        }
+    }
+
+    /**
+     * reloadInlongId
+     */
+    private void reloadInlongId(Map<String, DataProxyCluster> proxyClusterMap) {
+        // reload inlong group id
+        Map<String, InlongGroupId> groupIdMap = new HashMap<>();
+        clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
+        // reload inlong group ext params
+        Map<String, Map<String, String>> groupParams = new HashMap<>();
+        groupIdMap.forEach((k, v) -> groupParams.put(k, fromJsonToMap(v.getExtParams())));
+        // reload inlong group ext
+        List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
+                .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
+        groupExtCursor.forEach(v -> groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
+                .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
+        // reload inlong stream id
+        Map<String, InlongStreamId> streamIdMap = new HashMap<>();
+        clusterSetMapper.selectInlongStreamId()
+                .forEach(v -> streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
+        // reload inlong stream ext params
+        Map<String, Map<String, String>> streamParams = new HashMap<>();
+        streamIdMap.forEach((k, v) -> streamParams.put(k, fromJsonToMap(v.getExtParams())));
+        // reload inlong stream ext
+        List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
+                .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
+        streamExtCursor.forEach(v -> streamParams
+                .computeIfAbsent(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), k -> new HashMap<>())
+                .put(ClusterSwitch.BACKUP_MQ_RESOURCE, v.getKeyValue()));
+
+        // build Map<clusterTag, List<InlongIdObject>>
+        Map<String, List<InLongIdObject>> inlongIdMap = this.parseInlongId(groupIdMap, groupParams, streamIdMap,
+                streamParams);
+        // mark inlong id to proxy cluster
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            String clusterTag = entry.getValue().getProxyCluster().getSetName();
+            List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
+            if (inlongIds != null) {
+                entry.getValue().getProxyCluster().getInlongIds().addAll(inlongIds);
+            }
+        }
+    }
+
+    /**
+     * parseInlongId
+     */
+    private Map<String, List<InLongIdObject>> parseInlongId(Map<String, InlongGroupId> groupIdMap,
+            Map<String, Map<String, String>> groupParams, Map<String, InlongStreamId> streamIdMap,
+            Map<String, Map<String, String>> streamParams) {
+        Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
+        for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
+            InlongStreamId streamIdObj = entry.getValue();
+            String groupId = streamIdObj.getInlongGroupId();
+            InlongGroupId groupIdObj = groupIdMap.get(groupId);
+            if (groupId == null || groupIdObj == null) {
+                LOGGER.debug("groupId {} or groupIdObj {} is null, ignored", groupId, groupIdObj);
+                continue;
+            }
+            // master
+            InLongIdObject obj = new InLongIdObject();
+            String inlongId = entry.getKey();
+            obj.setInlongId(inlongId);
+            Optional.ofNullable(groupParams.get(groupId)).ifPresent(v -> obj.getParams().putAll(v));
+            Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v -> obj.getParams().putAll(v));
+            if (StringUtils.isBlank(streamIdObj.getTopic())) {
+                obj.setTopic(groupIdObj.getTopic());
+            } else {
+                obj.setTopic(streamIdObj.getTopic());
+                obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
+            }
+            inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
+            // backup
+            InLongIdObject backupObj = new InLongIdObject();
+            backupObj.setInlongId(inlongId);
+            backupObj.getParams().putAll(obj.getParams());
+            Map<String, String> groupParam = groupParams.get(groupId);
+            if (groupParam != null && groupParam.containsKey(ClusterSwitch.BACKUP_CLUSTER_TAG)
+                    && groupParam.containsKey(ClusterSwitch.BACKUP_MQ_RESOURCE)) {
+                String clusterTag = groupParam.get(ClusterSwitch.BACKUP_CLUSTER_TAG);
+                String groupMqResource = groupParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE);
+
+                Map<String, String> streamParam = streamParams.get(inlongId);
+                if (streamParam != null && !StringUtils.isBlank(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE))) {
+                    backupObj.setTopic(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE));
+                    backupObj.getParams().put(KEY_NAMESPACE, groupMqResource);
+                } else {
+                    backupObj.setTopic(groupMqResource);
+                }
+                inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(backupObj);
+            }
+        }
+        return inlongIdMap;
+    }
+
+    /**
+     * getInlongId
+     */
+    private String getInlongId(String inlongGroupId, String inlongStreamId) {
+        return inlongGroupId + "." + inlongStreamId;
+    }
+
+    /**
+     * setReloadTimer
+     */
+    private void setReloadTimer() {
+        Timer reloadTimer = new Timer(true);
+        TimerTask task = new RepositoryTimerTask<DataProxyConfigRepositoryV2>(this);
+        reloadTimer.scheduleAtFixedRate(task, reloadInterval, reloadInterval);
+    }
+
+    /**
+     * generateClusterJson
+     */
+    private void generateClusterJson(Map<String, DataProxyCluster> proxyClusterMap) {
+        Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>();
+        Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>();
+        for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
+            DataProxyCluster proxyObj = entry.getValue();
+            // json
+            String jsonDataProxyCluster = GSON.toJson(proxyObj);
+            String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
+            DataProxyConfigResponse response = new DataProxyConfigResponse();
+            response.setResult(true);
+            response.setErrCode(DataProxyConfigResponse.SUCC);
+            response.setMd5(md5);
+            response.setData(proxyObj);
+            String jsonResponse = GSON.toJson(response);
+            newProxyConfigJson.put(entry.getKey(), jsonResponse);
+            newProxyMd5Map.put(entry.getKey(), md5);
+        }
+
+        // replace
+        this.proxyConfigJson = newProxyConfigJson;
+        this.proxyMd5Map = newProxyMd5Map;
+    }
+
+    /**
+     * isSubTag
+     */
+    private boolean isSubTag(Map<String, String> wholeTagMap, Map<String, String> subTagMap) {
+        for (Entry<String, String> entry : subTagMap.entrySet()) {
+            String value = wholeTagMap.get(entry.getKey());
+            if (value == null || !value.equals(entry.getValue())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * getProxyMd5
+     */
+    public String getProxyMd5(String clusterName) {
+        return this.proxyMd5Map.get(clusterName);
+    }
+
+    /**
+     * getProxyConfigJson
+     */
+    public String getProxyConfigJson(String clusterName) {
+        return this.proxyConfigJson.get(clusterName);
+    }
+
+    /**
+     * changeClusterTag
+     */
+    public String changeClusterTag(String inlongGroupId, String clusterTag,
+            String topic) {
+        try {
+            // select
+            InlongGroupEntity oldGroup = inlongGroupMapper.selectByGroupId(inlongGroupId);
+            if (oldGroup == null) {
+                throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+            }
+            String oldClusterTag = oldGroup.getInlongClusterTag();
+            if (StringUtils.equals(oldClusterTag, clusterTag)) {
+                return "Cluster tag is same.";
+            }
+            // prepare group
+            final InlongGroupEntity newGroup = this.prepareClusterTagGroup(oldGroup, clusterTag, topic);
+            // load cluster
+            Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+            ClusterPageRequest clusterRequest = new ClusterPageRequest();
+            List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest);
+            clusters.forEach((v) -> {
+                clusterMap.put(v.getName(), v);
+            });
+            // prepare stream sink
+            SinkPageRequest request = new SinkPageRequest();
+            request.setInlongGroupId(inlongGroupId);
+            List<StreamSinkEntity> streamSinks = streamSinkMapper.selectByCondition(request);
+            List<StreamSinkEntity> newStreamSinks = new ArrayList<>();
+            for (StreamSinkEntity streamSink : streamSinks) {
+                String clusterName = streamSink.getInlongClusterName();
+                InlongClusterEntity cluster = clusterMap.get(clusterName);
+                if (cluster == null) {
+                    continue;
+                }
+                if (!StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
+                    continue;
+                }
+                String clusterType = cluster.getType();
+                // find the cluster of same cluster tag and sink type, and add new stream sink
+                StreamSinkEntity newStreamSink = this.createNewStreamSink(clusters, clusterType, clusterTag,
+                        streamSink);
+                if (newStreamSink != null) {
+                    newStreamSinks.add(newStreamSink);
+                }
+            }
+            // update
+            newStreamSinks.forEach((v) -> {
+                streamSinkMapper.insert(v);
+            });
+            int rowCount = inlongGroupMapper.updateByIdentifierSelective(newGroup);
+            if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+                LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
+                        newGroup.getInlongGroupId(), newGroup.getVersion());
+                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+            }
+            return inlongGroupId;
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+            return e.getMessage();
+        }
+    }
+
+    /**
+     * createNewStreamSink
+     */
+    private StreamSinkEntity createNewStreamSink(List<InlongClusterEntity> clusters, String clusterType,
+            String clusterTag, StreamSinkEntity srcStreamSink) {
+        for (InlongClusterEntity v : clusters) {
+            if (StringUtils.equals(clusterType, v.getType())
+                    && StringUtils.equals(clusterTag, v.getClusterTags())) {
+                String newExtParams = v.getExtParams();
+                JsonObject extParams = fromJsonToJson(newExtParams);
+                if (extParams.has(KEY_SINK_NAME) && extParams.has(KEY_SORT_TASK_NAME)
+                        && extParams.has(KEY_DATA_NODE_NAME) && extParams.has(KEY_SORT_CONSUMER_GROUP)) {
+                    final String sinkName = extParams.get(KEY_SINK_NAME).getAsString();
+                    final String sortTaskName = extParams.get(KEY_SORT_TASK_NAME).getAsString();
+                    final String dataNodeName = extParams.get(KEY_DATA_NODE_NAME).getAsString();
+                    final String sortConsumerGroup = extParams.get(KEY_SORT_CONSUMER_GROUP).getAsString();
+                    StreamSinkEntity newStreamSink = copyStreamSink(srcStreamSink);
+                    newStreamSink.setInlongClusterName(v.getName());
+                    newStreamSink.setSinkName(sinkName);
+                    newStreamSink.setSortTaskName(sortTaskName);
+                    newStreamSink.setDataNodeName(dataNodeName);
+                    newStreamSink.setSortConsumerGroup(sortConsumerGroup);
+                    return newStreamSink;
+                }
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * copyStreamSink
+     */
+    private StreamSinkEntity copyStreamSink(StreamSinkEntity streamSink) {
+        StreamSinkEntity streamSinkDest = new StreamSinkEntity();
+        CommonBeanUtils.copyProperties(streamSink, streamSinkDest);
+        streamSinkDest.setId(null);
+        streamSinkDest.setModifyTime(new Date());
+        return streamSinkDest;
+    }
+
+    /**
+     * prepareClusterTagGroup
+     */
+    private InlongGroupEntity prepareClusterTagGroup(InlongGroupEntity oldGroup, String clusterTag, String topic)
+            throws IllegalAccessException, InvocationTargetException {
+        // parse ext_params
+        String extParams = oldGroup.getExtParams();
+        if (StringUtils.isEmpty(extParams)) {
+            extParams = "{}";
+        }
+        // parse json
+        JsonObject extParamsObj = fromJsonToJson(extParams);
+        // change cluster tag
+        extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG, oldGroup.getInlongClusterTag());
+        extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
+        // copy properties
+        InlongGroupEntity newGroup = new InlongGroupEntity();
+        BeanUtils.copyProperties(newGroup, oldGroup);
+        newGroup.setId(null);
+        // change properties
+        newGroup.setInlongClusterTag(clusterTag);
+        newGroup.setMqResource(topic);
+        String newExtParams = extParamsObj.toString();
+        newGroup.setExtParams(newExtParams);
+        return newGroup;
+    }
+
+    /**
+     * removeBackupClusterTag
+     */
+    public String removeBackupClusterTag(String inlongGroupId) {
+        // select
+        InlongGroupEntity oldGroup = inlongGroupMapper.selectByGroupId(inlongGroupId);
+        if (oldGroup == null) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+        // parse ext_params
+        String extParams = oldGroup.getExtParams();
+        if (StringUtils.isEmpty(extParams)) {
+            return inlongGroupId;
+        }
+        // parse json
+        JsonObject extParamsObj = fromJsonToJson(extParams);
+        if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
+            return inlongGroupId;
+        }
+        final String oldClusterTag = extParamsObj.get(KEY_BACKUP_CLUSTER_TAG).getAsString();
+        extParamsObj.remove(KEY_BACKUP_CLUSTER_TAG);
+        extParamsObj.remove(KEY_BACKUP_TOPIC);
+        String newExtParams = extParamsObj.toString();
+        oldGroup.setExtParams(newExtParams);
+        // update group
+        int rowCount = inlongGroupMapper.updateByIdentifierSelective(oldGroup);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
+                    oldGroup.getInlongGroupId(), oldGroup.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+        // load cluster
+        Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
+        ClusterPageRequest clusterRequest = new ClusterPageRequest();
+        List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest);
+        clusters.forEach((v) -> {
+            clusterMap.put(v.getName(), v);
+        });
+        // prepare stream sink
+        SinkPageRequest request = new SinkPageRequest();
+        request.setInlongGroupId(inlongGroupId);
+        List<StreamSinkEntity> streamSinks = streamSinkMapper.selectByCondition(request);
+        List<StreamSinkEntity> deleteStreamSinks = new ArrayList<>();
+        for (StreamSinkEntity streamSink : streamSinks) {
+            String clusterName = streamSink.getInlongClusterName();
+            InlongClusterEntity cluster = clusterMap.get(clusterName);
+            if (cluster == null) {
+                continue;
+            }
+            if (StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
+                deleteStreamSinks.add(streamSink);
+            }
+        }
+        // delete old stream sink
+        deleteStreamSinks.forEach((v) -> {
+            streamSinkMapper.deleteById(v.getId());
+        });
+        return inlongGroupId;
+    }
+}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 17de1fc..cea54c8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -70,11 +70,20 @@
     }
 
     @PostMapping("/dataproxy/getAllConfig")
-    @ApiOperation(value = "Get all proxy config")
+    @ApiOperation(value = "Get all proxy config. " +
+            "This method was deprecated since version 1.8.0. " +
+            "Please use new method /dataproxy/getMetaConfig")
+    @Deprecated
     public String getAllConfig(@RequestBody DataProxyConfigRequest request) {
         return clusterService.getAllConfig(request.getClusterName(), request.getMd5());
     }
 
+    @PostMapping("/dataproxy/getMetaConfig")
+    @ApiOperation(value = "Get all DataProxy meta config")
+    public String getMetaConfig(@RequestBody DataProxyConfigRequest request) {
+        return clusterService.getMetaConfig(request.getClusterName(), request.getMd5());
+    }
+
     @RequestMapping(value = "/changeClusterTag", method = RequestMethod.PUT)
     @ApiOperation(value = "Change cluster tag and topic for inlong group id")
     public Response<String> changeClusterTag(@RequestParam String inlongGroupId, @RequestParam String clusterTag,