blob: d82a79c47d4b4514ba118b608e1e357ffbf599e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.services.security
import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.fs.FileSystem as HadoopFS
import org.apache.hadoop.fs.RawLocalFileSystem
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.SecretManager
import org.apache.hadoop.security.token.Token
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager
import org.apache.hadoop.service.ServiceOperations
import org.apache.hadoop.util.Time
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.common.tools.CoreFileSystem
import org.apache.slider.server.appmaster.actions.ActionStopQueue
import org.apache.slider.server.appmaster.actions.QueueExecutor
import org.apache.slider.server.appmaster.actions.QueueService
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
@Slf4j
//@CompileStatic
/**
 * Tests for {@link FsDelegationTokenManager}, driven through a
 * {@link QueueService} and backed by stub file system / token classes
 * declared inside this test.
 */
class TestFsDelegationTokenManager {

  /** Queue service handed to the token manager; rebuilt in {@link #setup()}. */
  QueueService queues
  /** Manager under test; an anonymous subclass created in {@link #setup()}. */
  FsDelegationTokenManager tokenManager
  /** Configuration shared by the queue service and the token manager calls. */
  Configuration conf
  /** Test user that {@link #setup()} installs as the login user. */
  UserGroupInformation currentUser

  /**
   * Builds the fixture: configuration, started queue service, a test login
   * user, and a token manager whose renewal hooks are overridden.
   */
  @Before
  void setup() {
    queues = new QueueService()

    conf = new Configuration()
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "TOKEN")
    // NOTE(review): presumably milliseconds -- confirm against the key's definition
    conf.setLong(SliderXmlConfKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 1000)

    queues.init(conf)
    queues.start()

    // Constructed here but not referenced again within this method.
    HadoopFS stubFs = new TestFileSystem()
    CoreFileSystem wrappedFs = new CoreFileSystem(stubFs, conf)

    String[] memberOf = ['testGroup1'] as String[]
    currentUser = UserGroupInformation.createUserForTesting("test", memberOf)
    UserGroupInformation.setLoginUser(currentUser)

