blob: f715d8f35c41bca92f8143363e6711af10f0e4c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.scheduler.producer;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfigurationService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.CloudAppConfigurationBuilder;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.CloudJobConfigurationBuilder;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.DisableJobService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.running.RunningService;
import org.apache.shardingsphere.elasticjob.cloud.exception.AppConfigurationException;
import org.apache.shardingsphere.elasticjob.cloud.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.ready.ReadyService;
import org.apache.shardingsphere.elasticjob.cloud.context.TaskContext;
import org.apache.shardingsphere.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.unitils.util.ReflectionUtils;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class ProducerManagerTest {
@Mock
private SchedulerDriver schedulerDriver;
@Mock
private CoordinatorRegistryCenter regCenter;
@Mock
private CloudAppConfigurationService appConfigService;
@Mock
private CloudJobConfigurationService configService;
@Mock
private ReadyService readyService;
@Mock
private RunningService runningService;
@Mock
private DisableJobService disableJobService;
@Mock
private TransientProducerScheduler transientProducerScheduler;
private ProducerManager producerManager;
private final CloudAppConfiguration appConfig = CloudAppConfigurationBuilder.createCloudAppConfiguration("test_app");
private final CloudJobConfiguration transientJobConfig = CloudJobConfigurationBuilder.createCloudJobConfiguration("transient_test_job");
private final CloudJobConfiguration daemonJobConfig = CloudJobConfigurationBuilder.createCloudJobConfiguration("daemon_test_job", CloudJobExecutionType.DAEMON);
@Before
public void setUp() throws NoSuchFieldException {
producerManager = new ProducerManager(schedulerDriver, regCenter);
ReflectionUtils.setFieldValue(producerManager, "appConfigService", appConfigService);
ReflectionUtils.setFieldValue(producerManager, "configService", configService);
ReflectionUtils.setFieldValue(producerManager, "readyService", readyService);
ReflectionUtils.setFieldValue(producerManager, "runningService", runningService);
ReflectionUtils.setFieldValue(producerManager, "disableJobService", disableJobService);
ReflectionUtils.setFieldValue(producerManager, "transientProducerScheduler", transientProducerScheduler);
}
@Test
public void assertStartup() {
when(configService.loadAll()).thenReturn(Arrays.asList(transientJobConfig, daemonJobConfig));
producerManager.startup();
verify(configService).loadAll();
verify(transientProducerScheduler).register(transientJobConfig);
verify(readyService).addDaemon("daemon_test_job");
}
@Test(expected = AppConfigurationException.class)
public void assertRegisterJobWithoutApp() {
when(appConfigService.load("test_app")).thenReturn(Optional.<CloudAppConfiguration>absent());
producerManager.register(transientJobConfig);
}
@Test(expected = JobConfigurationException.class)
public void assertRegisterExistedJob() {
when(appConfigService.load("test_app")).thenReturn(Optional.of(appConfig));
when(configService.load("transient_test_job")).thenReturn(Optional.of(transientJobConfig));
producerManager.register(transientJobConfig);
}
@Test(expected = JobConfigurationException.class)
public void assertRegisterDisabledJob() {
when(disableJobService.isDisabled("transient_test_job")).thenReturn(true);
producerManager.register(transientJobConfig);
}
@Test
public void assertRegisterTransientJob() {
when(appConfigService.load("test_app")).thenReturn(Optional.of(appConfig));
when(configService.load("transient_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent());
producerManager.register(transientJobConfig);
verify(configService).add(transientJobConfig);
verify(transientProducerScheduler).register(transientJobConfig);
}
@Test
public void assertRegisterDaemonJob() {
when(appConfigService.load("test_app")).thenReturn(Optional.of(appConfig));
when(configService.load("daemon_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent());
producerManager.register(daemonJobConfig);
verify(configService).add(daemonJobConfig);
verify(readyService).addDaemon("daemon_test_job");
}
@Test(expected = JobConfigurationException.class)
public void assertUpdateNotExisted() {
when(configService.load("transient_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent());
producerManager.update(transientJobConfig);
}
@Test
public void assertUpdateExisted() {
when(configService.load("transient_test_job")).thenReturn(Optional.of(transientJobConfig));
List<TaskContext> taskContexts = Arrays.asList(
TaskContext.from("transient_test_job@-@0@-@READY@-@SLAVE-S0@-@UUID"), TaskContext.from("transient_test_job@-@1@-@READY@-@SLAVE-S0@-@UUID"));
when(runningService.getRunningTasks("transient_test_job")).thenReturn(taskContexts);
producerManager.update(transientJobConfig);
verify(configService).update(transientJobConfig);
for (TaskContext each : taskContexts) {
verify(schedulerDriver).killTask(Protos.TaskID.newBuilder().setValue(each.getId()).build());
}
verify(runningService).remove("transient_test_job");
verify(readyService).remove(Lists.newArrayList("transient_test_job"));
}
@Test
public void assertDeregisterNotExisted() {
when(configService.load("transient_test_job")).thenReturn(Optional.<CloudJobConfiguration>absent());
producerManager.deregister("transient_test_job");
verify(configService, times(0)).remove("transient_test_job");
}
@Test
public void assertDeregisterExisted() {
when(configService.load("transient_test_job")).thenReturn(Optional.of(transientJobConfig));
List<TaskContext> taskContexts = Arrays.asList(
TaskContext.from("transient_test_job@-@0@-@READY@-@SLAVE-S0@-@UUID"), TaskContext.from("transient_test_job@-@1@-@READY@-@SLAVE-S0@-@UUID"));
when(runningService.getRunningTasks("transient_test_job")).thenReturn(taskContexts);
producerManager.deregister("transient_test_job");
for (TaskContext each : taskContexts) {
verify(schedulerDriver).killTask(Protos.TaskID.newBuilder().setValue(each.getId()).build());
}
verify(disableJobService).remove("transient_test_job");
verify(configService).remove("transient_test_job");
verify(runningService).remove("transient_test_job");
verify(readyService).remove(Lists.newArrayList("transient_test_job"));
}
@Test
public void assertShutdown() {
producerManager.shutdown();
verify(transientProducerScheduler).shutdown();
}
}