blob: 4716bab830c09c61b2f4ff22e58b44fc6bba507f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
.AMLivelinessMonitor;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
public class TestDistributedSchedulingService {
// Test if the DistributedSchedulingService can handle both DSProtocol as
// well as AMProtocol clients
@Test
public void testRPCWrapping() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
.getName());
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
final RMContext rmContext = new RMContextImpl() {
@Override
public AMLivelinessMonitor getAMLivelinessMonitor() {
return null;
}
@Override
public Configuration getYarnConfiguration() {
return new YarnConfiguration();
}
};
Container c = factory.newRecordInstance(Container.class);
c.setExecutionType(ExecutionType.OPPORTUNISTIC);
c.setId(
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(12345, 1), 2), 3));
AllocateRequest allReq =
(AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
allReq.setAskList(Arrays.asList(
ResourceRequest.newInstance(Priority.UNDEFINED, "a",
Resource.newInstance(1, 2), 1, true, "exp",
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))));
DistributedSchedulingService service = createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
// Verify that the DistrubutedSchedulingService can handle vanilla
// ApplicationMasterProtocol clients
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
ProtobufRpcEngine.class);
ApplicationMasterProtocolPB ampProxy =
RPC.getProxy(ApplicationMasterProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
RegisterApplicationMasterResponse regResp =
new RegisterApplicationMasterResponsePBImpl(
ampProxy.registerApplicationMaster(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(
RegisterApplicationMasterRequest.class)).getProto()));
Assert.assertEquals("dummyQueue", regResp.getQueue());
FinishApplicationMasterResponse finishResp =
new FinishApplicationMasterResponsePBImpl(
ampProxy.finishApplicationMaster(null,
((FinishApplicationMasterRequestPBImpl)factory
.newRecordInstance(
FinishApplicationMasterRequest.class)).getProto()
));
Assert.assertEquals(false, finishResp.getIsUnregistered());
AllocateResponse allocResp =
new AllocateResponsePBImpl(
ampProxy.allocate(null,
((AllocateRequestPBImpl)factory
.newRecordInstance(AllocateRequest.class)).getProto())
);
List<Container> allocatedContainers = allocResp.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(12345, allocResp.getNumClusterNodes());
// Verify that the DistrubutedSchedulingService can handle the
// DistributedSchedulerProtocol clients as well
RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
ProtobufRpcEngine.class);
DistributedSchedulerProtocolPB dsProxy =
RPC.getProxy(DistributedSchedulerProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
DistSchedRegisterResponse dsRegResp =
new DistSchedRegisterResponsePBImpl(
dsProxy.registerApplicationMasterForDistributedScheduling(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(RegisterApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
Assert.assertEquals(4,
dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
Assert.assertEquals(1024,
dsRegResp.getMinAllocatableCapabilty().getMemorySize());
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
DistSchedAllocateRequestPBImpl distAllReq =
(DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
DistSchedAllocateRequest.class);
distAllReq.setAllocateRequest(allReq);
distAllReq.setAllocatedContainers(Arrays.asList(c));
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
FinishApplicationMasterResponse dsfinishResp =
new FinishApplicationMasterResponsePBImpl(
dsProxy.finishApplicationMaster(null,
((FinishApplicationMasterRequestPBImpl) factory
.newRecordInstance(FinishApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(
false, dsfinishResp.getIsUnregistered());
}
private DistributedSchedulingService createService(final RecordFactory
factory, final RMContext rmContext, final Container c) {
return new DistributedSchedulingService(rmContext, null) {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws
YarnException, IOException {
RegisterApplicationMasterResponse resp = factory.newRecordInstance(
RegisterApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setQueue("dummyQueue");
return resp;
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
FinishApplicationMasterResponse resp = factory.newRecordInstance(
FinishApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setIsUnregistered(false);
return resp;
}
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
AllocateResponse response = factory.newRecordInstance(
AllocateResponse.class);
response.setNumClusterNodes(12345);
response.setAllocatedContainers(Arrays.asList(c));
return response;
}
@Override
public DistSchedRegisterResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws
YarnException, IOException {
DistSchedRegisterResponse resp = factory.newRecordInstance(
DistSchedRegisterResponse.class);
resp.setContainerIdStart(54321L);
resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
return resp;
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
DistSchedAllocateRequest request) throws YarnException, IOException {
List<ResourceRequest> askList =
request.getAllocateRequest().getAskList();
List<Container> allocatedContainers = request.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());
DistSchedAllocateResponse resp =
factory.newRecordInstance(DistSchedAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;
}
};
}
}