blob: e9a0718a8544ffe5996ce8693516b98d039f8873 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as protobufjs from "protobufjs";
import { PTransform, PCollection } from "../proto/beam_runner_api";
import * as runnerApi from "../proto/beam_runner_api";
import * as fnApi from "../proto/beam_fn_api";
import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
import { MultiplexingDataChannel, IDataChannel } from "./data";
import { StateProvider } from "./state";
import * as urns from "../internal/urns";
import { PipelineContext } from "../internal/pipeline";
import { deserializeFn } from "../internal/serialize";
import { Coder, Context as CoderContext } from "../coders/coders";
import { Window, Instant, WindowedValue } from "../values";
import { parDo, DoFn, SplitOptions } from "../transforms/pardo";
import { WindowFn } from "../transforms/window";
import {
ParamProviderImpl,
SideInputInfo,
createSideInputInfo,
} from "./pardo_context";
// Approximates part of https://github.com/microsoft/TypeScript/issues/8240:
// a processing step either completes synchronously (signalled by NonPromise)
// or hands back a Promise that must be awaited.
export const NonPromise = null;
export type ProcessResult = null | Promise<void>;

/**
 * Accumulates the results of several processing steps and collapses them
 * into a single ProcessResult, only creating a new Promise when more than one
 * step was actually asynchronous.
 */
export class ProcessResultBuilder {
  promises: Promise<void>[] = [];

  /** Records `result` if it is a Promise; synchronous results are dropped. */
  add(result: ProcessResult) {
    if (result === NonPromise) {
      return;
    }
    this.promises.push(result as Promise<void>);
  }

  /**
   * Returns NonPromise when nothing asynchronous was added, the sole Promise
   * when exactly one was added, and otherwise a Promise that settles once all
   * of the recorded Promises have resolved.
   */
  build(): ProcessResult {
    switch (this.promises.length) {
      case 0:
        return NonPromise;
      case 1:
        return this.promises[0];
      default:
        return Promise.all(this.promises).then(() => undefined);
    }
  }
}
// Contract shared by every operator in this file: two asynchronous bundle
// hooks plus a per-element `process` that may complete synchronously.
export interface IOperator {
// Asynchronous hook; always returns a Promise.
startBundle: () => Promise<void>;
// As this is called at every operator at every element, and the vast majority
// of the time Promises are not needed, we wish to avoid the overhead of
// creating promises and await as much as possible.
// Returns NonPromise (null) when the element was handled synchronously,
// otherwise a Promise the caller must wait on.
process: (wv: WindowedValue<unknown>) => ProcessResult;
// Asynchronous hook; always returns a Promise.
finishBundle: () => Promise<void>;
}
/**
 * Fans a windowed value out to a fixed list of operators and merges their
 * individual results into one ProcessResult.
 */
export class Receiver {
  constructor(private operators: IOperator[]) {}

  /**
   * Hands `wvalue` to every operator in order. With exactly one operator its
   * result is returned untouched, avoiding any builder allocation.
   */
  receive(wvalue: WindowedValue<unknown>): ProcessResult {
    if (this.operators.length !== 1) {
      const combined = new ProcessResultBuilder();
      for (let index = 0; index < this.operators.length; index++) {
        combined.add(this.operators[index].process(wvalue));
      }
      return combined.build();
    }
    const soleOperator = this.operators[0];
    return soleOperator.process(wvalue);
  }
}
/**
 * Bundle of lookups handed to every operator constructor: the descriptor
 * being executed, a PipelineContext built from it, and callbacks for
 * resolving receivers, data channels, the state provider and the current
 * bundle id.
 */
export class OperatorContext {
  pipelineContext: PipelineContext;

  /**
   * @param descriptor the ProcessBundleDescriptor whose transforms are being
   *   instantiated; also used to build `pipelineContext`.
   * @param getReceiver returns the Receiver for an id taken from a
   *   transform's `outputs` map.
   * @param getDataChannel returns the data channel for an API service
   *   descriptor URL.
   * @param getStateProvider returns the StateProvider to use.
   * @param getBundleId returns the id of the bundle being processed.
   */
  constructor(
    public descriptor: ProcessBundleDescriptor,
    // The parameters need explicit names: a bare `(string) => Receiver`
    // declares a parameter *named* `string` with an implicit `any` type,
    // which leaves every call to these callbacks unchecked.
    public getReceiver: (pcollectionId: string) => Receiver,
    public getDataChannel: (url: string) => MultiplexingDataChannel,
    public getStateProvider: () => StateProvider,
    public getBundleId: () => string
  ) {
    this.pipelineContext = new PipelineContext(descriptor);
  }
}
/**
 * Instantiates the operator registered for the URN of the given transform.
 *
 * @param transformId key of the transform inside `context.descriptor.transforms`.
 * @param context shared lookups passed through to the operator constructor.
 * @returns the newly constructed operator.
 * @throws Error if the transform id is not present in the descriptor or no
 *   operator has been registered for the transform's URN.
 */
