blob: 34deb296dd6d4bf615647cb67c124c425dd04bc5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master.rm;
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol.*;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Tests for {@link TajoWorkerResourceManager}: worker registration through heartbeats and
 * memory-/disk-priority container allocation followed by release.
 *
 * <p>Every test starts a fresh resource manager and registers {@link #numWorkers} workers,
 * each advertising {@link #workerMemoryMB} MB of memory and {@link #workerDiskSlots} disk slots.
 */
public class TestTajoResourceManager {
  private static final PrimitiveProtos.BoolProto BOOL_TRUE =
      PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
  private static final PrimitiveProtos.BoolProto BOOL_FALSE =
      PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();

  /** How long a test waits for an allocation callback before failing. */
  private static final long ALLOCATION_TIMEOUT_SEC = 3;

  TajoConf tajoConf;
  long queryIdTime = System.currentTimeMillis();
  int numWorkers = 5;
  float workerDiskSlots = 5.0f;
  int workerMemoryMB = 512 * 10;
  /** Last response delivered to an allocation callback; written by {@link #allocateAndWait}. */
  WorkerResourceAllocationResponse response;

  /**
   * Creates and starts a resource manager, then registers {@link #numWorkers} workers with it.
   *
   * @param queryMasterMode value each worker advertises as its query-master mode
   * @return the started resource manager; the caller must stop it
   * @throws Exception if configuring, starting or registering workers fails; in the
   *     registration case the manager is stopped before the exception propagates
   */
  private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception {
    tajoConf = new TajoConf();
    tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
    tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
    tajoConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");

    TajoWorkerResourceManager tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
    tajoWorkerResourceManager.init(tajoConf);
    tajoWorkerResourceManager.start();

    boolean registered = false;
    try {
      for (int i = 0; i < numWorkers; i++) {
        NodeHeartbeat tajoHeartbeat = newHeartbeat(i, queryMasterMode);
        tajoWorkerResourceManager.getResourceTracker().heartbeat(null, tajoHeartbeat, NullCallback.get());
      }
      registered = true;
    } finally {
      // On failure the caller never receives the manager, so it could not stop it; do it here.
      if (!registered) {
        tajoWorkerResourceManager.stop();
      }
    }
    return tajoWorkerResourceManager;
  }

  /**
   * Builds the heartbeat that registers the worker with the given 0-based index.
   *
   * <p>The worker reports {@link #workerMemoryMB} MB for every memory/heap figure,
   * {@link #workerDiskSlots} disk slots, a single "/" disk with zero space, and no running tasks.
   *
   * @param workerIndex 0-based worker index; used to derive a distinct host name and HTTP port
   * @param queryMasterMode value advertised as the worker's query-master mode
   * @return the heartbeat message for that worker
   */
  private NodeHeartbeat newHeartbeat(int workerIndex, boolean queryMasterMode) {
    ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
        .setAvailableProcessors(1)
        .setFreeMemoryMB(workerMemoryMB)
        .setMaxMemoryMB(workerMemoryMB)
        .setTotalMemoryMB(workerMemoryMB)
        .build();

    ServerStatusProto.JvmHeap jvmHeap = ServerStatusProto.JvmHeap.newBuilder()
        .setFreeHeap(workerMemoryMB)
        .setMaxHeap(workerMemoryMB)
        .setTotalHeap(workerMemoryMB)
        .build();

    ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder()
        .setAbsolutePath("/")
        .setFreeSpace(0)
        .setTotalSpace(0)
        .setUsableSpace(0)
        .build();

    ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
        .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE)
        .setTaskRunnerMode(BOOL_TRUE)
        .setDiskSlots(workerDiskSlots)
        .setMemoryResourceMB(workerMemoryMB)
        .setJvmHeap(jvmHeap)
        .setSystem(system)
        .addDisk(disk)
        .setRunningTaskNum(0)
        .build();

    return NodeHeartbeat.newBuilder()
        .setTajoWorkerHost("host" + (workerIndex + 1))
        .setTajoQueryMasterPort(21000)
        .setTajoWorkerHttpPort(28080 + workerIndex)
        .setPeerRpcPort(12345)
        .setServerStatus(serverStatus)
        .build();
  }

  /**
   * Submits an allocation request and blocks until its callback fires.
   *
   * <p>The response is also stored in {@link #response}. The latch's countDown/await pair
   * makes the callback's write visible to this thread.
   *
   * @param resourceManager manager to allocate from
   * @param request allocation request to submit
   * @return the response passed to the callback
   * @throws Exception if submitting the request fails or the wait is interrupted
   */
  private WorkerResourceAllocationResponse allocateAndWait(TajoWorkerResourceManager resourceManager,
      WorkerResourceAllocationRequest request) throws Exception {
    final CountDownLatch barrier = new CountDownLatch(1);
    RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
      @Override
      public void run(WorkerResourceAllocationResponse allocated) {
        TestTajoResourceManager.this.response = allocated;
        barrier.countDown();
      }
    };

    resourceManager.allocateWorkerResources(request, callBack);
    assertTrue("allocation callback was not invoked within " + ALLOCATION_TIMEOUT_SEC + "s",
        barrier.await(ALLOCATION_TIMEOUT_SEC, TimeUnit.SECONDS));
    return response;
  }

  /** Asserts that every registered worker is back to full capacity with nothing in use. */
  private void assertAllResourcesReleased(TajoWorkerResourceManager resourceManager) {
    for (Worker worker : resourceManager.getWorkers().values()) {
      WorkerResource resource = worker.getResource();
      assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
      assertEquals(0, resource.getUsedMemoryMB());
      assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
      assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
    }
  }

  /** Stops the manager if it was successfully created; tolerates {@code null}. */
  private static void stopIfCreated(TajoWorkerResourceManager resourceManager) {
    if (resourceManager != null) {
      resourceManager.stop();
    }
  }

  @Test
  public void testHeartbeat() throws Exception {
    TajoWorkerResourceManager tajoWorkerResourceManager = null;
    try {
      tajoWorkerResourceManager = initResourceManager(false);

      assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
      for (Worker worker : tajoWorkerResourceManager.getWorkers().values()) {
        WorkerResource resource = worker.getResource();
        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
      }
    } finally {
      stopIfCreated(tajoWorkerResourceManager);
    }
  }

  @Test
  public void testMemoryResource() throws Exception {
    TajoWorkerResourceManager tajoWorkerResourceManager = null;
    try {
      tajoWorkerResourceManager = initResourceManager(false);

      final int minMemory = 256;
      final int maxMemory = 512;
      float diskSlots = 1.0f;

      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1);
      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
          .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
          .setNumContainers(60)
          .setQueryId(queryId.getProto())
          .setMaxDiskSlotPerContainer(diskSlots)
          .setMinDiskSlotPerContainer(diskSlots)
          .setMinMemoryMBPerContainer(minMemory)
          .setMaxMemoryMBPerContainer(maxMemory)
          .build();

      WorkerResourceAllocationResponse allocation = allocateAndWait(tajoWorkerResourceManager, request);

      // The request (60 containers) exceeds the cluster, so every worker is expected to be exhausted.
      int totalUsedMemory = 0;
      float totalUsedDisks = 0; // float: slot usage is fractional-capable and must not be truncated
      for (Worker worker : tajoWorkerResourceManager.getWorkers().values()) {
        WorkerResource resource = worker.getResource();
        assertEquals(0, resource.getAvailableMemoryMB());
        assertEquals(0, resource.getAvailableDiskSlots(), 0);
        assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
        totalUsedMemory += resource.getUsedMemoryMB();
        totalUsedDisks += resource.getUsedDiskSlots();
      }
      assertEquals(workerMemoryMB * numWorkers, totalUsedMemory);
      assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);

      // Expected: 10 containers per worker (5120 MB worker / 512 MB max per container).
      assertEquals(numWorkers * 10, allocation.getWorkerAllocatedResourceList().size());

      List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
      for (WorkerAllocatedResource eachResource : allocation.getWorkerAllocatedResourceList()) {
        assertTrue(
            eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory);
        containerIds.add(eachResource.getContainerId());
      }

      for (YarnProtos.ContainerIdProto eachContainerId : containerIds) {
        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
      }
      assertAllResourcesReleased(tajoWorkerResourceManager);
    } finally {
      stopIfCreated(tajoWorkerResourceManager);
    }
  }

  /**
   * Allocates with a memory range (200-500 MB) that does not evenly divide worker memory and
   * expects the 60 requested containers to be satisfied over exactly two request rounds, with
   * all resources released after each round.
   */
  @Test
  public void testMemoryNotCommensurable() throws Exception {
    TajoWorkerResourceManager tajoWorkerResourceManager = null;
    try {
      tajoWorkerResourceManager = initResourceManager(false);

      final int minMemory = 200;
      final int maxMemory = 500;
      float diskSlots = 1.0f;

      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2);

      int requiredContainers = 60;
      int numAllocatedContainers = 0;

      for (int round = 0; round < 2; round++) {
        // Each round asks only for the containers still missing.
        WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
            .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
            .setNumContainers(requiredContainers - numAllocatedContainers)
            .setQueryId(queryId.getProto())
            .setMaxDiskSlotPerContainer(diskSlots)
            .setMinDiskSlotPerContainer(diskSlots)
            .setMinMemoryMBPerContainer(minMemory)
            .setMaxMemoryMBPerContainer(maxMemory)
            .build();

        WorkerResourceAllocationResponse allocation = allocateAndWait(tajoWorkerResourceManager, request);
        numAllocatedContainers += allocation.getWorkerAllocatedResourceList().size();

        // Release everything granted in this round before issuing the next request.
        for (WorkerAllocatedResource eachResource : allocation.getWorkerAllocatedResourceList()) {
          assertTrue(
              eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory);
          tajoWorkerResourceManager.releaseWorkerResource(eachResource.getContainerId());
        }
        assertAllResourcesReleased(tajoWorkerResourceManager);
      }

      assertEquals(requiredContainers, numAllocatedContainers);
    } finally {
      stopIfCreated(tajoWorkerResourceManager);
    }
  }

  @Test
  public void testDiskResource() throws Exception {
    TajoWorkerResourceManager tajoWorkerResourceManager = null;
    try {
      tajoWorkerResourceManager = initResourceManager(false);

      final float minDiskSlots = 1.0f;
      final float maxDiskSlots = 2.0f;
      int memoryMB = 256;

      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
          .setResourceRequestPriority(ResourceRequestPriority.DISK)
          .setNumContainers(60)
          .setQueryId(queryId.getProto())
          .setMaxDiskSlotPerContainer(maxDiskSlots)
          .setMinDiskSlotPerContainer(minDiskSlots)
          .setMinMemoryMBPerContainer(memoryMB)
          .setMaxMemoryMBPerContainer(memoryMB)
          .build();

      WorkerResourceAllocationResponse allocation = allocateAndWait(tajoWorkerResourceManager, request);

      List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
      for (WorkerAllocatedResource eachResource : allocation.getWorkerAllocatedResourceList()) {
        assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(),
            eachResource.getAllocatedDiskSlots() >= minDiskSlots &&
            eachResource.getAllocatedDiskSlots() <= maxDiskSlots);
        containerIds.add(eachResource.getContainerId());
      }

      // Expected per worker: 3 containers (2 + 2 + 1 disk slots = 5 slots, 3 x 256 MB).
      float totalUsedDisks = 0; // float: avoid silent truncation of fractional slot usage
      for (Worker worker : tajoWorkerResourceManager.getWorkers().values()) {
        WorkerResource resource = worker.getResource();
        assertEquals(0, resource.getAvailableDiskSlots(), 0);
        assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
        assertEquals(256 * 3, resource.getUsedMemoryMB());
        totalUsedDisks += resource.getUsedDiskSlots();
      }
      assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
      assertEquals(numWorkers * 3, allocation.getWorkerAllocatedResourceList().size());

      for (YarnProtos.ContainerIdProto eachContainerId : containerIds) {
        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
      }
      assertAllResourcesReleased(tajoWorkerResourceManager);
    } finally {
      stopIfCreated(tajoWorkerResourceManager);
    }
  }
}