blob: 5ee71d5f7fe8c2ca38463a84421e5ae54fffa0f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
public extension PCollection where Of == Never {
/// Impulse the most basic transform. It can only be attached to PCollections of type Never,
/// which is the root transform used by Pipelines.
func impulse() -> PCollection<Data> {
let output = PCollection<Data>(type: .bounded)
apply(.impulse(AnyPCollection(self), AnyPCollection(output)))
return output
}
}
func _resolve(_ parent: some PCollectionProtocol, _ type: StreamType) -> StreamType {
switch type {
case .unspecified:
parent.streamType
default:
type
}
}
/// ParDo is the core user operator that pretty much everything else gets built on. We provide two versions here
public extension PCollection {
// TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved
// No Output
func pstream(name: String, _ fn: some SerializableFn) {
apply(.pardo(AnyPCollection(self), name, fn, []))
}
func pstream(name: String, _ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
apply(.pardo(AnyPCollection(self), name, ClosureFn(fn), []))
}
func pstream<Param: Codable>(name: String, _ param: Param, _ fn: @Sendable @escaping (Param, PCollection<Of>.Stream) async throws -> Void) {
apply(.pardo(AnyPCollection(self), name, ParameterizedClosureFn(param, fn), []))
}
// No Output Generated Names
func pstream(_file: String = #fileID, _line: Int = #line, _ fn: some SerializableFn) {
pstream(name: "\(_file):\(_line)", fn)
}
func pstream(_file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
pstream(name: "\(_file):\(_line)", fn)
}
func pstream<Param: Codable>(_file: String = #fileID, _line: Int = #line, _ param: Param, _ fn: @Sendable @escaping (Param, PCollection<Of>.Stream) async throws -> Void) {
pstream(name: "\(_file):\(_line)", param, fn)
}
// Single Output
func pstream(name: String, _ fn: some SerializableFn,
_ o0: PCollection<some Any>)
{
apply(.pardo(AnyPCollection(self), name, fn, [AnyPCollection(o0)]))
}
func pstream<O0>(name: String, type: StreamType = .unspecified,
_ fn: @Sendable @escaping (PCollection<Of>.Stream, PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>)
{
let output = PCollection<O0>(type: _resolve(self, type))
apply(.pardo(AnyPCollection(self), name, ClosureFn(fn), [AnyPCollection(output)]))
return output
}
func pstream<Param: Codable, O0>(name: String, type: StreamType = .unspecified, _ param: Param, _ fn: @Sendable @escaping (Param, PCollection<Of>.Stream, PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
let output = PCollection<O0>(type: _resolve(self, type))
apply(.pardo(AnyPCollection(self), name, ParameterizedClosureFn(param, fn), [AnyPCollection(output)]))
return output
}
// Single Output Generated Names
func pstream(_file: String = #fileID, _line: Int = #line, fn: some SerializableFn,
_ o0: PCollection<some Any>)
{
pstream(name: "\(_file):\(_line)", fn, o0)
}
func pstream<O0>(type: StreamType = .unspecified, _file: String = #fileID, _line: Int = #line,
_ fn: @Sendable @escaping (PCollection<Of>.Stream, PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>)
{
pstream(name: "\(_file):\(_line)", type: type, fn)
}
func pstream<Param: Codable, O0>(type: StreamType = .unspecified, _file: String = #fileID, _line: Int = #line, _ param: Param, _ fn: @Sendable @escaping (Param, PCollection<Of>.Stream, PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
pstream(name: "\(_file):\(_line)", type: type, param, fn)
}
// Two Outputs
func pstream(name: String, _ fn: some SerializableFn,
_ o0: PCollection<some Any>, _ o1: PCollection<some Any>)
{
apply(.pardo(AnyPCollection(self), name, fn, [AnyPCollection(o0), AnyPCollection(o1)]))
}
func pstream<O0, O1>(name: String, type: (StreamType, StreamType) = (.unspecified, .unspecified),
_ fn: @Sendable @escaping (PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>)
{
let output = (PCollection<O0>(type: _resolve(self, type.0)), PCollection<O1>(type: _resolve(self, type.1)))
let parent = apply(.pardo(AnyPCollection(self), name, ClosureFn(fn), [AnyPCollection(output.0), AnyPCollection(output.1)]))
output.0.parent(parent)
output.1.parent(parent)
return output
}
func pstream<Param: Codable, O0, O1>(name: String, type: (StreamType, StreamType) = (.unspecified, .unspecified), _ param: Param,
_ fn: @Sendable @escaping (Param, PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>)
{
let output = (PCollection<O0>(type: _resolve(self, type.0)), PCollection<O1>(type: _resolve(self, type.1)))
let parent = apply(.pardo(AnyPCollection(self), name, ParameterizedClosureFn(param, fn), [AnyPCollection(output.0), AnyPCollection(output.1)]))
output.0.parent(parent)
output.1.parent(parent)
return output
}
// Two Outputs Generated Names
func pstream(_file: String = #fileID, _line: Int = #line, _ fn: some SerializableFn,
_ o0: PCollection<some Any>, _ o1: PCollection<some Any>)
{
pstream(name: "\(_file):\(_line)", fn, o0, o1)
}
func pstream<O0, O1>(_file: String = #fileID, _line: Int = #line,
_ fn: @Sendable @escaping (PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>)
{
pstream(name: "\(_file):\(_line)", fn)
}
func pstream<Param: Codable, O0, O1>(_file: String = #fileID, _line: Int = #line, _ param: Param,
_ fn: @Sendable @escaping (Param, PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>)
{
pstream(name: "\(_file):\(_line)", param, fn)
}
// Three Outputs
func pstream(name: String, _ fn: some SerializableFn,
_ o0: PCollection<some Any>, _ o1: PCollection<some Any>, _ o2: PCollection<some Any>)
{
apply(.pardo(AnyPCollection(self), name, fn, [AnyPCollection(o0), AnyPCollection(o1), AnyPCollection(o2)]))
}
func pstream<O0, O1, O2>(name: String,
_ fn: @Sendable @escaping (PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream, PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>, PCollection<O2>)
{
let output = (PCollection<O0>(), PCollection<O1>(), PCollection<O2>())
let parent = apply(.pardo(AnyPCollection(self), name, ClosureFn(fn), [AnyPCollection(output.0), AnyPCollection(output.1), AnyPCollection(output.2)]))
output.0.parent(parent)
output.1.parent(parent)
output.2.parent(parent)
return output
}
func pstream<Param: Codable, O0, O1, O2>(name: String, _ param: Param,
_ fn: @Sendable @escaping (Param, PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream, PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>, PCollection<O2>)
{
let output = (PCollection<O0>(), PCollection<O1>(), PCollection<O2>())
let parent = apply(.pardo(AnyPCollection(self), name, ParameterizedClosureFn(param, fn), [AnyPCollection(output.0), AnyPCollection(output.1), AnyPCollection(output.2)]))
output.0.parent(parent)
output.1.parent(parent)
output.2.parent(parent)
return output
}
// Three Outputs Generated Names
func pstream(_file: String = #fileID, _line: Int = #line, _ fn: some SerializableFn,
_ o0: PCollection<some Any>, _ o1: PCollection<some Any>, _ o2: PCollection<some Any>)
{
pstream(name: "\(_file):\(_line)", fn, o0, o1, o2)
}
func pstream<O0, O1, O2>(_file: String = #fileID, _line: Int = #line,
_ fn: @Sendable @escaping (PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream, PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>, PCollection<O2>)
{
pstream(name: "\(_file):\(_line)", fn)
}
func pstream<Param: Codable, O0, O1, O2>(_file: String = #fileID, _line: Int = #line, _ param: Param,
_ fn: @Sendable @escaping (Param, PCollection<Of>.Stream, PCollection<O0>.Stream, PCollection<O1>.Stream, PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>, PCollection<O1>, PCollection<O2>)
{
pstream(name: "\(_file):\(_line)", param, fn)
}
// TODO: Add more as needed
}
public extension PCollection {
/// The built-in Beam GroupByKey transform. Requires that the input be a Beam key-value type.
///
/// - Returns: A ``PCollection`` of type `KV<K,V>` where the value of the key-value pair is a `Sequence` of all values associated with the key `K`
func groupByKey<K, V>() -> PCollection<KV<K, V>> where Of == KV<K, V> {
// Adjust the coder for the pcollection to reflect GBK semantcs
let output = PCollection<KV<K, V>>(coder: .keyvalue(.of(type: K.self)!, .of(type: [V].self)!), type: streamType)
apply(.groupByKey(AnyPCollection(self), AnyPCollection(output)))
return output
}
}