export function createOperator(
  transformId: string,
  context: OperatorContext
): IOperator {
  const transform = context.descriptor.transforms[transformId];
  if (transform === undefined) {
    // Fail with a message naming the id instead of an opaque TypeError on
    // the property accesses below.
    throw new Error("Unknown transform id: " + transformId);
  }
  // Ensure receivers are eagerly created. A plain loop is used rather than
  // `.map(context.getReceiver)` so that the callback receives only the id
  // (not the index and array `map` would add) and is invoked the same way as
  // at every other call site.
  for (const pcollectionId of Object.values(transform.outputs)) {
    context.getReceiver(pcollectionId);
  }
  const operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
  // `== null` deliberately matches both null and undefined.
  if (operatorConstructor == null) {
    throw new Error("Unknown transform type:" + transform.spec!.urn);
  }
  return operatorConstructor(transformId, transform, context);
}
// Factory signature stored in the registry below: given a transform id, its
// proto and the shared context, produce an operator.
type OperatorConstructor = (
transformId: string,
transformProto: PTransform,
context: OperatorContext
) => IOperator;
// Shape of a class that can be registered directly: its constructor takes the
// same three arguments as an OperatorConstructor.
interface OperatorClass {
new (
transformId: string,
transformProto: PTransform,
context: OperatorContext
): IOperator;
}
// Module-level registry from transform URN to the factory that builds its
// operator; filled by registerOperatorConstructor and read by createOperator.
const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
/**
 * Registers an operator class for `urn` by wrapping its constructor in a
 * factory function and delegating to registerOperatorConstructor.
 */
export function registerOperator(urn: string, cls: OperatorClass) {
  const factory: OperatorConstructor = (id, proto, operatorContext) =>
    new cls(id, proto, operatorContext);
  registerOperatorConstructor(urn, factory);
}
/**
 * Stores `constructor` in the URN registry under `urn`, replacing whatever
 * was previously stored for that URN.
 */
export function registerOperatorConstructor(urn: string, constructor: OperatorConstructor) {
  operatorsByUrn.set(urn, constructor);
}
////////// Actual operator implementation. //////////
// NOTE: It may have been more idiomatic to use objects in closures satisfying
// the IOperator interface here, but classes are used to make a clearer pattern
// for potential SDK authors who are less familiar with JavaScript.
/**
 * Operator that receives encoded elements from a data channel and pushes the
 * decoded values into its single output receiver.
 *
 * The transform payload is a RemoteGrpcPort naming the data endpoint and the
 * coder used to decode incoming bytes.
 */
class DataSourceOperator implements IOperator {
  transformId: string;
  getBundleId: () => string;
  multiplexingDataChannel: MultiplexingDataChannel;
  receiver: Receiver;
  coder: Coder<WindowedValue<unknown>>;
  // Settles when the channel reports the end of this bundle's data (resolve)
  // or an error (reject); created afresh by every startBundle().
  endOfData: Promise<void>;

  constructor(
    transformId: string,
    transform: PTransform,
    context: OperatorContext
  ) {
    const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
    this.multiplexingDataChannel = context.getDataChannel(
      readPort.apiServiceDescriptor!.url
    );
    this.transformId = transformId;
    this.getBundleId = context.getBundleId;
    this.receiver = context.getReceiver(
      onlyElement(Object.values(transform.outputs))
    );
    this.coder = context.pipelineContext.getCoder(readPort.coderId);
  }

