blob: 555c55f9cc48e365c7a1e5f562eaaf511c4155da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.dataproxy.config.holder;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.CacheType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.config.pojo.InLongMetaConfig;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import com.google.gson.Gson;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Json to object
*/
public class MetaConfigHolder extends ConfigHolder {
private static final String metaConfigFileName = "metadata.json";
private static final int MAX_ALLOWED_JSON_FILE_SIZE = 300 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(MetaConfigHolder.class);
private static final Gson GSON = new Gson();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// meta data
private String dataMd5 = "";
private String dataStr = "";
private final AtomicLong lastUpdVersion = new AtomicLong(0);
private String tmpDataMd5 = "";
private final AtomicLong lastSyncVersion = new AtomicLong(0);
// cached data
private final AtomicInteger clusterType = new AtomicInteger(CacheType.N.getId());
private final ConcurrentHashMap<String, CacheClusterConfig> mqClusterMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSrcMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSinkMap = new ConcurrentHashMap<>();
public MetaConfigHolder() {
super(metaConfigFileName);
}
/**
* get source topic by groupId and streamId
*/
public String getSrcBaseTopicName(String groupId, String streamId) {
IdTopicConfig idTopicConfig = getSrcIdTopicConfig(groupId, streamId);
if (idTopicConfig == null) {
return null;
}
return idTopicConfig.getTopicName();
}
public IdTopicConfig getSrcIdTopicConfig(String groupId, String streamId) {
IdTopicConfig idTopicConfig = null;
if (StringUtils.isNotEmpty(groupId) && !id2TopicSrcMap.isEmpty()) {
idTopicConfig = id2TopicSrcMap.get(InlongId.generateUid(groupId, streamId));
if (idTopicConfig == null) {
idTopicConfig = id2TopicSrcMap.get(groupId);
}
}
return idTopicConfig;
}
/**
* get topic by groupId and streamId
*/
public String getSourceTopicName(String groupId, String streamId) {
String topic = null;
if (StringUtils.isNotEmpty(groupId) && !id2TopicSrcMap.isEmpty()) {
IdTopicConfig idTopicConfig = id2TopicSrcMap.get(InlongId.generateUid(groupId, streamId));
if (idTopicConfig == null) {
idTopicConfig = id2TopicSrcMap.get(groupId);
}
if (idTopicConfig != null) {
topic = idTopicConfig.getTopicName();
}
}
return topic;
}
public IdTopicConfig getSinkIdTopicConfig(String groupId, String streamId) {
IdTopicConfig idTopicConfig = null;
if (StringUtils.isNotEmpty(groupId) && !id2TopicSinkMap.isEmpty()) {
idTopicConfig = id2TopicSinkMap.get(InlongId.generateUid(groupId, streamId));
if (idTopicConfig == null) {
idTopicConfig = id2TopicSinkMap.get(groupId);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Get Sink Topic Config by groupId = {}, streamId = {}, IdTopicConfig = {}",
groupId, streamId, idTopicConfig);
}
return idTopicConfig;
}
public String getConfigMd5() {
if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
return tmpDataMd5;
} else {
return dataMd5;
}
}
public boolean updateConfigMap(InLongMetaConfig metaConfig) {
String inDataJsonStr;
// check cache data
synchronized (this.lastSyncVersion) {
if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
if (tmpDataMd5.equals(metaConfig.getMd5())) {
return false;
}
LOG.info("Update metadata: NOT UPDATE, {} is loading, but wast over {} ms",
getFileName(), System.currentTimeMillis() - this.lastSyncVersion.get());
return false;
} else {
if (dataMd5.equals(metaConfig.getMd5())) {
return false;
}
}
InLongMetaConfig newMetaConfig = buildMixedMetaConfig(metaConfig);
try {
inDataJsonStr = GSON.toJson(newMetaConfig);
} catch (Throwable e) {
LOG.error("Update metadata: failure to serial meta config to json", e);
return false;
}
return storeConfigToFile(inDataJsonStr, newMetaConfig);
}
}
private InLongMetaConfig buildMixedMetaConfig(InLongMetaConfig metaConfig) {
// process and check cluster info
Map<String, CacheClusterConfig> newClusterConfigMap =
new HashMap<>(metaConfig.getClusterConfigMap().size());
newClusterConfigMap.putAll(metaConfig.getClusterConfigMap());
// process id2topic info
Map<String, IdTopicConfig> newIdTopicConfigMap =
new HashMap<>(metaConfig.getIdTopicConfigMap().size());
newIdTopicConfigMap.putAll(metaConfig.getIdTopicConfigMap());
return new InLongMetaConfig(metaConfig.getMd5(),
metaConfig.getMqType(), newClusterConfigMap, newIdTopicConfigMap);
}
public List<CacheClusterConfig> forkCachedCLusterConfig() {
List<CacheClusterConfig> result = new ArrayList<>();
if (mqClusterMap.isEmpty()) {
return result;
}
for (Map.Entry<String, CacheClusterConfig> entry : mqClusterMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
continue;
}
CacheClusterConfig config = new CacheClusterConfig();
config.setClusterName(entry.getValue().getClusterName());
config.setToken(entry.getValue().getToken());
config.getParams().putAll(entry.getValue().getParams());
result.add(config);
}
return result;
}
public Set<String> getAllTopicName() {
Set<String> result = new HashSet<>();
// add default topics first
if (CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
result.addAll(CommonConfigHolder.getInstance().getDefTopics());
}
// add configured topics
for (IdTopicConfig topicConfig : id2TopicSrcMap.values()) {
if (topicConfig == null) {
continue;
}
result.add(topicConfig.getTopicName());
}
return result;
}
@Override
protected boolean loadFromFileToHolder() {
// check meta update setting
if (!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile()
&& !ConfigManager.handshakeManagerOk.get()) {
LOG.warn("Load metadata: StartupUsingLocalMetaFile is false, don't obtain metadata from {}"
+ " before handshake with Manager", getFileName());
return false;
}
String jsonString = "";
InLongMetaConfig metaConfig;
readWriteLock.writeLock().lock();
try {
jsonString = loadConfigFromFile();
if (StringUtils.isBlank(jsonString)) {
LOG.error("Load metadata: NOT LOADED, changed but empty records, file:{}", getFileName());
return true;
}
try {
metaConfig = GSON.fromJson(jsonString, InLongMetaConfig.class);
} catch (Throwable e) {
LOG.error("Load metadata: NOT LOADED, parse json config failure, file:{}", getFileName(), e);
return true;
}
// check required fields
ImmutablePair<Boolean, String> paramChkResult = validRequiredFields(metaConfig);
if (!paramChkResult.getLeft()) {
LOG.error("Load metadata: NOT LOADED, {}, file:{}",
paramChkResult.getRight(), getFileName());
return true;
}
// update cached data
replaceCacheConfig(metaConfig.getMqType(),
metaConfig.getClusterConfigMap(), metaConfig.getIdTopicConfigMap());
this.dataMd5 = metaConfig.getMd5();
this.dataStr = jsonString;
LOG.info("Load metadata: LOADED success, from {}!", getFileName());
return true;
} catch (Throwable e) {
LOG.error("Load metadata: NOT LOADED, load from {} throw exception", getFileName(), e);
return false;
} finally {
if (this.lastSyncVersion.get() == 0) {
this.lastUpdVersion.set(System.currentTimeMillis());
this.lastSyncVersion.compareAndSet(0, this.lastUpdVersion.get());
} else {
this.lastUpdVersion.set(this.lastSyncVersion.get());
}
readWriteLock.writeLock().unlock();
}
}
/**
* store meta config to file
*
* @param metaJsonStr meta info string
* @param metaConfig meta info object
*
* @return store result
*/
private boolean storeConfigToFile(String metaJsonStr, InLongMetaConfig metaConfig) {
boolean isSuccess = false;
String filePath = getFilePath();
if (StringUtils.isBlank(filePath)) {
LOG.error("Store metadata: error in writing file {} as the file path is null.", getFileName());
return isSuccess;
}
readWriteLock.writeLock().lock();
try {
File sourceFile = new File(filePath);
File targetFile = new File(getNextBackupFileName());
File tmpNewFile = new File(getFileName() + ".tmp");
if (sourceFile.exists()) {
FileUtils.copyFile(sourceFile, targetFile);
}
FileUtils.writeStringToFile(tmpNewFile, metaJsonStr, StandardCharsets.UTF_8);
FileUtils.copyFile(tmpNewFile, sourceFile);
tmpNewFile.delete();
tmpDataMd5 = metaConfig.getMd5();
lastSyncVersion.set(System.currentTimeMillis());
isSuccess = true;
setFileChanged();
} catch (Throwable ex) {
LOG.error("Store metadata: exception thrown while writing to file {}", getFileName(), ex);
} finally {
readWriteLock.writeLock().unlock();
}
return isSuccess;
}
/**
* update locally cached configuration with input information
*
* @param cacheType mq cluster type
* @param clusterConfigMap mq cluster config
* @param idTopicConfigMap id to topic config
*/
private void replaceCacheConfig(CacheType cacheType,
Map<String, CacheClusterConfig> clusterConfigMap,
Map<String, IdTopicConfig> idTopicConfigMap) {
this.clusterType.getAndSet(cacheType.getId());
// remove deleted id2topic config
Set<String> tmpRmvKeys = new HashSet<>();
for (Map.Entry<String, IdTopicConfig> entry : id2TopicSrcMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
continue;
}
if (!idTopicConfigMap.containsKey(entry.getKey())) {
tmpRmvKeys.add(entry.getKey());
}
}
for (String key : tmpRmvKeys) {
id2TopicSrcMap.remove(key);
}
// add new id2topic source config
id2TopicSrcMap.putAll(idTopicConfigMap);
// add new id2topic sink config
id2TopicSinkMap.putAll(idTopicConfigMap);
// remove deleted cluster config
tmpRmvKeys.clear();
for (Map.Entry<String, CacheClusterConfig> entry : mqClusterMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
continue;
}
if (!clusterConfigMap.containsKey(entry.getKey())) {
tmpRmvKeys.add(entry.getKey());
}
}
for (String key : tmpRmvKeys) {
mqClusterMap.remove(key);
}
// add new mq cluster config
mqClusterMap.putAll(clusterConfigMap);
}
/**
* load configure from holder
*
* @return the configure info
*/
private String loadConfigFromFile() {
String result = "";
if (StringUtils.isBlank(getFileName())) {
LOG.error("Load metadata: fail to load json {} as the file name is null.", getFileName());
return result;
}
InputStream inStream = null;
try {
URL url = getClass().getClassLoader().getResource(getFileName());
inStream = url != null ? url.openStream() : null;
if (inStream == null) {
LOG.error("Load metadata: fail to load json {} as the input stream is null", getFileName());
return result;
}
int size = inStream.available();
if (size > MAX_ALLOWED_JSON_FILE_SIZE) {
LOG.error("Load metadata: fail to load json {} as the content size({}) over max allowed size({})",
getFileName(), size, MAX_ALLOWED_JSON_FILE_SIZE);
return result;
}
byte[] buffer = new byte[size];
inStream.read(buffer);
result = new String(buffer, StandardCharsets.UTF_8);
} catch (Throwable e) {
LOG.error("Load metadata: exception thrown while load from file {}", getFileName(), e);
} finally {
if (null != inStream) {
try {
inStream.close();
} catch (IOException e) {
LOG.error("Load metadata: fail in inStream.close for file {}", getFileName(), e);
}
}
}
return result;
}
/**
* check required fields status
*
* @param metaConfig response from Manager
*
* @return check result
*/
public ImmutablePair<Boolean, String> validRequiredFields(InLongMetaConfig metaConfig) {
if (metaConfig == null) {
return ImmutablePair.of(false, "metaConfig object is null");
} else if (metaConfig.getMd5() == null) {
return ImmutablePair.of(false, "metaConfig.md5 field is null");
} else if (metaConfig.getMqType() == null) {
return ImmutablePair.of(false, "metaConfig.mqType field is null");
} else if (metaConfig.getMqType() == CacheType.N) {
return ImmutablePair.of(false, "metaConfig.mqType value is CacheType.N");
} else if (metaConfig.getClusterConfigMap() == null) {
return ImmutablePair.of(false, "metaConfig.clusterConfigMap field is null");
} else if (metaConfig.getClusterConfigMap().isEmpty()) {
return ImmutablePair.of(false, "metaConfig.clusterConfigMap field is empty");
} else if (metaConfig.getIdTopicConfigMap() == null) {
return ImmutablePair.of(false, "metaConfig.idTopicConfigMap field is null");
} else if (metaConfig.getIdTopicConfigMap().isEmpty()) {
return ImmutablePair.of(false, "metaConfig.idTopicConfigMap is empty");
}
return ImmutablePair.of(true, "ok");
}
}