blob: acad2f56e6ae126039965969806af5d1c0ffde3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.executor.type;
import org.apache.shardingsphere.elasticjob.cloud.executor.AbstractElasticJobExecutor;
import org.apache.shardingsphere.elasticjob.cloud.executor.ShardingContexts;
import org.apache.shardingsphere.elasticjob.cloud.fixture.config.TestSimpleJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.event.type.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.cloud.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.cloud.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.cloud.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultExecutorServiceHandler;
import org.apache.shardingsphere.elasticjob.cloud.executor.handler.impl.DefaultJobExceptionHandler;
import org.apache.shardingsphere.elasticjob.cloud.fixture.ShardingContextsBuilder;
import org.apache.shardingsphere.elasticjob.cloud.fixture.job.JobCaller;
import org.apache.shardingsphere.elasticjob.cloud.fixture.job.TestSimpleJob;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.unitils.util.ReflectionUtils;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class SimpleJobExecutorTest {
@Mock
private JobCaller jobCaller;
@Mock
private JobFacade jobFacade;
private SimpleJobExecutor simpleJobExecutor;
@Before
public void setUp() throws NoSuchFieldException {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration());
simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
}
@Test
public void assertNewExecutorWithDefaultHandlers() throws NoSuchFieldException {
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestSimpleJobConfiguration("ErrorHandler", Object.class.getName()));
SimpleJobExecutor simpleJobExecutor = new SimpleJobExecutor(new TestSimpleJob(jobCaller), jobFacade);
assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, AbstractElasticJobExecutor.class.getDeclaredField("executorService")),
instanceOf(new DefaultExecutorServiceHandler().createExecutorService("test_job").getClass()));
assertThat(ReflectionUtils.getFieldValue(simpleJobExecutor, AbstractElasticJobExecutor.class.getDeclaredField("jobExceptionHandler")),
instanceOf(DefaultJobExceptionHandler.class));
}
@Test(expected = JobSystemException.class)
public void assertExecuteWhenCheckMaxTimeDiffSecondsIntolerable() throws JobExecutionEnvironmentException {
doThrow(JobExecutionEnvironmentException.class).when(jobFacade).checkJobExecutionEnvironment();
try {
simpleJobExecutor.execute();
} finally {
verify(jobFacade).checkJobExecutionEnvironment();
verify(jobCaller, times(0)).execute();
}
}
@Test
public void assertExecuteWhenPreviousJobStillRunning() throws JobExecutionEnvironmentException {
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.<Integer, String>emptyMap());
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true);
simpleJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED,
"Previous job 'test_job' - shardingItems '[]' is still running, misfired job will start after previous job completed.");
verify(jobFacade).checkJobExecutionEnvironment();
verify(jobFacade).getShardingContexts();
verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
verify(jobCaller, times(0)).execute();
}
@Test
public void assertExecuteWhenShardingItemsIsEmpty() throws JobExecutionEnvironmentException {
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.<Integer, String>emptyMap());
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
simpleJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, "Sharding item for job 'test_job' is empty.");
verify(jobFacade).checkJobExecutionEnvironment();
verify(jobFacade).getShardingContexts();
verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
verify(jobCaller, times(0)).execute();
}
@Test(expected = JobSystemException.class)
public void assertExecuteWhenRunOnceAndThrowExceptionForSingleShardingItem() throws JobExecutionEnvironmentException {
assertExecuteWhenRunOnceAndThrowException(ShardingContextsBuilder.getSingleShardingContexts());
}
@Test
public void assertExecuteWhenRunOnceAndThrowExceptionForMultipleShardingItems() throws JobExecutionEnvironmentException {
assertExecuteWhenRunOnceAndThrowException(ShardingContextsBuilder.getMultipleShardingContexts());
}
private void assertExecuteWhenRunOnceAndThrowException(final ShardingContexts shardingContexts) throws JobExecutionEnvironmentException {
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
doThrow(RuntimeException.class).when(jobCaller).execute();
try {
simpleJobExecutor.execute();
} finally {
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_RUNNING, "");
String errorMessage;
String lineSeparator = System.getProperty("line.separator");
if (1 == shardingContexts.getShardingItemParameters().size()) {
errorMessage = "{0=java.lang.RuntimeException" + lineSeparator + "}";
} else {
errorMessage = "{0=java.lang.RuntimeException" + lineSeparator + ", 1=java.lang.RuntimeException" + lineSeparator + "}";
}
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_ERROR, errorMessage);
verify(jobFacade).checkJobExecutionEnvironment();
verify(jobFacade).getShardingContexts();
verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
verify(jobFacade).registerJobBegin(shardingContexts);
verify(jobCaller, times(shardingContexts.getShardingTotalCount())).execute();
verify(jobFacade).registerJobCompleted(shardingContexts);
}
}
@Test
public void assertExecuteWhenRunOnceSuccessForSingleShardingItems() {
assertExecuteWhenRunOnceSuccess(ShardingContextsBuilder.getSingleShardingContexts());
}
@Test
public void assertExecuteWhenRunOnceSuccessForMultipleShardingItems() {
assertExecuteWhenRunOnceSuccess(ShardingContextsBuilder.getMultipleShardingContexts());
}
private void assertExecuteWhenRunOnceSuccess(final ShardingContexts shardingContexts) {
ElasticJobVerify.prepareForIsNotMisfire(jobFacade, shardingContexts);
simpleJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, "");
ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
verify(jobCaller, times(shardingContexts.getShardingTotalCount())).execute();
}
@Test
public void assertExecuteWhenRunOnceWithMisfireIsEmpty() {
ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
simpleJobExecutor.execute();
ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
verify(jobCaller, times(2)).execute();
}
@Test
public void assertExecuteWhenRunOnceWithMisfireIsNotEmptyButIsNotEligibleForJobRunning() {
ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
simpleJobExecutor.execute();
ElasticJobVerify.verifyForIsNotMisfire(jobFacade, shardingContexts);
verify(jobCaller, times(2)).execute();
verify(jobFacade, times(0)).clearMisfire(shardingContexts.getShardingItemParameters().keySet());
}
@Test
public void assertExecuteWhenRunOnceWithMisfire() throws JobExecutionEnvironmentException {
ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(true, false);
simpleJobExecutor.execute();
verify(jobFacade).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, "Job 'test_job' execute begin.");
verify(jobFacade, times(2)).postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_RUNNING, "");
verify(jobFacade).checkJobExecutionEnvironment();
verify(jobFacade).getShardingContexts();
verify(jobFacade).misfireIfRunning(shardingContexts.getShardingItemParameters().keySet());
verify(jobFacade, times(2)).registerJobBegin(shardingContexts);
verify(jobCaller, times(4)).execute();
verify(jobFacade, times(2)).registerJobCompleted(shardingContexts);
}
@Test(expected = JobSystemException.class)
public void assertBeforeJobExecutedFailure() {
ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
doThrow(RuntimeException.class).when(jobFacade).beforeJobExecuted(shardingContexts);
try {
simpleJobExecutor.execute();
} finally {
verify(jobCaller, times(0)).execute();
}
}
@Test(expected = JobSystemException.class)
public void assertAfterJobExecutedFailure() {
ShardingContexts shardingContexts = ShardingContextsBuilder.getMultipleShardingContexts();
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
when(jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())).thenReturn(false);
doThrow(RuntimeException.class).when(jobFacade).afterJobExecuted(shardingContexts);
try {
simpleJobExecutor.execute();
} finally {
verify(jobCaller, times(2)).execute();
}
}
}