blob: 71d70a3446dee7b2263d91458f2a7a2b1383c6ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
// Utility Transforms
public extension PCollection {
    /// Emits every value of a fixed list once for each element received on the input ``PCollection``.
    ///
    /// This transform is most often paired with ``impulse()`` to seed a pipeline with an initial set of inputs. The values
    /// given here are encoded into the pipeline submission itself. That makes the transform a good fit for forwarding input
    /// parameters, such as a list of files to process.
    ///
    /// Because the values are embedded in the pipeline definition, they are bound by the maximum size of a protocol buffer
    /// (2GB), and large inputs can affect submission of the pipeline to the runner.
    ///
    /// Every emitted value reuses the timestamp and window of the input element that triggered it.
    ///
    /// - Parameters:
    ///   - values: An array of ``Codable`` values to be emitted for each input record.
    ///   - name: An optional name for this transform. By default it will use the source file and line number for easy debugging.
    ///
    /// - Returns: A ``PCollection`` containing items of type `Value`.
    func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> {
        let transformName = name ?? "\(_file):\(_line)"
        return pstream(name: transformName, type: .bounded, values) { staticValues, input, output in
            for try await (_, timestamp, window) in input {
                for value in staticValues {
                    output.emit(value, timestamp: timestamp, window: window)
                }
            }
        }
    }
}
// Logging Transforms
public extension PCollection {
    /// Prints each `String` value of the input ``PCollection`` and passes every element through unchanged.
    ///
    /// Each value is printed on its own line as `<prefix>: <value>`. The element is re-emitted as-is, with the same
    /// timestamp and window, so this transform can be dropped into a pipeline for debugging. Its result may be ignored.
    ///
    /// - Parameters:
    ///   - prefix: Text printed before each value.
    ///   - name: An optional name for this transform. By default it will use the source file and line number for easy debugging.
    ///
    /// - Returns: A ``PCollection`` containing the same elements as the input.
    @discardableResult
    func log(prefix: String, _ name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Of> where Of == String {
        pstream(name: name ?? "\(_file):\(_line)", prefix) { prefix, input, output in
            for await element in input {
                // Each stream element is a (value, timestamp, window) tuple. Print only the value, to match the
                // key-value overload below, rather than dumping the whole tuple.
                print("\(prefix): \(element.0)")
                output.emit(element)
            }
        }
    }

    /// Prints every value of each key-value element in the input ``PCollection`` and passes the element through unchanged.
    ///
    /// One line is printed per value in the element's `values`, formatted as `<prefix>: <key>,<value>`. An element with
    /// no values prints nothing. The element itself is always re-emitted as-is.
    ///
    /// - Parameters:
    ///   - prefix: Text printed before each key/value line.
    ///   - name: An optional name for this transform. By default it will use the source file and line number for easy debugging.
    ///
    /// - Returns: A ``PCollection`` containing the same key-value elements as the input.
    @discardableResult
    func log<K, V>(prefix: String, _ name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<KV<K, V>> where Of == KV<K, V> {
        pstream(name: name ?? "\(_file):\(_line)", prefix) { prefix, input, output in
            for await element in input {
                let kv = element.0
                for v in kv.values {
                    print("\(prefix): \(kv.key),\(v)")
                }
                output.emit(element)
            }
        }
    }
}
// Mapping Operations
public extension PCollection {
    /// Applies a one-to-one transformation to every value of a ``PCollection`` without modifying the window or the timestamp.
    ///
    /// For instance, the following produces a ``PCollection<String>`` when applied to an input ``PCollection`` whose
    /// values have a ``String`` property called `name`:
    /// ```swift
    /// input.map { $0.name }
    /// ```
    /// Note: Whatever the mapping function returns is emitted as a single value, even if it is itself iterable. To produce
    /// several outputs from one input, use ``flatMap(name:_file:_line:_:)`` instead.
    ///
    /// - Parameters:
    ///   - name: An optional name for this transform. By default it will use the file name and line number for easy debugging.
    ///   - fn: A trailing closure specifying the mapping function.
    ///
    /// - Returns: A ``PCollection<Out>`` holding one transformed value per input value.
    func map<Out>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
        pardo(name: name ?? "\(_file):\(_line)") { input, output in
            let mapped = fn(input.value)
            output.emit(mapped)
        }
    }

    /// Maps each input value to a `(key, value)` tuple and emits it as a ``KV`` pair.
    ///
    /// This is most often used to prepare input for a ``groupByKey()`` transformation. It can also be used on its own
    /// to convert values into Beam's native key-value coding type.
    ///
    /// - Parameters:
    ///   - name: An optional name for this transform. By default it will use the file name and line number for easy debugging.
    ///   - fn: A trailing closure that returns the key and value for an input value.
    ///
    /// - Returns: A ``PCollection<KV<K,V>>`` which is encoded using Beam's native key value coding.
    func map<K, V>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> (K, V)) -> PCollection<KV<K, V>> {
        pardo(name: name ?? "\(_file):\(_line)") { input, output in
            let pair = fn(input.value)
            output.emit(KV(pair.0, pair.1))
        }
    }

