// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// wasm is a simple example that loads and executes a wasm file function.
// greet.wasm, Cargo.toml and greet.rs were copied from the example provided by the wazero library:
// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
//
// New Concepts:
// 1. Load a wasm file compiled from: cargo build --release --target wasm32-unknown-unknown
// 2. Execute a wasm function within a DoFn
package main
import (
"context"
_ "embed"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
)
// Names of the functions that Setup looks up in the instantiated wasm module.
const (
// wasmFunctionName is the export that ProcessElement calls with the pointer
// and size of the input bytes.
wasmFunctionName = "greeting"
// wasmAllocateFunctionName is the export called with a byte size; its first
// result is used as a linear memory offset.
wasmAllocateFunctionName = "allocate"
// wasmDeallocateFunctionName is the export called with a pointer and a size
// to release a previously obtained buffer.
wasmDeallocateFunctionName = "deallocate"
)
// greetWasm holds the contents of greet.wasm, embedded into the binary by the
// go:embed directive below. Setup instantiates the wasm module from these bytes.
//
//go:embed greet.wasm
var greetWasm []byte
// output is the --output flag: the destination passed to textio.Write.
// Its default is the empty string, which preRun treats as "not provided".
var output = flag.String("output", "", "Output file (required).")
func init() {
// register.DoFnXxY registers a struct DoFn so that it can be correctly
// serialized and does some optimization to avoid runtime reflection. Since
// embeddedWasmFn's ProcessElement func has 2 inputs (context.Context) and 2 outputs (string, error),
// we use register.DoFn2x2 and provide its input and output types as its constraints.
// Struct DoFns must be registered for a pipeline to run.
register.DoFn2x2[context.Context, string, string, error](&embeddedWasmFn{})
}
// preRun validates the command-line flags; main calls it before run.
// It returns an error when the --output flag is empty.
func preRun() error {
	if len(*output) == 0 {
		return fmt.Errorf("--output is required")
	}

	return nil
}
// main parses the flags, initializes Beam, validates the flags with preRun and
// then runs the pipeline. The first error from preRun or run is reported with
// log.Fatal.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()

	// run is only attempted when flag validation succeeded; either failure
	// ends up in the single log.Fatal call below.
	err := preRun()
	if err == nil {
		err = run(ctx)
	}
	if err != nil {
		log.Fatal(ctx, err)
	}
}
// run builds the example pipeline and executes it with beamx.Run.
//
// The pipeline creates a PCollection from a fixed set of strings, applies
// embeddedWasmFn to each element and writes the results to the location given
// by the --output flag. It returns a wrapped error when the pipeline fails.
func run(ctx context.Context) error {
	p, s := beam.NewPipelineWithRoot()

	in := beam.Create(s, "Ada", "Lovelace", "World", "Beam", "Senior López", "Random unicorn emoji 🦄")
	out := beam.ParDo(s, &embeddedWasmFn{}, in)
	textio.Write(s, *output, out)

	if err := beamx.Run(ctx, p); err != nil {
		// %w keeps the underlying error in the chain so callers can inspect it
		// with errors.Is / errors.As; the rendered message is unchanged.
		return fmt.Errorf("failed to run pipeline: %w", err)
	}
	return nil
}
// Concept #2: wrap wasm function execution within a DoFn.
// embeddedWasmFn is a DoFn that executes a function of the embedded,
// Rust-compiled wasm module for each input element. Its fields are assigned in
// Setup; the runtime is closed in Teardown.
type embeddedWasmFn struct {
	// r is the wazero runtime created in Setup.
	r wazero.Runtime
	// mod is the wasm module instantiated from greetWasm.
	mod api.Module

	// greeting is the module's exported function named by wasmFunctionName.
	greeting api.Function
	// allocate is the module's exported function named by wasmAllocateFunctionName.
	allocate api.Function
	// deallocate is the module's exported function named by wasmDeallocateFunctionName.
	deallocate api.Function
}
// Setup loads and initializes the embedded wasm module and its functions.
// Concept #1: Load a compiled wasm file []byte content and function.
// This example is derived from
// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
//
// It creates the wazero runtime, instantiates the "env" host module that
// exports "log", instantiates the module from greetWasm and resolves the
// exported functions used by ProcessElement. It returns an error when either
// instantiation fails or when one of the expected functions is not exported.
func (fn *embeddedWasmFn) Setup(ctx context.Context) error {
	// Create a new WebAssembly Runtime.
	// Typically, a defer r.Close() would follow immediately. Yet, the runtime has
	// to stay alive throughout the DoFn lifecycle, so r.Close() is invoked in
	// Teardown instead.
	fn.r = wazero.NewRuntime(ctx)

	// Instantiate a Go-defined module named "env" that exports a function to
	// log to the console.
	_, err := fn.r.NewHostModuleBuilder("env").
		NewFunctionBuilder().WithFunc(logString).Export("log").
		Instantiate(ctx)
	if err != nil {
		return fmt.Errorf("failed to instantiate host module: %w", err)
	}

	// Instantiate a WebAssembly module that imports the "log" function defined
	// in "env" and exports "memory" and functions we'll use in this example.
	fn.mod, err = fn.r.Instantiate(ctx, greetWasm)
	if err != nil {
		return fmt.Errorf("failed to instantiate wasm module: %w", err)
	}

	// Get references to the WebAssembly functions we'll use in this example.
	// ExportedFunction yields nil for a name the module does not export, so each
	// lookup is verified here rather than failing with a nil dereference on the
	// first processed element.
	exports := []struct {
		name string
		dst  *api.Function
	}{
		{wasmFunctionName, &fn.greeting},
		{wasmAllocateFunctionName, &fn.allocate},
		{wasmDeallocateFunctionName, &fn.deallocate},
	}
	for _, e := range exports {
		f := fn.mod.ExportedFunction(e.name)
		if f == nil {
			return fmt.Errorf("wasm module does not export function %q", e.name)
		}
		*e.dst = f
	}
	return nil
}
// ProcessElement processes a string by calling a wasm function written in Rust.
// This example is derived from
// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
//
// The bytes of s are copied into a buffer obtained from the module's allocate
// function, the greeting function is called on that buffer, and the bytes it
// points to are read back and returned as the output element. Both buffers are
// handed to deallocate when the function exits. An error is returned when
// allocate or greeting fails, or when a memory access is out of range.
func (fn *embeddedWasmFn) ProcessElement(ctx context.Context, s string) (string, error) {
	// We need to compute the size of s to use Rust's memory allocator.
	size := uint64(len(s))

	// Instead of an arbitrary memory offset, use Rust's allocator. Notice
	// there is nothing string-specific in this allocation function. The same
	// function could be used to pass binary serialized data to Wasm.
	results, err := fn.allocate.Call(ctx, size)
	if err != nil {
		return "", fmt.Errorf("error calling allocate: %w", err)
	}
	ptr := results[0]

	// This pointer was allocated by Rust, but is owned by Go, so we have to
	// deallocate it when finished. A failed deallocation must not replace the
	// element's result, so it is only logged.
	defer func() {
		if _, derr := fn.deallocate.Call(ctx, ptr, size); derr != nil {
			log.Warnf(ctx, "error calling deallocate for input buffer: %v", derr)
		}
	}()

	// The pointer is a linear memory offset, which is where we write the value of the DoFn's input element s.
	if !fn.mod.Memory().Write(uint32(ptr), []byte(s)) {
		return "", fmt.Errorf("Memory.Write(%d, %d) out of range of memory size %d",
			ptr, size, fn.mod.Memory().Size())
	}

	// Finally, we get the greeting message "Hello" concatenated to the DoFn's input element s.
	// This shows how to read-back something allocated by Rust.
	ptrSize, err := fn.greeting.Call(ctx, ptr, size)
	if err != nil {
		// Without this check a failed call would be followed by indexing into
		// its results, which panics instead of reporting the error.
		return "", fmt.Errorf("error calling greeting: %w", err)
	}
	// The single result packs the pointer in the upper 32 bits and the size in
	// the lower 32 bits.
	resultPtr := uint32(ptrSize[0] >> 32)
	resultSize := uint32(ptrSize[0])

	// This pointer was allocated by Rust, but is owned by Go, so we have to
	// deallocate it when finished; failures are logged for the same reason as above.
	defer func() {
		if _, derr := fn.deallocate.Call(ctx, uint64(resultPtr), uint64(resultSize)); derr != nil {
			log.Warnf(ctx, "error calling deallocate for result buffer: %v", derr)
		}
	}()

	// The pointer is a linear memory offset, which is where the result of the string concatenation was written.
	bytes, ok := fn.mod.Memory().Read(resultPtr, resultSize)
	if !ok {
		return "", fmt.Errorf("Memory.Read(%d, %d) out of range of memory size %d",
			resultPtr, resultSize, fn.mod.Memory().Size())
	}

	// bytes contains our final result that we emit into the output PCollection.
	// The string conversion copies the data before the deferred deallocation runs.
	return string(bytes), nil
}
// Teardown closes the wazero.Runtime during the DoFn teardown lifecycle.
// It returns a wrapped error when closing the runtime fails.
func (fn *embeddedWasmFn) Teardown(ctx context.Context) error {
	// Typically wazero.Runtime's Close method would be preceded by Go's defer
	// keyword, right after instantiation. Here the runtime has to stay in memory
	// until the end of the DoFn lifecycle, so it is closed at this point instead.
	closeErr := fn.r.Close(ctx)
	if closeErr == nil {
		return nil
	}
	return fmt.Errorf("failed to close runtime: %w", closeErr)
}
// logString is the host function exported to the wasm module as "log".
// It reads byteCount bytes at offset from the calling module's memory and logs
// them as a string at info level. An out-of-range read is reported through
// log.Fatalf.
func logString(ctx context.Context, m api.Module, offset, byteCount uint32) {
	mem := m.Memory()
	data, inRange := mem.Read(offset, byteCount)
	if !inRange {
		log.Fatalf(ctx, "Memory.Read(%d, %d) out of range", offset, byteCount)
	}
	message := string(data)
	log.Info(ctx, message)
}