blob: 89d45955a593d82c24c38886765818116a4e6f99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.fn.control;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
import org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableTable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link BeamFnMapTaskExecutor}. */
@RunWith(JUnit4.class)
public class BeamFnMapTaskExecutorTest {
@Mock private OperationContext mockContext;
@Mock private StateDelegator mockBeamFnStateDelegator;
@Mock private StateDelegator.Registration mockStateDelegatorRegistration;
@Mock private ExecutionStateTracker executionStateTracker;
// Hacks to cause progress tracking to be done
@Mock private ReadOperation readOperation;
@Mock private RemoteGrpcPortWriteOperation grpcPortWriteOperation;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockBeamFnStateDelegator.registerForProcessBundleInstructionId(
any(String.class), any(StateRequestHandler.class)))
.thenReturn(mockStateDelegatorRegistration);
}
private static final String GRPC_READ_ID = "fake_grpc_read_id";
private static final String FAKE_OUTPUT_NAME = "fake_output_name";
private static final String FAKE_OUTPUT_PCOLLECTION_ID = "fake_pcollection_id";
private static final Metrics.PTransform FAKE_ELEMENT_COUNT_METRICS =
Metrics.PTransform.newBuilder()
.setProcessedElements(
Metrics.PTransform.ProcessedElements.newBuilder()
.setMeasured(Metrics.PTransform.Measured.getDefaultInstance()))
.build();
private static final BeamFnApi.RegisterRequest REGISTER_REQUEST =
BeamFnApi.RegisterRequest.newBuilder()
.addProcessBundleDescriptor(
BeamFnApi.ProcessBundleDescriptor.newBuilder()
.setId("555")
.putTransforms(
GRPC_READ_ID,
RunnerApi.PTransform.newBuilder()
.setSpec(
RunnerApi.FunctionSpec.newBuilder().setUrn(RemoteGrpcPortRead.URN))
.putOutputs(FAKE_OUTPUT_NAME, FAKE_OUTPUT_PCOLLECTION_ID)
.build()))
.build();
@Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
public void testTentativeUserMetrics() throws Exception {
final String stepName = "fakeStepNameWithUserMetrics";
final String namespace = "sdk/whatever";
final String name = "someCounter";
final int counterValue = 42;
final CountDownLatch progressSentLatch = new CountDownLatch(1);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
final Metrics.User.MetricName metricName =
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
InstructionRequestHandler instructionRequestHandler =
new InstructionRequestHandler() {
@Override
public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
switch (request.getRequestCase()) {
case REGISTER:
return CompletableFuture.completedFuture(responseFor(request).build());
case PROCESS_BUNDLE:
return MoreFutures.supplyAsync(
() -> {
processBundleLatch.await();
return responseFor(request)
.setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
.build();
});
case PROCESS_BUNDLE_PROGRESS:
progressSentLatch.countDown();
return CompletableFuture.completedFuture(
responseFor(request)
.setProcessBundleProgress(
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
.setMetrics(
Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
Metrics.PTransform.newBuilder()
.addUser(
Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
Metrics.User.CounterData.newBuilder()
.setValue(counterValue)))
.build())))
.build());
default:
// block forever
return new CompletableFuture<>();
}
}
@Override
public void close() {}
};
RegisterAndProcessBundleOperation processOperation =
new RegisterAndProcessBundleOperation(
IdGenerators.decrementingLongs(),
instructionRequestHandler,
mockBeamFnStateDelegator,
REGISTER_REQUEST,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
BeamFnMapTaskExecutor.forOperations(
ImmutableList.of(readOperation, grpcPortWriteOperation, processOperation),
executionStateTracker);
// Launch the BeamFnMapTaskExecutor and wait until we are sure there has been one
// tentative update
CompletionStage<Void> doneFuture = MoreFutures.runAsync(mapTaskExecutor::execute);
progressSentLatch.await();
Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
while (Iterables.size(metricsCounterUpdates) == 0) {
Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
}
assertThat(
metricsCounterUpdates,
contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(counterValue)));
// Now let it finish and clean up
processBundleLatch.countDown();
MoreFutures.get(doneFuture);
}
/** Tests that successive metric updates overwrite the previous. */
@Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
public void testTentativeUserMetricsOverwrite() throws Exception {
final String stepName = "fakeStepNameWithUserMetrics";
final String namespace = "sdk/whatever";
final String name = "someCounter";
final int firstCounterValue = 42;
final int secondCounterValue = 77;
final CountDownLatch progressSentTwiceLatch = new CountDownLatch(2);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
final Metrics.User.MetricName metricName =
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
InstructionRequestHandler instructionRequestHandler =
new InstructionRequestHandler() {
@Override
public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
switch (request.getRequestCase()) {
case REGISTER:
return CompletableFuture.completedFuture(responseFor(request).build());
case PROCESS_BUNDLE:
return MoreFutures.supplyAsync(
() -> {
processBundleLatch.await();
return responseFor(request)
.setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
.build();
});
case PROCESS_BUNDLE_PROGRESS:
progressSentTwiceLatch.countDown();
return CompletableFuture.completedFuture(
responseFor(request)
.setProcessBundleProgress(
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
.setMetrics(
Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
Metrics.PTransform.newBuilder()
.addUser(
Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
Metrics.User.CounterData.newBuilder()
.setValue(
progressSentTwiceLatch
.getCount()
> 0
? firstCounterValue
: secondCounterValue)))
.build())))
.build());
default:
// block forever
return new CompletableFuture<>();
}
}
@Override
public void close() {}
};
when(grpcPortWriteOperation.processedElementsConsumer()).thenReturn(elementsConsumed -> {});
RegisterAndProcessBundleOperation processOperation =
new RegisterAndProcessBundleOperation(
IdGenerators.decrementingLongs(),
instructionRequestHandler,
mockBeamFnStateDelegator,
REGISTER_REQUEST,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
BeamFnMapTaskExecutor.forOperations(
ImmutableList.of(readOperation, grpcPortWriteOperation, processOperation),
executionStateTracker);
// Launch the BeamFnMapTaskExecutor and wait until we are sure there has been one
// tentative update
CompletionStage<Void> doneFuture = MoreFutures.runAsync(mapTaskExecutor::execute);
progressSentTwiceLatch.await();
Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
while (Iterables.size(metricsCounterUpdates) == 0) {
Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
}
assertThat(
metricsCounterUpdates,
contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(secondCounterValue)));
// Now let it finish and clean up
processBundleLatch.countDown();
MoreFutures.get(doneFuture);
}
@Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
public void testFinalUserMetricsDeprecated() throws Exception {
final String stepName = "fakeStepNameWithUserMetrics";
final String namespace = "sdk/whatever";
final String name = "someCounter";
final int counterValue = 42;
final int finalCounterValue = 77;
final CountDownLatch progressSentLatch = new CountDownLatch(1);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
final Metrics.User.MetricName metricName =
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
InstructionRequestHandler instructionRequestHandler =
new InstructionRequestHandler() {
@Override
public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
switch (request.getRequestCase()) {
case REGISTER:
return CompletableFuture.completedFuture(responseFor(request).build());
case PROCESS_BUNDLE:
return MoreFutures.supplyAsync(
() -> {
processBundleLatch.await();
return responseFor(request)
.setProcessBundle(
BeamFnApi.ProcessBundleResponse.newBuilder()
.setMetrics(
Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
Metrics.PTransform.newBuilder()
.addUser(
Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
Metrics.User.CounterData.newBuilder()
.setValue(finalCounterValue)))
.build())))
.build();
});
case PROCESS_BUNDLE_PROGRESS:
progressSentLatch.countDown();
return CompletableFuture.completedFuture(
responseFor(request)
.setProcessBundleProgress(
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
.setMetrics(
Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
Metrics.PTransform.newBuilder()
.addUser(
Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
Metrics.User.CounterData.newBuilder()
.setValue(counterValue)))
.build())))
.build());
default:
// block forever
return new CompletableFuture<>();
}
}
@Override
public void close() {}
};
RegisterAndProcessBundleOperation processOperation =
new RegisterAndProcessBundleOperation(
IdGenerators.decrementingLongs(),
instructionRequestHandler,
mockBeamFnStateDelegator,
REGISTER_REQUEST,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableTable.of(),
ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
BeamFnMapTaskExecutor.forOperations(
ImmutableList.of(readOperation, grpcPortWriteOperation, processOperation),
executionStateTracker);
// Launch the BeamFnMapTaskExecutor and wait until we are sure there has been one
// tentative update
CompletionStage<Void> doneFuture = MoreFutures.runAsync(mapTaskExecutor::execute);
progressSentLatch.await();
// TODO: add ability to wait for tentative progress update
Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
while (Iterables.size(metricsCounterUpdates) == 0) {
Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
}
// Get the final metrics
processBundleLatch.countDown();
MoreFutures.get(doneFuture);
metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
assertThat(Iterables.size(metricsCounterUpdates), equalTo(1));
assertThat(
metricsCounterUpdates,
contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
}
@Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 60)
public void testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent()
throws Exception {
final String stepName = "fakeStepNameWithUserMetrics";
final String namespace = "sdk/whatever";
final String name = "someCounter";
final int counterValue = 42;
final int finalCounterValue = 77;
final CountDownLatch progressSentLatch = new CountDownLatch(1);
final CountDownLatch processBundleLatch = new CountDownLatch(1);
final Metrics.User.MetricName metricName =
Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
final Metrics deprecatedMetrics =
Metrics.newBuilder()
.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
.putPtransforms(
stepName,
Metrics.PTransform.newBuilder()
.addUser(
Metrics.User.newBuilder()
.setMetricName(metricName)
.setCounterData(
Metrics.User.CounterData.newBuilder().setValue(finalCounterValue)))
.build())
.build();
final int expectedCounterValue = 5;
final MonitoringInfo expectedMonitoringInfo =
MonitoringInfo.newBuilder()
.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
.putLabels(MonitoringInfoConstants.Labels.NAME, "ExpectedCounter")
.putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "anyString")
.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64)
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "ExpectedPTransform")
.setMetric(
Metric.newBuilder()
.setCounterData(
MetricsApi.CounterData.newBuilder()
.setInt64Value(expectedCounterValue)
.build())
.build())
.build();
InstructionRequestHandler instructionRequestHandler =
new InstructionRequestHandler() {
@Override
public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
switch (request.getRequestCase()) {
case REGISTER:
return CompletableFuture.completedFuture(responseFor(request).build());
case PROCESS_BUNDLE:
return MoreFutures.supplyAsync(
() -> {
processBundleLatch.await();
return responseFor(request)
.setProcessBundle(
BeamFnApi.ProcessBundleResponse.newBuilder()
.setMetrics(deprecatedMetrics)
.addMonitoringInfos(expectedMonitoringInfo))
.build();
});
case PROCESS_BUNDLE_PROGRESS:
progressSentLatch.countDown();
return CompletableFuture.completedFuture(
responseFor(request)
.setProcessBundleProgress(
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
.setMetrics(deprecatedMetrics)
.addMonitoringInfos(expectedMonitoringInfo))
.build());
default:
throw new RuntimeException("Reached unexpected code path");
}
}
@Override
public void close() {}
};
Map<String, DataflowStepContext> stepContextMap = new HashMap<>();
stepContextMap.put("ExpectedPTransform", generateDataflowStepContext("Expected"));
RegisterAndProcessBundleOperation processOperation =
new RegisterAndProcessBundleOperation(
IdGenerators.decrementingLongs(),
instructionRequestHandler,
mockBeamFnStateDelegator,
REGISTER_REQUEST,
ImmutableMap.of(),
stepContextMap,
ImmutableMap.of(),
ImmutableTable.of(),
ImmutableMap.of(),
mockContext);
BeamFnMapTaskExecutor mapTaskExecutor =
BeamFnMapTaskExecutor.forOperations(
ImmutableList.of(readOperation, grpcPortWriteOperation, processOperation),
executionStateTracker);
// Launch the BeamFnMapTaskExecutor and wait until we are sure there has been one
// tentative update
CompletionStage<Void> doneFuture = MoreFutures.runAsync(mapTaskExecutor::execute);
progressSentLatch.await();
Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
while (Iterables.size(metricsCounterUpdates) == 0) {
Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
}
// Get the final metrics
processBundleLatch.countDown();
MoreFutures.get(doneFuture);
metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
assertThat(Iterables.size(metricsCounterUpdates), equalTo(1));
CounterUpdate resultCounter = metricsCounterUpdates.iterator().next();
assertTrue(
new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(expectedCounterValue)
.matches(resultCounter));
assertEquals(
"ExpectedCounter", resultCounter.getStructuredNameAndMetadata().getName().getName());
}
/**
* Generates bare minumum DataflowStepContext to use for testing.
*
* @param valuesPrefix prefix for all types of names that are specified in DataflowStepContext.
* @return new instance of DataflowStepContext
*/
private DataflowStepContext generateDataflowStepContext(String valuesPrefix) {
NameContext nc =
new NameContext() {
@Nullable
@Override
public String stageName() {
return valuesPrefix + "Stage";
}
@Nullable
@Override
public String originalName() {
return valuesPrefix + "OriginalName";
}
@Nullable
@Override
public String systemName() {
return valuesPrefix + "SystemName";
}
@Nullable
@Override
public String userName() {
return valuesPrefix + "UserName";
}
};
DataflowStepContext dsc =
new DataflowStepContext(nc) {
@Nullable
@Override
public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCoder) {
return null;
}
@Override
public <W extends BoundedWindow> void setStateCleanupTimer(
String timerId, W window, Coder<W> windowCoder, Instant cleanupTime) {}
@Override
public DataflowStepContext namespacedToUser() {
return this;
}
@Override
public StateInternals stateInternals() {
return null;
}
@Override
public TimerInternals timerInternals() {
return null;
}
};
return dsc;
}
private BeamFnApi.InstructionResponse.Builder responseFor(BeamFnApi.InstructionRequest request) {
return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(request.getInstructionId());
}
}