| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an AS IS BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
/// Represents one element of a pipeline protocol buffer together with the reference name it is
/// known by. Values of this enum are produced while constructing the pipeline that is submitted
/// to a portable runner.
enum PipelineComponent {
    case none
    case transform(String, PTransformProto)
    case collection(String, PCollectionProto)
    case coder(String, CoderProto)
    case windowingStrategy(String, WindowingStrategyProto)
    case environment(String, EnvironmentProto)

    /// The reference name of this component, used to refer to it from other parts of the
    /// pipeline structure.
    ///
    /// Reading this property on ``none`` is a programmer error and stops execution with
    /// `fatalError`.
    var name: String {
        switch self {
        // Every populated case stores its reference name as the first associated value.
        case let .transform(id, _),
             let .collection(id, _),
             let .coder(id, _),
             let .windowingStrategy(id, _),
             let .environment(id, _):
            return id
        case .none:
            fatalError("PipelineComponent not properly initialized")
        }
    }

    /// The `PTransform` protocol buffer carried by this component, or `nil` when the component
    /// is anything other than a ``transform``.
    var transform: PTransformProto? {
        guard case let .transform(_, proto) = self else {
            return nil
        }
        return proto
    }
}
| |
// Convenience functions for creating new pipeline elements. Note that these shouldn't be accessed concurrently,
// but this isn't a problem in itself since trying to access the proto concurrently throws an error.
extension PipelineProto {

    /// Adds a new `PTransform` protocol buffer to the pipeline along with an internal reference name to be used by other parts of the pipeline.
    /// - Parameter mapper: A closure that returns the `PTransform` protocol buffer. Includes the reference id as a parameter in case it is needed.
    /// - Returns: A ``PipelineComponent`` representing this `PTransform`
    /// - Throws: Any error thrown by `mapper`; in that case nothing is added to the pipeline.
    mutating func transform(_ mapper: @escaping (String) throws -> PTransformProto) throws -> PipelineComponent {
        let name = "ref_PTransform_\(components.transforms.count + 1)"
        let proto = try mapper(name)
        components.transforms[name] = proto
        return .transform(name, proto)
    }

    /// Adds a new `PCollection` protocol buffer to the pipeline along with an internal reference name to be used elsewhere (e.g. `PTransform`s)
    /// - Parameter mapper: A closure that returns the `PCollection` protocol buffer. It is passed the reference name in case that is useful
    /// - Returns: A ``PipelineComponent`` representing the `PCollection`
    /// - Throws: Any error thrown by `mapper`; in that case nothing is added to the pipeline.
    mutating func collection(_ mapper: @escaping (String) throws -> PCollectionProto) throws -> PipelineComponent {
        let name = "ref_PCollection_\(components.pcollections.count + 1)"
        let proto = try mapper(name)
        components.pcollections[name] = proto
        return .collection(name, proto)
    }

    /// Adds a new `Coder` protocol buffer to the pipeline along with an internal reference name.
    /// - Parameter mapper: A closure that returns the `Coder` protocol buffer. It is passed the reference name in case that is useful.
    /// - Returns: A ``PipelineComponent`` representing the `Coder`
    mutating func coder(_ mapper: @escaping (String) -> CoderProto) -> PipelineComponent {
        let name = "ref_Coder_\(components.coders.count + 1)"
        let proto = mapper(name)
        components.coders[name] = proto
        return .coder(name, proto)
    }

    /// Adds a new `WindowingStrategy` protocol buffer to the pipeline along with an internal reference name.
    /// - Parameter mapper: A closure that returns the `WindowingStrategy` protocol buffer. It is passed the reference name in case that is useful.
    /// - Returns: A ``PipelineComponent`` representing the `WindowingStrategy`
    mutating func windowingStrategy(_ mapper: @escaping (String) -> WindowingStrategyProto) -> PipelineComponent {
        // The index must come from the windowing-strategy map itself. Deriving it from another
        // map's size would hand out the same name twice whenever that other map did not grow
        // in between, silently overwriting the earlier entry.
        let name = "ref_WindowingStrategy_\(components.windowingStrategies.count + 1)"
        let proto = mapper(name)
        components.windowingStrategies[name] = proto
        return .windowingStrategy(name, proto)
    }

    /// Adds a new `Environment` protocol buffer to the pipeline along with an internal reference name.
    /// - Parameter mapper: A closure that returns the `Environment` protocol buffer. It is passed the reference name in case that is useful.
    /// - Returns: A ``PipelineComponent`` representing the `Environment`
    /// - Throws: Any error thrown by `mapper`; in that case nothing is added to the pipeline.
    mutating func environment(_ mapper: @escaping (String) throws -> EnvironmentProto) throws -> PipelineComponent {
        // As with windowing strategies, the index is taken from the environment map so that
        // consecutive registrations always receive distinct names.
        let name = "ref_Environment_\(components.environments.count + 1)"
        let proto = try mapper(name)
        components.environments[name] = proto
        return .environment(name, proto)
    }
}