| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.ABORT_STATE_NAME; |
| import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.FINISH_STATE_NAME; |
| import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.PROCESS_STATE_NAME; |
| import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.START_STATE_NAME; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.sameInstance; |
| import static org.junit.Assert.assertThat; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.api.services.dataflow.model.CounterUpdate; |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.core.SimpleDoFnRunner; |
| import org.apache.beam.runners.core.metrics.ExecutionStateTracker; |
| import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchModeExecutionStateRegistry; |
| import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; |
| import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; |
| import org.apache.beam.runners.dataflow.worker.counters.NameContext; |
| import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; |
| import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; |
| import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; |
| import org.apache.beam.sdk.metrics.MetricsContainer; |
| import org.apache.beam.sdk.testing.RestoreSystemProperties; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner; |
| import org.hamcrest.Matchers; |
| import org.joda.time.Duration; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.runners.Enclosed; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.Mock; |
| import org.mockito.Mockito; |
| import org.mockito.MockitoAnnotations; |
| |
| /** Tests for {@link DataflowOperationContext}. */ |
| @RunWith(Enclosed.class) |
| public class DataflowOperationContextTest { |
| |
| /** Tests for the management of {@link ExecutionState} in {@link DataflowOperationContext}. */ |
| @RunWith(JUnit4.class) |
| public static class ContextStatesTest { |
| |
| @Mock private CounterFactory counterFactory; |
| @Mock private MetricsContainer metricsContainer; |
| |
| private DataflowExecutionStateRegistry stateRegistry = new BatchModeExecutionStateRegistry(); |
| |
| @Mock private ScopedProfiler scopedProfiler; |
| |
| @Mock private ProfileScope emptyScope; |
| @Mock private ProfileScope profileScope; |
| |
| private DataflowExecutionState otherState; |
| private DataflowExecutionState startState; |
| private DataflowExecutionState processState; |
| private DataflowExecutionState abortState; |
| |
| private ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest(); |
| private DataflowOperationContext operationContext; |
| |
| @Before |
| public void setUp() { |
| MockitoAnnotations.initMocks(this); |
| |
| otherState = stateRegistry.getState(NameContext.forStage("STAGE"), "other", null, emptyScope); |
| startState = |
| stateRegistry.getState( |
| NameContextsForTests.nameContextForTest(), |
| START_STATE_NAME, |
| metricsContainer, |
| profileScope); |
| processState = |
| stateRegistry.getState( |
| NameContextsForTests.nameContextForTest(), |
| PROCESS_STATE_NAME, |
| metricsContainer, |
| profileScope); |
| abortState = |
| stateRegistry.getState( |
| NameContextsForTests.nameContextForTest(), |
| ABORT_STATE_NAME, |
| metricsContainer, |
| profileScope); |
| stateRegistry.getState( |
| NameContextsForTests.nameContextForTest(), |
| FINISH_STATE_NAME, |
| metricsContainer, |
| profileScope); |
| |
| operationContext = |
| new DataflowOperationContext( |
| counterFactory, |
| NameContextsForTests.nameContextForTest(), |
| metricsContainer, |
| stateTracker, |
| stateRegistry); |
| |
| // MapTaskExecutor ensures we start in the "other" (outside of a step) state. |
| stateTracker.enterState(otherState); |
| Mockito.reset(emptyScope); // reset so tests don't need to expect the activation above |
| } |
| |
| @Test |
| public void enterStart() { |
| operationContext.enterStart(); |
| |
| assertThat(stateTracker.getCurrentState(), sameInstance(startState)); |
| verify(profileScope).activate(); |
| } |
| |
| @Test |
| public void exitStart() throws IOException { |
| operationContext.enterStart().close(); |
| |
| assertThat(stateTracker.getCurrentState(), sameInstance(otherState)); |
| verify(profileScope).activate(); |
| verify(emptyScope).activate(); |
| } |
| |
| @Test |
| public void enterAllStates() { |
| operationContext.enterStart(); |
| operationContext.enterProcess(); |
| operationContext.enterFinish(); |
| operationContext.enterAbort(); |
| |
| assertThat(stateTracker.getCurrentState(), sameInstance(abortState)); |
| verify(profileScope, times(4)).activate(); // start, process, finish, abort |
| } |
| |
| @Test |
| public void enterAllStatesAndExit() throws IOException { |
| operationContext.enterStart(); |
| operationContext.enterProcess(); |
| Closeable finish = operationContext.enterFinish(); |
| Closeable abort = operationContext.enterAbort(); |
| abort.close(); |
| finish.close(); |
| |
| assertThat(stateTracker.getCurrentState(), sameInstance(processState)); |
| verify(profileScope, times(6)).activate(); // start, process, finish, abort, finish, process |
| } |
| } |
| |
| /** Tests for the lull logging in {@link DataflowOperationContext}. */ |
| @RunWith(JUnit4.class) |
| public static class LullLoggingTest { |
| |
| @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); |
| |
| @Rule public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); |
| |
| private File logFolder; |
| |
| @Before |
| public void setUp() throws IOException { |
| logFolder = tempFolder.newFolder(); |
| System.setProperty( |
| DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY, |
| new File(logFolder, "dataflow-json.log").getAbsolutePath()); |
| // We need to reset *first* because some other test may have already initialized the |
| // logging initializer. |
| DataflowWorkerLoggingInitializer.reset(); |
| DataflowWorkerLoggingInitializer.initialize(); |
| } |
| |
| @After |
| public void tearDown() { |
| DataflowWorkerLoggingInitializer.reset(); |
| } |
| |
| @Test |
| public void testLullReportsRightTrace() throws Exception { |
| Thread mockThread = mock(Thread.class); |
| |
| DataflowExecutionState executionState = |
| new DataflowExecutionState( |
| NameContextsForTests.nameContextForTest(), |
| "somestate", |
| null /* requestingStepName */, |
| null /* inputIndex */, |
| null /* metricsContainer */, |
| ScopedProfiler.INSTANCE.emptyScope()) { |
| @Nullable |
| @Override |
| public CounterUpdate extractUpdate(boolean isFinalUpdate) { |
| // not being used for extracting updates |
| return null; |
| } |
| |
| @Override |
| public void takeSample(long millisSinceLastSample) { |
| // Not being used for sampling |
| } |
| }; |
| |
| when(mockThread.getStackTrace()) |
| .thenReturn( |
| new StackTraceElement[] { |
| new StackTraceElement( |
| "userpackage.SomeUserDoFn", "helperMethod", "SomeUserDoFn.java", 250), |
| new StackTraceElement( |
| "userpackage.SomeUserDoFn", "process", "SomeUserDoFn.java", 450), |
| new StackTraceElement( |
| SimpleDoFnRunner.class.getName(), |
| "processElement", |
| "SimpleDoFnRunner.java", |
| 500), |
| }); |
| executionState.reportLull(mockThread, 6000); |
| |
| File[] files = logFolder.listFiles(); |
| assertThat(files, Matchers.arrayWithSize(1)); |
| String contents = Joiner.on("\n").join(Files.readAllLines(files[0].toPath())); |
| assertThat( |
| contents, |
| Matchers.allOf( |
| Matchers.containsString("Processing stuck in step " + NameContextsForTests.USER_NAME), |
| Matchers.containsString(" without outputting or completing in state somestate"), |
| Matchers.containsString("userpackage.SomeUserDoFn.helperMethod"), |
| Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName())))); |
| } |
| |
| @Test |
| public void testDurationFormatting() { |
| assertThat( |
| DataflowOperationContext.formatDuration(getDuration(0, 0, 5, 10, 500)), |
| equalTo("05m10s")); |
| assertThat( |
| DataflowOperationContext.formatDuration(getDuration(0, 2, 10, 23, 500)), |
| equalTo("02h10m23s")); |
| assertThat( |
| DataflowOperationContext.formatDuration(getDuration(1, 0, 0, 23, 500)), |
| equalTo("24h00m23s")); |
| assertThat( |
| DataflowOperationContext.formatDuration(getDuration(1, 0, 10, 23, 500)), |
| equalTo("24h10m23s")); |
| assertThat( |
| DataflowOperationContext.formatDuration(getDuration(2, 0, 0, 23, 500)), |
| equalTo("48h00m23s")); |
| } |
| |
| private Duration getDuration(int days, int hours, int minutes, int seconds, int millis) { |
| return Duration.standardDays(days) |
| .plus(Duration.standardHours(hours)) |
| .plus(Duration.standardMinutes(minutes)) |
| .plus(Duration.standardSeconds(seconds)) |
| .plus(Duration.millis(millis)); |
| } |
| } |
| } |