blob: 263b91c52eedbf580b8a3e08afee915148a89841 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.manager.service.repository;
import org.apache.inlong.common.constant.ClusterSwitch;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.IRepository;
import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
/**
* DataProxyConfigRepository
* This repository was deprecated since version 1.8.0
*/
@Lazy
@Deprecated
@Repository(value = "dataProxyConfigRepository")
public class DataProxyConfigRepository implements IRepository {
public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
public static final String KEY_NAMESPACE = "namespace";
public static final String KEY_NEW_TENANT_KEY = "pulsarTenant";
public static final String KEY_OLD_TENANT_KEY = "tenant";
public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
public static final String KEY_BACKUP_TOPIC = "backup_topic";
public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
public static final String KEY_SORT_CONSUMER_GROUP = "defaultSortConsumerGroup";
public static final String KEY_SINK_NAME = "defaultSinkName";
public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(SEPARATOR).trimResults()
.withKeyValueSeparator(KEY_VALUE_SEPARATOR);
public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
private static final Gson GSON = new Gson();
// key: proxyClusterName, value: jsonString
private Map<String, String> proxyConfigJson = new ConcurrentHashMap<>();
// key: proxyClusterName, value: md5
private Map<String, String> proxyMd5Map = new ConcurrentHashMap<>();
private long reloadInterval;
@Autowired
private ClusterSetMapper clusterSetMapper;
@Autowired
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongGroupEntityMapper inlongGroupMapper;
@Autowired
private StreamSinkEntityMapper streamSinkMapper;
@Autowired
private SortConfigLoader sortConfigLoader;
@PostConstruct
public void initialize() {
LOGGER.info("create repository for " + DataProxyConfigRepository.class.getSimpleName());
try {
this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
reload();
setReloadTimer();
} catch (Throwable t) {
LOGGER.error("Initialize DataProxyConfigRepository error", t);
}
}
/**
* get clusterSetMapper
*
* @return the clusterSetMapper
*/
public ClusterSetMapper getClusterSetMapper() {
return clusterSetMapper;
}
/**
* set clusterSetMapper
*
* @param clusterSetMapper the clusterSetMapper to set
*/
public void setClusterSetMapper(ClusterSetMapper clusterSetMapper) {
this.clusterSetMapper = clusterSetMapper;
}
/**
* get clusterMapper
*
* @return the clusterMapper
*/
public InlongClusterEntityMapper getClusterMapper() {
return clusterMapper;
}
/**
* set clusterMapper
*
* @param clusterMapper the clusterMapper to set
*/
public void setClusterMapper(InlongClusterEntityMapper clusterMapper) {
this.clusterMapper = clusterMapper;
}
/**
* get inlongGroupMapper
*
* @return the inlongGroupMapper
*/
public InlongGroupEntityMapper getInlongGroupMapper() {
return inlongGroupMapper;
}
/**
* set inlongGroupMapper
*
* @param inlongGroupMapper the inlongGroupMapper to set
*/
public void setInlongGroupMapper(InlongGroupEntityMapper inlongGroupMapper) {
this.inlongGroupMapper = inlongGroupMapper;
}
/**
* get streamSinkMapper
*
* @return the streamSinkMapper
*/
public StreamSinkEntityMapper getStreamSinkMapper() {
return streamSinkMapper;
}
/**
* set streamSinkMapper
*
* @param streamSinkMapper the streamSinkMapper to set
*/
public void setStreamSinkMapper(StreamSinkEntityMapper streamSinkMapper) {
this.streamSinkMapper = streamSinkMapper;
}
/**
* reload
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void reload() {
LOGGER.info("start to reload config:" + this.getClass().getSimpleName());
// reload proxy cluster
Map<String, DataProxyCluster> proxyClusterMap = new HashMap<>();
this.reloadProxyCluster(proxyClusterMap);
if (proxyClusterMap.size() == 0) {
return;
}
// reload cache cluster
this.reloadCacheCluster(proxyClusterMap);
// reload inlong group id and inlong stream id
this.reloadInlongId(proxyClusterMap);
// generateClusterJson
this.generateClusterJson(proxyClusterMap);
LOGGER.info("end to reload config:" + this.getClass().getSimpleName());
}
/**
* reloadProxyCluster
*/
private void reloadProxyCluster(Map<String, DataProxyCluster> proxyClusterMap) {
for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) {
ProxyClusterObject obj = new ProxyClusterObject();
obj.setName(proxyCluster.getClusterName());
obj.setSetName(proxyCluster.getClusterTag());
obj.setZone(proxyCluster.getExtTag());
DataProxyCluster clusterObj = new DataProxyCluster();
clusterObj.setProxyCluster(obj);
proxyClusterMap.put(obj.getName(), clusterObj);
}
}
/**
* reloadCacheCluster
*/
private void reloadCacheCluster(Map<String, DataProxyCluster> proxyClusterMap) {
// reload cache cluster
Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<>();
for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) {
if (StringUtils.isEmpty(cacheCluster.getExtTag())) {
continue;
}
Map<String, String> tagMap = MAP_SPLITTER.split(cacheCluster.getExtTag());
String producerTag = tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
if (StringUtils.equalsIgnoreCase(producerTag, Boolean.TRUE.toString())) {
cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTags(), k -> new HashMap<>())
.computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
}
}
// mark cache cluster to proxy cluster
Map<String, Map<String, String>> tagCache = new HashMap<>();
for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
DataProxyCluster clusterObj = entry.getValue();
ProxyClusterObject proxyObj = clusterObj.getProxyCluster();
// cache
String clusterTag = proxyObj.getSetName();
String extTag = proxyObj.getZone();
if (StringUtils.isEmpty(extTag)) {
continue;
}
Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
if (cacheClusterZoneMap != null) {
Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));
for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
if (cacheEntry.getValue().size() == 0) {
continue;
}
Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(),
k -> MAP_SPLITTER.split(cacheEntry.getKey()));
if (isSubTag(wholeTagMap, subTagMap)) {
CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
cacheSet.setSetName(clusterTag);
List<CacheCluster> cacheClusterList = cacheEntry.getValue();
cacheSet.setType(cacheClusterList.get(0).getType());
List<CacheClusterObject> cacheClusters = cacheSet.getCacheClusters();
for (CacheCluster cacheCluster : cacheClusterList) {
CacheClusterObject obj = new CacheClusterObject();
obj.setName(cacheCluster.getClusterName());
obj.setZone(cacheCluster.getExtTag());
obj.setToken(cacheCluster.getToken());
obj.setParams(fromJsonToMap(cacheCluster.getExtParams()));
cacheClusters.add(obj);
}
}
}
}
}
}
/**
* Parse Json string to Java Map
*/
private Map<String, String> fromJsonToMap(String jsonString) {
Map<String, String> mapObj = new HashMap<>();
if (StringUtils.isBlank(jsonString)) {
return mapObj;
}
try {
JsonObject obj = GSON.fromJson(jsonString, JsonObject.class);
for (String key : obj.keySet()) {
JsonElement child = obj.get(key);
if (child.isJsonPrimitive()) {
mapObj.put(key, child.getAsString());
} else {
mapObj.put(key, child.toString());
}
}
} catch (Exception e) {
LOGGER.error("parse json string to map error", e);
}
// to be compatible with multi-tenancy #7914
String tenant = mapObj.get(KEY_NEW_TENANT_KEY);
mapObj.remove(KEY_NEW_TENANT_KEY);
mapObj.put(KEY_OLD_TENANT_KEY, tenant);
return mapObj;
}
/**
* Parse Json string to JsonObject
*/
private JsonObject fromJsonToJson(String jsonString) {
if (StringUtils.isBlank(jsonString)) {
return new JsonObject();
}
try {
return GSON.fromJson(jsonString, JsonObject.class);
} catch (Exception e) {
LOGGER.error("parse json string to json object error", e);
return new JsonObject();
}
}
/**
* reloadInlongId
*/
private void reloadInlongId(Map<String, DataProxyCluster> proxyClusterMap) {
// reload inlong group id
Map<String, InlongGroupId> groupIdMap = new HashMap<>();
clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
// reload inlong group ext params
Map<String, Map<String, String>> groupParams = new HashMap<>();
groupIdMap.forEach((k, v) -> groupParams.put(k, fromJsonToMap(v.getExtParams())));
// reload inlong group ext
List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
.loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
groupExtCursor.forEach(v -> groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
.put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
// reload inlong stream id
Map<String, InlongStreamId> streamIdMap = new HashMap<>();
clusterSetMapper.selectInlongStreamId()
.forEach(v -> streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
// reload inlong stream ext params
Map<String, Map<String, String>> streamParams = new HashMap<>();
streamIdMap.forEach((k, v) -> streamParams.put(k, fromJsonToMap(v.getExtParams())));
// reload inlong stream ext
List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
streamExtCursor.forEach(v -> streamParams
.computeIfAbsent(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), k -> new HashMap<>())
.put(ClusterSwitch.BACKUP_MQ_RESOURCE, v.getKeyValue()));
// build Map<clusterTag, List<InlongIdObject>>
Map<String, List<InLongIdObject>> inlongIdMap = this.parseInlongId(groupIdMap, groupParams, streamIdMap,
streamParams);
// mark inlong id to proxy cluster
for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
String clusterTag = entry.getValue().getProxyCluster().getSetName();
List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
if (inlongIds != null) {
entry.getValue().getProxyCluster().getInlongIds().addAll(inlongIds);
}
}
}
/**
* parseInlongId
*/
private Map<String, List<InLongIdObject>> parseInlongId(Map<String, InlongGroupId> groupIdMap,
Map<String, Map<String, String>> groupParams, Map<String, InlongStreamId> streamIdMap,
Map<String, Map<String, String>> streamParams) {
Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>();
for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) {
InlongStreamId streamIdObj = entry.getValue();
String groupId = streamIdObj.getInlongGroupId();
InlongGroupId groupIdObj = groupIdMap.get(groupId);
if (groupId == null || groupIdObj == null) {
LOGGER.debug("groupId {} or groupIdObj {} is null, ignored", groupId, groupIdObj);
continue;
}
// master
InLongIdObject obj = new InLongIdObject();
String inlongId = entry.getKey();
obj.setInlongId(inlongId);
Optional.ofNullable(groupParams.get(groupId)).ifPresent(v -> obj.getParams().putAll(v));
Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v -> obj.getParams().putAll(v));
if (StringUtils.isBlank(streamIdObj.getTopic())) {
obj.setTopic(groupIdObj.getTopic());
} else {
obj.setTopic(streamIdObj.getTopic());
obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic());
}
inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj);
// backup
InLongIdObject backupObj = new InLongIdObject();
backupObj.setInlongId(inlongId);
backupObj.getParams().putAll(obj.getParams());
Map<String, String> groupParam = groupParams.get(groupId);
if (groupParam != null && groupParam.containsKey(ClusterSwitch.BACKUP_CLUSTER_TAG)
&& groupParam.containsKey(ClusterSwitch.BACKUP_MQ_RESOURCE)) {
String clusterTag = groupParam.get(ClusterSwitch.BACKUP_CLUSTER_TAG);
String groupMqResource = groupParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE);
Map<String, String> streamParam = streamParams.get(inlongId);
if (streamParam != null && !StringUtils.isBlank(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE))) {
backupObj.setTopic(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE));
backupObj.getParams().put(KEY_NAMESPACE, groupMqResource);
} else {
backupObj.setTopic(groupMqResource);
}
inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(backupObj);
}
}
return inlongIdMap;
}
/**
* getInlongId
*/
private String getInlongId(String inlongGroupId, String inlongStreamId) {
return inlongGroupId + "." + inlongStreamId;
}
/**
* setReloadTimer
*/
private void setReloadTimer() {
Timer reloadTimer = new Timer(true);
TimerTask task = new RepositoryTimerTask<DataProxyConfigRepository>(this);
reloadTimer.scheduleAtFixedRate(task, reloadInterval, reloadInterval);
}
/**
* generateClusterJson
*/
private void generateClusterJson(Map<String, DataProxyCluster> proxyClusterMap) {
Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>();
Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>();
for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
DataProxyCluster proxyObj = entry.getValue();
// json
String jsonDataProxyCluster = GSON.toJson(proxyObj);
String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
DataProxyConfigResponse response = new DataProxyConfigResponse();
response.setResult(true);
response.setErrCode(DataProxyConfigResponse.SUCC);
response.setMd5(md5);
response.setData(proxyObj);
String jsonResponse = GSON.toJson(response);
newProxyConfigJson.put(entry.getKey(), jsonResponse);
newProxyMd5Map.put(entry.getKey(), md5);
}
// replace
this.proxyConfigJson = newProxyConfigJson;
this.proxyMd5Map = newProxyMd5Map;
}
/**
* isSubTag
*/
private boolean isSubTag(Map<String, String> wholeTagMap, Map<String, String> subTagMap) {
for (Entry<String, String> entry : subTagMap.entrySet()) {
String value = wholeTagMap.get(entry.getKey());
if (value == null || !value.equals(entry.getValue())) {
return false;
}
}
return true;
}
/**
* getProxyMd5
*/
public String getProxyMd5(String clusterName) {
return this.proxyMd5Map.get(clusterName);
}
/**
* getProxyConfigJson
*/
public String getProxyConfigJson(String clusterName) {
return this.proxyConfigJson.get(clusterName);
}
/**
* changeClusterTag
*/
public String changeClusterTag(String inlongGroupId, String clusterTag,
String topic) {
try {
// select
InlongGroupEntity oldGroup = inlongGroupMapper.selectByGroupId(inlongGroupId);
if (oldGroup == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}
String oldClusterTag = oldGroup.getInlongClusterTag();
if (StringUtils.equals(oldClusterTag, clusterTag)) {
return "Cluster tag is same.";
}
// prepare group
final InlongGroupEntity newGroup = this.prepareClusterTagGroup(oldGroup, clusterTag, topic);
// load cluster
Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
ClusterPageRequest clusterRequest = new ClusterPageRequest();
List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest);
clusters.forEach((v) -> {
clusterMap.put(v.getName(), v);
});
// prepare stream sink
SinkPageRequest request = new SinkPageRequest();
request.setInlongGroupId(inlongGroupId);
List<StreamSinkEntity> streamSinks = streamSinkMapper.selectByCondition(request);
List<StreamSinkEntity> newStreamSinks = new ArrayList<>();
for (StreamSinkEntity streamSink : streamSinks) {
String clusterName = streamSink.getInlongClusterName();
InlongClusterEntity cluster = clusterMap.get(clusterName);
if (cluster == null) {
continue;
}
if (!StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
continue;
}
String clusterType = cluster.getType();
// find the cluster of same cluster tag and sink type, and add new stream sink
StreamSinkEntity newStreamSink = this.createNewStreamSink(clusters, clusterType, clusterTag,
streamSink);
if (newStreamSink != null) {
newStreamSinks.add(newStreamSink);
}
}
// update
newStreamSinks.forEach((v) -> {
streamSinkMapper.insert(v);
});
int rowCount = inlongGroupMapper.updateByIdentifierSelective(newGroup);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
newGroup.getInlongGroupId(), newGroup.getVersion());
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
return inlongGroupId;
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return e.getMessage();
}
}
/**
* createNewStreamSink
*/
private StreamSinkEntity createNewStreamSink(List<InlongClusterEntity> clusters, String clusterType,
String clusterTag, StreamSinkEntity srcStreamSink) {
for (InlongClusterEntity v : clusters) {
if (StringUtils.equals(clusterType, v.getType())
&& StringUtils.equals(clusterTag, v.getClusterTags())) {
String newExtParams = v.getExtParams();
JsonObject extParams = fromJsonToJson(newExtParams);
if (extParams.has(KEY_SINK_NAME) && extParams.has(KEY_SORT_TASK_NAME)
&& extParams.has(KEY_DATA_NODE_NAME) && extParams.has(KEY_SORT_CONSUMER_GROUP)) {
final String sinkName = extParams.get(KEY_SINK_NAME).getAsString();
final String sortTaskName = extParams.get(KEY_SORT_TASK_NAME).getAsString();
final String dataNodeName = extParams.get(KEY_DATA_NODE_NAME).getAsString();
final String sortConsumerGroup = extParams.get(KEY_SORT_CONSUMER_GROUP).getAsString();
StreamSinkEntity newStreamSink = copyStreamSink(srcStreamSink);
newStreamSink.setInlongClusterName(v.getName());
newStreamSink.setSinkName(sinkName);
newStreamSink.setSortTaskName(sortTaskName);
newStreamSink.setDataNodeName(dataNodeName);
newStreamSink.setSortConsumerGroup(sortConsumerGroup);
return newStreamSink;
}
return null;
}
}
return null;
}
/**
* copyStreamSink
*/
private StreamSinkEntity copyStreamSink(StreamSinkEntity streamSink) {
StreamSinkEntity streamSinkDest = new StreamSinkEntity();
CommonBeanUtils.copyProperties(streamSink, streamSinkDest);
streamSinkDest.setId(null);
streamSinkDest.setModifyTime(new Date());
return streamSinkDest;
}
/**
* prepareClusterTagGroup
*/
private InlongGroupEntity prepareClusterTagGroup(InlongGroupEntity oldGroup, String clusterTag, String topic)
throws IllegalAccessException, InvocationTargetException {
// parse ext_params
String extParams = oldGroup.getExtParams();
if (StringUtils.isEmpty(extParams)) {
extParams = "{}";
}
// parse json
JsonObject extParamsObj = fromJsonToJson(extParams);
// change cluster tag
extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG, oldGroup.getInlongClusterTag());
extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
// copy properties
InlongGroupEntity newGroup = new InlongGroupEntity();
BeanUtils.copyProperties(newGroup, oldGroup);
newGroup.setId(null);
// change properties
newGroup.setInlongClusterTag(clusterTag);
newGroup.setMqResource(topic);
String newExtParams = extParamsObj.toString();
newGroup.setExtParams(newExtParams);
return newGroup;
}
/**
* removeBackupClusterTag
*/
public String removeBackupClusterTag(String inlongGroupId) {
// select
InlongGroupEntity oldGroup = inlongGroupMapper.selectByGroupId(inlongGroupId);
if (oldGroup == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}
// parse ext_params
String extParams = oldGroup.getExtParams();
if (StringUtils.isEmpty(extParams)) {
return inlongGroupId;
}
// parse json
JsonObject extParamsObj = fromJsonToJson(extParams);
if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
return inlongGroupId;
}
final String oldClusterTag = extParamsObj.get(KEY_BACKUP_CLUSTER_TAG).getAsString();
extParamsObj.remove(KEY_BACKUP_CLUSTER_TAG);
extParamsObj.remove(KEY_BACKUP_TOPIC);
String newExtParams = extParamsObj.toString();
oldGroup.setExtParams(newExtParams);
// update group
int rowCount = inlongGroupMapper.updateByIdentifierSelective(oldGroup);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error("inlong group has already updated with group id={}, curVersion={}",
oldGroup.getInlongGroupId(), oldGroup.getVersion());
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
// load cluster
Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
ClusterPageRequest clusterRequest = new ClusterPageRequest();
List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest);
clusters.forEach((v) -> {
clusterMap.put(v.getName(), v);
});
// prepare stream sink
SinkPageRequest request = new SinkPageRequest();
request.setInlongGroupId(inlongGroupId);
List<StreamSinkEntity> streamSinks = streamSinkMapper.selectByCondition(request);
List<StreamSinkEntity> deleteStreamSinks = new ArrayList<>();
for (StreamSinkEntity streamSink : streamSinks) {
String clusterName = streamSink.getInlongClusterName();
InlongClusterEntity cluster = clusterMap.get(clusterName);
if (cluster == null) {
continue;
}
if (StringUtils.equals(oldClusterTag, cluster.getClusterTags())) {
deleteStreamSinks.add(streamSink);
}
}
// delete old stream sink
deleteStreamSinks.forEach((v) -> {
streamSinkMapper.deleteById(v.getId());
});
return inlongGroupId;
}
}