blob: eb07378493b0f8fa3b64e7d1870710493c057cb2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.AbstractCloseableRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.spy;
/**
 * Base class for tests of {@link AbstractCloseableRegistry} implementations.
 *
 * <p>Subclasses supply the registry under test, the closeables that are registered with it and
 * the producer threads that keep registering closeables while a test is running.
 *
 * @param <C> type of the closeables that are handed to the registry
 * @param <T> second type parameter of the {@link AbstractCloseableRegistry} under test
 */
public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {

    /** Number of producer threads created by {@link #setup(int)}. */
    private static final int NUM_PRODUCER_THREADS = 10;

    /** Number of GC-and-sleep rounds {@link #testClose()} runs before closing the registry. */
    private static final int NUM_GC_ROUNDS = 5;

    /** Pause in milliseconds after each {@link System#gc()} call in {@link #testClose()}. */
    private static final long GC_ROUND_PAUSE_MILLIS = 40L;

    /** Producer threads created by {@link #setup(int)}. */
    protected ProducerThread<C, T>[] streamOpenThreads;

    /** Registry under test, created by {@link #setup(int)}. */
    protected AbstractCloseableRegistry<C, T> closeableRegistry;

    /** Counter handed to the producer threads; the tests expect it to be zero after closing. */
    protected AtomicInteger unclosedCounter;

    /**
     * Creates a new closeable that can be registered with the registry under test.
     *
     * @return a fresh closeable
     */
    protected abstract C createCloseable();

    /**
     * Creates the registry instance that is tested.
     *
     * @return a fresh registry
     */
    protected abstract AbstractCloseableRegistry<C, T> createRegistry();

    /**
     * Creates one producer thread for the given registry.
     *
     * @param registry registry the thread registers its closeables with
     * @param unclosedCounter counter shared by all producer threads
     * @param maxStreams upper bound of loop iterations of the thread; {@link Integer#MAX_VALUE}
     *     makes the thread run until registering fails
     * @return the (not yet started) producer thread
     */
    protected abstract ProducerThread<C, T> createProducerThread(
            AbstractCloseableRegistry<C, T> registry,
            AtomicInteger unclosedCounter,
            int maxStreams);

    /**
     * Prepares a fresh registry, a fresh counter and the producer threads. The threads are created
     * but not started.
     *
     * <p>Asserts up front that no reaper thread of {@link SafetyNetCloseableRegistry} is running.
     *
     * @param maxStreams passed through to {@link #createProducerThread}
     */
    public void setup(int maxStreams) {
        Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());

        this.closeableRegistry = createRegistry();
        this.unclosedCounter = new AtomicInteger(0);

        // Generic arrays cannot be instantiated directly. The cast is safe because every slot
        // is filled below with the result of createProducerThread, which is a ProducerThread<C, T>.
        @SuppressWarnings("unchecked")
        final ProducerThread<C, T>[] threads = new ProducerThread[NUM_PRODUCER_THREADS];
        this.streamOpenThreads = threads;

        for (int i = 0; i < threads.length; ++i) {
            threads[i] = createProducerThread(closeableRegistry, unclosedCounter, maxStreams);
        }
    }

    /** Starts all producer threads created by {@link #setup(int)}. */
    protected void startThreads() {
        for (ProducerThread<C, T> producer : streamOpenThreads) {
            producer.start();
        }
    }

    /**
     * Waits until all producer threads have terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected void joinThreads() throws InterruptedException {
        for (Thread producer : streamOpenThreads) {
            producer.join();
        }
    }

    /**
     * Lets the producer threads register closeables for a while, closes the registry and checks
     * that nothing is left open or registered. Afterwards verifies that a closed registry rejects
     * a new closeable with an {@link IOException} and closes that closeable.
     */
    @Test
    public void testClose() throws Exception {
        setup(Integer.MAX_VALUE);
        startThreads();

        // NOTE(review): the GC rounds presumably give GC-driven registry implementations a chance
        // to act while the producers are running - confirm against the subclasses.
        for (int round = 0; round < NUM_GC_ROUNDS; ++round) {
            System.gc();
            Thread.sleep(GC_ROUND_PAUSE_MILLIS);
        }

        closeableRegistry.close();

        // The producers run unbounded and only stop once registering fails after the close.
        joinThreads();

        Assert.assertEquals(0, unclosedCounter.get());
        Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());

        final C testCloseable = spy(createCloseable());

        try {
            closeableRegistry.registerCloseable(testCloseable);
            Assert.fail("Closed registry should not accept closeables!");
        } catch (IOException expected) {
            // expected: the registry is already closed
        }

        Assert.assertEquals(0, unclosedCounter.get());
        Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
        verify(testCloseable).close();
    }

    /**
     * Checks that registering with a registry is rejected while another thread is still busy
     * inside {@code close()}: the registered closeable blocks in its own {@code close()} until
     * the test has attempted (and failed) to register a second closeable.
     */
    @Test
    public void testNonBlockingClose() throws Exception {
        setup(Integer.MAX_VALUE);

        final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
        final OneShotLatch blockCloseLatch = new OneShotLatch();

        final C spyCloseable = spy(createCloseable());

        // close() of the spy signals that the registry has begun closing and then blocks until
        // the test releases it.
        doAnswer(invocationOnMock -> {
            invocationOnMock.callRealMethod();
            waitRegistryClosedLatch.trigger();
            blockCloseLatch.await();
            return null;
        }).when(spyCloseable).close();

        closeableRegistry.registerCloseable(spyCloseable);

        Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());

        // Written by the closer thread and read only after closer.join(), which guarantees
        // visibility of the write.
        final IOException[] closeFailure = new IOException[1];

        final Thread closer = new Thread(() -> {
            try {
                closeableRegistry.close();
            } catch (IOException e) {
                closeFailure[0] = e;
            }
        });

        final C testCloseable = spy(createCloseable());

        closer.start();
        try {
            waitRegistryClosedLatch.await();

            try {
                closeableRegistry.registerCloseable(testCloseable);
                Assert.fail("Closed registry should not accept closeables!");
            } catch (IOException expected) {
                // expected: the registry is already marked as closed
            }
        } finally {
            // Always release the closer thread, so that a failed assertion above does not leave
            // it blocked forever.
            blockCloseLatch.trigger();
        }

        closer.join();

        // Surface a failure of close() instead of silently dropping it.
        if (closeFailure[0] != null) {
            throw closeFailure[0];
        }

        verify(spyCloseable).close();
        verify(testCloseable).close();
        Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
    }

    /**
     * A testing producer that repeatedly calls {@link #createAndRegisterStream()}.
     *
     * @param <C> type of the closeables that are handed to the registry
     * @param <T> second type parameter of the {@link AbstractCloseableRegistry}
     */
    protected abstract static class ProducerThread<C extends Closeable, T> extends Thread {

        /** Pause in milliseconds between two calls of {@link #createAndRegisterStream()}. */
        private static final long PRODUCE_PAUSE_MILLIS = 2L;

        protected final AbstractCloseableRegistry<C, T> registry;
        protected final AtomicInteger refCount;
        protected final int maxStreams;

        /** Completed loop iterations; stays zero if maxStreams is {@link Integer#MAX_VALUE}. */
        protected int numStreams;

        /**
         * @param registry registry made available to {@link #createAndRegisterStream()}
         * @param refCount counter made available to {@link #createAndRegisterStream()}
         * @param maxStreams number of loop iterations; {@link Integer#MAX_VALUE} means unbounded
         */
        public ProducerThread(AbstractCloseableRegistry<C, T> registry, AtomicInteger refCount, int maxStreams) {
            this.registry = registry;
            this.refCount = refCount;
            this.maxStreams = maxStreams;
            this.numStreams = 0;
        }

        /**
         * Performs one production step.
         *
         * @throws IOException if the step fails; this terminates the thread
         */
        protected abstract void createAndRegisterStream() throws IOException;

        /**
         * Calls {@link #createAndRegisterStream()} in a loop with a short pause in between. The
         * loop ends after {@code maxStreams} iterations, when a step throws, or when the thread is
         * interrupted.
         */
        @Override
        public void run() {
            try {
                while (numStreams < maxStreams) {

                    createAndRegisterStream();

                    try {
                        Thread.sleep(PRODUCE_PAUSE_MILLIS);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop producing instead of silently
                        // continuing as if nothing had happened.
                        Thread.currentThread().interrupt();
                        return;
                    }

                    // Integer.MAX_VALUE means "unbounded": never count towards the limit.
                    if (maxStreams != Integer.MAX_VALUE) {
                        ++numStreams;
                    }
                }
            } catch (Exception ignored) {
                // Deliberately ignored: a failing step is the regular way for an unbounded
                // producer to terminate (testClose joins the producers after closing the registry).
            }
        }
    }

    /**
     * Testing stream which adds itself to a reference counter while not closed.
     */
    protected static final class TestStream extends FSDataInputStream {

        protected AtomicInteger refCount;

        /** Set by the first call of {@link #close()}; guarded by the monitor of this stream. */
        private boolean closed;

        /**
         * Creates the stream and increments the counter.
         *
         * @param refCount counter that is incremented now and decremented on close
         */
        public TestStream(AtomicInteger refCount) {
            this.refCount = refCount;
            refCount.incrementAndGet();
        }

        /** Does nothing. */
        @Override
        public void seek(long desired) throws IOException {
        }

        /** Always reports position zero. */
        @Override
        public long getPos() throws IOException {
            return 0;
        }

        /** Always returns zero. */
        @Override
        public int read() throws IOException {
            return 0;
        }

        /**
         * Decrements the counter on the first call. Further calls have no effect, as required by
         * the contract of {@link Closeable#close()}, so a repeated close cannot push the counter
         * below the number of streams that are really open.
         */
        @Override
        public synchronized void close() throws IOException {
            if (closed) {
                return;
            }
            closed = true;
            refCount.decrementAndGet();
        }
    }
}