| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.fn.harness.data; |
| |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.beam.fn.harness.control.ProcessBundleHandler; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest; |
| import org.apache.beam.model.pipeline.v1.Endpoints; |
| import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; |
| import org.apache.beam.sdk.fn.data.FnDataReceiver; |
| import org.apache.beam.sdk.fn.data.InboundDataClient; |
| import org.apache.beam.sdk.fn.data.LogicalEndpoint; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed in the |
| * thread which calls @{link #drainAndBlock}. |
| */ |
| public class QueueingBeamFnDataClient implements BeamFnDataClient { |
| |
| private static final int QUEUE_SIZE = 1000; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(QueueingBeamFnDataClient.class); |
| |
| private final BeamFnDataClient mainClient; |
| private final LinkedBlockingQueue<ConsumerAndData> queue; |
| private final ConcurrentHashMap<InboundDataClient, Object> inboundDataClients; |
| |
| public QueueingBeamFnDataClient(BeamFnDataClient mainClient) { |
| this.mainClient = mainClient; |
| this.queue = new LinkedBlockingQueue<ConsumerAndData>(QUEUE_SIZE); |
| this.inboundDataClients = new ConcurrentHashMap<>(); |
| } |
| |
| @Override |
| public <T> InboundDataClient receive( |
| ApiServiceDescriptor apiServiceDescriptor, |
| LogicalEndpoint inputLocation, |
| Coder<WindowedValue<T>> coder, |
| FnDataReceiver<WindowedValue<T>> consumer) { |
| LOG.debug( |
| "Registering consumer for instruction {} and target {}", |
| inputLocation.getInstructionId(), |
| inputLocation.getTarget()); |
| |
| QueueingFnDataReceiver<T> queueingConsumer = new QueueingFnDataReceiver<T>(consumer); |
| InboundDataClient inboundDataClient = |
| this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, queueingConsumer); |
| queueingConsumer.inboundDataClient = inboundDataClient; |
| this.inboundDataClients.computeIfAbsent( |
| inboundDataClient, (InboundDataClient idcToStore) -> idcToStore); |
| return inboundDataClient; |
| } |
| |
| // Returns true if all the InboundDataClients have finished or cancelled and no WindowedValues |
| // remain on the queue. |
| private boolean allDone() { |
| for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) { |
| if (!inboundDataClient.isDone()) { |
| return false; |
| } |
| } |
| if (!this.queue.isEmpty()) { |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Drains the internal queue of this class, by waiting for all WindowedValues to be passed to |
| * their consumers. The thread which wishes to process() the elements should call this method, as |
| * this will cause the consumers to invoke element processing. All receive() and send() calls must |
| * be made prior to calling drainAndBlock, in order to properly terminate. |
| * |
| * <p>All {@link InboundDataClient}s will be failed if processing throws an exception. |
| * |
| * <p>This method is NOT thread safe. This should only be invoked by a single thread, and is |
| * intended for use with a newly constructed QueueingBeamFnDataClient in {@link |
| * ProcessBundleHandler#processBundle(InstructionRequest)}. |
| */ |
| public void drainAndBlock() throws Exception { |
| while (true) { |
| try { |
| ConsumerAndData tuple = queue.poll(200, TimeUnit.MILLISECONDS); |
| if (tuple != null) { |
| // Forward to the consumers who cares about this data. |
| tuple.consumer.accept(tuple.data); |
| } else { |
| // Note: We do not expect to ever hit this point without receiving all values |
| // as (1) The InboundObserver will not be set to Done until the |
| // QueuingFnDataReceiver.accept() call returns and will not be invoked again. |
| // (2) The QueueingFnDataReceiver will not return until the value is received in |
| // drainAndBlock, because of the use of the SynchronousQueue. |
| if (allDone()) { |
| break; |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("Client failed to dequeue and process WindowedValue", e); |
| for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) { |
| inboundDataClient.fail(e); |
| } |
| throw e; |
| } |
| } |
| } |
| |
| @Override |
| public <T> CloseableFnDataReceiver<WindowedValue<T>> send( |
| Endpoints.ApiServiceDescriptor apiServiceDescriptor, |
| LogicalEndpoint outputLocation, |
| Coder<WindowedValue<T>> coder) { |
| LOG.debug( |
| "Creating output consumer for instruction {} and target {}", |
| outputLocation.getInstructionId(), |
| outputLocation.getTarget()); |
| return this.mainClient.send(apiServiceDescriptor, outputLocation, coder); |
| } |
| |
| /** |
| * The QueueingFnDataReceiver is a a FnDataReceiver used by the QueueingBeamFnDataClient. |
| * |
| * <p>All {@link #accept accept()ed} values will be put onto a synchronous queue which will cause |
| * the calling thread to block until {@link QueueingBeamFnDataClient#drainAndBlock} is called. |
| * {@link QueueingBeamFnDataClient#drainAndBlock} is responsible for processing values from the |
| * queue. |
| */ |
| public class QueueingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> { |
| private final FnDataReceiver<WindowedValue<T>> consumer; |
| public InboundDataClient inboundDataClient; |
| |
| public QueueingFnDataReceiver(FnDataReceiver<WindowedValue<T>> consumer) { |
| this.consumer = consumer; |
| } |
| |
| /** |
| * This method is thread safe, we expect multiple threads to call this, passing in data when new |
| * data arrives via the QueueingBeamFnDataClient's mainClient. |
| */ |
| @Override |
| public void accept(WindowedValue<T> value) throws Exception { |
| try { |
| ConsumerAndData offering = new ConsumerAndData(this.consumer, value); |
| while (!queue.offer(offering, 200, TimeUnit.MILLISECONDS)) { |
| if (inboundDataClient.isDone()) { |
| // If it was cancelled by the consuming side of the queue. |
| break; |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("Failed to insert WindowedValue into the queue", e); |
| inboundDataClient.fail(e); |
| throw e; |
| } |
| } |
| } |
| |
| static class ConsumerAndData<T> { |
| public FnDataReceiver<WindowedValue<T>> consumer; |
| public WindowedValue<T> data; |
| |
| public ConsumerAndData(FnDataReceiver<WindowedValue<T>> receiver, WindowedValue<T> data) { |
| this.consumer = receiver; |
| this.data = data; |
| } |
| } |
| } |