    /// Maps each value of a ``PCollection`` to zero or more outputs by returning a ``Sequence``.
    ///
    /// Every element of the returned sequence is emitted individually, in iteration order.
    ///
    /// - Parameters:
    ///   - name: An optional name for this transform. By default it will use the source filename and line number for easy debugging.
    ///   - fn: A trailing closure that returns a ``Sequence``.
    ///
    /// - Returns: A ``PCollection`` of values with the type ``Sequence.Element``.
    func flatMap<S: Sequence>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> S) -> PCollection<S.Element> {
        pardo(name: name ?? "\(_file):\(_line)") { input, output in
            let produced = fn(input.value)
            for element in produced {
                output.emit(element)
            }
        }
    }
}
// Timestamp Operations
public extension PCollection {
    /// Gives each element of the input ``PCollection`` a new timestamp computed from its value.
    ///
    /// The value and window of every element are kept. Only the timestamp is replaced, using the result of `fn`.
    ///
    /// - Parameters:
    ///   - name: An optional name for this transform. By default it will use the source filename and line number for easy debugging.
    ///   - fn: A trailing closure that returns the new timestamp for the input value.
    ///
    /// - Returns: A ``PCollection`` with modified timestamps.
    func timestamp(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> Date) -> PCollection<Of> {
        pstream(name: name ?? "\(_file):\(_line)") { input, output in
            for try await (element, _, window) in input {
                let newTimestamp = fn(element)
                output.emit(element, timestamp: newTimestamp, window: window)
            }
        }
    }
}
public extension PCollection<Never> {
    /// A convenience form of ``create(_:name:_file:_line:)-38du`` that prepends an ``impulse()`` transform to a pipeline root.
    ///
    /// The impulse output is fed into `create`, which emits the given static values.
    ///
    /// - Parameters:
    ///   - values: The values to emit when the impulse item is received.
    ///   - name: An optional name for this transform. By default it will use the source filename and line number for easy debugging.
    ///
    /// - Returns: A ``PCollection`` of values from the static list.
    func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> {
        return impulse().create(values, name: name, _file: _file, _line: _line)
    }
}
/// Starts a new bounded pipeline that emits a static list of values.
///
/// This is shorthand for constructing a bounded `PCollection<Never>` root and calling its `create` method with the same
/// arguments.
///
/// - Parameters:
///   - values: An array of ``Codable`` values to emit.
///   - name: An optional name for this transform. By default it will use the source filename and line number for easy debugging.
///
/// - Returns: A ``PCollection`` of values from the static list.
public func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> {
    PCollection<Never>(type: .bounded).create(values, name: name, _file: _file, _line: _line)
}