| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.shardingsphere.elasticjob.cloud.scheduler.producer; |
| |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfigurationService; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.CloudAppConfigurationBuilder; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.CloudJobConfigurationBuilder; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.DisableJobService; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.running.RunningService; |
| import org.apache.shardingsphere.elasticjob.cloud.exception.AppConfigurationException; |
| import org.apache.shardingsphere.elasticjob.cloud.exception.JobConfigurationException; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfiguration; |
| import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.ready.ReadyService; |
| import org.apache.shardingsphere.elasticjob.cloud.context.TaskContext; |
| import org.apache.shardingsphere.elasticjob.cloud.reg.base.CoordinatorRegistryCenter; |
| import com.google.common.base.Optional; |
| import com.google.common.collect.Lists; |
| import org.apache.mesos.Protos; |
| import org.apache.mesos.SchedulerDriver; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.Mock; |
| import org.mockito.junit.MockitoJUnitRunner; |
| import org.unitils.util.ReflectionUtils; |
| |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| @RunWith(MockitoJUnitRunner.class) |
| public final class ProducerManagerTest { |
| |
| @Mock |
| private SchedulerDriver schedulerDriver; |
| |
| @Mock |
| private CoordinatorRegistryCenter regCenter; |
| |
| @Mock |
| private CloudAppConfigurationService appConfigService; |
| |
| @Mock |
| private CloudJobConfigurationService configService; |
| |
| @Mock |
| private ReadyService readyService; |
| |
| @Mock |
| private RunningService runningService; |
| |
| @Mock |
| private DisableJobService disableJobService; |
| |
| @Mock |
| private TransientProducerScheduler transientProducerScheduler; |
| |
| private ProducerManager producerManager; |
| |
| private final CloudAppConfiguration appConfig = CloudAppConfigurationBuilder.createCloudAppConfiguration("test_app"); |
| |
| private final CloudJobConfiguration transientJobConfig = CloudJobConfigurationBuilder.createCloudJobConfiguration("transient_test_job"); |
| |
| private final CloudJobConfiguration daemonJobConfig = CloudJobConfigurationBuilder.createCloudJobConfiguration("daemon_test_job", CloudJobExecutionType.DAEMON); |
| |
| @Before |
| public void setUp() throws NoSuchFieldException { |
| producerManager = new ProducerManager(schedulerDriver, regCenter); |
| ReflectionUtils.setFieldValue(producerManager, "appConfigService", appConfigService); |
| ReflectionUtils.setFieldValue(producerManager, "configService", configService); |
| ReflectionUtils.setFieldValue(producerManager, "readyService", readyService); |
| ReflectionUtils.setFieldValue(producerManager, "runningService", runningService); |
| ReflectionUtils.setFieldValue(producerManager, "disableJobService", disableJobService); |
| ReflectionUtils.setFieldValue(producerManager, "transientProducerScheduler", transientProducerScheduler); |
| } |
| |
| @Test |
| public void assertStartup() { |
| when(configService.loadAll()).thenReturn(Arrays.asList(transientJobConfig, daemonJobConfig)); |
| producerManager.startup(); |
| verify(configService).loadAll(); |
| verify(transientProducerScheduler).register(transientJobConfig); |
| verify(readyService).addDaemon("daemon_test_job"); |
| } |
| |
| @Test(expected = AppConfigurationException.class) |
| public void assertRegisterJobWithoutApp() { |
| when(appConfigService.load("test_app")).thenReturn(Optional.<CloudAppConfiguration>absent()); |
| producerManager.register(transientJobConfig); |
| } |
| |
| @Test(expected = JobConfigurationException.class) |
| public void assertRegisterExistedJob() { |
| when(appConfigService.load("test_app")).thenReturn(Optional.of(appConfig)); |
| when(configService.load("transient_test_job")).thenReturn(Optional.of(transientJobConfig)); |
| producerManager.register(transientJobConfig); |
| } |
| |
| @Test(expected = JobConfigurationException.class) |
| public void assertRegisterDisabledJob() { |
| when(disableJobService.isDisabled("transient_test_job")).thenReturn(true); |
| producerManager.register(transientJobConfig); |
| } |
| |
| @Test |
| public void assertRegisterTransientJob() { |
| when(appConfigService.load("test_app")).thenReturn(Optional.of(appConfig)); |
| when(configService.load("transient_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent()); |
| producerManager.register(transientJobConfig); |
| verify(configService).add(transientJobConfig); |
| verify(transientProducerScheduler).register(transientJobConfig); |
| } |
| |
| @Test |
| public void assertRegisterDaemonJob() { |
| when(appConfigService.load("test_app")).thenReturn(Optional.of(appConfig)); |
| when(configService.load("daemon_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent()); |
| producerManager.register(daemonJobConfig); |
| verify(configService).add(daemonJobConfig); |
| verify(readyService).addDaemon("daemon_test_job"); |
| } |
| |
| @Test(expected = JobConfigurationException.class) |
| public void assertUpdateNotExisted() { |
| when(configService.load("transient_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent()); |
| producerManager.update(transientJobConfig); |
| } |
| |
| @Test |
| public void assertUpdateExisted() { |
| when(configService.load("transient_test_job")).thenReturn(Optional.of(transientJobConfig)); |
| List<TaskContext> taskContexts = Arrays.asList( |
| TaskContext.from("transient_test_job@-@0@-@READY@-@SLAVE-S0@-@UUID"), TaskContext.from("transient_test_job@-@1@-@READY@-@SLAVE-S0@-@UUID")); |
| when(runningService.getRunningTasks("transient_test_job")).thenReturn(taskContexts); |
| producerManager.update(transientJobConfig); |
| verify(configService).update(transientJobConfig); |
| for (TaskContext each : taskContexts) { |
| verify(schedulerDriver).killTask(Protos.TaskID.newBuilder().setValue(each.getId()).build()); |
| } |
| verify(runningService).remove("transient_test_job"); |
| verify(readyService).remove(Lists.newArrayList("transient_test_job")); |
| } |
| |
| @Test |
| public void assertDeregisterNotExisted() { |
| when(configService.load("transient_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent()); |
| producerManager.deregister("transient_test_job"); |
| verify(configService, times(0)).remove("transient_test_job"); |
| } |
| |
| @Test |
| public void assertDeregisterExisted() { |
| when(configService.load("transient_test_job")).thenReturn(Optional.of(transientJobConfig)); |
| List<TaskContext> taskContexts = Arrays.asList( |
| TaskContext.from("transient_test_job@-@0@-@READY@-@SLAVE-S0@-@UUID"), TaskContext.from("transient_test_job@-@1@-@READY@-@SLAVE-S0@-@UUID")); |
| when(runningService.getRunningTasks("transient_test_job")).thenReturn(taskContexts); |
| producerManager.deregister("transient_test_job"); |
| for (TaskContext each : taskContexts) { |
| verify(schedulerDriver).killTask(Protos.TaskID.newBuilder().setValue(each.getId()).build()); |
| } |
| verify(disableJobService).remove("transient_test_job"); |
| verify(configService).remove("transient_test_job"); |
| verify(runningService).remove("transient_test_job"); |
| verify(readyService).remove(Lists.newArrayList("transient_test_job")); |
| } |
| |
| @Test |
| public void assertShutdown() { |
| producerManager.shutdown(); |
| verify(transientProducerScheduler).shutdown(); |
| } |
| } |