feat: Upgrade DataSync-SourceCode-Analysis-Http-Data-Sync to v2.5.0 (#829)

Co-authored-by: ray <zhongzhenchao@ingbaobei.com>
diff --git a/blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md b/blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md
index 19303b3..e70d92d 100644
--- a/blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md
+++ b/blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md
@@ -10,7 +10,7 @@
 In `ShenYu` gateway, data synchronization refers to how to synchronize the updated data to the gateway after the data is sent in the background management system. The Apache ShenYu gateway currently supports data synchronization for `ZooKeeper`, `WebSocket`, `http long poll`, `Nacos`, `etcd` and `Consul`. The main content of this article is based on `http long poll` data synchronization source code analysis.
-> This paper based on `shenyu-2.4.0` version of the source code analysis, the official website of the introduction of please refer to the [Data Synchronization Design](https://shenyu.apache.org/docs/design/data-sync/) .
+> This paper based on `shenyu-2.5.0` version of the source code analysis, the official website of the introduction of please refer to the [Data Synchronization Design](https://shenyu.apache.org/docs/design/data-sync/) .
 ### 1. Http Long Polling
@@ -33,12 +33,12 @@
 Introduce dependencies in the `pom` file.
-        <!--shenyu data sync start use http-->
-        <dependency>
-        	<groupId>org.apache.shenyu</groupId>
-        	<artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
-        	<version>${project.version}</version>
-        </dependency>
+<!--shenyu data sync start use http-->
+    <groupId>org.apache.shenyu</groupId>
+    <artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
+    <version>${project.version}</version>
 Add the following configuration to the `application.yml` configuration file.
@@ -60,43 +60,75 @@
 @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
+@EnableConfigurationProperties(value = HttpConfig.class)
 public class HttpSyncDataConfiguration {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);
+    /**
+     * Rest template.
+     *
+     * @param httpConfig the http config
+     * @return the rest template
+     */
+    @Bean
+    public RestTemplate restTemplate(final HttpConfig httpConfig) {
+        OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
+        factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());
+        factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());
+        factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());
+        return new RestTemplate(factory);
+    }
+    /**
+     * AccessTokenManager.
+     *
+     * @param httpConfig   the http config.
+     * @param restTemplate the rest template.
+     * @return the access token manager.
+     */
+    @Bean
+    public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {
+        return new AccessTokenManager(restTemplate, httpConfig);
+    }
      * Http sync data service.
-     * @param httpConfig        
-     * @param pluginSubscriber   
-     * @param metaSubscribers    
-     * @param authSubscribers    
+     *
+     * @param httpConfig         the http config
+     * @param pluginSubscriber   the plugin subscriber
+     * @param restTemplate       the rest template
+     * @param metaSubscribers    the meta subscribers
+     * @param authSubscribers    the auth subscribers
+     * @param accessTokenManager the access token manager
      * @return the sync data service
