// blob: a70a314371f45abaceaf922705ae67507e07e066 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ace.log.server.store.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.ace.feedback.Descriptor;
import org.apache.ace.feedback.Event;
import org.apache.ace.range.RangeIterator;
import org.apache.ace.range.SortedRangeSet;
import org.apache.ace.test.utils.TestUtils;
import org.osgi.service.event.EventAdmin;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* Concurrency test cases for {@link LogStoreImpl}: reader and writer tasks use a single store in parallel.
*/
public class LogStoreImplConcurrencyTest {
// Target ID given to every event the writers create; the readers query descriptors for this same ID.
private static final String TARGET_ID = "targetId";
// Passed as the second constructor argument of every event the writers create.
private static final long STORE_ID = 12345;
/**
 * Task that keeps polling the store for the descriptors of {@code TARGET_ID} until it has seen the requested
 * number of distinct event IDs, occasionally triggering a store cleanup while doing so.
 * <p>
 * The task waits on the start latch before doing any work and always counts down the stop latch when it ends.
 */
private static class Reader implements Runnable {
    private final String m_name;
    private final CountDownLatch m_start;
    private final CountDownLatch m_stop;
    private final LogStoreImpl m_store;
    /** Distinct event IDs observed so far; read by the tests once the task has ended. */
    private final ConcurrentMap<Long, Boolean> m_seen = new ConcurrentHashMap<>();
    private final int m_count;

    /**
     * Creates a reader named "Reader-0".
     *
     * @param store the store to read from;
     * @param start latch the task waits on before it starts reading;
     * @param stop latch that is counted down when the task ends;
     * @param count the number of distinct event IDs to wait for.
     */
    public Reader(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count) {
        this(store, start, stop, count, 0);
    }

    /**
     * @param store the store to read from;
     * @param start latch the task waits on before it starts reading;
     * @param stop latch that is counted down when the task ends;
     * @param count the number of distinct event IDs to wait for;
     * @param initial only used to give this reader a distinguishable name.
     */
    public Reader(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count, int initial) {
        m_name = "Reader-" + initial;
        m_store = store;
        m_start = start;
        m_stop = stop;
        m_count = count;
    }

    @Override
    public void run() {
        Random rnd = new Random();
        try {
            m_start.await();
            System.out.printf("Reader (%s) starting to read %d records...%n", m_name, m_count);
            // Also stop on interruption: without this check the executor's shutdownNow() could never end a
            // reader that does not get to see all records, leaving a thread spinning forever.
            while (m_seen.size() < m_count && !Thread.currentThread().isInterrupted()) {
                try {
                    if (rnd.nextInt(1000) >= 995) {
                        // perform a random cleanup (roughly 0.5% of the iterations)...
                        m_store.clean();
                    }
                    List<Descriptor> descriptors = m_store.getDescriptors(TARGET_ID);
                    for (Descriptor desc : descriptors) {
                        SortedRangeSet rangeSet = desc.getRangeSet();
                        RangeIterator rangeIter = rangeSet.iterator();
                        while (rangeIter.hasNext()) {
                            m_seen.putIfAbsent(Long.valueOf(rangeIter.next()), Boolean.TRUE);
                        }
                    }
                }
                catch (IOException e) {
                    // report and retry on the next iteration...
                    System.out.printf("I/O exception (%s) caught: %s in %s.%n", e.getClass().getSimpleName(), e.getMessage(), getCaller(e));
                }
            }
            System.out.printf("Reader (%s) finished with %d records read...%n", m_name, m_seen.size());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            m_stop.countDown();
            System.out.println("Ending reader (" + m_name + ")");
        }
    }
}
/**
 * Task that writes events with IDs {@code initial, initial + stepSize, ...} (below {@code count}) to the store,
 * one event per {@code put} call, and remembers every event whose {@code put} call did not fail.
 * <p>
 * The task waits on the start latch before doing any work and always counts down the stop latch when it ends.
 */
private static class Writer implements Runnable {
    private final String m_name;
    private final CountDownLatch m_start;
    private final CountDownLatch m_stop;
    private final LogStoreImpl m_store;
    /** Events for which the store accepted the write, keyed by event ID; read by the tests afterwards. */
    private final ConcurrentMap<Long, Event> m_written = new ConcurrentHashMap<>();
    private final int m_count;
    private final int m_initValue;
    private final int m_stepSize;

