blob: d4f79b56a477fa600601b26da287c8d5c61e898f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.test.Whitebox;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
import static org.junit.Assert.fail;
/**
* Test class for ReencryptionHandler.
*/
public class TestReencryptionHandler {
protected static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestReencryptionHandler.class);
@Rule
public Timeout globalTimeout = new Timeout(180 * 1000);
@Before
public void setup() {
GenericTestUtils.setLogLevel(ReencryptionHandler.LOG, Level.TRACE);
}
private ReencryptionHandler mockReencryptionhandler(final Configuration conf)
throws IOException {
// mock stuff to create a mocked ReencryptionHandler
FileSystemTestHelper helper = new FileSystemTestHelper();
Path targetFile = new Path(new File(helper.getTestRootDir())
.getAbsolutePath(), "test.jks");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + targetFile.toUri());
final EncryptionZoneManager ezm = Mockito.mock(EncryptionZoneManager.class);
final KeyProvider kp = KMSUtil.createKeyProvider(conf,
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
Mockito.when(ezm.getProvider()).thenReturn(
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp));
FSDirectory fsd = Mockito.mock(FSDirectory.class);
FSNamesystem fns = Mockito.mock(FSNamesystem.class);
Mockito.when(fsd.getFSNamesystem()).thenReturn(fns);
Mockito.when(ezm.getFSDirectory()).thenReturn(fsd);
return new ReencryptionHandler(ezm, conf);
}
@Test
public void testThrottle() throws Exception {
final Configuration conf = new Configuration();
conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
0.5);
final ReencryptionHandler rh = mockReencryptionhandler(conf);
// mock StopWatches so all = 30s, locked = 20s. With ratio = .5, throttle
// should wait for 30 * 0.5 - 20 = 5s.
final StopWatch mockAll = Mockito.mock(StopWatch.class);
Mockito.when(mockAll.now(TimeUnit.MILLISECONDS)).thenReturn((long) 30000);
Mockito.when(mockAll.reset()).thenReturn(mockAll);
final StopWatch mockLocked = Mockito.mock(StopWatch.class);
Mockito.when(mockLocked.now(TimeUnit.MILLISECONDS))
.thenReturn((long) 20000);
Mockito.when(mockLocked.reset()).thenReturn(mockLocked);
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
Whitebox.setInternalState(rh, "throttleTimerAll", mockAll);
Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
Whitebox.setInternalState(rh, "taskQueue", queue);
final StopWatch sw = new StopWatch().start();
rh.getTraverser().throttle();
sw.stop();
assertTrue("should have throttled for at least 8 second",
sw.now(TimeUnit.MILLISECONDS) > 8000);
assertTrue("should have throttled for at most 12 second",
sw.now(TimeUnit.MILLISECONDS) < 12000);
}
@Test
public void testThrottleNoOp() throws Exception {
final Configuration conf = new Configuration();
conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
0.5);
final ReencryptionHandler rh = mockReencryptionhandler(conf);
// mock StopWatches so all = 30s, locked = 10s. With ratio = .5, throttle
// should not happen.
StopWatch mockAll = Mockito.mock(StopWatch.class);
Mockito.when(mockAll.now()).thenReturn(new Long(30000));
Mockito.when(mockAll.reset()).thenReturn(mockAll);
StopWatch mockLocked = Mockito.mock(StopWatch.class);
Mockito.when(mockLocked.now()).thenReturn(new Long(10000));
Mockito.when(mockLocked.reset()).thenReturn(mockLocked);
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
Whitebox.setInternalState(rh, "throttleTimerAll", mockAll);
Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
Whitebox.setInternalState(rh, "taskQueue", queue);
final Map<Long, ReencryptionUpdater.ZoneSubmissionTracker>
submissions = new HashMap<>();
Whitebox.setInternalState(rh, "submissions", submissions);
StopWatch sw = new StopWatch().start();
rh.getTraverser().throttle();
sw.stop();
assertTrue("should not have throttled",
sw.now(TimeUnit.MILLISECONDS) < 1000);
}
@Test
public void testThrottleConfigs() throws Exception {
final Configuration conf = new Configuration();
conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
-1.0);
try {
mockReencryptionhandler(conf);
fail("Should not be able to init");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(" is not positive", e);
}
conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
0.0);
try {
mockReencryptionhandler(conf);
fail("Should not be able to init");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(" is not positive", e);
}
}
@Test
public void testThrottleAccumulatingTasks() throws Exception {
final Configuration conf = new Configuration();
final ReencryptionHandler rh = mockReencryptionhandler(conf);
// mock tasks piling up
final Map<Long, ReencryptionUpdater.ZoneSubmissionTracker>
submissions = new HashMap<>();
final ReencryptionUpdater.ZoneSubmissionTracker zst =
new ReencryptionUpdater.ZoneSubmissionTracker();
submissions.put(new Long(1), zst);
Future mock = Mockito.mock(Future.class);
for (int i = 0; i < Runtime.getRuntime().availableProcessors() * 3; ++i) {
zst.addTask(mock);
}
Thread removeTaskThread = new Thread() {
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException ie) {
LOG.info("removeTaskThread interrupted.");
Thread.currentThread().interrupt();
}
zst.getTasks().clear();
}
};
Whitebox.setInternalState(rh, "submissions", submissions);
final StopWatch sw = new StopWatch().start();
removeTaskThread.start();
rh.getTraverser().throttle();
sw.stop();
LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS));
assertTrue("should have throttled for at least 3 second",
sw.now(TimeUnit.MILLISECONDS) >= 3000);
}
}