blob: 35887702a56b76671ecebc4f61dcaab664ff1586 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import GRPC
import Logging
import NIOCore
actor Worker {
private let id: String
private let collections: [String: AnyPCollection]
private let fns: [String: SerializableFn]
private let control: ApiServiceDescriptor
private let remoteLog: ApiServiceDescriptor
private let log: Logging.Logger
public init(id: String, control: ApiServiceDescriptor, log: ApiServiceDescriptor, collections: [String: AnyPCollection], functions: [String: SerializableFn]) {
self.id = id
self.collections = collections
fns = functions
self.control = control
remoteLog = log
self.log = Logging.Logger(label: "Worker(\(id))")
}
public func start() throws {
let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
let client = try Org_Apache_Beam_Model_FnExecution_V1_BeamFnControlAsyncClient(channel: GRPCChannelPool.with(endpoint: control, eventLoopGroup: group))
let (responses, responder) = AsyncStream.makeStream(of: Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse.self)
let options = CallOptions(customMetadata: ["worker_id": id])
let control = client.makeControlCall(callOptions: options)
// Start the response task. This will continue until a yield call is sent from responder
Task {
for await r in responses {
log.info("Sending response \(r)")
try await control.requestStream.send(r)
}
}
// Start the actual work task
Task {
log.info("Waiting for control plane instructions.")
var processors: [String: BundleProcessor] = [:]
func processor(for bundle: String) async throws -> BundleProcessor {
if let processor = processors[bundle] {
return processor
}
let descriptor = try await client.getProcessBundleDescriptor(.with { $0.processBundleDescriptorID = bundle })
let processor = try BundleProcessor(id: id, descriptor: descriptor, collections: collections, fns: fns)
processors[bundle] = processor
return processor
}
// This looks a little bit reversed from the usual because response don't need an initiating call
for try await instruction in control.responseStream {
switch instruction.request {
case let .processBundle(pbr):
do {
let p = try await processor(for: pbr.processBundleDescriptorID)
Task {
await p.process(instruction: instruction.instructionID, responder: responder)
}
} catch {
log.error("Unable to process bundle \(pbr.processBundleDescriptorID): \(error)")
}
default:
log.warning("Ignoring instruction \(instruction.instructionID). Not yet implemented.")
log.warning("\(instruction)")
responder.yield(.with {
$0.instructionID = instruction.instructionID
})
}
}
log.info("Control plane connection has closed.")
}
}
}