blob: 6b66222b294867ae652163b2cdcd46051587b536 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestDelegationTokenRenewer {
public abstract class RenewableFileSystem extends FileSystem
implements Renewable { }
private static final long RENEW_CYCLE = 1000;
private DelegationTokenRenewer renewer;
Configuration conf;
FileSystem fs;
@Before
public void setup() {
DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
DelegationTokenRenewer.reset();
renewer = DelegationTokenRenewer.getInstance();
}
@SuppressWarnings("unchecked")
@Test
public void testAddRemoveRenewAction() throws IOException,
InterruptedException {
Text service = new Text("myservice");
Configuration conf = mock(Configuration.class);
Token<?> token = mock(Token.class);
doReturn(service).when(token).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token).renew(any(Configuration.class));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token).when(fs).getRenewToken();
renewer.addRenewAction(fs);
assertEquals("FileSystem not added to DelegationTokenRenewer", 1,
renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE*2);
verify(token, atLeast(2)).renew(eq(conf));
verify(token, atMost(3)).renew(eq(conf));
verify(token, never()).cancel(any(Configuration.class));
renewer.removeRenewAction(fs);
verify(token).cancel(eq(conf));
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class));
assertEquals("FileSystem not removed from DelegationTokenRenewer", 0,
renewer.getRenewQueueLength());
}
@Test
public void testAddRenewActionWithNoToken() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(null).when(fs).getRenewToken();
renewer.addRenewAction(fs);
verify(fs).getRenewToken();
assertEquals(0, renewer.getRenewQueueLength());
}
@Test
public void testGetNewTokenOnRenewFailure() throws IOException,
InterruptedException {
Text service = new Text("myservice");
Configuration conf = mock(Configuration.class);
final Token<?> token1 = mock(Token.class);
doReturn(service).when(token1).getService();
doThrow(new IOException("boom")).when(token1).renew(eq(conf));
final Token<?> token2 = mock(Token.class);
doReturn(service).when(token2).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token2).renew(eq(conf));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token1).doReturn(token2).when(fs).getRenewToken();
doReturn(token2).when(fs).getDelegationToken(null);
doAnswer(new Answer<Token<?>[]>() {
public Token<?>[] answer(InvocationOnMock invocation) {
return new Token<?>[]{token2};
}
}).when(fs).addDelegationTokens(null, null);
renewer.addRenewAction(fs);
assertEquals(1, renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE);
verify(fs).getRenewToken();
verify(token1, atLeast(1)).renew(eq(conf));
verify(token1, atMost(2)).renew(eq(conf));
verify(fs).addDelegationTokens(null, null);
verify(fs).setDelegationToken(eq(token2));
assertEquals(1, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs);
verify(token2).cancel(eq(conf));
assertEquals(0, renewer.getRenewQueueLength());
}
@Test
public void testStopRenewalWhenFsGone() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
Token<?> token = mock(Token.class);
doReturn(new Text("myservice")).when(token).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token).renew(any(Configuration.class));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token).when(fs).getRenewToken();
renewer.addRenewAction(fs);
assertEquals(1, renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE);
verify(token, atLeast(1)).renew(eq(conf));
verify(token, atMost(2)).renew(eq(conf));
// drop weak ref
fs = null;
System.gc(); System.gc(); System.gc();
// next renew should detect the fs as gone
Thread.sleep(RENEW_CYCLE);
verify(token, atLeast(1)).renew(eq(conf));
verify(token, atMost(2)).renew(eq(conf));
assertEquals(0, renewer.getRenewQueueLength());
}
@Test(timeout=4000)
public void testMultipleTokensDoNotDeadlock() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
FileSystem fs = mock(FileSystem.class);
doReturn(conf).when(fs).getConf();
long distantFuture = Time.now() + 3600 * 1000; // 1h
Token<?> token1 = mock(Token.class);
doReturn(new Text("myservice1")).when(token1).getService();
doReturn(distantFuture).when(token1).renew(eq(conf));
Token<?> token2 = mock(Token.class);
doReturn(new Text("myservice2")).when(token2).getService();
doReturn(distantFuture).when(token2).renew(eq(conf));
RenewableFileSystem fs1 = mock(RenewableFileSystem.class);
doReturn(conf).when(fs1).getConf();
doReturn(token1).when(fs1).getRenewToken();
RenewableFileSystem fs2 = mock(RenewableFileSystem.class);
doReturn(conf).when(fs2).getConf();
doReturn(token2).when(fs2).getRenewToken();
renewer.addRenewAction(fs1);
renewer.addRenewAction(fs2);
assertEquals(2, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs1);
assertEquals(1, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs2);
assertEquals(0, renewer.getRenewQueueLength());
verify(token1).cancel(eq(conf));
verify(token2).cancel(eq(conf));
}
}