blob: bbff6bc4f8b49b71009c75e00d74f97f126c7b39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.logging;
import static org.apache.geode.internal.logging.LogWriterLevel.ALL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.LogWriter;
import org.apache.geode.SystemFailure;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.junit.categories.LoggingTest;
/**
* Integration tests for (new multi-threaded) {@link MergeLogFiles} utility.
*/
@Category(LoggingTest.class)
public class MergeLogFilesIntegrationTest {
/** The next integer to be written to the log */
private int next;
/**
* A bunch of threads write a strictly increasing integer to log "files" stored in byte arrays.
* The files are merged and the order is verified.
*/
@Test
public void testMultipleThreads() throws Exception {
// Spawn a bunch of threads that write to a log
WorkerGroup group = new WorkerGroup("Workers");
List workers = new ArrayList();
for (int i = 0; i < 10; i++) {
Worker worker = new Worker("Worker " + i, group);
workers.add(worker);
worker.start();
}
for (Iterator iter = workers.iterator(); iter.hasNext();) {
Worker worker = (Worker) iter.next();
ThreadUtils.join(worker, 120 * 1000);
}
if (group.exceptionOccurred()) {
fail(group.getExceptionString());
}
// Merge the log files together
Map<String, InputStream> logs = new HashMap<>();
for (int i = 0; i < workers.size(); i++) {
Worker worker = (Worker) workers.get(i);
logs.put(worker.getName(), worker.getInputStream());
}
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
MergeLogFiles.mergeLogFiles(logs, pw);
// System.out.println(sw.toString());
// Verify that the entries are sorted
BufferedReader br = new BufferedReader(new StringReader(sw.toString()));
Pattern pattern = Pattern.compile("^Worker \\d+: .* VALUE: (\\d+)");
int lastValue = -1;
while (br.ready()) {
String line = br.readLine();
if (line == null) {
break;
}
Matcher matcher = pattern.matcher(line);
if (matcher.matches()) {
int value = Integer.parseInt(matcher.group(1));
assertTrue(lastValue + " <= " + value, value > lastValue);
lastValue = value;
}
}
assertEquals(999, lastValue);
}
/**
* A <code>ThreadGroup</code> for workers
*/
private static class WorkerGroup extends ThreadGroup {
/**
* Did an uncaught exception occur in one of this group's threads?
*/
private boolean exceptionOccurred;
/**
* A <code>StringBuffer</code> containing a description of the uncaught exceptions thrown by the
* worker threads.
*/
private final StringBuffer sb;
/**
* Creates a new <code>WorkerGroup</code> with the given name
*/
WorkerGroup(String name) {
super(name);
sb = new StringBuffer();
}
@Override
public void uncaughtException(Thread t, Throwable e) {
if (e instanceof VirtualMachineError) {
SystemFailure.setFailure((VirtualMachineError) e); // don't throw
}
sb.append("Uncaught exception in thread ");
sb.append(t.getName());
sb.append("\n");
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw, true));
sb.append(sw.toString());
sb.append("\n");
exceptionOccurred = true;
}
/**
* Returns whether or not an uncaught exception occurred in one of the worker threads.
*/
public boolean exceptionOccurred() {
return exceptionOccurred;
}
/**
* Returns a string describing the uncaught exception(s) that occurred in the worker threads.
*/
String getExceptionString() {
return sb.toString();
}
}
/**
* Writes a strictly increasing number to a log "file" stored in a byte array. Waits a random
* amount of time between writing entries.
*/
private class Worker extends Thread {
/** The input stream for reading from the log file */
private InputStream in;
/** A random number generator */
private final Random random;
/**
* Creates a new <code>Worker</code> with the given name
*/
public Worker(String name, ThreadGroup group) {
super(group, name);
random = new Random();
}
@Override
public void run() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
LogWriter logger =
new LocalLogWriter(ALL.intLevel(), new PrintStream(baos, true));
for (int i = 0; i < 100; i++) {
int n;
synchronized (MergeLogFilesIntegrationTest.this) {
n = next++;
// Have to log with the lock to guarantee ordering
logger.info("VALUE: " + n);
try {
// Make sure that no two entries are at the same
// millisecond. Since Windows doesn't have millisecond
// granularity, we have to wait at least ten. Sigh.
// DJM - even at 10, some entries have the same timestamp
// on some PCs.
Thread.sleep(15);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
break; // TODO
}
}
try {
Thread.sleep(random.nextInt(250));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
break; // TODO
}
}
// System.out.println(baos.toString());
in = new ByteArrayInputStream(baos.toByteArray());
}
/**
* Returns an <code>InputStream</code> for reading from the log that this worker wrote.
*/
InputStream getInputStream() {
return in;
}
}
}