blob: bedbb5da4e9ba9bba68afba512072037a91405e9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.testing.PAssert.PCollectionContentsAssert.MatcherCheckerFn;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Test case for {@link PAssert}. */
@RunWith(JUnit4.class)
public class PAssertTest implements Serializable {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Rule public transient ExpectedException thrown = ExpectedException.none();
private static class NotSerializableObject {
@Override
public boolean equals(Object other) {
return (other instanceof NotSerializableObject);
}
@Override
public int hashCode() {
return 73;
}
}
private static class NotSerializableObjectCoder extends AtomicCoder<NotSerializableObject> {
private NotSerializableObjectCoder() {}
private static final NotSerializableObjectCoder INSTANCE = new NotSerializableObjectCoder();
@JsonCreator
public static NotSerializableObjectCoder of() {
return INSTANCE;
}
@Override
public void encode(NotSerializableObject value, OutputStream outStream)
throws CoderException, IOException {}
@Override
public NotSerializableObject decode(InputStream inStream) throws CoderException, IOException {
return new NotSerializableObject();
}
@Override
public boolean isRegisterByteSizeObserverCheap(NotSerializableObject value) {
return true;
}
@Override
public void registerByteSizeObserver(
NotSerializableObject value, ElementByteSizeObserver observer) throws Exception {
observer.update(0L);
}
}
private void throwNestedError() {
throw new RuntimeException("Nested error");
}
private void throwWrappedError() {
try {
throwNestedError();
} catch (Exception e) {
throw new RuntimeException("Wrapped error", e);
}
}
@Test
public void testFailureWithExceptionEncodedDecoded() throws IOException {
Throwable error;
try {
throwWrappedError();
throw new IllegalStateException("Should have failed");
} catch (Throwable e) {
error = e;
}
SuccessOrFailure failure =
SuccessOrFailure.failure(PAssert.PAssertionSite.capture("here"), error);
SuccessOrFailure res = CoderUtils.clone(SerializableCoder.of(SuccessOrFailure.class), failure);
assertEquals(
"Encode-decode failed SuccessOrFailure",
Throwables.getStackTraceAsString(failure.assertionError()),
Throwables.getStackTraceAsString(res.assertionError()));
}
@Test
public void testSuccessEncodedDecoded() throws IOException {
SuccessOrFailure success = SuccessOrFailure.success();
SerializableCoder<SuccessOrFailure> coder = SerializableCoder.of(SuccessOrFailure.class);
byte[] encoded = CoderUtils.encodeToByteArray(coder, success);
SuccessOrFailure res = CoderUtils.decodeFromByteArray(coder, encoded);
assertEquals("Encode-decode successful SuccessOrFailure", success.isSuccess(), res.isSuccess());
assertEquals(
"Encode-decode successful SuccessOrFailure",
success.assertionError(),
res.assertionError());
}
/**
* A {@link PAssert} about the contents of a {@link PCollection} must not require the contents of
* the {@link PCollection} to be serializable.
*/
@Test
@Category(ValidatesRunner.class)
public void testContainsInAnyOrderNotSerializable() throws Exception {
PCollection<NotSerializableObject> pcollection =
pipeline.apply(
Create.of(new NotSerializableObject(), new NotSerializableObject())
.withCoder(NotSerializableObjectCoder.of()));
PAssert.that(pcollection)
.containsInAnyOrder(new NotSerializableObject(), new NotSerializableObject());
pipeline.run();
}
/**
* A {@link PAssert} about the contents of a {@link PCollection} is allows to be verified by an
* arbitrary {@link SerializableFunction}, though.
*/
@Test
@Category(ValidatesRunner.class)
public void testSerializablePredicate() throws Exception {
PCollection<NotSerializableObject> pcollection =
pipeline.apply(
Create.of(new NotSerializableObject(), new NotSerializableObject())
.withCoder(NotSerializableObjectCoder.of()));
PAssert.that(pcollection)
.satisfies(
contents -> {
return null; // no problem!
});
pipeline.run();
}
/**
* A {@link PAssert} about the contents of a {@link PCollection} is allows to be verified by an
* arbitrary {@link SerializableFunction}, though.
*/
@Test
@Category(ValidatesRunner.class)
public void testWindowedSerializablePredicate() throws Exception {
PCollection<NotSerializableObject> pcollection =
pipeline
.apply(
Create.timestamped(
TimestampedValue.of(new NotSerializableObject(), new Instant(250L)),
TimestampedValue.of(new NotSerializableObject(), new Instant(500L)))
.withCoder(NotSerializableObjectCoder.of()))
.apply(Window.into(FixedWindows.of(Duration.millis(300L))));
PAssert.that(pcollection)
.inWindow(new IntervalWindow(new Instant(0L), new Instant(300L)))
.satisfies(
contents -> {
assertThat(Iterables.isEmpty(contents), is(false));
return null; // no problem!
});
PAssert.that(pcollection)
.inWindow(new IntervalWindow(new Instant(300L), new Instant(600L)))
.satisfies(
contents -> {
assertThat(Iterables.isEmpty(contents), is(false));
return null; // no problem!
});
pipeline.run();
}
/**
* Test that we throw an error at pipeline construction time when the user mistakenly uses {@code
* PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
*/
@SuppressWarnings({
"deprecation", // test of deprecated function
"EqualsIncompatibleType"
})
@Test
public void testPAssertEqualsSingletonUnsupported() throws Exception {
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage("isEqualTo");
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.thatSingleton(pcollection).equals(42);
}
/**
* Test that we throw an error at pipeline construction time when the user mistakenly uses {@code
* PAssert.that().equals()} instead of the test method {@code .containsInAnyOrder}.
*/
@SuppressWarnings({
"deprecation", // test of deprecated function
"EqualsIncompatibleType"
})
@Test
public void testPAssertEqualsIterableUnsupported() throws Exception {
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage("containsInAnyOrder");
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.that(pcollection).equals(42);
}
/**
* Test that {@code PAssert.thatSingleton().hashCode()} is unsupported. See {@link
* #testPAssertEqualsSingletonUnsupported}.
*/
@SuppressWarnings("deprecation") // test of deprecated function
@Test
public void testPAssertHashCodeSingletonUnsupported() throws Exception {
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage(".hashCode() is not supported.");
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.thatSingleton(pcollection).hashCode();
}
/**
* Test that {@code PAssert.thatIterable().hashCode()} is unsupported. See {@link
* #testPAssertEqualsIterableUnsupported}.
*/
@SuppressWarnings("deprecation") // test of deprecated function
@Test
public void testPAssertHashCodeIterableUnsupported() throws Exception {
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage(".hashCode() is not supported.");
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.that(pcollection).hashCode();
}
/** Basic test for {@code isEqualTo}. */
@Test
@Category(ValidatesRunner.class)
public void testIsEqualTo() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
PAssert.thatSingleton(pcollection).isEqualTo(43);
pipeline.run();
}
/** Basic test for {@code isEqualTo}. */
@Test
@Category(ValidatesRunner.class)
public void testWindowedIsEqualTo() throws Exception {
PCollection<Integer> pcollection =
pipeline
.apply(
Create.timestamped(
TimestampedValue.of(43, new Instant(250L)),
TimestampedValue.of(22, new Instant(-250L))))
.apply(Window.into(FixedWindows.of(Duration.millis(500L))));
PAssert.thatSingleton(pcollection)
.inOnlyPane(new IntervalWindow(new Instant(0L), new Instant(500L)))
.isEqualTo(43);
PAssert.thatSingleton(pcollection)
.inOnlyPane(new IntervalWindow(new Instant(-500L), new Instant(0L)))
.isEqualTo(22);
pipeline.run();
}
/** Basic test for {@code notEqualTo}. */
@Test
@Category(ValidatesRunner.class)
public void testNotEqualTo() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
PAssert.thatSingleton(pcollection).notEqualTo(42);
pipeline.run();
}
/** Test that we throw an error for false assertion on singleton. */
@Test
@Category({ValidatesRunner.class, UsesFailureMessage.class})
public void testPAssertEqualsSingletonFalse() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.thatSingleton("The value was not equal to 44", pcollection).isEqualTo(44);
Throwable thrown = runExpectingAssertionFailure(pipeline);
String message = thrown.getMessage();
assertThat(message, containsString("The value was not equal to 44"));
assertThat(message, containsString("Expected: <44>"));
assertThat(message, containsString("but: was <42>"));
}
/** Test that we throw an error for false assertion on singleton. */
@Test
@Category({ValidatesRunner.class, UsesFailureMessage.class})
public void testPAssertEqualsSingletonFalseDefaultReasonString() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.thatSingleton(pcollection).isEqualTo(44);
Throwable thrown = runExpectingAssertionFailure(pipeline);
String message = thrown.getMessage();
assertThat(message, containsString("Create.Values/Read(CreateSource).out"));
assertThat(message, containsString("Expected: <44>"));
assertThat(message, containsString("but: was <42>"));
}
/** Tests that {@code containsInAnyOrder} is actually order-independent. */
@Test
@Category(ValidatesRunner.class)
public void testContainsInAnyOrder() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
pipeline.run();
}
/** Tests that {@code containsInAnyOrder} is actually order-independent. */
@Test
@Category(ValidatesRunner.class)
public void testGlobalWindowContainsInAnyOrder() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3);
pipeline.run();
}
/** Tests that windowed {@code containsInAnyOrder} is actually order-independent. */
@Test
@Category(ValidatesRunner.class)
public void testWindowedContainsInAnyOrder() throws Exception {
PCollection<Integer> pcollection =
pipeline
.apply(
Create.timestamped(
TimestampedValue.of(1, new Instant(100L)),
TimestampedValue.of(2, new Instant(200L)),
TimestampedValue.of(3, new Instant(300L)),
TimestampedValue.of(4, new Instant(400L))))
.apply(
Window.into(
SlidingWindows.of(Duration.millis(200L))
.every(Duration.millis(100L))
.withOffset(Duration.millis(50L))));
PAssert.that(pcollection)
.inWindow(new IntervalWindow(new Instant(-50L), new Instant(150L)))
.containsInAnyOrder(1);
PAssert.that(pcollection)
.inWindow(new IntervalWindow(new Instant(50L), new Instant(250L)))
.containsInAnyOrder(2, 1);
PAssert.that(pcollection)
.inWindow(new IntervalWindow(new Instant(150L), new Instant(350L)))
.containsInAnyOrder(2, 3);
PAssert.that(pcollection)
.inWindow(new IntervalWindow(new Instant(250L), new Instant(450L)))
.containsInAnyOrder(4, 3);
PAssert.that(pcollection)
.inWindow(new IntervalWindow(new Instant(350L), new Instant(550L)))
.containsInAnyOrder(4);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testEmpty() {
PCollection<Long> vals = pipeline.apply(Create.empty(VarLongCoder.of()));
PAssert.that(vals).empty();
pipeline.run();
}
/** Tests that {@code containsInAnyOrder} fails when and how it should. */
@Test
@Category({ValidatesRunner.class, UsesFailureMessage.class})
public void testContainsInAnyOrderFalse() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7);
Throwable exc = runExpectingAssertionFailure(pipeline);
Pattern expectedPattern =
Pattern.compile(
"Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");
// A loose pattern, but should get the job done.
assertTrue(
"Expected error message from PAssert with substring matching "
+ expectedPattern
+ " but the message was \""
+ exc.getMessage()
+ "\"",
expectedPattern.matcher(exc.getMessage()).find());
}
@Test
@Category({ValidatesRunner.class, UsesFailureMessage.class})
public void testEmptyFalse() throws Exception {
PCollection<Long> vals = pipeline.apply(GenerateSequence.from(0).to(5));
PAssert.that("Vals should have been empty", vals).empty();
Throwable thrown = runExpectingAssertionFailure(pipeline);
String message = thrown.getMessage();
assertThat(message, containsString("Vals should have been empty"));
assertThat(message, containsString("Expected: iterable over [] in any order"));
}
@Test
@Category({ValidatesRunner.class, UsesFailureMessage.class})
public void testEmptyFalseDefaultReasonString() throws Exception {
PCollection<Long> vals = pipeline.apply(GenerateSequence.from(0).to(5));
PAssert.that(vals).empty();
Throwable thrown = runExpectingAssertionFailure(pipeline);
String message = thrown.getMessage();
assertThat(message, containsString("GenerateSequence/Read(BoundedCountingSource).out"));
assertThat(message, containsString("Expected: iterable over [] in any order"));
}
@Test
public void testAssertionSiteIsCaptured() {
// This check should return a failure.
SuccessOrFailure res =
PAssert.doChecks(
PAssert.PAssertionSite.capture("Captured assertion message."),
10,
new MatcherCheckerFn(SerializableMatchers.contains(11)));
String stacktrace = Throwables.getStackTraceAsString(res.assertionError());
assertEquals(false, res.isSuccess());
assertThat(stacktrace, containsString("PAssertionSite.capture"));
}
@Test
@Category({ValidatesRunner.class, UsesFailureMessage.class})
public void testAssertionSiteIsCapturedWithMessage() throws Exception {
PCollection<Long> vals = pipeline.apply(GenerateSequence.from(0).to(5));
assertThatCollectionIsEmptyWithMessage(vals);
Throwable thrown = runExpectingAssertionFailure(pipeline);
assertThat(thrown.getMessage(), containsString("Should be empty"));
assertThat(thrown.getMessage(), containsString("Expected: iterable over [] in any order"));
String stacktrace = Throwables.getStackTraceAsString(thrown);
assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithMessage"));
assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithMessage"));
}
@Test
@Category({ValidatesRunner.class, UsesFailureMessage.class})
public void testAssertionSiteIsCapturedWithoutMessage() throws Exception {
PCollection<Long> vals = pipeline.apply(GenerateSequence.from(0).to(5));
assertThatCollectionIsEmptyWithoutMessage(vals);
Throwable thrown = runExpectingAssertionFailure(pipeline);
assertThat(thrown.getMessage(), containsString("Expected: iterable over [] in any order"));
String stacktrace = Throwables.getStackTraceAsString(thrown);
assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithoutMessage"));
assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithoutMessage"));
}
private static void assertThatCollectionIsEmptyWithMessage(PCollection<Long> vals) {
PAssert.that("Should be empty", vals).empty();
}
private static void assertThatCollectionIsEmptyWithoutMessage(PCollection<Long> vals) {
PAssert.that(vals).empty();
}
private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
// We cannot use thrown.expect(AssertionError.class) because the AssertionError
// is first caught by JUnit and causes a test failure.
try {
pipeline.run();
} catch (Throwable exc) {
return exc;
}
fail("assertion should have failed");
throw new RuntimeException("unreachable");
}
@Test
public void countAssertsSucceeds() {
PCollection<Integer> create = pipeline.apply("FirstCreate", Create.of(1, 2, 3));
PAssert.that(create).containsInAnyOrder(1, 2, 3);
PAssert.thatSingleton(create.apply(Sum.integersGlobally())).isEqualTo(6);
PAssert.thatMap(pipeline.apply("CreateMap", Create.of(KV.of(1, 2))))
.isEqualTo(Collections.singletonMap(1, 2));
assertThat(PAssert.countAsserts(pipeline), equalTo(3));
}
@Test
public void countAssertsMultipleCallsIndependent() {
PCollection<Integer> create = pipeline.apply("FirstCreate", Create.of(1, 2, 3));
PAssert.that(create).containsInAnyOrder(1, 2, 3);
PAssert.thatSingleton(create.apply(Sum.integersGlobally())).isEqualTo(6);
assertThat(PAssert.countAsserts(pipeline), equalTo(2));
PAssert.thatMap(pipeline.apply("CreateMap", Create.of(KV.of(1, 2))))
.isEqualTo(Collections.singletonMap(1, 2));
assertThat(PAssert.countAsserts(pipeline), equalTo(3));
}
}