[ISSUE #25] Add HealthService (#41)
* feat: add health service module
* feat: add FunctionManager
* chore: style optimize
* chore: remove redundant code
* chore: rename FunctionManagerBean to FunctionManagerLoader
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/HealthCheckConfig.java
similarity index 66%
rename from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
rename to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/HealthCheckConfig.java
index aba415a..8588311 100644
--- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/HealthCheckConfig.java
@@ -17,17 +17,15 @@
package org.apache.eventmesh.dashboard.console.config;
-import org.springframework.context.annotation.Bean;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.stereotype.Component;
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
-@Component
-public class SchedulerConfig {
+import java.util.List;
- @Bean
- public ThreadPoolTaskScheduler taskScheduler() {
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
- taskScheduler.setPoolSize(5);
- return taskScheduler;
- }
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class HealthCheckConfig {
+ private List<HealthCheckObjectConfig> checkObjectConfigList;
}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/CheckResultCache.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/CheckResultCache.java
new file mode 100644
index 0000000..540e383
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/CheckResultCache.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health;
+
+import org.apache.eventmesh.dashboard.console.enums.health.HealthCheckStatus;
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
+
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+public class CheckResultCache {
+
+ private final HashMap<String, HashMap<Long, CheckResult>> cacheMap = new HashMap<>();
+
+ public void update(String type, Long typeId, HealthCheckStatus status, String resultDesc, Long latency) {
+ HashMap<Long, CheckResult> subMap = cacheMap.get(type);
+ if (Objects.isNull(subMap)) {
+ subMap = new HashMap<>();
+ cacheMap.put(type, subMap);
+ }
+ CheckResult oldResult = subMap.get(typeId);
+ String oldDesc = Objects.isNull(oldResult.getResultDesc()) ? "" : oldResult.getResultDesc() + "\n";
+ CheckResult result = new CheckResult(status, oldDesc + resultDesc, LocalDateTime.now(),
+ latency, oldResult.getConfig());
+ subMap.put(typeId, result);
+ }
+
+ public void update(String type, Long typeId, HealthCheckStatus status, String resultDesc, Long latency, HealthCheckObjectConfig config) {
+ HashMap<Long, CheckResult> subMap = cacheMap.get(type);
+ if (Objects.isNull(subMap)) {
+ subMap = new HashMap<>();
+ cacheMap.put(type, subMap);
+ }
+ CheckResult resultToUpdate = subMap.get(typeId);
+ subMap.put(typeId, new CheckResult(status, resultDesc, LocalDateTime.now(), latency, config));
+ }
+
+ public Map<String, HashMap<Long, CheckResult>> getCacheMap() {
+ return Collections.unmodifiableMap(cacheMap);
+ }
+
+ @Getter
+ @AllArgsConstructor
+ public class CheckResult {
+
+ /**
+ * the status of a health check.
+ *
+ * @see HealthCheckStatus
+ */
+ @Setter
+ private HealthCheckStatus status;
+ /**
+ * if not passed, this field is used to store some description.
+ */
+ private String resultDesc;
+ /**
+ * the time this record is inserted into memory map.
+ */
+ private LocalDateTime createTime;
+ /**
+ * latency of a health check, for example ping latency.
+ */
+ private Long latencyMilliSeconds;
+
+ private HealthCheckObjectConfig config;
+ }
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/HealthExecutor.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/HealthExecutor.java
new file mode 100644
index 0000000..765bdab
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/HealthExecutor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health;
+
+import org.apache.eventmesh.dashboard.console.entity.health.HealthCheckResultEntity;
+import org.apache.eventmesh.dashboard.console.enums.health.HealthCheckStatus;
+import org.apache.eventmesh.dashboard.console.enums.health.HealthCheckType;
+import org.apache.eventmesh.dashboard.console.health.CheckResultCache.CheckResult;
+import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback;
+import org.apache.eventmesh.dashboard.console.health.check.AbstractHealthCheckService;
+import org.apache.eventmesh.dashboard.console.service.health.HealthDataService;
+
+import java.util.ArrayList;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class HealthExecutor {
+
+ @Setter
+ private HealthDataService dataService;
+
+ /**
+ * memory cache is used to store real-time health check result.
+ */
+ @Getter
+ @Setter
+ private CheckResultCache memoryCache;
+
+ /**
+ * execute function is where health check services work.
+ *
+ * @param service The health check service to be executed.
+ */
+
+ public void execute(AbstractHealthCheckService service) {
+ final long startTime = System.currentTimeMillis();
+ //TODO: execute is called by a ScheduledThreadPoolExecutor,
+ // when called, it should check if current service should doCheck(check service check rate can be dynamically configured).
+ try {
+ memoryCache.update(service.getConfig().getHealthCheckResourceType(), service.getConfig().getInstanceId(), HealthCheckStatus.CHECKING, "",
+ null, service.getConfig());
+ //The callback interface is used to pass the processing methods for checking success and failure.
+ service.doCheck(new HealthCheckCallback() {
+ @Override
+ public void onSuccess() {
+ //when the health check is successful, the result is updated to the memory cache.
+ Long latency = System.currentTimeMillis() - startTime;
+ HealthCheckStatus status =
+ latency > service.getConfig().getRequestTimeoutMillis() ? HealthCheckStatus.TIMEOUT : HealthCheckStatus.PASSED;
+ memoryCache.update(service.getConfig().getHealthCheckResourceType(), service.getConfig().getInstanceId(),
+ status, "health check success", latency
+ );
+ }
+
+ @Override
+ public void onFail(Exception e) {
+ //when the health check fails, the result is updated to the memory cache, passing in the exception message.
+ log.error("Health check failed for reason: {}. Service config is {}", e, service.getConfig());
+ memoryCache.update(service.getConfig().getHealthCheckResourceType(), service.getConfig().getInstanceId(),
+ HealthCheckStatus.FAILED, e.getMessage(),
+ System.currentTimeMillis() - startTime);
+ }
+ });
+ } catch (Exception e) {
+ log.error("Health check failed for reason: {}. Service config is {}", e, service.getConfig());
+ memoryCache.update(service.getConfig().getHealthCheckResourceType(), service.getConfig().getInstanceId(), HealthCheckStatus.FAILED,
+ e.getMessage(),
+ System.currentTimeMillis() - startTime);
+ }
+ }
+
+ /**
+ * this function should be called before any actual execute behaviour.<br> It will check the execution result of the last check cycle in the
+ * memory cache, set tasks that haven't finished status to time out and update the database.
+ */
+ public void startExecute() {
+ ArrayList<HealthCheckResultEntity> resultList = new ArrayList<>();
+ memoryCache.getCacheMap().forEach((type, subMap) -> {
+ subMap.forEach((instanceId, result) -> {
+ if (result.getStatus() == HealthCheckStatus.CHECKING) {
+ result.setStatus(HealthCheckStatus.TIMEOUT);
+ }
+ addToResultList(result, resultList);
+ });
+ });
+ if (!resultList.isEmpty()) {
+ dataService.batchUpdateCheckResultByClusterIdAndTypeAndTypeId(resultList);
+ }
+ }
+
+ /**
+ * this function should be called after all actual execute behaviour.<br> It will insert the result of this check cycle into the database. At this
+ * point the status of the tasks may be CHECKING, they will be updated on the next startExecute.
+ */
+ public void endExecute() {
+ ArrayList<HealthCheckResultEntity> resultList = new ArrayList<>();
+ memoryCache.getCacheMap().forEach((type, subMap) -> {
+ subMap.forEach((instanceId, result) -> {
+ addToResultList(result, resultList);
+ });
+ });
+ dataService.batchInsertHealthCheckResult(resultList);
+ }
+
+ /**
+ * Helper function to add a CheckResult to the resultList.
+ *
+ * @param result memory cached result object.
+ * @param resultList entity list to be inserted into the database.
+ */
+ private void addToResultList(CheckResult result, ArrayList<HealthCheckResultEntity> resultList) {
+ HealthCheckResultEntity newEntity = new HealthCheckResultEntity();
+ newEntity.setClusterId(result.getConfig().getClusterId());
+ newEntity.setType(HealthCheckType.toNumber(result.getConfig().getHealthCheckResourceType()));
+ newEntity.setTypeId(result.getConfig().getInstanceId());
+ newEntity.setResultDesc(result.getResultDesc());
+ newEntity.setStatus(result.getStatus().getNumber());
+
+ resultList.add(newEntity);
+ }
+
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/HealthService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/HealthService.java
new file mode 100644
index 0000000..39485e1
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/HealthService.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health;
+
+import org.apache.eventmesh.dashboard.console.health.CheckResultCache.CheckResult;
+import org.apache.eventmesh.dashboard.console.health.annotation.HealthCheckType;
+import org.apache.eventmesh.dashboard.console.health.check.AbstractHealthCheckService;
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
+import org.apache.eventmesh.dashboard.console.health.check.impl.StorageRedisCheck;
+import org.apache.eventmesh.dashboard.console.service.health.HealthDataService;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * HealthService is the manager of all health check services. It is responsible for creating, deleting and executing health check services.<br> In
+ * this class there is a {@link HealthExecutor} which is used to execute health check services, and also a map to store all health check services.
+ * when the function executeAll is called, health check service will be executed by {@link HealthExecutor}.
+ */
+@Slf4j
+public class HealthService {
+
+ private HealthExecutor healthExecutor;
+
+ /**
+ * class cache used to build healthCheckService instance.<br> key: HealthCheckObjectConfig.SimpleClassName value: HealthCheckService
+ *
+ * @see HealthCheckObjectConfig
+ */
+ private static final Map<String, Class<?>> HEALTH_CHECK_CLASS_CACHE = new HashMap<>();
+
+ static {
+ setClassCache(StorageRedisCheck.class);
+ }
+
+ private static void setClassCache(Class<?> clazz) {
+ HEALTH_CHECK_CLASS_CACHE.put(clazz.getSimpleName(), clazz);
+ }
+
+ /**
+ * This map is used to store HealthExecutor.<br> Outside key is Type(runtime, storage etc.), inside key is the id of type instance(runtimeId,
+ * storageId etc.).
+ *
+ * @see AbstractHealthCheckService
+ */
+ private Map<String, Map<Long, AbstractHealthCheckService>> checkServiceMap = new ConcurrentHashMap<>();
+
+ private ScheduledThreadPoolExecutor scheduledExecutor;
+
+ public Map<String, HashMap<Long, CheckResult>> getCheckResultCacheMap() {
+ return healthExecutor.getMemoryCache().getCacheMap();
+ }
+
+ public void insertCheckService(List<HealthCheckObjectConfig> configList) {
+ configList.forEach(this::insertCheckService);
+ }
+
+ public void insertCheckService(HealthCheckObjectConfig config) {
+ AbstractHealthCheckService healthCheckService = null;
+
+ try {
+ if (Objects.nonNull(config.getSimpleClassName())) {
+ Class<?> clazz = HEALTH_CHECK_CLASS_CACHE.get(config.getSimpleClassName());
+ healthCheckService = createCheckService(clazz, config);
+ //if simpleClassName is null, use type(storage) and subType(redis) to create healthCheckService
+ //healthCheckService is annotated with @HealthCheckType(type = "storage", subType = "redis")
+ } else if (Objects.isNull(config.getSimpleClassName())
+ && Objects.nonNull(config.getHealthCheckResourceType()) && Objects.nonNull(
+ config.getHealthCheckResourceSubType())) {
+ for (Entry<String, Class<?>> entry : HEALTH_CHECK_CLASS_CACHE.entrySet()) {
+ Class<?> clazz = entry.getValue();
+ HealthCheckType annotation = clazz.getAnnotation(HealthCheckType.class);
+ if (annotation != null && annotation.type().equals(config.getHealthCheckResourceType()) && annotation.subType()
+ .equals(config.getHealthCheckResourceSubType())) {
+ healthCheckService = createCheckService(clazz, config);
+ break;
+ }
+ }
+ // you can pass an object to create a HealthCheckService(not commonly used)
+ } else if (Objects.nonNull(config.getCheckClass())) {
+ healthCheckService = createCheckService(config.getCheckClass(), config);
+ }
+ } catch (Exception e) {
+ log.error("create healthCheckService failed, healthCheckObjectConfig:{}", config, e);
+ }
+
+ // if all above creation method failed
+ if (Objects.isNull(healthCheckService)) {
+ throw new RuntimeException("No construct method of Health Check Service is found, config is {}" + config);
+ }
+ }
+
+ public void deleteCheckService(String resourceType, Long resourceId) {
+ Map<Long, AbstractHealthCheckService> subMap = checkServiceMap.get(resourceType);
+ if (Objects.isNull(subMap)) {
+ return;
+ }
+ subMap.remove(resourceId);
+ if (subMap.isEmpty()) {
+ checkServiceMap.remove(resourceType);
+ }
+ }
+
+
+ public void createExecutor(HealthDataService dataService, CheckResultCache cache) {
+ healthExecutor = new HealthExecutor();
+ healthExecutor.setDataService(dataService);
+ healthExecutor.setMemoryCache(cache);
+ }
+
+ public void executeAll() {
+ healthExecutor.startExecute();
+
+ checkServiceMap.forEach((type, subMap) -> {
+ subMap.forEach((typeId, healthCheckService) -> {
+ healthExecutor.execute(healthCheckService);
+ });
+ });
+
+ healthExecutor.endExecute();
+ }
+
+ private AbstractHealthCheckService createCheckService(Class<?> clazz, HealthCheckObjectConfig config)
+ throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
+ Constructor<?> constructor = clazz.getConstructor(HealthCheckObjectConfig.class);
+ return (AbstractHealthCheckService) constructor.newInstance(config);
+ }
+
+ /**
+ * start scheduled execution of health check services
+ *
+ * @param initialDelay unit is second
+ * @param period unit is second
+ */
+ public void startScheduledExecution(long initialDelay, long period) {
+ if (scheduledExecutor == null) {
+ scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+ }
+ scheduledExecutor.scheduleAtFixedRate(this::executeAll, initialDelay, period, TimeUnit.SECONDS);
+ }
+
+ public void stopScheduledExecution() {
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
+ }
+ }
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/annotation/HealthCheckType.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/annotation/HealthCheckType.java
new file mode 100644
index 0000000..6f39658
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/annotation/HealthCheckType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is used to mark the type of health check service implement.
+ * @see org.apache.eventmesh.dashboard.console.enums.health.HealthCheckType
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface HealthCheckType {
+ /**
+ * type of resource. runtime, topic etc.
+ */
+ String type();
+
+ /**
+ * subtype of resource. For resource storage, it can be redis, rocketmq etc.
+ */
+ String subType() default "";
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/callback/HealthCheckCallback.java
similarity index 62%
copy from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
copy to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/callback/HealthCheckCallback.java
index aba415a..7f7b360 100644
--- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/callback/HealthCheckCallback.java
@@ -15,19 +15,16 @@
* limitations under the License.
*/
-package org.apache.eventmesh.dashboard.console.config;
+package org.apache.eventmesh.dashboard.console.health.callback;
-import org.springframework.context.annotation.Bean;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.stereotype.Component;
+import org.apache.eventmesh.dashboard.console.health.HealthExecutor;
-@Component
-public class SchedulerConfig {
+/**
+ * Callback used by HealthService.doCheck to notify the caller of the result of the health check.<br>
+ * @see HealthExecutor
+ */
+public interface HealthCheckCallback {
+ public void onSuccess();
- @Bean
- public ThreadPoolTaskScheduler taskScheduler() {
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
- taskScheduler.setPoolSize(5);
- return taskScheduler;
- }
+ public void onFail(Exception e);
}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/AbstractHealthCheckService.java
similarity index 61%
copy from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
copy to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/AbstractHealthCheckService.java
index aba415a..6fa7045 100644
--- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/AbstractHealthCheckService.java
@@ -15,19 +15,23 @@
* limitations under the License.
*/
-package org.apache.eventmesh.dashboard.console.config;
+package org.apache.eventmesh.dashboard.console.health.check;
-import org.springframework.context.annotation.Bean;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.stereotype.Component;
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
-@Component
-public class SchedulerConfig {
+import lombok.Getter;
- @Bean
- public ThreadPoolTaskScheduler taskScheduler() {
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
- taskScheduler.setPoolSize(5);
- return taskScheduler;
+/**
+ * extends
+ */
+@Getter
+public abstract class AbstractHealthCheckService implements HealthCheckService {
+
+ private final HealthCheckObjectConfig config;
+
+ public AbstractHealthCheckService(HealthCheckObjectConfig healthCheckObjectConfig) {
+ this.config = healthCheckObjectConfig;
+ this.init();
}
+
}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/HealthCheckService.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/HealthCheckService.java
new file mode 100644
index 0000000..8f37363
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/HealthCheckService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health.check;
+
+import org.apache.eventmesh.dashboard.console.health.HealthExecutor;
+import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback;
+
+/**
+ * Health check service interface.<br>
+ * To add a new check service, extend the {@link AbstractHealthCheckService}.
+ * @see AbstractHealthCheckService
+ */
+public interface HealthCheckService {
+
+ /**
+ * Do the health check.<br>
+ * To implement a new check service, add the necessary logic to call the success and fail functions of the callback.
+ * @param callback The behaviour of the callback is defined as a lambda function when used. Please refer to {@link HealthExecutor} for usage.
+ */
+ public void doCheck(HealthCheckCallback callback);
+
+ public void init();
+
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/config/HealthCheckObjectConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/config/HealthCheckObjectConfig.java
new file mode 100644
index 0000000..c33931b
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/config/HealthCheckObjectConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health.check.config;
+
+import lombok.Data;
+
+@Data
+public class HealthCheckObjectConfig {
+
+ private Long instanceId;
+
+ private String healthCheckResourceType;
+
+ private String healthCheckResourceSubType;
+
+ private String simpleClassName;
+
+ private Class<?> checkClass;
+
+ private Long clusterId;
+
+ private String connectUrl;
+ //mysql, redis
+ private String host;
+
+ private Integer port;
+
+ private String username;
+
+ private String password;
+
+ //mysql, redis
+ private String database;
+
+ private Long requestTimeoutMillis = Long.MAX_VALUE;
+}
\ No newline at end of file
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheck.java
new file mode 100644
index 0000000..dcf0582
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheck.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health.check.impl;
+
+import org.apache.eventmesh.dashboard.console.health.annotation.HealthCheckType;
+import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback;
+import org.apache.eventmesh.dashboard.console.health.check.AbstractHealthCheckService;
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.RedisURI.Builder;
+import io.lettuce.core.api.async.RedisAsyncCommands;
+
+
+@HealthCheckType(type = "storage", subType = "redis")
+public class StorageRedisCheck extends AbstractHealthCheckService {
+
+ private RedisClient redisClient;
+
+ public StorageRedisCheck(HealthCheckObjectConfig healthCheckObjectConfig) {
+ super(healthCheckObjectConfig);
+ }
+
+ @Override
+ public void doCheck(HealthCheckCallback callback) {
+ try {
+ RedisAsyncCommands<String, String> commands = redisClient.connect().async();
+ commands.ping().thenAccept(result -> {
+ callback.onSuccess();
+ }).exceptionally(e -> {
+ if (e instanceof Exception) {
+ callback.onFail((Exception) e);
+ } else {
+ callback.onFail(new RuntimeException("RedisCheck failed."));
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ callback.onFail(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ String redisUrl;
+ if (Objects.nonNull(getConfig().getConnectUrl()) && !getConfig().getConnectUrl().isEmpty()) {
+ redisUrl = getConfig().getConnectUrl();
+ } else {
+ Builder builder = RedisURI.Builder.redis(getConfig().getHost(), getConfig().getPort());
+ if (Objects.nonNull(getConfig().getUsername()) && Objects.nonNull(getConfig().getPassword())) {
+ builder.withAuthentication(getConfig().getUsername(), getConfig().getPassword());
+ }
+ if (Objects.nonNull(getConfig().getRequestTimeoutMillis())) {
+ builder.withTimeout(Duration.ofMillis(getConfig().getRequestTimeoutMillis()));
+ }
+ if (Objects.nonNull(getConfig().getDatabase())) {
+ builder.withDatabase(Integer.parseInt(getConfig().getDatabase()));
+ }
+ redisUrl = builder.build().toString();
+ }
+ RedisClient redisClient = RedisClient.create(redisUrl);
+ }
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRocketmqCheck.java
similarity index 61%
copy from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
copy to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRocketmqCheck.java
index aba415a..b6f56b8 100644
--- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRocketmqCheck.java
@@ -15,19 +15,8 @@
* limitations under the License.
*/
-package org.apache.eventmesh.dashboard.console.config;
+package org.apache.eventmesh.dashboard.console.health.check.impl;
-import org.springframework.context.annotation.Bean;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.stereotype.Component;
+public class StorageRocketmqCheck {
-@Component
-public class SchedulerConfig {
-
- @Bean
- public ThreadPoolTaskScheduler taskScheduler() {
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
- taskScheduler.setPoolSize(5);
- return taskScheduler;
- }
}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManager.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManager.java
new file mode 100644
index 0000000..50f5efb
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.spring.support;
+
+import org.apache.eventmesh.dashboard.console.health.CheckResultCache;
+import org.apache.eventmesh.dashboard.console.health.HealthService;
+import org.apache.eventmesh.dashboard.console.service.health.HealthDataService;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * FunctionManager is in charge of tasks such as scheduled health checks
+ */
+public class FunctionManager {
+
+ @Setter
+ private FunctionManagerProperties properties;
+
+ @Getter
+ private HealthService healthService;
+
+ @Setter
+ private HealthDataService healthDataService;
+
+ public void initFunctionManager() {
+ // HealthService Initialization
+ healthService = new HealthService();
+ CheckResultCache checkResultCache = new CheckResultCache();
+ healthService.createExecutor(healthDataService, checkResultCache);
+ healthService.startScheduledExecution(5, 5);
+ }
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManagerLoader.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManagerLoader.java
new file mode 100644
index 0000000..405cedc
--- /dev/null
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManagerLoader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.spring.support;
+
+import org.apache.eventmesh.dashboard.console.health.HealthService;
+import org.apache.eventmesh.dashboard.console.service.health.HealthDataService;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+@Component
+public class FunctionManagerLoader {
+
+ private FunctionManager functionManager;
+
+ private FunctionManagerProperties properties;
+
+ @Autowired
+ private HealthDataService healthDataService;
+
+ @Bean
+ public HealthService getHealthService() {
+ return functionManager.getHealthService();
+ }
+
+ @PostConstruct
+ void initManager() {
+ functionManager = new FunctionManager();
+ functionManager.setProperties(properties);
+ functionManager.setHealthDataService(healthDataService);
+ functionManager.initFunctionManager();
+ }
+}
diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManagerProperties.java
similarity index 61%
copy from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
copy to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManagerProperties.java
index aba415a..5709dc1 100644
--- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/config/SchedulerConfig.java
+++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/spring/support/FunctionManagerProperties.java
@@ -15,19 +15,8 @@
* limitations under the License.
*/
-package org.apache.eventmesh.dashboard.console.config;
+package org.apache.eventmesh.dashboard.console.spring.support;
-import org.springframework.context.annotation.Bean;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.stereotype.Component;
+public class FunctionManagerProperties {
-@Component
-public class SchedulerConfig {
-
- @Bean
- public ThreadPoolTaskScheduler taskScheduler() {
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
- taskScheduler.setPoolSize(5);
- return taskScheduler;
- }
}
diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/HealthExecutorTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/HealthExecutorTest.java
new file mode 100644
index 0000000..c2bc178
--- /dev/null
+++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/HealthExecutorTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.eventmesh.dashboard.console.EventMeshDashboardApplication;
+import org.apache.eventmesh.dashboard.console.entity.health.HealthCheckResultEntity;
+import org.apache.eventmesh.dashboard.console.enums.health.HealthCheckStatus;
+import org.apache.eventmesh.dashboard.console.enums.health.HealthCheckType;
+import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback;
+import org.apache.eventmesh.dashboard.console.health.check.AbstractHealthCheckService;
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
+import org.apache.eventmesh.dashboard.console.service.health.impl.HealthDataServiceDatabaseImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.jdbc.Sql;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(classes = EventMeshDashboardApplication.class)
+@ActiveProfiles("test")
+@Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = {"classpath:use-test-schema.sql", "classpath:eventmesh-dashboard.sql"})
+class HealthExecutorTest {
+
+ private HealthExecutor healthExecutor = new HealthExecutor();
+ private CheckResultCache memoryCache = new CheckResultCache();
+
+ @Autowired
+ HealthDataServiceDatabaseImpl healthDataService;
+
+ @Mock
+ AbstractHealthCheckService successHealthCheckService;
+
+ @Mock
+ AbstractHealthCheckService failHealthCheckService;
+
+ @Mock
+ AbstractHealthCheckService timeoutHealthCheckService;
+
+ @BeforeEach
+ public void initMock() {
+ Mockito.lenient().doAnswer((Answer<Void>) invocation -> {
+ HealthCheckCallback callback = invocation.getArgument(0);
+ callback.onSuccess();
+ return null;
+ }).when(successHealthCheckService).doCheck(any(HealthCheckCallback.class));
+ Mockito.lenient().doAnswer((Answer<Void>) invocation -> {
+ HealthCheckCallback callback = invocation.getArgument(0);
+ callback.onFail(new RuntimeException("TestRuntimeException"));
+ return null;
+ }).when(failHealthCheckService).doCheck(any(HealthCheckCallback.class));
+ Mockito.lenient().doAnswer((Answer<Void>) invocation -> {
+ HealthCheckCallback callback = invocation.getArgument(0);
+ CompletableFuture.runAsync(() -> {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ return;
+ }
+ callback.onFail(new RuntimeException("TestRuntimeException"));
+ });
+ return null;
+ }).when(timeoutHealthCheckService).doCheck(any(HealthCheckCallback.class));
+
+ healthExecutor.setDataService(healthDataService);
+ healthExecutor.setMemoryCache(memoryCache);
+ HealthCheckObjectConfig config1 = new HealthCheckObjectConfig();
+ config1.setInstanceId(1L);
+ config1.setHealthCheckResourceType("storage");
+ config1.setHealthCheckResourceSubType("redis");
+ config1.setConnectUrl("redis://localhost:6379");
+ config1.setSimpleClassName("StorageRedisCheck");
+ config1.setClusterId(1L);
+ Mockito.lenient().when(successHealthCheckService.getConfig()).thenReturn(config1);
+ Mockito.lenient().when(timeoutHealthCheckService.getConfig()).thenReturn(config1);
+ HealthCheckObjectConfig config2 = new HealthCheckObjectConfig();
+ config2.setInstanceId(2L);
+ config2.setHealthCheckResourceType("storage");
+ config2.setHealthCheckResourceSubType("redis");
+ config2.setConnectUrl("redis://localhost:6379");
+ config2.setSimpleClassName("StorageRedisCheck");
+ config2.setClusterId(1L);
+ Mockito.lenient().when(failHealthCheckService.getConfig()).thenReturn(config2);
+ }
+
+ @Test
+ public void testExecute() {
+ healthExecutor.execute(successHealthCheckService);
+ healthExecutor.execute(failHealthCheckService);
+ assertEquals(2, memoryCache.getCacheMap().get("storage").size());
+ assertNotEquals(memoryCache.getCacheMap().get("storage").get(1L).getStatus(), memoryCache.getCacheMap().get("storage").get(2L).getStatus());
+ }
+
+
+ @Test
+ public void testEndExecute() {
+ healthExecutor.execute(successHealthCheckService);
+ healthExecutor.execute(failHealthCheckService);
+ healthExecutor.endExecute();
+ HealthCheckResultEntity query = new HealthCheckResultEntity();
+ query.setClusterId(1L);
+ query.setType(HealthCheckType.STORAGE.getNumber());
+ query.setTypeId(2L);
+ assertNotNull(healthDataService.queryHealthCheckResultByClusterIdAndTypeAndTypeId(query).get(0).getStatus());
+ }
+
+ @Test
+ public void testStartExecute() {
+ healthExecutor.execute(successHealthCheckService);
+ healthExecutor.execute(failHealthCheckService);
+ //to test startExecute(), we need to call endExecute() first
+ healthExecutor.endExecute();
+ healthExecutor.startExecute();
+ HealthCheckResultEntity query = new HealthCheckResultEntity();
+ query.setClusterId(1L);
+ query.setType(HealthCheckType.STORAGE.getNumber());
+ query.setTypeId(1L);
+ assertEquals(1, healthDataService.queryHealthCheckResultByClusterIdAndTypeAndTypeId(query).get(0).getStatus());
+ }
+
+ @Test
+ public void testTimeout() {
+ healthExecutor.execute(timeoutHealthCheckService);
+ healthExecutor.endExecute();
+ healthExecutor.startExecute();
+
+ HealthCheckResultEntity query = new HealthCheckResultEntity();
+ query.setClusterId(1L);
+ query.setType(HealthCheckType.STORAGE.getNumber());
+ query.setTypeId(1L);
+ assertEquals(HealthCheckStatus.TIMEOUT.getNumber(),
+ healthDataService.queryHealthCheckResultByClusterIdAndTypeAndTypeId(query).get(0).getStatus());
+ }
+
+ @Test
+ public void testFull() throws InterruptedException {
+ healthExecutor.startExecute();
+ healthExecutor.execute(successHealthCheckService);
+ healthExecutor.execute(failHealthCheckService);
+ healthExecutor.endExecute();
+ Thread.sleep(1000);
+ healthExecutor.startExecute();
+ healthExecutor.execute(successHealthCheckService);
+ healthExecutor.execute(failHealthCheckService);
+ healthExecutor.endExecute();
+ HealthCheckResultEntity query = new HealthCheckResultEntity();
+ query.setClusterId(1L);
+ query.setType(HealthCheckType.STORAGE.getNumber());
+ query.setTypeId(1L);
+ assertEquals(2, healthDataService.queryHealthCheckResultByClusterIdAndTypeAndTypeId(query).size());
+ }
+
+}
\ No newline at end of file
diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/HealthServiceTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/HealthServiceTest.java
new file mode 100644
index 0000000..278852f
--- /dev/null
+++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/HealthServiceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback;
+import org.apache.eventmesh.dashboard.console.health.check.AbstractHealthCheckService;
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
+import org.apache.eventmesh.dashboard.console.service.health.HealthDataService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class HealthServiceTest {
+
+ HealthService healthService = new HealthService();
+
+ @Mock
+ private HealthDataService healthDataService;
+
+ @Mock
+ private CheckResultCache checkResultCache;
+
+ @BeforeEach
+ void init() {
+ healthService.createExecutor(healthDataService, checkResultCache);
+ }
+
+ @Test
+ void testInsertCheckServiceWithAnnotation() {
+ HealthCheckObjectConfig config = new HealthCheckObjectConfig();
+ config.setInstanceId(2L);
+ config.setHealthCheckResourceType("storage");
+ config.setHealthCheckResourceSubType("redis");
+ config.setConnectUrl("redis://localhost:6379");
+ healthService.insertCheckService(config);
+ }
+
+ @Test
+ void testInsertCheckServiceWithSimpleClassName() {
+ HealthCheckObjectConfig config = new HealthCheckObjectConfig();
+ config.setInstanceId(1L);
+ config.setSimpleClassName("StorageRedisCheck");
+ config.setClusterId(1L);
+ config.setConnectUrl("redis://localhost:6379");
+ healthService.insertCheckService(config);
+ }
+
+ @Test
+ void testInsertCheckServiceWithClass() {
+ HealthCheckObjectConfig config = new HealthCheckObjectConfig();
+ config.setInstanceId(1L);
+ config.setClusterId(1L);
+ config.setCheckClass(TestHealthCheckService.class);
+ healthService.insertCheckService(config);
+ }
+
+ @Test
+ void testDeleteCheckService() {
+ HealthCheckObjectConfig config = new HealthCheckObjectConfig();
+ config.setInstanceId(1L);
+ config.setHealthCheckResourceType("storage");
+ config.setClusterId(1L);
+ config.setCheckClass(TestHealthCheckService.class);
+ healthService.insertCheckService(config);
+ healthService.deleteCheckService("storage", 1L);
+ }
+
+ @Test
+ void testExecuteAll(){
+ HealthCheckObjectConfig config = new HealthCheckObjectConfig();
+ config.setInstanceId(1L);
+ config.setHealthCheckResourceType("storage");
+ config.setClusterId(1L);
+ config.setCheckClass(TestHealthCheckService.class);
+ healthService.insertCheckService(config);
+ healthService.executeAll();
+ }
+
+ public static class TestHealthCheckService extends AbstractHealthCheckService {
+
+
+ public TestHealthCheckService(HealthCheckObjectConfig healthCheckObjectConfig) {
+ super(healthCheckObjectConfig);
+ }
+
+ @Override
+ public void doCheck(HealthCheckCallback callback) {
+ callback.onSuccess();
+ }
+
+ @Override
+ public void init() {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheckTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheckTest.java
new file mode 100644
index 0000000..d7c5a58
--- /dev/null
+++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheckTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.dashboard.console.health.check.impl;
+
+import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig;
+import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class StorageRedisCheckTest {
+
+ private StorageRedisCheck storageRedisCheck;
+
+ @BeforeEach
+ public void init() {
+ HealthCheckObjectConfig config = new HealthCheckObjectConfig();
+ config.setInstanceId(1L);
+ config.setHealthCheckResourceType("storage");
+ config.setHealthCheckResourceSubType("redis");
+ config.setSimpleClassName("StorageRedisCheck");
+ config.setClusterId(1L);
+ config.setConnectUrl("redis://localhost:6379");
+ storageRedisCheck = new StorageRedisCheck(config);
+ }
+
+ @Test
+ public void testDoCheck() {
+ storageRedisCheck.doCheck(new HealthCheckCallback() {
+ @Override
+ public void onSuccess() {
+ System.out.println("success");
+ }
+
+ @Override
+ public void onFail(Exception e) {
+ System.out.println("fail");
+ }
+ });
+ }
+}
\ No newline at end of file