blob: 1c30943b823121ae064ec2cfadf24e7f9df75be3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.util;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestBufferDrainer extends DrainerTestCase {
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private AssertionError assertionError;
private IOException ioException;
@Test
public void testTooEarlyInputStreamRead() {
final BufferDrainer bufferDrainer = new BufferDrainer(null, 0);
expectedException.expect(IllegalStateException.class);
bufferDrainer.getInputBuffer();
}
@Test
public void testTooEarlyErrorStreamRead() {
final BufferDrainer bufferDrainer = new BufferDrainer(null, 0);
expectedException.expect(IllegalStateException.class);
bufferDrainer.getErrorBuffer();
}
@Test
public void testMultipleDrainBufferCalls() throws IOException {
Process process = BlockingWritesExitValueProcess.createFastWritingProcess("", "");
final BufferDrainer bufferDrainer = new BufferDrainer(process, 0);
bufferDrainer.drainBuffers();
expectedException.expect(IllegalStateException.class);
bufferDrainer.drainBuffers();
}
@Test
public void testReadSinglePass() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException {
readSinglePassAndAssert(sampleString, suggestedMaxLength);
}
});
}
private void readSinglePassAndAssert(String sampleString, int suggestedMaxLength) throws IOException {
BufferedReader sampleBufferedReader = new BufferedReader(new StringReader(sampleString));
StringBuffer storageBuffer = new StringBuffer();
boolean readAll = false;
int charRead = BufferDrainer.drainBuffer(sampleBufferedReader, storageBuffer, suggestedMaxLength, 0, readAll);
assertTrue("Some characters should have been read but none was", sampleString.length() == 0 || charRead > 0);
assertTrue("Read character count mismatch", charRead <= sampleString.length());
assertTrue("Content read mismatch", sampleString.startsWith(storageBuffer.toString()));
}
@Test
public void testReadTillAvailable() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException {
readTillAvailableAndAssert(sampleString, suggestedMaxLength);
}
});
}
private void readTillAvailableAndAssert(String sampleString, int suggestedMaxLength) throws IOException {
BufferedReader sampleBufferedReader = new BufferedReader(new StringReader(sampleString));
StringBuffer storageBuffer = new StringBuffer();
boolean readAll = true;
int charRead = BufferDrainer.drainBuffer(sampleBufferedReader, storageBuffer, suggestedMaxLength, 0, readAll);
assertTrue("Read character count mismatch", charRead <= sampleString.length());
assertTrue("Content read mismatch", sampleString.startsWith(storageBuffer.toString()));
}
@Test
public void testDrainBuffersImmediatelyEndingProcess() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException {
checkDrainBuffers(1, sampleString, "", suggestedMaxLength);
checkDrainBuffers(1, "", sampleString.toLowerCase(), suggestedMaxLength);
checkDrainBuffers(1, sampleString, sampleString.toLowerCase(), suggestedMaxLength);
}
});
}
@Test
public void testDrainBuffersShortProcess() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException {
checkDrainBuffers(2, sampleString, "", suggestedMaxLength);
checkDrainBuffers(2, "", sampleString.toLowerCase(), suggestedMaxLength);
checkDrainBuffers(2, sampleString, sampleString.toLowerCase(), suggestedMaxLength);
}
});
}
private void checkDrainBuffers(int runningSteps, String outputString, String errorString, int maxLength) throws IOException {
Process process = mock(Process.class);
when(process.exitValue()).thenAnswer(new Answer() {
private int invocationCounter = 0;
public Object answer(InvocationOnMock invocation) {
if (++invocationCounter == runningSteps) {
return BlockingWritesExitValueProcess.EXIT_VALUE;
}
throw new IllegalThreadStateException("Process is still running");
}
});
byte []outputByteArray = outputString.getBytes(StandardCharsets.UTF_8);
byte []errorByteArray = errorString.getBytes(StandardCharsets.UTF_8);
doReturn(new ByteArrayInputStream(outputByteArray)).when(process).getInputStream();
doReturn(new ByteArrayInputStream(errorByteArray)).when(process).getErrorStream();
checkDrainBuffers(process, outputString, errorString, maxLength);
}
@Test
public void testDrainBuffersFast() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException {
checkDrainBufferFast(sampleString, "", suggestedMaxLength);
checkDrainBufferFast("", sampleString.toLowerCase(), suggestedMaxLength);
checkDrainBufferFast(sampleString, sampleString.toLowerCase(), suggestedMaxLength);
}
});
}
private void checkDrainBufferFast(String outputString, String errorString, int suggestedMaxLength)
throws IOException, InterruptedException {
Process process = BlockingWritesExitValueProcess.createFastWritingProcess(outputString, errorString);
int timeout = calculateTimeoutForTest(false, null);
drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength);
}
@Test
public void testDrainBuffersSlowWrite() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException {
checkDrainBufferSlow( sampleString, "", suggestedMaxLength);
checkDrainBufferSlow("", sampleString.toLowerCase(), suggestedMaxLength);
checkDrainBufferSlow(sampleString, sampleString.toLowerCase(), suggestedMaxLength);
}
});
}
private void checkDrainBufferSlow(String outputString, String errorString, int suggestedMaxLength)
throws IOException, InterruptedException {
int catBufferSize = Math.max(outputString.length()/2, errorString.length()/2);
Process process = BlockingWritesExitValueProcess.createBufferLimitedProcess(outputString, errorString, catBufferSize);
int []defaultPauseTimes = {500};
int timeout = calculateTimeoutForTest(true, defaultPauseTimes);
drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength);
}
private void drainProcessAndCheckTimeout(Process process, int timeout, String outputString, String errorString, int maxLength)
throws IOException, InterruptedException {
long timeBefore = System.currentTimeMillis();
assertionError = null;
Thread t = createBufferDrainerThread(process, outputString, errorString, maxLength);
t.start();
t.join(timeout);
long timeToRun = System.currentTimeMillis() - timeBefore;
assertTrue("drainBuffer test timed out after "+ timeToRun+" ms", timeToRun < timeout);
if (assertionError != null) {
throw assertionError;
}
if (ioException != null) {
throw ioException;
}
}
private int calculateTimeoutForTest(boolean simulateSlowWriting, int[] pauseTimes) {
int basicTimeout = 1000;
if (simulateSlowWriting) {
int timeout = basicTimeout;
for (int i=0; i<4; ++i) {
int nextPause = pauseTimes == null ? 500 : pauseTimes[i % pauseTimes.length];
timeout += 2 * nextPause;
}
return timeout;
}
else {
return basicTimeout;
}
}
@Test
public void testDrainBuffersLongPause() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException {
// only run some of the tests because of the speed limit
boolean runThisTest = sampleString.length() == 0 || sampleString.length() < 1024*1024 && suggestedMaxLength > 1024;
if (runThisTest) {
checkDrainBuffersLongPause( sampleString, "", suggestedMaxLength);
checkDrainBuffersLongPause("", sampleString.toLowerCase(), suggestedMaxLength);
checkDrainBuffersLongPause(sampleString, sampleString.toLowerCase(), suggestedMaxLength);
}
}
});
}
private void checkDrainBuffersLongPause(String outputString, String errorString, int suggestedMaxLength)
throws IOException, InterruptedException {
int[] longPauseMs = {10*1000};
Process process = BlockingWritesExitValueProcess.createPausedProcess(outputString, errorString, longPauseMs);
int timeout = calculateTimeoutForTest(true, longPauseMs);
drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength);
}
@Test
public void testDrainBuffersRandomPause() throws Exception {
checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() {
public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException {
checkDrainBuffersRandomPause( sampleString, "", suggestedMaxLength);
checkDrainBuffersRandomPause("", sampleString.toLowerCase(), suggestedMaxLength);
checkDrainBuffersRandomPause(sampleString, sampleString.toLowerCase(), suggestedMaxLength);
}
});
}
private void checkDrainBuffersRandomPause(String outputString, String errorString, int suggestedMaxLength)
throws IOException, InterruptedException {
int[] pauseTimes = generateRandomPauseIntervals();
Process process = BlockingWritesExitValueProcess.createPausedProcess(outputString, errorString, pauseTimes);
int timeout = calculateTimeoutForTest(true, pauseTimes);
drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength);
}
private int[] generateRandomPauseIntervals() {
int[] longPauseMs = new int[4];
for (int i=0; i<longPauseMs.length; ++i) {
longPauseMs[i] = new Random().nextInt((2000)) + 100;
}
return longPauseMs;
}
private void checkDrainBuffers(Process process, String outputString, String errorString, int maxLength) throws IOException {
BufferDrainer bufferDrainer = new BufferDrainer(process, maxLength);
int exitValue = bufferDrainer.drainBuffers();
assertEquals("Invalid exit Value", BlockingWritesExitValueProcess.EXIT_VALUE, exitValue);
StringBuffer inputBuffer = bufferDrainer.getInputBuffer();
StringBuffer errorBuffer = bufferDrainer.getErrorBuffer();
assertTrue("Invalid input buffer length", inputBuffer.toString().length() >= Math.min(outputString.length(), maxLength));
assertTrue("Invalid input buffer", outputString.startsWith(inputBuffer.toString()));
assertTrue("Invalid error buffer", errorString.startsWith(errorBuffer.toString()));
}
@Test
public void testParallelDrainBuffers() throws Exception {
String sampleString = generateString(1024);
checkParallelDrainBuffers(20, sampleString, sampleString.toLowerCase(), false);
}
@Test
public void testParallelDrainBuffersWithFailure() throws Exception {
String sampleString = generateString(1024);
checkParallelDrainBuffers(20, sampleString, sampleString.toLowerCase(), true);
}
private void checkParallelDrainBuffers(int threadNum, String outputString, String errorString, boolean alsoAddFailingThread)
throws IOException, InterruptedException {
Thread []threads = new Thread[threadNum];
assertionError = null;
int maxTimeout = 0;
if (alsoAddFailingThread) {
BlockingWritesExitValueProcess failingProcess = BlockingWritesExitValueProcess.createFastWritingProcess("","");
createBufferDrainerThread(failingProcess, "", "", 1024).start();
failingProcess.simulateFailure();
}
for (int i=0; i<threads.length; ++i) {
String randomizedOutputString = randomSubstring(outputString);
String randomizedErrorString = randomSubstring(errorString);
int[] pauseTimes = generateRandomPauseIntervals();
Process process = BlockingWritesExitValueProcess.createPausedProcess(randomizedOutputString, randomizedErrorString,
pauseTimes);
int timeout = calculateTimeoutForTest(true, pauseTimes);
if (timeout > maxTimeout) {
maxTimeout = timeout;
}
threads[i] = createBufferDrainerThread(process, randomizedOutputString, randomizedErrorString, 1024);
threads[i].start();
}
waitThreadsToFinishInTime(threads, maxTimeout);
if (assertionError != null) {
throw assertionError;
}
if (ioException != null) {
throw ioException;
}
}
private Thread createBufferDrainerThread(Process process, String randomizedOutputString, String randomizedErrorString,
int maxLength) {
return new Thread() {
public void run() {
try {
checkDrainBuffers(process, randomizedOutputString, randomizedErrorString, maxLength);
} catch (IOException e) {
ioException = e;
} catch (AssertionError e) {
assertionError = e;
}
}
};
}
private void waitThreadsToFinishInTime(Thread[] threads, int maxTimeout) throws InterruptedException {
long timeBefore = System.currentTimeMillis();
for (Thread t : threads) {
long timeToRunSoFar = System.currentTimeMillis() - timeBefore;
long remainingTime = maxTimeout - timeToRunSoFar;
if (remainingTime > 0) {
t.join(remainingTime);
}
else {
fail("drainBuffer test timed out after "+ timeToRunSoFar+" ms");
}
}
long timeToRunTotal = System.currentTimeMillis() - timeBefore;
assertTrue("drainBuffer test timed out after "+ timeToRunTotal+" ms", timeToRunTotal < maxTimeout);
}
private String randomSubstring(String string) {
int minLength = (int)(0.8 * string.length());
int maxLength = string.length();
int length = new Random().nextInt((maxLength - minLength) + 1) + minLength;
return string.substring(0, length);
}
}