blob: 48c61b480e44122de0240f7b2527398226a1fdba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.runtime.job.resource;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.cc.scheduler.ResourceManager;
import org.junit.Assert;
import org.junit.Test;
public class JobCapacityControllerTest {
@Test
public void test() throws HyracksException {
IResourceManager resourceManager = makeResourceManagerWithCapacity(4294967296L, 33);
JobCapacityController capacityController = new JobCapacityController(resourceManager);
// Verifies the correctness of the allocate method.
Assert.assertTrue(capacityController.allocate(
makeJobWithRequiredCapacity(4294967296L, 16)) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
Assert.assertTrue(capacityController.allocate(
makeJobWithRequiredCapacity(2147483648L, 16)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
Assert.assertTrue(capacityController.allocate(
makeJobWithRequiredCapacity(2147483648L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
boolean exceedCapacity = false;
try {
capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64));
} catch (HyracksException e) {
exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
}
Assert.assertTrue(exceedCapacity);
Assert.assertTrue(capacityController.allocate(
makeJobWithRequiredCapacity(4294967296L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
exceedCapacity = false;
try {
capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33));
} catch (HyracksException e) {
exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
}
Assert.assertTrue(exceedCapacity);
// Verifies that the release method does not leak resource.
capacityController.release(makeJobWithRequiredCapacity(4294967296L, 16));
Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize() == 4294967296L);
Assert.assertTrue(resourceManager.getCurrentCapacity().getAggregatedCores() == 33);
}
private IResourceManager makeResourceManagerWithCapacity(long memorySize, int cores) throws HyracksException {
IResourceManager resourceManager = new ResourceManager();
resourceManager.update("node1", new NodeCapacity(memorySize, cores));
return resourceManager;
}
private JobSpecification makeJobWithRequiredCapacity(long memorySize, int cores) {
// Generates cluster capacity.
IClusterCapacity clusterCapacity = makeComputationCapacity(memorySize, cores);
// Generates a job.
JobSpecification job = mock(JobSpecification.class);
when(job.getRequiredClusterCapacity()).thenReturn(clusterCapacity);
return job;
}
private IClusterCapacity makeComputationCapacity(long memorySize, int cores) {
IClusterCapacity clusterCapacity = new ClusterCapacity();
clusterCapacity.setAggregatedMemoryByteSize(memorySize);
clusterCapacity.setAggregatedCores(cores);
return clusterCapacity;
}
}