blob: 0a6996a870cfcbc2fdd8d45457b43ebbfb736ca2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.manager.service.core.impl;
import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.node.DataNodeOperator;
import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
import org.apache.inlong.manager.service.sink.StreamSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Used to cache the sort cluster config and reduce the number of query to database.
*/
@Lazy
@Service
public class SortClusterServiceImpl implements SortClusterService {
private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterServiceImpl.class);
private static final Gson GSON = new Gson();
private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
private static final int RESPONSE_CODE_SUCCESS = 0;
private static final int RESPONSE_CODE_NO_UPDATE = 1;
private static final int RESPONSE_CODE_FAIL = -1;
private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
private static final String KEY_GROUP_ID = "inlongGroupId";
private static final String KEY_STREAM_ID = "inlongStreamId";
private Map<String, List<String>> fieldMap;
// key : sort cluster name, value : md5
private Map<String, String> sortClusterMd5Map = new ConcurrentHashMap<>();
// key : sort cluster name, value : cluster config
private Map<String, SortClusterConfig> sortClusterConfigMap = new ConcurrentHashMap<>();
// key : sort cluster name, value : error log
private Map<String, String> sortClusterErrorLogMap = new ConcurrentHashMap<>();
private long reloadInterval;
@Autowired
private SortConfigLoader sortConfigLoader;
@Autowired
private SinkOperatorFactory sinkOperatorFactory;
@Autowired
private DataNodeOperatorFactory dataNodeOperatorFactory;
@PostConstruct
public void initialize() {
LOGGER.info("create repository for " + SortClusterServiceImpl.class.getSimpleName());
try {
this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
reload();
setReloadTimer();
} catch (Throwable t) {
LOGGER.error("Initialize SortClusterConfigRepository error", t);
}
}
@Transactional(rollbackFor = Exception.class)
public void reload() {
LOGGER.debug("start to reload sort config");
try {
reloadAllClusterConfig();
} catch (Throwable t) {
LOGGER.error("fail to reload cluster config", t);
}
LOGGER.debug("end to reload config");
}
@Override
public SortClusterResponse getClusterConfig(String clusterName, String md5) {
// check if cluster name is valid or not.
if (StringUtils.isBlank(clusterName)) {
String errMsg = "Blank cluster name, return nothing";
LOGGER.debug(errMsg);
return SortClusterResponse.builder()
.msg(errMsg)
.code(RESPONSE_CODE_REQ_PARAMS_ERROR)
.build();
}
// if there is an error
if (sortClusterErrorLogMap.get(clusterName) != null) {
return SortClusterResponse.builder()
.msg(sortClusterErrorLogMap.get(clusterName))
.code(RESPONSE_CODE_FAIL)
.build();
}
// there is no config, but still return success.
if (sortClusterConfigMap.get(clusterName) == null) {
String errMsg = "There is not config for cluster " + clusterName;
LOGGER.debug(errMsg);
return SortClusterResponse.builder()
.msg(errMsg)
.code(RESPONSE_CODE_SUCCESS)
.build();
}
// if the same md5
if (sortClusterMd5Map.get(clusterName).equals(md5)) {
return SortClusterResponse.builder()
.msg("No update")
.code(RESPONSE_CODE_NO_UPDATE)
.md5(md5)
.build();
}
return SortClusterResponse.builder()
.msg("Success")
.code(RESPONSE_CODE_SUCCESS)
.data(sortClusterConfigMap.get(clusterName))
.md5(sortClusterMd5Map.get(clusterName))
.build();
}
private void reloadAllClusterConfig() {
// load all fields info
List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
fieldMap = new HashMap<>();
fieldInfos.forEach(info -> {
List<String> fields = fieldMap.computeIfAbsent(info.getInlongGroupId(), k -> new ArrayList<>());
fields.add(info.getFieldName());
});
List<StreamSinkEntity> sinkEntities = sortConfigLoader.loadAllStreamSinkEntity();
// get all task under a given cluster, has been reduced into cluster and task.
List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
.filter(dto -> StringUtils.isNotBlank(dto.getSortClusterName())
&& StringUtils.isNotBlank(dto.getSortTaskName())
&& StringUtils.isNotBlank(dto.getDataNodeName())
&& StringUtils.isNotBlank(dto.getSinkType()))
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
// get all stream sinks
Map<String, List<StreamSinkEntity>> task2AllStreams = sinkEntities.stream()
.filter(entity -> StringUtils.isNotBlank(entity.getInlongClusterName()))
.collect(Collectors.groupingBy(StreamSinkEntity::getSortTaskName));
// get all data nodes and group by node name
List<DataNodeEntity> dataNodeEntities = sortConfigLoader.loadAllDataNodeEntity();
Map<String, DataNodeInfo> task2DataNodeMap = dataNodeEntities.stream()
.filter(entity -> StringUtils.isNotBlank(entity.getName()))
.map(entity -> {
DataNodeOperator operator = dataNodeOperatorFactory.getInstance(entity.getType());
return operator.getFromEntity(entity);
})
.collect(Collectors.toMap(DataNodeInfo::getName, info -> info));
// re-org all SortClusterConfigs
Map<String, SortClusterConfig> newConfigMap = new ConcurrentHashMap<>();
Map<String, String> newMd5Map = new ConcurrentHashMap<>();
Map<String, String> newErrorLogMap = new ConcurrentHashMap<>();
clusterTaskMap.forEach((clusterName, taskList) -> {
try {
SortClusterConfig config = this.getConfigByClusterName(clusterName,
taskList, task2AllStreams, task2DataNodeMap);
String jsonStr = GSON.toJson(config);
String md5 = DigestUtils.md5Hex(jsonStr);
newConfigMap.put(clusterName, config);
newMd5Map.put(clusterName, md5);
} catch (Throwable e) {
// if get config failed, update the err log.
String errMsg = Optional.ofNullable(e.getMessage()).orElse("Unknown error, please check logs");
newErrorLogMap.put(clusterName, errMsg);
LOGGER.error("Failed to update cluster config={}", clusterName, e);
}
});
sortClusterErrorLogMap = newErrorLogMap;
sortClusterConfigMap = newConfigMap;
sortClusterMd5Map = newMd5Map;
}
private SortClusterConfig getConfigByClusterName(
String clusterName,
List<SortTaskInfo> tasks,
Map<String, List<StreamSinkEntity>> task2AllStreams,
Map<String, DataNodeInfo> task2DataNodeMap) {
List<SortTaskConfig> taskConfigs = tasks.stream()
.map(task -> {
try {
String taskName = task.getSortTaskName();
String type = task.getSinkType();
String dataNodeName = task.getDataNodeName();
DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
List<StreamSinkEntity> streams = task2AllStreams.get(taskName);
return SortTaskConfig.builder()
.name(taskName)
.type(type)
.idParams(this.parseIdParams(streams))
.sinkParams(this.parseSinkParams(nodeInfo))
.build();
} catch (Exception e) {
LOGGER.error("fail to parse sort task config of cluster={}", clusterName, e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
return SortClusterConfig.builder()
.clusterName(clusterName)
.sortTasks(taskConfigs)
.build();
}
private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> streams) {
return streams.stream()
.map(streamSink -> {
try {
StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType());
List<String> fields = fieldMap.get(streamSink.getInlongGroupId());
return operator.parse2IdParams(streamSink, fields);
} catch (Exception e) {
LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}",
streamSink.getInlongGroupId(), streamSink.getInlongStreamId(),
streamSink.getSinkName(), streamSink.getSinkType(), e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType());
return operator.parse2SinkParams(nodeInfo);
}
/**
* Set reload timer at the beginning of repository.
*/
private void setReloadTimer() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
}
}