blob: 0213a94bd6374f9ba297198f4a60d9976cb9c9c9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
/**
* Test cases for {@link DistributedSchedulingAMService}.
*/
public class TestDistributedSchedulingAMService {
// Test if the DistributedSchedulingAMService can handle both DSProtocol as
// well as AMProtocol clients
@Test
public void testRPCWrapping() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
.getName());
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
final RMContext rmContext = new RMContextImpl() {
@Override
public AMLivelinessMonitor getAMLivelinessMonitor() {
return null;
}
@Override
public Configuration getYarnConfiguration() {
return new YarnConfiguration();
}
};
Container c = factory.newRecordInstance(Container.class);
c.setExecutionType(ExecutionType.OPPORTUNISTIC);
c.setId(
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(12345, 1), 2), 3));
AllocateRequest allReq =
(AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
allReq.setAskList(Arrays.asList(
ResourceRequest.newInstance(Priority.UNDEFINED, "a",
Resource.newInstance(1, 2), 1, true, "exp",
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))));
DistributedSchedulingAMService service =
createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
// Verify that the DistrubutedSchedulingService can handle vanilla
// ApplicationMasterProtocol clients
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
ProtobufRpcEngine.class);
ApplicationMasterProtocolPB ampProxy =
RPC.getProxy(ApplicationMasterProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
RegisterApplicationMasterResponse regResp =
new RegisterApplicationMasterResponsePBImpl(
ampProxy.registerApplicationMaster(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(
RegisterApplicationMasterRequest.class)).getProto()));
Assert.assertEquals("dummyQueue", regResp.getQueue());
FinishApplicationMasterResponse finishResp =
new FinishApplicationMasterResponsePBImpl(
ampProxy.finishApplicationMaster(null,
((FinishApplicationMasterRequestPBImpl)factory
.newRecordInstance(
FinishApplicationMasterRequest.class)).getProto()
));
Assert.assertEquals(false, finishResp.getIsUnregistered());
AllocateResponse allocResp =
new AllocateResponsePBImpl(
ampProxy.allocate(null,
((AllocateRequestPBImpl)factory
.newRecordInstance(AllocateRequest.class)).getProto())
);
List<Container> allocatedContainers = allocResp.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(12345, allocResp.getNumClusterNodes());
// Verify that the DistrubutedSchedulingService can handle the
// DistributedSchedulingAMProtocol clients as well
RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
ProtobufRpcEngine.class);
DistributedSchedulingAMProtocolPB dsProxy =
RPC.getProxy(DistributedSchedulingAMProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
RegisterDistributedSchedulingAMResponse dsRegResp =
new RegisterDistributedSchedulingAMResponsePBImpl(
dsProxy.registerApplicationMasterForDistributedScheduling(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(RegisterApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
Assert.assertEquals(4,
dsRegResp.getMaxContainerResource().getVirtualCores());
Assert.assertEquals(1024,
dsRegResp.getMinContainerResource().getMemorySize());
Assert.assertEquals(2,
dsRegResp.getIncrContainerResource().getVirtualCores());
DistributedSchedulingAllocateRequestPBImpl distAllReq =
(DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
DistributedSchedulingAllocateRequest.class);
distAllReq.setAllocateRequest(allReq);
distAllReq.setAllocatedContainers(Arrays.asList(c));
DistributedSchedulingAllocateResponse dsAllocResp =
new DistributedSchedulingAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
FinishApplicationMasterResponse dsfinishResp =
new FinishApplicationMasterResponsePBImpl(
dsProxy.finishApplicationMaster(null,
((FinishApplicationMasterRequestPBImpl) factory
.newRecordInstance(FinishApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(
false, dsfinishResp.getIsUnregistered());
}
private DistributedSchedulingAMService createService(final RecordFactory
factory, final RMContext rmContext, final Container c) {
return new DistributedSchedulingAMService(rmContext, null) {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws
YarnException, IOException {
RegisterApplicationMasterResponse resp = factory.newRecordInstance(
RegisterApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setQueue("dummyQueue");
return resp;
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
FinishApplicationMasterResponse resp = factory.newRecordInstance(
FinishApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setIsUnregistered(false);
return resp;
}
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
AllocateResponse response = factory.newRecordInstance(
AllocateResponse.class);
response.setNumClusterNodes(12345);
response.setAllocatedContainers(Arrays.asList(c));
return response;
}
@Override
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
RegisterDistributedSchedulingAMResponse resp = factory
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
resp.setContainerIdStart(54321L);
resp.setMaxContainerResource(Resource.newInstance(4096, 4));
resp.setMinContainerResource(Resource.newInstance(1024, 1));
resp.setIncrContainerResource(Resource.newInstance(2048, 2));
return resp;
}
@Override
public DistributedSchedulingAllocateResponse
allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
List<ResourceRequest> askList =
request.getAllocateRequest().getAskList();
List<Container> allocatedContainers = request.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());
DistributedSchedulingAllocateResponse resp = factory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;
}
};
}
}