// blob: 89303bcf470f9ca10fcf23828e94e85577c3dc17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.failover;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.CloudJobConfigurationBuilder;
import org.apache.shardingsphere.elasticjob.cloud.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.context.JobContext;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.TaskNode;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.running.RunningService;
import org.apache.shardingsphere.elasticjob.cloud.context.TaskContext;
import org.apache.shardingsphere.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.unitils.util.ReflectionUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@RunWith(MockitoJUnitRunner.class)
public final class FailoverServiceTest {
@Mock
private CoordinatorRegistryCenter regCenter;
@Mock
private CloudJobConfigurationService configService;
@Mock
private RunningService runningService;
@Mock
private List<String> mockedFailoverQueue;
private FailoverService failoverService;
@Before
public void setUp() throws NoSuchFieldException {
failoverService = new FailoverService(regCenter);
ReflectionUtils.setFieldValue(failoverService, "configService", configService);
ReflectionUtils.setFieldValue(failoverService, "runningService", runningService);
}
@Test
public void assertAddWhenJobIsOverQueueSize() {
Mockito.when(regCenter.getNumChildren(FailoverNode.ROOT)).thenReturn(BootstrapEnvironment.getInstance().getFrameworkConfiguration().getJobStateQueueSize() + 1);
TaskNode taskNode = TaskNode.builder().type(ExecutionType.FAILOVER).build();
failoverService.add(TaskContext.from(taskNode.getTaskNodeValue()));
Mockito.verify(regCenter, Mockito.times(0)).persist("/state/failover/test_job/" + taskNode.getTaskNodePath(), taskNode.getTaskNodeValue());
}
@Test
public void assertAddWhenExisted() {
TaskNode taskNode = TaskNode.builder().type(ExecutionType.FAILOVER).build();
Mockito.when(regCenter.isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath())).thenReturn(true);
failoverService.add(TaskContext.from(taskNode.getTaskNodeValue()));
Mockito.verify(regCenter).isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath());
Mockito.verify(regCenter, Mockito.times(0)).persist("/state/failover/test_job/" + taskNode.getTaskNodePath(), taskNode.getTaskNodeValue());
}
@Test
public void assertAddWhenNotExistedAndTaskIsRunning() {
TaskNode taskNode = TaskNode.builder().type(ExecutionType.FAILOVER).build();
Mockito.when(regCenter.isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath())).thenReturn(false);
Mockito.when(runningService.isTaskRunning(TaskContext.MetaInfo.from(taskNode.getTaskNodePath()))).thenReturn(true);
failoverService.add(TaskContext.from(taskNode.getTaskNodeValue()));
Mockito.verify(regCenter).isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath());
Mockito.verify(runningService).isTaskRunning(TaskContext.MetaInfo.from(taskNode.getTaskNodePath()));
Mockito.verify(regCenter, Mockito.times(0)).persist("/state/failover/test_job/" + taskNode.getTaskNodePath(), taskNode.getTaskNodeValue());
}
@Test
public void assertAddWhenNotExistedAndTaskIsNotRunning() {
TaskNode taskNode = TaskNode.builder().type(ExecutionType.FAILOVER).build();
Mockito.when(regCenter.isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath())).thenReturn(false);
Mockito.when(runningService.isTaskRunning(TaskContext.MetaInfo.from(taskNode.getTaskNodePath()))).thenReturn(false);
failoverService.add(TaskContext.from(taskNode.getTaskNodeValue()));
Mockito.verify(regCenter).isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath());
Mockito.verify(runningService).isTaskRunning(TaskContext.MetaInfo.from(taskNode.getTaskNodePath()));
Mockito.verify(regCenter).persist("/state/failover/test_job/" + taskNode.getTaskNodePath(), taskNode.getTaskNodeValue());
}
@Test
public void assertGetAllEligibleJobContextsWithoutRootNode() {
Mockito.when(regCenter.isExisted("/state/failover")).thenReturn(false);
Assert.assertTrue(failoverService.getAllEligibleJobContexts().isEmpty());
Mockito.verify(regCenter).isExisted("/state/failover");
}
@Test
public void assertGetAllEligibleJobContextsWithRootNode() {
Mockito.when(regCenter.isExisted("/state/failover")).thenReturn(true);
Mockito.when(regCenter.getChildrenKeys("/state/failover")).thenReturn(Arrays.asList("task_empty_job", "not_existed_job", "eligible_job"));
Mockito.when(regCenter.getChildrenKeys("/state/failover/task_empty_job")).thenReturn(Collections.<String>emptyList());
Mockito.when(regCenter.getChildrenKeys("/state/failover/not_existed_job")).thenReturn(Arrays.asList(
TaskNode.builder().jobName("not_existed_job").build().getTaskNodePath(), TaskNode.builder().jobName("not_existed_job").shardingItem(1).build().getTaskNodePath()));
String eligibleJobNodePath1 = TaskNode.builder().jobName("eligible_job").build().getTaskNodePath();
String eligibleJobNodePath2 = TaskNode.builder().jobName("eligible_job").shardingItem(1).build().getTaskNodePath();
Mockito.when(regCenter.getChildrenKeys("/state/failover/eligible_job")).thenReturn(Arrays.asList(eligibleJobNodePath1, eligibleJobNodePath2));
Mockito.when(configService.load("not_existed_job")).thenReturn(Optional.<CloudJobConfiguration>absent());
Mockito.when(configService.load("eligible_job")).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration("eligible_job")));
Mockito.when(runningService.isTaskRunning(TaskContext.MetaInfo.from(eligibleJobNodePath1))).thenReturn(true);
Mockito.when(runningService.isTaskRunning(TaskContext.MetaInfo.from(eligibleJobNodePath2))).thenReturn(false);
Collection<JobContext> actual = failoverService.getAllEligibleJobContexts();
Assert.assertThat(actual.size(), Is.is(1));
Assert.assertThat(actual.iterator().next().getAssignedShardingItems().size(), Is.is(1));
Assert.assertThat(actual.iterator().next().getAssignedShardingItems().get(0), Is.is(1));
Mockito.verify(regCenter).isExisted("/state/failover");
Mockito.verify(regCenter).remove("/state/failover/task_empty_job");
Mockito.verify(regCenter).remove("/state/failover/not_existed_job");
}
@Test
public void assertRemove() {
String jobNodePath1 = TaskNode.builder().type(ExecutionType.FAILOVER).build().getTaskNodePath();
String jobNodePath2 = TaskNode.builder().shardingItem(1).type(ExecutionType.FAILOVER).build().getTaskNodePath();
failoverService.remove(Arrays.asList(TaskContext.MetaInfo.from(jobNodePath1), TaskContext.MetaInfo.from(jobNodePath2)));
Mockito.verify(regCenter).remove("/state/failover/test_job/" + jobNodePath1);
Mockito.verify(regCenter).remove("/state/failover/test_job/" + jobNodePath2);
}
@Test
public void assertGetTaskId() {
TaskNode taskNode = TaskNode.builder().type(ExecutionType.FAILOVER).build();
failoverService.add(TaskContext.from(taskNode.getTaskNodeValue()));
Mockito.when(regCenter.isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath())).thenReturn(true);
Mockito.when(regCenter.get("/state/failover/test_job/" + taskNode.getTaskNodePath())).thenReturn(taskNode.getTaskNodeValue());
Assert.assertThat(failoverService.getTaskId(taskNode.getMetaInfo()).get(), Is.is(taskNode.getTaskNodeValue()));
Mockito.verify(regCenter, Mockito.times(2)).isExisted("/state/failover/test_job/" + taskNode.getTaskNodePath());
}
@Test
public void assertGetAllFailoverTasksWithoutRootNode() {
Mockito.when(regCenter.isExisted(FailoverNode.ROOT)).thenReturn(false);
Assert.assertTrue(failoverService.getAllFailoverTasks().isEmpty());
Mockito.verify(regCenter).isExisted(FailoverNode.ROOT);
}
@Test
public void assertGetAllFailoverTasksWhenRootNodeHasNoChild() {
Mockito.when(regCenter.isExisted(FailoverNode.ROOT)).thenReturn(true);
Mockito.when(regCenter.getChildrenKeys(FailoverNode.ROOT)).thenReturn(Collections.<String>emptyList());
Assert.assertTrue(failoverService.getAllFailoverTasks().isEmpty());
Mockito.verify(regCenter).isExisted(FailoverNode.ROOT);
Mockito.verify(regCenter).getChildrenKeys(FailoverNode.ROOT);
}
@Test
public void assertGetAllFailoverTasksWhenJobNodeHasNoChild() {
Mockito.when(regCenter.isExisted(FailoverNode.ROOT)).thenReturn(true);
Mockito.when(regCenter.getChildrenKeys(FailoverNode.ROOT)).thenReturn(Lists.newArrayList("test_job"));
Mockito.when(regCenter.getChildrenKeys(FailoverNode.getFailoverJobNodePath("test_job"))).thenReturn(Collections.<String>emptyList());
Assert.assertTrue(failoverService.getAllFailoverTasks().isEmpty());
Mockito.verify(regCenter).isExisted(FailoverNode.ROOT);
Mockito.verify(regCenter).getChildrenKeys(FailoverNode.ROOT);
Mockito.verify(regCenter).getChildrenKeys(FailoverNode.getFailoverJobNodePath("test_job"));
}
@Test
public void assertGetAllFailoverTasksWithRootNode() {
String uuid1 = UUID.randomUUID().toString();
String uuid2 = UUID.randomUUID().toString();
String uuid3 = UUID.randomUUID().toString();
Mockito.when(regCenter.isExisted(FailoverNode.ROOT)).thenReturn(true);
Mockito.when(regCenter.getChildrenKeys(FailoverNode.ROOT)).thenReturn(Lists.newArrayList("test_job_1", "test_job_2"));
Mockito.when(regCenter.getChildrenKeys(FailoverNode.getFailoverJobNodePath("test_job_1"))).thenReturn(Lists.newArrayList("test_job_1@-@0", "test_job_1@-@1"));
Mockito.when(regCenter.getChildrenKeys(FailoverNode.getFailoverJobNodePath("test_job_2"))).thenReturn(Lists.newArrayList("test_job_2@-@0"));
Mockito.when(regCenter.get(FailoverNode.getFailoverTaskNodePath("test_job_1@-@0"))).thenReturn(uuid1);
Mockito.when(regCenter.get(FailoverNode.getFailoverTaskNodePath("test_job_1@-@1"))).thenReturn(uuid2);
Mockito.when(regCenter.get(FailoverNode.getFailoverTaskNodePath("test_job_2@-@0"))).thenReturn(uuid3);
Map<String, Collection<FailoverTaskInfo>> result = failoverService.getAllFailoverTasks();
Assert.assertThat(result.size(), Is.is(2));
Assert.assertThat(result.get("test_job_1").size(), Is.is(2));
Assert.assertThat(result.get("test_job_1").toArray(new FailoverTaskInfo[]{})[0].getTaskInfo().toString(), Is.is("test_job_1@-@0"));
Assert.assertThat(result.get("test_job_1").toArray(new FailoverTaskInfo[]{})[0].getOriginalTaskId(), Is.is(uuid1));
Assert.assertThat(result.get("test_job_1").toArray(new FailoverTaskInfo[]{})[1].getTaskInfo().toString(), Is.is("test_job_1@-@1"));
Assert.assertThat(result.get("test_job_1").toArray(new FailoverTaskInfo[]{})[1].getOriginalTaskId(), Is.is(uuid2));
Assert.assertThat(result.get("test_job_2").size(), Is.is(1));
Assert.assertThat(result.get("test_job_2").iterator().next().getTaskInfo().toString(), Is.is("test_job_2@-@0"));
Assert.assertThat(result.get("test_job_2").iterator().next().getOriginalTaskId(), Is.is(uuid3));
Mockito.verify(regCenter).isExisted(FailoverNode.ROOT);
Mockito.verify(regCenter).getChildrenKeys(FailoverNode.ROOT);
Mockito.verify(regCenter).getChildrenKeys(FailoverNode.getFailoverJobNodePath("test_job_1"));
Mockito.verify(regCenter).getChildrenKeys(FailoverNode.getFailoverJobNodePath("test_job_2"));
Mockito.verify(regCenter).get(FailoverNode.getFailoverTaskNodePath("test_job_1@-@0"));
Mockito.verify(regCenter).get(FailoverNode.getFailoverTaskNodePath("test_job_1@-@1"));
Mockito.verify(regCenter).get(FailoverNode.getFailoverTaskNodePath("test_job_2@-@0"));
}
}