blob: c28c13cdf493e573dd011200135a6b4f3beaba7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Long from "long";
import { PaneInfoCoder } from "../src/apache_beam/coders/standard_coders";
import {
PTransform,
PCollection,
} from "../src/apache_beam/proto/beam_runner_api";
import { ProcessBundleDescriptor } from "../src/apache_beam/proto/beam_fn_api";
import * as worker from "../src/apache_beam/worker/worker";
import * as operators from "../src/apache_beam/worker/operators";
import {
Window,
GlobalWindow,
Instant,
PaneInfo,
WindowedValue,
} from "../src/apache_beam/values";
const assert = require("assert");
////////// Define some mock operators for use in our tests. //////////
const CREATE_URN = "create";
const RECORDING_URN = "recording";
const PARTITION_URN = "partition";
class Create implements operators.IOperator {
data: any;
receivers: operators.Receiver[];
constructor(
public transformId: string,
transform: PTransform,
context: operators.OperatorContext
) {
this.data = JSON.parse(new TextDecoder().decode(transform.spec!.payload!));
this.receivers = Object.values(transform.outputs).map(context.getReceiver);
}
process(wvalue: WindowedValue<any>) {
return operators.NonPromise;
}
async startBundle() {
const this_ = this;
this_.data.forEach(function (datum) {
this_.receivers.map((receiver) =>
receiver.receive({
value: datum,
windows: [new GlobalWindow()],
pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
timestamp: Long.fromValue("-9223372036854775"),
})
);
});
}
async finishBundle() {}
}
operators.registerOperator(CREATE_URN, Create);
class Recording implements operators.IOperator {
static log: string[];
transformId: string;
receivers: operators.Receiver[];
static reset() {
Recording.log = [];
}
constructor(
transformId: string,
transform: PTransform,
context: operators.OperatorContext
) {
this.transformId = transformId;
this.receivers = Object.values(transform.outputs).map(context.getReceiver);
}
async startBundle() {
Recording.log.push(this.transformId + ".startBundle()");
}
process(wvalue: WindowedValue<any>) {
Recording.log.push(this.transformId + ".process(" + wvalue.value + ")");
const results = this.receivers.map((receiver) => receiver.receive(wvalue));
if (!results.every((r) => r === operators.NonPromise)) {
throw new Error("Unexpected non-promise: " + results);
}
return operators.NonPromise;
}
async finishBundle() {
Recording.log.push(this.transformId + ".finishBundle()");
}
}
operators.registerOperator(RECORDING_URN, Recording);
worker.primitiveSinks.push(RECORDING_URN);
class Partition implements operators.IOperator {
big: operators.Receiver;
all: operators.Receiver;
constructor(
public transformId: string,
transform: PTransform,
context: operators.OperatorContext
) {
this.big = context.getReceiver(transform.outputs.big);
this.all = context.getReceiver(transform.outputs.all);
}
process(wvalue: WindowedValue<any>) {
const a = this.all.receive(wvalue);
if (a !== operators.NonPromise) throw new Error("Unexpected promise: " + a);
if (wvalue.value.length > 3) {
const b = this.big.receive(wvalue);
if (b !== operators.NonPromise)
throw new Error("Unexpected promise: " + b);
}
return operators.NonPromise;
}
async startBundle() {}
async finishBundle() {}
}
operators.registerOperator(PARTITION_URN, Partition);
////////// Helper Functions. //////////
function makePTransform(
urn: string,
inputs: { [key: string]: string },
outputs: { [key: string]: string },
payload: any = null
): PTransform {
return {
spec: {
urn: urn,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
inputs: inputs,
outputs: outputs,
uniqueName: "",
environmentId: "",
displayData: [],
annotations: {},
subtransforms: [],
};
}
////////// The actual tests. //////////
describe("worker module", function () {
it("operator construction", async function () {
const descriptor: ProcessBundleDescriptor = {
id: "",
transforms: {
// Note the inverted order should still be resolved correctly.
y: makePTransform(RECORDING_URN, { input: "pc1" }, { out: "pc2" }),
z: makePTransform(RECORDING_URN, { input: "pc2" }, {}),
x: makePTransform(CREATE_URN, {}, { out: "pc1" }, ["a", "b", "c"]),
},
pcollections: {},
windowingStrategies: {},
coders: {},
environments: {},
};
Recording.reset();
const processor = new worker.BundleProcessor(descriptor, null!, null!, [
CREATE_URN,
]);
await processor.process("bundle_id");
assert.deepEqual(Recording.log, [
"z.startBundle()",
"y.startBundle()",
"y.process(a)",
"z.process(a)",
"y.process(b)",
"z.process(b)",
"y.process(c)",
"z.process(c)",
"y.finishBundle()",
"z.finishBundle()",
]);
});
it("forking operator construction", async function () {
const descriptor: ProcessBundleDescriptor = {
id: "",
transforms: {
// Note the inverted order should still be resolved correctly.
all: makePTransform(RECORDING_URN, { input: "pcAll" }, {}),
big: makePTransform(RECORDING_URN, { input: "pcBig" }, {}),
part: makePTransform(
PARTITION_URN,
{ input: "pc1" },
{ all: "pcAll", big: "pcBig" }
),
create: makePTransform(CREATE_URN, {}, { out: "pc1" }, [
"a",
"b",
"ccccc",
]),
},
pcollections: {},
windowingStrategies: {},
coders: {},
environments: {},
};
Recording.reset();
const processor = new worker.BundleProcessor(descriptor, null!, null!, [
CREATE_URN,
]);
await processor.process("bundle_id");
assert.deepEqual(Recording.log, [
"all.startBundle()",
"big.startBundle()",
"all.process(a)",
"all.process(b)",
"all.process(ccccc)",
"big.process(ccccc)",
"big.finishBundle()",
"all.finishBundle()",
]);
});
});