blob: c75c722bec5028592e9dd8bf79ce1fde85ba3c81 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationUtilsClient;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
/**
 * Tests {@link DelegationTokenFetcher} and {@link DelegationUtilsClient}
 * against an embedded Netty HTTP server that answers the
 * {@code /getDelegationToken}, {@code /renewDelegationToken},
 * {@code /cancelDelegationToken} and {@code /exception} URLs.
 *
 * Assertions that fail on the server's I/O threads are stored in
 * {@link #assertionError} and re-thrown on the JUnit thread at the end of
 * each test that starts the server.
 */
public class TestDelegationTokenRemoteFetcher {
  private static final Logger LOG = Logger
      .getLogger(TestDelegationTokenRemoteFetcher.class);

  /** Expiration time, as text, that the renew handler sends back. */
  private static final String EXP_DATE = "124123512361236";
  /** Name of the file the fetcher writes the token to. */
  private static final String tokenFile = "http.file.dta";
  private static final URLConnectionFactory connectionFactory =
      URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
  /** Explicit charset so String/byte conversions ignore the platform default. */
  private static final Charset UTF8 = Charset.forName("UTF-8");

  private int httpPort;
  private URI serviceUrl;
  private FileSystem fileSys;
  private Configuration conf;
  private ServerBootstrap bootstrap;
  /** Listening channel of the embedded server; null until it is started. */
  private Channel serverChannel;
  private Token<DelegationTokenIdentifier> testToken;
  /** Set by the server's handler thread, read by the test thread. */
  private volatile AssertionError assertionError;

  /**
   * Picks a free port, derives the service URL from it and creates the token
   * that the tests send and expect. The HTTP server is not started here.
   */
  @Before
  public void init() throws Exception {
    conf = new Configuration();
    fileSys = FileSystem.getLocal(conf);
    httpPort = NetUtils.getFreeSocketPort();
    serviceUrl = new URI("http://localhost:" + httpPort);
    testToken = createToken(serviceUrl);
  }

  /**
   * Deletes the token file and shuts the embedded server down. The server is
   * shut down even if deleting the file fails.
   */
  @After
  public void clean() throws IOException {
    try {
      if (fileSys != null) {
        fileSys.delete(new Path(tokenFile), true);
      }
    } finally {
      // Close the listening channel before releasing the factory's threads.
      if (serverChannel != null) {
        serverChannel.close().awaitUninterruptibly();
        serverChannel = null;
      }
      if (bootstrap != null) {
        bootstrap.releaseExternalResources();
      }
    }
  }

  /**
   * try to fetch token without http server with IOException
   */
  @Test
  public void testTokenFetchFail() throws Exception {
    try {
      DelegationTokenFetcher.main(new String[] { "-webservice=" + serviceUrl,
          tokenFile });
      fail("Token fetcher shouldn't start in absense of NN");
    } catch (IOException expected) {
      // expected: no server was started for this test
    }
  }

  /**
   * try to renew token without http server with IOException
   */
  @Test
  public void testTokenRenewFail() throws AuthenticationException {
    try {
      DelegationUtilsClient.renewDelegationToken(connectionFactory,
          serviceUrl, testToken);
      fail("Token fetcher shouldn't be able to renew tokens in absense of NN");
    } catch (IOException expected) {
      // expected: no server was started for this test
    }
  }

  /**
   * try cancel token without http server with IOException
   */
  @Test
  public void expectedTokenCancelFail() throws AuthenticationException {
    try {
      DelegationUtilsClient.cancelDelegationToken(connectionFactory,
          serviceUrl, testToken);
      fail("Token fetcher shouldn't be able to cancel tokens in absense of NN");
    } catch (IOException expected) {
      // expected: no server was started for this test
    }
  }

  /**
   * try to renew token and get http response with error
   */
  @Test
  public void expectedTokenRenewErrorHttpResponse()
      throws AuthenticationException, URISyntaxException {
    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
    try {
      DelegationUtilsClient.renewDelegationToken(connectionFactory, new URI(
          serviceUrl.toString() + "/exception"), createToken(serviceUrl));
      fail("Token fetcher shouldn't be able to renew tokens using an invalid"
          + " NN URL");
    } catch (IOException expected) {
      // expected: the "/exception" route answers with an error status
    }
    rethrowServerAssertion();
  }

