| /* |
| * Copyright 1999-2015 dangdang.com. |
| * <p> |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * </p> |
| */ |
| |
| package io.elasticjob.cloud.executor; |
| |
| import io.elasticjob.cloud.api.JobType; |
| import io.elasticjob.cloud.context.ExecutionType; |
| import io.elasticjob.cloud.executor.fixture.TestJob; |
| import io.elasticjob.cloud.exception.JobSystemException; |
| import com.google.protobuf.ByteString; |
| import org.apache.commons.lang3.SerializationUtils; |
| import org.apache.mesos.ExecutorDriver; |
| import org.apache.mesos.Protos; |
| import org.apache.mesos.Protos.TaskID; |
| import org.apache.mesos.Protos.TaskInfo; |
| import org.apache.mesos.Protos.TaskState; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.Mock; |
| import org.mockito.junit.MockitoJUnitRunner; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| |
| import static junit.framework.TestCase.assertTrue; |
| import static org.mockito.Mockito.verify; |
| |
| @RunWith(MockitoJUnitRunner.class) |
| public final class TaskExecutorThreadTest { |
| |
| @Mock |
| private ExecutorDriver executorDriver; |
| |
| private final String taskId = String.format("%s@-@0@-@%s@-@fake_slave_id@-@0", "test_job", ExecutionType.READY); |
| |
| @Test |
| public void assertLaunchTaskWithDaemonTaskAndJavaSimpleJob() { |
| TaskInfo taskInfo = buildJavaTransientTaskInfo(); |
| TaskExecutor.TaskThread taskThread = new TaskExecutor().new TaskThread(executorDriver, taskInfo); |
| taskThread.run(); |
| verify(executorDriver).sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(TaskState.TASK_RUNNING).build()); |
| verify(executorDriver).sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_FINISHED).build()); |
| } |
| |
| @Test |
| public void assertLaunchTaskWithTransientTaskAndSpringSimpleJob() { |
| TaskInfo taskInfo = buildSpringDaemonTaskInfo(); |
| TaskExecutor.TaskThread taskThread = new TaskExecutor().new TaskThread(executorDriver, taskInfo); |
| taskThread.run(); |
| verify(executorDriver).sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(TaskState.TASK_RUNNING).build()); |
| } |
| |
| @Test |
| public void assertLaunchTaskWithDaemonTaskAndJavaScriptJob() { |
| TaskInfo taskInfo = buildSpringScriptTransientTaskInfo(); |
| TaskExecutor.TaskThread taskThread = new TaskExecutor().new TaskThread(executorDriver, taskInfo); |
| taskThread.run(); |
| verify(executorDriver).sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(TaskState.TASK_RUNNING).build()); |
| verify(executorDriver).sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_FINISHED).build()); |
| } |
| |
| @Test |
| public void assertLaunchTaskWithWrongElasticJobClass() { |
| TaskInfo taskInfo = buildWrongElasticJobClass(); |
| TaskExecutor.TaskThread taskThread = new TaskExecutor().new TaskThread(executorDriver, taskInfo); |
| try { |
| taskThread.run(); |
| } catch (final JobSystemException ex) { |
| assertTrue(ex.getMessage().startsWith("Elastic-Job: Class 'io.elasticjob.cloud.executor.TaskExecutorThreadTest' must implements ElasticJob interface.")); |
| } |
| } |
| |
| @Test |
| public void assertLaunchTaskWithWrongClass() { |
| TaskInfo taskInfo = buildWrongClass(); |
| TaskExecutor.TaskThread taskThread = new TaskExecutor().new TaskThread(executorDriver, taskInfo); |
| try { |
| taskThread.run(); |
| } catch (final JobSystemException ex) { |
| assertTrue(ex.getMessage().startsWith("Elastic-Job: Class 'WrongClass' initialize failure, the error message is 'WrongClass'.")); |
| } |
| } |
| |
| private TaskInfo buildWrongClass() { |
| return buildTaskInfo(buildBaseJobConfigurationContextMapWithJobClassAndCron("WrongClass", "ignoredCron")).build(); |
| } |
| |
| private TaskInfo buildWrongElasticJobClass() { |
| return buildTaskInfo(buildBaseJobConfigurationContextMapWithJobClassAndCron(TaskExecutorThreadTest.class.getCanonicalName(), "ignoredCron")).build(); |
| } |
| |
| private TaskInfo buildSpringDaemonTaskInfo() { |
| return buildTaskInfo(buildSpringJobConfigurationContextMap()).build(); |
| } |
| |
| private TaskInfo buildJavaTransientTaskInfo() { |
| return buildTaskInfo(buildBaseJobConfigurationContextMapWithJobClassAndCron(TestJob.class.getCanonicalName(), "ignoredCron")).build(); |
| } |
| |
| private TaskInfo buildSpringScriptTransientTaskInfo() { |
| return buildTaskInfo(buildBaseJobConfigurationContextMap(TestJob.class.getCanonicalName(), "ignoredCron", JobType.SCRIPT)).build(); |
| } |
| |
| private TaskInfo.Builder buildTaskInfo(final Map<String, String> jobConfigurationContext) { |
| return TaskInfo.newBuilder().setData(ByteString.copyFrom(serialize(jobConfigurationContext))) |
| .setName("test_job").setTaskId(TaskID.newBuilder().setValue(taskId)).setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-S0")); |
| } |
| |
| private byte[] serialize(final Map<String, String> jobConfigurationContext) { |
| // CHECKSTYLE:OFF |
| LinkedHashMap<String, Object> result = new LinkedHashMap<>(2, 1); |
| // CHECKSTYLE:ON |
| ShardingContexts shardingContexts = new ShardingContexts(taskId, "test_job", 1, "", Collections.singletonMap(1, "a")); |
| result.put("shardingContext", shardingContexts); |
| result.put("jobConfigContext", jobConfigurationContext); |
| return SerializationUtils.serialize(result); |
| } |
| |
| private Map<String, String> buildSpringJobConfigurationContextMap() { |
| Map<String, String> context = buildBaseJobConfigurationContextMapWithJobClass(TestJob.class.getCanonicalName()); |
| context.put("beanName", "testJob"); |
| context.put("applicationContext", "applicationContext.xml"); |
| return context; |
| } |
| |
| private Map<String, String> buildBaseJobConfigurationContextMapWithJobClass(final String jobClass) { |
| return buildBaseJobConfigurationContextMapWithJobClassAndCron(jobClass, "0/1 * * * * ?"); |
| } |
| |
| private Map<String, String> buildBaseJobConfigurationContextMapWithJobClassAndCron(final String jobClass, final String cron) { |
| return buildBaseJobConfigurationContextMap(jobClass, cron, JobType.SIMPLE); |
| } |
| |
| private Map<String, String> buildBaseJobConfigurationContextMap(final String jobClass, final String cron, final JobType jobType) { |
| Map<String, String> result = new HashMap<>(); |
| result.put("jobName", "test_job"); |
| result.put("cron", cron); |
| result.put("jobClass", jobClass); |
| result.put("jobType", jobType.name()); |
| result.put("scriptCommandLine", "echo \"\""); |
| return result; |
| } |
| } |