blob: 5c059aa4550a4252de04a4936e876cc67113343e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received.
*/
public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
static final Logger LOG = LoggerFactory.getLogger(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
private final String host;
private int port; // Ephemeral port is chosen later
private final int progNumber;
private final int lowProgVersion;
private final int highProgVersion;
protected final boolean allowInsecurePorts;
/**
* If not null, this will be used as the socket to use to connect to the
* system portmap daemon when registering this RPC server program.
*/
private final DatagramSocket registrationSocket;
/*
* Timeout value in millisecond for the rpc connection to portmap
*/
private final int portmapUdpTimeoutMillis;
protected RpcProgram(String program, String host, int port, int progNumber,
int lowProgVersion, int highProgVersion,
DatagramSocket registrationSocket, boolean allowInsecurePorts) {
this(program, host, port, progNumber, lowProgVersion, highProgVersion,
registrationSocket, allowInsecurePorts, 500);
}
/**
* Constructor
*
* @param program program name
* @param host host where the Rpc server program is started
* @param port port where the Rpc server program is listening to
* @param progNumber program number as defined in RFC 1050
* @param lowProgVersion lowest version of the specification supported
* @param highProgVersion highest version of the specification supported
* @param registrationSocket if not null, use this socket to register
* with portmap daemon
* @param allowInsecurePorts true to allow client connections from
* unprivileged ports, false otherwise
* @param portmapUdpTimeoutMillis timeout in milliseconds for RPC connection
*/
protected RpcProgram(String program, String host, int port, int progNumber,
int lowProgVersion, int highProgVersion,
DatagramSocket registrationSocket, boolean allowInsecurePorts,
int portmapUdpTimeoutMillis) {
this.program = program;
this.host = host;
this.port = port;
this.progNumber = progNumber;
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
this.registrationSocket = registrationSocket;
this.allowInsecurePorts = allowInsecurePorts;
this.portmapUdpTimeoutMillis = portmapUdpTimeoutMillis;
LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client "
+ "connections from unprivileged ports");
}
/**
* Register this program with the local portmapper.
* @param transport transport layer for port map
* @param boundPort port number of bounded RPC program
*/
public void register(int transport, int boundPort) {
if (boundPort != port) {
LOG.info("The bound port is " + boundPort
+ ", different with configured port " + port);
port = boundPort;
}
// Register all the program versions with portmapper for a given transport
for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
port);
register(mapEntry, true);
}
}
/**
* Unregister this program with the local portmapper.
* @param transport transport layer for port map
* @param boundPort port number of bounded RPC program
*/
public void unregister(int transport, int boundPort) {
if (boundPort != port) {
LOG.info("The bound port is " + boundPort
+ ", different with configured port " + port);
port = boundPort;
}
// Unregister all the program versions with portmapper for a given transport
for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
port);
register(mapEntry, false);
}
}
/**
* Register the program with Portmap or Rpcbind.
* @param mapEntry port map entries
* @param set specifies registration or not
*/
protected void register(PortmapMapping mapEntry, boolean set) {
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
mappingRequest, true, registrationSocket, portmapUdpTimeoutMillis);
try {
registrationClient.run();
} catch (IOException e) {
String request = set ? "Registration" : "Unregistration";
LOG.error(request + " failure with " + host + ":" + port
+ ", portmap entry: " + mapEntry);
throw new RuntimeException(request + " failure", e);
}
}
// Start extra daemons or services
public void startDaemons() {}
public void stopDaemons() {}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
RpcInfo info = (RpcInfo) e.getMessage();
RpcCall call = (RpcCall) info.header();
SocketAddress remoteAddress = info.remoteAddress();
if (LOG.isTraceEnabled()) {
LOG.trace(program + " procedure #" + call.getProcedure());
}
if (this.progNumber != call.getProgram()) {
LOG.warn("Invalid RPC call program " + call.getProgram());
sendAcceptedReply(call, remoteAddress, AcceptState.PROG_UNAVAIL, ctx);
return;
}
int ver = call.getVersion();
if (ver < lowProgVersion || ver > highProgVersion) {
LOG.warn("Invalid RPC call version " + ver);
sendAcceptedReply(call, remoteAddress, AcceptState.PROG_MISMATCH, ctx);
return;
}
handleInternal(ctx, info);
}
public boolean doPortMonitoring(SocketAddress remoteAddress) {
if (!allowInsecurePorts) {
if (LOG.isTraceEnabled()) {
LOG.trace("Will not allow connections from unprivileged ports. "
+ "Checking for valid client port...");
}
if (remoteAddress instanceof InetSocketAddress) {
InetSocketAddress inetRemoteAddress = (InetSocketAddress) remoteAddress;
if (inetRemoteAddress.getPort() > 1023) {
LOG.warn("Connection attempted from '" + inetRemoteAddress + "' "
+ "which is an unprivileged port. Rejecting connection.");
return false;
}
} else {
LOG.warn("Could not determine remote port of socket address '"
+ remoteAddress + "'. Rejecting connection.");
return false;
}
}
return true;
}
private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
AcceptState acceptState, ChannelHandlerContext ctx) {
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
acceptState, Verifier.VERIFIER_NONE);
XDR out = new XDR();
reply.write(out);
if (acceptState == AcceptState.PROG_MISMATCH) {
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
}
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
}
protected static void sendRejectedReply(RpcCall call,
SocketAddress remoteAddress, ChannelHandlerContext ctx) {
XDR out = new XDR();
RpcDeniedReply reply = new RpcDeniedReply(call.getXid(),
RpcReply.ReplyState.MSG_DENIED,
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
reply.write(out);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
}
protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
@Override
public String toString() {
return "Rpc program: " + program + " at " + host + ":" + port;
}
protected abstract boolean isIdempotent(RpcCall call);
public int getPort() {
return port;
}
@VisibleForTesting
public int getPortmapUdpTimeoutMillis() {
return portmapUdpTimeoutMillis;
}
}