| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.fs; |
| |
| import java.io.IOException; |
| import static org.junit.Assert.*; |
| import static org.mockito.Mockito.*; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.Time; |
| |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestDelegationTokenRenewer { |
| public abstract class RenewableFileSystem extends FileSystem |
| implements Renewable { } |
| |
| private static final long RENEW_CYCLE = 1000; |
| |
| private DelegationTokenRenewer renewer; |
| Configuration conf; |
| FileSystem fs; |
| |
| @Before |
| public void setup() { |
| DelegationTokenRenewer.renewCycle = RENEW_CYCLE; |
| DelegationTokenRenewer.reset(); |
| renewer = DelegationTokenRenewer.getInstance(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Test |
| public void testAddRemoveRenewAction() throws IOException, |
| InterruptedException { |
| Text service = new Text("myservice"); |
| Configuration conf = mock(Configuration.class); |
| |
| Token<?> token = mock(Token.class); |
| doReturn(service).when(token).getService(); |
| doAnswer(new Answer<Long>() { |
| public Long answer(InvocationOnMock invocation) { |
| return Time.now() + RENEW_CYCLE; |
| } |
| }).when(token).renew(any(Configuration.class)); |
| |
| RenewableFileSystem fs = mock(RenewableFileSystem.class); |
| doReturn(conf).when(fs).getConf(); |
| doReturn(token).when(fs).getRenewToken(); |
| |
| renewer.addRenewAction(fs); |
| |
| assertEquals("FileSystem not added to DelegationTokenRenewer", 1, |
| renewer.getRenewQueueLength()); |
| |
| Thread.sleep(RENEW_CYCLE*2); |
| verify(token, atLeast(2)).renew(eq(conf)); |
| verify(token, atMost(3)).renew(eq(conf)); |
| verify(token, never()).cancel(any(Configuration.class)); |
| renewer.removeRenewAction(fs); |
| verify(token).cancel(eq(conf)); |
| |
| verify(fs, never()).getDelegationToken(null); |
| verify(fs, never()).setDelegationToken(any(Token.class)); |
| |
| assertEquals("FileSystem not removed from DelegationTokenRenewer", 0, |
| renewer.getRenewQueueLength()); |
| } |
| |
| @Test |
| public void testAddRenewActionWithNoToken() throws IOException, |
| InterruptedException { |
| Configuration conf = mock(Configuration.class); |
| |
| RenewableFileSystem fs = mock(RenewableFileSystem.class); |
| doReturn(conf).when(fs).getConf(); |
| doReturn(null).when(fs).getRenewToken(); |
| |
| renewer.addRenewAction(fs); |
| |
| verify(fs).getRenewToken(); |
| assertEquals(0, renewer.getRenewQueueLength()); |
| } |
| |
| @Test |
| public void testGetNewTokenOnRenewFailure() throws IOException, |
| InterruptedException { |
| Text service = new Text("myservice"); |
| Configuration conf = mock(Configuration.class); |
| |
| final Token<?> token1 = mock(Token.class); |
| doReturn(service).when(token1).getService(); |
| doThrow(new IOException("boom")).when(token1).renew(eq(conf)); |
| |
| final Token<?> token2 = mock(Token.class); |
| doReturn(service).when(token2).getService(); |
| doAnswer(new Answer<Long>() { |
| public Long answer(InvocationOnMock invocation) { |
| return Time.now() + RENEW_CYCLE; |
| } |
| }).when(token2).renew(eq(conf)); |
| |
| RenewableFileSystem fs = mock(RenewableFileSystem.class); |
| doReturn(conf).when(fs).getConf(); |
| doReturn(token1).doReturn(token2).when(fs).getRenewToken(); |
| doReturn(token2).when(fs).getDelegationToken(null); |
| |
| doAnswer(new Answer<Token<?>[]>() { |
| public Token<?>[] answer(InvocationOnMock invocation) { |
| return new Token<?>[]{token2}; |
| } |
| }).when(fs).addDelegationTokens(null, null); |
| |
| renewer.addRenewAction(fs); |
| assertEquals(1, renewer.getRenewQueueLength()); |
| |
| Thread.sleep(RENEW_CYCLE); |
| verify(fs).getRenewToken(); |
| verify(token1, atLeast(1)).renew(eq(conf)); |
| verify(token1, atMost(2)).renew(eq(conf)); |
| verify(fs).addDelegationTokens(null, null); |
| verify(fs).setDelegationToken(eq(token2)); |
| assertEquals(1, renewer.getRenewQueueLength()); |
| |
| renewer.removeRenewAction(fs); |
| verify(token2).cancel(eq(conf)); |
| assertEquals(0, renewer.getRenewQueueLength()); |
| } |
| |
| @Test |
| public void testStopRenewalWhenFsGone() throws IOException, |
| InterruptedException { |
| Configuration conf = mock(Configuration.class); |
| |
| Token<?> token = mock(Token.class); |
| doReturn(new Text("myservice")).when(token).getService(); |
| doAnswer(new Answer<Long>() { |
| public Long answer(InvocationOnMock invocation) { |
| return Time.now() + RENEW_CYCLE; |
| } |
| }).when(token).renew(any(Configuration.class)); |
| |
| RenewableFileSystem fs = mock(RenewableFileSystem.class); |
| doReturn(conf).when(fs).getConf(); |
| doReturn(token).when(fs).getRenewToken(); |
| |
| renewer.addRenewAction(fs); |
| assertEquals(1, renewer.getRenewQueueLength()); |
| |
| Thread.sleep(RENEW_CYCLE); |
| verify(token, atLeast(1)).renew(eq(conf)); |
| verify(token, atMost(2)).renew(eq(conf)); |
| // drop weak ref |
| fs = null; |
| System.gc(); System.gc(); System.gc(); |
| // next renew should detect the fs as gone |
| Thread.sleep(RENEW_CYCLE); |
| verify(token, atLeast(1)).renew(eq(conf)); |
| verify(token, atMost(2)).renew(eq(conf)); |
| assertEquals(0, renewer.getRenewQueueLength()); |
| } |
| |
| @Test(timeout=4000) |
| public void testMultipleTokensDoNotDeadlock() throws IOException, |
| InterruptedException { |
| Configuration conf = mock(Configuration.class); |
| FileSystem fs = mock(FileSystem.class); |
| doReturn(conf).when(fs).getConf(); |
| |
| long distantFuture = Time.now() + 3600 * 1000; // 1h |
| Token<?> token1 = mock(Token.class); |
| doReturn(new Text("myservice1")).when(token1).getService(); |
| doReturn(distantFuture).when(token1).renew(eq(conf)); |
| |
| Token<?> token2 = mock(Token.class); |
| doReturn(new Text("myservice2")).when(token2).getService(); |
| doReturn(distantFuture).when(token2).renew(eq(conf)); |
| |
| RenewableFileSystem fs1 = mock(RenewableFileSystem.class); |
| doReturn(conf).when(fs1).getConf(); |
| doReturn(token1).when(fs1).getRenewToken(); |
| |
| RenewableFileSystem fs2 = mock(RenewableFileSystem.class); |
| doReturn(conf).when(fs2).getConf(); |
| doReturn(token2).when(fs2).getRenewToken(); |
| |
| renewer.addRenewAction(fs1); |
| renewer.addRenewAction(fs2); |
| assertEquals(2, renewer.getRenewQueueLength()); |
| |
| renewer.removeRenewAction(fs1); |
| assertEquals(1, renewer.getRenewQueueLength()); |
| renewer.removeRenewAction(fs2); |
| assertEquals(0, renewer.getRenewQueueLength()); |
| |
| verify(token1).cancel(eq(conf)); |
| verify(token2).cancel(eq(conf)); |
| } |
| } |