blob: 6ffe02cfcb6dd1e1be84e28597239b6141f3d865 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as BSON from "bson";
import { Writer, Reader } from "protobufjs";
import { Coder, Context, ProtoContext, globalRegistry } from "./coders";
import {
BoolCoder,
DoubleCoder,
StrUtf8Coder,
VarIntCoder,
} from "./standard_coders";
import * as runnerApi from "../proto/beam_runner_api";
export class BsonObjectCoder<T> implements Coder<T> {
static URN = "beam:coder:bsonjs:v1";
encode(element: T, writer: Writer, context: Context) {
const buff = BSON.serialize(element);
writer.bytes(buff);
}
decode(reader: Reader, context: Context): T {
const encoded = reader.bytes();
return BSON.deserialize(encoded) as T;
}
toProto(pipelineContext: ProtoContext): runnerApi.Coder {
return {
spec: {
urn: BsonObjectCoder.URN,
payload: new Uint8Array(),
},
componentCoderIds: [],
};
}
}
globalRegistry().register(BsonObjectCoder.URN, BsonObjectCoder);
class NumberOrFloatCoder implements Coder<number> {
static URN = "beam:coder:numbersforjs:v1";
intCoder: Coder<number> = new VarIntCoder();
doubleCoder: Coder<number> = new DoubleCoder();
encode(element: number, writer: Writer, context: Context) {
if (element % 1) {
writer.string("f");
this.doubleCoder.encode(element, writer, context);
} else {
writer.string("i");
this.intCoder.encode(element, writer, context);
}
}
decode(reader: Reader, context: Context): number {
const typeMarker = reader.string();
if (typeMarker === "f") {
return this.doubleCoder.decode(reader, context);
} else {
return this.intCoder.decode(reader, context);
}
}
toProto(pipelineContext: ProtoContext): runnerApi.Coder {
return {
spec: {
urn: NumberOrFloatCoder.URN,
payload: new Uint8Array(),
},
componentCoderIds: [],
};
}
}
export class GeneralObjectCoder<T> implements Coder<T> {
static URN = "beam:coder:genericobjectjs:v1";
codersByType = {
string: new StrUtf8Coder(),
number: new NumberOrFloatCoder(),
object: new BsonObjectCoder(),
boolean: new BoolCoder(),
};
// This is a map of type names to type markers. It maps a type name to its
// marker within a stream.
typeMarkers = {
string: "S",
number: "N",
object: "O",
boolean: "B",
};
// This is a map of type markers to type names. It maps a type marker to its
// type name.
markerToTypes = {
S: "string",
N: "number",
O: "object",
B: "boolean",
};
encode(element: T, writer: Writer, context: Context) {
if (element === null || element === undefined) {
// typeof is "object" but BSON can't handle it.
writer.string("Z");
} else {
const type = typeof element;
// TODO: Perf. Write a single byte (no need for the length prefix).
writer.string(this.typeMarkers[type]);
this.codersByType[type].encode(element, writer, context);
}
}
decode(reader: Reader, context: Context): T {
const typeMarker = reader.string();
if (typeMarker === "Z") {
return null!;
} else {
const type = this.markerToTypes[typeMarker];
return this.codersByType[type].decode(reader, context);
}
}
toProto(pipelineContext: ProtoContext): runnerApi.Coder {
return {
spec: {
urn: GeneralObjectCoder.URN,
payload: new Uint8Array(),
},
componentCoderIds: [],
};
}
}
globalRegistry().register(GeneralObjectCoder.URN, GeneralObjectCoder);
import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/coders/js_coders", exports);
requireForSerialization("apache-beam/coders/js_coders", {
NumberOrFloatCoder: NumberOrFloatCoder,
});