blob: ed9b452138693be9c843ed9b61bd208fbbe1a006 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
/// Creating static values.
public extension PCollection {
    /// Emits every entry of `values` once for each element that arrives on this collection.
    ///
    /// Each emitted value is tagged with the timestamp and window taken from the
    /// input element that triggered the emission.
    ///
    /// - Parameters:
    ///   - values: The values to emit for every input element.
    ///   - name: Optional step name; when `nil`, `"<_file>:<_line>"` is used instead.
    ///   - _file: Call-site file identifier used to build the default name.
    ///   - _line: Call-site line number used to build the default name.
    /// - Returns: The collection produced by the underlying `pstream` step.
    func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> {
        // Fall back to the call site when the caller did not name the step.
        let stepName = name ?? "\(_file):\(_line)"
        return pstream(name: stepName, type: .bounded, values) { items, input, output in
            for try await trigger in input {
                // Position 0 (the incoming value) is ignored; only its timestamp and window are reused.
                let eventTime = trigger.1
                let eventWindow = trigger.2
                for item in items {
                    output.emit(item, timestamp: eventTime, window: eventWindow)
                }
            }
        }
    }
}
/// Convenience logging mappers.
public extension PCollection {
    /// Prints every element read from a `String` collection and forwards it unchanged.
    ///
    /// Each line has the form `"<prefix>: <element>"`, where `<element>` is the
    /// stream element exactly as string interpolation renders it.
    ///
    /// - Parameters:
    ///   - prefix: Text placed in front of every printed line.
    ///   - name: Optional step name; when `nil`, `"<_file>:<_line>"` is used instead.
    ///   - _file: Call-site file identifier used to build the default name.
    ///   - _line: Call-site line number used to build the default name.
    /// - Returns: The collection produced by the underlying `pstream` step; may be ignored.
    @discardableResult
    func log(prefix: String, _ name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Of> where Of == String {
        let stepName = name ?? "\(_file):\(_line)"
        return pstream(name: stepName, prefix) { label, input, output in
            for await record in input {
                let line = "\(label): \(record)"
                print(line)
                // Pass the element through untouched so logging can sit mid-pipeline.
                output.emit(record)
            }
        }
    }

    /// Prints every key/value pairing of a `KV` collection and forwards each element unchanged.
    ///
    /// One line of the form `"<prefix>: <key>,<value>"` is printed per entry in the
    /// element's `values`.
    ///
    /// - Parameters:
    ///   - prefix: Text placed in front of every printed line.
    ///   - name: Optional step name; when `nil`, `"<_file>:<_line>"` is used instead.
    ///   - _file: Call-site file identifier used to build the default name.
    ///   - _line: Call-site line number used to build the default name.
    /// - Returns: The collection produced by the underlying `pstream` step; may be ignored.
    @discardableResult
    func log<K, V>(prefix: String, _ name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<KV<K, V>> where Of == KV<K, V> {
        let stepName = name ?? "\(_file):\(_line)"
        return pstream(name: stepName, prefix) { label, input, output in
            for await record in input {
                // Position 0 of the stream element holds the KV payload.
                let pair = record.0
                for entry in pair.values {
                    print("\(label): \(pair.key),\(entry)")
                }
                output.emit(record)
            }
        }
    }
}
/// Modifying values.
public extension PCollection {
    /// Transforms each value with `fn` and emits the result.
    ///
    /// The result is emitted without supplying a new timestamp or window.
    ///
    /// - Parameters:
    ///   - name: Optional step name; when `nil`, `"<_file>:<_line>"` is used instead.
    ///   - _file: Call-site file identifier used to build the default name.
    ///   - _line: Call-site line number used to build the default name.
    ///   - fn: Conversion applied to every input value.
    /// - Returns: The collection produced by the underlying `pardo` step.
    func map<Out>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
        let stepName = name ?? "\(_file):\(_line)"
        return pardo(name: stepName) { input, output in
            let transformed = fn(input.value)
            output.emit(transformed)
        }
    }

    /// Transforms each value into a `(key, value)` tuple and emits it wrapped in a `KV`.
    ///
    /// - Parameters:
    ///   - name: Optional step name; when `nil`, `"<_file>:<_line>"` is used instead.
    ///   - _file: Call-site file identifier used to build the default name.
    ///   - _line: Call-site line number used to build the default name.
    ///   - fn: Conversion producing the key and value for every input value.
    /// - Returns: The collection produced by the underlying `pardo` step.
    func map<K, V>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> (K, V)) -> PCollection<KV<K, V>> {
        let stepName = name ?? "\(_file):\(_line)"
        return pardo(name: stepName) { input, output in
            let pair = fn(input.value)
            output.emit(KV(pair.0, pair.1))
        }
    }

    /// Produces zero or more outputs from a single input value.
    ///
    /// Every item of the array returned by `fn` is emitted individually, without
    /// supplying a new timestamp or window.
    ///
    /// - Parameters:
    ///   - name: Optional step name; when `nil`, `"<_file>:<_line>"` is used instead.
    ///   - _file: Call-site file identifier used to build the default name.
    ///   - _line: Call-site line number used to build the default name.
    ///   - fn: Expansion applied to every input value.
    /// - Returns: The collection produced by the underlying `pardo` step.
    func flatMap<Out>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> {
        let stepName = name ?? "\(_file):\(_line)"
        return pardo(name: stepName) { input, output in
            let expanded = fn(input.value)
            for item in expanded {
                output.emit(item)
            }
        }
    }
}
/// Modifying timestamps.
public extension PCollection {
    /// Re-stamps every element with the date computed by `fn`.
    ///
    /// The value and window of each element are emitted as received; only the
    /// timestamp is replaced.
    ///
    /// - Parameters:
    ///   - name: Optional step name; when `nil`, `"<_file>:<_line>"` is used instead.
    ///   - _file: Call-site file identifier used to build the default name.
    ///   - _line: Call-site line number used to build the default name.
    ///   - fn: Derives the new timestamp from an element's value.
    /// - Returns: The collection produced by the underlying `pstream` step.
    func timestamp(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> Date) -> PCollection<Of> {
        let stepName = name ?? "\(_file):\(_line)"
        return pstream(name: stepName) { input, output in
            for try await element in input {
                // Position 1 (the existing timestamp) is deliberately discarded.
                let payload = element.0
                let assigned = fn(payload)
                output.emit(payload, timestamp: assigned, window: element.2)
            }
        }
    }
}
public extension PCollection<Never> {
    /// Root-of-pipeline convenience that applies `create` to the collection returned by `impulse()`.
    ///
    /// - Parameters:
    ///   - values: The values forwarded to `create`.
    ///   - name: Optional step name, forwarded unchanged.
    ///   - _file: Call-site file identifier, forwarded unchanged.
    ///   - _line: Call-site line number, forwarded unchanged.
    /// - Returns: The collection returned by `create` on the impulse collection.
    func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> {
        let seed = impulse()
        return seed.create(values, name: name, _file: _file, _line: _line)
    }
}
/// Builds a collection of static values on top of a freshly constructed bounded root collection.
///
/// - Parameters:
///   - values: The values forwarded to the root collection's `create`.
///   - name: Optional step name, forwarded unchanged.
///   - _file: Call-site file identifier, forwarded unchanged.
///   - _line: Call-site line number, forwarded unchanged.
/// - Returns: The collection returned by `create` on the new root.
public func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> {
    PCollection<Never>(type: .bounded).create(values, name: name, _file: _file, _line: _line)
}