-    public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
-                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
-        log.info("you use http long pull sync shenyu data");
-        return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
-                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
-    }
-    /**
-     * Http config http config.
-     * @return the http config
-     */
-    @Bean
-    @ConfigurationProperties(prefix = "shenyu.sync.http")
-    public HttpConfig httpConfig() {
-        return new HttpConfig();
+    public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,
+                                               final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
+                                               final ObjectProvider<RestTemplate> restTemplate,
+                                               final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
+                                               final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
+                                               final ObjectProvider<AccessTokenManager> accessTokenManager) {
+        LOGGER.info("you use http long pull sync shenyu data");
+        return new HttpSyncDataService(
+                Objects.requireNonNull(httpConfig.getIfAvailable()),
+                Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
+                Objects.requireNonNull(restTemplate.getIfAvailable()),
+                metaSubscribers.getIfAvailable(Collections::emptyList),
+                authSubscribers.getIfAvailable(Collections::emptyList),
+                Objects.requireNonNull(accessTokenManager.getIfAvailable())
+        );
-`HttpSyncDataConfiguration` is the configuration class for `Http long polling` data synchronization, responsible for creating `HttpSyncDataService` (responsible for the concrete implementation of `http` data synchronization) and `HttpConfig` (`admin` property configuration). It is annotated as follows.
+`HttpSyncDataConfiguration` is the configuration class for `Http long polling` data synchronization, responsible for creating `HttpSyncDataService` (responsible for the concrete implementation of `http` data synchronization) 、 `RestTemplate` and `AccessTokenManager` (responsible for the access token processing). It is annotated as follows.
 - `@Configuration`: indicates that this is a configuration class.
 - `@ConditionalOnClass(HttpSyncDataService.class)`: conditional annotation indicating that the class `HttpSyncDataService` is to be present.
 - `@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")`: conditional annotation to have the property `shenyu.sync.http.url` configured.
+- `@EnableConfigurationProperties(value = HttpConfig.class)`: `@EnableConfigurationProperties(value = HttpConfig.class)`: indicates that the annotation `@ConfigurationProperties(prefix = "shenyu.sync.http")` on `HttpConfig` will take effect, and the configuration class `HttpConfig` will be injected into the Ioc container.
 #### 2.2 Property initialization
@@ -106,22 +138,26 @@
 In the constructor of `HttpSyncDataService`, complete the property initialization.
-public class HttpSyncDataService implements SyncDataService, AutoCloseable {
+public class HttpSyncDataService implements SyncDataService {
     // omitted attribute field ......
-   public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
-        // 1. create data refresh factory
-        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
-        // 2. get config of admin 
-        this.httpConfig = httpConfig;
-        // shenyu-admin url
-        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
-        // 3. create httpClient, used to initiate requests to admin
-        this.httpClient = createRestTemplate();
-        // 4. start a long polling task
-        this.start();
+    public HttpSyncDataService(final HttpConfig httpConfig,
+                               final PluginDataSubscriber pluginDataSubscriber,
+                               final RestTemplate restTemplate,
+                               final List<MetaDataSubscriber> metaDataSubscribers,
+                               final List<AuthDataSubscriber> authDataSubscribers,
+                               final AccessTokenManager accessTokenManager) {
+          // 1. accessTokenManager
+          this.accessTokenManager = accessTokenManager;
+          // 2. create data refresh factory
+          this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
+          // 3. shenyu-admin url
+          this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
+          // 4. restTemplate
+          this.restTemplate = restTemplate;
+          // 5. start a long polling task
+          this.start();
@@ -130,25 +166,13 @@
 Other functions and related fields are omitted from the above code, and the initialization of the properties is done in the constructor, mainly.
+- the role of `accessTokenManager` is to request `admin` and update the `access token` regularly.
 - creating data processors for subsequent caching of various types of data (plugins, selectors, rules, metadata and authentication data).
 - obtaining the `admin` property configuration, mainly to obtain the `url` of the `admin`, `admin` with possible clusters, multiple split by a comma `(,)`.
-- creating `httpClient`, using `RestTemplate`, for launching requests to `admin`.
-  ```java
-      private RestTemplate createRestTemplate() {
-          OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
-          // connection establishment timeout of 10s
-          factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
-          // The gateway actively requests the configuration service of shenyu-admin, and the read timeout is 90s
-          factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
-          return new RestTemplate(factory);
-      }
-  ```
+- using `RestTemplate`, for launching requests to `admin`.
 - Start the long polling task.
@@ -159,29 +183,31 @@
 In the `start()` method, two things are done, one is to get the full amount of data, that is, to request the `admin` side to get all the data that needs to be synchronized, and then cache the acquired data into the gateway memory. The other is to open a multi-threaded execution of a long polling task.
-private void start() {
-        // Initialize only once, implemented by atomic classes. 
-        RUNNING = new AtomicBoolean(false);
+public class HttpSyncDataService implements SyncDataService {
+    // ......
+    private void start() {
         // It could be initialized multiple times, so you need to control that.
         if (RUNNING.compareAndSet(false, true)) {
             // fetch all group configs.
             // Initial startup, get full data
-            // A backend service, a thread
+            // one backend service, one thread
             int threadSize = serverList.size();
             // ThreadPoolExecutor
             this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                     new LinkedBlockingQueue<>(),
                     ShenyuThreadFactory.create("http-long-polling", true));
             // start long polling, each server creates a thread to listen for changes.
             this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
         } else {
-            log.info("shenyu http long polling was started, executor=[{}]", executor);
+            LOG.info("shenyu http long polling was started, executor=[{}]", executor);
+    // ......
 ##### 2.3.1 Fetch Data
@@ -203,8 +229,12 @@
 The `admin` may be a cluster, and here a request is made to each `admin` in a round-robin fashion, and if one succeeds, then the operation to get the full amount of data from the `admin` and cache it to the gateway is executed successfully. If there is an exception, the request is launched to the next `admin`.
-private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
-    // It is possible that admins are clustered, and here requests are made to each admin by means of a loop.
+public class HttpSyncDataService implements SyncDataService {
+    // ......
+    private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
+        // It is possible that admins are clustered, and here requests are made to each admin by means of a loop.
         for (int index = 0; index < this.serverList.size(); index++) {
             String server = serverList.get(index);
             try {
@@ -219,10 +249,13 @@
                 if (index >= serverList.size() - 1) {
                     throw e;
-                log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
+                LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
+    // ......
 - HttpSyncDataService#doFetchGroupConfig()
@@ -230,38 +263,47 @@
 In this method, the request parameters are first assembled, then the request is launched through `httpClient` to `admin` to get the data, and finally the obtained data is updated to the gateway memory.
-// Launch a request to the admin backend management system to get all synchronized data
-private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
-    // 1. build request parameters, all grouped enumeration types
-    StringBuilder params = new StringBuilder();
-    for (ConfigGroupEnum groupKey : groups) {
-        params.append("groupKeys").append("=").append(groupKey.name()).append("&");
+public class HttpSyncDataService implements SyncDataService {
+    // ......
+    // Launch a request to the admin backend management system to get all synchronized data
+    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
+        // 1. build request parameters, all grouped enumeration types
+        StringBuilder params = new StringBuilder();
+        for (ConfigGroupEnum groupKey : groups) {
+            params.append("groupKeys").append("=").append(groupKey.name()).append("&");
+        }
+        // admin url:  /configs/fetch
+        String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&");
+        LOG.info("request configs: [{}]", url);
+        String json;
+        try {
+            HttpHeaders headers = new HttpHeaders();
+            // set accessToken
+            headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
+            HttpEntity<String> httpEntity = new HttpEntity<>(headers);
+            // 2. get a request for change data
+            json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody();
+        } catch (RestClientException e) {
+            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
+            LOG.warn(message);
+            throw new ShenyuException(message, e);
+        }
+        // update local cache
+        // 3. Update data in gateway memory
+        boolean updated = this.updateCacheWithJson(json);
+        if (updated) {
+            LOG.debug("get latest configs: [{}]", json);
+            return;
+        }
+        // not updated. it is likely that the current config server has not been updated yet. wait a moment.
+        LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
+        // No data update on the server side, just wait 30s
+        ThreadUtils.sleep(TimeUnit.SECONDS, 30);
-    // admin url:  /configs/fetch
-    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
-    log.info("request configs: [{}]", url);
-    String json = null;
-    try {
-        // 2. get a request for change data
-        json = this.httpClient.getForObject(url, String.class);
-    } catch (RestClientException e) {
-        String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
-        log.warn(message);
-        throw new ShenyuException(message, e);
-    }
-    // update local cache
-    // 3. Update data in gateway memory
-    boolean updated = this.updateCacheWithJson(json);
-    // The update was successful and the method is now complete
-    if (updated) {
-        log.info("get latest configs: [{}]", json);
-        return;
-    }
-    // not updated. it is likely that the current config server has not been updated yet. wait a moment.
-    log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
-    // No data update on the server side, just wait 30s
-    ThreadUtils.sleep(TimeUnit.SECONDS, 30);
+    // ......
@@ -277,28 +319,41 @@
 Update the data in the gateway memory. Use `GSON` for deserialization, take the real data from the property `data` and give it to `DataRefreshFactory` to do the update.
+public class HttpSyncDataService implements SyncDataService {
+    // ......
     private boolean updateCacheWithJson(final String json) {
         // Using GSON for deserialization
         JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
-        JsonObject data = jsonObject.getAsJsonObject("data");
         // if the config cache will be updated?
-        return factory.executor(data);
+        return factory.executor(jsonObject.getAsJsonObject("data"));
+    // ......
 - DataRefreshFactory#executor()
-Update the data according to different data types and return the updated result. Here, `parallelStream()` is used for parallel update, and the specific update logic is given to the `dataRefresh.refresh()` method. In the update result, one of the data types is updated, which means that the operation has been updated.
+Update the data according to different data types and return the updated result. The specific update logic is given to the `dataRefresh.refresh()` method. In the update result, one of the data types is updated, which means that the operation has been updated.
+public final class DataRefreshFactory {
+    // ......
     public boolean executor(final JsonObject data) {
-        //updata data in parallelStream
+        // update data
         List<Boolean> result = ENUM_MAP.values().parallelStream()
                 .map(dataRefresh -> dataRefresh.refresh(data))
-        //有一个更新就表示此次发生了更新操作
+        // one of the data types is updated, which means that the operation has been updated.
         return result.stream().anyMatch(Boolean.TRUE::equals);
+    // ......
 - AbstractDataRefresh#refresh()
@@ -310,37 +365,50 @@
 In the generic `refresh()` method, it is responsible for data type conversion, determining whether an update is needed, and the actual data refresh operation.
+public abstract class AbstractDataRefresh<T> implements DataRefresh {
+    // ......
     public Boolean refresh(final JsonObject data) {
-        boolean updated = false;
         // convert data
         JsonObject jsonObject = convert(data);
-        if (null != jsonObject) {
-            // get data
-            ConfigData<T> result = fromJson(jsonObject);
-            // does it need to be updated
-            if (this.updateCacheIfNeed(result)) {
-                updated = true;
-                // real update logic, data refresh operation
-                refresh(result.getData());
-            }
+        if (Objects.isNull(jsonObject)) {
+            return false;
+        boolean updated = false;
+        // get data
+        ConfigData<T> result = fromJson(jsonObject);
+        // does it need to be updated
+        if (this.updateCacheIfNeed(result)) {
+            updated = true;
+            // real update logic, data refresh operation
+            refresh(result.getData());
+        }
         return updated;
+    // ......
 - AbstractDataRefresh#updateCacheIfNeed()
 The process of data conversion, which is based on different data types, we will not trace further to see if the data needs to be updated logically. The method name is `updateCacheIfNeed()`, which is implemented by method overloading.
-// result is data
-protected abstract boolean updateCacheIfNeed(ConfigData<T> result);
+public abstract class AbstractDataRefresh<T> implements DataRefresh {
-// newVal is the latest value obtained
-// What kind of data type is groupEnum
-protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
+    // ......
+    // result is data
+    protected abstract boolean updateCacheIfNeed(ConfigData<T> result);
+    // newVal is the latest value obtained
+    // What kind of data type is groupEnum
+    protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
         // If it is the first time, then it is put directly into the cache and returns true, indicating that the update was made this time
         if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
             return true;
@@ -349,22 +417,25 @@
         GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
             // md5 value is the same, no need to update
             if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
-                log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
+                LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
                 return oldVal;
             // The current cached data has been modified for a longer period than the new data and does not need to be updated.
             // must compare the last update time
             if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
-                log.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
+                LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
                 return oldVal;
-            log.info("update {} config: {}", groupEnum, newVal);
+            LOG.info("update {} config: {}", groupEnum, newVal);
             holder.result = true;
             return newVal;
         return holder.result;
+    // ......
 As you can see from the source code above, there are two cases where updates are not required.
@@ -377,25 +448,31 @@
 At this point, we have finished analyzing the logic of the `start()` method to get the full amount of data for the first time, followed by the long polling operation. For convenience, I will paste the `start()` method once more.
+public class HttpSyncDataService implements SyncDataService {
+    // ......
     private void start() {
         // It could be initialized multiple times, so you need to control that.
         if (RUNNING.compareAndSet(false, true)) {
             // fetch all group configs.
             // Initial startup, get full data
-            // one background service, one thread
+            // one backend service, one thread
             int threadSize = serverList.size();
-            // custom thread pool
+            // ThreadPoolExecutor
             this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                     new LinkedBlockingQueue<>(),
                     ShenyuThreadFactory.create("http-long-polling", true));
             // start long polling, each server creates a thread to listen for changes.
             this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
         } else {
-            log.info("shenyu http long polling was started, executor=[{}]", executor);
+            LOG.info("shenyu http long polling was started, executor=[{}]", executor);
+    // ......
 ##### 2.3.2 Execute Long Polling Task
@@ -409,41 +486,39 @@
 class HttpLongPollingTask implements Runnable {
-        private String server;
+    private final String server;
-        // Default retry 3 times
-        private final int retryTimes = 3;
+    HttpLongPollingTask(final String server) {
+        this.server = server;
+    }
-        HttpLongPollingTask(final String server) {
-            this.server = server;
-        }
-        @Override
-        public void run() {
-            // long polling
-            while (RUNNING.get()) {
-                for (int time = 1; time <= retryTimes; time++) {
-                    try {
-                        doLongPolling(server);
-                    } catch (Exception e) {
-                        // print warnning log.
-                        if (time < retryTimes) {
-                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
-                                    time, retryTimes - time, e.getMessage());
-                            // long polling failed, wait 5s and continue
-                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
-                            continue;
-                        }
-                        // print error, then suspended for a while.
-                        log.error("Long polling failed, try again after 5 minutes!", e);
-                        // failed all 3 times, wait 5 minutes and try again
-                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
+    @Override
+    public void run() {
+        // long polling
+        while (RUNNING.get()) {
+            // Default retry 3 times
+            int retryTimes = 3;
+            for (int time = 1; time <= retryTimes; time++) {
+                try {
+                    doLongPolling(server);
+                } catch (Exception e) {
+                    if (time < retryTimes) {
+                        LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
+                                time, retryTimes - time, e.getMessage());
+                        // long polling failed, wait 5s and continue
+                        ThreadUtils.sleep(TimeUnit.SECONDS, 5);
+                        continue;
+                    // print error, then suspended for a while.
+                    LOG.error("Long polling failed, try again after 5 minutes!", e);
+                    // 3 次都失败了,等 5 分钟再试
+                    ThreadUtils.sleep(TimeUnit.MINUTES, 5);
-            log.warn("Stop http long polling.");
+        LOG.warn("Stop http long polling.");
 - HttpSyncDataService#doLongPolling()
@@ -456,7 +531,8 @@
 - Based on the group that has changed, go back and get the data.
-private void doLongPolling(final String server) {
+public class HttpSyncDataService implements SyncDataService {
+    private void doLongPolling(final String server) {
         // build request params: md5 and lastModifyTime
         MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
         for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
@@ -466,19 +542,22 @@
                 params.put(group.name(), Lists.newArrayList(value));
-        // build request heaad and body
+        // build request head and body
         HttpHeaders headers = new HttpHeaders();
-        HttpEntity httpEntity = new HttpEntity(params, headers);
-        String listenerUrl = server + "/configs/listener";
-        log.debug("request listener configs: [{}]", listenerUrl);
-        JsonArray groupJson = null;
+        // set accessToken
+        headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
+        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers);
+        String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;
+        JsonArray groupJson;
         //Initiate a request to admin to determine if the group data has changed
         //Here it just determines whether a group has changed or not
         try {
-            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
-            log.debug("listener result: [{}]", json);
-            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
+            String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody();
+            LOG.info("listener result: [{}]", json);
+            JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class);
+            groupJson = responseFromServer.getAsJsonArray("data");
         } catch (RestClientException e) {
             String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
             throw new ShenyuException(message, e);
@@ -507,7 +586,15 @@
                 this.doFetchGroupConfig(server, changedGroups);
+        if (Objects.nonNull(groupJson) && groupJson.size() > 0) {
+            // fetch group configuration async.
+            ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class);
+            LOG.info("Group config changed: {}", Arrays.toString(changedGroups));
+            // Proactively get the changed data from admin, depending on the grouping, and take the data in full
+            this.doFetchGroupConfig(server, changedGroups);
+        }
 One special point needs to be explained here: In the long polling task, why don't you get the changed data directly? Instead, we determine which group data has been changed, and then request `admin` again to get the changed data?
@@ -517,11 +604,11 @@
 > After the gateway receives the response information, it only knows which Group has changed its configuration, and it needs to request the configuration data of that Group again.
 > There may be a question here: Why not write out the changed data directly?
 > We have discussed this issue in depth during development, because the `http` long polling mechanism can only guarantee quasi-real time, and if it is not processed in time at the gateway layer, it will be very difficult to update the configuration data.
-If the gateway layer is not processed in time, > or the administrator updates the configuration frequently, it is likely to miss the push of a configuration change, so for security reasons, we only inform a group that the information has changed.
+If the gateway layer is not processed in time, or the administrator updates the configuration frequently, it is likely to miss the push of a configuration change, so for security reasons, we only inform a group that the information has changed.
 My personal understanding is that.
-> If the change data is written out directly, when the administrator updates the configuration frequently, the first update will `client` remove the blocking queue and return the response information to the gateway. If a second update is made at this time, then the current `client` is not in the blocking queue, so this time the change is missed. The same is true for the gateway layer's untimely processing. This is a long polling, one gateway one synchronization thread, there may be a time-consuming process. If `admin` has data changes, the current gateway client is not in the blocking queue and will not get the data.
+> If the change data is written out directly, when the administrator updates the configuration frequently, the first update will remove the `client` from blocking queue and return the response information to the gateway. If a second update is made at this time, then the current `client` is not in the blocking queue, so this time the change is missed. The same is true for the gateway layer's untimely processing. This is a long polling, one gateway one synchronization thread, there may be a time-consuming process. If `admin` has data changes, the current gateway client is not in the blocking queue and will not get the data.
 We have not yet analyzed the processing logic of the `admin` side, so let's talk about it roughly. At the `admin` end, the gateway `client` will be put into the blocking queue, and when there is a data change, the gateway `client` will come out of the queue and send the change data. So, if the gateway `client` is not in the blocking queue when there is a data change, then the current changed data is not available.
@@ -607,6 +694,10 @@
 The `InitializingBean` interface is implemented, so the `afterInitialize()` method is executed during the initialization of the `bean`. Execute periodic tasks via thread pool: updating the data in memory `(CACHE)` is executed every `5` minutes and starts after `5` minutes. Refreshing the local cache is reading data from the database to the local cache (in this case the memory), done by `refreshLocalCache()`.
+public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
+    // ......
      * is called in the afterPropertiesSet() method of the InitializingBean interface, which is executed during the initialization of the bean
@@ -618,17 +709,20 @@
         // Execution cycle task: Update data in memory (CACHE) is executed every 5 minutes and starts after 5 minutes
         // Prevent the admin from starting up first for a while and then generating data; then the gateway doesn't get the full amount of data when it first connects
         scheduler.scheduleWithFixedDelay(() -> {
-            log.info("http sync strategy refresh config start.");
+            LOG.info("http sync strategy refresh config start.");
             try {
                 // Read data from database to local cache (in this case, memory)
-                log.info("http sync strategy refresh config success.");
+                LOG.info("http sync strategy refresh config success.");
             } catch (Exception e) {
-                log.error("http sync strategy refresh config error!", e);
+                LOG.error("http sync strategy refresh config error!", e);
         }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
-        log.info("http sync strategy refresh interval: {}ms", syncInterval);
+        LOG.info("http sync strategy refresh interval: {}ms", syncInterval);
+    // ......
 - refreshLocalCache()
@@ -636,6 +730,10 @@
 Update for each of the 5 data types.
+public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
+    // ......
     // Read data from database to local cache (in this case, memory)
     private void refreshLocalCache() {
         //update app auth data
@@ -649,6 +747,9 @@
         //update meta data
+    // ......
 The logic of the 5 update methods is similar, call the `service` method to get the data and put it into the memory `CACHE`. Take the updateRuleData method `updateRuleCache()` for example, pass in the rule enumeration type and call `ruleService.listAll()` to get all the rule data from the database.
@@ -668,10 +769,14 @@
 Update the data in memory using the data in the database.
-// cache Map
-protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
+public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
+    // ......
+    // cache Map
+    protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
+    /**
      * if md5 is not the same as the original, then update lcoal cache.
      * @param group ConfigGroupEnum
      * @param <T> the type of class
@@ -686,6 +791,9 @@
         ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
         log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
+    // ......
 The initialization process is to start periodic tasks to update the memory data by fetching data from the database at regular intervals.
@@ -709,11 +817,13 @@
 public class ConfigController {
-    @Resource
-    private HttpLongPollingDataChangedListener longPollingListener;
+    private final HttpLongPollingDataChangedListener longPollingListener;
+    public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
+        this.longPollingListener = longPollingListener;
+    }
     // Omit other logic
@@ -736,7 +846,11 @@
 Perform long polling tasks: If there are data changes, they will be responded to the client (in this case, the gateway side) immediately. Otherwise, the client will be blocked until there is a data change or a timeout.
+public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
+    // ......
+    /**
      * Execute long polling: If there is a data change, it will be responded to the client (here is the gateway side) immediately.
      * Otherwise, the client will otherwise remain blocked until there is a data change or a timeout.
      * @param request
@@ -751,11 +865,11 @@
         // Immediate response to the gateway if there is changed data
         if (CollectionUtils.isNotEmpty(changedGroup)) {
             this.generateResponse(response, changedGroup);
-            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
+            Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
-         // No change, then the client (in this case the gateway) is put into the blocking queue
+        // No change, then the client (in this case the gateway) is put into the blocking queue
         // listen for configuration changed.
         final AsyncContext asyncContext = request.startAsync();
         // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
@@ -763,7 +877,7 @@
         // block client's thread.
         scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
 - HttpLongPollingDataChangedListener#compareChangedGroup()
@@ -854,11 +968,13 @@
 public class ConfigController {
-    @Resource
-    private HttpLongPollingDataChangedListener longPollingListener;
+    private final HttpLongPollingDataChangedListener longPollingListener;
+    public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
+        this.longPollingListener = longPollingListener;
+    }
      * Fetch configs shenyu result.
@@ -885,7 +1001,10 @@
 Data fetching is taken directly from `CACHE`, and then matched and encapsulated according to different grouping types.
+public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
+    // ......
      * fetch configuration from cache.
      * @param groupKey the group key
@@ -893,31 +1012,25 @@
     public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
         // get data from CACHE
-        ConfigDataCache config = CACHE.get(groupKey.name()); 
+        ConfigDataCache config = CACHE.get(groupKey.name());
         switch (groupKey) {
             case APP_AUTH: // app auth data
-                List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
+                return buildConfigData(config, AppAuthData.class);
             case PLUGIN: // plugin data
-                List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
+                return buildConfigData(config, PluginData.class);
             case RULE:   // rule data
-                List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
+                return buildConfigData(config, RuleData.class);
             case SELECTOR:  // selector data
-                List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
-            case META_DATA: // meta data
-                List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
+                return buildConfigData(config, SelectorData.class);
+            case META_DATA: // meta data 
+                return buildConfigData(config, MetaData.class);
             default:  // other data type, throw exception
                 throw new IllegalStateException("Unexpected groupKey: " + groupKey);
+    }
+    // ......
 #### 3.5 Data Change
@@ -965,6 +1078,8 @@
                 case SELECTOR:   // selector data
                     listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
+                    // pull and save API document on seletor changed
+                    applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                 case META_DATA:  // meta data
                     listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
@@ -1039,7 +1154,7 @@
                 // send response to client
-                log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
+                Log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
diff --git a/i18n/zh/docusaurus-plugin-content-blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md b/i18n/zh/docusaurus-plugin-content-blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md
index af3ee1e..1c504a2 100644
--- a/i18n/zh/docusaurus-plugin-content-blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md
+++ b/i18n/zh/docusaurus-plugin-content-blog/DataSync-SourceCode-Analysis-Http-Data-Sync.md
@@ -10,7 +10,7 @@
 在`ShenYu`网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。`Apache ShenYu` 网关当前支持`ZooKeeper`、`WebSocket`、`Http长轮询`、`Nacos` 、`Etcd` 和 `Consul` 进行数据同步。本文的主要内容是基于`Http长轮询`的数据同步源码分析。
-> 本文基于`shenyu-2.4.0`版本进行源码分析,官网的介绍请参考 [数据同步原理](https://shenyu.apache.org/zh/docs/design/data-sync) 。
+> 本文基于`shenyu-2.5.0`版本进行源码分析,官网的介绍请参考 [数据同步原理](https://shenyu.apache.org/zh/docs/design/data-sync) 。
 ### 1. Http长轮询
@@ -32,13 +32,13 @@
-        <!--shenyu data sync start use http-->
-        <dependency>
-        	<groupId>org.apache.shenyu</groupId>
-        	<artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
-        	<version>${project.version}</version>
-        </dependency>
+<!--shenyu data sync start use http-->
+    <groupId>org.apache.shenyu</groupId>
+    <artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
+    <version>${project.version}</version>
@@ -53,52 +53,82 @@
  * Http sync data configuration for spring boot.
 @ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")
+@EnableConfigurationProperties(value = HttpConfig.class)
 public class HttpSyncDataConfiguration {
-    /**
-     * Http sync data service.
-     * 创建 HttpSyncDataService 
-     * @param httpConfig         http的配置
-     * @param pluginSubscriber   插件数据订阅
-     * @param metaSubscribers    元数据订阅
-     * @param authSubscribers    认证数据订阅
-     * @return the sync data service
-     */
-    @Bean
-    public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
-                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
-        log.info("you use http long pull sync shenyu data");
-        return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
-                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
-    }
+  private static final Logger LOGGER = LoggerFactory.getLogger(HttpSyncDataConfiguration.class);
-    /**
-     * Http config http config.
-     * 读取http的配置
-     * @return the http config
-     */
-    @Bean
-    @ConfigurationProperties(prefix = "shenyu.sync.http")
-    public HttpConfig httpConfig() {
-        return new HttpConfig();
-    }
+  /**
+   * Rest template.
+   * 创建RestTemplate
+   * @param httpConfig the http config       http配置
+   * @return the rest template
+   */
+  @Bean
+  public RestTemplate restTemplate(final HttpConfig httpConfig) {
+    OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
+    factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? (int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : httpConfig.getConnectionTimeout());
+    factory.setReadTimeout(Objects.isNull(httpConfig.getReadTimeout()) ? (int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT : httpConfig.getReadTimeout());
+    factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? (int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : httpConfig.getWriteTimeout());
+    return new RestTemplate(factory);
+  }
+  /**
+   * AccessTokenManager.
+   * 创建AccessTokenManager,专门用户对admin进行http请求时access token的处理
+   * @param httpConfig   the http config.      
+   * @param restTemplate the rest template.
+   * @return the access token manager.
+   */
+  @Bean
+  public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, final RestTemplate restTemplate) {
+    return new AccessTokenManager(restTemplate, httpConfig);
+  }
+  /**
+   * Http sync data service.
+   * 创建 HttpSyncDataService 
+   * @param httpConfig         the http config
+   * @param pluginSubscriber   the plugin subscriber
+   * @param restTemplate       the rest template
+   * @param metaSubscribers    the meta subscribers
+   * @param authSubscribers    the auth subscribers
+   * @param accessTokenManager the access token manager
+   * @return the sync data service
+   */
+  @Bean
+  public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig,
+                                             final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
+                                             final ObjectProvider<RestTemplate> restTemplate,
+                                             final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
+                                             final ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
+                                             final ObjectProvider<AccessTokenManager> accessTokenManager) {
+    LOGGER.info("you use http long pull sync shenyu data");
+    return new HttpSyncDataService(
+            Objects.requireNonNull(httpConfig.getIfAvailable()),
+            Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
+            Objects.requireNonNull(restTemplate.getIfAvailable()),
+            metaSubscribers.getIfAvailable(Collections::emptyList),
+            authSubscribers.getIfAvailable(Collections::emptyList),
+            Objects.requireNonNull(accessTokenManager.getIfAvailable())
+    );
+  }
+`HttpSyncDataConfiguration`是`Http长轮询`数据同步的配置类,负责创建`HttpSyncDataService`(负责`http`数据同步的具体实现)、`RestTemplate`和`AccessTokenManager` (负责与`admin`http调用时access token的处理)。它的注解如下:
 - `@Configuration`:表示这是一个配置类;
 - `@ConditionalOnClass(HttpSyncDataService.class)`:条件注解,表示要有`HttpSyncDataService`这个类;
 - `@ConditionalOnProperty(prefix = "shenyu.sync.http", name = "url")`:条件注解,要有`shenyu.sync.http.url`这个属性配置。
+- `@EnableConfigurationProperties(value = HttpConfig.class)`:表示让HttpConfig上的注解`@ConfigurationProperties(prefix = "shenyu.sync.http")`生效,将`HttpConfig`这个配置类注入Ioc容器中。
@@ -109,21 +139,26 @@
-public class HttpSyncDataService implements SyncDataService, AutoCloseable {
+public class HttpSyncDataService implements SyncDataService {
     // 省略了属性字段......
-    public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
-        // 1.创建数据处理器
-        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
-        // 2.获取admin属性配置
-        this.httpConfig = httpConfig;
-        // shenyu-admin的url, 多个用逗号(,)分割
-        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
-        // 3.创建httpClient,用于向admin发起请求
-        this.httpClient = createRestTemplate();
-        // 4.开始执行长轮询任务
-        this.start();
+    public HttpSyncDataService(final HttpConfig httpConfig,
+                               final PluginDataSubscriber pluginDataSubscriber,
+                               final RestTemplate restTemplate,
+                               final List<MetaDataSubscriber> metaDataSubscribers,
+                               final List<AuthDataSubscriber> authDataSubscribers,
+                               final AccessTokenManager accessTokenManager) {
+          // 1.设置accessTokenManager
+          this.accessTokenManager = accessTokenManager;
+          // 2.创建数据处理器
+          this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
+          // 3.shenyu-admin的url, 多个用逗号(,)分割
+          this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
+          // 4.只用于http长轮询的restTemplate
+          this.restTemplate = restTemplate;
+          // 5.开始执行长轮询任务
+          this.start();
@@ -132,24 +167,13 @@
+- 设置`accessTokenManager`,定时向`admin`请求更新`accessToken`的值。然后每次向`admin`发起请求时都必须将`header`的`X-Access-Token`属性设置成`accessToken`对应的值;
 - 创建数据处理器,用于后续缓存各种类型的数据(插件、选择器、规则、元数据和认证数据);
 - 获取`admin`属性配置,主要是获取`admin`的`url`,`admin`有可能是集群,多个用逗号`(,)`分割;
-- 创建`httpClient`,使用的是`RestTemplate`,用于向`admin`发起请求;
-  ```java
-      private RestTemplate createRestTemplate() {
-          OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
-          // 建立连接超时时间为 10s
-          factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
-          // 网关主动请求 shenyu-admin 的配置服务,读取超时时间为 90s
-          factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
-          return new RestTemplate(factory);
-      }
-  ```
+- 设置`RestTemplate`,用于向`admin`发起请求;
 - 开始执行长轮询任务。
@@ -160,30 +184,30 @@
-private void start() {
-        // 只初始化一次,通过原子类实现。 
-        RUNNING = new AtomicBoolean(false);
-        // It could be initialized multiple times, so you need to control that.
-        if (RUNNING.compareAndSet(false, true)) {
-            // fetch all group configs.
-            // 初次启动,获取全量数据
-            this.fetchGroupConfig(ConfigGroupEnum.values());
+public class HttpSyncDataService implements SyncDataService {
+    // ......
-            // 一个后台服务,一个线程
-            int threadSize = serverList.size();
-            // 自定义线程池
-            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
-                    new LinkedBlockingQueue<>(),
-                    ShenyuThreadFactory.create("http-long-polling", true));
-            // start long polling, each server creates a thread to listen for changes.
-            // 开始长轮询,一个admin服务,创建一个线程用于数据同步
-            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
-        } else {
-            log.info("shenyu http long polling was started, executor=[{}]", executor);
-        }
+    private void start() {
+      // // 只初始化一次,通过原子类实现。 
+      if (RUNNING.compareAndSet(false, true)) {
+        // 初次启动,获取全量数据
+        this.fetchGroupConfig(ConfigGroupEnum.values());
+        // 一个后台服务,一个线程
+        int threadSize = serverList.size();
+        // 自定义线程池
+        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                ShenyuThreadFactory.create("http-long-polling", true));
+        // 开始长轮询,一个admin服务,创建一个线程用于数据同步
+        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
+      } else {
+        LOG.info("shenyu http long polling was started, executor=[{}]", executor);
+      }
+    // ......
 ##### 2.3.1 获取全量数据
@@ -205,26 +229,32 @@
-private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
+public class HttpSyncDataService implements SyncDataService {
+  // ......
+  private void fetchGroupConfig(final ConfigGroupEnum... groups) throws ShenyuException {
     // admin有可能是集群,这里通过循环的方式向每个admin发起请求
-        for (int index = 0; index < this.serverList.size(); index++) {
-            String server = serverList.get(index);
-            try {
-                // 真正去执行
-                this.doFetchGroupConfig(server, groups);
-                // 有一个成功,就成功了,可以退出循环
-                break;
-            } catch (ShenyuException e) {
-                // 出现异常,尝试执行下一个
-                // 最后一个也执行失败了,抛出异常
-                // no available server, throw exception.
-                if (index >= serverList.size() - 1) {
-                    throw e;
-                }
-                log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
-            }
+    for (int index = 0; index < this.serverList.size(); index++) {
+      String server = serverList.get(index);
+      try {
+        // 真正去执行
+        this.doFetchGroupConfig(server, groups);
+        // 有一个成功,就成功了,可以退出循环
+        break;
+      } catch (ShenyuException e) {
+        // 出现异常,尝试执行下一个
+        // 最后一个也执行失败了,抛出异常
+        if (index >= serverList.size() - 1) {
+          throw e;
+        LOG.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
+      }
+  }
+  // ......
@@ -234,38 +264,40 @@
-// 向admin后台管理系统发起请求,获取所有同步数据
-private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
+public class HttpSyncDataService implements SyncDataService {
+  private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
     // 1. 拼请求参数,所有分组枚举类型
     StringBuilder params = new StringBuilder();
     for (ConfigGroupEnum groupKey : groups) {
-        params.append("groupKeys").append("=").append(groupKey.name()).append("&");
+      params.append("groupKeys").append("=").append(groupKey.name()).append("&");
     // admin端提供的接口  /configs/fetch
-    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
-    log.info("request configs: [{}]", url);
-    String json = null;
+    String url = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + StringUtils.removeEnd(params.toString(), "&");
+    LOG.info("request configs: [{}]", url);
+    String json;
     try {
-        // 2. 发起请求,获取变更数据
-        json = this.httpClient.getForObject(url, String.class);
+      HttpHeaders headers = new HttpHeaders();
+      // 设置accessToken
+      headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
+      HttpEntity<String> httpEntity = new HttpEntity<>(headers);
+      // 2. 发起请求,获取变更数据
+      json = this.restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class).getBody();
     } catch (RestClientException e) {
-        String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
-        log.warn(message);
-        throw new ShenyuException(message, e);
+      String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
+      LOG.warn(message);
+      throw new ShenyuException(message, e);
-    // update local cache
     // 3. 更新网关内存中数据
     boolean updated = this.updateCacheWithJson(json);
-    // 更新成功,此方法就执行完成了
     if (updated) {
-        log.info("get latest configs: [{}]", json);
-        return;
+      LOG.debug("get latest configs: [{}]", json);
+      return;
-    // not updated. it is likely that the current config server has not been updated yet. wait a moment.
-    log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
+    // 更新成功,此方法就执行完成了
+    LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
     // 服务端没有数据更新,就等30s
     ThreadUtils.sleep(TimeUnit.SECONDS, 30);
+  }
@@ -283,15 +315,14 @@
     private boolean updateCacheWithJson(final String json) {
         // 使用GSON进行反序列化
         JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
-        JsonObject data = jsonObject.getAsJsonObject("data");
         // if the config cache will be updated?
-        return factory.executor(data);
+        return factory.executor(jsonObject.getAsJsonObject("data"));
 - DataRefreshFactory#executor()
     public boolean executor(final JsonObject data) {
@@ -313,23 +344,33 @@
-    @Override
-    public Boolean refresh(final JsonObject data) {
-        boolean updated = false;
-        // 数据类型转换
-        JsonObject jsonObject = convert(data);
-        if (null != jsonObject) {
-            // 得到数据类型
-            ConfigData<T> result = fromJson(jsonObject);
-            // 是否需要更新
-            if (this.updateCacheIfNeed(result)) {
-                updated = true;
-                // 真正的更新逻辑,数据刷新操作
-                refresh(result.getData());
-            }
-        }
-        return updated;
+public abstract class AbstractDataRefresh<T> implements DataRefresh {
+  // ......
+  @Override
+  public Boolean refresh(final JsonObject data) {
+    // 数据类型转换
+    JsonObject jsonObject = convert(data);
+    if (Objects.isNull(jsonObject)) {
+      return false;
+    boolean updated = false;
+    // 得到数据类型
+    ConfigData<T> result = fromJson(jsonObject);
+    // 是否需要更新
+    if (this.updateCacheIfNeed(result)) {
+      updated = true;
+      // 真正的更新逻辑,数据刷新操作
+      refresh(result.getData());
+    }
+    return updated;
+  }
+  // ......
@@ -337,38 +378,44 @@
-// result是数据
-protected abstract boolean updateCacheIfNeed(ConfigData<T> result);
+public abstract class AbstractDataRefresh<T> implements DataRefresh {
-// newVal是获取到的最新的值
-// groupEnum 是哪种数据类型
-protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
-        // 如果是第一次,那么直接放到cache中,返回 true,表示此次进行了更新
-        if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
-            return true;
-        }
-        ResultHolder holder = new ResultHolder(false);
-        GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
-            // md5 值相同,不需要更新
-            if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
-                log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
-                return oldVal;
-            }
+  // ......
-            // 当前缓存的数据修改时间大于 新来的数据,不需要更新
-            // must compare the last update time
-            if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
-                log.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
-                return oldVal;
-            }
-            log.info("update {} config: {}", groupEnum, newVal);
-            holder.result = true;
-            return newVal;
-        });
-        return holder.result;
+  // result是数据
+  protected abstract boolean updateCacheIfNeed(ConfigData<T> result);
+  // newVal是获取到的最新的值
+  // groupEnum 是哪种数据类型
+  protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
+    // 如果是第一次,那么直接放到cache中,返回 true,表示此次进行了更新
+    if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
+      return true;
+    ResultHolder holder = new ResultHolder(false);
+    GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
+      // md5 值相同,不需要更新
+      if (StringUtils.equals(oldVal.getMd5(), newVal.getMd5())) {
+        LOG.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
+        return oldVal;
+      }
+      // 当前缓存的数据修改时间大于 新来的数据,不需要更新
+      // must compare the last update time
+      if (oldVal.getLastModifyTime() >= newVal.getLastModifyTime()) {
+        LOG.info("Last update time earlier than the current configuration, the [{}] config cache will not be updated", groupEnum);
+        return oldVal;
+      }
+      LOG.info("update {} config: {}", groupEnum, newVal);
+      holder.result = true;
+      return newVal;
+    });
+    return holder.result;
+  }
+  // ......
@@ -382,26 +429,30 @@
 分析到这里,就将`start()` 方法中初次启动,获取全量数据的逻辑分析完了,接下来是长轮询的操作。为了方便,我将`start()`方法再粘贴一次:
-    private void start() {
-        // It could be initialized multiple times, so you need to control that.
-        if (RUNNING.compareAndSet(false, true)) {
-            // fetch all group configs.
-            // 初次启动,获取全量数据
-            this.fetchGroupConfig(ConfigGroupEnum.values());
+public class HttpSyncDataService implements SyncDataService {
-            // 一个后台服务,一个线程
-            int threadSize = serverList.size();
-            // 自定义线程池
-            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
-                    new LinkedBlockingQueue<>(),
-                    ShenyuThreadFactory.create("http-long-polling", true));
-            // start long polling, each server creates a thread to listen for changes.
-            // 开始长轮询,一个admin服务,创建一个线程用于数据同步
-            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
-        } else {
-            log.info("shenyu http long polling was started, executor=[{}]", executor);
-        }
+  // ......
+  private void start() {
+    // // 只初始化一次,通过原子类实现。 
+    if (RUNNING.compareAndSet(false, true)) {
+      // 初次启动,获取全量数据
+      this.fetchGroupConfig(ConfigGroupEnum.values());
+      // 一个后台服务,一个线程
+      int threadSize = serverList.size();
+      // 自定义线程池
+      this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
+              new LinkedBlockingQueue<>(),
+              ShenyuThreadFactory.create("http-long-polling", true));
+      // 开始长轮询,一个admin服务,创建一个线程用于数据同步
+      this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
+    } else {
+      LOG.info("shenyu http long polling was started, executor=[{}]", executor);
+  }
+  // ......
@@ -413,45 +464,41 @@
 class HttpLongPollingTask implements Runnable {
-        private String server;
+  private final String server;
-        // 默认重试 3 次
-        private final int retryTimes = 3;
+  HttpLongPollingTask(final String server) {
+    this.server = server;
+  }
-        HttpLongPollingTask(final String server) {
-            this.server = server;
+  @Override
+  public void run() {
+    // 一直轮询
+    while (RUNNING.get()) {
+      // 默认重试 3 次
+      int retryTimes = 3;
+      for (int time = 1; time <= retryTimes; time++) {
+        try {
+          doLongPolling(server);
+        } catch (Exception e) {
+          if (time < retryTimes) {
+            LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
+                    time, retryTimes - time, e.getMessage());
+            // 长轮询失败了,等 5s 再继续
+            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
+            continue;
+          }
+          LOG.error("Long polling failed, try again after 5 minutes!", e);
+          // 3 次都失败了,等 5 分钟再试
+          ThreadUtils.sleep(TimeUnit.MINUTES, 5);
-        @Override
-        public void run() {
-            // 一直轮询
-            while (RUNNING.get()) {
-                for (int time = 1; time <= retryTimes; time++) {
-                    try {
-                        doLongPolling(server);
-                    } catch (Exception e) {
-                        // print warnning log.
-                        if (time < retryTimes) {
-                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
-                                    time, retryTimes - time, e.getMessage());
-                            // 长轮询失败了,等 5s 再继续
-                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
-                            continue;
-                        }
-                        // print error, then suspended for a while.
-                        log.error("Long polling failed, try again after 5 minutes!", e);
-                        // 3 次都失败了,等 5 分钟再试
-                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
-                    }
-                }
-            }
-            log.warn("Stop http long polling.");
-        }
+      }
+    LOG.warn("Stop http long polling.");
+  }
 - HttpSyncDataService#doLongPolling()
@@ -464,58 +511,60 @@
 - 根据发生变更的组,再去获取数据。
-private void doLongPolling(final String server) {
-        // 组装请求参数:md5 和 lastModifyTime
-        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
-        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
-            ConfigData<?> cacheConfig = factory.cacheConfigData(group);
-            if (cacheConfig != null) {
-                String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
-                params.put(group.name(), Lists.newArrayList(value));
-            }
-        }
-        // 组装请求头和请求体
-        HttpHeaders headers = new HttpHeaders();
-        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
-        HttpEntity httpEntity = new HttpEntity(params, headers);
-        String listenerUrl = server + "/configs/listener";
-        log.debug("request listener configs: [{}]", listenerUrl);
-        JsonArray groupJson = null;
-        //向admin发起请求,判断组数据是否发生变更
-        //这里只是判断了某个组是否发生变更
-        try {
-            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
-            log.debug("listener result: [{}]", json);
-            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
-        } catch (RestClientException e) {
-            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
-            throw new ShenyuException(message, e);
-        }
-        // 根据发生变更的组,再去获取数据
-        /**
-         * 官网对此处的解释:
-         * 网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。
-         * 这里可能会存在一个疑问:为什么不是直接将变更的数据写出?
-         * 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,
-         * 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。
-         *
-         * 个人理解:
-         * 如果将变更数据直接写出,当管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。
-         * 如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。
-         * 网关层处理不及时,也是同理。
-         * 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。
-         * 如果admin有数据变更,当前网关client是没有在阻塞队列中,就不到数据。
-         */
-        if (groupJson != null) {
-            // fetch group configuration async.
-            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
-            if (ArrayUtils.isNotEmpty(changedGroups)) {
-                log.info("Group config changed: {}", Arrays.toString(changedGroups));
-                // 主动向admin获取变更的数据,根据分组不同,全量拿数据
-                this.doFetchGroupConfig(server, changedGroups);
-            }
-        }
+public class HttpSyncDataService implements SyncDataService {
+  private void doLongPolling(final String server) {
+    // 组装请求参数:md5 和 lastModifyTime
+    MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
+    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
+      ConfigData<?> cacheConfig = factory.cacheConfigData(group);
+      if (cacheConfig != null) {
+        String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
+        params.put(group.name(), Lists.newArrayList(value));
+      }
+    // 组装请求头和请求体
+    HttpHeaders headers = new HttpHeaders();
+    headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+    // 设置accessToken
+    headers.set(Constants.X_ACCESS_TOKEN, this.accessTokenManager.getAccessToken());
+    HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, headers);
+    String listenerUrl = server + Constants.SHENYU_ADMIN_PATH_CONFIGS_LISTENER;
+    JsonArray groupJson;
+    //向admin发起请求,判断组数据是否发生变更
+    //这里只是判断了某个组是否发生变更
+    try {
+      String json = this.restTemplate.postForEntity(listenerUrl, httpEntity, String.class).getBody();
+      LOG.info("listener result: [{}]", json);
+      JsonObject responseFromServer = GsonUtils.getGson().fromJson(json, JsonObject.class);
+      groupJson = responseFromServer.getAsJsonArray("data");
+    } catch (RestClientException e) {
+      String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
+      throw new ShenyuException(message, e);
+    }
+    // 根据发生变更的组,再去获取数据
+    /**
+     * 官网对此处的解释:
+     * 网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。
+     * 这里可能会存在一个疑问:为什么不是直接将变更的数据写出?
+     * 我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,
+     * 或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。
+     *
+     * 个人理解:
+     * 如果将变更数据直接写出,当管理员频繁更新配置时,第一次更新了,将client移除阻塞队列,返回响应信息给网关。
+     * 如果这个时候进行了第二次更新,那么当前的client是不在阻塞队列中,所以这一次的变更就会错过。
+     * 网关层处理不及时,也是同理。
+     * 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。
+     * 如果admin有数据变更,当前网关client是没有在阻塞队列中,就不到数据。
+     */
+    if (Objects.nonNull(groupJson) && groupJson.size() > 0) {
+      // fetch group configuration async.
+      ConfigGroupEnum[] changedGroups = GsonUtils.getGson().fromJson(groupJson, ConfigGroupEnum[].class);
+      LOG.info("Group config changed: {}", Arrays.toString(changedGroups));
+      this.doFetchGroupConfig(server, changedGroups);
+    }
+  }
@@ -529,7 +578,7 @@
-> 如果将变更数据直接写出,管理员频繁更新配置时,第一次更新了,将`client`移除阻塞队列,返回响应信息给网关。如果这个时候进行了第二次更新,那么当前的`client`是不在阻塞队列中,所以这一次的变更就会错过。网关层处理不及时,也是同理。 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。如果`admin`有数据变更,当前网关client是没有在阻塞队列中,就不到数据。
+> 如果将变更数据直接写出,管理员频繁更新配置时,第一次更新了,将`client`移除阻塞队列,返回响应信息给网关。如果这个时候进行了第二次更新,那么当前的`client`是不在阻塞队列中,所以这一次的变更就会错过。网关层处理不及时,也是同理。 这是一个长轮询,一个网关一个同步线程,可能存在耗时的过程。如果`admin`有数据变更,当前网关client是没有在阻塞队列中,就会更新不到数据。
@@ -618,28 +667,33 @@
-    /**
-     * 在 InitializingBean接口中的afterPropertiesSet()方法中被调用,即在bean的初始化过程中执行
-     */
-    @Override
-    protected void afterInitialize() {
-        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
-        // Periodically check the data for changes and update the cache
+public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
-        // 执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行
-        // 防止admin先启动一段时间后,产生了数据;然后网关初次连接时,没有拿到全量数据
-        scheduler.scheduleWithFixedDelay(() -> {
-            log.info("http sync strategy refresh config start.");
-            try {
-                // 从数据库读取数据到本地缓存(这里就是内存)
-                this.refreshLocalCache();
-                log.info("http sync strategy refresh config success.");
-            } catch (Exception e) {
-                log.error("http sync strategy refresh config error!", e);
-            }
-        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
-        log.info("http sync strategy refresh interval: {}ms", syncInterval);
-    }
+  // ......
+  /**
+   * 在 InitializingBean接口中的afterPropertiesSet()方法中被调用,即在bean的初始化过程中执行
+   */
+  @Override
+  protected void afterInitialize() {
+    long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
+    // 执行周期任务:更新内存中(CACHE)的数据每隔5分钟执行一次,5分钟后开始执行
+    // 防止admin先启动一段时间后,产生了数据;然后网关初次连接时,没有拿到全量数据
+    scheduler.scheduleWithFixedDelay(() -> {
+      LOG.info("http sync strategy refresh config start.");
+      try {
+        // 从数据库读取数据到本地缓存(这里就是内存)
+        this.refreshLocalCache();
+        LOG.info("http sync strategy refresh config success.");
+      } catch (Exception e) {
+        LOG.error("http sync strategy refresh config error!", e);
+      }
+    }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
+    LOG.info("http sync strategy refresh interval: {}ms", syncInterval);
+  }
+  // ......
 - refreshLocalCache()
@@ -647,19 +701,26 @@
-    // 从数据库读取数据到本地缓存(这里就是内存)
-    private void refreshLocalCache() {
-        //更新认证数据
-        this.updateAppAuthCache();
-        //更新插件数据
-        this.updatePluginCache();
-        //更新规则数据
-        this.updateRuleCache();
-        //更新选择器数据
-        this.updateSelectorCache();
-        //更新元数据
-        this.updateMetaDataCache();
-    }
+public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
+  // ......
+  // 从数据库读取数据到本地缓存(这里就是内存)
+  private void refreshLocalCache() {
+    //更新认证数据
+    this.updateAppAuthCache();
+    //更新插件数据
+    this.updatePluginCache();
+    //更新规则数据
+    this.updateRuleCache();
+    //更新选择器数据
+    this.updateSelectorCache();
+    //更新元数据
+    this.updateMetaDataCache();
+  }
+  // ......
@@ -679,25 +740,32 @@
-// 缓存数据的 Map
-protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
+public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
-     * if md5 is not the same as the original, then update lcoal cache.
-     * 更新缓存中的数据
-     * @param group ConfigGroupEnum
-     * @param <T> the type of class
-     * @param data the new config data
-     */
-    protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
-        //数据序列化
-        String json = GsonUtils.getInstance().toJson(data);
-        //传入md5值和修改时间
-        ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
-        //更新分组数据
-        ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
-        log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
-    }
+  // ......
+  // 缓存数据的 Map
+  protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
+  /**
+   * if md5 is not the same as the original, then update lcoal cache.
+   * 更新缓存中的数据
+   * @param group ConfigGroupEnum
+   * @param <T> the type of class
+   * @param data the new config data
+   */
+  protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
+    //数据序列化
+    String json = GsonUtils.getInstance().toJson(data);
+    //传入md5值和修改时间
+    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
+    //更新分组数据
+    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
+    LOG.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
+  }
+  // ......
@@ -721,11 +789,13 @@
 public class ConfigController {
-    @Resource
-    private HttpLongPollingDataChangedListener longPollingListener;
+    private final HttpLongPollingDataChangedListener longPollingListener;
+    public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
+      this.longPollingListener = longPollingListener;
+    }
     // 省略其他逻辑
@@ -748,34 +818,37 @@
-     * 执行长轮询:如果有数据变更,会立即响应给客户端(这里就是网关端)。
-     * 否则,否则客户端会一直被阻塞,直到有数据变更或者超时。
-     * @param request
-     * @param response
-     */
-    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
-        // compare group md5
-        // 比较md5,判断网关的数据和admin端的数据是否一致,得到发生变更的数据组
-        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
-        String clientIp = getRemoteIp(request);
-        // response immediately.
-        // 有变更的数据,则立即向网关响应
-        if (CollectionUtils.isNotEmpty(changedGroup)) {
-            this.generateResponse(response, changedGroup);
-            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
-            return;
-        }
+public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
-         // 没有变更,则将客户端(这里就是网关)放进阻塞队列
-        // listen for configuration changed.
-        final AsyncContext asyncContext = request.startAsync();
-        // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
-        asyncContext.setTimeout(0L);
-        // block client's thread.
-        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
+  // ......
+  /**
+   * 执行长轮询:如果有数据变更,会立即响应给客户端(这里就是网关端)。
+   * 否则,否则客户端会一直被阻塞,直到有数据变更或者超时。
+   * @param request
+   * @param response
+   */
+  public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
+    // compare group md5
+    // 比较md5,判断网关的数据和admin端的数据是否一致,得到发生变更的数据组
+    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
+    String clientIp = getRemoteIp(request);
+    // response immediately.
+    // 有变更的数据,则立即向网关响应
+    if (CollectionUtils.isNotEmpty(changedGroup)) {
+      this.generateResponse(response, changedGroup);
+      Log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
+      return;
+    // 没有变更,则将客户端(这里就是网关)放进阻塞队列
+    final AsyncContext asyncContext = request.startAsync();
+    asyncContext.setTimeout(0L);
+    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
+  }
+  // ......
 - HttpLongPollingDataChangedListener#compareChangedGroup()
@@ -822,7 +895,7 @@
         public void run() {
             try {
-                // 60秒后移除,并响应客户端
+                // 先设置定时任务:60秒后移除,并响应客户端
                 this.asyncTimeoutFuture = scheduler.schedule(() -> {
                     List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
@@ -866,11 +939,13 @@
 public class ConfigController {
-    @Resource
-    private HttpLongPollingDataChangedListener longPollingListener;
+    private final HttpLongPollingDataChangedListener longPollingListener;
+    public ConfigController(final HttpLongPollingDataChangedListener longPollingListener) {
+      this.longPollingListener = longPollingListener;
+    }
      * Fetch configs shenyu result.
@@ -898,40 +973,32 @@
-    /**
-     * fetch configuration from cache.
-     * 获取分组下的全量数据
-     * @param groupKey the group key
-     * @return the configuration data
-     */
-    public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
-        // 直接从 CACHE 中拿数据
-        ConfigDataCache config = CACHE.get(groupKey.name()); 
-        switch (groupKey) {
-            case APP_AUTH: // 认证数据
-                List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
-            case PLUGIN: // 插件数据
-                List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
-            case RULE:   // 规则数据
-                List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
-            case SELECTOR:  // 选择器数据
-                List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
-            case META_DATA: // 元数据
-                List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
-                }.getType());
-                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
-            default:  // 其他类型,抛出异常
-                throw new IllegalStateException("Unexpected groupKey: " + groupKey);
-        }
+public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
+  /**
+   * fetch configuration from cache.
+   * 获取分组下的全量数据
+   * @param groupKey the group key
+   * @return the configuration data
+   */
+  public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
+    // 直接从 CACHE 中拿数据
+    ConfigDataCache config = CACHE.get(groupKey.name());
+    switch (groupKey) {
+      case APP_AUTH: // 认证数据
+        return buildConfigData(config, AppAuthData.class);
+      case PLUGIN: // 插件数据
+        return buildConfigData(config, PluginData.class);
+      case RULE:   // 规则数据
+        return buildConfigData(config, RuleData.class);
+      case SELECTOR:  // 选择器数据
+        return buildConfigData(config, SelectorData.class);
+      case META_DATA: // 元数据
+        return buildConfigData(config, MetaData.class);
+      default:  // 其他类型,抛出异常
+        throw new IllegalStateException("Unexpected groupKey: " + groupKey);
+    }
+  }
 #### 3.5 数据变更
@@ -979,6 +1046,8 @@
                 case SELECTOR:   // 选择器信息
                     listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
+                    // 当选择器数据更新时,更新API文档信息
+                    applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                 case META_DATA:  // 元数据
                     listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
@@ -1053,7 +1122,7 @@
                 // 发送响应
-                log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
+                LOG.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);