blob: abe464826b3b955d356c8741393f902bca47cd9b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.gateway.audit;
import org.apache.hadoop.gateway.audit.log4j.appender.JdbmQueue;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
public class JdbmQueueTest {
private File file;
private JdbmQueue<String> queue;
@Before
public void setup() throws IOException {
file = new File( "target/JdbmQueueTest" );
cleanup();
queue = new JdbmQueue<String>( file );
}
@After
public void cleanup() throws IOException {
if( queue != null ) {
queue.close();
queue = null;
}
LogManager.shutdown();
String absolutePath = "target/audit";
File db = new File( absolutePath + ".db" );
if( db.exists() ) {
assertThat( "Failed to delete audit store db file.", db.delete(), is( true ) );
}
File lg = new File( absolutePath + ".lg" );
if( lg.exists() ) {
assertThat( "Failed to delete audit store lg file.", lg.delete(), is( true ) );
}
PropertyConfigurator.configure( ClassLoader.getSystemResourceAsStream( "audit-log4j.properties" ) );
}
@Test
public void testSimple() throws IOException, InterruptedException {
System.out.println( "Running " + Thread.currentThread().getStackTrace()[1].getClassName() + "#" + Thread.currentThread().getStackTrace()[1].getMethodName() );
String one = UUID.randomUUID().toString();
String two = UUID.randomUUID().toString();
String three = UUID.randomUUID().toString();
String four = UUID.randomUUID().toString();
queue.enqueue( one );
assertThat( queue.dequeue(), is( one ) );
queue.enqueue( two );
queue.enqueue( three );
assertThat( queue.dequeue(), is( two ) );
assertThat( queue.dequeue(), is( three ) );
final AtomicInteger counter = new AtomicInteger( 0 );
queue.enqueue( four );
queue.process( new JdbmQueue.Consumer<String>() {
@Override
public boolean consume( String s ) {
counter.incrementAndGet();
return true;
}
} );
assertThat( counter.get(), is( 1 ) );
}
@Ignore
@Test
public void testPerformanceAndStorageFootprint() throws IOException, InterruptedException {
System.out.println( "Running " + Thread.currentThread().getStackTrace()[1].getClassName() + "#" + Thread.currentThread().getStackTrace()[1].getMethodName() );
String fill = createFillString( 100 );
File dbFile = new File( file.getAbsolutePath() + ".db" );
File lgFile = new File( file.getAbsolutePath() + ".lg" );
String s = null;
long writeCount = 0;
long writeTime = 0;
long before;
int iterations = 10000;
for( int i=0; i<iterations; i++ ) {
s = UUID.randomUUID().toString() + ":" + fill;
before = System.currentTimeMillis();
queue.enqueue( s );
writeTime += ( System.currentTimeMillis() - before );
writeCount++;
}
System.out.println( String.format( "Line: len=%d", s.length() ) );
System.out.println( String.format( "Perf: avg=%.4fs, tot=%.2fs, cnt=%d", ( (double)writeTime / (double)writeCount / 1000.0 ), (double)writeTime/1000.0, writeCount ) );
System.out.println( String.format(
"File: db=%s, lg=%s, tot=%s, per=%s",
humanReadableSize( dbFile.length() ),
humanReadableSize( lgFile.length() ),
humanReadableSize( dbFile.length() + lgFile.length() ),
humanReadableSize( ( ( dbFile.length() + lgFile.length() ) / writeCount ) ) ) );
}
@Ignore
@Test
public void testFileGrowth() throws IOException, InterruptedException {
System.out.println( "Running " + Thread.currentThread().getStackTrace()[1].getClassName() + "#" + Thread.currentThread().getStackTrace()[1].getMethodName() );
String fill = createFillString( 100 );
File dbFile = new File( file.getAbsolutePath() + ".db" );
File lgFile = new File( file.getAbsolutePath() + ".lg" );
String s = null;
long writeCount = 0;
long writeTime = 0;
long before;
int iterations = 10000;
for( int i=0; i<iterations; i++ ) {
s = UUID.randomUUID().toString() + ":" + fill;
before = System.currentTimeMillis();
queue.enqueue( s );
assertThat( queue.dequeue(), is( s ) );
writeTime += ( System.currentTimeMillis() - before );
writeCount++;
}
System.out.println( String.format( "Line: len=%d", s.length() ) );
System.out.println( String.format( "Perf: avg=%.4fs, tot=%.2fs, cnt=%d", ( (double)writeTime / (double)writeCount / 1000.0 ), (double)writeTime/1000.0, writeCount ) );
System.out.println( String.format(
"File: db=%s, lg=%s, tot=%s, per=%s",
humanReadableSize( dbFile.length() ),
humanReadableSize( lgFile.length() ),
humanReadableSize( dbFile.length() + lgFile.length() ),
humanReadableSize( ( ( dbFile.length() + lgFile.length() ) / writeCount ) ) ) );
}
@Test( timeout = 120000 )
public void testConcurrentConsumer() throws InterruptedException, IOException {
System.out.println( "Running " + Thread.currentThread().getStackTrace()[1].getClassName() + "#" + Thread.currentThread().getStackTrace()[1].getMethodName() );
int iterations = 100;
HashSet<String> consumed = new HashSet<String>();
Consumer consumer = new Consumer( consumed );
consumer.start();
Producer producer1 = new Producer( iterations );
producer1.start();
Producer producer2 = new Producer( iterations );
producer2.start();
producer1.join();
producer2.join();
while (consumed.size() < iterations * 2) {
Thread.sleep( 5 );
}
queue.stop();
consumer.join();
assertThat( consumed, hasSize( iterations * 2 ) );
}
@Test( timeout=120000 )
public void testConcurrentProcessor() throws InterruptedException, IOException {
System.out.println( "Running " + Thread.currentThread().getStackTrace()[1].getClassName() + "#" + Thread.currentThread().getStackTrace()[1].getMethodName() );
int iterations = 100;
HashSet<String> consumed = new HashSet<String>();
Processor consumer = new Processor( consumed );
consumer.start();
Producer producer1 = new Producer( iterations );
producer1.start();
Producer producer2 = new Producer( iterations );
producer2.start();
producer1.join();
producer2.join();
while (consumed.size() < iterations * 2) {
Thread.sleep( 5 );
}
queue.stop();
consumer.join();
assertThat( consumed, hasSize( iterations * 2 ) );
}
public class Producer extends Thread {
public int iterations;
public Producer( int iterations ) {
this.iterations = iterations;
}
public void run() {
try {
for( int i = 0; i < iterations; i++ ) {
queue.enqueue( UUID.randomUUID().toString() );
}
} catch ( Throwable t ) {
t.printStackTrace();
}
}
}
public class Consumer extends Thread {
public Set<String> consumed;
public Consumer( Set<String> consumed ) {
this.consumed = consumed;
}
public void run() {
try {
while( true ) {
String s = queue.dequeue();
if( s == null ) {
return;
} else if( consumed.contains( s ) ) {
System.out.println( "DUPLICATE " + s );
System.exit( 1 );
} else {
consumed.add( s );
}
}
} catch ( Throwable t ) {
t.printStackTrace();
}
}
}
public class Processor extends Thread {
public Set<String> consumed;
public Processor( Set<String> consumed ) {
this.consumed = consumed;
}
public void run() {
try {
final AtomicBoolean done = new AtomicBoolean( false );
while( !done.get() ) {
queue.process( new JdbmQueue.Consumer<String>() {
@Override
public boolean consume( String s ) {
if( s == null ) {
done.set( true );
} else if( consumed.contains( s ) ) {
System.out.println( "DUPLICATE " + s );
System.exit( 1 );
} else {
consumed.add( s );
}
return true;
}
} );
}
} catch ( Throwable t ) {
t.printStackTrace();
}
}
}
public static String humanReadableSize( long size ) {
if(size <= 0) return "0";
final String[] units = new String[] { "B", "KB", "MB", "GB", "TB" };
int digitGroups = (int) (Math.log10(size)/Math.log10(1024));
return new DecimalFormat("#,##0.#").format(size/Math.pow(1024, digitGroups)) + " " + units[digitGroups];
}
public static String createFillString( int size ) {
StringBuilder s = new StringBuilder();
for( int i=0; i<size; i++ ) {
s.append( UUID.randomUUID().toString() );
s.append( "+" );
}
return s.toString();
}
}