    tokenManager = new FsDelegationTokenManager(queues) {
      @Override
      protected int getRenewingLimit() {
        5
      }

      @Override
      protected HadoopFS getRemoteFileSystemForRenewal(Configuration config) throws IOException {
        // every request gets its own stub file system instance
        new TestFileSystem()
      }

      @Override
      protected String getRenewingActionName() {
        "TEST RENEW"
      }
    }
  }

  /**
   * Secret manager stub: hands back no identifier and a fixed
   * one-byte password.
   */
  public static class DummySecretManager
      extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {

    public DummySecretManager(long delegationKeyUpdateInterval,
                              long delegationTokenMaxLifetime,
                              long delegationTokenRenewInterval,
                              long delegationTokenRemoverScanInterval) {
      super(delegationKeyUpdateInterval,
          delegationTokenMaxLifetime,
          delegationTokenRenewInterval,
          delegationTokenRemoverScanInterval)
    }

    @Override
    public DelegationTokenIdentifier createIdentifier() {
      null
    }

    @Override
    public byte[] createPassword(DelegationTokenIdentifier dtId) {
      new byte[1]
    }
  }

  /**
   * {@link RawLocalFileSystem} subclass whose delegation token methods
   * issue {@link TestToken} instances carrying an increasing sequence number.
   */
  public class TestFileSystem extends RawLocalFileSystem {
    /** Sequence number stamped on the next identifier that is created. */
    int sequenceNum = 0
    /** Secret manager passed to every token this file system issues. */
    SecretManager<DelegationTokenIdentifier> mgr = new DummySecretManager(0, 0, 0, 0)

    /** Issues a single new token; the renewer argument is not consulted. */
    @Override
    Token<DelegationTokenIdentifier> getDelegationToken(String renewer) throws IOException {
      new TestToken(getIdentifier(), mgr)
    }

    /**
     * Returns a one-element array holding a new token; neither argument is
     * consulted and the supplied credentials are not modified here.
     */
    @Override
    Token<?>[] addDelegationTokens(String renewer, Credentials credentials) throws IOException {
      [new TestToken(getIdentifier(), mgr)] as Token[]
    }

    /**
     * Creates an identifier that uses the current test user's name for all
     * three constructor arguments, takes the next sequence number, and has a
     * max date 10000 past {@code Time.now()}.
     */
    private DelegationTokenIdentifier getIdentifier() {
      Text userName = new Text(currentUser.getUserName())
      DelegationTokenIdentifier identifier =
          new DelegationTokenIdentifier(userName, userName, userName)
      identifier.setSequenceNumber(sequenceNum++)
      identifier.setMaxDate(Time.now() + 10000)
      identifier
    }
  }

  /**
   * Token stub that counts renewal attempts, can be made to fail renewal
   * after a configurable number of attempts, and records cancellation.
   */
  public class TestToken extends Token<DelegationTokenIdentifier> {
    /** Renewal threshold shared by all tokens; expiry is simulated only while this is positive. */
    static long maxCount = 0
    /** Renewals counted towards the threshold; only advanced while {@code maxCount > 0}. */
    private final AtomicLong renewCount = new AtomicLong()
    /** Every call to {@link #renew(Configuration)}, successful or not. */
    private final AtomicLong totalCount = new AtomicLong()
    /** Set once a renewal has been rejected. */
    public final AtomicBoolean expired = new AtomicBoolean(false)
    /** Set once {@link #cancel(Configuration)} has been called. */
    public final AtomicBoolean cancelled = new AtomicBoolean(false)

    TestToken(DelegationTokenIdentifier id, SecretManager<DelegationTokenIdentifier> mgr) {
      super(id, mgr)
    }

    @Override
    Text getService() {
      new Text("HDFS")
    }

    /**
     * Records the attempt and either returns {@code Time.now() + 1000} or,
     * when the threshold is active and the number of previously counted
     * renewals exceeds it, resets the counter, marks the token expired and
     * throws.
     *
     * @throws IOException with message "Expired" when the threshold is exceeded
     */
    @Override
    long renew(Configuration conf) throws IOException, InterruptedException {
      totalCount.getAndIncrement()

      // threshold disabled: the renewal counter is deliberately left untouched
      if (maxCount <= 0) {
        return Time.now() + 1000
      }

      long earlierRenewals = renewCount.getAndIncrement()
      if (earlierRenewals <= maxCount) {
        return Time.now() + 1000
      }

      renewCount.set(0L)
      expired.set(true)
      throw new IOException("Expired")
    }

    @Override
    void cancel(Configuration conf) throws IOException, InterruptedException {
      cancelled.set(true)
    }
  }

  /** Stops the queue service after each test. */
  @After
  void destroyService() {
    ServiceOperations.stop(queues)
  }

  /**
   * Starts the queue service on a new thread, then runs a
   * {@link QueueExecutor} on the calling thread until its {@code run()}
   * returns.
   */
  public void runQueuesToCompletion() {
    Thread queueThread = new Thread(queues)
    queueThread.start()
    new QueueExecutor(queues).run()
  }

  /**
   * Starts both the queue service and a {@link QueueExecutor} on their own
   * threads, sleeps for 1000 ms, then cancels the delegation token.
   */
  public void runQueuesButNotToCompletion() {
    Thread queueThread = new Thread(queues)
    queueThread.start()
    Thread executorThread = new Thread(new QueueExecutor(queues))
    executorThread.start()
    Thread.sleep(1000)
    tokenManager.cancelDelegationToken(conf)
  }

  /** Schedules a stop action on the queue service with a 10 second delay. */
  private void scheduleQueueStop() {
    ActionStopQueue stopAction = new ActionStopQueue(10, TimeUnit.SECONDS)
    queues.schedule(stopAction)
  }

  /** Returns the first token held by the current test user, cast to {@link TestToken}. */
  private TestToken firstTokenOfCurrentUser() {
    (TestToken) currentUser.getTokens()[0]
  }

  /** Expects the acquired token to have seen more than four renew calls once the queues finish. */
  @Test
  public void testRenew() throws Throwable {
    tokenManager.acquireDelegationToken(conf)
    scheduleQueueStop()
    runQueuesToCompletion()

    TestToken token = firstTokenOfCurrentUser()
    assert token.totalCount.get() > 4
  }

  /** Expects cancellation to mark the token cancelled and leave no "TEST RENEW" action behind. */
  @Test
  public void testCancel() throws Throwable {
    tokenManager.acquireDelegationToken(conf)
    scheduleQueueStop()
    runQueuesButNotToCompletion()

    TestToken token = firstTokenOfCurrentUser()
    assert token.cancelled.get()
    assert queues.lookupRenewingAction("TEST RENEW") == null
  }

  /**
   * With the renewal threshold set to 3, expects the original token to be
   * marked expired after more than four renew calls and the user to end up
   * holding a different token for the same service.
   */
  @Test
  public void testRenewPastExpiry() throws Throwable {
    try {
      TestToken.maxCount = 3L
      tokenManager.acquireDelegationToken(conf)
      TestToken origToken = firstTokenOfCurrentUser()

      scheduleQueueStop()
      runQueuesToCompletion()

      TestToken token = firstTokenOfCurrentUser()
      assert token != null
      assert token != origToken
      assert origToken.getService().equals(token.getService())
      assert origToken.totalCount.get() > 4
      assert origToken.expired.get()
    } finally {
      // the threshold is static, so restore it for whichever test runs next
      TestToken.maxCount = 0
    }
  }
}