blob: abbde2512bc3cd98ed7f5f8c9371051a9f398571 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.util.TimeUtil.toCloudDuration;
import static org.apache.beam.runners.dataflow.util.TimeUtil.toCloudTime;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudPositionToReaderPosition;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.toDynamicSplitRequest;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.api.client.testing.http.FixedClock;
import com.google.api.services.dataflow.model.HotKeyDetection;
import com.google.api.services.dataflow.model.Position;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitRequest;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
import org.apache.beam.runners.dataflow.worker.util.common.worker.StubbedExecutor;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/** Unit tests for {@link DataflowWorkProgressUpdater}. */
@RunWith(JUnit4.class)
public class DataflowWorkProgressUpdaterTest {
private static final long LEASE_MS = 2000;
private static final String PROJECT_ID = "TEST_PROJECT_ID";
private static final String JOB_ID = "TEST_JOB_ID";
private static final Long WORK_ID = 1234567890L;
private static final String STEP_ID = "TEST_STEP_ID";
private static final Duration HOT_KEY_AGE = Duration.standardSeconds(1);
@Rule public final ExpectedException thrown = ExpectedException.none();
private StubbedExecutor executor;
private DataflowWorkProgressUpdater progressUpdater;
private long startTime;
private FixedClock clock;
@Mock private WorkItemStatusClient workItemStatusClient;
@Mock private DataflowWorkExecutor worker;
@Mock private HotKeyLogger hotKeyLogger;
@Captor private ArgumentCaptor<DynamicSplitResult> splitResultCaptor;
@Before
public void initMocksAndWorkflowServiceAndWorkerAndWork() {
MockitoAnnotations.initMocks(this);
startTime = 0L;
clock = new FixedClock(startTime);
executor = new StubbedExecutor(clock);
WorkItem workItem = new WorkItem();
workItem.setProjectId(PROJECT_ID);
workItem.setJobId(JOB_ID);
workItem.setId(WORK_ID);
workItem.setLeaseExpireTime(toCloudTime(new Instant(clock.currentTimeMillis() + 1000)));
workItem.setReportStatusInterval(toCloudDuration(Duration.millis(300)));
workItem.setInitialReportIndex(1L);
progressUpdater =
new DataflowWorkProgressUpdater(
workItemStatusClient, workItem, worker, executor.getExecutor(), clock, hotKeyLogger) {
// Shorten reporting interval boundaries for faster testing.
@Override
protected long getMinReportingInterval() {
return 100;
}
@Override
protected long getLeaseRenewalLatencyMargin() {
return 150;
}
};
}
@Test
public void workProgressSendsAnUpdate() throws Exception {
when(workItemStatusClient.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class)))
.thenReturn(generateServiceState(null, 1000));
progressUpdater.startReportingProgress();
executor.runNextRunnable();
// The initial update should be sent at startTime + 300 ms.
assertEquals(clock.currentTimeMillis(), startTime + 300);
verify(workItemStatusClient, atLeastOnce())
.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class));
progressUpdater.stopReportingProgress();
}
@Test
public void workProgressLogsHotKeyDetection() throws Exception {
when(workItemStatusClient.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class)))
.thenReturn(generateServiceState(null, 1000));
progressUpdater.startReportingProgress();
executor.runNextRunnable();
verify(hotKeyLogger, atLeastOnce()).logHotKeyDetection(STEP_ID, HOT_KEY_AGE);
progressUpdater.stopReportingProgress();
}
/** Verifies that the update after a split is requested acknowledeges it. */
@Test
public void workProgressSendsSplitResults() throws Exception {
// The setup process sends one update after 300ms. Enqueue another that should be scheduled
// 1000ms after that.
WorkItemServiceState firstResponse =
generateServiceState(ReaderTestUtils.positionAtIndex(2L), 1000);
when(workItemStatusClient.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class)))
.thenReturn(firstResponse);
when(worker.getWorkerProgress())
.thenReturn(cloudProgressToReaderProgress(ReaderTestUtils.approximateProgressAtIndex(1L)));
when(worker.requestDynamicSplit(toDynamicSplitRequest(firstResponse.getSplitRequest())))
.thenReturn(
new NativeReader.DynamicSplitResultWithPosition(
cloudPositionToReaderPosition(firstResponse.getSplitRequest().getPosition())));
progressUpdater.startReportingProgress();
executor.runNextRunnable();
// The initial update should be sent at startTime + 300 ms.
assertEquals(clock.currentTimeMillis(), startTime + 300);
verify(workItemStatusClient)
.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class));
verify(worker).requestDynamicSplit(isA(DynamicSplitRequest.class));
// The second update should be sent at startTime + 1300 ms and includes the split response.
// Also schedule another update after 1000ms.
when(workItemStatusClient.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class)))
.thenReturn(generateServiceState(null, 1000));
executor.runNextRunnable();
assertEquals(clock.currentTimeMillis(), startTime + 1300);
// Verify that the update includes the respuonse to the split request.
verify(workItemStatusClient, atLeastOnce())
.reportUpdate(splitResultCaptor.capture(), isA(Duration.class));
assertEquals(
"Second update is sent and contains the latest split result",
splitResultCaptor.getValue(),
new NativeReader.DynamicSplitResultWithPosition(
cloudPositionToReaderPosition(ReaderTestUtils.positionAtIndex(2L))));
executor.runNextRunnable();
verify(workItemStatusClient, times(3))
.reportUpdate(splitResultCaptor.capture(), isA(Duration.class));
// Stop the progressUpdater now, and expect the last update immediately
progressUpdater.stopReportingProgress();
}
@Test
public void workProgressAdaptsNextDuration() throws Exception {
progressUpdater.startReportingProgress();
when(workItemStatusClient.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class)))
.thenReturn(generateServiceState(null, 1000));
executor.runNextRunnable();
assertEquals(clock.currentTimeMillis(), startTime + 300);
when(workItemStatusClient.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class)))
.thenReturn(generateServiceState(null, 400));
executor.runNextRunnable();
assertEquals(clock.currentTimeMillis(), startTime + 1300);
executor.runNextRunnable();
assertEquals(clock.currentTimeMillis(), startTime + 1700);
progressUpdater.stopReportingProgress();
}
/** Verifies that a last update is sent when there is an unacknowledged split request. */
@Test
public void workProgressUpdaterSendsLastPendingUpdateWhenStopped() throws Exception {
// The setup process sends one update after 300ms. Enqueue another that should be scheduled
// 1000ms after that.
WorkItemServiceState firstResponse =
generateServiceState(ReaderTestUtils.positionAtIndex(2L), 1000);
when(workItemStatusClient.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class)))
.thenReturn(firstResponse);
when(worker.getWorkerProgress())
.thenReturn(cloudProgressToReaderProgress(ReaderTestUtils.approximateProgressAtIndex(1L)));
when(worker.requestDynamicSplit(toDynamicSplitRequest(firstResponse.getSplitRequest())))
.thenReturn(
new NativeReader.DynamicSplitResultWithPosition(
cloudPositionToReaderPosition(firstResponse.getSplitRequest().getPosition())));
progressUpdater.startReportingProgress();
executor.runNextRunnable();
// The initial update should be sent at startTime + 300 ms.
assertEquals(clock.currentTimeMillis(), startTime + 300);
verify(workItemStatusClient)
.reportUpdate(isNull(DynamicSplitResult.class), isA(Duration.class));
verify(worker).requestDynamicSplit(isA(DynamicSplitRequest.class));
clock.setTime(clock.currentTimeMillis() + 100);
// Stop the progressUpdater now, and expect the last update immediately
progressUpdater.stopReportingProgress();
assertEquals(clock.currentTimeMillis(), startTime + 400);
// Verify that the last update is sent immediately and contained the latest split result.
verify(workItemStatusClient, atLeastOnce())
.reportUpdate(splitResultCaptor.capture(), isA(Duration.class));
assertEquals(
"Final update is sent and contains the latest split result",
splitResultCaptor.getValue(),
new NativeReader.DynamicSplitResultWithPosition(
cloudPositionToReaderPosition(ReaderTestUtils.positionAtIndex(2L))));
// And nothing happened after that.
verify(workItemStatusClient, Mockito.atLeastOnce()).uniqueWorkId();
verifyNoMoreInteractions(workItemStatusClient);
}
private WorkItemServiceState generateServiceState(
@Nullable Position suggestedStopPosition, long millisToNextUpdate) {
WorkItemServiceState responseState = new WorkItemServiceState();
responseState.setFactory(Transport.getJsonFactory());
responseState.setLeaseExpireTime(
toCloudTime(new Instant(clock.currentTimeMillis() + LEASE_MS)));
responseState.setReportStatusInterval(toCloudDuration(Duration.millis(millisToNextUpdate)));
if (suggestedStopPosition != null) {
responseState.setSplitRequest(
ReaderTestUtils.approximateSplitRequestAtPosition(suggestedStopPosition));
}
HotKeyDetection hotKeyDetection = new HotKeyDetection();
hotKeyDetection.setUserStepName(STEP_ID);
hotKeyDetection.setHotKeyAge(toCloudDuration(HOT_KEY_AGE));
responseState.setHotKeyDetection(hotKeyDetection);
return responseState;
}
}