| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker.windmill; |
| |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.ThreadSafe; |
| import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; |
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; |
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; |
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.net.HostAndPort; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| |
| /** Stub for communicating with a Windmill server. */ |
| public abstract class WindmillServerStub implements StatusDataProvider { |
| |
| /** |
| * Sets the new endpoints used to talk to windmill. Upon first call, the stubs are initialized. On |
| * subsequent calls, if endpoints are different from previous values new stubs are created, |
| * replacing the previous ones. |
| */ |
| public abstract void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) throws IOException; |
| |
| /** Returns true iff this WindmillServerStub is ready for making API calls. */ |
| public abstract boolean isReady(); |
| |
| /** Get a batch of work to process. */ |
| public abstract Windmill.GetWorkResponse getWork(Windmill.GetWorkRequest request); |
| |
| /** Get additional data such as state needed to process work. */ |
| public abstract Windmill.GetDataResponse getData(Windmill.GetDataRequest request); |
| |
| /** Commit the work, issuing any output productions, state modifications etc. */ |
| public abstract Windmill.CommitWorkResponse commitWork(Windmill.CommitWorkRequest request); |
| |
| /** Get configuration data from the server. */ |
| public abstract Windmill.GetConfigResponse getConfig(Windmill.GetConfigRequest request); |
| |
| /** Report execution information to the server. */ |
| public abstract Windmill.ReportStatsResponse reportStats(Windmill.ReportStatsRequest request); |
| |
| /** Functional interface for receiving WorkItems. */ |
| @FunctionalInterface |
| public interface WorkItemReceiver { |
| void receiveWork( |
| String computation, |
| @Nullable Instant inputDataWatermark, |
| Instant synchronizedProcessingTime, |
| Windmill.WorkItem workItem); |
| } |
| |
| /** |
| * Gets work to process, returned as a stream. |
| * |
| * <p>Each time a WorkItem is received, it will be passed to the given receiver. The returned |
| * GetWorkStream object can be used to control the lifetime of the stream. |
| */ |
| public abstract GetWorkStream getWorkStream( |
| Windmill.GetWorkRequest request, WorkItemReceiver receiver); |
| |
| /** Get additional data such as state needed to process work, returned as a stream. */ |
| public abstract GetDataStream getDataStream(); |
| |
| /** Returns a stream allowing individual WorkItemCommitRequests to be streamed to Windmill. */ |
| public abstract CommitWorkStream commitWorkStream(); |
| |
| /** Returns the amount of time the server has been throttled and resets the time to 0. */ |
| public abstract long getAndResetThrottleTime(); |
| |
| @Override |
| public void appendSummaryHtml(PrintWriter writer) {} |
| |
| /** Superclass for streams returned by streaming Windmill methods. */ |
| @ThreadSafe |
| public interface WindmillStream { |
| /** Indicates that no more requests will be sent. */ |
| void close(); |
| |
| /** Waits for the server to close its end of the connection, with timeout. */ |
| boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException; |
| |
| /** |
| * Cleanly closes the stream after implementation-speficied timeout, unless the stream is |
| * aborted before the timeout is reached. |
| */ |
| void closeAfterDefaultTimeout() throws InterruptedException; |
| |
| /** Returns when the stream was opened. */ |
| Instant startTime(); |
| } |
| |
| /** Handle representing a stream of GetWork responses. */ |
| @ThreadSafe |
| public interface GetWorkStream extends WindmillStream {} |
| |
| /** Interface for streaming GetDataRequests to Windmill. */ |
| @ThreadSafe |
| public interface GetDataStream extends WindmillStream { |
| /** Issues a keyed GetData fetch, blocking until the result is ready. */ |
| KeyedGetDataResponse requestKeyedData(String computation, Windmill.KeyedGetDataRequest request); |
| |
| /** Issues a global GetData fetch, blocking until the result is ready. */ |
| Windmill.GlobalData requestGlobalData(Windmill.GlobalDataRequest request); |
| |
| /** Tells windmill processing is ongoing for the given keys. */ |
| void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active); |
| } |
| |
| /** Interface for streaming CommitWorkRequests to Windmill. */ |
| @ThreadSafe |
| public interface CommitWorkStream extends WindmillStream { |
| /** |
| * Commits a work item and running onDone when the commit has been processed by the server. |
| * Returns true if the request was accepted. If false is returned the stream should be flushed |
| * and the request recommitted. |
| * |
| * <p>onDone will be called with the status of the commit. |
| */ |
| boolean commitWorkItem( |
| String computation, Windmill.WorkItemCommitRequest request, Consumer<CommitStatus> onDone); |
| |
| /** Flushes any pending work items to the wire. */ |
| void flush(); |
| } |
| |
| /** |
| * Pool of homogeneous streams to Windmill. |
| * |
| * <p>The pool holds a fixed total number of streams, and keeps each stream open for a specified |
| * time to allow for better load-balancing. |
| */ |
| @ThreadSafe |
| public static class StreamPool<S extends WindmillStream> { |
| |
| private final Duration streamTimeout; |
| |
| private final class StreamData { |
| final S stream = supplier.get(); |
| int holds = 1; |
| }; |
| |
| private final List<StreamData> streams; |
| private final Supplier<S> supplier; |
| private final HashMap<S, StreamData> holds; |
| |
| public StreamPool(int numStreams, Duration streamTimeout, Supplier<S> supplier) { |
| this.streams = new ArrayList<>(numStreams); |
| for (int i = 0; i < numStreams; i++) { |
| streams.add(null); |
| } |
| this.streamTimeout = streamTimeout; |
| this.supplier = supplier; |
| this.holds = new HashMap<>(); |
| } |
| |
| // Returns a stream for use that may be cached from a previous call. Each call of getStream |
| // must be matched with a call of releaseStream. |
| public S getStream() { |
| int index = ThreadLocalRandom.current().nextInt(streams.size()); |
| S result; |
| S closeStream = null; |
| synchronized (this) { |
| StreamData streamData = streams.get(index); |
| if (streamData == null |
| || streamData.stream.startTime().isBefore(Instant.now().minus(streamTimeout))) { |
| if (streamData != null && --streamData.holds == 0) { |
| holds.remove(streamData.stream); |
| closeStream = streamData.stream; |
| } |
| streamData = new StreamData(); |
| streams.set(index, streamData); |
| holds.put(streamData.stream, streamData); |
| } |
| streamData.holds++; |
| result = streamData.stream; |
| } |
| if (closeStream != null) { |
| closeStream.close(); |
| } |
| return result; |
| } |
| |
| // Releases a stream that was obtained with getStream. |
| public void releaseStream(S stream) { |
| boolean closeStream = false; |
| synchronized (this) { |
| if (--holds.get(stream).holds == 0) { |
| closeStream = true; |
| holds.remove(stream); |
| } |
| } |
| if (closeStream) { |
| stream.close(); |
| } |
| } |
| } |
| |
| /** Generic Exception type for implementors to use to represent errors while making RPCs. */ |
| public static class RpcException extends RuntimeException { |
| public RpcException() { |
| super(); |
| } |
| |
| public RpcException(Throwable cause) { |
| super(cause); |
| } |
| |
| public RpcException(String message, Throwable cause) { |
| super(message, cause); |
| } |
| } |
| } |