blob: f4591e2aa1c666618bc28e3c3004df5034ac8247 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import GRPC
import Logging
import NIOCore
public struct PortableRunner: PipelineRunner {
let loopback: Bool
let log: Logging.Logger
let host: String
let port: Int
public init(host: String = "localhost", port: Int = 8073, loopback: Bool = false) {
self.loopback = loopback
log = .init(label: "PortableRunner")
self.host = host
self.port = port
}
public func run(_ context: PipelineContext) async throws {
var proto = context.proto
if loopback {
// If we are in loopback mode we want to replace the default environment
// with an external environment that points to our local worker server
let worker = try WorkerServer(context.collections, context.pardoFns)
log.info("Running in LOOPBACK mode with a worker server at \(worker.endpoint).")
proto.components.environments[context.defaultEnvironmentId] = try .with {
try Environment.external(worker.endpoint).populate(&$0)
}
}
log.info("\(proto)")
log.info("Connecting to Portable Runner at \(host):\(port).")
let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
let channel = try GRPCChannelPool.with(target: .host(host, port: port),
transportSecurity: .plaintext, eventLoopGroup: group)
let client = Org_Apache_Beam_Model_JobManagement_V1_JobServiceAsyncClient(channel: channel)
let prepared = try await client.prepare(.with {
$0.pipeline = proto
})
let job = try await client.run(.with {
$0.preparationID = prepared.preparationID
})
log.info("Submitted job \(job.jobID)")
var done = false
while !done {
let status = try await client.getState(.with {
$0.jobID = job.jobID
})
log.info("Job \(job.jobID) status: \(status.state)")
switch status.state {
case .stopped, .failed, .done:
done = true
default:
try await Task.sleep(for: .seconds(5))
}
}
log.info("Job completed.")
}
}