/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.manager.service.core.impl;
import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.constant.ClusterSwitch;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.sdk.CacheZone;
import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sdk.Topic;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Implementation of {@link SortSourceService}.
*/
@Lazy
@Service
public class SortSourceServiceImpl implements SortSourceService {
private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceServiceImpl.class);
private static final Gson GSON = new Gson();
private static final Set<String> SUPPORTED_MQ_TYPE = new HashSet<>(
Arrays.asList(MQType.KAFKA, MQType.TUBEMQ, MQType.PULSAR));
private static final String KEY_AUTH = "authentication";
private static final String KEY_TENANT = "tenant";
private static final int RESPONSE_CODE_SUCCESS = 0;
private static final int RESPONSE_CODE_NO_UPDATE = 1;
private static final int RESPONSE_CODE_FAIL = -1;
private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
/**
* key 1: cluster name, key 2: task name, value: md5
*/
private Map<String, Map<String, String>> sortSourceMd5Map = new ConcurrentHashMap<>();
/**
* key 1: cluster name, key 2: task name, value: source config
*/
private Map<String, Map<String, CacheZoneConfig>> sortSourceConfigMap = new ConcurrentHashMap<>();
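/**
* Intermediate maps loaded from DB, released after each call of {@link #parseAll()}.
*/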
private Map<String, SortSourceClusterInfo> sortClusters;
private Map<String, List<SortSourceClusterInfo>> mqClusters;
private Map<String, SortSourceGroupInfo> groupInfos;
private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
private Map<String, String> backupClusterTag;
private Map<String, String> backupGroupMqResource;
private Map<String, Map<String, String>> backupStreamMqResource;
private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> streamSinkMap;
@Autowired
private SortConfigLoader configLoader;
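/**
* Load the config once on startup, then schedule the periodic reload.
*/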
@PostConstruct
public void initialize() {
LOGGER.info("create repository for " + SortSourceServiceImpl.class.getSimpleName());
try {
reload();
setReloadTimer();
} catch (Throwable t) {
LOGGER.error("initialize SortSourceConfigRepository error", t);
}
}
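/**
* Reload all configs from DB and parse them into the cached config and md5 maps.
* Any failure is logged and swallowed, so the previously cached config stays in effect.
*/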
@Transactional(rollbackFor = Exception.class)
public void reload() {
LOGGER.debug("start to reload sort config.");
try {
reloadAllConfigs();
parseAll();
} catch (Throwable t) {
LOGGER.error("fail to reload all source config", t);
}
LOGGER.debug("end to reload config");
}
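/**
* Get the source config of the given sort cluster and task.
* <p>
* Returns a request-params-error code if the cluster or task name is blank,
* a success code with empty data if no config has been parsed for them yet,
* a no-update code if the given md5 equals the cached one,
* and a fail code if the parsed config contains no cache zones.
*/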
@Override
public SortSourceConfigResponse getSourceConfig(
String cluster,
String task,
String md5) {
// if the cluster name or task name is blank
if (StringUtils.isBlank(cluster) || StringUtils.isBlank(task)) {
String errMsg = "blank cluster name or task name, return nothing";
LOGGER.debug(errMsg);
return SortSourceConfigResponse.builder()
.code(RESPONSE_CODE_REQ_PARAMS_ERROR)
.msg(errMsg)
.build();
}
// if there is no config yet, still return success (with empty data)
if (!sortSourceConfigMap.containsKey(cluster) || !sortSourceConfigMap.get(cluster).containsKey(task)) {
String errMsg = String.format("there is no valid source config of cluster %s, task %s", cluster, task);
LOGGER.debug(errMsg);
return SortSourceConfigResponse.builder()
.code(RESPONSE_CODE_SUCCESS)
.msg(errMsg)
.build();
}
// if the md5 is unchanged, there is no update
if (sortSourceMd5Map.get(cluster).get(task).equals(md5)) {
return SortSourceConfigResponse.builder()
.code(RESPONSE_CODE_NO_UPDATE)
.msg("No update")
.md5(md5)
.build();
}
// if the config is bad, i.e. it contains no cache zones
if (sortSourceConfigMap.get(cluster).get(task).getCacheZones().isEmpty()) {
String errMsg = String.format("find empty cache zones of cluster %s, task %s, "
+ "please check the manager log", cluster, task);
LOGGER.debug(errMsg);
return SortSourceConfigResponse.builder()
.code(RESPONSE_CODE_FAIL)
.msg(errMsg)
.build();
}
return SortSourceConfigResponse.builder()
.code(RESPONSE_CODE_SUCCESS)
.msg("Success")
.data(sortSourceConfigMap.get(cluster).get(task))
.md5(sortSourceMd5Map.get(cluster).get(task))
.build();
}
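/**
* Load all clusters, stream sinks, groups, streams and backup info from DB
* into the intermediate maps consumed by {@link #parseAll()}.
*/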
private void reloadAllConfigs() {
// reload MQ clusters and sort clusters
List<SortSourceClusterInfo> allClusters = configLoader.loadAllClusters();
sortClusters = allClusters.stream()
.collect(Collectors.toMap(SortSourceClusterInfo::getName, v -> v));
// group mq clusters by cluster tag
mqClusters = allClusters.stream()
.filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType()))
.filter(SortSourceClusterInfo::isConsumable)
.collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
// reload all stream sinks into Map<clusterName, Map<taskName, List<SortSourceStreamSinkInfo>>> format
List<SortSourceStreamSinkInfo> allStreamSinks = configLoader.loadAllStreamSinks();
streamSinkMap = new HashMap<>();
allStreamSinks.stream()
.filter(sink -> StringUtils.isNotBlank(sink.getSortClusterName()))
.filter(sink -> StringUtils.isNotBlank(sink.getSortTaskName()))
.forEach(sink -> {
Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap =
streamSinkMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
List<SortSourceStreamSinkInfo> sinkInfoList =
task2groupsMap.computeIfAbsent(sink.getSortTaskName(), k -> new ArrayList<>());
sinkInfoList.add(sink);
});
// reload all groups
groupInfos = configLoader.loadAllGroup()
.stream()
.collect(Collectors.toMap(SortSourceGroupInfo::getGroupId, info -> info));
// reload all backup cluster tags of groups
backupClusterTag = configLoader.loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG)
.stream()
.collect(Collectors.toMap(InlongGroupExtEntity::getInlongGroupId, InlongGroupExtEntity::getKeyValue));
// reload all backup MQ resources of groups
backupGroupMqResource = configLoader.loadGroupBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE)
.stream()
.collect(Collectors.toMap(InlongGroupExtEntity::getInlongGroupId, InlongGroupExtEntity::getKeyValue));
// reload all streams
allStreams = configLoader.loadAllStreams()
.stream()
.collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
// reload all backup MQ resources of streams
backupStreamMqResource = configLoader.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE)
.stream()
.collect(Collectors.groupingBy(InlongStreamExtEntity::getInlongGroupId,
Collectors.toMap(InlongStreamExtEntity::getInlongStreamId,
InlongStreamExtEntity::getKeyValue)));
}
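/**
* Parse the intermediate maps into one {@link CacheZoneConfig} and its md5 per
* (sort cluster, task) pair, swap in the new maps, then release the intermediate maps.
*/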
private void parseAll() {
// Prepare CacheZones for each cluster and task
Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
streamSinkMap.forEach((sortClusterName, task2SinkList) -> {
// prepare the new config and md5
Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
Map<String, String> task2Md5 = new ConcurrentHashMap<>();
task2SinkList.forEach((taskName, sinkList) -> {
try {
CacheZoneConfig cacheZoneConfig =
CacheZoneConfig.builder()
.sortClusterName(sortClusterName)
.sortTaskId(taskName)
.build();
Map<String, CacheZone> cacheZoneMap =
this.parseCacheZones(sortClusterName, sinkList);
cacheZoneConfig.setCacheZones(cacheZoneMap);
// prepare md5
String jsonStr = GSON.toJson(cacheZoneConfig);
String md5 = DigestUtils.md5Hex(jsonStr);
task2Config.put(taskName, cacheZoneConfig);
task2Md5.put(taskName, md5);
} catch (Throwable t) {
LOGGER.warn("failed to parse sort source config of sortCluster={}, task={}",
sortClusterName, taskName);
}
});
newConfigMap.put(sortClusterName, task2Config);
newMd5Map.put(sortClusterName, task2Md5);
});
sortSourceConfigMap = newConfigMap;
sortSourceMd5Map = newMd5Map;
sortClusters = null;
mqClusters = null;
groupInfos = null;
allStreams = null;
backupClusterTag = null;
backupGroupMqResource = null;
backupStreamMqResource = null;
streamSinkMap = null;
}
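/**
* Parse the cache zones of one sort cluster from its stream sinks, covering both
* the primary and the backup cluster tags; zones of the same name are merged by topics.
*/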
private Map<String, CacheZone> parseCacheZones(
String clusterName,
List<SortSourceStreamSinkInfo> sinkList) {
Preconditions.expectNotNull(sortClusters.get(clusterName), "sort cluster " + clusterName + " should not be null");
String sortClusterTag = sortClusters.get(clusterName).getClusterTags();
// keep only the sinks whose group and stream info exist
List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
.filter(sinkInfo -> groupInfos.containsKey(sinkInfo.getGroupId())
&& allStreams.containsKey(sinkInfo.getGroupId())
&& allStreams.get(sinkInfo.getGroupId()).containsKey(sinkInfo.getStreamId()))
.collect(Collectors.toList());
// group them by cluster tag.
Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
.filter(sink -> Objects.nonNull(groupInfos.get(sink.getGroupId())))
.filter(sink -> {
if (StringUtils.isBlank(sortClusterTag)) {
return true;
}
return sortClusterTag.equals(groupInfos.get(sink.getGroupId()).getClusterTag());
})
.collect(Collectors.groupingBy(sink -> {
SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId());
return groupInfo.getClusterTag();
}));
// group them by backup cluster tag.
Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
.filter(sink -> backupClusterTag.containsKey(sink.getGroupId()))
.filter(sink -> {
if (StringUtils.isBlank(sortClusterTag)) {
return true;
}
return sortClusterTag.equals(backupClusterTag.get(sink.getGroupId()));
})
.collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId())));
List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false);
List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2SinkInfos, true);
return Stream.of(cacheZones, backupCacheZones)
.flatMap(Collection::stream)
.collect(Collectors.toMap(
CacheZone::getZoneName,
cacheZone -> cacheZone,
(zone1, zone2) -> {
zone1.getTopics().addAll(zone2.getTopics());
return zone1;
}));
}
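/**
* Parse one cache zone for each MQ cluster under the given cluster tags.
*
* @param tag2Sinks stream sinks grouped by cluster tag
* @param isBackup whether the given tags are backup cluster tags
* @return list of cache zones, skipping clusters that fail to parse
*/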
private List<CacheZone> parseCacheZonesByTag(
Map<String, List<SortSourceStreamSinkInfo>> tag2Sinks,
boolean isBackup) {
return tag2Sinks.keySet().stream()
.filter(mqClusters::containsKey)
.flatMap(tag -> {
List<SortSourceStreamSinkInfo> sinks = tag2Sinks.get(tag);
List<SortSourceClusterInfo> clusters = mqClusters.get(tag);
return clusters.stream()
.map(cluster -> {
CacheZone zone = null;
try {
zone = this.parseCacheZone(sinks, cluster, isBackup);
} catch (IllegalStateException e) {
LOGGER.error("fail to init cache zone for cluster " + cluster, e);
}
return zone;
});
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
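/**
* Parse the cache zone of one MQ cluster; only the PULSAR type is supported here.
*/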
private CacheZone parseCacheZone(
List<SortSourceStreamSinkInfo> sinks,
SortSourceClusterInfo cluster,
boolean isBackupTag) {
switch (cluster.getType()) {
case ClusterType.PULSAR:
return parsePulsarZone(sinks, cluster, isBackupTag);
default:
throw new BusinessException(String.format("unsupported cluster type=%s of cluster=%s",
cluster.getType(), cluster));
}
}
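/**
* Parse a Pulsar cache zone. The full topic is built as tenant/namespace/topic,
* preferring the backup group and stream MQ resources when parsing a backup cluster tag.
*/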
private CacheZone parsePulsarZone(
List<SortSourceStreamSinkInfo> sinks,
SortSourceClusterInfo cluster,
boolean isBackupTag) {
Map<String, String> param = cluster.getExtParamsMap();
String tenant = param.get(KEY_TENANT);
String auth = param.get(KEY_AUTH);
List<Topic> sdkTopics = sinks.stream()
.map(sink -> {
String groupId = sink.getGroupId();
String streamId = sink.getStreamId();
SortSourceGroupInfo groupInfo = groupInfos.get(groupId);
SortSourceStreamInfo streamInfo = allStreams.get(groupId).get(streamId);
try {
String namespace = groupInfo.getMqResource();
String topic = streamInfo.getMqResource();
if (isBackupTag) {
if (backupGroupMqResource.containsKey(groupId)) {
namespace = backupGroupMqResource.get(groupId);
}
if (backupStreamMqResource.containsKey(groupId)
&& backupStreamMqResource.get(groupId).containsKey(streamId)) {
topic = backupStreamMqResource.get(groupId).get(streamId);
}
}
String fullTopic = tenant.concat("/").concat(namespace).concat("/").concat(topic);
return Topic.builder()
.topic(fullTopic)
.topicProperties(sink.getExtParamsMap())
.build();
} catch (Exception e) {
LOGGER.error("fail to parse topic of groupId={}, streamId={}", groupId, streamId, e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
return CacheZone.builder()
.zoneName(cluster.getName())
.serviceUrl(cluster.getUrl())
.topics(sdkTopics)
.authentication(auth)
.cacheZoneProperties(cluster.getExtParamsMap())
.zoneType(ClusterType.PULSAR)
.build();
}
/**
* Set the reload timer when the repository is initialized.
*/
private void setReloadTimer() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
long reloadInterval = 60000L;
executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
}
}