blob: 1b3979459431ea9c05fe354048380f467f8e34dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.dromara.soul.web.cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.soul.common.dto.convert.DivideUpstream;
import org.dromara.soul.common.dto.zk.SelectorZkDTO;
import org.dromara.soul.common.utils.GSONUtils;
import org.dromara.soul.common.utils.UrlUtils;
import org.dromara.soul.web.concurrent.SoulThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Cache of the divide http url upstreams, keyed by selector id.
 *
 * <p>Selectors are queued through {@link #submit(SelectorZkDTO)} and consumed by
 * worker threads that parse the selector handle into a list of {@link DivideUpstream}.
 * Two maps are maintained:
 * <ul>
 *   <li>{@code SCHEDULED_MAP} - the full upstream list as configured on the selector;</li>
 *   <li>{@code UPSTREAM_MAP} - the subset whose url passed {@code UrlUtils.checkUrl},
 *       which is what {@link #findUpstreamListBySelectorId(String)} serves.</li>
 * </ul>
 * A periodic task re-checks every configured list and refreshes {@code UPSTREAM_MAP}.
 *
 * @author xiaoyu
 */
@Component
public class UpstreamCacheManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(UpstreamCacheManager.class);

    /** Bounded hand-over queue between {@link #submit(SelectorZkDTO)} and the workers. */
    private static final BlockingQueue<SelectorZkDTO> BLOCKING_QUEUE = new LinkedBlockingQueue<>(1024);

    /** Number of worker threads: twice the number of available processors. */
    private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;

    /** selector id -> upstreams that passed the last url check. */
    private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP = Maps.newConcurrentMap();

    /** selector id -> all configured upstreams; the input of the periodic check. */
    private static final Map<String, List<DivideUpstream>> SCHEDULED_MAP = Maps.newConcurrentMap();

    /** Delay, in seconds, before the first periodic check. */
    @Value("${soul.upstream.delayInit:30}")
    private Integer delayInit;

    /**
     * Delay, in seconds, between two periodic checks.
     * Bound to its own key; it previously reused {@code soul.upstream.delayInit} by mistake.
     */
    @Value("${soul.upstream.scheduledTime:10}")
    private Integer scheduledTime;

    /**
     * Find upstream list by selector id.
     *
     * @param selectorId the selector id
     * @return the upstreams that passed the last url check, or {@code null} when
     *         nothing is cached for this selector
     */
    public List<DivideUpstream> findUpstreamListBySelectorId(final String selectorId) {
        return UPSTREAM_MAP.get(selectorId);
    }

    /**
     * Remove by key.
     *
     * <p>The entry is dropped from both maps; leaving it in {@code SCHEDULED_MAP}
     * would let the next periodic check put it back into {@code UPSTREAM_MAP}.
     *
     * @param key the selector id
     */
    static void removeByKey(final String key) {
        SCHEDULED_MAP.remove(key);
        UPSTREAM_MAP.remove(key);
    }

    /**
     * Init: starts the queue workers and schedules the periodic upstream check.
     */
    @PostConstruct
    public void init() {
        ExecutorService executorService = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                SoulThreadFactory.create("save-upstream-task", false));
        for (int i = 0; i < MAX_THREAD; i++) {
            executorService.execute(new Worker());
        }
        // A single periodic task never runs concurrently with itself, so one thread is enough.
        new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("scheduled-upstream-task", false))
                .scheduleWithFixedDelay(this::scheduled,
                        delayInit, scheduledTime, TimeUnit.SECONDS);
    }

    /**
     * Submit a selector for asynchronous processing; blocks while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the selector is not
     * queued and the interrupt flag is restored for the caller.
     *
     * @param selectorZkDTO the selector zk dto
     */
    static void submit(final SelectorZkDTO selectorZkDTO) {
        try {
            BLOCKING_QUEUE.put(selectorZkDTO);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("interrupted while queueing selector {}", selectorZkDTO.getId(), e);
        }
    }

    /**
     * Execute: parses the selector handle and caches its upstreams.
     *
     * <p>When the handle yields no upstream, both maps are left untouched.
     *
     * @param selectorZkDTO the selector zk dto
     */
    public void execute(final SelectorZkDTO selectorZkDTO) {
        final List<DivideUpstream> upstreamList =
                GSONUtils.getInstance().fromList(selectorZkDTO.getHandle(), DivideUpstream[].class);
        if (CollectionUtils.isNotEmpty(upstreamList)) {
            SCHEDULED_MAP.put(selectorZkDTO.getId(), upstreamList);
            UPSTREAM_MAP.put(selectorZkDTO.getId(), check(upstreamList));
        }
    }

    /**
     * Periodic task: re-checks every configured upstream list and refreshes the served map.
     */
    private void scheduled() {
        try {
            SCHEDULED_MAP.forEach((selectorId, configured) -> {
                final List<DivideUpstream> passed = check(configured);
                // Publish only if the configured list was neither replaced nor removed
                // while the check was running, so a stale result is not written back.
                if (SCHEDULED_MAP.get(selectorId) == configured) {
                    UPSTREAM_MAP.put(selectorId, passed);
                }
            });
        } catch (RuntimeException e) {
            // An exception escaping a fixed-delay task would cancel all further runs.
            LOGGER.error("scheduled upstream check failed", e);
        }
    }

    /**
     * Filters the given upstreams, keeping those whose url passes {@code UrlUtils.checkUrl}.
     *
     * @param upstreamList the upstreams to check
     * @return a new list with the upstreams that passed, in their original order
     */
    private List<DivideUpstream> check(final List<DivideUpstream> upstreamList) {
        List<DivideUpstream> resultList = Lists.newArrayListWithCapacity(upstreamList.size());
        for (DivideUpstream divideUpstream : upstreamList) {
            if (UrlUtils.checkUrl(divideUpstream.getUpstreamUrl())) {
                resultList.add(divideUpstream);
            }
        }
        return resultList;
    }

    /**
     * The type Worker: drains the submit queue and processes each selector.
     */
    class Worker implements Runnable {

        @Override
        public void run() {
            runTask();
        }

        private void runTask() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    UpstreamCacheManager.this.execute(BLOCKING_QUEUE.take());
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends this worker.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("upstream worker interrupted, stopping", e);
                } catch (RuntimeException e) {
                    // One bad selector must not terminate the worker.
                    LOGGER.error("failed to process selector upstream", e);
                }
            }
        }
    }
}