blob: 7e298e7b43d3fe1470e0521c6c6524f81ca350f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Maps main input windows onto side input windows by running a window mapping function inside an
 * SDK harness over the Fn API.
 *
 * <p>Note that this {@link WindowMappingFn} performs blocking calls over the data plane: each
 * uncached mapping waits for the SDK harness's response. The thread which maps side input windows
 * should therefore not be the same thread that is reading from the same data plane channel.
 *
 * <p>Mapping results are memoized in a static cache shared by all instances, keyed by the window
 * mapping function and the main input window.
 *
 * <p>TODO: Swap this with an implementation which streams all required window mappings per bundle
 * instead of per mapping request. This requires rewriting the {@link StreamingSideInputFetcher} to
 * not be inline calls and process elements over a stream.
 */
class FnApiWindowMappingFn<TargetWindowT extends BoundedWindow>
    extends WindowMappingFn<TargetWindowT> {
  private static final Logger LOG = LoggerFactory.getLogger(FnApiWindowMappingFn.class);

  /** Key sent alongside every main input window; only the window (the KV value) is read back. */
  private static final byte[] EMPTY_ARRAY = new byte[0];

  /** Cache key identifying one window mapping function applied to one main input window. */
  @AutoValue
  public abstract static class CacheKey {
    public static CacheKey create(SdkFunctionSpec windowMappingFn, BoundedWindow mainWindow) {
      return new AutoValue_FnApiWindowMappingFn_CacheKey(windowMappingFn, mainWindow);
    }

    public abstract SdkFunctionSpec getWindowMappingFn();

    public abstract BoundedWindow getMainWindow();
  }

  /** Memo of computed mappings, shared by every instance and bounded to 1000 entries. */
  private static final Cache<CacheKey, BoundedWindow> sideInputMappingCache =
      CacheBuilder.newBuilder().maximumSize(1000).build();

  private final IdGenerator idGenerator;
  private final FnDataService beamFnDataService;
  private final InstructionRequestHandler instructionRequestHandler;
  private final SdkFunctionSpec windowMappingFn;
  /** Encodes {@code KV<byte[], mainWindow>} elements sent to the "read" transform. */
  private final Coder<WindowedValue<KV<byte[], BoundedWindow>>> outboundCoder;
  /** Decodes {@code KV<byte[], sideWindow>} elements received from the "write" transform. */
  private final Coder<WindowedValue<KV<byte[], TargetWindowT>>> inboundCoder;
  /** Descriptor template without an id; an id is assigned when it is registered. */
  private final ProcessBundleDescriptor processBundleDescriptor;

  /**
   * Id of the registered descriptor, or {@code null} before the first successful registration.
   * Should only be accessed from within {@link #registerIfRequired}, which is synchronized.
   */
  private String processBundleDescriptorId;

  // Raw Coder casts: CoderTranslation returns Coder<?>; the ids registered below guarantee the
  // element types declared on outboundCoder/inboundCoder.
  @SuppressWarnings({"unchecked", "rawtypes"})
  FnApiWindowMappingFn(
      IdGenerator idGenerator,
      InstructionRequestHandler instructionRequestHandler,
      ApiServiceDescriptor dataServiceApiServiceDescriptor,
      FnDataService beamFnDataService,
      SdkFunctionSpec windowMappingFn,
      Coder<BoundedWindow> mainInputWindowCoder,
      Coder<TargetWindowT> sideInputWindowCoder) {
    this.idGenerator = idGenerator;
    this.instructionRequestHandler = instructionRequestHandler;
    this.beamFnDataService = beamFnDataService;
    this.windowMappingFn = windowMappingFn;
    SdkComponents sdkComponents = SdkComponents.create();
    sdkComponents.registerEnvironment(RunnerApi.Environment.getDefaultInstance());
    String mainInputWindowCoderId;
    String sideInputWindowCoderId;
    String windowingStrategyId;
    RunnerApi.Components components;
    try {
      mainInputWindowCoderId =
          sdkComponents.registerCoder(
              WindowedValue.getFullCoder(
                  KvCoder.of(ByteArrayCoder.of(), mainInputWindowCoder),
                  GlobalWindow.Coder.INSTANCE));
      sideInputWindowCoderId =
          sdkComponents.registerCoder(
              WindowedValue.getFullCoder(
                  KvCoder.of(ByteArrayCoder.of(), sideInputWindowCoder),
                  GlobalWindow.Coder.INSTANCE));
      windowingStrategyId =
          sdkComponents.registerWindowingStrategy(WindowingStrategy.globalDefault());
      // Build the components proto once; nothing is registered after this point.
      components = sdkComponents.toComponents();
      RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
      // Round-trip the coders through their proto form so the runner-side coders match exactly
      // what the SDK harness will decode from the descriptor.
      outboundCoder =
          (Coder)
              CoderTranslation.fromProto(
                  components.getCodersOrThrow(mainInputWindowCoderId), rehydratedComponents);
      inboundCoder =
          (Coder)
              CoderTranslation.fromProto(
                  components.getCodersOrThrow(sideInputWindowCoderId), rehydratedComponents);
    } catch (IOException e) {
      throw new IllegalStateException(
          "Unable to create side input window mapping process bundle specification.", e);
    }
    processBundleDescriptor =
        createProcessBundleDescriptor(
            dataServiceApiServiceDescriptor,
            windowMappingFn,
            components,
            mainInputWindowCoderId,
            sideInputWindowCoderId,
            windowingStrategyId);
  }

  /**
   * Builds the three-transform descriptor "read" -> "map" -> "write".
   *
   * <p>"read" receives main windows from the data plane, "map" applies {@code windowMappingFn}
   * ({@code beam:transform:map_windows:v1}), and "write" returns side windows over the data plane.
   * The returned descriptor has no id.
   */
  private static ProcessBundleDescriptor createProcessBundleDescriptor(
      ApiServiceDescriptor dataServiceApiServiceDescriptor,
      SdkFunctionSpec windowMappingFn,
      RunnerApi.Components components,
      String mainInputWindowCoderId,
      String sideInputWindowCoderId,
      String windowingStrategyId) {
    return ProcessBundleDescriptor.newBuilder()
        .putAllCoders(components.getCodersMap())
        .putPcollections(
            "inPC",
            PCollection.newBuilder()
                .setCoderId(mainInputWindowCoderId)
                .setWindowingStrategyId(windowingStrategyId)
                .build())
        .putPcollections(
            "outPC",
            PCollection.newBuilder()
                .setCoderId(sideInputWindowCoderId)
                .setWindowingStrategyId(windowingStrategyId)
                .build())
        .putTransforms(
            "read",
            RunnerApi.PTransform.newBuilder()
                .setSpec(
                    FunctionSpec.newBuilder()
                        .setUrn(RemoteGrpcPortRead.URN)
                        .setPayload(
                            RemoteGrpcPort.newBuilder()
                                .setApiServiceDescriptor(dataServiceApiServiceDescriptor)
                                .setCoderId(mainInputWindowCoderId)
                                .build()
                                .toByteString()))
                .putOutputs("in", "inPC")
                .build())
        .putTransforms(
            "map",
            RunnerApi.PTransform.newBuilder()
                .setSpec(
                    FunctionSpec.newBuilder()
                        .setUrn("beam:transform:map_windows:v1")
                        .setPayload(windowMappingFn.toByteString()))
                .putInputs("in", "inPC")
                .putOutputs("out", "outPC")
                .build())
        .putTransforms(
            "write",
            RunnerApi.PTransform.newBuilder()
                .setSpec(
                    FunctionSpec.newBuilder()
                        .setUrn(RemoteGrpcPortWrite.URN)
                        .setPayload(
                            RemoteGrpcPort.newBuilder()
                                .setApiServiceDescriptor(dataServiceApiServiceDescriptor)
                                .setCoderId(sideInputWindowCoderId)
                                .build()
                                .toByteString()))
                .putInputs("out", "outPC")
                .build())
        .putAllWindowingStrategies(components.getWindowingStrategiesMap())
        .build();
  }

  /**
   * Returns the side input window for {@code mainWindow}, consulting the shared cache first and
   * otherwise issuing a blocking request to the SDK harness.
   */
  @Override
  @SuppressWarnings("unchecked") // Cache values for this windowMappingFn come from loadIfNeeded.
  public TargetWindowT getSideInputWindow(BoundedWindow mainWindow) {
    try {
      return (TargetWindowT)
          sideInputMappingCache.get(
              CacheKey.create(windowMappingFn, mainWindow), () -> loadIfNeeded(mainWindow));
    } catch (ExecutionException e) {
      // Guava reports checked loader exceptions here; loadIfNeeded only throws unchecked ones,
      // which Guava surfaces as UncheckedExecutionException instead.
      throw new IllegalStateException(
          String.format("Unable to load side input window for %s", mainWindow), e);
    }
  }

  /**
   * Runs a single process bundle request that maps {@code mainWindow} through this instance's
   * window mapping function, and returns the one side input window the SDK harness emits.
   *
   * <p>Blocks until the processing response and the inbound data stream have both completed.
   *
   * @throws IllegalStateException if registration or processing fails, if the harness does not
   *     emit exactly one non-null window, or if the thread is interrupted. On interruption the
   *     thread's interrupt flag is restored. On any failure the inbound data client is cancelled.
   */
  private TargetWindowT loadIfNeeded(BoundedWindow mainWindow) {
    InboundDataClient waitForInboundTermination = null;
    try {
      String processRequestInstructionId = idGenerator.getId();
      InstructionRequest processRequest =
          InstructionRequest.newBuilder()
              .setInstructionId(processRequestInstructionId)
              .setProcessBundle(
                  ProcessBundleRequest.newBuilder()
                      .setProcessBundleDescriptorId(registerIfRequired()))
              .build();
      ConcurrentLinkedQueue<WindowedValue<KV<byte[], TargetWindowT>>> outputValue =
          new ConcurrentLinkedQueue<>();
      // Open the inbound consumer before issuing the request so no output can be missed.
      waitForInboundTermination =
          beamFnDataService.receive(
              LogicalEndpoint.of(processRequestInstructionId, "write"),
              inboundCoder,
              outputValue::add);
      CompletionStage<InstructionResponse> processResponse =
          instructionRequestHandler.handle(processRequest);
      // Send the single main window; closing the receiver ends the outbound stream.
      try (CloseableFnDataReceiver<WindowedValue<KV<byte[], BoundedWindow>>> outboundConsumer =
          beamFnDataService.send(
              LogicalEndpoint.of(processRequestInstructionId, "read"), outboundCoder)) {
        outboundConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of(EMPTY_ARRAY, mainWindow)));
      }
      // Check to see if processing the request failed.
      throwIfFailure(processResponse);
      waitForInboundTermination.awaitCompletion();
      WindowedValue<KV<byte[], TargetWindowT>> sideInputWindow = outputValue.poll();
      checkState(
          sideInputWindow != null,
          "Expected side input window to have been emitted by SDK harness.");
      checkState(
          sideInputWindow.getValue() != null,
          "Side input window emitted by SDK harness was a WindowedValue with no value in it.");
      checkState(
          sideInputWindow.getValue().getValue() != null,
          "Side input window emitted by SDK harness was a WindowedValue<KV<...>> with a null V.");
      checkState(
          outputValue.isEmpty(),
          "Expected only a single side input window to have been emitted by "
              + "the SDK harness but also received %s",
          outputValue);
      return sideInputWindow.getValue().getValue();
    } catch (Throwable e) {
      if (e instanceof InterruptedException) {
        // Preserve the interrupt for callers higher up the stack.
        Thread.currentThread().interrupt();
      }
      if (waitForInboundTermination != null) {
        // The request failed; stop waiting for inbound data that may never arrive.
        try {
          waitForInboundTermination.cancel();
        } catch (RuntimeException cancelFailure) {
          e.addSuppressed(cancelFailure);
        }
      }
      LOG.error("Unable to map main input window {} to side input window.", mainWindow, e);
      throw new IllegalStateException(e);
    }
  }

  /**
   * Registers the process bundle descriptor for this remote window mapping fn and returns its id.
   *
   * <p>The id is cached after the first successful registration. A failed registration leaves no
   * cached id, so the next call retries. Synchronized so that concurrent first callers register
   * only once.
   */
  private synchronized String registerIfRequired() throws ExecutionException, InterruptedException {
    if (processBundleDescriptorId == null) {
      String descriptorId = idGenerator.getId();
      CompletionStage<InstructionResponse> response =
          instructionRequestHandler.handle(
              InstructionRequest.newBuilder()
                  .setInstructionId(idGenerator.getId())
                  .setRegister(
                      RegisterRequest.newBuilder()
                          .addProcessBundleDescriptor(
                              processBundleDescriptor.toBuilder().setId(descriptorId).build())
                          .build())
                  .build());
      throwIfFailure(response);
      processBundleDescriptorId = descriptorId;
    }
    return processBundleDescriptorId;
  }

  /**
   * Blocks until {@code responseFuture} completes and returns the response.
   *
   * @throws IllegalStateException if the response carries a non-empty error string
   */
  private static InstructionResponse throwIfFailure(
      CompletionStage<InstructionResponse> responseFuture)
      throws ExecutionException, InterruptedException {
    InstructionResponse response = MoreFutures.get(responseFuture);
    if (!Strings.isNullOrEmpty(response.getError())) {
      throw new IllegalStateException(
          String.format(
              "Client failed to process %s with error [%s].",
              response.getInstructionId(), response.getError()));
    }
    return response;
  }
}