blob: a4d6a012a8118e9863a504adc5e4096d65f97674 [file]
/*
* Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.bucket;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Sets;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
public class TimeBasedBucketManagerTest
{
private static final String APPLICATION_PATH_PREFIX = "target/TimeBasedBucketManagerTest";
private static final long BUCKET_SPAN = 60000; //1 minute
private static TestBucketManager<DummyEvent> manager;
private static String applicationPath;
private static class TestBucketManager<T extends Event & Bucketable> extends TimeBasedBucketManagerImpl<T>
{
TestBucketManager()
{
super();
}
}
@Test
public void testExpiration() throws InterruptedException
{
DummyEvent event1 = new DummyEvent(1, manager.startOfBucketsInMillis + 10 * BUCKET_SPAN);
long bucket1 = manager.getBucketKeyFor(event1);
DummyEvent event2 = new DummyEvent(1, manager.startOfBucketsInMillis + (manager.noOfBuckets + 10) * BUCKET_SPAN);
long bucket2 = manager.getBucketKeyFor(event2);
Assert.assertEquals("bucket index", bucket1 % manager.noOfBuckets, bucket2 % manager.noOfBuckets);
bucket1 = manager.getBucketKeyFor(event1);
Assert.assertEquals("expired event", bucket1, -1);
long rBucket2 = manager.getBucketKeyFor(event2);
Assert.assertEquals("valid event", bucket2, rBucket2);
}
@Test
public void testClone() throws CloneNotSupportedException, InterruptedException
{
AbstractTimeBasedBucketManager<DummyEvent> clonedManager = manager.clone();
Assert.assertNotNull(clonedManager);
Assert.assertNotNull(clonedManager.getBucketStore());
Assert.assertTrue(clonedManager.bucketStore.equals(manager.bucketStore));
Assert.assertTrue(clonedManager.writeEventKeysOnly==manager.writeEventKeysOnly);
Assert.assertTrue(clonedManager.noOfBuckets==manager.noOfBuckets);
Assert.assertTrue(clonedManager.noOfBucketsInMemory==manager.noOfBucketsInMemory);
Assert.assertTrue(clonedManager.maxNoOfBucketsInMemory==manager.maxNoOfBucketsInMemory);
Assert.assertTrue(clonedManager.millisPreventingBucketEviction== manager.millisPreventingBucketEviction);
Assert.assertTrue(clonedManager.committedWindow==manager.committedWindow);
Assert.assertTrue(clonedManager.getMaxTimesPerBuckets().length== manager.getMaxTimesPerBuckets().length);
}
@BeforeClass
public static void setup() throws Exception
{
applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
manager = new TestBucketManager<DummyEvent>();
manager.setBucketSpanInMillis(BUCKET_SPAN);
ExpirableHdfsBucketStore<DummyEvent> bucketStore = new ExpirableHdfsBucketStore<DummyEvent>();
manager.setBucketStore(bucketStore);
bucketStore.setConfiguration(0, applicationPath, Sets.newHashSet(0), 0);
bucketStore.setup();
manager.startService(new BucketManagerTest.TestStorageManagerListener());
}
@AfterClass
public static void teardown() throws IOException
{
manager.shutdownService();
Path root = new Path(applicationPath);
FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
fs.delete(root, true);
}
}