  /**
   * Call cancel token using http server; the call must complete without
   * throwing.
   */
  @Test
  public void testCancelTokenFromHttp() throws IOException,
      AuthenticationException {
    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
    DelegationUtilsClient.cancelDelegationToken(connectionFactory, serviceUrl,
        testToken);
    rethrowServerAssertion();
  }

  /**
   * Call renew token using http server return new expiration time
   */
  @Test
  public void testRenewTokenFromHttp() throws IOException,
      NumberFormatException, AuthenticationException {
    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
    long renewed = DelegationUtilsClient.renewDelegationToken(
        connectionFactory, serviceUrl, testToken);
    // assertEquals reports both values on failure, unlike assertTrue(a == b)
    Assert.assertEquals("testRenewTokenFromHttp error",
        Long.parseLong(EXP_DATE), renewed);
    rethrowServerAssertion();
  }

  /**
   * Call fetch token using http server
   */
  @Test
  public void expectedTokenIsRetrievedFromHttp() throws Exception {
    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
    DelegationTokenFetcher.main(new String[] { "-webservice=" + serviceUrl,
        tokenFile });
    Path p = new Path(fileSys.getWorkingDirectory(), tokenFile);
    Credentials creds = Credentials.readTokenStorageFile(p, conf);
    Iterator<Token<?>> itr = creds.getAllTokens().iterator();
    assertTrue("token not exist error", itr.hasNext());
    Token<?> fetchedToken = itr.next();
    Assert.assertArrayEquals("token wrong identifier error",
        testToken.getIdentifier(), fetchedToken.getIdentifier());
    Assert.assertArrayEquals("token wrong password error",
        testToken.getPassword(), fetchedToken.getPassword());
    rethrowServerAssertion();
  }

  /**
   * Re-throws, on the calling thread, an assertion failure recorded by the
   * server-side handler; does nothing if none was recorded.
   */
  private void rethrowServerAssertion() {
    AssertionError serverSide = assertionError;
    if (serverSide != null) {
      throw serverSide;
    }
  }

