blob: 8d778c6c170e7fb2e311bd5c3f3cf8153ce5de6f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"use strict";
import {EgressMessage, Message} from "./message";
import "./generated/request-reply_pb";
import {Address, FunctionSpec, JsStatefulFunction, parseTypeName, ValueSpec} from "./core";
import {CancellationRequest, Context, DelayedMessage, InternalContext} from "./context";
import {AddressScopedStorageFactory, Value} from "./storage";
// ----------------------------------------------------------------------------------------------------
// Missing context handling
// ----------------------------------------------------------------------------------------------------
/**
* @param {[ValueSpec]} missing a list of value spec that the server does not know about.
* @returns {!Uint8Array}
*/
function respondImmediatelyWithMissingContext(missing: ValueSpec[]): Uint8Array {
const ctx = valueSpecsToIncompleteInvocationContext(missing);
const pbFromFn = new proto.io.statefun.sdk.reqreply.FromFunction();
pbFromFn.setIncompleteInvocationContext(ctx);
return pbFromFn.serializeBinary();
}
// noinspection JSValidateJSDoc
/**
* @param {ValueSpec} valueSpec the input value spec.
* @returns {null|proto.io.statefun.sdk.reqreply.FromFunction.ExpirationSpec} an expiration spec if one is set, or null otherwise.
*/
function expirationSpecFromValueSpec(valueSpec: ValueSpec) {
if (valueSpec.expireAfterWrite !== -1) {
const pbSpec = new proto.io.statefun.sdk.reqreply.FromFunction.ExpirationSpec();
pbSpec.setExpireAfterMillis(valueSpec.expireAfterWrite);
// noinspection JSCheckFunctionSignatures,TypeScriptValidateJSTypes
pbSpec.setMode(1); // AFTER_WRITE
return pbSpec;
}
if (valueSpec.expireAfterCall !== -1) {
const pbSpec = new proto.io.statefun.sdk.reqreply.FromFunction.ExpirationSpec();
pbSpec.setExpireAfterMillis(valueSpec.expireAfterCall);
// noinspection JSCheckFunctionSignatures,TypeScriptValidateJSTypes
pbSpec.setMode(2); // AFTER_CALL
return pbSpec;
}
return null;
}
/**
* @param {[ValueSpec]} missing a list of value spec that the server does not know about.
*/
function valueSpecsToIncompleteInvocationContext(missing: ValueSpec[]) {
const pbValueSpecs = missing.map(missingValueSpec => {
const pbPersistedValueSpec = new proto.io.statefun.sdk.reqreply.FromFunction.PersistedValueSpec();
pbPersistedValueSpec.setStateName(missingValueSpec.name);
pbPersistedValueSpec.setTypeTypename(missingValueSpec.type.typename);
const pbExpirationSpec = expirationSpecFromValueSpec(missingValueSpec);
if (pbExpirationSpec !== null) {
pbPersistedValueSpec.setExpirationSpec(pbExpirationSpec);
}
return pbPersistedValueSpec;
});
const res = new proto.io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext();
pbValueSpecs.forEach(pbSpec => res.addMissingValues(pbSpec));
return res;
}
// ----------------------------------------------------------------------------------------------------
// Handler
// ----------------------------------------------------------------------------------------------------
export async function handle(
toFunctionBytes: Buffer | Uint8Array,
fns: Record<string, FunctionSpec>
): Promise<Buffer | Uint8Array> {
//
// setup
//
const toFn = proto.io.statefun.sdk.reqreply.ToFunction.deserializeBinary(toFunctionBytes);
const pbInvocationBatchRequest = toFn.getInvocation();
if (pbInvocationBatchRequest === null) {
throw new Error("An empty invocation request");
}
const targetAddress = pbAddressToSdkAddress(pbInvocationBatchRequest.getTarget());
if (targetAddress === null) {
throw new Error("Missing target address");
}
const fnSpec = findTargetFunctionSpec(fns, targetAddress);
const {missing, values, storage} = AddressScopedStorageFactory.tryCreateAddressScopedStorage(pbInvocationBatchRequest, fnSpec.valueSpecs);
if (missing !== null) {
return respondImmediatelyWithMissingContext(missing);
}
if (values === undefined || values === null) {
throw new Error("Unexpected internal error. Could not create an address scoped storage.");
}
//
// apply the batch
//
const internalContext = new InternalContext();
const context = new Context(targetAddress, storage, internalContext);
await applyBatch(pbInvocationBatchRequest, context, internalContext, fnSpec.fn);
//
// collect the side effects
//
const pbInvocationResponse = new proto.io.statefun.sdk.reqreply.FromFunction.InvocationResponse();
collectStateMutations(values, pbInvocationResponse);
collectOutgoingMessages(internalContext.sent, pbInvocationResponse);
collectEgress(internalContext.egress, pbInvocationResponse);
collectDelayedMessage(internalContext.delayed, pbInvocationResponse);
const fromFn = new proto.io.statefun.sdk.reqreply.FromFunction();
fromFn.setInvocationResult(pbInvocationResponse);
return fromFn.serializeBinary();
}
// noinspection JSValidateJSDoc
/**
* @param {?proto.io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest} pbInvocationBatchRequest
* @param {Context} context user facing context
* @param {InternalContext} internalContext
* @param {*} fn the function to apply
*/
async function applyBatch(
pbInvocationBatchRequest: proto.io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest,
context: Context,
internalContext: InternalContext,
fn: JsStatefulFunction
) {
for (const invocation of pbInvocationBatchRequest.getInvocationsList()) {
internalContext.caller = pbAddressToSdkAddress(invocation.getCaller());
const message = new Message(context.self, invocation.getArgument()!);
const maybePromise = fn(context, message);
if (maybePromise instanceof Promise) {
await maybePromise;
}
}
}
// ----------------------------------------------------------------------------------------------------
// Side Effect Collection
// ----------------------------------------------------------------------------------------------------
function collectStateMutations(
values: Value<unknown>[],
pbInvocationResponse: proto.io.statefun.sdk.reqreply.FromFunction.InvocationResponse
) {
for (const mutation of AddressScopedStorageFactory.collectMutations(values)) {
pbInvocationResponse.addStateMutations(mutation);
}
}
function collectOutgoingMessages(
sent: Message[],
pbInvocationResponse: proto.io.statefun.sdk.reqreply.FromFunction.InvocationResponse
) {
for (const message of sent) {
const pbAddr = sdkAddressToPbAddress(message.targetAddress);
const pbArg = message.typedValue;
const pbMessage = new proto.io.statefun.sdk.reqreply.FromFunction.Invocation();
pbMessage.setTarget(pbAddr);
pbMessage.setArgument(pbArg);
pbInvocationResponse.addOutgoingMessages(pbMessage);
}
}
function collectEgress(
egresses: EgressMessage[],
pbInvocationResponse: proto.io.statefun.sdk.reqreply.FromFunction.InvocationResponse
) {
for (const egress of egresses) {
const outEgress = new proto.io.statefun.sdk.reqreply.FromFunction.EgressMessage();
const {namespace, name} = parseTypeName(egress.typename);
outEgress.setEgressNamespace(namespace);
outEgress.setEgressType(name);
outEgress.setArgument(egress.typedValue);
pbInvocationResponse.addOutgoingEgresses(outEgress);
}
}
function collectDelayedMessage(
delayed: (DelayedMessage | CancellationRequest)[],
pbInvocationResponse: proto.io.statefun.sdk.reqreply.FromFunction.InvocationResponse
) {
for (const delayedOr of delayed) {
const pb = new proto.io.statefun.sdk.reqreply.FromFunction.DelayedInvocation();
if (delayedOr instanceof CancellationRequest) {
pb.setIsCancellationRequest(true);
pb.setCancellationToken(delayedOr.token);
} else {
pb.setIsCancellationRequest(false);
pb.setTarget(sdkAddressToPbAddress(delayedOr.message.targetAddress));
pb.setDelayInMs(delayedOr.delay);
pb.setArgument(delayedOr.message.typedValue);
if (delayedOr.token !== undefined) {
pb.setCancellationToken(delayedOr.token);
}
}
pbInvocationResponse.addDelayedInvocations(pb);
}
}
// ----------------------------------------------------------------------------------------------------
// Utils
// ----------------------------------------------------------------------------------------------------
function sdkAddressToPbAddress(sdkAddress: Address) {
const pbAddr = new proto.io.statefun.sdk.reqreply.Address();
pbAddr.setNamespace(sdkAddress.namespace);
pbAddr.setType(sdkAddress.name);
pbAddr.setId(sdkAddress.id);
return pbAddr;
}
function pbAddressToSdkAddress(pbAddress: proto.io.statefun.sdk.reqreply.Address | null | undefined) {
if (pbAddress === undefined || pbAddress === null) {
return null;
}
return Address.fromParts(pbAddress.getNamespace(), pbAddress.getType(), pbAddress.getId());
}
/**
*
* @param { {string : FunctionSpec} } fns an index of function specs by function typename.
* @param {Address} targetAddress the target function address which we need to invoke.
* @returns {FunctionSpec} the function spec that this batch is addressed to.
*/
function findTargetFunctionSpec(fns: Record<string, FunctionSpec>, targetAddress: Address): FunctionSpec {
if (!fns.hasOwnProperty(targetAddress.typename)) {
throw new Error(`unknown function type ${targetAddress.typename}`);
}
return fns[targetAddress.typename];
}