blob: 24a5fe0e4fad45b764a51f121964adb67e232784 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
 * Tests for {@link TestDataflowRunner}.
 *
 * <p>The Dataflow service is never contacted. The {@link DataflowClient} and {@link
 * DataflowRunner} are Mockito mocks, and jobs are either mocks or spies. The PAssert
 * success/failure metrics the service would report are synthesized by the {@code generateMock*}
 * helpers below.
 */
@RunWith(JUnit4.class)
public class TestDataflowRunnerTest {
  @Rule public ExpectedException expectedException = ExpectedException.none();
  @Mock private DataflowClient mockClient;

  private TestDataflowPipelineOptions options;

  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
    options.setAppName("TestAppName");
    options.setProject("test-project");
    options.setTempLocation("gs://test/temp/location");
    options.setTempRoot("gs://test");
    options.setGcpCredential(new TestCredential());
    options.setRunner(TestDataflowRunner.class);
    options.setPathValidatorClass(NoopPathValidator.class);
  }

  /**
   * Creates a pipeline over {@link #options} that contains exactly one {@link PAssert}, checking
   * that {@code Create.of(1, 2, 3)} contains 1, 2 and 3.
   *
   * <p>Callers that need a streaming pipeline must call {@code options.setStreaming(true)} first.
   */
  private Pipeline createPipelineWithPAssert() {
    Pipeline pipeline = TestPipeline.create(options);
    PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
    return pipeline;
  }

  /**
   * Creates a mock job that reports {@code state} from {@code getState()} and the fixed ids
   * "test-project" / "test-job". {@code waitUntilFinish} is left unstubbed unless the caller
   * stubs it.
   */
  private static DataflowPipelineJob createMockJob(State state) {
    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
    when(mockJob.getState()).thenReturn(state);
    when(mockJob.getProjectId()).thenReturn("test-project");
    when(mockJob.getJobId()).thenReturn("test-job");
    return mockJob;
  }

  /** Creates a mock {@link DataflowRunner} whose {@code run} returns {@code job} for any pipeline. */
  private static DataflowRunner createMockRunner(DataflowPipelineJob job) {
    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
    when(mockRunner.run(any(Pipeline.class))).thenReturn(job);
    return mockRunner;
  }

  /**
   * Stubs {@code mockJob.waitUntilFinish(Duration, JobMessagesHandler)}. The stub first hands one
   * {@code JOB_MESSAGE_ERROR} message with text {@code messageText} to the supplied handler, then
   * returns {@code terminalState}.
   */
  private static void stubWaitUntilFinishWithError(
      DataflowPipelineJob mockJob, String messageText, State terminalState) throws Exception {
    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenAnswer(
            invocation -> {
              JobMessage message = new JobMessage();
              message.setMessageText(messageText);
              message.setTime(TimeUtil.toCloudTime(Instant.now()));
              message.setMessageImportance("JOB_MESSAGE_ERROR");
              // Argument 1 is the JobMessagesHandler the runner passed in.
              ((JobMessagesHandler) invocation.getArguments()[1]).process(Arrays.asList(message));
              return terminalState;
            });
  }

  @Test
  public void testToString() {
    assertEquals(
        "TestDataflowRunner#TestAppName", TestDataflowRunner.fromOptions(options).toString());
  }

  /**
   * A batch job in state DONE whose tentative PAssert metric reports success is returned to the
   * caller.
   */
  @Test
  public void testRunBatchJobThatSucceeds() throws Exception {
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    assertEquals(mockJob, runner.run(p, mockRunner));
  }

  /**
   * Job success on Dataflow means that it handled transient errors (if any) successfully by
   * retrying failed bundles.
   */
  @Test
  public void testRunBatchJobThatSucceedsDespiteTransientErrors() throws Exception {
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    stubWaitUntilFinishWithError(mockJob, "TransientError", State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    assertEquals(mockJob, runner.run(p, mockRunner));
  }

  /**
   * Tests that when a batch job terminates in a failure state, the runner throws a {@link
   * RuntimeException}. The job's only PAssert metric is a non-tentative success.
   */
  @Test
  public void testRunBatchJobThatFails() throws Exception {
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.FAILED);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, false /* tentative */));

    // ExpectedException fails the test on its own if run() returns normally.
    expectedException.expect(RuntimeException.class);
    runner.run(p, mockRunner);
  }

  /**
   * Tests that a batch job with a tentative PAssert failure metric throws an {@link
   * AssertionError} that carries the job's error message. The runner must not cancel the job.
   */
  @Test
  public void testBatchPipelineFailsIfException() throws Exception {
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.RUNNING);
    stubWaitUntilFinishWithError(mockJob, "FooException", State.CANCELLED);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    try {
      runner.run(p, mockRunner);
    } catch (AssertionError expected) {
      assertThat(expected.getMessage(), containsString("FooException"));
      verify(mockJob, never()).cancel();
      return;
    }
    // fail() itself throws an AssertionError, so it must stay outside the try block above or
    // the catch clause would swallow it.
    fail("AssertionError expected");
  }

  /** A streaming job that terminates with no error messages is a success. */
  @Test
  public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
    options.setStreaming(true);
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenReturn(State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    runner.run(p, mockRunner);
  }

  /** A streaming job with no {@link PAssert} that terminates in DONE, with no metrics, succeeds. */
  @Test
  public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
    options.setStreaming(true);
    Pipeline p = TestPipeline.create(options);
    p.apply(Create.of(1, 2, 3));

    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenReturn(State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.of()));
    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    runner.run(p, mockRunner);
  }

  /**
   * Tests that a streaming job with a false {@link PAssert} fails.
   *
   * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it
   * is detected only by failure job messages. With fuller metric support, this can detect a PAssert
   * failure via metrics and raise an {@link AssertionError} in just that case.
   */
  @Test
  public void testRunStreamingJobThatFails() throws Exception {
    // Intentionally reuses the crash-loop scenario; see the class of failure described above.
    testStreamingPipelineFailsIfException();
  }

  /** Wraps {@link #generateMockMetrics} in a {@link JobMetrics} response. */
  private JobMetrics generateMockMetricResponse(boolean success, boolean tentative) {
    return buildJobMetrics(generateMockMetrics(success, tentative));
  }

  /**
   * Builds a single-element list holding one PAssert metric update with scalar value 1.
   *
   * @param success names the metric "PAssertSuccess" when true, "PAssertFailure" otherwise
   * @param tentative when true the metric context is {@code {"tentative": ""}}, otherwise empty
   */
  private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) {
    MetricStructuredName name = new MetricStructuredName();
    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
    name.setContext(tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.of());

    MetricUpdate metric = new MetricUpdate();
    metric.setName(name);
    metric.setScalar(BigDecimal.ONE);
    return Lists.newArrayList(metric);
  }

  /** Wraps {@link #generateMockStreamingMetrics} in a {@link JobMetrics} response. */
  private JobMetrics generateMockStreamingMetricResponse(Map<String, BigDecimal> metricMap) {
    return buildJobMetrics(generateMockStreamingMetrics(metricMap));
  }

  /**
   * Converts each map entry into a metric update named by the key, with the value as its scalar
   * and no context set.
   */
  private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) {
    List<MetricUpdate> metrics = Lists.newArrayList();
    for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
      MetricStructuredName name = new MetricStructuredName();
      name.setName(entry.getKey());

      MetricUpdate metric = new MetricUpdate();
      metric.setName(name);
      metric.setScalar(entry.getValue());
      metrics.add(metric);
    }
    return metrics;
  }

  /** Wraps {@code metricList} in a {@link JobMetrics} instance. */
  private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
    JobMetrics jobMetrics = new JobMetrics();
    jobMetrics.setMetrics(metricList);
    // N.B. Setting the factory is necessary in order to get valid JSON.
    jobMetrics.setFactory(Transport.getJsonFactory());
    return jobMetrics;
  }

  /**
   * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has
   * succeeded.
   */
  @Test
  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
    Pipeline p = createPipelineWithPAssert();

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    // Register the pipeline's assertion so the single success metric is checked against the
    // assertion it came from, rather than against the runner's initial expected count.
    runner.updatePAssertCount(p);
    doReturn(State.DONE).when(job).getState();
    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
  }

  /**
   * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a
   * conclusive failure.
   */
  @Test
  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
    Pipeline p = createPipelineWithPAssert();

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    runner.updatePAssertCount(p);
    doReturn(State.DONE).when(job).getState();
    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
  }

  /**
   * Tests that a non-tentative success metric on a running job is ignored, so no conclusion is
   * reached.
   */
  @Test
  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
    Pipeline p = createPipelineWithPAssert();

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, false /* tentative */));

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    runner.updatePAssertCount(p);
    doReturn(State.RUNNING).when(job).getState();
    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent()));
  }

  /**
   * Tests that if a streaming pipeline crash loops for a non-assertion reason, the test run throws
   * a {@link RuntimeException}.
   *
   * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of
   * failure.
   */
  @Test
  public void testStreamingPipelineFailsIfException() throws Exception {
    options.setStreaming(true);
    Pipeline pipeline = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.RUNNING);
    stubWaitUntilFinishWithError(mockJob, "FooException", State.CANCELLED);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

    expectedException.expect(RuntimeException.class);
    runner.run(pipeline, mockRunner);
  }

  /** Tests that {@code getJobMetrics} returns exactly the metrics the client reports. */
  @Test
  public void testGetJobMetricsThatSucceeds() throws Exception {
    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    JobMetrics metrics = runner.getJobMetrics(job);

    assertEquals(1, metrics.getMetrics().size());
    assertEquals(
        generateMockMetrics(true /* success */, true /* tentative */), metrics.getMetrics());
  }

  /** Tests that {@code getJobMetrics} returns null when the client throws an IOException. */
  @Test
  public void testGetJobMetricsThatFailsForException() throws Exception {
    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
    when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    assertNull(runner.getJobMetrics(job));
  }

  /**
   * Tests that the on-create matcher of a batch job receives the job before
   * {@code waitUntilFinish} has been called.
   */
  @Test
  public void testBatchOnCreateMatcher() throws Exception {
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    runner.run(p, mockRunner);
  }

  /**
   * Tests that the on-create matcher of a streaming job receives the job before
   * {@code waitUntilFinish} has been called.
   */
  @Test
  public void testStreamingOnCreateMatcher() throws Exception {
    options.setStreaming(true);
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));

    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenReturn(State.DONE);
    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    runner.run(p, mockRunner);
  }

  /**
   * Tests that the on-success matcher of a successful batch job is invoked after exactly one
   * {@code waitUntilFinish} call.
   */
  @Test
  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    runner.run(p, mockRunner);
  }

  /**
   * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert}, the
   * {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
   * invoked.
   */
  @Test
  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
    options.setStreaming(true);
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.DONE);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));

    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenReturn(State.DONE);
    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    runner.run(p, mockRunner);
  }

  /**
   * Tests that a failed batch job throws an {@link AssertionError} without invoking the on-success
   * matcher. The test also checks that {@code waitUntilFinish} was called exactly once.
   */
  @Test
  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.FAILED);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
    try {
      runner.run(p, mockRunner);
    } catch (AssertionError expected) {
      verify(mockJob, Mockito.times(1))
          .waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class));
      return;
    }
    fail("Expected an exception on pipeline failure.");
  }

  /**
   * Tests that when a streaming pipeline terminates in FAILED, the {@link
   * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
   * invoked.
   */
  @Test
  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
    options.setStreaming(true);
    Pipeline p = createPipelineWithPAssert();
    DataflowPipelineJob mockJob = createMockJob(State.FAILED);
    DataflowRunner mockRunner = createMockRunner(mockJob);

    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());

    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenReturn(State.FAILED);

    expectedException.expect(RuntimeException.class);
    runner.run(p, mockRunner);
    // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
  }

  /**
   * Matcher that, when invoked, checks two things. It verifies that {@code waitUntilFinish} was
   * called on the mock job exactly {@code expectedWaitCalls} times. It also checks that the
   * matched object is that same job instance.
   */
  static class TestSuccessMatcher extends BaseMatcher<PipelineResult>
      implements SerializableMatcher<PipelineResult> {
    // Transient: the mock job itself is not meant to be serialized with the matcher.
    private final transient DataflowPipelineJob mockJob;
    private final int expectedWaitCalls;

    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
      this.mockJob = job;
      this.expectedWaitCalls = times;
    }

    @Override
    public boolean matches(Object o) {
      if (!(o instanceof PipelineResult)) {
        fail(String.format("Expected PipelineResult but received %s", o));
      }
      try {
        verify(mockJob, Mockito.times(expectedWaitCalls))
            .waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class));
      } catch (IOException | InterruptedException e) {
        throw new AssertionError(e);
      }
      assertSame(mockJob, o);
      return true;
    }

    @Override
    public void describeTo(Description description) {}
  }

  /** Matcher that fails the test if it is ever invoked. */
  static class TestFailureMatcher extends BaseMatcher<PipelineResult>
      implements SerializableMatcher<PipelineResult> {
    @Override
    public boolean matches(Object o) {
      fail("OnSuccessMatcher should not be called on pipeline failure.");
      return false;
    }

    @Override
    public void describeTo(Description description) {}
  }
}