  /**
   * Creates the end-of-data promise and registers a consumer for this
   * bundle/transform pair on the data channel.
   */
  async startBundle() {
    // The executor runs synchronously, so both callbacks are in place before
    // the consumer is registered. A plain (non-async) executor is used: an
    // async one would silently swallow anything thrown inside it.
    let signalEndOfData: () => void = () => {};
    let signalError: (error: Error) => void = () => {};
    this.endOfData = new Promise<void>((resolve, reject) => {
      signalEndOfData = () => resolve();
      signalError = (error) => reject(error);
    });
    // NOTE(review): if onError fires well before finishBundle() awaits
    // endOfData, the rejection has no handler attached yet — confirm the
    // caller always reaches finishBundle() promptly.
    await this.multiplexingDataChannel.registerConsumer(
      this.getBundleId(),
      this.transformId,
      {
        // Decodes every element in the chunk and forwards it downstream,
        // waiting on each element that was processed asynchronously before
        // decoding the next one.
        sendData: async (data: Uint8Array) => {
          const reader = new protobufjs.Reader(data);
          while (reader.pos < reader.len) {
            const maybePromise = this.receiver.receive(
              this.coder.decode(reader, CoderContext.needsDelimiters)
            );
            if (maybePromise !== NonPromise) {
              await maybePromise;
            }
          }
        },
        sendTimers: async (timerFamilyId: string, timers: Uint8Array) => {
          throw Error("Not expecting timers.");
        },
        close: () => {
          signalEndOfData();
        },
        onError: (error: Error) => {
          signalError(error);
        },
      }
    );
  }

  /** Always throws: this operator only receives data via the data channel. */
  process(wvalue: WindowedValue<unknown>): ProcessResult {
    throw Error("Data should not come in via process.");
  }

  /**
   * Waits for the end-of-data signal (rethrowing a channel error, if any) and
   * always unregisters the consumer afterwards.
   */
  async finishBundle() {
    try {
      await this.endOfData;
    } finally {
      this.multiplexingDataChannel.unregisterConsumer(
        this.getBundleId(),
        this.transformId
      );
    }
  }
}
registerOperator("beam:runner:source:v1", DataSourceOperator);
/**
 * Operator that encodes incoming elements into a buffer and ships the buffer
 * over a data channel.
 *
 * The transform payload is a RemoteGrpcPort naming the data endpoint and the
 * coder used to encode outgoing elements.
 */
class DataSinkOperator implements IOperator {
  transformId: string;
  getBundleId: () => string;
  multiplexingDataChannel: MultiplexingDataChannel;
  channel: IDataChannel;
  coder: Coder<WindowedValue<unknown>>;
  buffer: protobufjs.Writer;

  constructor(
    transformId: string,
    transform: PTransform,
    context: OperatorContext
  ) {
    const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
    this.multiplexingDataChannel = context.getDataChannel(
      writePort.apiServiceDescriptor!.url
    );
    this.transformId = transformId;
    this.getBundleId = context.getBundleId;
    this.coder = context.pipelineContext.getCoder(writePort.coderId);
  }

  /** Opens the send channel for this bundle/transform and starts a fresh buffer. */
  async startBundle() {
    this.channel = this.multiplexingDataChannel.getSendChannel(
      this.getBundleId(),
      this.transformId
    );
    this.buffer = new protobufjs.Writer();
  }

  /**
   * Encodes `wvalue` into the buffer. Returns NonPromise unless the buffer
   * length has grown past 1e6, in which case the flush Promise is returned.
   */
  process(wvalue: WindowedValue<unknown>) {
    this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
    if (this.buffer.len > 1e6) {
      return this.flush();
    }
    return NonPromise;
  }

  /** Sends whatever is still buffered, then closes the send channel. */
  async finishBundle() {
    await this.flush();
    this.channel.close();
  }

  /**
   * Sends the buffered bytes, if any.
   *
   * The buffer is swapped for a new writer *before* awaiting the send.
   * process() is synchronous and may run again while the send is pending; if
   * it kept writing into the writer that is already being sent, the next
   * flush would finish() that same writer and resend its contents, and the
   * replacement made after the await would drop whatever had been appended
   * in the meantime.
   */
  async flush() {
    if (this.buffer.len > 0) {
      const pending = this.buffer.finish();
      this.buffer = new protobufjs.Writer();
      await this.channel.sendData(pending);
    }
  }
}
registerOperator("beam:runner:sink:v1", DataSinkOperator);
/**
 * Operator for the flatten transform: every element it is given is passed
 * unchanged to the receiver of the transform's single output.
 */
class FlattenOperator implements IOperator {
  receiver: Receiver;

