blob: 72c613b0d53b5fdce7fde42ff6d1b347d31d3d71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.data;
import java.io.IOException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A buffering outbound {@link FnDataReceiver} for the Beam Fn Data API.
 *
 * <p>Each consumed element is encoded with the supplied {@link Coder} into an in-memory buffer.
 * Once the buffer size reaches or exceeds the configured limit, its contents are emitted as a
 * single {@link BeamFnApi.Elements} message and the buffer is reset.
 *
 * <p>The default buffer threshold can be overridden by specifying the experiment {@code
 * beam_fn_api_data_buffer_limit=<bytes>}
 *
 * <p>TODO: Handle outputting large elements (&gt; 2GiBs). Note that this also applies to the input
 * side as well.
 *
 * <p>TODO: Handle outputting elements that are zero bytes by outputting a single byte as a marker,
 * detect on the input side that no bytes were read and force reading a single byte.
 */
public class BeamFnDataBufferingOutboundObserver<T>
    implements CloseableFnDataReceiver<WindowedValue<T>> {

  private static final Logger LOG =
      LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);

  // TODO: Consider moving this constant out of this class
  public static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";

  @VisibleForTesting static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;

  /** Size, in bytes, at which the buffer is flushed to the outbound observer. */
  private final int flushThresholdBytes;

  /** Identifies the instruction and target that every emitted data block is tagged with. */
  private final LogicalEndpoint endpoint;

  /** Encodes each accepted element into the buffer. */
  private final Coder<WindowedValue<T>> elementCoder;

  /** Downstream sink that receives the batched messages. */
  private final StreamObserver<BeamFnApi.Elements> sink;

  /** Encoded bytes that have been accepted but not yet transmitted. */
  private final ByteString.Output pending;

  /** Total number of buffered bytes handed off for transmission so far. */
  private long bytesSent;

  /** Total number of elements accepted so far. */
  private long elementsAccepted;

  /** Set once {@link #close()} has been invoked. */
  private boolean closed;

  /**
   * Creates an observer that uses {@link #DEFAULT_BUFFER_LIMIT_BYTES} as its buffer limit.
   *
   * @param endpoint the logical endpoint whose instruction id and target tag the output
   * @param coder the coder used to encode each element
   * @param outboundObserver the observer that receives the batched messages
   */
  public static <T> BeamFnDataBufferingOutboundObserver<T> forLocation(
      LogicalEndpoint endpoint,
      Coder<WindowedValue<T>> coder,
      StreamObserver<BeamFnApi.Elements> outboundObserver) {
    return new BeamFnDataBufferingOutboundObserver<>(
        DEFAULT_BUFFER_LIMIT_BYTES, endpoint, coder, outboundObserver);
  }

  /**
   * Creates an observer with an explicit buffer limit.
   *
   * @param bufferLimit the buffered byte count at or above which a flush is triggered
   * @param endpoint the logical endpoint whose instruction id and target tag the output
   * @param coder the coder used to encode each element
   * @param outboundObserver the observer that receives the batched messages
   */
  public static <T> BeamFnDataBufferingOutboundObserver<T> forLocationWithBufferLimit(
      int bufferLimit,
      LogicalEndpoint endpoint,
      Coder<WindowedValue<T>> coder,
      StreamObserver<BeamFnApi.Elements> outboundObserver) {
    return new BeamFnDataBufferingOutboundObserver<>(
        bufferLimit, endpoint, coder, outboundObserver);
  }

  private BeamFnDataBufferingOutboundObserver(
      int bufferLimit,
      LogicalEndpoint outputLocation,
      Coder<WindowedValue<T>> coder,
      StreamObserver<BeamFnApi.Elements> outboundObserver) {
    // The counters and the closed flag start at their Java defaults (0 / false).
    this.pending = ByteString.newOutput();
    this.sink = outboundObserver;
    this.elementCoder = coder;
    this.endpoint = outputLocation;
    this.flushThresholdBytes = bufferLimit;
  }

  /**
   * Encodes the element into the buffer and flushes if the buffer has reached its limit.
   *
   * @throws IllegalStateException if this observer has already been closed
   * @throws IOException if encoding the element fails
   */
  @Override
  public void accept(WindowedValue<T> t) throws IOException {
    ensureOpen();
    elementCoder.encode(t, pending);
    elementsAccepted++;
    if (pending.size() < flushThresholdBytes) {
      return;
    }
    flush();
  }

  /** Sends any buffered bytes to the outbound observer; does nothing when the buffer is empty. */
  @Override
  public void flush() throws IOException {
    if (pending.size() <= 0) {
      return;
    }
    BeamFnApi.Elements.Builder batch = convertBufferForTransmission();
    sink.onNext(batch.build());
  }

  /**
   * Marks this observer closed and sends one final message containing any remaining buffered
   * bytes followed by an empty data block.
   *
   * @throws IllegalStateException if this observer has already been closed
   */
  @Override
  public void close() throws Exception {
    ensureOpen();
    closed = true;

    BeamFnApi.Elements.Builder finalBatch = convertBufferForTransmission();
    // A data block without a payload represents the end of the stream.
    finalBatch
        .addDataBuilder()
        .setInstructionReference(endpoint.getInstructionId())
        .setTarget(endpoint.getTarget());

    LOG.debug(
        "Closing stream for instruction {} and target {} having transmitted {} values {} bytes",
        endpoint.getInstructionId(),
        endpoint.getTarget(),
        elementsAccepted,
        bytesSent);
    sink.onNext(finalBatch.build());
  }

  /** Rejects further use once {@link #close()} has been called. */
  private void ensureOpen() {
    if (closed) {
      throw new IllegalStateException("Already closed.");
    }
  }

  /**
   * Moves the buffered bytes into a new message builder.
   *
   * <p>When the buffer holds data, a single data block tagged with the endpoint's instruction id
   * and target is added, the transmitted-byte total is increased and the buffer is reset. When
   * the buffer is empty, the returned builder is empty as well.
   */
  private BeamFnApi.Elements.Builder convertBufferForTransmission() {
    BeamFnApi.Elements.Builder batch = BeamFnApi.Elements.newBuilder();
    if (pending.size() != 0) {
      batch
          .addDataBuilder()
          .setInstructionReference(endpoint.getInstructionId())
          .setTarget(endpoint.getTarget())
          .setData(pending.toByteString());
      bytesSent += pending.size();
      pending.reset();
    }
    return batch;
  }
}