Introduce scheduled executor wrapper with dynamic interval (#8916)
* Introduce scheduled executor wrapper with dynamic interval
* Add validation for configkey
diff --git a/api/src/main/java/com/cloud/user/ResourceLimitService.java b/api/src/main/java/com/cloud/user/ResourceLimitService.java
index 0a64cbb..04560df 100644
--- a/api/src/main/java/com/cloud/user/ResourceLimitService.java
+++ b/api/src/main/java/com/cloud/user/ResourceLimitService.java
@@ -38,7 +38,7 @@
static final ConfigKey<Long> MaxProjectSecondaryStorage = new ConfigKey<>("Project Defaults", Long.class, "max.project.secondary.storage", "400",
"The default maximum secondary storage space (in GiB) that can be used for a project", false);
static final ConfigKey<Long> ResourceCountCheckInterval = new ConfigKey<>("Advanced", Long.class, "resourcecount.check.interval", "300",
- "Time (in seconds) to wait before running resource recalculation and fixing task. Default is 300 seconds, Setting this to 0 disables execution of the task", false);
+ "Time (in seconds) to wait before running resource recalculation and fixing task. Default is 300 seconds, Setting this to 0 disables execution of the task", true);
static final ConfigKey<String> ResourceLimitHostTags = new ConfigKey<>("Advanced", String.class, "resource.limit.host.tags", "",
"A comma-separated list of tags for host resource limits", true);
static final ConfigKey<String> ResourceLimitStorageTags = new ConfigKey<>("Advanced", String.class, "resource.limit.storage.tags", "",
diff --git a/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java b/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java
new file mode 100644
index 0000000..b8d7e78
--- /dev/null
+++ b/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.config;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Uses a ScheduledExecutorService and config key to execute a runnable,
+ * dynamically rescheduling based on the long value of the config key.
+ * Timing is similar to ScheduledExecutorService.scheduleAtFixedRate(),
+ * but we look up the next runtime dynamically via the config key.
+ * <p>
+ * If config key is zero, this disables the execution. We skip execution
+ * and check once a minute in order to re-start execution if re-enabled.
+ */
+public class ConfigKeyScheduledExecutionWrapper implements Runnable {
+ protected Logger logger = LogManager.getLogger(getClass());
+ private final ScheduledExecutorService executorService;
+ private final Runnable command;
+ private final ConfigKey<?> configKey;
+ private final TimeUnit unit;
+ private long enableIntervalSeconds = 60;
+
+ private void validateArgs(ScheduledExecutorService executorService, Runnable command, ConfigKey<?> configKey) {
+ if (executorService == null) {
+ throw new IllegalArgumentException("ExecutorService cannot be null");
+ }
+ if (command == null) {
+ throw new IllegalArgumentException("Command cannot be null");
+ }
+ if (configKey == null) {
+ throw new IllegalArgumentException("ConfigKey cannot be null");
+ }
+ if (!(configKey.value() instanceof Long || configKey.value() instanceof Integer)) {
+ throw new IllegalArgumentException("ConfigKey value must be a Long or Integer");
+ }
+ }
+
+ public ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command,
+ ConfigKey<?> configKey, TimeUnit unit) {
+ validateArgs(executorService, command, configKey);
+ this.executorService = executorService;
+ this.command = command;
+ this.configKey = configKey;
+ this.unit = unit;
+ }
+
+ protected ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command,
+ ConfigKey<?> configKey, int enableIntervalSeconds, TimeUnit unit) {
+ validateArgs(executorService, command, configKey);
+ this.executorService = executorService;
+ this.command = command;
+ this.configKey = configKey;
+ this.unit = unit;
+ this.enableIntervalSeconds = enableIntervalSeconds;
+ }
+
+ public ScheduledFuture<?> start() {
+ long duration = getConfigValue();
+ duration = duration < 0 ? 0 : duration;
+ return this.executorService.schedule(this, duration, this.unit);
+ }
+
+ long getConfigValue() {
+ if (this.configKey.value() instanceof Long) {
+ return (Long) this.configKey.value();
+ } else if (this.configKey.value() instanceof Integer) {
+ return (Integer) this.configKey.value();
+ } else {
+ throw new IllegalArgumentException("ConfigKey value must be a Long or Integer");
+ }
+ }
+
+ @Override
+ public void run() {
+ if (getConfigValue() <= 0) {
+ executorService.schedule(this, enableIntervalSeconds, TimeUnit.SECONDS);
+ return;
+ }
+
+ long startTime = System.nanoTime();
+ try {
+ command.run();
+ } catch (Throwable t) {
+ logger.warn(String.format("Last run of %s encountered an error", this.command.getClass()), t);
+ } finally {
+ long elapsed = System.nanoTime() - startTime;
+ long delay = this.unit.toNanos(getConfigValue()) - elapsed;
+ delay = delay > 0 ? delay : 0;
+ executorService.schedule(this, delay, NANOSECONDS);
+ }
+ }
+}
diff --git a/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java b/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java
new file mode 100644
index 0000000..fbb4dc2
--- /dev/null
+++ b/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.config;
+
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConfigKeyScheduledExecutionWrapperTest {
+ private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory("TestExecutor"));
+
+ @Mock
+ ConfigKey<Integer> configKey;
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nullExecutorTest() {
+ TestRunnable runnable = new TestRunnable();
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(null, runnable, configKey, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nullCommandTest() {
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, null, configKey, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nullConfigKeyTest() {
+ TestRunnable runnable = new TestRunnable();
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, null, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void invalidConfigKeyTest() {
+ TestRunnable runnable = new TestRunnable();
+ ConfigKey<String> configKey = new ConfigKey<>(String.class, "test", "test", "test", "test", true,
+ ConfigKey.Scope.Global, null, null, null, null, null, ConfigKey.Kind.CSV, null);
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void scheduleOncePerSecondTest() {
+ when(configKey.value()).thenReturn(1);
+ TestRunnable runnable = new TestRunnable();
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.SECONDS);
+ runner.start();
+
+ waitSeconds(3);
+ assertThat("Runnable ran once per second", runnable.getRunCount(), isOneOf(2, 3));
+ }
+
+ private void waitSeconds(int seconds) {
+ try {
+ Thread.sleep(seconds * 1000L + 100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void scheduleTwicePerSecondTest() {
+ when(configKey.value()).thenReturn(500);
+ TestRunnable runnable = new TestRunnable();
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
+ runner.start();
+
+ waitSeconds(2);
+ assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+ }
+
+ @Test
+ public void scheduleDynamicTest() {
+ // start with twice per second, then switch to four times per second
+ when(configKey.value()).thenReturn(500);
+ TestRunnable runnable = new TestRunnable();
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
+ runner.start();
+
+ waitSeconds(2);
+ assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+
+ runnable.resetRunCount();
+ when(configKey.value()).thenReturn(250);
+ waitSeconds(2);
+ assertThat("Runnable ran four times per second", runnable.getRunCount(), isOneOf(7, 8));
+ }
+
+ @Test
+ public void noOverlappingRunsTest() {
+ when(configKey.value()).thenReturn(200);
+ TestRunnable runnable = new TestRunnable(1);
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
+ runner.start();
+
+ waitSeconds(3);
+ assertThat("Slow runnable on tight schedule runs without overlap", runnable.getRunCount(), isOneOf(2, 3));
+ }
+
+ @Test
+ public void temporaryDisableRunsTest() {
+ // start with twice per second, then disable, then start again
+ when(configKey.value()).thenReturn(500);
+ TestRunnable runnable = new TestRunnable();
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, 1, TimeUnit.MILLISECONDS);
+ runner.start();
+
+ waitSeconds(2);
+ assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+
+ runnable.resetRunCount();
+ when(configKey.value()).thenReturn(0);
+ waitSeconds(2);
+ assertThat("Runnable ran zero times per second", runnable.getRunCount(), is(0));
+
+ runnable.resetRunCount();
+ when(configKey.value()).thenReturn(500);
+ waitSeconds(2);
+ assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+ }
+
+ static class TestRunnable implements Runnable {
+ private Integer runCount = 0;
+ private int waitSeconds = 0;
+
+ TestRunnable(int waitSeconds) {
+ this.waitSeconds = waitSeconds;
+ }
+
+ TestRunnable() {
+ }
+
+ @Override
+ public void run() {
+ runCount++;
+ if (waitSeconds > 0) {
+ try {
+ Thread.sleep(waitSeconds * 1000L);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public int getRunCount() {
+ return this.runCount;
+ }
+
+ public void resetRunCount() {
+ this.runCount = 0;
+ }
+ }
+}
diff --git a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
index 11ebc6d..7962b38 100644
--- a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
+++ b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
@@ -43,6 +43,7 @@
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.config.ConfigKeyScheduledExecutionWrapper;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@@ -197,7 +198,6 @@
protected SearchBuilder<ResourceCountVO> ResourceCountSearch;
ScheduledExecutorService _rcExecutor;
- long _resourceCountCheckInterval = 0;
Map<String, Long> accountResourceLimitMap = new HashMap<>();
Map<String, Long> domainResourceLimitMap = new HashMap<>();
Map<String, Long> projectResourceLimitMap = new HashMap<>();
@@ -220,8 +220,9 @@
@Override
public boolean start() {
- if (_resourceCountCheckInterval > 0) {
- _rcExecutor.scheduleAtFixedRate(new ResourceCountCheckTask(), _resourceCountCheckInterval, _resourceCountCheckInterval, TimeUnit.SECONDS);
+ if (ResourceCountCheckInterval.value() >= 0) {
+ ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(_rcExecutor, new ResourceCountCheckTask(), ResourceCountCheckInterval, TimeUnit.SECONDS);
+ runner.start();
}
return true;
}
@@ -258,8 +259,7 @@
snapshotSizeSearch.join("snapshots", join2, snapshotSizeSearch.entity().getSnapshotId(), join2.entity().getId(), JoinBuilder.JoinType.INNER);
snapshotSizeSearch.done();
- _resourceCountCheckInterval = ResourceCountCheckInterval.value();
- if (_resourceCountCheckInterval > 0) {
+ if (ResourceCountCheckInterval.value() >= 0) {
_rcExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ResourceCountChecker"));
}