// blob: fed862d44436bafde10c4c40b26d75ce9d8ebdd4 [file] [log] [blame]
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult.State;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* Tests for BlockingDataflowPipelineRunner.
*/
@RunWith(JUnit4.class)
public class BlockingDataflowPipelineRunnerTest {
@Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
/**
 * Mockito {@link Answer} used to stand in for a call to
 * {@code DataflowPipelineJob.waitToFinish()}.
 *
 * <p>The answering thread blocks, simulating a running job, until
 * {@link #signalJobComplete()} is called, and then reports {@link State#DONE}.
 */
static class MockWaitToFinish implements Answer<State> {
  NotificationHelper jobCompleted = new NotificationHelper();

  /** Releases any thread currently blocked in {@link #answer(InvocationOnMock)}. */
  public void signalJobComplete() {
    jobCompleted.set();
  }

  /**
   * Blocks for up to 10000 ms waiting for the completion signal.
   *
   * @param invocation the intercepted mock call; not inspected
   * @return {@link State#DONE} once the signal has been received
   * @throws InterruptedException if the waiting thread is interrupted
   */
  @Override
  public State answer(InvocationOnMock invocation) throws InterruptedException {
    System.out.println("MockWaitToFinish.answer(): Wait for signaling job completion.");
    boolean signaled = jobCompleted.waitTillSet(10000);
    // Fail the answering thread if the test never signaled completion in time.
    assertTrue("Test did not receive mock job completion signal", signaled);
    System.out.println("MockWaitToFinish.answer(): job completed.");
    return State.DONE;
  }
}
/**
 * Minimal one-way flag built on the instance monitor's wait/notify.
 *
 * <p>The flag starts unset and can only be switched to set. Every method
 * synchronizes on the instance.
 */
static class NotificationHelper {
  private static final long NANOS_PER_MILLI = 1000000L;

  private boolean isSet = false;

  /** Sets the flag and wakes every thread blocked in {@link #waitTillSet(long)}. */
  public synchronized void set() {
    isSet = true;
    notifyAll();
  }

  /** Returns whether the flag has been set; never blocks waiting for it. */
  public synchronized boolean check() {
    return isSet;
  }

  /**
   * Blocks until the flag is set or the timeout has elapsed, whichever comes first.
   *
   * @param timeout maximum time to wait, in milliseconds; a zero or negative
   *     value returns the current state without waiting
   * @return {@code true} if the flag was set when this method returned
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public synchronized boolean waitTillSet(long timeout) throws InterruptedException {
    // Elapsed time is measured with System.nanoTime() rather than the wall
    // clock, so an adjustment of the system time during the wait can neither
    // cut the timeout short nor stretch it.
    final long startNanos = System.nanoTime();
    long remainingMillis = timeout;
    // Loop because wait() may return early (spurious wakeup) with the flag
    // still unset; each pass waits only for the time that is left.
    while (!isSet && remainingMillis > 0) {
      wait(remainingMillis);
      long elapsedMillis = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
      remainingMillis = timeout - elapsedMillis;
    }
    return isSet;
  }
}
/**
 * Checks that {@code BlockingDataflowPipelineRunner.run()} does not return
 * until the wrapped job's {@code waitToFinish()} call completes, and that the
 * final job status is logged.
 */
@Test
public void testJobWaitComplete() throws IOException, InterruptedException {
  // A mock job whose waitToFinish() blocks until the test signals completion.
  MockWaitToFinish mockWait = new MockWaitToFinish();
  DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
  when(mockJob.waitToFinish(
          anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
      .thenAnswer(mockWait);

  // A mock inner runner that hands back the mock job for any pipeline.
  DataflowPipelineRunner mockDataflowPipelineRunner = mock(DataflowPipelineRunner.class);
  when(mockDataflowPipelineRunner.run(isA(Pipeline.class))).thenReturn(mockJob);

  // The runner under test, wrapping the mock inner runner.
  final BlockingDataflowPipelineRunner blockingRunner =
      new BlockingDataflowPipelineRunner(
          mockDataflowPipelineRunner,
          PipelineOptionsFactory.as(TestDataflowPipelineOptions.class));

  final NotificationHelper threadStarted = new NotificationHelper();
  final NotificationHelper runReturned = new NotificationHelper();

  Thread runnerThread = new Thread() {
    @Override
    public void run() {
      threadStarted.set();
      // Run on an empty test pipeline.
      blockingRunner.run(DirectPipeline.createForTest());
      // Only reached once the mock job completion has been signaled.
      runReturned.set();
    }
  };
  runnerThread.start();

  boolean started = threadStarted.waitTillSet(2000);
  assertTrue("'executionStarted' event not set till timeout.", started);
  assertFalse("Code after job completion should not be reached before mock signal.",
      runReturned.check());

  // Let the mocked waitToFinish() return; run() should now come back promptly.
  mockWait.signalJobComplete();
  boolean returned = runReturned.waitTillSet(2000);
  assertTrue("run() should return after job completion is mocked.", returned);

  expectedLogs.verifyInfo("Job finished with status DONE");
}
/**
 * Checks that the runner's {@code toString()} is the runner class name and the
 * configured job name, joined by {@code #}.
 */
@Test
public void testToString() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setJobName("TestJobName");
  options.setProject("TestProject");
  options.setTempLocation("gs://test/temp/location");
  options.setGcpCredential(new TestCredential());

  String expected = "BlockingDataflowPipelineRunner#TestJobName";
  String actual = BlockingDataflowPipelineRunner.fromOptions(options).toString();
  assertEquals(expected, actual);
}
}