blob: b362beede86460eee6f0ba6ce1597424081ecdcf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.mesos;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.ignite.mesos.resource.ResourceProvider;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.scheduler.Protos.OfferConstraints;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Scheduler tests.
*/
public class IgniteSchedulerSelfTest {
/** Scheduler under test; a new instance is assigned by {@link #setUp()} before each test. */
private IgniteScheduler scheduler;
/**
 * Builds the scheduler under test from a newly constructed {@link ClusterProperties} and a stub
 * {@link ResourceProvider} that answers with fixed file names.
 *
 * @throws Exception If failed.
 */
@Before
public void setUp() throws Exception {
    ClusterProperties initProps = new ClusterProperties();

    // Stub provider: constant names only, no additional resource URLs.
    ResourceProvider stubProvider = new ResourceProvider() {
        @Override public String configName() {
            return "config.xml";
        }

        @Override public String igniteUrl() {
            return "ignite.jar";
        }

        @Override public String igniteConfigUrl() {
            return "config.xml";
        }

        @Override public Collection<String> resourceUrl() {
            return null;
        }
    };

    scheduler = new IgniteScheduler(initProps, stubProvider);
}
/**
 * Hands a single offer (4 CPU, 1024 memory) to the scheduler created in {@link #setUp()} and
 * expects exactly one launched task whose CPU and memory equal the offered amounts.
 *
 * @throws Exception If failed.
 */
@Test
public void testHostRegister() throws Exception {
    DriverMock driver = new DriverMock();
    Protos.Offer hostOffer = createOffer("hostname", 4, 1024);

    scheduler.resourceOffers(driver, Collections.singletonList(hostOffer));

    Collection<Protos.TaskInfo> launched = driver.launchedTask;

    assertNotNull(launched);
    assertEquals(1, launched.size());

    List<Protos.Resource> taskRes = launched.iterator().next().getResourcesList();

    assertEquals(4.0, resources(taskRes, IgniteScheduler.CPU), 0);
    assertEquals(1024.0, resources(taskRes, IgniteScheduler.MEM), 0);
}
/**
 * With {@code cpus(2)} configured, the first offer (4 CPU, 1024 memory) must launch one task
 * holding 2 CPU and 1024 memory; the same offer sent again must be declined without a launch.
 *
 * @throws Exception If failed.
 */
@Test
public void testDeclineByCpu() throws Exception {
    ClusterProperties cpuLimited = new ClusterProperties();
    cpuLimited.cpus(2);
    scheduler.setClusterProps(cpuLimited);

    Protos.Offer hostOffer = createOffer("hostname", 4, 1024);
    DriverMock driver = new DriverMock();

    // First round: the offer is expected to be accepted.
    scheduler.resourceOffers(driver, Collections.singletonList(hostOffer));

    assertNotNull(driver.launchedTask);
    assertEquals(1, driver.launchedTask.size());

    List<Protos.Resource> taskRes = driver.launchedTask.iterator().next().getResourcesList();

    assertEquals(2.0, resources(taskRes, IgniteScheduler.CPU), 0);
    assertEquals(1024.0, resources(taskRes, IgniteScheduler.MEM), 0);

    driver.clear();

    // Second round: the very same offer is expected to be declined.
    scheduler.resourceOffers(driver, Collections.singletonList(hostOffer));

    assertNull(driver.launchedTask);
    assertEquals(hostOffer.getId(), driver.declinedOffer);
}
/**
 * With {@code memory(512)} configured, the first offer (4 CPU, 1024 memory) must launch one task
 * holding 4 CPU and 512 memory; the same offer sent again must be declined without a launch.
 *
 * @throws Exception If failed.
 */
@Test
public void testDeclineByMem() throws Exception {
    ClusterProperties memLimited = new ClusterProperties();
    memLimited.memory(512);
    scheduler.setClusterProps(memLimited);

    Protos.Offer hostOffer = createOffer("hostname", 4, 1024);
    DriverMock driver = new DriverMock();

    // First round: the offer is expected to be accepted.
    scheduler.resourceOffers(driver, Collections.singletonList(hostOffer));

    assertNotNull(driver.launchedTask);
    assertEquals(1, driver.launchedTask.size());

    List<Protos.Resource> taskRes = driver.launchedTask.iterator().next().getResourcesList();

    assertEquals(4.0, resources(taskRes, IgniteScheduler.CPU), 0);
    assertEquals(512.0, resources(taskRes, IgniteScheduler.MEM), 0);

    driver.clear();

    // Second round: the very same offer is expected to be declined.
    scheduler.resourceOffers(driver, Collections.singletonList(hostOffer));

    assertNull(driver.launchedTask);
    assertEquals(hostOffer.getId(), driver.declinedOffer);
}
/**
 * With {@code cpus(4)} and {@code memory(2000)} configured, two consecutive offers of
 * (1 CPU, 1024 memory) must each launch one task, together holding 2 CPU and 2000 memory;
 * a third identical offer must be declined without a launch.
 *
 * @throws Exception If failed.
 */
@Test
public void testDeclineByMemCpu() throws Exception {
    ClusterProperties limits = new ClusterProperties();
    limits.cpus(4);
    limits.memory(2000);
    scheduler.setClusterProps(limits);

    Protos.Offer hostOffer = createOffer("hostname", 1, 1024);
    DriverMock driver = new DriverMock();

    double cpuSum = 0;
    double memSum = 0;

    // Two accepted rounds; the resources of each launched task are accumulated.
    for (int round = 0; round < 2; round++) {
        scheduler.resourceOffers(driver, Collections.singletonList(hostOffer));

        assertNotNull(driver.launchedTask);
        assertEquals(1, driver.launchedTask.size());

        List<Protos.Resource> taskRes = driver.launchedTask.iterator().next().getResourcesList();

        cpuSum += resources(taskRes, IgniteScheduler.CPU);
        memSum += resources(taskRes, IgniteScheduler.MEM);

        driver.clear();
    }

    assertEquals(2.0, cpuSum, 0);
    assertEquals(2000.0, memSum, 0);

    // Third round: the offer is expected to be declined.
    scheduler.resourceOffers(driver, Collections.singletonList(hostOffer));

    assertNull(driver.launchedTask);
    assertEquals(hostOffer.getId(), driver.declinedOffer);
}
/**
 * With {@code minCpuPerNode(12)} configured, an offer of (8 CPU, 10240 memory) must be declined.
 *
 * @throws Exception If failed.
 */
@Test
public void testDeclineByCpuMinRequirements() throws Exception {
    ClusterProperties minCpuProps = new ClusterProperties();
    minCpuProps.minCpuPerNode(12);
    scheduler.setClusterProps(minCpuProps);

    Protos.Offer smallOffer = createOffer("hostname", 8, 10240);
    DriverMock driver = new DriverMock();

    scheduler.resourceOffers(driver, Collections.singletonList(smallOffer));

    Protos.OfferID declined = driver.declinedOffer;

    assertNotNull(declined);
    assertEquals(smallOffer.getId(), declined);
}
/**
 * With {@code minMemoryPerNode(15000)} configured, an offer of (8 CPU, 10240 memory) must be declined.
 *
 * @throws Exception If failed.
 */
@Test
public void testDeclineByMemMinRequirements() throws Exception {
    ClusterProperties minMemProps = new ClusterProperties();
    minMemProps.minMemoryPerNode(15000);
    scheduler.setClusterProps(minMemProps);

    Protos.Offer smallOffer = createOffer("hostname", 8, 10240);
    DriverMock driver = new DriverMock();

    scheduler.resourceOffers(driver, Collections.singletonList(smallOffer));

    Protos.OfferID declined = driver.declinedOffer;

    assertNotNull(declined);
    assertEquals(smallOffer.getId(), declined);
}
/**
 * Checks the hostname constraint in two phases. With the constraint pattern {@code "hostname"}
 * configured, an offer from host {@code "hostname"} is expected to be declined without launching
 * anything, while an offer from host {@code "hostnameAccept"} is expected to launch exactly one
 * task without being declined.
 *
 * @throws Exception If failed.
 */
@Test
public void testHosthameConstraint() throws Exception {
    Protos.Offer offer = createOffer("hostname", 8, 10240);

    DriverMock mock = new DriverMock();

    ClusterProperties clustProp = new ClusterProperties();

    clustProp.hostnameConstraint(Pattern.compile("hostname"));

    scheduler.setClusterProps(clustProp);

    // Phase 1: the constrained host must be declined and nothing may be launched.
    scheduler.resourceOffers(mock, Collections.singletonList(offer));

    assertNull(mock.launchedTask);
    assertNotNull(mock.declinedOffer);
    assertEquals(offer.getId(), mock.declinedOffer);

    // Drop the recorded decline so phase 2 only observes its own driver calls
    // (every offer built by createOffer carries the same offer id).
    mock.clear();

    // Phase 2: a different host must be accepted.
    offer = createOffer("hostnameAccept", 8, 10240);

    scheduler.resourceOffers(mock, Collections.singletonList(offer));

    assertNotNull(mock.launchedTask);
    assertEquals(1, mock.launchedTask.size());
    assertNull(mock.declinedOffer);
}
/**
 * With {@code memoryPerNode(1024)} and {@code cpusPerNode(2)} configured:
 * <ul>
 * <li>an offer of (8 CPU, 1024 memory) must launch a task holding 2 CPU and 1024 memory;</li>
 * <li>an offer of (1 CPU, 2048 memory) must be declined;</li>
 * <li>an offer of (4 CPU, 512 memory) must be declined.</li>
 * </ul>
 *
 * @throws Exception If failed.
 */
@Test
public void testPerNode() throws Exception {
    ClusterProperties perNodeProps = new ClusterProperties();
    perNodeProps.memoryPerNode(1024);
    perNodeProps.cpusPerNode(2);
    scheduler.setClusterProps(perNodeProps);

    DriverMock driver = new DriverMock();

    // Offer covering both per-node amounts.
    Protos.Offer enough = createOffer("hostname", 8, 1024);

    scheduler.resourceOffers(driver, Collections.singletonList(enough));

    assertNotNull(driver.launchedTask);

    List<Protos.Resource> taskRes = driver.launchedTask.iterator().next().getResourcesList();

    assertEquals(2.0, resources(taskRes, IgniteScheduler.CPU), 0);
    assertEquals(1024.0, resources(taskRes, IgniteScheduler.MEM), 0);

    driver.clear();

    // Offer with fewer CPUs than configured per node.
    Protos.Offer fewCpus = createOffer("hostname", 1, 2048);

    scheduler.resourceOffers(driver, Collections.singletonList(fewCpus));

    assertNull(driver.launchedTask);
    assertNotNull(driver.declinedOffer);
    assertEquals(fewCpus.getId(), driver.declinedOffer);

    driver.clear();

    // Offer with less memory than configured per node.
    Protos.Offer littleMem = createOffer("hostname", 4, 512);

    scheduler.resourceOffers(driver, Collections.singletonList(littleMem));

    assertNull(driver.launchedTask);
    assertNotNull(driver.declinedOffer);
    assertEquals(littleMem.getId(), driver.declinedOffer);
}
/**
 * Checks that the framework info built by {@link IgniteFramework} carries the user and role
 * returned by its overridable {@code getUser()} and {@code getRole()} methods.
 *
 * @throws Exception If failed.
 */
@Test
public void testIgniteFramework() throws Exception {
    final String mesosUserValue = "userAAAAA";
    final String mesosRoleValue = "role1";

    IgniteFramework igniteFramework = new IgniteFramework() {
        @Override protected String getUser() {
            return mesosUserValue;
        }

        @Override protected String getRole() {
            return mesosRoleValue;
        }
    };

    Protos.FrameworkInfo info = igniteFramework.getFrameworkInfo();

    String actualUserValue = info.getUser();
    String actualRoleValue = info.getRole();

    // JUnit's contract is assertEquals(expected, actual); keeping that order makes
    // a failure message report the expected and the actual value correctly.
    assertEquals(mesosUserValue, actualUserValue);
    assertEquals(mesosRoleValue, actualRoleValue);
}
/**
 * Checks that {@code IgniteFramework.isRoleValid} returns {@code false} for each of a fixed list
 * of role values: empty, {@code "."}, {@code ".."}, one starting with a dash, ones containing
 * a slash, a backslash or a space, and {@code null}.
 *
 * @throws Exception If failed.
 */
@Test
public void testMesosRoleValidation() throws Exception {
    // Arrays.asList is used because the list contains a null element.
    List<String> invalidRoles = Arrays.asList(
        "",
        ".",
        "..",
        "-testRole",
        "test/Role",
        "test\\Role",
        "test Role",
        null
    );

    for (int idx = 0; idx < invalidRoles.size(); idx++) {
        String role = invalidRoles.get(idx);

        assertFalse(IgniteFramework.isRoleValid(role));
    }
}
/**
 * Looks up the scalar value of the first resource with the given name.
 *
 * @param resources Resources to search.
 * @param resourceType Resource name to look for.
 * @return Scalar value of the first matching resource, or {@code null} if none matches.
 */
private Double resources(List<Protos.Resource> resources, String resourceType) {
    for (Protos.Resource candidate : resources) {
        if (!candidate.getName().equals(resourceType))
            continue;

        return candidate.getScalar().getValue();
    }

    return null;
}
/**
 * Builds an offer for the given host carrying one scalar CPU resource and one scalar memory
 * resource. Offer id, slave id and framework id are always {@code "1"}.
 *
 * @param hostname Hostname.
 * @param cpu Offered CPU amount.
 * @param mem Offered memory amount.
 * @return Offer.
 */
private Protos.Offer createOffer(String hostname, double cpu, double mem) {
    Protos.Resource cpuRes = Protos.Resource.newBuilder()
        .setType(Protos.Value.Type.SCALAR)
        .setName(IgniteScheduler.CPU)
        .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
        .build();

    Protos.Resource memRes = Protos.Resource.newBuilder()
        .setType(Protos.Value.Type.SCALAR)
        .setName(IgniteScheduler.MEM)
        .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem).build())
        .build();

    Protos.Offer.Builder offer = Protos.Offer.newBuilder();

    offer.setId(Protos.OfferID.newBuilder().setValue("1"));
    offer.setSlaveId(Protos.SlaveID.newBuilder().setValue("1"));
    offer.setFrameworkId(Protos.FrameworkID.newBuilder().setValue("1"));
    offer.setHostname(hostname);

    // CPU first, then memory.
    offer.addResources(cpuRes);
    offer.addResources(memRes);

    return offer.build();
}
/**
 * Scheduler driver stub. Every method returns {@code null}; the only state kept is the task
 * collection of the latest {@code launchTasks} call and the offer id of the latest
 * {@code declineOffer} call.
 */
