blob: 9fcdb147ffedcc7755583dd7a5e35de041a19b24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import com.google.api.services.dataflow.model.CounterMetadata;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.api.services.dataflow.model.NameAndKind;
import com.google.api.services.dataflow.model.Position;
import com.google.api.services.dataflow.model.Status;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.DataflowReaderPosition;
import org.apache.beam.runners.dataflow.worker.WorkerCustomSources.BoundedSourceSplit;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link WorkItemStatusClient}. */
@RunWith(JUnit4.class)
public class WorkItemStatusClientTest {
  private static final String PROJECT_ID = "ProjectId";
  private static final String JOB_ID = "JobId";
  // Long literal on purpose: the int literal 0xDEADBEEF is negative and would sign-extend to
  // -559038737L when widened to long, which is not the intended id.
  private static final long WORK_ID = 0xDEADBEEFL;
  private static final Duration LEASE_DURATION = Duration.standardSeconds(10);
  private static final long INITIAL_REPORT_INDEX = 5;
  /** Status code that every reported error is expected to carry. */
  private static final int EXPECTED_ERROR_CODE = 2;

  @Rule public ExpectedException thrown = ExpectedException.none();

  @Mock private WorkUnitClient workUnitClient;

  /** The work item whose status the client under test reports. */
  private final WorkItem workItem =
      new WorkItem()
          .setProjectId(PROJECT_ID)
          .setJobId(JOB_ID)
          .setId(WORK_ID)
          .setInitialReportIndex(INITIAL_REPORT_INDEX);

