Do failover only if target items were running (#2134)

* Fix failover triggered too sensitive

* Handle legacy crashed running items

* Persist job instance id into running node

* Do failover in legacy crashed listener

* Complete FailoverListenerManagerTest

* Complete ShardingServiceTest

* Complete ExecutionServiceTest

* Handle failovering items at first

* Complete FailoverListenerManagerTest

* Complete FailoverServiceTest

* Complete ExecutionServiceTest

* Complete FailoverListenerManagerTest
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
index 1281795..f33d88e 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java
@@ -17,20 +17,28 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.failover;
 
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationNode;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
 import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
+import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
 import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent.Type;
 import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * Failover listener manager.
@@ -45,6 +53,10 @@
     
     private final FailoverService failoverService;
     
+    private final ExecutionService executionService;
+    
+    private final InstanceService instanceService;
+    
     private final ConfigurationNode configNode;
     
     private final InstanceNode instanceNode;
@@ -55,6 +67,8 @@
         configService = new ConfigurationService(regCenter, jobName);
         shardingService = new ShardingService(regCenter, jobName);
         failoverService = new FailoverService(regCenter, jobName);
+        executionService = new ExecutionService(regCenter, jobName);
+        instanceService = new InstanceService(regCenter, jobName);
         configNode = new ConfigurationNode(jobName);
         instanceNode = new InstanceNode(jobName);
     }
@@ -63,6 +77,7 @@
     public void start() {
         addDataListener(new JobCrashedJobListener());
         addDataListener(new FailoverSettingsChangedJobListener());
+        addDataListener(new LegacyCrashedRunningItemListener());
     }
     
     private boolean isFailoverEnabled() {
@@ -103,4 +118,47 @@
             }
         }
     }
