blob: a0bd3e4750e3ab1945824ec935564eaf6815b572 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.io.function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.BaseStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Tests {@link IOBaseStream}.
*/
public class IOBaseStreamTest {
/**
* Implements IOBaseStream with generics.
*/
private static class IOBaseStreamFixture<T, S extends IOBaseStreamFixture<T, S, B>, B extends BaseStream<T, B>> implements IOBaseStream<T, S, B> {
private final B baseStream;
private IOBaseStreamFixture(final B baseStream) {
this.baseStream = baseStream;
}
@Override
public B unwrap() {
return baseStream;
}
@SuppressWarnings("unchecked") // We are this here
@Override
public S wrap(final B delegate) {
return delegate == baseStream ? (S) this : (S) new IOBaseStreamFixture<T, S, B>(delegate);
}
}
/**
* Implements IOBaseStream with a concrete type.
*/
private static final class IOBaseStreamPathFixture<B extends BaseStream<Path, B>> extends IOBaseStreamFixture<Path, IOBaseStreamPathFixture<B>, B> {
private IOBaseStreamPathFixture(final B baseStream) {
super(baseStream);
}
@Override
public IOBaseStreamPathFixture<B> wrap(final B delegate) {
return delegate == unwrap() ? this : new IOBaseStreamPathFixture<>(delegate);
}
}
private static final class MyRuntimeException extends RuntimeException {
private static final long serialVersionUID = 1L;
public MyRuntimeException(final String message) {
super(message);
}
}
/** Sanity check */
private BaseStream<Path, ? extends BaseStream<Path, ?>> baseStream;
/** Generic version */
private IOBaseStreamFixture<Path, ? extends IOBaseStreamFixture<Path, ?, ?>, ?> ioBaseStream;
/** Concrete version */
private IOBaseStreamPathFixture<? extends BaseStream<Path, ?>> ioBaseStreamPath;
/** Adapter version */
private IOStream<Path> ioBaseStreamAdapter;
@BeforeEach
public void beforeEach() {
baseStream = createStreamOfPaths();
ioBaseStream = createIOBaseStream();
ioBaseStreamPath = createIOBaseStreamPath();
ioBaseStreamAdapter = createIOBaseStreamAdapter();
}
private IOBaseStreamFixture<Path, ?, Stream<Path>> createIOBaseStream() {
return new IOBaseStreamFixture<>(createStreamOfPaths());
}
private IOStream<Path> createIOBaseStreamAdapter() {
return IOStreamAdapter.adapt(createStreamOfPaths());
}
private IOBaseStreamPathFixture<Stream<Path>> createIOBaseStreamPath() {
return new IOBaseStreamPathFixture<>(createStreamOfPaths());
}
private Stream<Path> createStreamOfPaths() {
return Stream.of(TestConstants.ABS_PATH_A, TestConstants.ABS_PATH_B);
}
@Test
@AfterEach
public void testClose() {
baseStream.close();
ioBaseStream.close();
ioBaseStreamPath.close();
ioBaseStream.asBaseStream().close();
ioBaseStreamPath.asBaseStream().close();
}
@SuppressWarnings("resource") // @AfterEach
@Test
public void testIsParallel() {
assertFalse(baseStream.isParallel());
assertFalse(ioBaseStream.isParallel());
assertFalse(ioBaseStream.asBaseStream().isParallel());
assertFalse(ioBaseStreamPath.asBaseStream().isParallel());
assertFalse(ioBaseStreamPath.isParallel());
}
@SuppressWarnings("resource") // @AfterEach
@Test
public void testIteratorPathIO() throws IOException {
final AtomicReference<Path> ref = new AtomicReference<>();
ioBaseStream.iterator().forEachRemaining(e -> ref.set(e.toRealPath()));
assertEquals(TestConstants.ABS_PATH_B.toRealPath(), ref.get());
//
ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.set(e.getFileName()));
assertEquals(TestConstants.ABS_PATH_B.getFileName(), ref.get());
}
@SuppressWarnings("resource") // @AfterEach
@Test
public void testIteratorSimple() throws IOException {
final AtomicInteger ref = new AtomicInteger();
baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(2, ref.get());
ioBaseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(4, ref.get());
ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(6, ref.get());
}
@SuppressWarnings("resource")
@Test
public void testOnClose() {
// Stream
testOnClose(baseStream);
testOnClose(ioBaseStream.asBaseStream());
testOnClose(ioBaseStreamPath.asBaseStream());
}
@SuppressWarnings("resource")
private <T, S extends BaseStream<T, S>> void testOnClose(final BaseStream<T, S> stream) {
final AtomicReference<String> refA = new AtomicReference<>();
final AtomicReference<String> refB = new AtomicReference<>();
stream.onClose(() -> refA.set("A"));
stream.onClose(() -> {
throw new MyRuntimeException("B");
});
stream.onClose(() -> {
throw new MyRuntimeException("C");
});
stream.onClose(() -> refB.set("D"));
final MyRuntimeException e = assertThrows(MyRuntimeException.class, stream::close);
assertEquals("A", refA.get());
assertEquals("D", refB.get());
assertEquals("B", e.getMessage());
final Throwable[] suppressed = e.getSuppressed();
assertNotNull(suppressed);
assertEquals(1, suppressed.length);
assertEquals("C", suppressed[0].getMessage());
}
@SuppressWarnings("resource")
@Test
public void testParallel() throws IOException {
final AtomicInteger ref = new AtomicInteger();
baseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(2, ref.get());
ioBaseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(4, ref.get());
final BaseStream<Path, ?> parallel = ioBaseStreamPath.asBaseStream().parallel();
parallel.iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(6, ref.get());
assertTrue(parallel.isParallel());
}
@SuppressWarnings("resource") // @AfterEach
@Test
public void testParallelParallel() {
try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
testParallelParallel(stream);
}
try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) {
testParallelParallel(stream);
}
try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
testParallelParallel(stream);
}
try (final IOBaseStreamFixture<Path, ?, Stream<Path>> stream = createIOBaseStream()) {
testParallelParallel(stream.asBaseStream());
}
}
@SuppressWarnings("resource")
private void testParallelParallel(final BaseStream<?, ?> stream) {
final BaseStream<?, ?> seq = stream.sequential();
assertFalse(seq.isParallel());
final BaseStream<?, ?> p1 = seq.parallel();
assertTrue(p1.isParallel());
final BaseStream<?, ?> p2 = p1.parallel();
assertTrue(p1.isParallel());
assertSame(p1, p2);
}
@SuppressWarnings("resource")
private void testParallelParallel(final IOBaseStream<?, ?, ?> stream) {
final IOBaseStream<?, ?, ?> seq = stream.sequential();
assertFalse(seq.isParallel());
final IOBaseStream<?, ?, ?> p1 = seq.parallel();
assertTrue(p1.isParallel());
final IOBaseStream<?, ?, ?> p2 = p1.parallel();
assertTrue(p1.isParallel());
assertSame(p1, p2);
}
@SuppressWarnings("resource")
@Test
public void testSequential() throws IOException {
final AtomicInteger ref = new AtomicInteger();
baseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(2, ref.get());
ioBaseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(4, ref.get());
ioBaseStreamPath.asBaseStream().sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(6, ref.get());
}
@SuppressWarnings("resource") // @AfterEach
@Test
public void testSequentialSequential() {
try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
testSequentialSequential(stream);
}
try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) {
testSequentialSequential(stream);
}
try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
testSequentialSequential(stream.asBaseStream());
}
}
@SuppressWarnings("resource")
private void testSequentialSequential(final BaseStream<?, ?> stream) {
final BaseStream<?, ?> p = stream.parallel();
assertTrue(p.isParallel());
final BaseStream<?, ?> seq1 = p.sequential();
assertFalse(seq1.isParallel());
final BaseStream<?, ?> seq2 = seq1.sequential();
assertFalse(seq1.isParallel());
assertSame(seq1, seq2);
}
@SuppressWarnings("resource")
private void testSequentialSequential(final IOBaseStream<?, ?, ?> stream) {
final IOBaseStream<?, ?, ?> p = stream.parallel();
assertTrue(p.isParallel());
final IOBaseStream<?, ?, ?> seq1 = p.sequential();
assertFalse(seq1.isParallel());
final IOBaseStream<?, ?, ?> seq2 = seq1.sequential();
assertFalse(seq1.isParallel());
assertSame(seq1, seq2);
}
@SuppressWarnings("resource") // @AfterEach
@Test
public void testSpliterator() {
final AtomicInteger ref = new AtomicInteger();
baseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(2, ref.get());
ioBaseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(4, ref.get());
ioBaseStreamPath.asBaseStream().spliterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(6, ref.get());
}
@SuppressWarnings("resource")
@Test
public void testUnordered() throws IOException {
final AtomicInteger ref = new AtomicInteger();
baseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(2, ref.get());
ioBaseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(4, ref.get());
ioBaseStreamPath.asBaseStream().unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(6, ref.get());
}
@SuppressWarnings("resource")
@Test
public void testUnwrap() {
final AtomicInteger ref = new AtomicInteger();
baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(2, ref.get());
ioBaseStream.unwrap().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(4, ref.get());
ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet());
assertEquals(6, ref.get());
}
@Test
public void testWrap() {
final Stream<Path> stream = createStreamOfPaths();
@SuppressWarnings("resource")
final IOStream<Path> wrap = ioBaseStreamAdapter.wrap(stream);
assertNotNull(wrap);
assertEquals(stream, wrap.unwrap());
}
}