// blob: 9381b76658f0796ce64243dd8dbdf51895bc3518 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.rpc;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcEngine;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.slider.api.SliderClusterProtocol;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.tools.Duration;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.ErrorStrings;
import org.apache.slider.core.exceptions.ServiceNotReadyException;
import org.apache.slider.core.exceptions.SliderException;
import static org.apache.slider.common.SliderXmlConfKeys.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
/**
 * Static helpers for setting up the Slider protobuf RPC channel: creating
 * the server endpoint, registering the protobuf engine in a configuration,
 * and building client-side proxies from an address or a YARN application
 * report.
 */
public class RpcBinder {
  protected static final Logger log =
      LoggerFactory.getLogger(RpcBinder.class);

  /**
   * Create a protobuf server bound to the specific socket address.
   * @param addr address to listen to; 0.0.0.0 as hostname acceptable
   * @param conf config; patched to use the protobuf engine for the Slider API
   * @param secretManager token secret handler
   * @param numHandlers threads to service requests
   * @param blockingService service to handle
   * @param portRangeConfig range of ports
   * @return the IPC server itself
   * @throws IOException if the server could not be built
   */
  public static Server createProtobufServer(InetSocketAddress addr,
      Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers,
      BlockingService blockingService,
      String portRangeConfig) throws IOException {
    Class<SliderClusterProtocolPB> sliderClusterAPIClass =
        registerSliderAPI(conf);
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(sliderClusterAPIClass)
        .setInstance(blockingService)
        .setBindAddress(addr.getHostName())
        .setPort(addr.getPort())
        .setNumHandlers(numHandlers)
        .setVerbose(false)
        .setSecretManager(secretManager)
        .setPortRangeConfig(portRangeConfig)
        .build();
    // parameterized logging: the message is only assembled if debug is on
    log.debug("Adding protocol {} to the server",
        sliderClusterAPIClass.getCanonicalName());
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, sliderClusterAPIClass,
        blockingService);
    return server;
  }

  /**
   * Add the protobuf engine to the configuration. Harmless and inexpensive
   * if repeated.
   * @param conf configuration to patch
   * @return the protocol class
   */
  public static Class<SliderClusterProtocolPB> registerSliderAPI(
      Configuration conf) {
    Class<SliderClusterProtocolPB> sliderClusterAPIClass =
        SliderClusterProtocolPB.class;
    RPC.setProtocolEngine(conf, sliderClusterAPIClass, ProtobufRpcEngine.class);
    //quick sanity check here; only evaluated when assertions are enabled
    assert verifyBondedToProtobuf(conf, sliderClusterAPIClass);
    return sliderClusterAPIClass;
  }

  /**
   * Verify that the conf is set up for protobuf transport of Slider RPC.
   * Looks up the <code>rpc.engine.&lt;API class name&gt;</code> entry of the
   * configuration and compares it with {@link ProtobufRpcEngine}.
   * @param conf configuration
   * @param sliderClusterAPIClass class for the API
   * @return true if the RPC engine is protocol buffers
   */
  public static boolean verifyBondedToProtobuf(Configuration conf,
      Class<SliderClusterProtocolPB> sliderClusterAPIClass) {
    // constant on the left: the comparison cannot NPE whatever the lookup returns
    return ProtobufRpcEngine.class.equals(
        conf.getClass("rpc.engine." + sliderClusterAPIClass.getName(),
            RpcEngine.class));
  }

  /**
   * Connect to a server. Sets up the IPC client retry policy from the
   * Slider IPC retry configuration keys before creating the proxy.
   * @param addr address of the Slider AM RPC endpoint
   * @param currentUser user to make the connection as
   * @param conf configuration; patched to use the protobuf engine for the
   * Slider API
   * @param rpcTimeout timeout passed down to the RPC layer
   * @return a {@link SliderClusterProtocol} wrapping the protobuf proxy
   * @throws IOException on any failure to create the proxy
   */
  public static SliderClusterProtocol connectToServer(InetSocketAddress addr,
      UserGroupInformation currentUser,
      Configuration conf,
      int rpcTimeout) throws IOException {
    Class<SliderClusterProtocolPB> sliderClusterAPIClass =
        registerSliderAPI(conf);

    final RetryPolicy retryPolicy =
        RetryUtils.getDefaultRetryPolicy(
            conf,
            KEY_IPC_CLIENT_RETRY_POLICY_ENABLED,
            IPC_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
            KEY_IPC_CLIENT_RETRY_POLICY_SPEC,
            IPC_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
            ServiceNotReadyException.class);
    log.debug("Connecting to Slider AM at {}", addr);
    ProtocolProxy<SliderClusterProtocolPB> protoProxy =
        RPC.getProtocolProxy(sliderClusterAPIClass,
            1, // client protocol version handed to the RPC layer
            addr,
            currentUser,
            conf,
            NetUtils.getDefaultSocketFactory(conf),
            rpcTimeout,
            retryPolicy);
    SliderClusterProtocolPB endpoint = protoProxy.getProxy();
    return new SliderClusterProtocolProxy(endpoint, addr);
  }

  /**
   * This loops for a limited period trying to get the Proxy -
   * by doing so it handles AM failover.
   * <p>
   * While the application is reported as RUNNING, a connection is attempted;
   * on failure the method sleeps for a second, re-reads the application
   * report from the resource manager and tries again, until either the
   * connection succeeds, the timeout is exceeded, or the application is no
   * longer running.
   * @param conf configuration to patch and use
   * @param rmClient client of the resource manager
   * @param application application to work with; must not be null
   * @param connectTimeout timeout for the whole proxy operation to timeout
   * (milliseconds). Use 0 to indicate "do not attempt to wait" -fail fast.
   * @param rpcTimeout timeout for RPCs to block during communications
   * @return the proxy
   * @throws IOException IO problems
   * @throws YarnException Slider-generated exceptions related to the binding
   * failing. This can include the application finishing or timeouts
   * @throws InterruptedException if a sleep operation waiting for
   * the cluster to respond is interrupted.
   * @throws IllegalArgumentException if <code>application</code> is null
   */
  public static SliderClusterProtocol getProxy(final Configuration conf,
      final ApplicationClientProtocol rmClient,
      ApplicationReport application,
      final int connectTimeout,
      final int rpcTimeout)
      throws IOException, YarnException, InterruptedException {
    // fail with a meaningful message instead of a bare NPE on the next line
    Preconditions.checkArgument(application != null,
        "Null application report");
    final ApplicationId appId = application.getApplicationId();
    Duration timeout = new Duration(connectTimeout);
    timeout.start();
    Exception exception = null;
    YarnApplicationState state = application.getYarnApplicationState();
    // constant on the left: a report without a state simply ends the loop
    while (YarnApplicationState.RUNNING.equals(state)) {
      try {
        return getProxy(conf, application, rpcTimeout);
      } catch (IOException e) {
        if (connectTimeout <= 0 || timeout.getLimitExceeded()) {
          throw e;
        }
        exception = e;
      } catch (YarnException e) {
        if (connectTimeout <= 0 || timeout.getLimitExceeded()) {
          throw e;
        }
        exception = e;
      }
      //at this point: app failed to work
      log.debug("Could not connect to {}. Waiting for getting the latest AM address...",
          appId);
      Thread.sleep(1000);
      //refresh the app report: the AM may have moved or the app may have ended
      application =
          rmClient.getApplicationReport(
              GetApplicationReportRequest.newInstance(appId)).getApplicationReport();
      // track the state of the *latest* report so that the exception below
      // does not claim a stale RUNNING state when the report has vanished
      state = application != null
          ? application.getYarnApplicationState()
          : null;
    }
    //get here if the app is no longer running. Raise a specific
    //exception but init it with the previous failure
    throw new BadClusterStateException(
        exception,
        ErrorStrings.E_FINISHED_APPLICATION, appId, state);
  }

  /**
   * Get a proxy from the application report, using the host, RPC port and
   * client-to-AM token it contains.
   * @param conf config to use
   * @param application app report
   * @param rpcTimeout timeout in RPC operations
   * @return the proxy
   * @throws IOException IO problems
   * @throws SliderException if the report does not provide a valid address
   * @throws InterruptedException if the connection attempt is interrupted
   */
  public static SliderClusterProtocol getProxy(final Configuration conf,
      final ApplicationReport application,
      final int rpcTimeout)
      throws IOException, SliderException, InterruptedException {
    String host = application.getHost();
    int port = application.getRpcPort();
    org.apache.hadoop.yarn.api.records.Token clientToAMToken =
        application.getClientToAMToken();
    return createProxy(conf, host, port, clientToAMToken, rpcTimeout);
  }

  /**
   * Create a proxy to the Slider AM at the given host and port.
   * When security is enabled the client-to-AM token is attached to a
   * new remote-user UGI and the connection is made inside a
   * <code>doAs()</code> call of that user; otherwise the connection is made
   * directly.
   * @param conf config to use
   * @param host hostname
   * @param port port
   * @param clientToAMToken auth token: only used in a secure cluster.
   * converted via {@link ConverterUtils#convertFromYarn(org.apache.hadoop.yarn.api.records.Token, InetSocketAddress)}
   * @param rpcTimeout timeout in RPC operations
   * @return the proxy
   * @throws SliderException if the host is unset or the port is 0
   * @throws IOException IO problems
   * @throws InterruptedException if the privileged connect action is
   * interrupted
   * @throws IllegalArgumentException if security is enabled and the token
   * is null
   */
  public static SliderClusterProtocol createProxy(final Configuration conf,
      String host,
      int port,
      org.apache.hadoop.yarn.api.records.Token clientToAMToken,
      final int rpcTimeout) throws
      SliderException,
      IOException,
      InterruptedException {
    String address = host + ":" + port;
    if (SliderUtils.isUnset(host) || 0 == port) {
      throw new SliderException(SliderExitCodes.EXIT_CONNECTIVITY_PROBLEM,
          "Slider instance "
          + " isn't providing a valid address for the" +
          " Slider RPC protocol: " + address);
    }

    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    final UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
        currentUser.getUserName());
    final InetSocketAddress serviceAddr =
        NetUtils.createSocketAddrForHost(host, port);
    SliderClusterProtocol realProxy;

    log.debug("Connecting to {}", serviceAddr);
    if (UserGroupInformation.isSecurityEnabled()) {
      Preconditions.checkArgument(clientToAMToken != null,
          "Null clientToAMToken");
      Token<ClientToAMTokenIdentifier> token =
          ConverterUtils.convertFromYarn(clientToAMToken, serviceAddr);
      newUgi.addToken(token);
      // connect as the token-bearing user
      realProxy =
          newUgi.doAs(new PrivilegedExceptionAction<SliderClusterProtocol>() {
            @Override
            public SliderClusterProtocol run() throws IOException {
              return connectToServer(serviceAddr, newUgi, conf, rpcTimeout);
            }
          });
    } else {
      realProxy = connectToServer(serviceAddr, newUgi, conf, rpcTimeout);
    }
    return realProxy;
  }
}