  constructor(
    transformId: string,
    transform: PTransform,
    context: OperatorContext
  ) {
    const outputIds = Object.values(transform.outputs);
    this.receiver = context.getReceiver(onlyElement(outputIds));
  }

  // Nothing to set up per bundle.
  async startBundle() {}

  /** Forwards `wvalue` downstream and returns the receiver's result. */
  process(wvalue: WindowedValue<unknown>) {
    const downstream = this.receiver;
    return downstream.receive(wvalue);
  }

  // Nothing to tear down per bundle.
  async finishBundle() {}
}
registerOperator("beam:transform:flatten:v1", FlattenOperator);
/**
 * Operator that runs a deserialized JS DoFn over each element, forwarding
 * everything the DoFn produces to a single receiver. A ParamProviderImpl is
 * created per bundle to augment the DoFn's context and is updated with the
 * current windowed value before each call.
 */
class GenericParDoOperator implements IOperator {
  private doFn: DoFn<unknown, unknown, unknown>;
  private getStateProvider: () => StateProvider;
  private sideInputInfo: Map<string, SideInputInfo> = new Map();
  private originalContext: object | undefined;
  private augmentedContext: object | undefined;
  private paramProvider: ParamProviderImpl;

  constructor(
    private transformId: string,
    private receiver: Receiver,
    private spec: runnerApi.ParDoPayload,
    private payload: {
      doFn: DoFn<unknown, unknown, unknown>;
      context: any;
    },
    transformProto: runnerApi.PTransform,
    operatorContext: OperatorContext
  ) {
    this.doFn = payload.doFn;
    this.originalContext = payload.context;
    this.getStateProvider = operatorContext.getStateProvider;
    this.sideInputInfo = createSideInputInfo(
      transformProto,
      spec,
      operatorContext
    );
  }

  /**
   * Builds a fresh parameter provider, derives the augmented context from the
   * original one, and invokes the DoFn's startBundle hook when it has one.
   */
  async startBundle() {
    const provider = new ParamProviderImpl(
      this.transformId,
      this.sideInputInfo,
      this.getStateProvider
    );
    this.paramProvider = provider;
    this.augmentedContext = provider.augmentContext(this.originalContext);
    if (this.doFn.startBundle) {
      this.doFn.startBundle(this.augmentedContext);
    }
  }

  /**
   * Runs the DoFn on one windowed value. Returns NonPromise when everything
   * completed synchronously, otherwise a Promise covering the outstanding
   * work.
   */
  process(wvalue: WindowedValue<unknown>) {
    if (this.augmentedContext && wvalue.windows.length !== 1) {
      // With an augmented context present, each window is handled as its own
      // single-window value.
      // TODO: (Perf) We could inspect the context more deeply and allow some
      // cases to go through.
      const perWindow = new ProcessResultBuilder();
      for (const window of wvalue.windows) {
        const singleWindowValue = {
          value: wvalue.value,
          windows: [window],
          pane: wvalue.pane,
          timestamp: wvalue.timestamp,
        };
        perWindow.add(this.process(singleWindowValue));
      }
      return perWindow.build();
    }

    // Invokes the DoFn and forwards each produced element with the input's
    // windows, pane and timestamp; afterwards the provider is updated with
    // `undefined`. That final update is skipped when the DoFn returned
    // nothing.
    const invokeDoFn = (): ProcessResult => {
      const produced = this.doFn.process(wvalue.value, this.augmentedContext);
      if (!produced) {
        return NonPromise;
      }
      const downstream = new ProcessResultBuilder();
      for (const element of produced) {
        downstream.add(
          this.receiver.receive({
            value: element,
            windows: wvalue.windows,
            pane: wvalue.pane,
            timestamp: wvalue.timestamp,
          })
        );
      }
      this.paramProvider.update(undefined);
      return downstream.build();
    };

    // Point the provider at this value before calling the DoFn.
    const pendingUpdate = this.paramProvider.update(wvalue);
    if (pendingUpdate !== NonPromise) {
      // The update had deferred work: wait for it, repeat the update (which
      // is then expected to complete synchronously), and only then run the
      // DoFn.
      return (async () => {
        await pendingUpdate;
        const secondUpdate = this.paramProvider.update(wvalue);
        if (secondUpdate !== NonPromise) {
          throw new Error(
            "Expected all promises to be resolved: " + secondUpdate
          );
        }
        await invokeDoFn();
      })();
    }
    // No deferred work: process the element immediately.
    return invokeDoFn();
  }