+    
+    class LegacyCrashedRunningItemListener implements DataChangedEventListener {
+        
+        @Override
+        public void onChange(final DataChangedEvent event) {
+            if (!isCurrentInstanceOnline(event) || !isFailoverEnabled()) {
+                return;
+            }
+            Set<JobInstance> availableJobInstances = new HashSet<>(instanceService.getAvailableJobInstances());
+            if (!isTheOnlyInstance(availableJobInstances)) {
+                return;
+            }
+            Map<Integer, JobInstance> allRunningItems = executionService.getAllRunningItems();
+            Map<Integer, JobInstance> allFailoveringItems = failoverService.getAllFailoveringItems();
+            if (allRunningItems.isEmpty() && allFailoveringItems.isEmpty()) {
+                return;
+            }
+            for (Entry<Integer, JobInstance> entry : allFailoveringItems.entrySet()) {
+                if (!availableJobInstances.contains(entry.getValue())) {
+                    int item = entry.getKey();
+                    failoverService.setCrashedFailoverFlagDirectly(item);
+                    failoverService.clearFailoveringItem(item);
+                    executionService.clearRunningInfo(Collections.singletonList(item));
+                    allRunningItems.remove(item);
+                }
+            }
+            for (Entry<Integer, JobInstance> entry : allRunningItems.entrySet()) {
+                if (!availableJobInstances.contains(entry.getValue())) {
+                    failoverService.setCrashedFailoverFlag(entry.getKey());
+                    executionService.clearRunningInfo(Collections.singletonList(entry.getKey()));
+                }
+            }
+            failoverService.failoverIfNecessary();
+        }
+        
+        private boolean isCurrentInstanceOnline(final DataChangedEvent event) {
+            return Type.ADDED == event.getType() && event.getKey().endsWith(instanceNode.getLocalInstancePath());
+        }
+        
+        private boolean isTheOnlyInstance(final Set<JobInstance> availableJobInstances) {
+            return Collections.singleton(JobRegistry.getInstance().getJobInstance(jobName)).equals(availableJobInstances);
+        }
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
index 23bc437..c512830 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java
@@ -17,19 +17,24 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.failover;
 
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingNode;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
-import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Failover service.
@@ -43,10 +48,13 @@
     
     private final ShardingService shardingService;
     
+    private final ConfigurationService configService;
+    
     public FailoverService(final CoordinatorRegistryCenter regCenter, final String jobName) {
         this.jobName = jobName;
         jobNodeStorage = new JobNodeStorage(regCenter, jobName);
         shardingService = new ShardingService(regCenter, jobName);
+        configService = new ConfigurationService(regCenter, jobName);
     }
     
     /**
@@ -57,6 +65,7 @@
     public void setCrashedFailoverFlag(final int item) {
         if (!isFailoverAssigned(item)) {
             jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
+            jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(item));
         }
     }
 
@@ -168,6 +177,32 @@
     }
     
     /**
+     * Get all failovering items.
+     *
+     * @return all failovering items
+     */
+    public Map<Integer, JobInstance> getAllFailoveringItems() {
+        int shardingTotalCount = configService.load(true).getShardingTotalCount();
+        Map<Integer, JobInstance> result = new LinkedHashMap<>(shardingTotalCount, 1);
+        for (int i = 0; i < shardingTotalCount; i++) {
+            String data = jobNodeStorage.getJobNodeData(FailoverNode.getExecutingFailoverNode(i));
+            if (!Strings.isNullOrEmpty(data)) {
+                result.put(i, new JobInstance(data));
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Clear failovering item.
+     *
+     * @param item item
+     */
+    public void clearFailoveringItem(final int item) {
+        jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutingFailoverNode(item));
+    }
+    
+    /**
      * Remove failover info.
      */
     public void removeFailoverInfo() {
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
index 7bafdc0..f41dac7 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java
@@ -17,7 +17,9 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
 
+import com.google.common.base.Strings;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
 import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
@@ -26,7 +28,9 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Execution service.
@@ -52,11 +56,17 @@
      */
     public void registerJobBegin(final ShardingContexts shardingContexts) {
         JobRegistry.getInstance().setJobRunning(jobName, true);
-        if (!configService.load(true).isMonitorExecution()) {
+        JobConfiguration jobConfiguration = configService.load(true);
+        if (!jobConfiguration.isMonitorExecution()) {
             return;
         }
+        String jobInstanceId = JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId();
         for (int each : shardingContexts.getShardingItemParameters().keySet()) {
-            jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
+            if (jobConfiguration.isFailover()) {
+                jobNodeStorage.fillJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
+            } else {
+                jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
+            }
         }
     }
     
@@ -131,6 +141,23 @@
     }
     
     /**
+     * Get all running items with instance.
+     *
+     * @return running items with instance.
+     */
+    public Map<Integer, JobInstance> getAllRunningItems() {
+        int shardingTotalCount = configService.load(true).getShardingTotalCount();
+        Map<Integer, JobInstance> result = new LinkedHashMap<>(shardingTotalCount, 1);
+        for (int i = 0; i < shardingTotalCount; i++) {
+            String data = jobNodeStorage.getJobNodeData(ShardingNode.getRunningNode(i));
+            if (!Strings.isNullOrEmpty(data)) {
+                result.put(i, new JobInstance(data));
+            }
+        }
+        return result;
+    }
+    
+    /**
      * Set misfire flag if sharding items still running.
      * 
      * @param items sharding items need to be set misfire flag
diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
index 946c481..cbf7938 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java
@@ -202,13 +202,17 @@
         List<Integer> result = new LinkedList<>();
         int shardingTotalCount = configService.load(true).getShardingTotalCount();
         for (int i = 0; i < shardingTotalCount; i++) {
-            if (jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
+            if (isRunningItem(i) && jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
                 result.add(i);
             }
         }
         return result;
     }
     
+    private boolean isRunningItem(final int item) {
+        return jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(item));
+    }
+    
     /**
      * Get sharding items from localhost job server.
      * 
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
index f5d9106..e2c7a8f 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java
@@ -21,8 +21,11 @@
 import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
 import org.apache.shardingsphere.elasticjob.lite.fixture.LiteYamlConstants;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
+import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
+import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
 import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
 import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -38,6 +41,8 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -61,6 +66,15 @@
     @Mock
     private FailoverService failoverService;
     
+    @Mock
+    private InstanceService instanceService;
+    
+    @Mock
+    private ExecutionService executionService;
+    
+    @Mock
+    private InstanceNode instanceNode;
+    
     private final FailoverListenerManager failoverListenerManager = new FailoverListenerManager(null, "test_job");
     
     @Before
@@ -69,12 +83,15 @@
         ReflectionUtils.setFieldValue(failoverListenerManager, "configService", configService);
         ReflectionUtils.setFieldValue(failoverListenerManager, "shardingService", shardingService);
         ReflectionUtils.setFieldValue(failoverListenerManager, "failoverService", failoverService);
+        ReflectionUtils.setFieldValue(failoverListenerManager, "instanceService", instanceService);
+        ReflectionUtils.setFieldValue(failoverListenerManager, "executionService", executionService);
+        ReflectionUtils.setFieldValue(failoverListenerManager, "instanceNode", instanceNode);
     }
     
     @Test
     public void assertStart() {
         failoverListenerManager.start();
-        verify(jobNodeStorage, times(2)).addDataListener(ArgumentMatchers.any(DataChangedEventListener.class));
+        verify(jobNodeStorage, times(3)).addDataListener(ArgumentMatchers.any(DataChangedEventListener.class));
     }
     
     @Test
@@ -119,6 +136,8 @@
         JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
         when(shardingService.getCrashedShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
+        when(instanceNode.isInstancePath("/test_job/instances/127.0.0.1@-@1")).thenReturn(true);
+        when(instanceNode.getInstanceFullPath()).thenReturn("/test_job/instances");
         failoverListenerManager.new JobCrashedJobListener().onChange(new DataChangedEvent(Type.DELETED, "/test_job/instances/127.0.0.1@-@1", ""));
         verify(failoverService).setCrashedFailoverFlag(0);
         verify(failoverService).setCrashedFailoverFlag(2);
@@ -132,6 +151,8 @@
         JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
         when(failoverService.getFailoveringItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
+        when(instanceNode.isInstancePath("/test_job/instances/127.0.0.1@-@1")).thenReturn(true);
+        when(instanceNode.getInstanceFullPath()).thenReturn("/test_job/instances");
         failoverListenerManager.new JobCrashedJobListener().onChange(new DataChangedEvent(Type.DELETED, "/test_job/instances/127.0.0.1@-@1", ""));
         verify(failoverService).setCrashedFailoverFlagDirectly(1);
         verify(failoverService).failoverIfNecessary();
@@ -161,4 +182,25 @@
         failoverListenerManager.new FailoverSettingsChangedJobListener().onChange(new DataChangedEvent(Type.UPDATED, "/test_job/config", LiteYamlConstants.getJobYamlWithFailover(false)));
         verify(failoverService).removeFailoverInfo();
     }
+    
+    @Test
+    public void assertLegacyCrashedRunningItemListenerWhenRunningItemsArePresent() {
+        JobInstance jobInstance = new JobInstance("127.0.0.1@-@1");
+        JobRegistry.getInstance().addJobInstance("test_job", jobInstance);
+        when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
+        when(instanceNode.getLocalInstancePath()).thenReturn("instances/127.0.0.1@-@1");
+        when(instanceService.getAvailableJobInstances()).thenReturn(Collections.singletonList(jobInstance));
+        Map<Integer, JobInstance> allRunningItems = new LinkedHashMap<>(2, 1);
+        allRunningItems.put(0, new JobInstance("127.0.0.1@-@2"));
+        allRunningItems.put(1, new JobInstance("127.0.0.1@-@2"));
+        when(executionService.getAllRunningItems()).thenReturn(allRunningItems);
+        when(failoverService.getAllFailoveringItems()).thenReturn(Collections.singletonMap(1, new JobInstance("127.0.0.1@-@2")));
+        failoverListenerManager.new LegacyCrashedRunningItemListener().onChange(new DataChangedEvent(Type.ADDED, "/test_job/instances/127.0.0.1@-@1", ""));
+        verify(failoverService).setCrashedFailoverFlagDirectly(1);
+        verify(failoverService).clearFailoveringItem(1);
+        verify(executionService).clearRunningInfo(Collections.singletonList(1));
+        verify(failoverService).setCrashedFailoverFlag(0);
+        verify(executionService).clearRunningInfo(Collections.singletonList(0));
+        verify(failoverService).failoverIfNecessary();
+    }
 }
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
index ce6d512..a346a72 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverServiceTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.failover;
 
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
+import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
 import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
@@ -33,6 +35,7 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -56,6 +59,9 @@
     @Mock
     private ShardingService shardingService;
     
+    @Mock
+    private ConfigurationService configService;
+    
     private final FailoverService failoverService = new FailoverService(null, "test_job");
     
     @Before
@@ -63,6 +69,7 @@
         ReflectionUtils.setFieldValue(failoverService, "jobNodeStorage", jobNodeStorage);
         ReflectionUtils.setFieldValue(failoverService, "shardingService", shardingService);
         ReflectionUtils.setFieldValue(failoverService, "jobName", "test_job");
+        ReflectionUtils.setFieldValue(failoverService, "configService", configService);
         JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
     }
     
@@ -84,7 +91,7 @@
 
     @Test
     public void assertSetCrashedFailoverFlagDirectly() {
-        failoverService.setCrashedFailoverFlag(0);
+        failoverService.setCrashedFailoverFlagDirectly(0);
         verify(jobNodeStorage).createJobNodeIfNeeded("leader/failover/items/0");
     }
     
@@ -242,6 +249,24 @@
     }
     
     @Test
+    public void assertGetAllFailoveringItems() {
+        when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).build());
+        String jobInstanceId = "127.0.0.1@-@1";
+        when(jobNodeStorage.getJobNodeData("sharding/0/failovering")).thenReturn(jobInstanceId);
+        when(jobNodeStorage.getJobNodeData("sharding/2/failovering")).thenReturn(jobInstanceId);
+        Map<Integer, JobInstance> actual = failoverService.getAllFailoveringItems();
+        assertThat(actual.size(), is(2));
+        assertThat(actual.get(0), is(new JobInstance(jobInstanceId)));
+        assertThat(actual.get(2), is(new JobInstance(jobInstanceId)));
+    }
+    
+    @Test
+    public void assertClearFailoveringItem() {
+        failoverService.clearFailoveringItem(0);
+        verify(jobNodeStorage).removeJobNodeIfExisted("sharding/0/failovering");
+    }
+    
+    @Test
     public void assertRemoveFailoverInfo() {
         when(jobNodeStorage.getJobNodeChildrenKeys("sharding")).thenReturn(Arrays.asList("0", "1", "2"));
         failoverService.removeFailoverInfo();
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
index dddaaed..2f88cd3 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
 
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
 import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
 import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
 import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
@@ -76,11 +77,25 @@
     
     @Test
     public void assertRegisterJobBeginWithMonitorExecution() {
+        String jobInstanceId = "127.0.0.1@-@1";
+        JobRegistry.getInstance().addJobInstance("test_job", new JobInstance(jobInstanceId));
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(true).build());
         executionService.registerJobBegin(getShardingContext());
-        verify(jobNodeStorage).fillEphemeralJobNode("sharding/0/running", "");
-        verify(jobNodeStorage).fillEphemeralJobNode("sharding/1/running", "");
-        verify(jobNodeStorage).fillEphemeralJobNode("sharding/2/running", "");
+        verify(jobNodeStorage).fillEphemeralJobNode("sharding/0/running", jobInstanceId);
+        verify(jobNodeStorage).fillEphemeralJobNode("sharding/1/running", jobInstanceId);
+        verify(jobNodeStorage).fillEphemeralJobNode("sharding/2/running", jobInstanceId);
+        assertTrue(JobRegistry.getInstance().isJobRunning("test_job"));
+    }
+    
+    @Test
+    public void assertRegisterJobBeginWithFailoverEnabled() {
+        String jobInstanceId = "127.0.0.1@-@1";
+        JobRegistry.getInstance().addJobInstance("test_job", new JobInstance(jobInstanceId));
+        when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
+        executionService.registerJobBegin(getShardingContext());
+        verify(jobNodeStorage).fillJobNode("sharding/0/running", jobInstanceId);
+        verify(jobNodeStorage).fillJobNode("sharding/1/running", jobInstanceId);
+        verify(jobNodeStorage).fillJobNode("sharding/2/running", jobInstanceId);
         assertTrue(JobRegistry.getInstance().isJobRunning("test_job"));
     }
     
@@ -162,6 +177,18 @@
     }
     
     @Test
+    public void assertGetAllRunningItems() {
+        when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).build());
+        String jobInstanceId = "127.0.0.1@-@1";
+        when(jobNodeStorage.getJobNodeData("sharding/0/running")).thenReturn(jobInstanceId);
+        when(jobNodeStorage.getJobNodeData("sharding/2/running")).thenReturn(jobInstanceId);
+        Map<Integer, JobInstance> actual = executionService.getAllRunningItems();
+        assertThat(actual.size(), is(2));
+        assertThat(actual.get(0), is(new JobInstance(jobInstanceId)));
+        assertThat(actual.get(2), is(new JobInstance(jobInstanceId)));
+    }
+    
+    @Test
     public void assertMisfireIfNotRunning() {
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").monitorExecution(true).build());
         when(jobNodeStorage.isJobNodeExisted("sharding/0/running")).thenReturn(false);
diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
index 98137de..2a35366 100644
--- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
+++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingServiceTest.java
@@ -250,8 +250,9 @@
         when(serverService.isEnableServer("127.0.0.1")).thenReturn(true);
         when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").build());
         when(jobNodeStorage.getJobNodeData("sharding/0/instance")).thenReturn("127.0.0.1@-@0");
-        when(jobNodeStorage.getJobNodeData("sharding/1/instance")).thenReturn("127.0.0.1@-@1");
+        when(jobNodeStorage.isJobNodeExisted("sharding/0/running")).thenReturn(true);
         when(jobNodeStorage.getJobNodeData("sharding/2/instance")).thenReturn("127.0.0.1@-@0");
+        when(jobNodeStorage.isJobNodeExisted("sharding/2/running")).thenReturn(true);
         assertThat(shardingService.getCrashedShardingItems("127.0.0.1@-@0"), is(Arrays.asList(0, 2)));
         JobRegistry.getInstance().shutdown("test_job");
     }