blob: 6dc0c766ec9d0d1ef2a87d9650066fcaebf60cda [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller;
import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
public class TestValueQueue {
Logger LOG = LoggerFactory.getLogger(TestValueQueue.class);
private static class FillInfo {
final int num;
final String key;
FillInfo(int num, String key) {
this.num = num;
this.key = key;
}
}
private static class MockFiller implements QueueRefiller<String> {
final LinkedBlockingQueue<FillInfo> fillCalls =
new LinkedBlockingQueue<FillInfo>();
@Override
public void fillQueueForKey(String keyName, Queue<String> keyQueue,
int numValues) throws IOException {
fillCalls.add(new FillInfo(numValues, keyName));
for(int i = 0; i < numValues; i++) {
keyQueue.add("test");
}
}
public FillInfo getTop() throws InterruptedException {
return fillCalls.poll(500, TimeUnit.MILLISECONDS);
}
}
private void waitForRefill(final ValueQueue<?> valueQueue,
final String queueName, final int queueSize)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int size = valueQueue.getSize(queueName);
if (size != queueSize) {
LOG.info("Current ValueQueue size is " + size);
return false;
}
return true;
}
}, 100, 3000);
}
/**
* Verifies that Queue is initially filled to "numInitValues"
*/
@Test(timeout=30000)
public void testInitFill() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);
vq.shutdown();
}
/**
* Verifies that Queue is initialized (Warmed-up) for provided keys
*/
@Test(timeout=30000)
public void testWarmUp() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
vq.initializeQueuesForKeys("k1", "k2", "k3");
FillInfo[] fillInfos =
{filler.getTop(), filler.getTop(), filler.getTop()};
Assert.assertEquals(5, fillInfos[0].num);
Assert.assertEquals(5, fillInfos[1].num);
Assert.assertEquals(5, fillInfos[2].num);
Assert.assertEquals(Sets.newHashSet("k1", "k2", "k3"),
Sets.newHashSet(fillInfos[0].key,
fillInfos[1].key,
fillInfos[2].key));
vq.shutdown();
}
/**
* Verifies that the refill task is executed after "checkInterval" if
* num values below "lowWatermark"
*/
@Test(timeout=30000)
public void testRefill() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(100, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (10) and an async refill (91)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(10, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 100);
// Refill task should add 91 values to get to a full queue (10 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(91, filler.getTop().num);
vq.shutdown();
}
/**
* Verifies that the No refill Happens after "checkInterval" if
* num values above "lowWatermark"
*/
@Test(timeout=30000)
public void testNoRefill() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (5) and an async refill (6)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(5, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 6 values to get to a full queue (5 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(6, filler.getTop().num);
// Take another value, queue is still above the watermark
Assert.assertEquals("test", vq.getNext("k1"));
// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertEquals(null, filler.getTop());
vq.shutdown();
}
/**
* Verify getAtMost when SyncGeneration Policy = ALL
*/
@Test(timeout=30000)
public void testgetAtMostPolicyALL() throws Exception {
MockFiller filler = new MockFiller();
final ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (1) and an async refill (10)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue (1 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(10, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1");
// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertNull(filler.getTop());
// Synchronous call:
// 1. Synchronously fill returned list
// 2. Start another async task to fill the queue in the cache
Assert.assertEquals("Failed in sync call.", 10,
vq.getAtMost("k1", 10).size());
Assert.assertEquals("Sync call filler got wrong number.", 10,
filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
// Drain completely after filled by the async thread
vq.drain("k1");
Assert.assertEquals("Failed to drain completely after async.", 0,
vq.getSize("k1"));
// Synchronous call
Assert.assertEquals("Failed to get all 19.", 19,
vq.getAtMost("k1", 19).size());
Assert.assertEquals("Failed in sync call.", 19, filler.getTop().num);
vq.shutdown();
}
/**
* Verify getAtMost when SyncGeneration Policy = ALL
*/
@Test(timeout=30000)
public void testgetAtMostPolicyATLEAST_ONE() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.3f, 30000, 1,
SyncGenerationPolicy.ATLEAST_ONE, filler);
// Trigger a prefill (3) and an async refill (8)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(3, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 8 values to get to a full queue (3 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1");
// Queue is empty, sync will return a single value and trigger a refill
Assert.assertEquals(1, vq.getAtMost("k1", 10).size());
Assert.assertEquals(1, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
vq.shutdown();
}
/**
* Verify getAtMost when SyncGeneration Policy = LOW_WATERMARK
*/
@Test(timeout=30000)
public void testgetAtMostPolicyLOW_WATERMARK() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.3f, 30000, 1,
SyncGenerationPolicy.LOW_WATERMARK, filler);
// Trigger a prefill (3) and an async refill (8)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(3, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 8 values to get to a full queue (3 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1");
// Queue is empty, sync will return 3 values and trigger a refill
Assert.assertEquals(3, vq.getAtMost("k1", 10).size());
Assert.assertEquals(3, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
vq.shutdown();
}
@Test(timeout=30000)
public void testDrain() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (1) and an async refill (10)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue (1 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(10, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1");
// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertNull(filler.getTop());
vq.shutdown();
}
}