  private DataflowPipelineOptions options;
  @Mock private DataflowWorkExecutor worker;
  private BatchModeExecutionContext executionContext;
  @Captor private ArgumentCaptor<WorkItemStatus> statusCaptor;
  private WorkItemStatusClient statusClient;

  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);
    options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    executionContext = BatchModeExecutionContext.forTesting(options, "testStage");
    statusClient = new WorkItemStatusClient(workUnitClient, workItem);
  }

  /**
   * Verifies that exactly one status was sent to {@link WorkUnitClient#reportWorkItemStatus} and
   * returns it.
   */
  private WorkItemStatus captureReportedStatus() throws IOException {
    verify(workUnitClient).reportWorkItemStatus(statusCaptor.capture());
    return statusCaptor.getValue();
  }

  /**
   * Asserts that {@code workStatus} is the final (completed) report for {@link #workItem}, sent
   * with the initial report index and no errors.
   */
  private static void assertFinalStatusWithoutErrors(WorkItemStatus workStatus) {
    assertThat(workStatus.getWorkItemId(), equalTo(Long.toString(WORK_ID)));
    assertThat(workStatus.getCompleted(), equalTo(true));
    assertThat(workStatus.getReportIndex(), equalTo(INITIAL_REPORT_INDEX));
    assertThat(workStatus.getErrors(), nullValue());
  }

  /**
   * Asserts that {@code workStatus} is the final (completed) report for {@link #workItem}, sent
   * with the initial report index and carrying exactly one error whose message mentions this test
   * class (i.e. it contains the thrown error's stack trace).
   *
   * @return the single reported error, for further assertions by the caller
   */
  private static Status assertFinalStatusWithSingleError(WorkItemStatus workStatus) {
    assertThat(workStatus.getWorkItemId(), equalTo(Long.toString(WORK_ID)));
    assertThat(workStatus.getCompleted(), equalTo(true));
    assertThat(workStatus.getReportIndex(), equalTo(INITIAL_REPORT_INDEX));
    assertThat(workStatus.getErrors(), hasSize(1));
    Status status = workStatus.getErrors().get(0);
    assertThat(status.getCode(), equalTo(EXPECTED_ERROR_CODE));
    assertThat(status.getMessage(), containsString("WorkItemStatusClientTest"));
    return status;
  }

  /** Verify that we can set the worker once, but not again. */
  @Test
  public void setWorker() {
    // We should be able to set the worker the first time.
    statusClient.setWorker(worker, executionContext);

    // A second call must be rejected.
    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("setWorker once");
    statusClient.setWorker(worker, executionContext);
  }

  /** Reporting an error before setWorker has been called should work. */
  @Test
  public void reportError() throws IOException {
    RuntimeException error = new RuntimeException();
    error.fillInStackTrace();

    statusClient.reportError(error);

    assertFinalStatusWithSingleError(captureReportedStatus());
  }

  /** Reporting an error after setWorker has been called should also work. */
  @Test
  public void reportErrorAfterSetWorker() throws IOException {
    RuntimeException error = new RuntimeException();
    error.fillInStackTrace();
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);

    statusClient.reportError(error);

    assertFinalStatusWithSingleError(captureReportedStatus());
  }

  /** Reporting an out of memory error should log it in addition to the regular flow. */
  @Test
  public void reportOutOfMemoryErrorAfterSetWorker() throws IOException {
    OutOfMemoryError error = new OutOfMemoryError();
    error.fillInStackTrace();
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);

    statusClient.reportError(error);

    Status status = assertFinalStatusWithSingleError(captureReportedStatus());
    assertThat(status.getMessage(), containsString("An OutOfMemoryException occurred."));
  }

  /** Once an error has been reported, further progress updates are rejected. */
  @Test
  public void reportUpdateAfterErrorShouldFail() throws Exception {
    RuntimeException error = new RuntimeException();
    error.fillInStackTrace();
    statusClient.reportError(error);

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("reportUpdate");
    statusClient.reportUpdate(null, LEASE_DURATION);
  }

  /** Reporting success requires a worker to have been set first. */
  @Test
  public void reportSuccessBeforeSetWorker() throws IOException {
    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("setWorker");
    thrown.expectMessage("reportSuccess");
    statusClient.reportSuccess();
  }

  @Test
  public void reportSuccess() throws IOException {
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);

    statusClient.reportSuccess();

    assertFinalStatusWithoutErrors(captureReportedStatus());
  }

  @Test
  public void reportSuccessWithSourceOperation() throws IOException {
    SourceOperationExecutor sourceWorker = mock(SourceOperationExecutor.class);
    when(sourceWorker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(sourceWorker, executionContext);

    statusClient.reportSuccess();

    assertFinalStatusWithoutErrors(captureReportedStatus());
  }

  /** Once success has been reported, further progress updates are rejected. */
  @Test
  public void reportUpdateAfterSuccess() throws Exception {
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);
    statusClient.reportSuccess();

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("reportUpdate");
    statusClient.reportUpdate(null, LEASE_DURATION);
  }

  @Test
  public void reportUpdateNullSplit() throws Exception {
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);

    statusClient.reportUpdate(null, LEASE_DURATION);

    WorkItemStatus workStatus = captureReportedStatus();
    assertThat(workStatus.getCompleted(), equalTo(false));
  }

  /** A progress update identifies the work item and starts at the initial report index. */
  @Test
  public void reportUpdate() throws Exception {
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);

    statusClient.reportUpdate(null, LEASE_DURATION);

    WorkItemStatus workStatus = captureReportedStatus();
    assertThat(workStatus.getWorkItemId(), equalTo(Long.toString(WORK_ID)));
    assertThat(workStatus.getCompleted(), equalTo(false));
    assertThat(workStatus.getReportIndex(), equalTo(INITIAL_REPORT_INDEX));
  }

  /** Each report uses the next report index handed back by the service for the previous one. */
  @Test
  public void reportIndexSequence() throws Exception {
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);

    when(workUnitClient.reportWorkItemStatus(isA(WorkItemStatus.class)))
        .thenReturn(new WorkItemServiceState().setNextReportIndex(INITIAL_REPORT_INDEX + 4));
    statusClient.reportUpdate(null, LEASE_DURATION);

    when(workUnitClient.reportWorkItemStatus(isA(WorkItemStatus.class)))
        .thenReturn(new WorkItemServiceState().setNextReportIndex(INITIAL_REPORT_INDEX + 8));
    statusClient.reportUpdate(null, LEASE_DURATION);

    statusClient.reportSuccess();

    verify(workUnitClient, times(3)).reportWorkItemStatus(statusCaptor.capture());
    List<WorkItemStatus> updates = statusCaptor.getAllValues();
    assertThat(updates.get(0).getReportIndex(), equalTo(INITIAL_REPORT_INDEX));
    assertThat(updates.get(1).getReportIndex(), equalTo(INITIAL_REPORT_INDEX + 4));
    assertThat(updates.get(2).getReportIndex(), equalTo(INITIAL_REPORT_INDEX + 8));
  }

  @Test
  public void populateMetricUpdatesNoStateSamplerInfo() throws Exception {
    // When executionContext.getExecutionStateTracker() returns null, we get no metric updates.
    WorkItemStatus status = new WorkItemStatus();
    BatchModeExecutionContext mockContext = mock(BatchModeExecutionContext.class);
    when(mockContext.getExecutionStateTracker()).thenReturn(null);
    statusClient.setWorker(worker, mockContext);

    statusClient.populateMetricUpdates(status);

    assertThat(status.getMetricUpdates(), empty());
  }

  @Test
  public void populateMetricUpdatesStateSamplerInfo() throws Exception {
    // When executionContext.getExecutionStateTracker() returns non-null, we get one metric update.
    WorkItemStatus status = new WorkItemStatus();
    BatchModeExecutionContext mockContext = mock(BatchModeExecutionContext.class);
    ExecutionStateTracker executionStateTracker = mock(ExecutionStateTracker.class);
    ExecutionState executionState = mock(ExecutionState.class);
    when(executionState.getDescription()).thenReturn("stageName-systemName-some-state");
    when(mockContext.getExecutionStateTracker()).thenReturn(executionStateTracker);
    when(executionStateTracker.getMillisSinceLastTransition()).thenReturn(20L);
    when(executionStateTracker.getNumTransitions()).thenReturn(10L);
    when(executionStateTracker.getCurrentState()).thenReturn(executionState);
    statusClient.setWorker(worker, mockContext);

    statusClient.populateMetricUpdates(status);

    assertThat(status.getMetricUpdates(), hasSize(1));
    MetricUpdate update = status.getMetricUpdates().get(0);
    assertThat(update.getName().getName(), equalTo("state-sampler"));
    assertThat(update.getKind(), equalTo("internal"));
    // getInternal() is untyped; the cast is checked by the hasEntry assertions that follow.
    @SuppressWarnings("unchecked")
    Map<String, Object> samplerMetrics = (Map<String, Object>) update.getInternal();
    assertThat(samplerMetrics, hasEntry("last-state-name", "stageName-systemName-some-state"));
    assertThat(samplerMetrics, hasEntry("num-transitions", 10L));
    assertThat(samplerMetrics, hasEntry("last-state-duration-ms", 20L));
  }

  @Test
  public void populateCounterUpdatesEmptyOutputCounters() throws Exception {
    // When worker.getOutputCounters == null (the mock default), there should be no counters.
    WorkItemStatus status = new WorkItemStatus();
    statusClient.setWorker(worker, executionContext);
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());

    statusClient.populateCounterUpdates(status);

    assertThat(status.getCounterUpdates(), hasSize(0));
  }

  /** Validates that an "internal" Counter is reported. */
  @Test
  public void populateCounterUpdatesWithOutputCounters() throws Exception {
    final CounterUpdate counter =
        new CounterUpdate()
            .setNameAndKind(new NameAndKind().setName("some-counter").setKind("SUM"))
            .setCumulative(true)
            .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(42));

    CounterSet counterSet = new CounterSet();
    counterSet.intSum(CounterName.named("some-counter")).addValue(42);

    WorkItemStatus status = new WorkItemStatus();
    when(worker.getOutputCounters()).thenReturn(counterSet);
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, executionContext);

    statusClient.populateCounterUpdates(status);

    assertThat(status.getCounterUpdates(), containsInAnyOrder(counter));
  }

  /** Validates that Beam Metrics and "internal" Counters are merged in the update. */
  @Test
  public void populateCounterUpdatesWithMetricsAndCounters() throws Exception {
    final CounterUpdate expectedCounter =
        new CounterUpdate()
            .setNameAndKind(new NameAndKind().setName("some-counter").setKind("SUM"))
            .setCumulative(true)
            .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(42));

    CounterSet counterSet = new CounterSet();
    counterSet.intSum(CounterName.named("some-counter")).addValue(42);

    final CounterUpdate expectedMetric =
        new CounterUpdate()
            .setStructuredNameAndMetadata(
                new CounterStructuredNameAndMetadata()
                    .setName(
                        new CounterStructuredName()
                            .setOrigin("USER")
                            .setOriginNamespace("namespace")
                            .setName("some-counter")
                            .setOriginalStepName("step"))
                    .setMetadata(new CounterMetadata().setKind("SUM")))
            .setCumulative(true)
            .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(42));

    BatchModeExecutionContext context = mock(BatchModeExecutionContext.class);
    when(context.extractMetricUpdates(anyBoolean())).thenReturn(ImmutableList.of(expectedMetric));
    when(context.extractMsecCounters(anyBoolean())).thenReturn(Collections.emptyList());

    // NOTE(review): this container is not handed to the status client or the mocked context, so
    // these increments do not influence the result; expectedMetric reaches the status only via
    // the context.extractMetricUpdates stub above. Kept to mirror the real metric flow.
    MetricsContainerImpl metricsContainer = new MetricsContainerImpl("step");
    CounterCell counter =
        metricsContainer.getCounter(MetricName.named("namespace", "some-counter"));
    counter.inc(1);
    counter.inc(41);
    counter.inc(1);
    counter.inc(-1);

    WorkItemStatus status = new WorkItemStatus();
    when(worker.getOutputCounters()).thenReturn(counterSet);
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, context);

    statusClient.populateCounterUpdates(status);

    assertThat(status.getCounterUpdates(), containsInAnyOrder(expectedCounter, expectedMetric));
  }

  /** Validates that msec counters extracted from the execution context are reported. */
  @Test
  public void populateCounterUpdatesWithMsecCounter() throws Exception {
    final CounterUpdate expectedMsec =
        new CounterUpdate()
            .setStructuredNameAndMetadata(
                new CounterStructuredNameAndMetadata()
                    .setName(
                        new CounterStructuredName()
                            .setOrigin("SYSTEM")
                            .setName("start-msecs")
                            .setOriginalStepName("step"))
                    .setMetadata(new CounterMetadata().setKind("SUM")))
            .setCumulative(true)
            .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(42));

    BatchModeExecutionContext context = mock(BatchModeExecutionContext.class);
    when(context.extractMetricUpdates(anyBoolean())).thenReturn(ImmutableList.of());
    when(context.extractMsecCounters(anyBoolean())).thenReturn(ImmutableList.of(expectedMsec));

    WorkItemStatus status = new WorkItemStatus();
    when(worker.extractMetricUpdates()).thenReturn(Collections.emptyList());
    statusClient.setWorker(worker, context);

    statusClient.populateCounterUpdates(status);

    assertThat(status.getCounterUpdates(), containsInAnyOrder(expectedMsec));
  }

  @Test
  public void populateProgressNull() throws Exception {
    WorkItemStatus status = new WorkItemStatus();
    statusClient.setWorker(worker, executionContext);

    statusClient.populateProgress(status);

    assertThat(status.getReportedProgress(), nullValue());
  }

  @Test
  public void populateProgress() throws Exception {
    WorkItemStatus status = new WorkItemStatus();
    Progress progress =
        cloudProgressToReaderProgress(ReaderTestUtils.approximateProgressAtIndex(42L));
    when(worker.getWorkerProgress()).thenReturn(progress);
    statusClient.setWorker(worker, executionContext);

    statusClient.populateProgress(status);

    assertThat(
        status.getReportedProgress(), equalTo(ReaderTestUtils.approximateProgressAtIndex(42L)));
  }

  @Test
  public void populateSplitResultNativeReader() throws Exception {
    WorkItemStatus status = new WorkItemStatus();
    statusClient.setWorker(worker, executionContext);
    Position position = ReaderTestUtils.positionAtIndex(42L);
    DynamicSplitResult result =
        new NativeReader.DynamicSplitResultWithPosition(new DataflowReaderPosition(position));

    statusClient.populateSplitResult(status, result);

    assertThat(status.getStopPosition(), equalTo(position));
    assertThat(status.getDynamicSourceSplit(), nullValue());
  }

  @Test
  public void populateSplitResultCustomReader() throws Exception {
    WorkItemStatus status = new WorkItemStatus();
    statusClient.setWorker(worker, executionContext);
    BoundedSource<Integer> primary = new DummyBoundedSource(5);
    BoundedSource<Integer> residual = new DummyBoundedSource(10);
    BoundedSourceSplit<Integer> split = new BoundedSourceSplit<>(primary, residual);

    statusClient.populateSplitResult(status, split);

    assertThat(status.getDynamicSourceSplit(), equalTo(WorkerCustomSources.toSourceSplit(split)));
    assertThat(status.getStopPosition(), nullValue());
  }

  @Test
  public void populateSplitResultNull() throws Exception {
    WorkItemStatus status = new WorkItemStatus();
    statusClient.setWorker(worker, executionContext);

    statusClient.populateSplitResult(status, null);

    assertThat(status.getDynamicSourceSplit(), nullValue());
    assertThat(status.getStopPosition(), nullValue());
  }

  /** Progress updates require a worker to have been set first. */
  @Test
  public void reportUpdateBeforeSetWorker() throws Exception {
    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("setWorker");
    thrown.expectMessage("reportUpdate");
    statusClient.reportUpdate(null, null);
  }

  /**
   * Minimal {@link BoundedSource} whose only meaningful behavior is value equality on {@code
   * number}; every reading/splitting operation throws {@link UnsupportedOperationException}.
   */
  private static final class DummyBoundedSource extends BoundedSource<Integer> {
    private final int number;

    public DummyBoundedSource(int number) {
      this.number = number;
    }

    @Override
    public List<? extends BoundedSource<Integer>> split(
        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
      throw new UnsupportedOperationException();
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
      throw new UnsupportedOperationException();
    }

    @Override
    public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public void validate() {
      throw new UnsupportedOperationException();
    }

    @Override
    public Coder<Integer> getDefaultOutputCoder() {
      return null;
    }

    @Override
    public int hashCode() {
      return Integer.hashCode(number);
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof DummyBoundedSource)) {
        return false;
      }
      return number == ((DummyBoundedSource) obj).number;
    }
  }
}