blob: 67a5d0bb8d3138e737361947e90e4e9bcc1539d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterResponse;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@link FnApiWindowMappingFn}.
 *
 * <p>The SDK harness is replaced by the nested {@code TestSdkHarness} fake, which acts as both the
 * data service and the instruction request handler handed to the function under test.
 */
@RunWith(JUnit4.class)
public class FnApiWindowMappingFnTest {
  /** Data service descriptor passed to the function under test. */
  private static final ApiServiceDescriptor DATA_SERVICE =
      ApiServiceDescriptor.newBuilder().setUrl("test://data").build();

  /** Translated form of the default window mapping fn of {@link GlobalWindows}. */
  private static final SdkFunctionSpec WINDOW_MAPPING_SPEC =
      ParDoTranslation.translateWindowMappingFn(
          new GlobalWindows().getDefaultWindowMappingFn(),
          SdkComponents.create(PipelineOptionsFactory.create()));

  /**
   * Maps two distinct interval windows and then repeats the first one, checking both the returned
   * side input window and what was actually sent to the fake harness.
   */
  @Test
  public void testWindowMapping() throws Exception {
    TestSdkHarness testSdkHarness = new TestSdkHarness(GlobalWindow.INSTANCE);
    FnApiWindowMappingFn windowMappingFn =
        new FnApiWindowMappingFn(
            IdGenerators.decrementingLongs(),
            testSdkHarness,
            DATA_SERVICE,
            testSdkHarness,
            WINDOW_MAPPING_SPEC,
            IntervalWindowCoder.of(),
            GlobalWindow.Coder.INSTANCE);

    // Check mapping an element returns the expected result.
    BoundedWindow inputWindow = new IntervalWindow(Instant.now(), Duration.standardMinutes(1));
    assertEquals(GlobalWindow.INSTANCE, windowMappingFn.getSideInputWindow(inputWindow));
    assertEquals(inputWindow, sentWindow(testSdkHarness, 0));

    // Check mapping a different element returns the expected result.
    BoundedWindow inputWindow2 = new IntervalWindow(Instant.now(), Duration.standardMinutes(2));
    assertEquals(GlobalWindow.INSTANCE, windowMappingFn.getSideInputWindow(inputWindow2));
    assertEquals(inputWindow2, sentWindow(testSdkHarness, 1));

    // Check that mapping the same element returns a cached result: the answer is unchanged and no
    // third element reached the harness.
    assertEquals(GlobalWindow.INSTANCE, windowMappingFn.getSideInputWindow(inputWindow));
    assertEquals(2, testSdkHarness.getInputValues().size());
  }

  /**
   * Returns the value half of the {@link KV} that was sent to the harness at position {@code
   * index}.
   *
   * @param harness the fake harness that recorded the sent elements
   * @param index zero-based position of the recorded element
   * @return the KV's value, which the test compares against the window that was mapped
   */
  private static Object sentWindow(TestSdkHarness harness, int index) {
    // A wildcard cast is enough here: only the value is read, and only as an Object.
    KV<?, ?> sent = (KV<?, ?>) harness.getInputValues().get(index).getValue();
    return sent.getValue();
  }

  /** A fake SDK harness which always returns the expected output value. */
  private static class TestSdkHarness implements FnDataService, InstructionRequestHandler {
    /** Window returned as the mapping result for every element sent to this harness. */
    private final BoundedWindow outputValue;
    /** Every element sent to this harness, in arrival order. */
    private final List<WindowedValue<?>> inputValues;

    /** Whether a register request has been handled; starts out {@code false}. */
    private boolean registered;
    /** Id of the single process bundle descriptor seen in the register request. */
    private String processBundleDescriptorId;
    /** Consumer supplied by the most recent {@link #receive} call. */
    private FnDataReceiver<WindowedValue<?>> inboundReceiver;
    /** Client returned by the most recent {@link #receive} call. */
    private InboundDataClient inboundDataClient;

    /**
     * Creates a harness that answers every element with {@code outputValue}.
     *
     * @param outputValue the window to send back for each received element
     */
    TestSdkHarness(BoundedWindow outputValue) {
      this.outputValue = outputValue;
      this.inputValues = new ArrayList<>();
    }

    /** Returns the live list of elements sent to this harness so far. */
    public List<WindowedValue<?>> getInputValues() {
      return inputValues;
    }

    /**
     * Remembers {@code consumer} so that {@link #send} can answer through it, and returns a fresh
     * inbound data client that is completed once an answer has been delivered.
     */
    @Override
    public <T> InboundDataClient receive(
        LogicalEndpoint inputLocation,
        Coder<WindowedValue<T>> coder,
        FnDataReceiver<WindowedValue<T>> consumer) {
      // The fake stores a single receiver regardless of T, so the element type has to be widened.
      // The unchecked cast is confined to this local so the suppression covers nothing else.
      @SuppressWarnings("unchecked")
      FnDataReceiver<WindowedValue<?>> widenedConsumer =
          (FnDataReceiver<WindowedValue<?>>) (FnDataReceiver<?>) consumer;
      this.inboundReceiver = widenedConsumer;
      this.inboundDataClient = CompletableFutureInboundDataClient.create();
      return inboundDataClient;
    }

    /**
     * Returns a receiver that records each element and immediately answers it through the consumer
     * registered via {@link #receive}, reusing the element's key and substituting the configured
     * output window as the value.
     */
    @Override
    public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
        LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
      return new CloseableFnDataReceiver<WindowedValue<T>>() {
        @Override
        public void accept(WindowedValue<T> windowedValue) throws Exception {
          inputValues.add(windowedValue);
          KV<?, ?> kv = (KV<?, ?>) windowedValue.getValue();
          Object key = kv.getKey();
          inboundReceiver.accept(windowedValue.withValue(KV.of(key, outputValue)));
          inboundDataClient.complete();
        }

        @Override
        public void flush() throws Exception {}

        @Override
        public void close() throws Exception {}
      };
    }

    /**
     * Answers register and process bundle requests with default responses.
     *
     * <p>A register request is accepted only once and must carry exactly one process bundle
     * descriptor, whose id is remembered. A process bundle request must reference that id. Any
     * other request type fails the test.
     *
     * @param request the instruction request to handle
     * @return an already completed stage holding the response for {@code request}
     */
    @Override
    public CompletionStage<InstructionResponse> handle(InstructionRequest request) {
      RequestCase requestCase = request.getRequestCase();
      if (RequestCase.REGISTER.equals(requestCase)) {
        assertFalse("Expected only a single registration request", registered);
        registered = true;
        processBundleDescriptorId =
            Iterables.getOnlyElement(request.getRegister().getProcessBundleDescriptorList())
                .getId();
        return CompletableFuture.completedFuture(
            InstructionResponse.newBuilder()
                .setInstructionId(request.getInstructionId())
                .setRegister(RegisterResponse.getDefaultInstance())
                .build());
      } else if (RequestCase.PROCESS_BUNDLE.equals(requestCase)) {
        assertEquals(
            processBundleDescriptorId,
            request.getProcessBundle().getProcessBundleDescriptorReference());
        return CompletableFuture.completedFuture(
            InstructionResponse.newBuilder()
                .setInstructionId(request.getInstructionId())
                .setProcessBundle(ProcessBundleResponse.getDefaultInstance())
                .build());
      }
      throw new AssertionError(String.format("Unexpected request %s", request));
    }

    @Override
    public void close() {}
  }
}