  /**
   * Builds a token with a fixed owner/renewer/real user and password whose
   * service is the string form of {@code serviceUri}.
   */
  private static Token<DelegationTokenIdentifier> createToken(URI serviceUri) {
    byte[] pw = "hadoop".getBytes(UTF8);
    byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text(
        "renewer"), new Text("realuser")).getBytes();
    Text service = new Text(serviceUri.toString());
    return new Token<DelegationTokenIdentifier>(ident, pw,
        WebHdfsConstants.HFTP_TOKEN_KIND, service);
  }

  /** Writes {@code response} and closes the channel once the write is done. */
  private static void sendAndClose(Channel channel, HttpResponse response) {
    channel.write(response).addListener(ChannelFutureListener.CLOSE);
  }

  /**
   * Creates a 200 response whose body is the first {@code length} bytes of
   * {@code data}, with a matching Content-Length header.
   */
  private static HttpResponse okResponse(byte[] data, int length) {
    ChannelBuffer body = ChannelBuffers.buffer(length);
    body.writeBytes(data, 0, length);
    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
        String.valueOf(length));
    response.setContent(body);
    return response;
  }

  /** Server-side action for one URL route. */
  private interface Handler {
    /**
     * @param channel channel to write the response to
     * @param token token the server was started with
     * @param serviceUrl service URL the server was started with
     */
    void handle(Channel channel, Token<DelegationTokenIdentifier> token,
        String serviceUrl) throws IOException;
  }

  /** Replies with the serialized credentials containing the token. */
  private class FetchHandler implements Handler {
    @Override
    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
        String serviceUrl) throws IOException {
      Assert.assertEquals(testToken, token);
      Credentials creds = new Credentials();
      creds.addToken(new Text(serviceUrl), token);
      DataOutputBuffer out = new DataOutputBuffer();
      creds.write(out);
      // getData() is the whole backing array; only getLength() bytes are
      // valid, so send exactly that many.
      sendAndClose(channel, okResponse(out.getData(), out.getLength()));
    }
  }

  /** Replies with {@link #EXP_DATE} as the response body. */
  private class RenewHandler implements Handler {
    @Override
    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
        String serviceUrl) throws IOException {
      Assert.assertEquals(testToken, token);
      byte[] bytes = EXP_DATE.getBytes(UTF8);
      sendAndClose(channel, okResponse(bytes, bytes.length));
    }
  }

  /** Replies with a METHOD_NOT_ALLOWED status and no body. */
  private class ExceptionHandler implements Handler {
    @Override
    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
        String serviceUrl) throws IOException {
      Assert.assertEquals(testToken, token);
      sendAndClose(channel, new DefaultHttpResponse(HTTP_1_1,
          HttpResponseStatus.METHOD_NOT_ALLOWED));
    }
  }

  /** Replies with an OK status and no body. */
  private class CancelHandler implements Handler {
    @Override
    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
        String serviceUrl) throws IOException {
      Assert.assertEquals(testToken, token);
      sendAndClose(channel, new DefaultHttpResponse(HTTP_1_1, OK));
    }
  }

  /**
   * Terminal pipeline handler: dispatches each GET request to the first route
   * whose key occurs in the request URI.
   */
  private final class CredentialsLogicHandler extends
      SimpleChannelUpstreamHandler {
    private final Token<DelegationTokenIdentifier> token;
    private final String serviceUrl;
    // ImmutableMap iterates in insertion order, so "/exception" is tried
    // first. NOTE(review): presumably the client appends its own path after
    // ".../exception", which is why this entry must win - confirm.
    private final ImmutableMap<String, Handler> routes = ImmutableMap.of(
        "/exception", new ExceptionHandler(),
        "/cancelDelegationToken", new CancelHandler(),
        "/getDelegationToken", new FetchHandler(),
        "/renewDelegationToken", new RenewHandler());

    public CredentialsLogicHandler(Token<DelegationTokenIdentifier> token,
        String serviceUrl) {
      this.token = token;
      this.serviceUrl = serviceUrl;
    }

    /**
     * Answers OPTIONS with a cookie, closes the channel on any other non-GET
     * method, and otherwise runs the matching route. An assertion failure in
     * a route is recorded for the test thread and answered with BAD_REQUEST.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
      HttpRequest request = (HttpRequest) e.getMessage();
      Channel channel = e.getChannel();
      HttpMethod method = request.getMethod();

      if (HttpMethod.OPTIONS.equals(method)) {
        // Mimic SPNEGO authentication
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
            HttpResponseStatus.OK);
        response.addHeader("Set-Cookie", "hadoop-auth=1234");
        sendAndClose(channel, response);
        // One response per request: do not also run a route handler.
        return;
      }
      if (!GET.equals(method)) {
        channel.close();
        // The channel is closing; a route handler could not reply anyway.
        return;
      }

      String uri = request.getUri();
      for (Map.Entry<String, Handler> route : routes.entrySet()) {
        if (!uri.contains(route.getKey())) {
          continue;
        }
        try {
          route.getValue().handle(channel, token, serviceUrl);
        } catch (AssertionError ee) {
          TestDelegationTokenRemoteFetcher.this.assertionError = ee;
          HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
              HttpResponseStatus.BAD_REQUEST);
          // String.valueOf guards against an AssertionError without message.
          response.setContent(ChannelBuffers.copiedBuffer(
              String.valueOf(ee.getMessage()), UTF8));
          sendAndClose(channel, response);
        }
        return;
      }

      // No route matched: reply instead of leaving the client waiting.
      sendAndClose(channel, new DefaultHttpResponse(HTTP_1_1,
          HttpResponseStatus.NOT_FOUND));
    }

    /** Logs the failure (with stack trace) at debug level and closes. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
      Throwable cause = e.getCause();
      if (LOG.isDebugEnabled()) {
        LOG.debug(cause.getMessage(), cause);
      }
      e.getChannel().close();
    }
  }

  /**
   * Starts an HTTP server on {@code localhost:port} whose pipeline ends in a
   * {@link CredentialsLogicHandler} for {@code token} and {@code url}. The
   * listening channel is remembered in {@link #serverChannel} so that
   * {@link #clean()} can close it.
   *
   * @return the bootstrap, to be released by the caller
   */
  private ServerBootstrap startHttpServer(int port,
      final Token<DelegationTokenIdentifier> token, final URI url) {
    ServerBootstrap server = new ServerBootstrap(
        new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));
    server.setPipelineFactory(new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline(new HttpRequestDecoder(),
            new HttpChunkAggregator(65536), new HttpResponseEncoder(),
            new CredentialsLogicHandler(token, url.toString()));
      }
    });
    try {
      serverChannel = server.bind(new InetSocketAddress("localhost", port));
    } catch (RuntimeException bindFailure) {
      // The caller never receives the bootstrap, so free its threads here.
      server.releaseExternalResources();
      throw bindFailure;
    }
    return server;
  }
}