blob: 3472bc6490214829bf405ffa93b3fbc7aed8c899 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.util;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.util.Random;
/**
* A {@code ByteArrayInputStream} where the full data is not available at the beginning but produced on the fly. Simulates
* an external process generating the data.
*
* <p>
* {@code simulateSlowWriting} simulates when the process writes data in chunks instead of writing all the data
* at the beginning.
* <p>
* It is possible to specify the pause times before writing the next chunk.
* <p>
* It is possible to specify {@code catBufferSize}, if the buffer contains more unread data, the process reports
* itself as running.
*
*/
public class BlockingInputStream extends ByteArrayInputStream {
private static final int CHUNK_WRITE_DELAY_MS = 500;
private static final double CHUNK_WRITE_MIN_RATIO = 0.25;
private int readPosition;
private int writePosition;
private boolean simulateSlowWriting;
private int bufferSize;
private long lastTimeDataWritten;
private int pauseIndex;
private int[] pauseTimes = {CHUNK_WRITE_DELAY_MS};
private boolean failed;
BlockingInputStream(byte buf[], boolean simulateSlowWriting) {
super(buf);
this.bufferSize = buf.length;
this.simulateSlowWriting = simulateSlowWriting;
writeFirstChunk(buf, simulateSlowWriting);
}
void setBufferSize(int bufferSize) {
if (!simulateSlowWriting) {
throw new IllegalArgumentException("Cannot specify bufferSize");
}
this.bufferSize = bufferSize;
}
void setPauseTimes(int[] pauseTimes) {
this.pauseTimes = pauseTimes;
}
private void writeFirstChunk(byte[] buf, boolean simulateSlowWriting) {
if (simulateSlowWriting) {
writeNextChunk(0);
}
else {
writeNextChunk(buf.length);
}
}
@Override
public synchronized int read(byte b[], int off, int len) {
final int bytesReadyToRead = writePosition - readPosition;
final int bytesToRead = Math.min(len, bytesReadyToRead);
final int readBytes = super.read(b, off, bytesToRead);
final boolean someBytesRead = readBytes != -1;
if (someBytesRead) {
readPosition += readBytes;
}
return readBytes;
}
@Override
public synchronized int available() {
return writePosition - readPosition;
}
@VisibleForTesting
boolean checkBlockedAndTryWriteNextChunk() {
boolean fullBufferReady = writePosition == buf.length;
boolean readFullBuffer = readPosition == buf.length;
boolean thereAreNoBytesInBuffer = writePosition - readPosition == 0;
boolean isBlocked = failed || (!readFullBuffer && (!fullBufferReady || thereAreNoBytesInBuffer ));
tryWriteNextChunk();
return isBlocked;
}
private boolean readyToWriteNextChunk() {
return System.currentTimeMillis() - lastTimeDataWritten > getPauseTimeMs(pauseIndex);
}
private int getPauseTimeMs(int index) {
return pauseTimes[index % pauseTimes.length];
}
private void tryWriteNextChunk() {
if (!failed && writePosition < buf.length && available() < bufferSize && readyToWriteNextChunk()) {
int maxSize = Math.min(bufferSize - available(), buf.length - writePosition);
int minSize = Math.min((int)(CHUNK_WRITE_MIN_RATIO * buf.length +1), maxSize);
int nextChunkSize = new Random().nextInt((maxSize - minSize) + 1) + minSize;
writeNextChunk(nextChunkSize);
}
}
private void writeNextChunk(int chunkSize) {
writePosition += chunkSize;
lastTimeDataWritten = System.currentTimeMillis();
if (chunkSize > 0) {
++pauseIndex;
}
}
void simulateFailure() {
failed = true;
}
}