blob: a945814ae4bcb699ffadcd5dc3a0aa5da35c7dc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled("Tests are time-based")
public class TestLeakyBucketThrottler {
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void testOutputStreamInterface() throws IOException {
// throttle rate at 1 MB/sec
final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
final byte[] data = new byte[1024 * 1024 * 4];
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final OutputStream throttledOut = throttler.newThrottledOutputStream(baos)) {
final long start = System.currentTimeMillis();
throttledOut.write(data);
throttler.close();
final long millis = System.currentTimeMillis() - start;
// should take 4 sec give or take
assertTrue(millis > 3000);
assertTrue(millis < 6000);
}
}
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void testInputStreamInterface() throws IOException {
final byte[] data = new byte[1024 * 1024 * 4];
// throttle rate at 1 MB/sec
try ( final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
final ByteArrayInputStream bais = new ByteArrayInputStream(data);
final InputStream throttledIn = throttler.newThrottledInputStream(bais);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final byte[] buffer = new byte[4096];
final long start = System.currentTimeMillis();
int len;
while ((len = throttledIn.read(buffer)) > 0) {
baos.write(buffer, 0, len);
}
final long millis = System.currentTimeMillis() - start;
// should take 4 sec give or take
assertTrue(millis > 3000);
assertTrue(millis < 6000);
}
}
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void testDirectInterface() throws IOException, InterruptedException {
// throttle rate at 1 MB/sec
try (final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
// create 3 threads, each sending ~2 MB
final List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 3; i++) {
final Thread t = new WriterThread(i, throttler, baos);
threads.add(t);
}
final long start = System.currentTimeMillis();
for (final Thread t : threads) {
t.start();
}
for (final Thread t : threads) {
t.join();
}
final long elapsed = System.currentTimeMillis() - start;
throttler.close();
// To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to
// allow for busy-ness and the fact that we could write a tiny bit more than the limit.
assertTrue(elapsed > 5000);
assertTrue(elapsed < 7000);
// ensure bytes were copied out appropriately
assertEquals(3 * (2 * 1024 * 1024 + 1), baos.size());
assertEquals((byte) 'A', baos.toByteArray()[baos.size() - 1]);
}
}
private static class WriterThread extends Thread {
private final int idx;
private final byte[] data = new byte[1024 * 1024 * 2 + 1];
private final LeakyBucketStreamThrottler throttler;
private final OutputStream out;
public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) {
this.idx = idx;
this.throttler = throttler;
this.out = out;
this.data[this.data.length - 1] = (byte) 'A';
}
@Override
public void run() {
long startMillis = System.currentTimeMillis();
long bytesWritten = 0L;
try {
throttler.copy(new ByteArrayInputStream(data), out);
} catch (IOException e) {
e.printStackTrace();
return;
}
long now = System.currentTimeMillis();
long millisElapsed = now - startMillis;
bytesWritten += data.length;
float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F;
System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec");
}
}
}