blob: 3c54e672641b712909ad3afb54251422b827782a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.logging;
import static org.apache.geode.logging.internal.spi.LogWriterLevel.ALL;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ErrorCollector;
import org.apache.geode.LogWriter;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.junit.categories.LoggingTest;
/**
 * Integration tests for (new multi-threaded) {@link MergeLogFiles} utility.
 *
 * <p>
 * Several {@link Worker} threads each write to their own in-memory log while drawing values from
 * one shared, lock-protected counter. The logs are then merged and the test checks that the
 * values appear in strictly increasing order in the merged output.
 */
@Category(LoggingTest.class)
public class MergeLogFilesIntegrationTest {

  /** Maximum time to wait for each worker thread to finish. */
  private static final long TIMEOUT_MILLIS = getTimeout().getValueInMS();

  /** Number of worker threads; each one produces its own log. */
  private static final int WORKER_COUNT = 10;

  /** Number of entries each worker writes to its log. */
  private static final int ENTRIES_PER_WORKER = 100;

  /** Matches a merged line that carries a worker's value; group 1 is the value. */
  private static final Pattern VALUE_LINE = Pattern.compile("^Worker \\d+: .* VALUE: (\\d+)");

  /** Guards {@link #next} and serializes the workers' log writes. */
  private final Object lock = new Object();

  /** The next integer to be written to the log. Only accessed while holding {@link #lock}. */
  private int next;

  @Rule
  public ErrorCollector errorCollector = new ErrorCollector();

  /**
   * A bunch of threads write a strictly increasing integer to log "files" stored in byte arrays.
   * The files are merged and the order is verified.
   */
  @Test
  public void testMultipleThreads() throws Exception {
    // Spawn a bunch of threads that write to a log
    WorkerGroup group = new WorkerGroup("Workers");
    Collection<Worker> workers = new ArrayList<>();
    for (int i = 0; i < WORKER_COUNT; i++) {
      Worker worker = new Worker("Worker " + i, group);
      workers.add(worker);
      worker.start();
    }

    for (Worker worker : workers) {
      ThreadUtils.join(worker, TIMEOUT_MILLIS);
    }

    // Merge the log files together
    Map<String, InputStream> logs = new HashMap<>();
    for (Worker worker : workers) {
      logs.put(worker.getName(), worker.getInputStream());
    }

    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw, true);
    MergeLogFiles.mergeLogFiles(logs, pw);

    // Verify that the entries are sorted
    int lastValue = -1;
    try (BufferedReader br = new BufferedReader(new StringReader(sw.toString()))) {
      String line;
      while ((line = br.readLine()) != null) {
        Matcher matcher = VALUE_LINE.matcher(line);
        if (matcher.matches()) {
          int value = Integer.parseInt(matcher.group(1));
          assertThat(value).isGreaterThan(lastValue);
          lastValue = value;
        }
      }
    }

    // The counter starts at 0, so the final entry written carries the total count minus one
    assertThat(lastValue).isEqualTo(WORKER_COUNT * ENTRIES_PER_WORKER - 1);
  }

  /**
   * A {@code ThreadGroup} for workers. Uncaught exceptions thrown by its threads are handed to the
   * test's {@link ErrorCollector}.
   */
  private class WorkerGroup extends ThreadGroup {

    /**
     * Creates a new {@code WorkerGroup} with the given name
     */
    private WorkerGroup(String name) {
      super(name);
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
      // Several workers may fail at the same time and ErrorCollector is not documented as
      // thread-safe, so serialize the additions.
      synchronized (errorCollector) {
        errorCollector.addError(e);
      }
    }
  }

  /**
   * Writes a strictly increasing number to a log "file" stored in a byte array. Waits a random
   * amount of time between writing entries.
   */
  private class Worker extends Thread {

    /** The input stream for reading from the log file; assigned when {@link #run()} exits */
    private InputStream in;

    /** A random number generator */
    private final Random random;

    /**
     * Creates a new {@code Worker} with the given name
     */
    private Worker(String name, ThreadGroup group) {
      super(group, name);
      random = new Random();
    }

    @Override
    public void run() {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      try {
        LogWriter logWriter = new LocalLogWriter(ALL.intLevel(), new PrintStream(baos, true));
        for (int i = 0; i < ENTRIES_PER_WORKER; i++) {
          synchronized (lock) {
            int value = next++;

            // Have to log with the lock to guarantee ordering
            logWriter.info("VALUE: " + value);

            try {
              // Make sure that no two entries are at the same
              // millisecond. Since Windows doesn't have millisecond
              // granularity, we have to wait at least ten. Sigh.
              // DJM - even at 10, some entries have the same timestamp
              // on some PCs.
              Thread.sleep(15);
            } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
              break;
            }
          }

          try {
            Thread.sleep(random.nextInt(250));
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            break;
          }
        }
      } finally {
        // Publish whatever was written even when the loop ends abnormally, so the merge step
        // never receives a null stream from this worker.
        in = new ByteArrayInputStream(baos.toByteArray());
      }
    }

    /**
     * Returns an {@code InputStream} for reading from the log that this worker wrote.
     */
    private InputStream getInputStream() {
      return in;
    }
  }
}