blob: 7a8dd8a3f41d5bf9b65947f8bf27a494a3c36659 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.manager.service.impl;
import com.github.pagehelper.Page;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.streamnative.pulsar.manager.entity.EnvironmentEntity;
import io.streamnative.pulsar.manager.entity.EnvironmentsRepository;
import io.streamnative.pulsar.manager.service.EnvironmentCacheService;
import io.streamnative.pulsar.manager.utils.HttpUtil;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* A cache that caches environments.
*/
@Slf4j
@Service
public class EnvironmentCacheServiceImpl implements EnvironmentCacheService {
@Autowired
private EnvironmentsRepository environmentsRepository;
@Value("${backend.jwt.token}")
private String pulsarJwtToken;
private final Map<String, Map<String, ClusterData>> environments;
public EnvironmentCacheServiceImpl() {
this.environments = new ConcurrentHashMap<>();
}
@Override
public String getServiceUrl(HttpServletRequest request) {
String cluster = request.getHeader("x-pulsar-cluster");
return getServiceUrl(request, cluster);
}
@Override
public String getServiceUrl(HttpServletRequest request, String cluster) {
String environment = request.getHeader("environment");
return getServiceUrl(environment, cluster);
}
public String getServiceUrl(String environment, String cluster) {
if (StringUtils.isBlank(cluster)) {
// if there is no cluster is specified, forward the request to environment service url
Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(environment);
EnvironmentEntity environmentEntity = environmentEntityOptional.get();
String directRequestHost = environmentEntity.getBroker();
return directRequestHost;
} else {
return getServiceUrl(environment, cluster, 0);
}
}
private String getServiceUrl(String environment, String cluster, int numReloads) {
// if there is a cluster specified, lookup the cluster.
Map<String, ClusterData> clusters = environments.get(environment);
ClusterData clusterData;
if (null == clusters) {
clusterData = reloadCluster(environment, cluster);
} else {
clusterData = clusters.get(cluster);
if (clusterData == null) {
clusterData = reloadCluster(environment, cluster);
}
}
if (null == clusterData) {
// no environment and no cluster
throw new RuntimeException(
"No cluster '" + cluster + "' found in environment '" + environment + "'");
}
return clusterData.getServiceUrl();
}
private Map<String, String> jsonHeader() {
Map<String, String> header = Maps.newHashMap();
if (StringUtils.isNotBlank(pulsarJwtToken)) {
header.put("Authorization", String.format("Bearer %s", pulsarJwtToken));
}
header.put("Content-Type", "application/json");
return header;
}
@Scheduled(
initialDelay = 0L,
fixedDelayString = "${cluster.cache.reload.interval.ms}")
@Override
public void reloadEnvironments() {
int pageNum = 0;
final int pageSize = 100;
Set<String> newEnvironments = new HashSet<>();
Page<EnvironmentEntity> environmentPage = environmentsRepository.getEnvironmentsList(pageNum, pageSize);
List<EnvironmentEntity> environmentList = environmentPage.getResult();
while (!environmentList.isEmpty()) {
environmentList.forEach(env -> {
reloadEnvironment(env);
newEnvironments.add(env.getName());
});
++pageNum;
environmentPage = environmentsRepository.getEnvironmentsList(pageNum, pageSize);
environmentList = environmentPage.getResult();
}
log.info("Successfully reloaded environments : {}", newEnvironments);
Set<String> oldEnvironments = environments.keySet();
Set<String> goneEnvironments = Sets.difference(oldEnvironments, newEnvironments);
for (String env : goneEnvironments) {
environments.remove(env);
log.info("Removed cached environment {} since it is already deleted.", env);
}
}
private void reloadEnvironment(String environment) {
// if there is no clusters, lookup the clusters
EnvironmentEntity entity = environmentsRepository.findByName(environment).get();
reloadEnvironment(entity);
}
public void reloadEnvironment(EnvironmentEntity environment) {
Gson gson = new Gson();
String result = HttpUtil.doGet(
environment.getBroker() + "/admin/v2/clusters",
jsonHeader()
);
List<String> clustersList =
gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
log.info("Reload cluster list for environment {} : {}", environment.getName(), clustersList);
Set<String> newClusters = Sets.newHashSet(clustersList);
Map<String, ClusterData> clusterDataMap = environments.computeIfAbsent(
environment.getName(),
(e) -> new ConcurrentHashMap<>());
Set<String> oldClusters = clusterDataMap.keySet();
Set<String> goneClusters = Sets.difference(oldClusters, newClusters);
for (String cluster : goneClusters) {
log.info("Remove cluster {} from environment {}.", cluster, environment.getName());
clusterDataMap.remove(cluster);
}
for (String cluster : clustersList) {
reloadCluster(environment, cluster);
}
}
private ClusterData reloadCluster(String environment, String cluster) {
// if there is no clusters, lookup the clusters
return environmentsRepository.findByName(environment).map(env ->
reloadCluster(env, cluster)
).orElse(null);
}
private ClusterData reloadCluster(EnvironmentEntity environment, String cluster) {
log.info("Reloading cluster data for cluster {} @ environment {} ...",
cluster, environment.getName());
Gson gson = new Gson();
String clusterInfoUrl = environment.getBroker() + "/admin/v2/clusters/" + cluster;
String result = HttpUtil.doGet(
clusterInfoUrl,
jsonHeader()
);
if (null == result) {
// fail to fetch the cluster data or the cluster is not found
return null;
}
log.info("Loaded cluster data for cluster {} @ environment {} from {} : {}",
cluster, environment.getName(), clusterInfoUrl, result);
ClusterData clusterData = gson.fromJson(result, ClusterData.class);
Map<String, ClusterData> clusters = environments.computeIfAbsent(
environment.getName(),
(e) -> new ConcurrentHashMap<>());
clusters.put(cluster, clusterData);
log.info("Successfully loaded cluster data for cluster {} @ environment {} : {}",
cluster, environment.getName(), clusterData);
return clusterData;
}
}