  /**
   * Invokes the DoFn's finishBundle hook, if any, and forwards whatever it
   * produces, awaiting each element that was processed asynchronously.
   */
  async finishBundle() {
    if (!this.doFn.finishBundle) {
      return;
    }
    const trailingOutput = this.doFn.finishBundle(this.augmentedContext);
    if (!trailingOutput) {
      return;
    }
    // The finishBundle method must return `void` or a
    // Generator<WindowedValue<OutputT>>. It may not return Generator<OutputT>
    // without windowing information because a single bundle may contain
    // elements from different windows, so each element must specify its
    // window.
    for (const element of trailingOutput) {
      const pending = this.receiver.receive(element);
      if (pending !== NonPromise) {
        await pending;
      }
    }
  }
}
/** Operator that passes every element straight through to its receiver. */
class IdentityParDoOperator implements IOperator {
  constructor(private receiver: Receiver) {}

  // No per-bundle setup.
  async startBundle() {}

  /** Forwards `wvalue` unchanged and returns the receiver's result. */
  process(wvalue: WindowedValue<unknown>) {
    const forwarded = this.receiver.receive(wvalue);
    return forwarded;
  }

  // No per-bundle teardown.
  async finishBundle() {}
}
/**
 * Operator that splits an object-valued element by key: the value stored
 * under each key is forwarded to the receiver registered for the matching
 * tag.
 */
class SplittingDoFnOperator implements IOperator {
  /**
   * @param receivers receivers keyed by output tag.
   * @param options split behaviour: `exclusive`, `knownTags`,
   *   `unknownTagBehavior` and `unknownTagName` are read here.
   */
  constructor(
    private receivers: { [key: string]: Receiver },
    private options: SplitOptions
  ) {}

  async startBundle() {}

  /**
   * Routes each property of `wvalue.value` to the receiver for its tag.
   *
   * Keys that are not in `options.knownTags` are handled according to
   * `options.unknownTagBehavior`: "rename" routes the value to the
   * `unknownTagName` output, "ignore" drops it, anything else throws.
   *
   * @throws Error if `options.exclusive` is set and the element does not have
   *   exactly one key, or if an unknown key is met and the behaviour is
   *   neither "rename" nor "ignore".
   */
  process(wvalue: WindowedValue<unknown>) {
    const result = new ProcessResultBuilder();
    const element = wvalue.value as { [key: string]: unknown };
    const keys = Object.keys(element);
    if (this.options.exclusive && keys.length !== 1) {
      throw new Error(
        "Multiple keys for exclusively split element: " + wvalue.value
      );
    }
    for (const key of keys) {
      // `tag` selects the output; `key` is what the element actually holds.
      let tag = key;
      if (!this.options.knownTags!.includes(key)) {
        if (this.options.unknownTagBehavior === "rename") {
          tag = this.options.unknownTagName!;
        } else if (this.options.unknownTagBehavior === "ignore") {
          continue;
        } else {
          throw new Error(
            "Unexpected tag '" +
              key +
              "' for " +
              wvalue.value +
              " not in " +
              this.options.knownTags
          );
        }
      }
      const receiver = this.receivers[tag];
      if (receiver) {
        result.add(
          receiver.receive({
            // Read the value under the element's own key. Looking it up by
            // the renamed tag would yield `undefined`, because the element
            // has no property with that name.
            value: element[key],
            windows: wvalue.windows,
            timestamp: wvalue.timestamp,
            pane: wvalue.pane,
          })
        );
      }
    }
    return result.build();
  }

  async finishBundle() {}
}
/**
 * Operator that re-windows elements: the windows of each incoming value are
 * replaced by those the WindowFn assigns for the value's timestamp.
 */
class AssignWindowsParDoOperator implements IOperator {
  constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) {}

  async startBundle() {}

  /**
   * Forwards `wvalue` with its windows replaced. If the WindowFn assigns no
   * windows, nothing is forwarded and NonPromise is returned.
   */
  process(wvalue: WindowedValue<unknown>) {
    const assigned = this.windowFn.assignWindows(wvalue.timestamp);
    if (!(assigned.length > 0)) {
      return NonPromise;
    }
    // Each element from each original window is assigned to the new windows,
    // so the assigned list is repeated once per original window.
    const repeated: Window[] = [];
    for (let remaining = wvalue.windows.length; remaining > 0; remaining--) {
      repeated.push(...assigned);
    }
    return this.receiver.receive({
      value: wvalue.value,
      windows: repeated,
      timestamp: wvalue.timestamp,
      pane: wvalue.pane,
    });
  }

  async finishBundle() {}
}
/**
 * Operator that replaces each element's timestamp with the result of a
 * user-supplied function of the element's value and current timestamp.
 */
