blob: 9aaea00325a6fa345538bb2a209f54523c770728 [file] [log] [blame]
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.elasticjob.cloud.scheduler.mesos;
import io.elasticjob.cloud.scheduler.fixture.CloudJobConfigurationBuilder;
import io.elasticjob.cloud.context.ExecutionType;
import io.elasticjob.cloud.scheduler.config.app.CloudAppConfiguration;
import io.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import io.elasticjob.cloud.scheduler.fixture.CloudAppConfigurationBuilder;
import io.elasticjob.cloud.context.TaskContext;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.SchedulingResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VMAssignmentResult;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.fenzo.functions.Action1;
import com.netflix.fenzo.plugins.VMLeaseObject;
import org.apache.mesos.Protos;
import org.codehaus.jettison.json.JSONException;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class AppConstraintEvaluatorTest {
private static final double SUFFICIENT_CPU = 1.0 * 13;
private static final double INSUFFICIENT_CPU = 1.0 * 11;
private static final double SUFFICIENT_MEM = 128.0 * 13;
private static final double INSUFFICIENT_MEM = 128.0 * 11;
private static FacadeService facadeService;
private TaskScheduler taskScheduler;
@BeforeClass
public static void init() throws Exception {
facadeService = mock(FacadeService.class);
AppConstraintEvaluator.init(facadeService);
}
@Before
public void setUp() throws Exception {
taskScheduler = new TaskScheduler.Builder().withLeaseOfferExpirySecs(1000000000L).withLeaseRejectAction(new Action1<VirtualMachineLease>() {
@Override
public void call(final VirtualMachineLease virtualMachineLease) {
}
}).build();
}
@After
public void tearDown() throws Exception {
AppConstraintEvaluator.getInstance().clearAppRunningState();
}
@Test
public void assertFirstLaunch() throws Exception {
SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, SUFFICIENT_CPU, SUFFICIENT_MEM), getLease(1, SUFFICIENT_CPU, SUFFICIENT_MEM)));
assertThat(result.getResultMap().size(), is(2));
assertThat(result.getFailures().size(), is(0));
assertThat(getAssignedTaskNumber(result), is(20));
}
@Test
public void assertFirstLaunchLackCpu() throws Exception {
SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, INSUFFICIENT_CPU, SUFFICIENT_MEM), getLease(1, INSUFFICIENT_CPU, SUFFICIENT_MEM)));
assertThat(result.getResultMap().size(), is(2));
assertThat(getAssignedTaskNumber(result), is(18));
}
@Test
public void assertFirstLaunchLackMem() throws Exception {
SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, SUFFICIENT_CPU, INSUFFICIENT_MEM), getLease(1, SUFFICIENT_CPU, INSUFFICIENT_MEM)));
assertThat(result.getResultMap().size(), is(2));
assertThat(getAssignedTaskNumber(result), is(18));
}
@Test
public void assertExistExecutorOnS0() throws Exception {
when(facadeService.loadExecutorInfo()).thenReturn(ImmutableList.of(new MesosStateService.ExecutorStateInfo("foo-app@-@S0", "S0")));
AppConstraintEvaluator.getInstance().loadAppRunningState();
SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, INSUFFICIENT_CPU, INSUFFICIENT_MEM), getLease(1, INSUFFICIENT_CPU, INSUFFICIENT_MEM)));
assertThat(result.getResultMap().size(), is(2));
assertTrue(getAssignedTaskNumber(result) > 18);
}
@Test
public void assertGetExecutorError() throws Exception {
when(facadeService.loadExecutorInfo()).thenThrow(JSONException.class);
AppConstraintEvaluator.getInstance().loadAppRunningState();
SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, INSUFFICIENT_CPU, INSUFFICIENT_MEM), getLease(1, INSUFFICIENT_CPU, INSUFFICIENT_MEM)));
assertThat(result.getResultMap().size(), is(2));
assertThat(getAssignedTaskNumber(result), is(18));
}
@Test
public void assertLackJobConfig() throws Exception {
when(facadeService.load("test")).thenReturn(Optional.<CloudJobConfiguration>absent());
SchedulingResult result = taskScheduler.scheduleOnce(Collections.singletonList(getTask("test")), Collections.singletonList(getLease(0, 1.5, 192)));
assertThat(result.getResultMap().size(), is(1));
assertThat(getAssignedTaskNumber(result), is(1));
}
@Test
public void assertLackAppConfig() throws Exception {
when(facadeService.load("test")).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration("test")));
when(facadeService.loadAppConfig("test_app")).thenReturn(Optional.<CloudAppConfiguration>absent());
SchedulingResult result = taskScheduler.scheduleOnce(Collections.singletonList(getTask("test")), Collections.singletonList(getLease(0, 1.5, 192)));
assertThat(result.getResultMap().size(), is(1));
assertThat(getAssignedTaskNumber(result), is(1));
}
private VirtualMachineLease getLease(final int index, final double cpus, final double mem) {
return new VMLeaseObject(Protos.Offer.newBuilder()
.setId(Protos.OfferID.newBuilder().setValue("offer" + index))
.setSlaveId(Protos.SlaveID.newBuilder().setValue("S" + index))
.setHostname("slave" + index)
.setFrameworkId(Protos.FrameworkID.newBuilder().setValue("f1"))
.addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus)))
.addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(mem)))
.build());
}
private List<TaskRequest> getTasks() {
List<TaskRequest> result = new ArrayList<>(20);
for (int i = 0; i < 20; i++) {
String jobName;
String appName;
if (i % 2 == 0) {
jobName = String.format("foo-%d", i);
appName = "foo-app";
} else {
jobName = String.format("bar-%d", i);
appName = "bar-app";
}
result.add(getTask(jobName));
when(facadeService.load(jobName)).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration(jobName, appName)));
}
when(facadeService.loadAppConfig("foo-app")).thenReturn(Optional.of(CloudAppConfigurationBuilder.createCloudAppConfiguration("foo-app")));
when(facadeService.loadAppConfig("bar-app")).thenReturn(Optional.of(CloudAppConfigurationBuilder.createCloudAppConfiguration("bar-app")));
return result;
}
private TaskRequest getTask(final String jobName) {
TaskRequest result = mock(TaskRequest.class);
when(result.getCPUs()).thenReturn(1.0d);
when(result.getMemory()).thenReturn(128.0d);
when(result.getHardConstraints()).thenAnswer(new Answer<List<? extends ConstraintEvaluator>>() {
@Override
public List<? extends ConstraintEvaluator> answer(final InvocationOnMock invocationOnMock) throws Throwable {
return ImmutableList.of(AppConstraintEvaluator.getInstance());
}
});
when(result.getId()).thenReturn(new TaskContext(jobName, Collections.singletonList(0), ExecutionType.READY).getId());
return result;
}
private int getAssignedTaskNumber(final SchedulingResult schedulingResult) {
int result = 0;
for (VMAssignmentResult each : schedulingResult.getResultMap().values()) {
result += each.getTasksAssigned().size();
}
return result;
}
}