blob: 641656a570231ee3e725f024452c888364e744b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
/// Test harness for PCollections. Primarily designed for unit testing.
///
/// The harness looks up the transform that produced a collection and runs that
/// transform's processing function concurrently with a caller-supplied closure
/// that is handed the same input and output streams.
public struct PCollectionTest {
  /// Type-erased wrapper around the collection handed to `init`.
  let output: AnyPCollection

  /// Caller-supplied closure. `run()` invokes it with the test logger, a
  /// one-element array holding the parent collection's stream, and the streams
  /// of the transform's outputs.
  let fn: (Logger, [AnyPCollectionStream], [AnyPCollectionStream]) async throws -> Void

  /// Creates a harness for `collection`.
  ///
  /// - Parameters:
  ///   - collection: The collection whose parent transform `run()` will execute.
  ///   - fn: Closure run alongside the transform; it receives the logger, the
  ///     input streams, and the output streams, in that order.
  public init(
    _ collection: PCollection<some Any>,
    _ fn: @escaping (Logger, [AnyPCollectionStream], [AnyPCollectionStream]) async throws -> Void
  ) {
    self.output = AnyPCollection(collection)
    self.fn = fn
  }

  /// Executes the parent transform and the caller's closure side by side.
  ///
  /// Only `.pardo` transforms are handled; any other transform is rejected.
  ///
  /// - Throws: `ApacheBeamError.runtimeError` when the collection has no parent
  ///   transform or the parent is not a `.pardo`; otherwise rethrows whatever the
  ///   context initializer, the transform's `process`, or the caller's closure throws.
  public func run() async throws {
    let log = Logger(label: "Test")

    guard let transform = output.parent else {
      throw ApacheBeamError.runtimeError("Unable to determine parent transform to test")
    }
    guard case let .pardo(upstream, _, processFn, downstream) = transform else {
      throw ApacheBeamError.runtimeError("Not able to test this type of transform \(transform)")
    }

    // The context is built before any stream is touched, so a failing
    // initializer throws before `anyStream` is read.
    let context = try SerializableFnBundleContext(
      instruction: "1",
      transform: "test",
      payload: processFn.payload,
      log: log
    )
    let input = upstream.anyStream
    let streams = downstream.map(\.anyStream)

    try await withThrowingTaskGroup(of: Void.self) { group in
      // Child task 1: the transform's own processing function.
      log.info("Starting process task")
      group.addTask {
        _ = try await processFn.process(context: context, inputs: [input], outputs: streams)
      }

      // Child task 2: the caller's closure, given the same streams.
      log.info("Calling Stream")
      group.addTask {
        try await self.fn(log, [input], streams)
      }

      // Drain the group so an error thrown by either child task is rethrown here.
      for try await _ in group {}
    }
  }
}