class AssignTimestampsParDoOperator implements IOperator {
  /**
   * @param receiver destination for the re-timestamped elements.
   * @param func computes the new timestamp from the element's value and its
   *   current timestamp.
   */
  constructor(
    private receiver: Receiver,
    // Parameters must be named: `(any, Instant) => typeof Instant` declares
    // parameters *called* `any` and `Instant` (both implicitly `any`) and a
    // return type of "whatever the second parameter is", so nothing was
    // checked. The timestamp type is taken from WindowedValue itself, which
    // is what the function's argument and result are read from and written
    // to in process().
    private func: (
      value: unknown,
      timestamp: WindowedValue<unknown>["timestamp"]
    ) => WindowedValue<unknown>["timestamp"]
  ) {}

  async startBundle() {}

  /** Forwards `wvalue` with only its timestamp replaced. */
  process(wvalue: WindowedValue<unknown>) {
    return this.receiver.receive({
      value: wvalue.value,
      windows: wvalue.windows,
      // TODO: Verify it falls in window and doesn't cause late data.
      timestamp: this.func(wvalue.value, wvalue.timestamp),
      pane: wvalue.pane,
    });
  }

  async finishBundle() {}
}
// Registers the factory for ParDo transforms. The ParDo payload's DoFn URN
// selects which concrete operator is built.
registerOperatorConstructor(
  parDo.urn,
  (transformId: string, transform: PTransform, context: OperatorContext) => {
    const spec = runnerApi.ParDoPayload.fromBinary(transform.spec!.payload);
    // Resolved lazily, only by the branches that need a single output: the
    // splitting operator has one receiver per output tag, so asking for "the
    // only output" up front would be wrong for it.
    const soleReceiver = () =>
      context.getReceiver(onlyElement(Object.values(transform.outputs)));
    // TODO: (Cleanup) Ideally we could branch on the urn itself, but some runners have a closed set of known URNs.
    if (spec.doFn?.urn === urns.SERIALIZED_JS_DOFN_INFO) {
      return new GenericParDoOperator(
        transformId,
        soleReceiver(),
        spec,
        deserializeFn(spec.doFn.payload!),
        transform,
        context
      );
    } else if (spec.doFn?.urn === urns.IDENTITY_DOFN_URN) {
      return new IdentityParDoOperator(soleReceiver());
    } else if (spec.doFn?.urn === urns.JS_WINDOW_INTO_DOFN_URN) {
      return new AssignWindowsParDoOperator(
        soleReceiver(),
        deserializeFn(spec.doFn.payload!).windowFn
      );
    } else if (spec.doFn?.urn === urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN) {
      return new AssignTimestampsParDoOperator(
        soleReceiver(),
        deserializeFn(spec.doFn.payload!).func
      );
    } else if (spec.doFn?.urn === urns.SPLITTING_JS_DOFN_URN) {
      return new SplittingDoFnOperator(
        Object.fromEntries(
          Object.entries(transform.outputs).map(([tag, pcId]) => [
            tag,
            context.getReceiver(pcId),
          ])
        ),
        deserializeFn(spec.doFn.payload!)
      );
    } else {
      // Report the URN: concatenating the payload object itself only prints
      // "[object Object]".
      throw new Error("Unknown DoFn type: " + spec.doFn?.urn);
    }
  }
);

/**
 * Returns the first element of `arg`, rejecting arrays that hold more than
 * one element.
 *
 * An empty array is still tolerated and yields `undefined`, as before; only
 * the "more than one" case, which the original check targeted, is enforced.
 *
 * @throws Error if `arg` contains more than one element.
 */
function onlyElement<Type>(arg: Type[]): Type {
  if (arg.length > 1) {
    // The error used to be constructed but never thrown, so extra elements
    // were silently discarded.
    throw new Error("Expecting exactly one element.");
  }
  return arg[0];
}