    /**
     * Creates a writer named "Writer-0" that writes the IDs {@code 0 .. count - 1}.
     *
     * @param store the store to write to;
     * @param start latch the task waits on before it starts writing;
     * @param stop latch that is counted down when the task ends;
     * @param count exclusive upper bound of the event IDs to write.
     */
    public Writer(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count) {
        this(store, start, stop, count, 0, 1);
    }

    /**
     * @param store the store to write to;
     * @param start latch the task waits on before it starts writing;
     * @param stop latch that is counted down when the task ends;
     * @param count exclusive upper bound of the event IDs to write;
     * @param initial the first event ID to write, also used in the name of this writer;
     * @param stepSize the distance between two consecutive event IDs written by this writer.
     */
    public Writer(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count, int initial, int stepSize) {
        m_name = "Writer-" + initial;
        m_store = store;
        m_start = start;
        m_stop = stop;
        m_count = count;
        m_initValue = initial;
        m_stepSize = stepSize;
    }

    @Override
    public void run() {
        Random rnd = new Random();
        try {
            m_start.await();
            System.out.printf("Writer (%s) starts writing %d records...%n", m_name, m_count);
            for (int i = m_initValue; i < m_count; i += m_stepSize) {
                long id = i;
                Event event = new Event(TARGET_ID, STORE_ID, id, id, rnd.nextInt(10));
                try {
                    m_store.put(Arrays.asList(event));
                    m_written.putIfAbsent(Long.valueOf(id), event);
                }
                catch (Exception exception) {
                    // Keep going (best effort), but leave a trace: the tests only report a count mismatch,
                    // which says nothing about why a record is missing.
                    System.out.printf("Writer (%s) failed to write record %d: %s (%s)%n", m_name, id, exception.getMessage(), exception.getClass().getSimpleName());
                }
            }
            System.out.printf("Writer (%s) finished with %d records written...%n", m_name, m_written.size());
        }
        catch (InterruptedException e) {
            // ok, stop; restore the flag so the executor thread can observe the interruption as well...
            Thread.currentThread().interrupt();
        }
        finally {
            m_stop.countDown();
            System.out.println("Ending writer (" + m_name + ")");
        }
    }
}
/**
 * Renders a short call chain for the given exception: the stack frames at index 1 up to and including index 3
 * (as far as present), each as {@code class.method(line)}, joined by {@code " -> "}. The frame at index 0, being
 * the place where the exception was created, is left out.
 *
 * @param e the exception to take the stack trace from, cannot be <code>null</code>;
 * @return the rendered call chain, empty if the stack trace has fewer than two frames.
 */
private static String getCaller(Exception e) {
    StackTraceElement[] frames = e.getStackTrace();
    int first = Math.min(frames.length, 1);
    int limit = Math.min(frames.length, 4);
    StringBuilder chain = new StringBuilder();
    String separator = "";
    for (int idx = first; idx < limit; idx++) {
        StackTraceElement frame = frames[idx];
        chain.append(separator);
        chain.append(frame.getClassName());
        chain.append(".");
        chain.append(frame.getMethodName());
        chain.append("(");
        chain.append(frame.getLineNumber());
        chain.append(")");
        separator = " -> ";
    }
    return chain.toString();
}
// Directory handed to the LogStoreImpl under test; assigned anew in setUp() before each test method.
private File m_baseDir;
// Executor running the reader/writer tasks; created in setUp() and shut down in tearDown().
private ExecutorService m_executor;
// Completion service on top of m_executor, used by the tests to submit tasks and poll for their completion.
private CompletionService<Boolean> m_completionService;
/**
 * Tests that concurrent use of a {@link LogStoreImpl} with multiple readers and multiple writers works as expected.
 * <p>
 * The writers each write an interleaved share of the record IDs, while every reader has to see all records.
 */
@Test(enabled = false)
public void testConcurrentUseMultipleReaderAndMultipleWriters() throws Exception {
    final int recordCount = 10000;
    final int readerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
    final int writerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
    final LogStoreImpl store = createLogStore();
    final CountDownLatch start = new CountDownLatch(1);
    final CountDownLatch stop = new CountDownLatch(writerCount + readerCount);
    Writer[] writers = new Writer[writerCount];
    for (int i = 0; i < writerCount; i++) {
        writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
    }
    Reader[] readers = new Reader[readerCount];
    for (int i = 0; i < readerCount; i++) {
        readers[i] = new Reader(store, start, stop, recordCount, i);
    }
    // gents, start your engines...
    for (int i = 0; i < readers.length; i++) {
        m_completionService.submit(readers[i], Boolean.TRUE);
    }
    for (int i = 0; i < writers.length; i++) {
        m_completionService.submit(writers[i], Boolean.TRUE);
    }
    // 3, 2, 1... GO...
    start.countDown();
    // waiting for all threads to finish; only real completions count as finished tasks, and we give up
    // after 10 poll timeouts...
    int pending = writerCount + readerCount;
    int timeouts = 0;
    while (pending > 0 && timeouts < 10) {
        Future<Boolean> future = m_completionService.poll(1, TimeUnit.MINUTES);
        if (future == null) {
            timeouts++;
        }
        else {
            // surfaces any unexpected exception thrown by the task...
            future.get();
            pending--;
        }
    }
    assertTrue(stop.await(5, TimeUnit.SECONDS));
    int readCount = 0;
    for (int i = 0; i < readers.length; i++) {
        readCount += readers[i].m_seen.size();
    }
    int writtenCount = 0;
    for (int i = 0; i < writers.length; i++) {
        writtenCount += writers[i].m_written.size();
    }
    // TestNG expects (actual, expected, message)...
    assertEquals(writtenCount, recordCount, "Not all records were written?");
    // All readers read the exact same data, so we've got N copies of it...
    assertEquals(readCount, readerCount * writtenCount, "Not all records were seen?");
    verifyStoreContents(store, recordCount, writers);
}
/**
 * Tests that concurrent use of a {@link LogStoreImpl} with a single reader and multiple writers works as expected.
 * <p>
 * The writers each write an interleaved share of the record IDs, while the reader has to see all records.
 */
@Test(enabled = false)
public void testConcurrentUseSingleReaderAndMultipleWriters() throws Exception {
    final int recordCount = 10000;
    final int writerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
    final LogStoreImpl store = createLogStore();
    final CountDownLatch start = new CountDownLatch(1);
    final CountDownLatch stop = new CountDownLatch(writerCount + 1);
    Writer[] writers = new Writer[writerCount];
    for (int i = 0; i < writerCount; i++) {
        writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
    }
    Reader reader = new Reader(store, start, stop, recordCount);
    // gents, start your engines...
    m_completionService.submit(reader, Boolean.TRUE);
    for (int i = 0; i < writers.length; i++) {
        m_completionService.submit(writers[i], Boolean.TRUE);
    }
    // 3, 2, 1... GO...
    start.countDown();
    // waiting for all threads to finish; only real completions count as finished tasks, and we give up
    // after 10 poll timeouts...
    int pending = writerCount + 1;
    int timeouts = 0;
    while (pending > 0 && timeouts < 10) {
        Future<Boolean> future = m_completionService.poll(1, TimeUnit.MINUTES);
        if (future == null) {
            timeouts++;
        }
        else {
            // surfaces any unexpected exception thrown by the task...
            future.get();
            pending--;
        }
    }
    assertTrue(stop.await(5, TimeUnit.SECONDS));
    int writtenCount = 0;
    for (int i = 0; i < writers.length; i++) {
        writtenCount += writers[i].m_written.size();
    }
    int readCount = reader.m_seen.size();
    // TestNG expects (actual, expected, message)...
    assertEquals(writtenCount, recordCount, "Not all records were written?");
    assertEquals(readCount, writtenCount, "Not all records were seen?");
    verifyStoreContents(store, recordCount, writers);
}
/**
 * Tests that concurrent use of a {@link LogStoreImpl} with a single reader and writer works as expected.
 * <p>
 * Both tasks are released at the same moment and get at most two minutes to finish.
 */
@Test
public void testConcurrentUseSingleReaderAndSingleWriter() throws Exception {
    final int recordCount = 10000;
    final LogStoreImpl store = createLogStore();
    final CountDownLatch start = new CountDownLatch(1);
    final CountDownLatch stop = new CountDownLatch(2);
    Writer writer = new Writer(store, start, stop, recordCount);
    Reader reader = new Reader(store, start, stop, recordCount);
    // gents, start your engines...
    m_completionService.submit(writer, Boolean.TRUE);
    m_completionService.submit(reader, Boolean.TRUE);
    // 3, 2, 1... GO...
    start.countDown();
    // waiting both threads to finish...
    assertTrue(stop.await(120, TimeUnit.SECONDS));
    int writeCount = writer.m_written.size();
    int readCount = reader.m_seen.size();
    // TestNG expects (actual, expected, message)...
    assertEquals(writeCount, recordCount, "Not all records were written?");
    assertEquals(readCount, writeCount, "Not all records were seen?");
    verifyStoreContents(store, recordCount, writer);
}
/**
 * Writes a fixed number of records to a fresh store, one record per {@code put} call, and prints how long that
 * took. Nothing is asserted; the test only fails if writing throws an exception.
 */
@Test
public void testTimedWrite() throws Exception {
    final int recordCount = 10000;
    final LogStoreImpl store = createLogStore();
    long start = System.nanoTime();
    for (int i = 0; i < recordCount; i++) {
        store.put(Arrays.asList(new Event("1,2,3,4,5")));
    }
    long end = System.nanoTime();
    System.out.printf("Writing %d records took %.3f ms.%n", recordCount, (end - start) / 1.0e6);
}
/**
 * Prepares a fresh base directory for the store and a fresh executor (plus completion service) for the reader and
 * writer tasks.
 *
 * @throws Exception in case the temporary directory could not be created.
 */
@BeforeMethod(alwaysRun = true)
protected void setUp() throws Exception {
    // Creates the directory atomically and fails loudly if that is not possible, rather than creating a temp
    // file, deleting it and re-creating it as directory while ignoring the outcome of both steps.
    m_baseDir = java.nio.file.Files.createTempDirectory("logstore").toFile();
    m_executor = Executors.newCachedThreadPool();
    m_completionService = new ExecutorCompletionService<>(m_executor);
}
/**
 * Stops all reader/writer tasks that are still running, waits up to five seconds for them to end, and removes
 * the base directory of the store afterwards.
 *
 * @throws InterruptedException in case the calling thread is interrupted while waiting for the tasks to end.
 */
@AfterMethod(alwaysRun = true)
protected void tearDown() throws InterruptedException {
    try {
        // this method also runs when setUp() failed half-way, in which case there is no executor...
        if (m_executor != null) {
            m_executor.shutdownNow();
            m_executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
    finally {
        deleteRecursively(m_baseDir);
    }
}

/**
 * Deletes the given file or directory including everything below it. This is a best-effort cleanup: entries
 * that cannot be deleted are silently left behind.
 *
 * @param file the file or directory to delete, can be <code>null</code> in which case nothing happens.
 */
private static void deleteRecursively(File file) {
    if (file == null) {
        return;
    }
    // listFiles() yields null for anything that is not a (readable) directory...
    File[] children = file.listFiles();
    if (children != null) {
        for (File child : children) {
            deleteRecursively(child);
        }
    }
    file.delete();
}
/**
 * Creates and starts the store under test, using the per-test base directory and the name "log".
 * <p>
 * Before it is started, the store is passed to {@link TestUtils#configureObject} together with the
 * {@link EventAdmin} class (presumably to inject a stand-in for that service; confirm against TestUtils).
 *
 * @return the started store, never <code>null</code>.
 * @throws Exception in case creating, configuring or starting the store fails.
 */
private LogStoreImpl createLogStore() throws Exception {
    final LogStoreImpl store = new LogStoreImpl(m_baseDir, "log");
    TestUtils.configureObject(store, EventAdmin.class);
    store.start();
    return store;
}
/**
 * Walks over all event IDs reported by the descriptors of the given store and checks that each of them was
 * written by one of the given writers, and that the IDs come back as a gap-less sequence starting at zero.
 * <p>
 * Note that every matched event is removed from the bookkeeping of the writer that wrote it.
 *
 * @param store the store to verify;
 * @param count the number of records that were written (currently not used by this check);
 * @param writers the writers whose written events are matched against the store contents.
 * @throws IOException in case the descriptors could not be obtained from the store.
 */
private void verifyStoreContents(final LogStoreImpl store, final int count, Writer... writers) throws IOException {
    long nextExpectedId = 0;
    for (Descriptor descriptor : store.getDescriptors()) {
        RangeIterator ids = descriptor.getRangeSet().iterator();
        while (ids.hasNext()) {
            long id = ids.next();
            // look for the first writer that claims to have written this event...
            Event written = null;
            for (Writer writer : writers) {
                written = writer.m_written.remove(id);
                if (written != null) {
                    break;
                }
            }
            assertNotNull(written, "Event ID #" + id + " never written?!");
            // Test continuation of written data...
            assertEquals(written.getID(), nextExpectedId++, "Entry ID mismatch?!");
        }
    }
}
}