public static class DriverMock implements SchedulerDriver {
    /** Tasks passed to the latest {@code launchTasks} call; {@code null} initially and after {@link #clear()}. */
    Collection<Protos.TaskInfo> launchedTask;

    /** Offer id passed to the latest {@code declineOffer} call; {@code null} initially and after {@link #clear()}. */
    Protos.OfferID declinedOffer;

    /**
     * Forgets the recorded launched tasks and the recorded declined offer.
     */
    public void clear() {
        declinedOffer = null;
        launchedTask = null;
    }

    /**
     * Remembers the tasks of a launch request.
     *
     * @param tasks Tasks passed to the driver.
     * @return Always {@code null}.
     */
    private Protos.Status recordLaunch(Collection<Protos.TaskInfo> tasks) {
        launchedTask = tasks;

        return null;
    }

    /**
     * Remembers the id of a declined offer.
     *
     * @param offerId Offer id passed to the driver.
     * @return Always {@code null}.
     */
    private Protos.Status recordDecline(Protos.OfferID offerId) {
        declinedOffer = offerId;

        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status start() {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status stop(boolean failover) {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status stop() {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status abort() {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status join() {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status run() {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status requestResources(Collection<Protos.Request> requests) {
        return null;
    }

    /** Records {@code tasks} as the launched tasks; offer ids and filters are ignored. */
    @Override
    public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
        Collection<Protos.TaskInfo> tasks, Protos.Filters filters) {
        return recordLaunch(tasks);
    }

    /** Records {@code tasks} as the launched tasks; offer ids are ignored. */
    @Override
    public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
        Collection<Protos.TaskInfo> tasks) {
        return recordLaunch(tasks);
    }

    /** Records {@code tasks} as the launched tasks; offer id and filters are ignored. */
    @Override
    public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks,
        Protos.Filters filters) {
        return recordLaunch(tasks);
    }

    /** Records {@code tasks} as the launched tasks; offer id is ignored. */
    @Override
    public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks) {
        return recordLaunch(tasks);
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status killTask(Protos.TaskID taskId) {
        return null;
    }

    /** No-op, returns {@code null}; nothing is recorded for accepted offers. */
    @Override
    public Protos.Status acceptOffers(Collection<Protos.OfferID> collection,
        Collection<Protos.Offer.Operation> collection1, Protos.Filters filters) {
        return null;
    }

    /** Records {@code offerId} as the declined offer; filters are ignored. */
    @Override
    public Protos.Status declineOffer(Protos.OfferID offerId, Protos.Filters filters) {
        return recordDecline(offerId);
    }

    /** Records {@code offerId} as the declined offer. */
    @Override
    public Protos.Status declineOffer(Protos.OfferID offerId) {
        return recordDecline(offerId);
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status reviveOffers() {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status reviveOffers(Collection<String> collection) {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status suppressOffers() {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status suppressOffers(Collection<String> collection) {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status acknowledgeStatusUpdate(Protos.TaskStatus status) {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status sendFrameworkMessage(Protos.ExecutorID executorId, Protos.SlaveID slaveId,
        byte[] data) {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status reconcileTasks(Collection<Protos.TaskStatus> statuses) {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status updateFramework(Protos.FrameworkInfo frameworkInfo, Collection<String> collection,
        OfferConstraints offerConstraints) {
        return null;
    }

    /** No-op, returns {@code null}. */
    @Override
    public Protos.Status updateFramework(Protos.FrameworkInfo frameworkInfo,
        Collection<String> collection) {
        return null;
    }
}
}