Do failover only if target items were running (#2134)
* Fix failover triggered too sensitive
* Handle legacy crashed running items
* Persist job instance id into running node
* Do failover in legacy crashed listener
* Complete FailoverListenerManagerTest
* Complete ShardingServiceTest
* Complete ExecutionServiceTest
* Handle failovering items at first
* Complete FailoverListenerManagerTest
* Complete FailoverServiceTest
* Complete ExecutionServiceTest
* Complete FailoverListenerManagerTest
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
index 1281795..f33d88e 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
@@ -17,20 +17,28 @@
package org.apache.shardingsphere.elasticjob.lite.internal.failover;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationNode;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent.Type;
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* Failover listener manager.
@@ -45,6 +53,10 @@
private final FailoverService failoverService;
+ private final ExecutionService executionService;
+
+ private final InstanceService instanceService;
+
private final ConfigurationNode configNode;
private final InstanceNode instanceNode;
@@ -55,6 +67,8 @@
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
failoverService = new FailoverService(regCenter, jobName);
+ executionService = new ExecutionService(regCenter, jobName);
+ instanceService = new InstanceService(regCenter, jobName);
configNode = new ConfigurationNode(jobName);
instanceNode = new InstanceNode(jobName);
}
@@ -63,6 +77,7 @@
public void start() {
addDataListener(new JobCrashedJobListener());
addDataListener(new FailoverSettingsChangedJobListener());
+ addDataListener(new LegacyCrashedRunningItemListener());
}
private boolean isFailoverEnabled() {
@@ -103,4 +118,47 @@
}
}
}
+
+ class LegacyCrashedRunningItemListener implements DataChangedEventListener {
+
+ @Override
+ public void onChange(final DataChangedEvent event) {
+ if (!isCurrentInstanceOnline(event) || !isFailoverEnabled()) {
+ return;
+ }
+ Set<JobInstance> availableJobInstances = new HashSet<>(instanceService.getAvailableJobInstances());
+ if (!isTheOnlyInstance(availableJobInstances)) {
+ return;
+ }
+ Map<Integer, JobInstance> allRunningItems = executionService.getAllRunningItems();
+ Map<Integer, JobInstance> allFailoveringItems = failoverService.getAllFailoveringItems();
+ if (allRunningItems.isEmpty() && allFailoveringItems.isEmpty()) {
+ return;
+ }
+ for (Entry<Integer, JobInstance> entry : allFailoveringItems.entrySet()) {
+ if (!availableJobInstances.contains(entry.getValue())) {
+ int item = entry.getKey();
+ failoverService.setCrashedFailoverFlagDirectly(item);
+ failoverService.clearFailoveringItem(item);
+ executionService.clearRunningInfo(Collections.singletonList(item));
+ allRunningItems.remove(item);
+ }
+ }
+ for (Entry<Integer, JobInstance> entry : allRunningItems.entrySet()) {
+ if (!availableJobInstances.contains(entry.getValue())) {
+ failoverService.setCrashedFailoverFlag(entry.getKey());
+ executionService.clearRunningInfo(Collections.singletonList(entry.getKey()));
+ }
+ }
+ failoverService.failoverIfNecessary();
+ }
+
+ private boolean isCurrentInstanceOnline(final DataChangedEvent event) {
+ return Type.ADDED == event.getType() && event.getKey().endsWith(instanceNode.getLocalInstancePath());
+ }
+
+ private boolean isTheOnlyInstance(final Set<JobInstance> availableJobInstances) {
+ return Collections.singleton(JobRegistry.getInstance().getJobInstance(jobName)).equals(availableJobInstances);
+ }
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
index 23bc437..c512830 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
@@ -17,19 +17,24 @@
package org.apache.shardingsphere.elasticjob.lite.internal.failover;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingNode;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
-import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
/**
* Failover service.
@@ -43,10 +48,13 @@
private final ShardingService shardingService;
+ private final ConfigurationService configService;
+
public FailoverService(final CoordinatorRegistryCenter regCenter, final String jobName) {
this.jobName = jobName;
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
+ configService = new ConfigurationService(regCenter, jobName);
}
/**
@@ -57,6 +65,7 @@
public void setCrashedFailoverFlag(final int item) {
if (!isFailoverAssigned(item)) {
jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
+ jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(item));
}
}
@@ -168,6 +177,32 @@
}
/**
+ * Get all failovering items.
+ *
+ * @return all failovering items
+ */
+ public Map<Integer, JobInstance> getAllFailoveringItems() {
+ int shardingTotalCount = configService.load(true).getShardingTotalCount();
+ Map<Integer, JobInstance> result = new LinkedHashMap<>(shardingTotalCount, 1);
+ for (int i = 0; i < shardingTotalCount; i++) {
+ String data = jobNodeStorage.getJobNodeData(FailoverNode.getExecutingFailoverNode(i));
+ if (!Strings.isNullOrEmpty(data)) {
+ result.put(i, new JobInstance(data));
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Clear failovering item.
+ *
+ * @param item item
+ */
+ public void clearFailoveringItem(final int item) {
+ jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutingFailoverNode(item));
+ }
+
+ /**
* Remove failover info.
*/
public void removeFailoverInfo() {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
index 7bafdc0..f41dac7 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
@@ -17,7 +17,9 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
+import com.google.common.base.Strings;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
@@ -26,7 +28,9 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
/**
* Execution service.
@@ -52,11 +56,17 @@
*/
public void registerJobBegin(final ShardingContexts shardingContexts) {
JobRegistry.getInstance().setJobRunning(jobName, true);
- if (!configService.load(true).isMonitorExecution()) {
+ JobConfiguration jobConfiguration = configService.load(true);
+ if (!jobConfiguration.isMonitorExecution()) {
return;
}
+ String jobInstanceId = JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId();
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
- jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
+ if (jobConfiguration.isFailover()) {
+ jobNodeStorage.fillJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
+ } else {
+ jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
+ }
}
}
@@ -131,6 +141,23 @@
}
/**
+ * Get all running items with instance.
+ *
+ * @return running items with instance.
+ */
+ public Map<Integer, JobInstance> getAllRunningItems() {
+ int shardingTotalCount = configService.load(true).getShardingTotalCount();
+ Map<Integer, JobInstance> result = new LinkedHashMap<>(shardingTotalCount, 1);
+ for (int i = 0; i < shardingTotalCount; i++) {
+ String data = jobNodeStorage.getJobNodeData(ShardingNode.getRunningNode(i));
+ if (!Strings.isNullOrEmpty(data)) {
+ result.put(i, new JobInstance(data));
+ }
+ }
+ return result;
+ }
+
+ /**
* Set misfire flag if sharding items still running.
*
* @param items sharding items need to be set misfire flag
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index 946c481..cbf7938 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -202,13 +202,17 @@
List<Integer> result = new LinkedList<>();
int shardingTotalCount = configService.load(true).getShardingTotalCount();
for (int i = 0; i < shardingTotalCount; i++) {
- if (jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
+ if (isRunningItem(i) && jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
result.add(i);
}
}
return result;
}
+ private boolean isRunningItem(final int item) {
+ return jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(item));
+ }
+
/**
* Get sharding items from localhost job server.
*
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
index f5d9106..e2c7a8f 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
@@ -21,8 +21,11 @@
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.lite.fixture.LiteYamlConstants;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
+import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -38,6 +41,8 @@
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -61,6 +66,15 @@
@Mock
private FailoverService failoverService;
+ @Mock
+ private InstanceService instanceService;
+
+ @Mock
+ private ExecutionService executionService;
+
+ @Mock
+ private InstanceNode instanceNode;
+
private final FailoverListenerManager failoverListenerManager = new FailoverListenerManager(null, "test_job");
@Before
@@ -69,12 +83,15 @@
ReflectionUtils.setFieldValue(failoverListenerManager, "configService", configService);
ReflectionUtils.setFieldValue(failoverListenerManager, "shardingService", shardingService);
ReflectionUtils.setFieldValue(failoverListenerManager, "failoverService", failoverService);
+ ReflectionUtils.setFieldValue(failoverListenerManager, "instanceService", instanceService);
+ ReflectionUtils.setFieldValue(failoverListenerManager, "executionService", executionService);
+ ReflectionUtils.setFieldValue(failoverListenerManager, "instanceNode", instanceNode);
}
@Test
public void assertStart() {
failoverListenerManager.start();
- verify(jobNodeStorage, times(2)).addDataListener(ArgumentMatchers.any(DataChangedEventListener.class));
+ verify(jobNodeStorage, times(3)).addDataListener(ArgumentMatchers.any(DataChangedEventListener.class));
}
@Test
@@ -119,6 +136,8 @@
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
when(shardingService.getCrashedShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
+ when(instanceNode.isInstancePath("/test_job/instances/127.0.0.1@-@1")).thenReturn(true);
+ when(instanceNode.getInstanceFullPath()).thenReturn("/test_job/instances");
failoverListenerManager.new JobCrashedJobListener().onChange(new DataChangedEvent(Type.DELETED, "/test_job/instances/127.0.0.1@-@1", ""));
verify(failoverService).setCrashedFailoverFlag(0);
verify(failoverService).setCrashedFailoverFlag(2);
@@ -132,6 +151,8 @@
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
when(failoverService.getFailoveringItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
+ when(instanceNode.isInstancePath("/test_job/instances/127.0.0.1@-@1")).thenReturn(true);
+ when(instanceNode.getInstanceFullPath()).thenReturn("/test_job/instances");
failoverListenerManager.new JobCrashedJobListener().onChange(new DataChangedEvent(Type.DELETED, "/test_job/instances/127.0.0.1@-@1", ""));
verify(failoverService).setCrashedFailoverFlagDirectly(1);
verify(failoverService).failoverIfNecessary();
@@ -161,4 +182,25 @@
failoverListenerManager.new FailoverSettingsChangedJobListener().onChange(new DataChangedEvent(Type.UPDATED, "/test_job/config", LiteYamlConstants.getJobYamlWithFailover(false)));
verify(failoverService).removeFailoverInfo();
}
+
+ @Test
+ public void assertLegacyCrashedRunningItemListenerWhenRunningItemsArePresent() {
+ JobInstance jobInstance = new JobInstance("127.0.0.1@-@1");
+ JobRegistry.getInstance().addJobInstance("test_job", jobInstance);
+ when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
+ when(instanceNode.getLocalInstancePath()).thenReturn("instances/127.0.0.1@-@1");
+ when(instanceService.getAvailableJobInstances()).thenReturn(Collections.singletonList(jobInstance));
+ Map<Integer, JobInstance> allRunningItems = new LinkedHashMap<>(2, 1);
+ allRunningItems.put(0, new JobInstance("127.0.0.1@-@2"));
+ allRunningItems.put(1, new JobInstance("127.0.0.1@-@2"));
+ when(executionService.getAllRunningItems()).thenReturn(allRunningItems);
+ when(failoverService.getAllFailoveringItems()).thenReturn(Collections.singletonMap(1, new JobInstance("127.0.0.1@-@2")));
+ failoverListenerManager.new LegacyCrashedRunningItemListener().onChange(new DataChangedEvent(Type.ADDED, "/test_job/instances/127.0.0.1@-@1", ""));
+ verify(failoverService).setCrashedFailoverFlagDirectly(1);
+ verify(failoverService).clearFailoveringItem(1);
+ verify(executionService).clearRunningInfo(Collections.singletonList(1));
+ verify(failoverService).setCrashedFailoverFlag(0);
+ verify(executionService).clearRunningInfo(Collections.singletonList(0));
+ verify(failoverService).failoverIfNecessary();
+ }
}
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
index ce6d512..a346a72 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
@@ -17,7 +17,9 @@
package org.apache.shardingsphere.elasticjob.lite.internal.failover;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
@@ -33,6 +35,7 @@
import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -56,6 +59,9 @@
@Mock
private ShardingService shardingService;
+ @Mock
+ private ConfigurationService configService;
+
private final FailoverService failoverService = new FailoverService(null, "test_job");
@Before
@@ -63,6 +69,7 @@
ReflectionUtils.setFieldValue(failoverService, "jobNodeStorage", jobNodeStorage);
ReflectionUtils.setFieldValue(failoverService, "shardingService", shardingService);
ReflectionUtils.setFieldValue(failoverService, "jobName", "test_job");
+ ReflectionUtils.setFieldValue(failoverService, "configService", configService);
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
}
@@ -84,7 +91,7 @@
@Test
public void assertSetCrashedFailoverFlagDirectly() {
- failoverService.setCrashedFailoverFlag(0);
+ failoverService.setCrashedFailoverFlagDirectly(0);
verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
}
@@ -242,6 +249,24 @@
}
@Test
+ public void assertGetAllFailoveringItems() {
+ when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).build());
+ String jobInstanceId = "127.0.0.1@-@1";
+ when(jobNodeStorage.getJobNodeData("sharding/0/failovering")).thenReturn(jobInstanceId);
+ when(jobNodeStorage.getJobNodeData("sharding/2/failovering")).thenReturn(jobInstanceId);
+ Map<Integer, JobInstance> actual = failoverService.getAllFailoveringItems();
+ assertThat(actual.size(), is(2));
+ assertThat(actual.get(0), is(new JobInstance(jobInstanceId)));
+ assertThat(actual.get(2), is(new JobInstance(jobInstanceId)));
+ }
+
+ @Test
+ public void assertClearFailoveringItem() {
+ failoverService.clearFailoveringItem(0);
+ verify(jobNodeStorage).removeJobNodeIfExisted("sharding/0/failovering");
+ }
+
+ @Test
public void assertRemoveFailoverInfo() {
when(jobNodeStorage.getJobNodeChildrenKeys("sharding")).thenReturn(Arrays.asList("0", "1", "2"));
failoverService.removeFailoverInfo();
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
index dddaaed..2f88cd3 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
@@ -76,11 +77,25 @@
@Test
public void assertRegisterJobBeginWithMonitorExecution() {
+ String jobInstanceId = "127.0.0.1@-@1";
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance(jobInstanceId));
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(true).build());
executionService.registerJobBegin(getShardingContext());
- verify(jobNodeStorage).fillEphemeralJobNode("sharding/0/running", "");
- verify(jobNodeStorage).fillEphemeralJobNode("sharding/1/running", "");
- verify(jobNodeStorage).fillEphemeralJobNode("sharding/2/running", "");
+ verify(jobNodeStorage).fillEphemeralJobNode("sharding/0/running", jobInstanceId);
+ verify(jobNodeStorage).fillEphemeralJobNode("sharding/1/running", jobInstanceId);
+ verify(jobNodeStorage).fillEphemeralJobNode("sharding/2/running", jobInstanceId);
+ assertTrue(JobRegistry.getInstance().isJobRunning("test_job"));
+ }
+
+ @Test
+ public void assertRegisterJobBeginWithFailoverEnabled() {
+ String jobInstanceId = "127.0.0.1@-@1";
+ JobRegistry.getInstance().addJobInstance("test_job", new JobInstance(jobInstanceId));
+ when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
+ executionService.registerJobBegin(getShardingContext());
+ verify(jobNodeStorage).fillJobNode("sharding/0/running", jobInstanceId);
+ verify(jobNodeStorage).fillJobNode("sharding/1/running", jobInstanceId);
+ verify(jobNodeStorage).fillJobNode("sharding/2/running", jobInstanceId);
assertTrue(JobRegistry.getInstance().isJobRunning("test_job"));
}
@@ -162,6 +177,18 @@
}
@Test
+ public void assertGetAllRunningItems() {
+ when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).build());
+ String jobInstanceId = "127.0.0.1@-@1";
+ when(jobNodeStorage.getJobNodeData("sharding/0/running")).thenReturn(jobInstanceId);
+ when(jobNodeStorage.getJobNodeData("sharding/2/running")).thenReturn(jobInstanceId);
+ Map<Integer, JobInstance> actual = executionService.getAllRunningItems();
+ assertThat(actual.size(), is(2));
+ assertThat(actual.get(0), is(new JobInstance(jobInstanceId)));
+ assertThat(actual.get(2), is(new JobInstance(jobInstanceId)));
+ }
+
+ @Test
public void assertMisfireIfNotRunning() {
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(true).build());
when(jobNodeStorage.isJobNodeExisted("sharding/0/running")).thenReturn(false);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index 98137de..2a35366 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -250,8 +250,9 @@
when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").build());
when(jobNodeStorage.getJobNodeData("sharding/0/instance")).thenReturn("127.0.0.1@-@0");
- when(jobNodeStorage.getJobNodeData("sharding/1/instance")).thenReturn("127.0.0.1@-@1");
+ when(jobNodeStorage.isJobNodeExisted("sharding/0/running")).thenReturn(true);
when(jobNodeStorage.getJobNodeData("sharding/2/instance")).thenReturn("127.0.0.1@-@0");
+ when(jobNodeStorage.isJobNodeExisted("sharding/2/running")).thenReturn(true);
assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), is(Arrays.asList(0, 2)));
JobRegistry.getInstance().shutdown("test_job");
}