[FLINK-18810][sdk] Golang remote functions SDK
diff --git a/pom.xml b/pom.xml
index e963671..5c6b963 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
         <module>statefun-sdk-protos</module>
         <module>statefun-sdk-java</module>
         <module>statefun-sdk-python</module>
+        <module>statefun-sdk-go</module>
         <module>statefun-kafka-io</module>
         <module>statefun-kinesis-io</module>
         <module>statefun-flink</module>
@@ -228,18 +229,20 @@
                         <!-- Generated content -->
                         <exclude>**/target/**</exclude>
                         <exclude>**/_build/**</exclude>
-						<exclude>docs/static/font-awesome/**</exclude>
-						<exclude>docs/resources/**</exclude>
-						<exclude>docs/public/**</exclude>
-						<exclude>docs/themes/book/**</exclude>
-						<exclude>docs/assets/github.css</exclude>		
-						<exclude>docs/static/js/anchor.min.js</exclude>
+                        <exclude>docs/static/font-awesome/**</exclude>
+                        <exclude>docs/resources/**</exclude>
+                        <exclude>docs/public/**</exclude>
+                        <exclude>docs/themes/book/**</exclude>
+                        <exclude>docs/assets/github.css</exclude>
+                        <exclude>docs/static/js/anchor.min.js</exclude>
                         <!-- Generated code  -->
                         <exclude>**/generated/**</exclude>
                         <!-- Bundled license files -->
                         <exclude>**/LICENSE*</exclude>
                         <!-- Python venv -->
                         <exclude>**/venv/**</exclude>
+                        <!-- Generated lock file -->
+                        <exclude>**/go.sum/**</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/statefun-sdk-go/generate-dev-protos.sh b/statefun-sdk-go/generate-dev-protos.sh
new file mode 100644
index 0000000..690c0f9
--- /dev/null
+++ b/statefun-sdk-go/generate-dev-protos.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+CURR_DIR=`pwd`
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+SDK_PROTOS_DIR="${BASE_DIR}/../statefun-sdk-protos/src/main/protobuf"
+
+
+cd ${BASE_DIR}
+find ${SDK_PROTOS_DIR} -type f -name "*proto" -exec cp {} . \;
+protoc *proto --go_out=v3/internal/protocol/
+rm *proto
+cd ${CURR_DIR}
\ No newline at end of file
diff --git a/statefun-sdk-go/go.mod b/statefun-sdk-go/go.mod
new file mode 100644
index 0000000..a8b54b1
--- /dev/null
+++ b/statefun-sdk-go/go.mod
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module github.com/apache/flink-statefun/statefun-sdk-go
+
+go 1.16
+
+require (
+	github.com/stretchr/testify v1.7.0
+	google.golang.org/protobuf v1.26.0
+)
diff --git a/statefun-sdk-go/go.sum b/statefun-sdk-go/go.sum
new file mode 100644
index 0000000..cc352a6
--- /dev/null
+++ b/statefun-sdk-go/go.sum
@@ -0,0 +1,19 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/statefun-sdk-go/pom.xml b/statefun-sdk-go/pom.xml
new file mode 100644
index 0000000..4d15061
--- /dev/null
+++ b/statefun-sdk-go/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>3.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>statefun-sdk-go</artifactId>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/statefun-sdk-go/run-unit-tests.sh b/statefun-sdk-go/run-unit-tests.sh
new file mode 100755
index 0000000..1070e9b
--- /dev/null
+++ b/statefun-sdk-go/run-unit-tests.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if ! command -v go &> /dev/null
+then
+    echo "Could not find go compiler; skipping tests"
+    exit 0
+fi
+
+CURR_DIR=`pwd`
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+GO_SDK_DIR="${BASE_DIR}/../statefun-sdk-go/"
+
+
+cd ${GO_SDK_DIR}
+go test ./...
+cd ${CURR_DIR}
\ No newline at end of file
diff --git a/statefun-sdk-go/v3/pkg/statefun/cancellation.go b/statefun-sdk-go/v3/pkg/statefun/cancellation.go
new file mode 100644
index 0000000..4747b95
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/cancellation.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"errors"
+	"fmt"
+)
+
+// CancellationToken tags a delayed message send with statefun.SendAfterWithCancellationToken.
+// It can then be used to cancel said message on a best effort basis with statefun.CancelDelayedMessage.
+// The underlying string token can be retrieved by invoking Token().
+type CancellationToken interface {
+	fmt.Stringer
+
+	// Token returns the underlying string
+	// used to create the CancellationToken.
+	Token() string
+
+	// prevents external implementations
+	// of the interface.
+	internal()
+}
+
+type token string
+
+func (t token) String() string {
+	return "CancellationToken(" + string(t) + ")"
+}
+
+func (t token) Token() string {
+	return string(t)
+}
+
+func (t token) internal() {}
+
+// NewCancellationToken creates a new cancellation token or
+// returns an error if the token is invalid.
+func NewCancellationToken(t string) (CancellationToken, error) {
+	if len(t) == 0 {
+		return nil, errors.New("cancellation token cannot be empty")
+	}
+	return token(t), nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/cancellation_test.go b/statefun-sdk-go/v3/pkg/statefun/cancellation_test.go
new file mode 100644
index 0000000..a6e49b7
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/cancellation_test.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestNewCancellationToken(t *testing.T) {
+	_, err := NewCancellationToken("")
+	assert.Error(t, err, "empty strings should fail token validation")
+
+	token, err := NewCancellationToken("token")
+	assert.NoError(t, err, "failed to validate token")
+	assert.Equal(t, "token", token.Token(), "failed to return correct token")
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/context.go b/statefun-sdk-go/v3/pkg/statefun/context.go
new file mode 100644
index 0000000..677cfcd
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/context.go
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"context"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"sync"
+	"time"
+)
+
+// A Context contains information about the current function invocation, such as the invoked
+// function instance's and caller's Address. It is also used for side effects as a result of
+// the invocation such as send messages to other functions or egresses, and provides access to
+// AddressScopedStorage scoped to the current Address. This type is also a context.Context
+// and can be used to ensure any spawned go routines do not outlive the current function
+// invocation.
+type Context interface {
+	context.Context
+
+	// Self is the current invoked function instance's Address.
+	Self() Address
+
+	// Caller is the caller function instance's Address, if applicable. This is nil
+	// if the message was sent to this function via an ingress.
+	Caller() *Address
+
+	// Send forwards out a MessageBuilder to another function.
+	Send(message MessageBuilder)
+
+	// SendAfter forwards out a MessageBuilder to another function, after a specified time.Duration delay.
+	SendAfter(delay time.Duration, message MessageBuilder)
+
+	// SendAfterWithCancellationToken forwards out a MessageBuilder to another function,
+	// after a specified time.Duration delay. The message is tagged with a non-empty,
+	//unique token to attach to this message, to be used for message cancellation
+	SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder)
+
+	// CancelDelayedMessage cancels a delayed message (a message that was send via SendAfterWithCancellationToken).
+	// NOTE: this is a best-effort operation, since the message might have been already delivered.
+	// If the message was delivered, this is a no-op operation.
+	CancelDelayedMessage(token CancellationToken)
+
+	// SendEgress forwards out an EgressBuilder to an egress.
+	SendEgress(egress EgressBuilder)
+
+	// Storage returns the AddressScopedStorage, providing access to stored values scoped to the
+	// current invoked function instance's Address (which is obtainable using Self()).
+	Storage() AddressScopedStorage
+}
+
+type statefunContext struct {
+	sync.Mutex
+	context.Context
+	self     Address
+	caller   *Address
+	storage  *storage
+	response *protocol.FromFunction_InvocationResponse
+}
+
+func (s *statefunContext) Storage() AddressScopedStorage {
+	return s.storage
+}
+
+func (s *statefunContext) Self() Address {
+	return s.self
+}
+
+func (s *statefunContext) Caller() *Address {
+	return s.caller
+}
+
+func (s *statefunContext) Send(message MessageBuilder) {
+	msg, err := message.ToMessage()
+
+	if err != nil {
+		panic(err)
+	}
+
+	invocation := &protocol.FromFunction_Invocation{
+		Target:   msg.target,
+		Argument: msg.typedValue,
+	}
+
+	s.Lock()
+	s.response.OutgoingMessages = append(s.response.OutgoingMessages, invocation)
+	s.Unlock()
+}
+
+func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) {
+	msg, err := message.ToMessage()
+
+	if err != nil {
+		panic(err)
+	}
+
+	invocation := &protocol.FromFunction_DelayedInvocation{
+		Target:    msg.target,
+		Argument:  msg.typedValue,
+		DelayInMs: delay.Milliseconds(),
+	}
+
+	s.Lock()
+	s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
+	s.Unlock()
+}
+
+func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) {
+	msg, err := message.ToMessage()
+
+	if err != nil {
+		panic(err)
+	}
+
+	invocation := &protocol.FromFunction_DelayedInvocation{
+		CancellationToken: token.Token(),
+		Target:            msg.target,
+		Argument:          msg.typedValue,
+		DelayInMs:         delay.Milliseconds(),
+	}
+
+	s.Lock()
+	s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
+	s.Unlock()
+}
+
+func (s *statefunContext) CancelDelayedMessage(token CancellationToken) {
+	invocation := &protocol.FromFunction_DelayedInvocation{
+		IsCancellationRequest: true,
+		CancellationToken:     token.Token(),
+	}
+
+	s.Lock()
+	s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
+	s.Unlock()
+}
+
+func (s *statefunContext) SendEgress(egress EgressBuilder) {
+	msg, err := egress.toEgressMessage()
+
+	if err != nil {
+		panic(err)
+	}
+
+	s.Lock()
+	s.response.OutgoingEgresses = append(s.response.OutgoingEgresses, msg)
+	s.Unlock()
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/context_test.go b/statefun-sdk-go/v3/pkg/statefun/context_test.go
new file mode 100644
index 0000000..18b3d1c
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/context_test.go
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/proto"
+	"testing"
+	"time"
+)
+
+func TestStatefunContext_Send(t *testing.T) {
+	context := createContext()
+
+	msg := MessageBuilder{
+		Target: Address{
+			FunctionType: TypeNameFrom("example/func"),
+			Id:           "0",
+		},
+		Value: "hello",
+	}
+
+	context.Send(msg)
+	outgoing := context.response.GetOutgoingMessages()
+
+	assert.Equal(t, 1, len(outgoing), "incorrect number of outgoing messages")
+	assert.Equal(t, "example", outgoing[0].Target.Namespace, "incorrect target namespace")
+	assert.Equal(t, "func", outgoing[0].Target.Type, "incorrect target type")
+	assert.Equal(t, "0", outgoing[0].Target.Id, "incorrect target id")
+	assert.Equal(t, stringTypeName.String(), outgoing[0].Argument.Typename, "incorrect typename set for message")
+	assert.True(t, outgoing[0].Argument.HasValue, "argument does not have value")
+}
+
+func TestStatefunContext_SendAfter(t *testing.T) {
+	context := createContext()
+
+	msg := MessageBuilder{
+		Target: Address{
+			FunctionType: TypeNameFrom("example/func"),
+			Id:           "0",
+		},
+		Value: "hello",
+	}
+
+	context.SendAfter(time.Duration(1)*time.Millisecond, msg)
+	delayed := context.response.GetDelayedInvocations()
+
+	assert.Equal(t, 1, len(delayed), "incorrect number of delayed messages")
+	assert.Equal(t, "example", delayed[0].Target.Namespace, "incorrect target namespace")
+	assert.Equal(t, "func", delayed[0].Target.Type, "incorrect target type")
+	assert.Equal(t, "0", delayed[0].Target.Id, "incorrect target id")
+	assert.Equal(t, int64(1), delayed[0].DelayInMs, "incorrect delay")
+	assert.Equal(t, "", delayed[0].CancellationToken, "set cancellation token")
+	assert.False(t, delayed[0].IsCancellationRequest, "delayed message should not be a cancellation request")
+	assert.Equal(t, stringTypeName.String(), delayed[0].Argument.Typename, "incorrect typename set for message")
+	assert.True(t, delayed[0].Argument.HasValue, "argument does not have value")
+}
+
+func TestStatefunContext_SendAfterWithCancellationTokenMessage(t *testing.T) {
+	context := createContext()
+
+	msg := MessageBuilder{
+		Target: Address{
+			FunctionType: TypeNameFrom("example/func"),
+			Id:           "0",
+		},
+		Value: "hello",
+	}
+
+	token, err := NewCancellationToken("token")
+	assert.NoError(t, err, "failed to create token")
+
+	context.SendAfterWithCancellationToken(time.Duration(1)*time.Millisecond, token, msg)
+	delayed := context.response.GetDelayedInvocations()
+
+	assert.Equal(t, 1, len(delayed), "incorrect number of delayed messages")
+	assert.Equal(t, "example", delayed[0].Target.Namespace, "incorrect target namespace")
+	assert.Equal(t, "func", delayed[0].Target.Type, "incorrect target type")
+	assert.Equal(t, "0", delayed[0].Target.Id, "incorrect target id")
+	assert.Equal(t, int64(1), delayed[0].DelayInMs, "incorrect delay")
+	assert.Equal(t, token.Token(), delayed[0].CancellationToken, "failed to set cancellation token")
+	assert.False(t, delayed[0].IsCancellationRequest, "delayed message should not be a cancellation request")
+	assert.Equal(t, stringTypeName.String(), delayed[0].Argument.Typename, "incorrect typename set for message")
+	assert.True(t, delayed[0].Argument.HasValue, "argument does not have value")
+}
+
+func TestStatefunContext_CancelDelayedMessage(t *testing.T) {
+	context := createContext()
+	token, err := NewCancellationToken("token")
+	assert.NoError(t, err, "failed to create token")
+
+	context.CancelDelayedMessage(token)
+	delayed := context.response.GetDelayedInvocations()
+	assert.Equal(t, 1, len(delayed), "incorrect number of delayed messages")
+	assert.Equal(t, token.Token(), delayed[0].CancellationToken, "failed to set cancellation token")
+	assert.True(t, delayed[0].IsCancellationRequest, "delayed message should be a cancellation request")
+}
+
+func TestStatefunContext_SendEgress_Kafka(t *testing.T) {
+	context := createContext()
+
+	kafka := &KafkaEgressBuilder{
+		Target: TypeNameFrom("example/kafka"),
+		Topic:  "topic",
+		Key:    "key",
+		Value:  "value",
+	}
+
+	context.SendEgress(kafka)
+	egress := context.response.GetOutgoingEgresses()
+
+	assert.Equal(t, 1, len(egress), "incorrect number of egress messages")
+	assert.Equal(t, "example", egress[0].EgressNamespace, "incorrect target namespace")
+	assert.Equal(t, "kafka", egress[0].EgressType, "incorrect target type")
+	assert.Equal(t, kafkaTypeName, egress[0].Argument.Typename, "incorrect typename")
+
+	kafkaRecord := protocol.KafkaProducerRecord{}
+	assert.NoError(t, proto.Unmarshal(egress[0].Argument.Value, &kafkaRecord), "failed to deserialize kafka record")
+	assert.Equal(t, "topic", kafkaRecord.Topic, "incorrect kafka topic")
+	assert.Equal(t, "key", kafkaRecord.Key, "incorrect kafka key")
+}
+
+func TestStatefunContext_SendEgress_Kinesis(t *testing.T) {
+	context := createContext()
+
+	kafka := &KinesisEgressBuilder{
+		Target:       TypeNameFrom("example/kinesis"),
+		Stream:       "stream",
+		PartitionKey: "key",
+		Value:        "value",
+	}
+
+	context.SendEgress(kafka)
+	egress := context.response.GetOutgoingEgresses()
+
+	assert.Equal(t, 1, len(egress), "incorrect number of egress messages")
+	assert.Equal(t, "example", egress[0].EgressNamespace, "incorrect target namespace")
+	assert.Equal(t, "kinesis", egress[0].EgressType, "incorrect target type")
+	assert.Equal(t, kinesisTypeName, egress[0].Argument.Typename, "incorrect typename")
+
+	kinesis := protocol.KinesisEgressRecord{}
+	assert.NoError(t, proto.Unmarshal(egress[0].Argument.Value, &kinesis), "failed to deserialize kinesis record")
+	assert.Equal(t, "stream", kinesis.Stream, "incorrect kinesis stream")
+	assert.Equal(t, "key", kinesis.PartitionKey, "incorrect kinesis key")
+}
+
+// creates a context with the minimal state to
+// run tests.
+func createContext() *statefunContext {
+	return &statefunContext{
+		response: &protocol.FromFunction_InvocationResponse{},
+	}
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/egress.go b/statefun-sdk-go/v3/pkg/statefun/egress.go
new file mode 100644
index 0000000..20e7644
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/egress.go
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"google.golang.org/protobuf/proto"
+)
+
+const (
+	kafkaTypeName   = "type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord"
+	kinesisTypeName = "type.googleapis.com/io.statefun.sdk.egress.KinesisEgressRecord"
+)
+
+type EgressBuilder interface {
+	toEgressMessage() (*protocol.FromFunction_EgressMessage, error)
+}
+
+// KafkaEgressBuilder builds a message that can be emitted to a Kafka generic egress.
+// If a ValueType is provided, then Value will be serialized according to the
+// provided ValueType's serializer. Otherwise we will try to convert Value to bytes
+// if it is one of:
+//   - utf-8 string
+//   - []bytes
+//   - an int (as defined by Kafka's serialization format)
+//   - float (as defined by Kafka's serialization format)
+type KafkaEgressBuilder struct {
+	// The TypeName as specified in module.yaml
+	Target TypeName
+
+	// The Kafka destination topic for that record
+	Topic string
+
+	// The utf8 encoded string key to produce (can be empty)
+	Key string
+
+	// The value to produce
+	Value interface{}
+
+	// An optional hint to this values type
+	ValueType SimpleType
+}
+
+func (k KafkaEgressBuilder) isEnvelope() {}
+
+func (k KafkaEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
+	if k.Target == nil {
+		return nil, errors.New("an egress record requires a Target")
+	}
+	if k.Topic == "" {
+		return nil, errors.New("A Kafka record requires a topic")
+	}
+
+	if k.Value == nil {
+		return nil, errors.New("A Kafka record requires a value")
+	}
+
+	buffer := bytes.Buffer{}
+	if k.ValueType != nil {
+		if err := k.ValueType.Serialize(&buffer, k.Value); err != nil {
+			return nil, err
+		}
+	} else {
+		switch value := k.Value.(type) {
+		case string:
+			_ = StringType.Serialize(&buffer, value)
+		case []byte:
+			buffer.Write(value)
+		case int, int32, int64, float32, float64:
+			if err := binary.Write(&buffer, binary.BigEndian, value); err != nil {
+				return nil, err
+			}
+		default:
+			return nil, errors.New("unable to convert value to bytes")
+		}
+	}
+
+	kafka := protocol.KafkaProducerRecord{
+		Key:        k.Key,
+		ValueBytes: buffer.Bytes(),
+		Topic:      k.Topic,
+	}
+
+	value, err := proto.Marshal(&kafka)
+	if err != nil {
+		return nil, err
+	}
+
+	return &protocol.FromFunction_EgressMessage{
+		EgressNamespace: k.Target.GetNamespace(),
+		EgressType:      k.Target.GetType(),
+		Argument: &protocol.TypedValue{
+			Typename: kafkaTypeName,
+			HasValue: true,
+			Value:    value,
+		},
+	}, nil
+}
+
+// KinesisEgressBuilder builds a message that can be emitted to a Kinesis generic egress.
+// If a ValueType is provided, then Value will be serialized according to the
+// provided ValueType's serializer. Otherwise we will try to convert Value to bytes
+// if it is one of:
+//   - utf-8 string
+//   - []byte
+type KinesisEgressBuilder struct {
+	// The TypeName as specified in module.yaml
+	Target TypeName
+
+	// The Kinesis destination stream for that record
+	Stream string
+
+	// The value to produce
+	Value interface{}
+
+	// An optional hint to this value type
+	ValueType SimpleType
+
+	// The utf8 encoded string partition key to use
+	PartitionKey string
+
+	// A utf8 encoded string explicit hash key to use (can be empty)
+	ExplicitHashKey string
+}
+
+func (k KinesisEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
+	if k.Target == nil {
+		return nil, errors.New("an egress record requires a Target")
+	} else if k.Stream == "" {
+		return nil, errors.New("missing destination Kinesis stream")
+	} else if k.Value == nil {
+		return nil, errors.New("missing value")
+	} else if k.PartitionKey == "" {
+		return nil, errors.New("missing partition key")
+	}
+
+	buffer := bytes.Buffer{}
+	if k.ValueType != nil {
+		if err := k.ValueType.Serialize(&buffer, k.Value); err != nil {
+			return nil, err
+		}
+	} else {
+		switch value := k.Value.(type) {
+		case string:
+			_ = StringType.Serialize(&buffer, value)
+		case []byte:
+			buffer.Write(value)
+		default:
+			return nil, errors.New("unable to convert value to bytes")
+		}
+	}
+
+	kinesis := protocol.KinesisEgressRecord{
+		PartitionKey:    k.PartitionKey,
+		ValueBytes:      buffer.Bytes(),
+		Stream:          k.Stream,
+		ExplicitHashKey: k.ExplicitHashKey,
+	}
+
+	value, err := proto.Marshal(&kinesis)
+	if err != nil {
+		return nil, err
+	}
+
+	return &protocol.FromFunction_EgressMessage{
+		EgressNamespace: k.Target.GetNamespace(),
+		EgressType:      k.Target.GetType(),
+		Argument: &protocol.TypedValue{
+			Typename: kinesisTypeName,
+			HasValue: true,
+			Value:    value,
+		},
+	}, nil
+}
+
+// GenericEgressBuilder create a generic egress record. For Kafka
+// and Kinesis see KafkaEgressBuilder and
+// KinesisEgressBuilder respectively
+type GenericEgressBuilder struct {
+	// The TypeName as specified when registered
+	Target TypeName
+
+	// The value to produce
+	Value interface{}
+
+	// The values type
+	ValueType SimpleType
+}
+
+func (g GenericEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
+	if g.Target == nil {
+		return nil, errors.New("an egress record requires a Target")
+	} else if g.ValueType == nil {
+		return nil, errors.New("missing value type")
+	} else if g.Value == nil {
+		return nil, errors.New("missing value")
+	}
+
+	buffer := bytes.Buffer{}
+	if err := g.ValueType.Serialize(&buffer, g.Value); err != nil {
+		return nil, err
+	}
+
+	return &protocol.FromFunction_EgressMessage{
+		EgressNamespace: g.Target.GetNamespace(),
+		EgressType:      g.Target.GetType(),
+		Argument: &protocol.TypedValue{
+			Typename: g.ValueType.GetTypeName().String(),
+			HasValue: true,
+			Value:    buffer.Bytes(),
+		},
+	}, nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/handler.go b/statefun-sdk-go/v3/pkg/statefun/handler.go
new file mode 100644
index 0000000..92e6f91
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/handler.go
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"google.golang.org/protobuf/proto"
+	"log"
+	"net/http"
+)
+
+// StatefulFunctions is a registry for multiple StatefulFunction's. A RequestReplyHandler
+// can be created from the registry that understands how to dispatch
+// invocation requests to the registered functions as well as encode
+// side-effects (e.g., sending messages to other functions or updating
+// values in storage) as the response.
+type StatefulFunctions interface {
+
+	// WithSpec registers a StatefulFunctionSpec, which will be
+	// used to build the runtime function. It returns an error
+	// if the specification is invalid and the handler
+	// fails to register the function.
+	WithSpec(spec StatefulFunctionSpec) error
+
+	// AsHandler creates a RequestReplyHandler from the registered
+	// function specs.
+	AsHandler() RequestReplyHandler
+}
+
+// The RequestReplyHandler processes messages
+// from the runtime, invokes functions, and encodes
+// side effects. The handler implements http.Handler
+// so it can easily be embedded in standard Go server
+// frameworks.
+type RequestReplyHandler interface {
+	http.Handler
+
+	// Invoke method provides compliance with AWS Lambda handler
+	Invoke(ctx context.Context, payload []byte) ([]byte, error)
+}
+
+// StatefulFunctionsBuilder creates a new StatefulFunctions registry.
+func StatefulFunctionsBuilder() StatefulFunctions {
+	return &handler{
+		module:     map[TypeName]StatefulFunction{},
+		stateSpecs: map[TypeName]map[string]*protocol.FromFunction_PersistedValueSpec{},
+	}
+}
+
+type handler struct {
+	module     map[TypeName]StatefulFunction
+	stateSpecs map[TypeName]map[string]*protocol.FromFunction_PersistedValueSpec
+}
+
+func (h *handler) WithSpec(spec StatefulFunctionSpec) error {
+	log.Printf("registering Stateful Function %v]n", spec.FunctionType)
+	if _, exists := h.module[spec.FunctionType]; exists {
+		err := fmt.Errorf("failed to register Stateful Function %s, there is already a spec registered under that type", spec.FunctionType)
+		log.Printf(err.Error())
+		return err
+	}
+
+	if spec.Function == nil {
+		err := fmt.Errorf("failed to register Stateful Function %s, the Function instance cannot be nil", spec.FunctionType)
+		log.Printf(err.Error())
+		return err
+	}
+
+	valueSpecs := make(map[string]*protocol.FromFunction_PersistedValueSpec, len(spec.States))
+
+	for _, state := range spec.States {
+		log.Printf("registering state specification %v\n", state)
+		if err := validateValueSpec(state); err != nil {
+			err := fmt.Errorf("failed to register Stateful Function %s: %w", spec.FunctionType, err)
+			log.Printf(err.Error())
+			return err
+		}
+
+		expiration := &protocol.FromFunction_ExpirationSpec{}
+		switch state.Expiration.expirationType {
+		case none:
+			expiration.Mode = protocol.FromFunction_ExpirationSpec_NONE
+		case expireAfterWrite:
+			expiration.Mode = protocol.FromFunction_ExpirationSpec_AFTER_WRITE
+			expiration.ExpireAfterMillis = state.Expiration.duration.Milliseconds()
+		case expireAfterCall:
+			expiration.Mode = protocol.FromFunction_ExpirationSpec_AFTER_INVOKE
+			expiration.ExpireAfterMillis = state.Expiration.duration.Milliseconds()
+		}
+
+		valueSpecs[state.Name] = &protocol.FromFunction_PersistedValueSpec{
+			StateName:      state.Name,
+			ExpirationSpec: expiration,
+			TypeTypename:   state.ValueType.GetTypeName().String(),
+		}
+	}
+
+	h.module[spec.FunctionType] = spec.Function
+	h.stateSpecs[spec.FunctionType] = valueSpecs
+
+	return nil
+}
+
+func (h *handler) AsHandler() RequestReplyHandler {
+	return h
+}
+
+func (h *handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
+	if request.Method != "POST" {
+		http.Error(writer, "invalid request method", http.StatusMethodNotAllowed)
+		return
+	}
+
+	contentType := request.Header.Get("Content-type")
+	if contentType != "" && contentType != "application/octet-stream" {
+		http.Error(writer, "invalid content type", http.StatusUnsupportedMediaType)
+		return
+	}
+
+	if request.Body == nil || request.ContentLength == 0 {
+		http.Error(writer, "empty request body", http.StatusBadRequest)
+		return
+	}
+
+	buffer := bytes.Buffer{}
+	if _, err := buffer.ReadFrom(request.Body); err != nil {
+		http.Error(writer, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	response, err := h.Invoke(request.Context(), buffer.Bytes())
+	if err != nil {
+		log.Printf(err.Error())
+		http.Error(writer, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	_, _ = writer.Write(response)
+}
+
+func (h *handler) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
+	toFunction := protocol.ToFunction{}
+	if err := proto.Unmarshal(payload, &toFunction); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal ToFunction: %w", err)
+	}
+
+	fromFunction, err := h.invoke(ctx, &toFunction)
+	if err != nil {
+		return nil, err
+	}
+
+	return proto.Marshal(fromFunction)
+}
+
+func (h *handler) invoke(ctx context.Context, toFunction *protocol.ToFunction) (from *protocol.FromFunction, err error) {
+	batch := toFunction.GetInvocation()
+	self := addressFromInternal(batch.Target)
+	function, exists := h.module[self.FunctionType]
+
+	defer func() {
+		if r := recover(); r != nil {
+			switch r := r.(type) {
+			case error:
+				err = fmt.Errorf("failed to execute invocation for %s: %w", batch.Target, r)
+			default:
+				log.Fatal(r)
+			}
+		}
+	}()
+
+	if !exists {
+		return nil, fmt.Errorf("unknown function type %s", self.FunctionType)
+	}
+
+	storageFactory := newStorageFactory(batch, h.stateSpecs[self.FunctionType])
+
+	if missing := storageFactory.getMissingSpecs(); missing != nil {
+		log.Printf("missing state specs for function type %v", self)
+		for _, spec := range missing {
+			log.Printf("registering missing specs %v", spec)
+		}
+		return &protocol.FromFunction{
+			Response: &protocol.FromFunction_IncompleteInvocationContext_{
+				IncompleteInvocationContext: &protocol.FromFunction_IncompleteInvocationContext{
+					MissingValues: missing,
+				},
+			},
+		}, nil
+	}
+
+	storage := storageFactory.getStorage()
+	response := &protocol.FromFunction_InvocationResponse{}
+
+	for _, invocation := range batch.Invocations {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			sContext := statefunContext{
+				self:     self,
+				storage:  storage,
+				response: response,
+			}
+
+			var cancel context.CancelFunc
+			sContext.Context, cancel = context.WithCancel(ctx)
+
+			var caller Address
+			if invocation.Caller != nil {
+				caller = addressFromInternal(invocation.Caller)
+			}
+			sContext.caller = &caller
+			msg := Message{
+				target:     batch.Target,
+				typedValue: invocation.Argument,
+			}
+			err = function.Invoke(&sContext, msg)
+			cancel()
+
+			if err != nil {
+				return
+			}
+		}
+	}
+
+	response.StateMutations = storage.getStateMutations()
+	from = &protocol.FromFunction{
+		Response: &protocol.FromFunction_InvocationResult{
+			InvocationResult: response,
+		},
+	}
+
+	return
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/cell.go b/statefun-sdk-go/v3/pkg/statefun/internal/cell.go
new file mode 100644
index 0000000..f3479c8
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/cell.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"io"
+)
+
+// smallBufferSize is an initial allocation minimal capacity.
+// this is the same constant as used in bytes.Buffer.
+const smallBufferSize = 64
+
+// Cell is a mutable, persisted value.
+// This struct is not thread safe.
+type Cell struct {
+	buf          []byte // contents are the bytes buf[off : len(buf)].
+	off          int    // read at &buf[off], writes always begin from buf[0]
+	mutated      bool   // tracker if the cell has been mutated
+	hasValue     bool   // tracker if the cell has a valid value
+	typeTypeName string // the typename of the type whose serialized contents are stored in the cell
+}
+
+// NewCell creates and initializes a new Cell using state's value as its
+// initial contents. The new Cell takes ownership of state, and the
+// caller should not use state after this call.
+func NewCell(state *protocol.ToFunction_PersistedValue, typeTypeName string) *Cell {
+	c := &Cell{
+		typeTypeName: typeTypeName,
+	}
+
+	if state.StateValue != nil {
+		c.hasValue = true
+		c.buf = state.StateValue.Value
+	}
+
+	return c
+}
+
+// SeekToBeginning resets the cell so the next
+// read starts from the beginning of the underlying
+// buffer, regardless of where the last read left off
+func (c *Cell) SeekToBeginning() {
+	c.off = 0
+}
+
+// Read reads up to len(p) bytes into p. It returns the number of bytes
+// read (0 <= n <= len(p)) and any error encountered. Read is resumable
+// and returns EOF when there are no more bytes to read. This behavior
+// is required for Cell to interoperate with the go standard library.
+// Users of Cell are required to call SeekToBeginning, before the first
+// read to ensure reads always begin at the start of the buffer.
+func (c *Cell) Read(p []byte) (n int, err error) {
+	if c.empty() {
+		if len(p) == 0 {
+			return 0, nil
+		}
+		return 0, io.EOF
+	}
+	n = copy(p, c.buf[c.off:])
+	c.off += n
+	return n, nil
+}
+
+// Write writes the given slice into the cell.
+// Unlike standard implementations of Write,
+// cells always overwrite any existing data.
+func (c *Cell) Write(p []byte) (n int, err error) {
+	c.mutated = true
+	c.hasValue = true
+
+	c.sizeBufferForCapacity(len(p))
+
+	c.off = 0
+	return copy(c.buf, p), nil
+}
+
+// sizeBufferForCapacity resizes the buffer to guarantee space for n bytes,
+// attempting to first re-slice the underlying buffer to avoid allocations.
+func (c *Cell) sizeBufferForCapacity(n int) {
+	if c.buf != nil && n <= cap(c.buf) {
+		c.buf = c.buf[:n]
+	} else if n <= smallBufferSize {
+		c.buf = make([]byte, n, smallBufferSize)
+	} else {
+		c.buf = make([]byte, n)
+	}
+}
+
+// Delete marks the value to be deleted and resets the buffer to be empty,
+// but it retains the underlying storage for use by future writes.
+func (c *Cell) Delete() {
+	c.mutated = true
+	c.hasValue = false
+	c.buf = c.buf[:0]
+	c.off = 0
+}
+
+// HasValue returns true if the cell contains a valid value,
+// if the value is false, calls to Read will consume 0 bytes
+func (c *Cell) HasValue() bool {
+	return c.hasValue
+}
+
+// GetStateMutation turns the final Cell into a FromFunction_PersistedValueMutation.
+// The new FromFunction_PersistedValueMutation takes ownership of the underlying
+// buffer and the cell should not be used after this function returns.
+func (c *Cell) GetStateMutation(name string) *protocol.FromFunction_PersistedValueMutation {
+	if !c.mutated {
+		return nil
+	}
+
+	mutation := &protocol.FromFunction_PersistedValueMutation{
+		MutationType: protocol.FromFunction_PersistedValueMutation_DELETE,
+		StateName:    name,
+	}
+
+	if c.hasValue {
+		mutation.MutationType = protocol.FromFunction_PersistedValueMutation_MODIFY
+
+		mutation.StateValue = &protocol.TypedValue{
+			Typename: c.typeTypeName,
+			HasValue: true,
+			Value:    c.buf,
+		}
+	}
+
+	return mutation
+}
+
+// empty reports whether the unread portion of the buffer is empty.
+func (c *Cell) empty() bool { return len(c.buf) <= c.off }
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/cell_test.go b/statefun-sdk-go/v3/pkg/statefun/internal/cell_test.go
new file mode 100644
index 0000000..198c74a
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/cell_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"github.com/stretchr/testify/assert"
+	"io"
+	"testing"
+)
+
+func TestCellReadWrite(t *testing.T) {
+	cell := NewCell(&protocol.ToFunction_PersistedValue{
+		StateName: "state",
+	}, "typename")
+
+	data := []byte{0, 1, 0, 1}
+	n, err := cell.Write(data)
+
+	assert.NoError(t, err, "unexpected error writing data")
+	assert.Equal(t, len(data), n, "unexpected number of bytes written")
+
+	read := make([]byte, 4)
+
+	cell.SeekToBeginning()
+	n, err = cell.Read(read)
+
+	assert.NoError(t, err, "unexpected error reading data")
+	assert.Equal(t, len(read), n, "unexpected number of bytes read")
+
+	assert.Equal(t, data, read, "unexpected bytes read from cell")
+
+	cell.SeekToBeginning()
+	allBytes, _ := io.ReadAll(cell)
+
+	assert.Equal(t, len(allBytes), n, "unexpected number of bytes read")
+	assert.Equal(t, data, allBytes, "unexpected bytes read from cell")
+
+	secondRead := make([]byte, 4)
+
+	cell.SeekToBeginning()
+	n, err = cell.Read(secondRead)
+
+	assert.NoError(t, err, "unexpected error reading data")
+	assert.Equal(t, len(secondRead), n, "unexpected number of bytes read")
+
+	assert.Equal(t, data, secondRead, "unexpected bytes read from cell")
+
+	cell.SeekToBeginning()
+	allBytes, _ = io.ReadAll(cell)
+
+	assert.Equal(t, len(allBytes), n, "unexpected number of bytes read")
+	assert.Equal(t, data, allBytes, "unexpected bytes read from cell")
+}
+
+func TestCellReadWriteFromInitial(t *testing.T) {
+	initial := []byte{0, 1, 0, 1}
+	cell := NewCell(&protocol.ToFunction_PersistedValue{
+		StateName: "state",
+		StateValue: &protocol.TypedValue{
+			Typename: "typename",
+			HasValue: true,
+			Value:    initial,
+		},
+	}, "typename")
+
+	read := make([]byte, 4)
+	n, err := cell.Read(read)
+
+	assert.NoError(t, err, "unexpected error reading data")
+	assert.Equal(t, len(read), n, "unexpected number of bytes read")
+
+	assert.Equal(t, initial, read, "unexpected bytes read from cell")
+
+	data := []byte{0, 0, 0, 1, 1, 1}
+	n, err = cell.Write(data)
+
+	assert.NoError(t, err, "unexpected error writing data")
+	assert.Equal(t, len(data), n, "unexpected number of bytes written")
+
+	secondRead := make([]byte, 6)
+	n, err = cell.Read(secondRead)
+
+	assert.NoError(t, err, "unexpected error reading data")
+	assert.Equal(t, len(secondRead), n, "unexpected number of bytes read")
+
+	assert.Equal(t, data, secondRead, "unexpected bytes read from cell")
+}
+
+func TestCell_EmptyWithNoValue(t *testing.T) {
+	initial := []byte{0, 1, 0, 1}
+	cell := NewCell(&protocol.ToFunction_PersistedValue{
+		StateName: "state",
+		StateValue: &protocol.TypedValue{
+			Typename: "typename",
+			HasValue: true,
+			Value:    initial,
+		},
+	}, "typename")
+
+	assert.True(t, cell.HasValue())
+	assert.False(t, cell.empty(), "cells with a value should not be empty before any read")
+
+	cell.Delete()
+	assert.False(t, cell.HasValue(), "cells that have been deleted should not have values")
+	assert.True(t, cell.empty(), "cells that have been deleted should be empty")
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kafka-egress.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kafka-egress.pb.go
new file mode 100644
index 0000000..7beb156
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kafka-egress.pb.go
@@ -0,0 +1,183 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.26.0
+// 	protoc        v3.14.0
+// source: kafka-egress.proto
+
+package protocol
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type KafkaProducerRecord struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Key        string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+	ValueBytes []byte `protobuf:"bytes,2,opt,name=value_bytes,json=valueBytes,proto3" json:"value_bytes,omitempty"`
+	Topic      string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
+}
+
+func (x *KafkaProducerRecord) Reset() {
+	*x = KafkaProducerRecord{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_kafka_egress_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *KafkaProducerRecord) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*KafkaProducerRecord) ProtoMessage() {}
+
+func (x *KafkaProducerRecord) ProtoReflect() protoreflect.Message {
+	mi := &file_kafka_egress_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use KafkaProducerRecord.ProtoReflect.Descriptor instead.
+func (*KafkaProducerRecord) Descriptor() ([]byte, []int) {
+	return file_kafka_egress_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *KafkaProducerRecord) GetKey() string {
+	if x != nil {
+		return x.Key
+	}
+	return ""
+}
+
+func (x *KafkaProducerRecord) GetValueBytes() []byte {
+	if x != nil {
+		return x.ValueBytes
+	}
+	return nil
+}
+
+func (x *KafkaProducerRecord) GetTopic() string {
+	if x != nil {
+		return x.Topic
+	}
+	return ""
+}
+
+var File_kafka_egress_proto protoreflect.FileDescriptor
+
+var file_kafka_egress_proto_rawDesc = []byte{
+	0x0a, 0x12, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70,
+	0x72, 0x6f, 0x74, 0x6f, 0x12, 0x16, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75,
+	0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5e, 0x0a, 0x13,
+	0x4b, 0x61, 0x66, 0x6b, 0x61, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x52, 0x65, 0x63,
+	0x6f, 0x72, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
+	0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x62,
+	0x79, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x76, 0x61, 0x6c, 0x75,
+	0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18,
+	0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x3e, 0x0a, 0x2e,
+	0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b,
+	0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x65, 0x67,
+	0x72, 0x65, 0x73, 0x73, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x50, 0x01,
+	0x5a, 0x0a, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72,
+	0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_kafka_egress_proto_rawDescOnce sync.Once
+	file_kafka_egress_proto_rawDescData = file_kafka_egress_proto_rawDesc
+)
+
+func file_kafka_egress_proto_rawDescGZIP() []byte {
+	file_kafka_egress_proto_rawDescOnce.Do(func() {
+		file_kafka_egress_proto_rawDescData = protoimpl.X.CompressGZIP(file_kafka_egress_proto_rawDescData)
+	})
+	return file_kafka_egress_proto_rawDescData
+}
+
+var file_kafka_egress_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
+var file_kafka_egress_proto_goTypes = []interface{}{
+	(*KafkaProducerRecord)(nil), // 0: io.statefun.sdk.egress.KafkaProducerRecord
+}
+var file_kafka_egress_proto_depIdxs = []int32{
+	0, // [0:0] is the sub-list for method output_type
+	0, // [0:0] is the sub-list for method input_type
+	0, // [0:0] is the sub-list for extension type_name
+	0, // [0:0] is the sub-list for extension extendee
+	0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_kafka_egress_proto_init() }
+func file_kafka_egress_proto_init() {
+	if File_kafka_egress_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_kafka_egress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*KafkaProducerRecord); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_kafka_egress_proto_rawDesc,
+			NumEnums:      0,
+			NumMessages:   1,
+			NumExtensions: 0,
+			NumServices:   0,
+		},
+		GoTypes:           file_kafka_egress_proto_goTypes,
+		DependencyIndexes: file_kafka_egress_proto_depIdxs,
+		MessageInfos:      file_kafka_egress_proto_msgTypes,
+	}.Build()
+	File_kafka_egress_proto = out.File
+	file_kafka_egress_proto_rawDesc = nil
+	file_kafka_egress_proto_goTypes = nil
+	file_kafka_egress_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kinesis-egress.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kinesis-egress.pb.go
new file mode 100644
index 0000000..078987d
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kinesis-egress.pb.go
@@ -0,0 +1,195 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.26.0
+// 	protoc        v3.14.0
+// source: kinesis-egress.proto
+
+package protocol
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type KinesisEgressRecord struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	PartitionKey    string `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"`
+	ValueBytes      []byte `protobuf:"bytes,2,opt,name=value_bytes,json=valueBytes,proto3" json:"value_bytes,omitempty"`
+	Stream          string `protobuf:"bytes,3,opt,name=stream,proto3" json:"stream,omitempty"`
+	ExplicitHashKey string `protobuf:"bytes,4,opt,name=explicit_hash_key,json=explicitHashKey,proto3" json:"explicit_hash_key,omitempty"`
+}
+
+func (x *KinesisEgressRecord) Reset() {
+	*x = KinesisEgressRecord{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_kinesis_egress_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *KinesisEgressRecord) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*KinesisEgressRecord) ProtoMessage() {}
+
+func (x *KinesisEgressRecord) ProtoReflect() protoreflect.Message {
+	mi := &file_kinesis_egress_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use KinesisEgressRecord.ProtoReflect.Descriptor instead.
+func (*KinesisEgressRecord) Descriptor() ([]byte, []int) {
+	return file_kinesis_egress_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *KinesisEgressRecord) GetPartitionKey() string {
+	if x != nil {
+		return x.PartitionKey
+	}
+	return ""
+}
+
+func (x *KinesisEgressRecord) GetValueBytes() []byte {
+	if x != nil {
+		return x.ValueBytes
+	}
+	return nil
+}
+
+func (x *KinesisEgressRecord) GetStream() string {
+	if x != nil {
+		return x.Stream
+	}
+	return ""
+}
+
+func (x *KinesisEgressRecord) GetExplicitHashKey() string {
+	if x != nil {
+		return x.ExplicitHashKey
+	}
+	return ""
+}
+
+var File_kinesis_egress_proto protoreflect.FileDescriptor
+
+var file_kinesis_egress_proto_rawDesc = []byte{
+	0x0a, 0x14, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x2d, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73,
+	0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x16, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65,
+	0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x22, 0x9f,
+	0x01, 0x0a, 0x13, 0x4b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73,
+	0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74,
+	0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70,
+	0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x76,
+	0x61, 0x6c, 0x75, 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c,
+	0x52, 0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x16, 0x0a, 0x06,
+	0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74,
+	0x72, 0x65, 0x61, 0x6d, 0x12, 0x2a, 0x0a, 0x11, 0x65, 0x78, 0x70, 0x6c, 0x69, 0x63, 0x69, 0x74,
+	0x5f, 0x68, 0x61, 0x73, 0x68, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52,
+	0x0f, 0x65, 0x78, 0x70, 0x6c, 0x69, 0x63, 0x69, 0x74, 0x48, 0x61, 0x73, 0x68, 0x4b, 0x65, 0x79,
+	0x42, 0x3e, 0x0a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66,
+	0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64,
+	0x6b, 0x2e, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74,
+	0x65, 0x64, 0x50, 0x01, 0x5a, 0x0a, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
+	0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_kinesis_egress_proto_rawDescOnce sync.Once
+	file_kinesis_egress_proto_rawDescData = file_kinesis_egress_proto_rawDesc
+)
+
+func file_kinesis_egress_proto_rawDescGZIP() []byte {
+	file_kinesis_egress_proto_rawDescOnce.Do(func() {
+		file_kinesis_egress_proto_rawDescData = protoimpl.X.CompressGZIP(file_kinesis_egress_proto_rawDescData)
+	})
+	return file_kinesis_egress_proto_rawDescData
+}
+
+var file_kinesis_egress_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
+var file_kinesis_egress_proto_goTypes = []interface{}{
+	(*KinesisEgressRecord)(nil), // 0: io.statefun.sdk.egress.KinesisEgressRecord
+}
+var file_kinesis_egress_proto_depIdxs = []int32{
+	0, // [0:0] is the sub-list for method output_type
+	0, // [0:0] is the sub-list for method input_type
+	0, // [0:0] is the sub-list for extension type_name
+	0, // [0:0] is the sub-list for extension extendee
+	0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_kinesis_egress_proto_init() }
+func file_kinesis_egress_proto_init() {
+	if File_kinesis_egress_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_kinesis_egress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*KinesisEgressRecord); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_kinesis_egress_proto_rawDesc,
+			NumEnums:      0,
+			NumMessages:   1,
+			NumExtensions: 0,
+			NumServices:   0,
+		},
+		GoTypes:           file_kinesis_egress_proto_goTypes,
+		DependencyIndexes: file_kinesis_egress_proto_depIdxs,
+		MessageInfos:      file_kinesis_egress_proto_msgTypes,
+	}.Build()
+	File_kinesis_egress_proto = out.File
+	file_kinesis_egress_proto_rawDesc = nil
+	file_kinesis_egress_proto_goTypes = nil
+	file_kinesis_egress_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/request-reply.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/request-reply.pb.go
new file mode 100644
index 0000000..ef5085d
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/request-reply.pb.go
@@ -0,0 +1,1602 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.27.1
+// 	protoc        v3.17.3
+// source: request-reply.proto
+
+package protocol
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type FromFunction_PersistedValueMutation_MutationType int32
+
+const (
+	FromFunction_PersistedValueMutation_DELETE FromFunction_PersistedValueMutation_MutationType = 0
+	FromFunction_PersistedValueMutation_MODIFY FromFunction_PersistedValueMutation_MutationType = 1
+)
+
+// Enum value maps for FromFunction_PersistedValueMutation_MutationType.
+var (
+	FromFunction_PersistedValueMutation_MutationType_name = map[int32]string{
+		0: "DELETE",
+		1: "MODIFY",
+	}
+	FromFunction_PersistedValueMutation_MutationType_value = map[string]int32{
+		"DELETE": 0,
+		"MODIFY": 1,
+	}
+)
+
+func (x FromFunction_PersistedValueMutation_MutationType) Enum() *FromFunction_PersistedValueMutation_MutationType {
+	p := new(FromFunction_PersistedValueMutation_MutationType)
+	*p = x
+	return p
+}
+
+func (x FromFunction_PersistedValueMutation_MutationType) String() string {
+	return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (FromFunction_PersistedValueMutation_MutationType) Descriptor() protoreflect.EnumDescriptor {
+	return file_request_reply_proto_enumTypes[0].Descriptor()
+}
+
+func (FromFunction_PersistedValueMutation_MutationType) Type() protoreflect.EnumType {
+	return &file_request_reply_proto_enumTypes[0]
+}
+
+func (x FromFunction_PersistedValueMutation_MutationType) Number() protoreflect.EnumNumber {
+	return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use FromFunction_PersistedValueMutation_MutationType.Descriptor instead.
+func (FromFunction_PersistedValueMutation_MutationType) EnumDescriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 0, 0}
+}
+
+type FromFunction_ExpirationSpec_ExpireMode int32
+
+const (
+	FromFunction_ExpirationSpec_NONE         FromFunction_ExpirationSpec_ExpireMode = 0
+	FromFunction_ExpirationSpec_AFTER_WRITE  FromFunction_ExpirationSpec_ExpireMode = 1
+	FromFunction_ExpirationSpec_AFTER_INVOKE FromFunction_ExpirationSpec_ExpireMode = 2
+)
+
+// Enum value maps for FromFunction_ExpirationSpec_ExpireMode.
+var (
+	FromFunction_ExpirationSpec_ExpireMode_name = map[int32]string{
+		0: "NONE",
+		1: "AFTER_WRITE",
+		2: "AFTER_INVOKE",
+	}
+	FromFunction_ExpirationSpec_ExpireMode_value = map[string]int32{
+		"NONE":         0,
+		"AFTER_WRITE":  1,
+		"AFTER_INVOKE": 2,
+	}
+)
+
+func (x FromFunction_ExpirationSpec_ExpireMode) Enum() *FromFunction_ExpirationSpec_ExpireMode {
+	p := new(FromFunction_ExpirationSpec_ExpireMode)
+	*p = x
+	return p
+}
+
+func (x FromFunction_ExpirationSpec_ExpireMode) String() string {
+	return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (FromFunction_ExpirationSpec_ExpireMode) Descriptor() protoreflect.EnumDescriptor {
+	return file_request_reply_proto_enumTypes[1].Descriptor()
+}
+
+func (FromFunction_ExpirationSpec_ExpireMode) Type() protoreflect.EnumType {
+	return &file_request_reply_proto_enumTypes[1]
+}
+
+func (x FromFunction_ExpirationSpec_ExpireMode) Number() protoreflect.EnumNumber {
+	return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use FromFunction_ExpirationSpec_ExpireMode.Descriptor instead.
+func (FromFunction_ExpirationSpec_ExpireMode) EnumDescriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 5, 0}
+}
+
+// An Address is the unique identity of an individual StatefulFunction, containing
+// a function's type and an unique identifier within the type. The function's
+// type denotes the "class" of function to invoke, while the unique identifier addresses the
+// invocation to a specific function instance.
+type Address struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"`
+	Type      string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"`
+	Id        string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
+}
+
+func (x *Address) Reset() {
+	*x = Address{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *Address) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Address) ProtoMessage() {}
+
+func (x *Address) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use Address.ProtoReflect.Descriptor instead.
+func (*Address) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Address) GetNamespace() string {
+	if x != nil {
+		return x.Namespace
+	}
+	return ""
+}
+
+func (x *Address) GetType() string {
+	if x != nil {
+		return x.Type
+	}
+	return ""
+}
+
+func (x *Address) GetId() string {
+	if x != nil {
+		return x.Id
+	}
+	return ""
+}
+
+type TypedValue struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Typename string `protobuf:"bytes,1,opt,name=typename,proto3" json:"typename,omitempty"`
+	// has_value is set to differentiate a zero length value bytes explicitly set,
+	// or a non existing value.
+	HasValue bool   `protobuf:"varint,2,opt,name=has_value,json=hasValue,proto3" json:"has_value,omitempty"`
+	Value    []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *TypedValue) Reset() {
+	*x = TypedValue{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[1]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *TypedValue) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TypedValue) ProtoMessage() {}
+
+func (x *TypedValue) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[1]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use TypedValue.ProtoReflect.Descriptor instead.
+func (*TypedValue) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *TypedValue) GetTypename() string {
+	if x != nil {
+		return x.Typename
+	}
+	return ""
+}
+
+func (x *TypedValue) GetHasValue() bool {
+	if x != nil {
+		return x.HasValue
+	}
+	return false
+}
+
+func (x *TypedValue) GetValue() []byte {
+	if x != nil {
+		return x.Value
+	}
+	return nil
+}
+
+// The following section contains all the message types that are sent
+// from Flink to a remote function.
+type ToFunction struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// Types that are assignable to Request:
+	//	*ToFunction_Invocation_
+	Request isToFunction_Request `protobuf_oneof:"request"`
+}
+
+func (x *ToFunction) Reset() {
+	*x = ToFunction{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[2]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ToFunction) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction) ProtoMessage() {}
+
+func (x *ToFunction) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[2]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction.ProtoReflect.Descriptor instead.
+func (*ToFunction) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{2}
+}
+
+func (m *ToFunction) GetRequest() isToFunction_Request {
+	if m != nil {
+		return m.Request
+	}
+	return nil
+}
+
+func (x *ToFunction) GetInvocation() *ToFunction_InvocationBatchRequest {
+	if x, ok := x.GetRequest().(*ToFunction_Invocation_); ok {
+		return x.Invocation
+	}
+	return nil
+}
+
+type isToFunction_Request interface {
+	isToFunction_Request()
+}
+
+type ToFunction_Invocation_ struct {
+	Invocation *ToFunction_InvocationBatchRequest `protobuf:"bytes,100,opt,name=invocation,proto3,oneof"`
+}
+
+func (*ToFunction_Invocation_) isToFunction_Request() {}
+
+// The following section contains messages sent from a remote function back to Flink.
+type FromFunction struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// Response sent from the function, as a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.
+	// It can be one of the following types:
+	//   - io.statefun.sdk.reqreply.FromFunction.InvocationResponse
+	//   - io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext
+	//
+	// Types that are assignable to Response:
+	//	*FromFunction_InvocationResult
+	//	*FromFunction_IncompleteInvocationContext_
+	Response isFromFunction_Response `protobuf_oneof:"response"`
+}
+
+func (x *FromFunction) Reset() {
+	*x = FromFunction{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[3]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction) ProtoMessage() {}
+
+func (x *FromFunction) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[3]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction.ProtoReflect.Descriptor instead.
+func (*FromFunction) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3}
+}
+
+func (m *FromFunction) GetResponse() isFromFunction_Response {
+	if m != nil {
+		return m.Response
+	}
+	return nil
+}
+
+func (x *FromFunction) GetInvocationResult() *FromFunction_InvocationResponse {
+	if x, ok := x.GetResponse().(*FromFunction_InvocationResult); ok {
+		return x.InvocationResult
+	}
+	return nil
+}
+
+func (x *FromFunction) GetIncompleteInvocationContext() *FromFunction_IncompleteInvocationContext {
+	if x, ok := x.GetResponse().(*FromFunction_IncompleteInvocationContext_); ok {
+		return x.IncompleteInvocationContext
+	}
+	return nil
+}
+
+type isFromFunction_Response interface {
+	isFromFunction_Response()
+}
+
+type FromFunction_InvocationResult struct {
+	InvocationResult *FromFunction_InvocationResponse `protobuf:"bytes,100,opt,name=invocation_result,json=invocationResult,proto3,oneof"`
+}
+
+type FromFunction_IncompleteInvocationContext_ struct {
+	IncompleteInvocationContext *FromFunction_IncompleteInvocationContext `protobuf:"bytes,101,opt,name=incomplete_invocation_context,json=incompleteInvocationContext,proto3,oneof"`
+}
+
+func (*FromFunction_InvocationResult) isFromFunction_Response() {}
+
+func (*FromFunction_IncompleteInvocationContext_) isFromFunction_Response() {}
+
+// PersistedValue represents a PersistedValue's value that is managed by Flink on behalf of a remote function.
+type ToFunction_PersistedValue struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// The unique name of the persisted state.
+	StateName string `protobuf:"bytes,1,opt,name=state_name,json=stateName,proto3" json:"state_name,omitempty"`
+	// The serialized state value
+	StateValue *TypedValue `protobuf:"bytes,2,opt,name=state_value,json=stateValue,proto3" json:"state_value,omitempty"`
+}
+
+func (x *ToFunction_PersistedValue) Reset() {
+	*x = ToFunction_PersistedValue{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[4]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ToFunction_PersistedValue) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction_PersistedValue) ProtoMessage() {}
+
+func (x *ToFunction_PersistedValue) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[4]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction_PersistedValue.ProtoReflect.Descriptor instead.
+func (*ToFunction_PersistedValue) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{2, 0}
+}
+
+func (x *ToFunction_PersistedValue) GetStateName() string {
+	if x != nil {
+		return x.StateName
+	}
+	return ""
+}
+
+func (x *ToFunction_PersistedValue) GetStateValue() *TypedValue {
+	if x != nil {
+		return x.StateValue
+	}
+	return nil
+}
+
+// Invocation represents a remote function call, it associated with an (optional) return address,
+// and an argument.
+type ToFunction_Invocation struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// The address of the function that requested the invocation (possibly absent)
+	Caller *Address `protobuf:"bytes,1,opt,name=caller,proto3" json:"caller,omitempty"`
+	// The invocation argument (aka the message sent to the target function)
+	Argument *TypedValue `protobuf:"bytes,2,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *ToFunction_Invocation) Reset() {
+	*x = ToFunction_Invocation{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[5]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ToFunction_Invocation) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction_Invocation) ProtoMessage() {}
+
+func (x *ToFunction_Invocation) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[5]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction_Invocation.ProtoReflect.Descriptor instead.
+func (*ToFunction_Invocation) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{2, 1}
+}
+
+func (x *ToFunction_Invocation) GetCaller() *Address {
+	if x != nil {
+		return x.Caller
+	}
+	return nil
+}
+
+func (x *ToFunction_Invocation) GetArgument() *TypedValue {
+	if x != nil {
+		return x.Argument
+	}
+	return nil
+}
+
+// InvocationBatchRequest represents a request to invoke a remote function. It is always associated with a target
+// address (the function to invoke), and a list of values for registered state.
+type ToFunction_InvocationBatchRequest struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// The address of the function to invoke
+	Target *Address `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"`
+	// A list of PersistedValues that were registered as a persisted state.
+	State []*ToFunction_PersistedValue `protobuf:"bytes,2,rep,name=state,proto3" json:"state,omitempty"`
+	// A non empty (at least one) list of invocations
+	Invocations []*ToFunction_Invocation `protobuf:"bytes,3,rep,name=invocations,proto3" json:"invocations,omitempty"`
+}
+
+func (x *ToFunction_InvocationBatchRequest) Reset() {
+	*x = ToFunction_InvocationBatchRequest{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[6]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ToFunction_InvocationBatchRequest) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction_InvocationBatchRequest) ProtoMessage() {}
+
+func (x *ToFunction_InvocationBatchRequest) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[6]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction_InvocationBatchRequest.ProtoReflect.Descriptor instead.
+func (*ToFunction_InvocationBatchRequest) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{2, 2}
+}
+
+func (x *ToFunction_InvocationBatchRequest) GetTarget() *Address {
+	if x != nil {
+		return x.Target
+	}
+	return nil
+}
+
+func (x *ToFunction_InvocationBatchRequest) GetState() []*ToFunction_PersistedValue {
+	if x != nil {
+		return x.State
+	}
+	return nil
+}
+
+func (x *ToFunction_InvocationBatchRequest) GetInvocations() []*ToFunction_Invocation {
+	if x != nil {
+		return x.Invocations
+	}
+	return nil
+}
+
+// MutatePersistedValueCommand represents a command sent from a remote function to Flink,
+// requesting a change to a persisted value.
+type FromFunction_PersistedValueMutation struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	MutationType FromFunction_PersistedValueMutation_MutationType `protobuf:"varint,1,opt,name=mutation_type,json=mutationType,proto3,enum=io.statefun.sdk.reqreply.FromFunction_PersistedValueMutation_MutationType" json:"mutation_type,omitempty"`
+	StateName    string                                           `protobuf:"bytes,2,opt,name=state_name,json=stateName,proto3" json:"state_name,omitempty"`
+	StateValue   *TypedValue                                      `protobuf:"bytes,3,opt,name=state_value,json=stateValue,proto3" json:"state_value,omitempty"`
+}
+
+func (x *FromFunction_PersistedValueMutation) Reset() {
+	*x = FromFunction_PersistedValueMutation{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[7]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_PersistedValueMutation) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_PersistedValueMutation) ProtoMessage() {}
+
+func (x *FromFunction_PersistedValueMutation) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[7]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_PersistedValueMutation.ProtoReflect.Descriptor instead.
+func (*FromFunction_PersistedValueMutation) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 0}
+}
+
+func (x *FromFunction_PersistedValueMutation) GetMutationType() FromFunction_PersistedValueMutation_MutationType {
+	if x != nil {
+		return x.MutationType
+	}
+	return FromFunction_PersistedValueMutation_DELETE
+}
+
+func (x *FromFunction_PersistedValueMutation) GetStateName() string {
+	if x != nil {
+		return x.StateName
+	}
+	return ""
+}
+
+func (x *FromFunction_PersistedValueMutation) GetStateValue() *TypedValue {
+	if x != nil {
+		return x.StateValue
+	}
+	return nil
+}
+
+// Invocation represents a remote function call, it associated with a (mandatory) target address,
+// and an argument.
+type FromFunction_Invocation struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// The target function to invoke
+	Target *Address `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"`
+	// The invocation argument (aka the message sent to the target function)
+	Argument *TypedValue `protobuf:"bytes,2,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *FromFunction_Invocation) Reset() {
+	*x = FromFunction_Invocation{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[8]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_Invocation) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_Invocation) ProtoMessage() {}
+
+func (x *FromFunction_Invocation) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[8]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_Invocation.ProtoReflect.Descriptor instead.
+func (*FromFunction_Invocation) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 1}
+}
+
+func (x *FromFunction_Invocation) GetTarget() *Address {
+	if x != nil {
+		return x.Target
+	}
+	return nil
+}
+
+func (x *FromFunction_Invocation) GetArgument() *TypedValue {
+	if x != nil {
+		return x.Argument
+	}
+	return nil
+}
+
+// DelayedInvocation represents a delayed remote function call with a target address, an argument
+// and a delay in milliseconds, after which this message to be sent.
+type FromFunction_DelayedInvocation struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// a boolean value (default false) that indicates rather this is a regular delayed message, or (true) a message
+	// cancellation request.
+	// in case of a regular delayed message all other fields are expected to be preset, otherwise only the
+	// cancellation_token is expected
+	IsCancellationRequest bool `protobuf:"varint,10,opt,name=is_cancellation_request,json=isCancellationRequest,proto3" json:"is_cancellation_request,omitempty"`
+	// an optional cancellation token that can be used to request the "unsending" of a delayed message.
+	CancellationToken string `protobuf:"bytes,11,opt,name=cancellation_token,json=cancellationToken,proto3" json:"cancellation_token,omitempty"`
+	// the amount of milliseconds to wait before sending this message
+	DelayInMs int64 `protobuf:"varint,1,opt,name=delay_in_ms,json=delayInMs,proto3" json:"delay_in_ms,omitempty"`
+	// the target address to send this message to
+	Target *Address `protobuf:"bytes,2,opt,name=target,proto3" json:"target,omitempty"`
+	// the invocation argument
+	Argument *TypedValue `protobuf:"bytes,3,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *FromFunction_DelayedInvocation) Reset() {
+	*x = FromFunction_DelayedInvocation{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[9]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_DelayedInvocation) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_DelayedInvocation) ProtoMessage() {}
+
+func (x *FromFunction_DelayedInvocation) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[9]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_DelayedInvocation.ProtoReflect.Descriptor instead.
+func (*FromFunction_DelayedInvocation) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 2}
+}
+
+func (x *FromFunction_DelayedInvocation) GetIsCancellationRequest() bool {
+	if x != nil {
+		return x.IsCancellationRequest
+	}
+	return false
+}
+
+func (x *FromFunction_DelayedInvocation) GetCancellationToken() string {
+	if x != nil {
+		return x.CancellationToken
+	}
+	return ""
+}
+
+func (x *FromFunction_DelayedInvocation) GetDelayInMs() int64 {
+	if x != nil {
+		return x.DelayInMs
+	}
+	return 0
+}
+
+func (x *FromFunction_DelayedInvocation) GetTarget() *Address {
+	if x != nil {
+		return x.Target
+	}
+	return nil
+}
+
+func (x *FromFunction_DelayedInvocation) GetArgument() *TypedValue {
+	if x != nil {
+		return x.Argument
+	}
+	return nil
+}
+
+// EgressMessage an argument to forward to an egress.
+// An egress is identified by a namespace and type (see EgressIdentifier SDK class).
+// The argument is an io.statefun.sdk.reqreply.TypedValue.
+type FromFunction_EgressMessage struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// The target egress namespace
+	EgressNamespace string `protobuf:"bytes,1,opt,name=egress_namespace,json=egressNamespace,proto3" json:"egress_namespace,omitempty"`
+	// The target egress type
+	EgressType string `protobuf:"bytes,2,opt,name=egress_type,json=egressType,proto3" json:"egress_type,omitempty"`
+	// egress argument
+	Argument *TypedValue `protobuf:"bytes,3,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *FromFunction_EgressMessage) Reset() {
+	*x = FromFunction_EgressMessage{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[10]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_EgressMessage) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_EgressMessage) ProtoMessage() {}
+
+func (x *FromFunction_EgressMessage) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[10]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_EgressMessage.ProtoReflect.Descriptor instead.
+func (*FromFunction_EgressMessage) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 3}
+}
+
+func (x *FromFunction_EgressMessage) GetEgressNamespace() string {
+	if x != nil {
+		return x.EgressNamespace
+	}
+	return ""
+}
+
+func (x *FromFunction_EgressMessage) GetEgressType() string {
+	if x != nil {
+		return x.EgressType
+	}
+	return ""
+}
+
+func (x *FromFunction_EgressMessage) GetArgument() *TypedValue {
+	if x != nil {
+		return x.Argument
+	}
+	return nil
+}
+
+// InvocationResponse represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest
+// it contains a list of state mutation to preform as a result of computing this batch, and a list of outgoing messages.
+type FromFunction_InvocationResponse struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	StateMutations     []*FromFunction_PersistedValueMutation `protobuf:"bytes,1,rep,name=state_mutations,json=stateMutations,proto3" json:"state_mutations,omitempty"`
+	OutgoingMessages   []*FromFunction_Invocation             `protobuf:"bytes,2,rep,name=outgoing_messages,json=outgoingMessages,proto3" json:"outgoing_messages,omitempty"`
+	DelayedInvocations []*FromFunction_DelayedInvocation      `protobuf:"bytes,3,rep,name=delayed_invocations,json=delayedInvocations,proto3" json:"delayed_invocations,omitempty"`
+	OutgoingEgresses   []*FromFunction_EgressMessage          `protobuf:"bytes,4,rep,name=outgoing_egresses,json=outgoingEgresses,proto3" json:"outgoing_egresses,omitempty"`
+}
+
+func (x *FromFunction_InvocationResponse) Reset() {
+	*x = FromFunction_InvocationResponse{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[11]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_InvocationResponse) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_InvocationResponse) ProtoMessage() {}
+
+func (x *FromFunction_InvocationResponse) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[11]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_InvocationResponse.ProtoReflect.Descriptor instead.
+func (*FromFunction_InvocationResponse) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 4}
+}
+
+func (x *FromFunction_InvocationResponse) GetStateMutations() []*FromFunction_PersistedValueMutation {
+	if x != nil {
+		return x.StateMutations
+	}
+	return nil
+}
+
+func (x *FromFunction_InvocationResponse) GetOutgoingMessages() []*FromFunction_Invocation {
+	if x != nil {
+		return x.OutgoingMessages
+	}
+	return nil
+}
+
+func (x *FromFunction_InvocationResponse) GetDelayedInvocations() []*FromFunction_DelayedInvocation {
+	if x != nil {
+		return x.DelayedInvocations
+	}
+	return nil
+}
+
+func (x *FromFunction_InvocationResponse) GetOutgoingEgresses() []*FromFunction_EgressMessage {
+	if x != nil {
+		return x.OutgoingEgresses
+	}
+	return nil
+}
+
+// ExpirationSpec represents TTL (Time-To-Live) configuration for persisted states.
+type FromFunction_ExpirationSpec struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Mode              FromFunction_ExpirationSpec_ExpireMode `protobuf:"varint,1,opt,name=mode,proto3,enum=io.statefun.sdk.reqreply.FromFunction_ExpirationSpec_ExpireMode" json:"mode,omitempty"`
+	ExpireAfterMillis int64                                  `protobuf:"varint,2,opt,name=expire_after_millis,json=expireAfterMillis,proto3" json:"expire_after_millis,omitempty"`
+}
+
+func (x *FromFunction_ExpirationSpec) Reset() {
+	*x = FromFunction_ExpirationSpec{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[12]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_ExpirationSpec) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_ExpirationSpec) ProtoMessage() {}
+
+func (x *FromFunction_ExpirationSpec) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[12]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_ExpirationSpec.ProtoReflect.Descriptor instead.
+func (*FromFunction_ExpirationSpec) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 5}
+}
+
+func (x *FromFunction_ExpirationSpec) GetMode() FromFunction_ExpirationSpec_ExpireMode {
+	if x != nil {
+		return x.Mode
+	}
+	return FromFunction_ExpirationSpec_NONE
+}
+
+func (x *FromFunction_ExpirationSpec) GetExpireAfterMillis() int64 {
+	if x != nil {
+		return x.ExpireAfterMillis
+	}
+	return 0
+}
+
+// PersistedValueSpec represents specifications of a function's persisted value state.
+type FromFunction_PersistedValueSpec struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	StateName      string                       `protobuf:"bytes,1,opt,name=state_name,json=stateName,proto3" json:"state_name,omitempty"`
+	ExpirationSpec *FromFunction_ExpirationSpec `protobuf:"bytes,2,opt,name=expiration_spec,json=expirationSpec,proto3" json:"expiration_spec,omitempty"`
+	TypeTypename   string                       `protobuf:"bytes,3,opt,name=type_typename,json=typeTypename,proto3" json:"type_typename,omitempty"`
+}
+
+func (x *FromFunction_PersistedValueSpec) Reset() {
+	*x = FromFunction_PersistedValueSpec{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[13]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_PersistedValueSpec) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_PersistedValueSpec) ProtoMessage() {}
+
+func (x *FromFunction_PersistedValueSpec) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[13]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_PersistedValueSpec.ProtoReflect.Descriptor instead.
+func (*FromFunction_PersistedValueSpec) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 6}
+}
+
+func (x *FromFunction_PersistedValueSpec) GetStateName() string {
+	if x != nil {
+		return x.StateName
+	}
+	return ""
+}
+
+func (x *FromFunction_PersistedValueSpec) GetExpirationSpec() *FromFunction_ExpirationSpec {
+	if x != nil {
+		return x.ExpirationSpec
+	}
+	return nil
+}
+
+func (x *FromFunction_PersistedValueSpec) GetTypeTypename() string {
+	if x != nil {
+		return x.TypeTypename
+	}
+	return ""
+}
+
+// IncompleteInvocationContext represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest,
+// which should be used as the response if the InvocationBatchRequest provided incomplete information about the
+// invocation, e.g. insufficient state values were provided.
+type FromFunction_IncompleteInvocationContext struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	MissingValues []*FromFunction_PersistedValueSpec `protobuf:"bytes,1,rep,name=missing_values,json=missingValues,proto3" json:"missing_values,omitempty"`
+}
+
+func (x *FromFunction_IncompleteInvocationContext) Reset() {
+	*x = FromFunction_IncompleteInvocationContext{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_request_reply_proto_msgTypes[14]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FromFunction_IncompleteInvocationContext) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_IncompleteInvocationContext) ProtoMessage() {}
+
+func (x *FromFunction_IncompleteInvocationContext) ProtoReflect() protoreflect.Message {
+	mi := &file_request_reply_proto_msgTypes[14]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_IncompleteInvocationContext.ProtoReflect.Descriptor instead.
+func (*FromFunction_IncompleteInvocationContext) Descriptor() ([]byte, []int) {
+	return file_request_reply_proto_rawDescGZIP(), []int{3, 7}
+}
+
+func (x *FromFunction_IncompleteInvocationContext) GetMissingValues() []*FromFunction_PersistedValueSpec {
+	if x != nil {
+		return x.MissingValues
+	}
+	return nil
+}
+
+var File_request_reply_proto protoreflect.FileDescriptor
+
+var file_request_reply_proto_rawDesc = []byte{
+	0x0a, 0x13, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2d, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x18, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66,
+	0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x22,
+	0x4b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61,
+	0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e,
+	0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65,
+	0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02,
+	0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x5b, 0x0a, 0x0a,
+	0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x79,
+	0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x79,
+	0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x68, 0x61, 0x73, 0x5f, 0x76, 0x61,
+	0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x68, 0x61, 0x73, 0x56, 0x61,
+	0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01,
+	0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xee, 0x04, 0x0a, 0x0a, 0x54, 0x6f,
+	0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5d, 0x0a, 0x0a, 0x69, 0x6e, 0x76, 0x6f,
+	0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x64, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x69,
+	0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72,
+	0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x6f, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69,
+	0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74,
+	0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x69, 0x6e, 0x76,
+	0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x76, 0x0a, 0x0e, 0x50, 0x65, 0x72, 0x73, 0x69,
+	0x73, 0x74, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61,
+	0x74, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73,
+	0x74, 0x61, 0x74, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x45, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74,
+	0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e,
+	0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e,
+	0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61,
+	0x6c, 0x75, 0x65, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a,
+	0x89, 0x01, 0x0a, 0x0a, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39,
+	0x0a, 0x06, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21,
+	0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b,
+	0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
+	0x73, 0x52, 0x06, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x40, 0x0a, 0x08, 0x61, 0x72, 0x67,
+	0x75, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x69, 0x6f,
+	0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65,
+	0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75,
+	0x65, 0x52, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0xf1, 0x01, 0x0a, 0x16,
+	0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52,
+	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x39, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74,
+	0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c,
+	0x79, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65,
+	0x74, 0x12, 0x49, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
+	0x32, 0x33, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+	0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x6f, 0x46, 0x75,
+	0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x64,
+	0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x51, 0x0a, 0x0b,
+	0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28,
+	0x0b, 0x32, 0x2f, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e,
+	0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x6f, 0x46,
+	0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69,
+	0x6f, 0x6e, 0x52, 0x0b, 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42,
+	0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xac, 0x10, 0x0a, 0x0c, 0x46,
+	0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x68, 0x0a, 0x11, 0x69,
+	0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74,
+	0x18, 0x64, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74,
+	0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c,
+	0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49,
+	0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+	0x65, 0x48, 0x00, 0x52, 0x10, 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52,
+	0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x88, 0x01, 0x0a, 0x1d, 0x69, 0x6e, 0x63, 0x6f, 0x6d, 0x70,
+	0x6c, 0x65, 0x74, 0x65, 0x5f, 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
+	0x63, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x18, 0x65, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e,
+	0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e,
+	0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e,
+	0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65,
+	0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78,
+	0x74, 0x48, 0x00, 0x52, 0x1b, 0x69, 0x6e, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x49,
+	0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74,
+	0x1a, 0x97, 0x02, 0x0a, 0x16, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x64, 0x56, 0x61,
+	0x6c, 0x75, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x6f, 0x0a, 0x0d, 0x6d,
+	0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x0e, 0x32, 0x4a, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e,
+	0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72,
+	0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73, 0x69,
+	0x73, 0x74, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f,
+	0x6e, 0x2e, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0c,
+	0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1d, 0x0a, 0x0a,
+	0x73, 0x74, 0x61, 0x74, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
+	0x52, 0x09, 0x73, 0x74, 0x61, 0x74, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x45, 0x0a, 0x0b, 0x73,
+	0x74, 0x61, 0x74, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
+	0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+	0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65,
+	0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x56, 0x61, 0x6c,
+	0x75, 0x65, 0x22, 0x26, 0x0a, 0x0c, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79,
+	0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x00, 0x12, 0x0a,
+	0x0a, 0x06, 0x4d, 0x4f, 0x44, 0x49, 0x46, 0x59, 0x10, 0x01, 0x1a, 0x89, 0x01, 0x0a, 0x0a, 0x49,
+	0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x06, 0x74, 0x61, 0x72,
+	0x67, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x73,
+	0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72,
+	0x65, 0x70, 0x6c, 0x79, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x52, 0x06, 0x74, 0x61,
+	0x72, 0x67, 0x65, 0x74, 0x12, 0x40, 0x0a, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74,
+	0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74,
+	0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c,
+	0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x61, 0x72,
+	0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x97, 0x02, 0x0a, 0x11, 0x44, 0x65, 0x6c, 0x61, 0x79,
+	0x65, 0x64, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x36, 0x0a, 0x17,
+	0x69, 0x73, 0x5f, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
+	0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x69,
+	0x73, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71,
+	0x75, 0x65, 0x73, 0x74, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61,
+	0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09,
+	0x52, 0x11, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f,
+	0x6b, 0x65, 0x6e, 0x12, 0x1e, 0x0a, 0x0b, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x69, 0x6e, 0x5f,
+	0x6d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x49,
+	0x6e, 0x4d, 0x73, 0x12, 0x39, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x02, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75,
+	0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x41,
+	0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x40,
+	0x0a, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
+	0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+	0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65,
+	0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74,
+	0x1a, 0x9d, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61,
+	0x67, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x6e, 0x61, 0x6d,
+	0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x65, 0x67,
+	0x72, 0x65, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1f, 0x0a,
+	0x0b, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x0a, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x40,
+	0x0a, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
+	0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+	0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65,
+	0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74,
+	0x1a, 0xaa, 0x03, 0x0a, 0x12, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52,
+	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x74, 0x65,
+	0x5f, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b,
+	0x32, 0x3d, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+	0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d,
+	0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74,
+	0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52,
+	0x0e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12,
+	0x5e, 0x0a, 0x11, 0x6f, 0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x5f, 0x6d, 0x65, 0x73, 0x73,
+	0x61, 0x67, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x69, 0x6f, 0x2e,
+	0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71,
+	0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69,
+	0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x10, 0x6f,
+	0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12,
+	0x69, 0x0a, 0x13, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x5f, 0x69, 0x6e, 0x76, 0x6f, 0x63,
+	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x38, 0x2e, 0x69,
+	0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72,
+	0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63,
+	0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x49, 0x6e, 0x76, 0x6f,
+	0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x49,
+	0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x61, 0x0a, 0x11, 0x6f, 0x75,
+	0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18,
+	0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65,
+	0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79,
+	0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x67,
+	0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x10, 0x6f, 0x75, 0x74,
+	0x67, 0x6f, 0x69, 0x6e, 0x67, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x1a, 0xd1, 0x01,
+	0x0a, 0x0e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63,
+	0x12, 0x54, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x40,
+	0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b,
+	0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75,
+	0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f,
+	0x6e, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65, 0x4d, 0x6f, 0x64, 0x65,
+	0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65,
+	0x5f, 0x61, 0x66, 0x74, 0x65, 0x72, 0x5f, 0x6d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x02, 0x20,
+	0x01, 0x28, 0x03, 0x52, 0x11, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x41, 0x66, 0x74, 0x65, 0x72,
+	0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x22, 0x39, 0x0a, 0x0a, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65,
+	0x4d, 0x6f, 0x64, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x4f, 0x4e, 0x45, 0x10, 0x00, 0x12, 0x0f,
+	0x0a, 0x0b, 0x41, 0x46, 0x54, 0x45, 0x52, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x10, 0x01, 0x12,
+	0x10, 0x0a, 0x0c, 0x41, 0x46, 0x54, 0x45, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x4f, 0x4b, 0x45, 0x10,
+	0x02, 0x1a, 0xb8, 0x01, 0x0a, 0x12, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x64, 0x56,
+	0x61, 0x6c, 0x75, 0x65, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74,
+	0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74,
+	0x61, 0x74, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x5e, 0x0a, 0x0f, 0x65, 0x78, 0x70, 0x69, 0x72,
+	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x70, 0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
+	0x32, 0x35, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+	0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d,
+	0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74,
+	0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0e, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74,
+	0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x12, 0x23, 0x0a, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x5f,
+	0x74, 0x79, 0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c,
+	0x74, 0x79, 0x70, 0x65, 0x54, 0x79, 0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x1a, 0x7f, 0x0a, 0x1b,
+	0x49, 0x6e, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61,
+	0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x60, 0x0a, 0x0e, 0x6d,
+	0x69, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20,
+	0x03, 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75,
+	0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46,
+	0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73,
+	0x69, 0x73, 0x74, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0d,
+	0x6d, 0x69, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x42, 0x0a, 0x0a,
+	0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x0a, 0x30, 0x6f, 0x72, 0x67,
+	0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74,
+	0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65,
+	0x70, 0x6c, 0x79, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x50, 0x01, 0x5a,
+	0x0a, 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x33,
+}
+
+var (
+	file_request_reply_proto_rawDescOnce sync.Once
+	file_request_reply_proto_rawDescData = file_request_reply_proto_rawDesc
+)
+
+func file_request_reply_proto_rawDescGZIP() []byte {
+	file_request_reply_proto_rawDescOnce.Do(func() {
+		file_request_reply_proto_rawDescData = protoimpl.X.CompressGZIP(file_request_reply_proto_rawDescData)
+	})
+	return file_request_reply_proto_rawDescData
+}
+
+var file_request_reply_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
+var file_request_reply_proto_msgTypes = make([]protoimpl.MessageInfo, 15)
+var file_request_reply_proto_goTypes = []interface{}{
+	(FromFunction_PersistedValueMutation_MutationType)(0), // 0: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.MutationType
+	(FromFunction_ExpirationSpec_ExpireMode)(0),           // 1: io.statefun.sdk.reqreply.FromFunction.ExpirationSpec.ExpireMode
+	(*Address)(nil),                                  // 2: io.statefun.sdk.reqreply.Address
+	(*TypedValue)(nil),                               // 3: io.statefun.sdk.reqreply.TypedValue
+	(*ToFunction)(nil),                               // 4: io.statefun.sdk.reqreply.ToFunction
+	(*FromFunction)(nil),                             // 5: io.statefun.sdk.reqreply.FromFunction
+	(*ToFunction_PersistedValue)(nil),                // 6: io.statefun.sdk.reqreply.ToFunction.PersistedValue
+	(*ToFunction_Invocation)(nil),                    // 7: io.statefun.sdk.reqreply.ToFunction.Invocation
+	(*ToFunction_InvocationBatchRequest)(nil),        // 8: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest
+	(*FromFunction_PersistedValueMutation)(nil),      // 9: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation
+	(*FromFunction_Invocation)(nil),                  // 10: io.statefun.sdk.reqreply.FromFunction.Invocation
+	(*FromFunction_DelayedInvocation)(nil),           // 11: io.statefun.sdk.reqreply.FromFunction.DelayedInvocation
+	(*FromFunction_EgressMessage)(nil),               // 12: io.statefun.sdk.reqreply.FromFunction.EgressMessage
+	(*FromFunction_InvocationResponse)(nil),          // 13: io.statefun.sdk.reqreply.FromFunction.InvocationResponse
+	(*FromFunction_ExpirationSpec)(nil),              // 14: io.statefun.sdk.reqreply.FromFunction.ExpirationSpec
+	(*FromFunction_PersistedValueSpec)(nil),          // 15: io.statefun.sdk.reqreply.FromFunction.PersistedValueSpec
+	(*FromFunction_IncompleteInvocationContext)(nil), // 16: io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext
+}
+var file_request_reply_proto_depIdxs = []int32{
+	8,  // 0: io.statefun.sdk.reqreply.ToFunction.invocation:type_name -> io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest
+	13, // 1: io.statefun.sdk.reqreply.FromFunction.invocation_result:type_name -> io.statefun.sdk.reqreply.FromFunction.InvocationResponse
+	16, // 2: io.statefun.sdk.reqreply.FromFunction.incomplete_invocation_context:type_name -> io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext
+	3,  // 3: io.statefun.sdk.reqreply.ToFunction.PersistedValue.state_value:type_name -> io.statefun.sdk.reqreply.TypedValue
+	2,  // 4: io.statefun.sdk.reqreply.ToFunction.Invocation.caller:type_name -> io.statefun.sdk.reqreply.Address
+	3,  // 5: io.statefun.sdk.reqreply.ToFunction.Invocation.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+	2,  // 6: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.target:type_name -> io.statefun.sdk.reqreply.Address
+	6,  // 7: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.state:type_name -> io.statefun.sdk.reqreply.ToFunction.PersistedValue
+	7,  // 8: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.invocations:type_name -> io.statefun.sdk.reqreply.ToFunction.Invocation
+	0,  // 9: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.mutation_type:type_name -> io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.MutationType
+	3,  // 10: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.state_value:type_name -> io.statefun.sdk.reqreply.TypedValue
+	2,  // 11: io.statefun.sdk.reqreply.FromFunction.Invocation.target:type_name -> io.statefun.sdk.reqreply.Address
+	3,  // 12: io.statefun.sdk.reqreply.FromFunction.Invocation.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+	2,  // 13: io.statefun.sdk.reqreply.FromFunction.DelayedInvocation.target:type_name -> io.statefun.sdk.reqreply.Address
+	3,  // 14: io.statefun.sdk.reqreply.FromFunction.DelayedInvocation.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+	3,  // 15: io.statefun.sdk.reqreply.FromFunction.EgressMessage.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+	9,  // 16: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.state_mutations:type_name -> io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation
+	10, // 17: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.outgoing_messages:type_name -> io.statefun.sdk.reqreply.FromFunction.Invocation
+	11, // 18: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.delayed_invocations:type_name -> io.statefun.sdk.reqreply.FromFunction.DelayedInvocation
+	12, // 19: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.outgoing_egresses:type_name -> io.statefun.sdk.reqreply.FromFunction.EgressMessage
+	1,  // 20: io.statefun.sdk.reqreply.FromFunction.ExpirationSpec.mode:type_name -> io.statefun.sdk.reqreply.FromFunction.ExpirationSpec.ExpireMode
+	14, // 21: io.statefun.sdk.reqreply.FromFunction.PersistedValueSpec.expiration_spec:type_name -> io.statefun.sdk.reqreply.FromFunction.ExpirationSpec
+	15, // 22: io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext.missing_values:type_name -> io.statefun.sdk.reqreply.FromFunction.PersistedValueSpec
+	23, // [23:23] is the sub-list for method output_type
+	23, // [23:23] is the sub-list for method input_type
+	23, // [23:23] is the sub-list for extension type_name
+	23, // [23:23] is the sub-list for extension extendee
+	0,  // [0:23] is the sub-list for field type_name
+}
+
+func init() { file_request_reply_proto_init() }
+func file_request_reply_proto_init() {
+	if File_request_reply_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_request_reply_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*Address); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*TypedValue); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ToFunction); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ToFunction_PersistedValue); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ToFunction_Invocation); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ToFunction_InvocationBatchRequest); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_PersistedValueMutation); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_Invocation); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_DelayedInvocation); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_EgressMessage); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_InvocationResponse); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_ExpirationSpec); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_PersistedValueSpec); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_request_reply_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FromFunction_IncompleteInvocationContext); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	file_request_reply_proto_msgTypes[2].OneofWrappers = []interface{}{
+		(*ToFunction_Invocation_)(nil),
+	}
+	file_request_reply_proto_msgTypes[3].OneofWrappers = []interface{}{
+		(*FromFunction_InvocationResult)(nil),
+		(*FromFunction_IncompleteInvocationContext_)(nil),
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_request_reply_proto_rawDesc,
+			NumEnums:      2,
+			NumMessages:   15,
+			NumExtensions: 0,
+			NumServices:   0,
+		},
+		GoTypes:           file_request_reply_proto_goTypes,
+		DependencyIndexes: file_request_reply_proto_depIdxs,
+		EnumInfos:         file_request_reply_proto_enumTypes,
+		MessageInfos:      file_request_reply_proto_msgTypes,
+	}.Build()
+	File_request_reply_proto = out.File
+	file_request_reply_proto_rawDesc = nil
+	file_request_reply_proto_goTypes = nil
+	file_request_reply_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/types.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/types.pb.go
new file mode 100644
index 0000000..5b54383
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/types.pb.go
@@ -0,0 +1,486 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.27.1
+// 	protoc        v3.17.3
+// source: types.proto
+
+package protocol
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// BooleanWrapper represents a StateFun primitive type of a boolean value. This is recognized as:
+// io.statefun.types/bool
+type BooleanWrapper struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Value bool `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *BooleanWrapper) Reset() {
+	*x = BooleanWrapper{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_types_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *BooleanWrapper) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*BooleanWrapper) ProtoMessage() {}
+
+func (x *BooleanWrapper) ProtoReflect() protoreflect.Message {
+	mi := &file_types_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use BooleanWrapper.ProtoReflect.Descriptor instead.
+func (*BooleanWrapper) Descriptor() ([]byte, []int) {
+	return file_types_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *BooleanWrapper) GetValue() bool {
+	if x != nil {
+		return x.Value
+	}
+	return false
+}
+
+// IntWrapper represents a StateFun primitive type of an signed 32 bit integer value. This is recognized as:
+// io.statefun.types/int
+type IntWrapper struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Value int32 `protobuf:"fixed32,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *IntWrapper) Reset() {
+	*x = IntWrapper{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_types_proto_msgTypes[1]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *IntWrapper) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IntWrapper) ProtoMessage() {}
+
+func (x *IntWrapper) ProtoReflect() protoreflect.Message {
+	mi := &file_types_proto_msgTypes[1]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use IntWrapper.ProtoReflect.Descriptor instead.
+func (*IntWrapper) Descriptor() ([]byte, []int) {
+	return file_types_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *IntWrapper) GetValue() int32 {
+	if x != nil {
+		return x.Value
+	}
+	return 0
+}
+
+// FloatWrapper represents a StateFun primitive type of a signed float value. This is recognized as:
+// io.statefun.types/float
+type FloatWrapper struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Value float32 `protobuf:"fixed32,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *FloatWrapper) Reset() {
+	*x = FloatWrapper{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_types_proto_msgTypes[2]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *FloatWrapper) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FloatWrapper) ProtoMessage() {}
+
+func (x *FloatWrapper) ProtoReflect() protoreflect.Message {
+	mi := &file_types_proto_msgTypes[2]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use FloatWrapper.ProtoReflect.Descriptor instead.
+func (*FloatWrapper) Descriptor() ([]byte, []int) {
+	return file_types_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *FloatWrapper) GetValue() float32 {
+	if x != nil {
+		return x.Value
+	}
+	return 0
+}
+
+// LongWrapper represents a StateFun primitive type of a signed 64 bit long value. This is recognized as:
+// io.statefun.types/long
+type LongWrapper struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Value int64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *LongWrapper) Reset() {
+	*x = LongWrapper{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_types_proto_msgTypes[3]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *LongWrapper) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*LongWrapper) ProtoMessage() {}
+
+func (x *LongWrapper) ProtoReflect() protoreflect.Message {
+	mi := &file_types_proto_msgTypes[3]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use LongWrapper.ProtoReflect.Descriptor instead.
+func (*LongWrapper) Descriptor() ([]byte, []int) {
+	return file_types_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *LongWrapper) GetValue() int64 {
+	if x != nil {
+		return x.Value
+	}
+	return 0
+}
+
+// DoubleWrapper represents a StateFun primitive type of a double value. This is recognized as:
+// io.statefun.types/double
+type DoubleWrapper struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *DoubleWrapper) Reset() {
+	*x = DoubleWrapper{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_types_proto_msgTypes[4]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *DoubleWrapper) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*DoubleWrapper) ProtoMessage() {}
+
+func (x *DoubleWrapper) ProtoReflect() protoreflect.Message {
+	mi := &file_types_proto_msgTypes[4]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use DoubleWrapper.ProtoReflect.Descriptor instead.
+func (*DoubleWrapper) Descriptor() ([]byte, []int) {
+	return file_types_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *DoubleWrapper) GetValue() float64 {
+	if x != nil {
+		return x.Value
+	}
+	return 0
+}
+
+// StringWrapper represents a StateFun string. This is recognized as:
+// io.statefun.types/string
+type StringWrapper struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *StringWrapper) Reset() {
+	*x = StringWrapper{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_types_proto_msgTypes[5]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *StringWrapper) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*StringWrapper) ProtoMessage() {}
+
+func (x *StringWrapper) ProtoReflect() protoreflect.Message {
+	mi := &file_types_proto_msgTypes[5]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use StringWrapper.ProtoReflect.Descriptor instead.
+func (*StringWrapper) Descriptor() ([]byte, []int) {
+	return file_types_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *StringWrapper) GetValue() string {
+	if x != nil {
+		return x.Value
+	}
+	return ""
+}
+
+var File_types_proto protoreflect.FileDescriptor
+
+var file_types_proto_rawDesc = []byte{
+	0x0a, 0x0b, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15, 0x69,
+	0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x74,
+	0x79, 0x70, 0x65, 0x73, 0x22, 0x26, 0x0a, 0x0e, 0x42, 0x6f, 0x6f, 0x6c, 0x65, 0x61, 0x6e, 0x57,
+	0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
+	0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x22, 0x0a, 0x0a,
+	0x49, 0x6e, 0x74, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61,
+	0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
+	0x22, 0x24, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x61, 0x74, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72,
+	0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x02, 0x52,
+	0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x23, 0x0a, 0x0b, 0x4c, 0x6f, 0x6e, 0x67, 0x57, 0x72,
+	0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01,
+	0x20, 0x01, 0x28, 0x10, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x44,
+	0x6f, 0x75, 0x62, 0x6c, 0x65, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05,
+	0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x76, 0x61, 0x6c,
+	0x75, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x57, 0x72, 0x61, 0x70,
+	0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x3d, 0x0a, 0x2d, 0x6f, 0x72, 0x67,
+	0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74,
+	0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73,
+	0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x50, 0x01, 0x5a, 0x0a, 0x2e, 0x3b,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_types_proto_rawDescOnce sync.Once
+	file_types_proto_rawDescData = file_types_proto_rawDesc
+)
+
+func file_types_proto_rawDescGZIP() []byte {
+	file_types_proto_rawDescOnce.Do(func() {
+		file_types_proto_rawDescData = protoimpl.X.CompressGZIP(file_types_proto_rawDescData)
+	})
+	return file_types_proto_rawDescData
+}
+
+var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_types_proto_goTypes = []interface{}{
+	(*BooleanWrapper)(nil), // 0: io.statefun.sdk.types.BooleanWrapper
+	(*IntWrapper)(nil),     // 1: io.statefun.sdk.types.IntWrapper
+	(*FloatWrapper)(nil),   // 2: io.statefun.sdk.types.FloatWrapper
+	(*LongWrapper)(nil),    // 3: io.statefun.sdk.types.LongWrapper
+	(*DoubleWrapper)(nil),  // 4: io.statefun.sdk.types.DoubleWrapper
+	(*StringWrapper)(nil),  // 5: io.statefun.sdk.types.StringWrapper
+}
+var file_types_proto_depIdxs = []int32{
+	0, // [0:0] is the sub-list for method output_type
+	0, // [0:0] is the sub-list for method input_type
+	0, // [0:0] is the sub-list for extension type_name
+	0, // [0:0] is the sub-list for extension extendee
+	0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_types_proto_init() }
+func file_types_proto_init() {
+	if File_types_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_types_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*BooleanWrapper); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_types_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*IntWrapper); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_types_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*FloatWrapper); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_types_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*LongWrapper); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_types_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*DoubleWrapper); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_types_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*StringWrapper); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_types_proto_rawDesc,
+			NumEnums:      0,
+			NumMessages:   6,
+			NumExtensions: 0,
+			NumServices:   0,
+		},
+		GoTypes:           file_types_proto_goTypes,
+		DependencyIndexes: file_types_proto_depIdxs,
+		MessageInfos:      file_types_proto_msgTypes,
+	}.Build()
+	File_types_proto = out.File
+	file_types_proto_rawDesc = nil
+	file_types_proto_goTypes = nil
+	file_types_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/message.go b/statefun-sdk-go/v3/pkg/statefun/message.go
new file mode 100644
index 0000000..dff5fdf
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/message.go
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+)
+
+type MessageBuilder struct {
+	Target    Address
+	Value     interface{}
+	ValueType SimpleType
+}
+
+func (m MessageBuilder) ToMessage() (Message, error) {
+	if m.Target == (Address{}) {
+		return Message{}, errors.New("a message must have a non-empty target")
+	}
+
+	if m.Value == nil {
+		return Message{}, errors.New("a message cannot have a nil value")
+	}
+
+	if m.ValueType == nil {
+		switch m.Value.(type) {
+		case int:
+			return Message{}, errors.New("ambiguous integer type; please specify int32 or int64")
+		case bool, *bool:
+			m.ValueType = BoolType
+		case int32, *int32:
+			m.ValueType = Int32Type
+		case int64, *int64:
+			m.ValueType = Int64Type
+		case float32, *float32:
+			m.ValueType = Float32Type
+		case float64, *float64:
+			m.ValueType = Float64Type
+		case string, *string:
+			m.ValueType = StringType
+		default:
+			return Message{}, errors.New("message contains non-primitive type, please supply a non-nil SimpleType")
+		}
+	}
+
+	buffer := bytes.Buffer{}
+	err := m.ValueType.Serialize(&buffer, m.Value)
+	if err != nil {
+		return Message{}, err
+	}
+
+	return Message{
+		target: &protocol.Address{
+			Namespace: m.Target.FunctionType.GetNamespace(),
+			Type:      m.Target.FunctionType.GetType(),
+			Id:        m.Target.Id,
+		},
+		typedValue: &protocol.TypedValue{
+			Typename: m.ValueType.GetTypeName().String(),
+			HasValue: true,
+			Value:    buffer.Bytes(),
+		},
+	}, nil
+}
+
+type Message struct {
+	target     *protocol.Address
+	typedValue *protocol.TypedValue
+}
+
+func (m *Message) IsBool() bool {
+	return m.Is(BoolType)
+}
+
+func (m *Message) AsBool() bool {
+	var receiver bool
+	if err := BoolType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+		panic(fmt.Errorf("failed to deserialize message: %w", err))
+	}
+	return receiver
+}
+
+func (m *Message) IsInt32() bool {
+	return m.Is(Int32Type)
+}
+
+func (m *Message) AsInt32() int32 {
+	var receiver int32
+	if err := Int32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+		panic(fmt.Errorf("failed to deserialize message: %w", err))
+	}
+	return receiver
+}
+
+func (m *Message) IsInt64() bool {
+	return m.Is(Int64Type)
+}
+
+func (m *Message) AsInt64() int64 {
+	var receiver int64
+	if err := Int64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+		panic(fmt.Errorf("failed to deserialize message: %w", err))
+	}
+	return receiver
+}
+
+func (m *Message) IsFloat32() bool {
+	return m.Is(Float32Type)
+}
+
+func (m *Message) AsFloat32() float32 {
+	var receiver float32
+	if err := Float32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+		panic(fmt.Errorf("failed to deserialize message: %w", err))
+	}
+	return receiver
+}
+
+func (m *Message) IsFloat64() bool {
+	return m.Is(Float64Type)
+}
+
+func (m *Message) AsFloat64() float64 {
+	var receiver float64
+	if err := Float64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+		panic(fmt.Errorf("failed to deserialize message: %w", err))
+	}
+	return receiver
+}
+
+func (m *Message) IsString() bool {
+	return m.Is(StringType)
+}
+
+func (m *Message) AsString() string {
+	var receiver string
+	if err := StringType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+		panic(fmt.Errorf("failed to deserialize message: %w", err))
+	}
+
+	return receiver
+}
+
+func (m *Message) Is(t SimpleType) bool {
+	return t.GetTypeName().String() == m.typedValue.Typename
+}
+
+func (m *Message) As(t SimpleType, receiver interface{}) error {
+	return t.Deserialize(bytes.NewReader(m.typedValue.Value), receiver)
+}
+
+func (m *Message) ValueTypeName() TypeName {
+	return TypeNameFrom(m.typedValue.Typename)
+}
+
+func (m *Message) RawValue() []byte {
+	return m.typedValue.Value
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/message_test.go b/statefun-sdk-go/v3/pkg/statefun/message_test.go
new file mode 100644
index 0000000..2b18894
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/message_test.go
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestBasicIntMessage(t *testing.T) {
+	typename, err := ParseTypeName("foo/bar")
+	assert.NoError(t, err)
+
+	message, err := MessageBuilder{
+		Target: Address{
+			FunctionType: typename,
+			Id:           "a",
+		},
+		Value: int32(1),
+	}.ToMessage()
+
+	assert.NoError(t, err)
+	assert.True(t, message.IsInt32())
+
+	value := message.AsInt32()
+	assert.Equal(t, value, int32(1))
+}
+
+func TestMessageWithType(t *testing.T) {
+	typename, err := ParseTypeName("foo/bar")
+	assert.NoError(t, err)
+
+	message, err := MessageBuilder{
+		Target: Address{
+			FunctionType: typename,
+			Id:           "a",
+		},
+		Value:     float32(5.0),
+		ValueType: Float32Type,
+	}.ToMessage()
+
+	assert.NoError(t, err)
+	assert.True(t, message.IsFloat32())
+
+	value := message.AsFloat32()
+	assert.Equal(t, value, float32(5.0))
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/stateful_function.go b/statefun-sdk-go/v3/pkg/statefun/stateful_function.go
new file mode 100644
index 0000000..de95be3
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/stateful_function.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+// A StatefulFunction is a user-defined function that can be invoked with a given input.
+// This is the primitive building block for a Stateful Functions application.
+//
+// Concept
+//
+// Each individual StatefulFunction is an uniquely invokable "instance" of a registered
+// StatefulFunctionSpec. Each instance is identified by an Address, representing the
+// function's unique id (a string) within its type. From a user's perspective, it would seem as if
+// for each unique function id, there exists a stateful instance of the function that is always
+// available to be invoked within a Stateful Functions application.
+//
+// Invoking a StatefulFunction
+//
+// An individual StatefulFunction can be invoked with arbitrary input from any another
+// StatefulFunction (including itself), or routed from ingresses. To invoke a
+// StatefulFunction, the caller simply needs to know the Address of the target function.
+//
+// As a result of invoking a StatefulFunction, the function may continue to invoke other
+// functions, access persisted values, or send messages to egresses.
+//
+// Persistent State
+//
+// Each individual StatefulFunction may have persistent values written to storage that is
+// maintained by the system, providing consistent exactly-once and fault-tolerant guarantees. Please
+// see docs in ValueSpec and AddressScopedStorage for an overview of how to
+// register persistent values and access the storage.
+type StatefulFunction interface {
+
+	// Invoke is the method called for each message. The passed Context
+	// is canceled as soon as Invoke returns as a signal to
+	// any spawned go routines. The method may return
+	// an Error to signal the invocation failed and should
+	// be reattempted.
+	Invoke(ctx Context, message Message) error
+}
+
+// StatefulFunctionSpec for a Stateful Function, identifiable
+// by a unique TypeName.
+type StatefulFunctionSpec struct {
+	// The unique TypeName associated
+	// the StatefulFunction being defined.
+	FunctionType TypeName
+
+	// A slice of registered ValueSpec's that will be used
+	// by this function. A function may only access values
+	// that have been eagerly registered as part of its spec.
+	States []ValueSpec
+
+	// The physical StatefulFunction instance.
+	Function StatefulFunction
+}
+
+// The StatefulFunctionPointer type is an adapter to allow the use of
+// ordinary functions as StatefulFunction's. If f is a function
+// with the appropriate signature, StatefulFunctionPointer(f) is a
+// StatefulFunction that calls f.
+type StatefulFunctionPointer func(Context, Message) error
+
+func (s StatefulFunctionPointer) Invoke(ctx Context, message Message) error {
+	return s(ctx, message)
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/storage.go b/statefun-sdk-go/v3/pkg/statefun/storage.go
new file mode 100644
index 0000000..57e5364
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/storage.go
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"fmt"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"sync"
+)
+
+// An AddressScopedStorage is used for reading and writing persistent
+// values that are managed by the Stateful Functions runtime for
+// fault-tolerance and consistency.
+//
+// All access to the storage is scoped to the current function instance,
+// identified by the instance's Address. This means that within an
+// invocation, function instances may only access its own persisted
+// values through this storage.
+type AddressScopedStorage interface {
+
+	// Get returnss the values of the provided ValueSpec, scoped to the
+	// current invoked Address and stores the result in the value
+	// pointed to by receiver. The method will return false
+	// if there is no value for the spec in storage
+	// so callers can differentiate between missing and
+	// the types zero value.
+	Get(spec ValueSpec, receiver interface{}) (exists bool)
+
+	// Set updates the value for the provided ValueSpec, scoped
+	// to the current invoked Address.
+	Set(spec ValueSpec, value interface{})
+
+	// Remove deletes the prior value set for the the provided
+	// ValueSpec, scoped to the current invoked Address.
+	//
+	// After removing the value, calling Get for the same
+	// spec under the same Address will return false.
+	Remove(spec ValueSpec)
+}
+
+type storage struct {
+	mutex sync.RWMutex
+	cells map[string]*internal.Cell
+}
+
+type storageFactory interface {
+	getStorage() *storage
+
+	getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec
+}
+
+func newStorageFactory(
+	batch *protocol.ToFunction_InvocationBatchRequest,
+	specs map[string]*protocol.FromFunction_PersistedValueSpec,
+) storageFactory {
+	storage := &storage{
+		cells: make(map[string]*internal.Cell, len(specs)),
+	}
+
+	states := make(map[string]*protocol.FromFunction_PersistedValueSpec, len(specs))
+	for k, v := range specs {
+		states[k] = v
+	}
+
+	if batch.State != nil {
+		for _, state := range batch.State {
+			spec, exists := states[state.StateName]
+			if !exists {
+				continue
+			}
+
+			delete(states, state.StateName)
+
+			storage.cells[state.StateName] = internal.NewCell(state, spec.TypeTypename)
+		}
+	}
+
+	if len(states) > 0 {
+		var missing = make([]*protocol.FromFunction_PersistedValueSpec, 0, len(states))
+		for _, spec := range states {
+			missing = append(missing, spec)
+		}
+
+		return MissingSpecs(missing)
+	} else {
+		return storage
+	}
+}
+
+func (s *storage) getStorage() *storage {
+	return s
+}
+
+func (s *storage) getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec {
+	return nil
+}
+
+func (s *storage) Get(spec ValueSpec, receiver interface{}) bool {
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
+
+	cell, ok := s.cells[spec.Name]
+	if !ok {
+		panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
+	}
+
+	if !cell.HasValue() {
+		return false
+	}
+
+	cell.SeekToBeginning()
+	if err := spec.ValueType.Deserialize(cell, receiver); err != nil {
+		panic(fmt.Errorf("failed to deserialize persisted value `%s`: %w", spec.Name, err))
+	}
+
+	return true
+}
+
+func (s *storage) Set(spec ValueSpec, value interface{}) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	cell, ok := s.cells[spec.Name]
+	if !ok {
+		panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
+	}
+
+	err := spec.ValueType.Serialize(cell, value)
+	if err != nil {
+		panic(fmt.Errorf("failed to serialize %s: %w", spec.Name, err))
+	}
+}
+
+func (s *storage) Remove(spec ValueSpec) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	cell, ok := s.cells[spec.Name]
+	if !ok {
+		panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
+	}
+
+	cell.Delete()
+}
+
+func (s *storage) getStateMutations() []*protocol.FromFunction_PersistedValueMutation {
+	mutations := make([]*protocol.FromFunction_PersistedValueMutation, 0, len(s.cells))
+	for name, cell := range s.cells {
+		if mutation := cell.GetStateMutation(name); mutation != nil {
+			mutations = append(mutations, mutation)
+		}
+	}
+
+	return mutations
+}
+
+type MissingSpecs []*protocol.FromFunction_PersistedValueSpec
+
+func (m MissingSpecs) getStorage() *storage {
+	return nil
+}
+
+func (m MissingSpecs) getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec {
+	return m
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/typename.go b/statefun-sdk-go/v3/pkg/statefun/typename.go
new file mode 100644
index 0000000..8491dc5
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/typename.go
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"errors"
+	"fmt"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"strings"
+)
+
+var (
+	boolTypeName    = TypeNameFrom("io.statefun.types/bool")
+	int32TypeName   = TypeNameFrom("io.statefun.types/int")
+	int64TypeName   = TypeNameFrom("io.statefun.types/long")
+	float32TypeName = TypeNameFrom("io.statefun.types/float")
+	float64TypeName = TypeNameFrom("io.statefun.types/double")
+	stringTypeName  = TypeNameFrom("io.statefun.types/string")
+)
+
+// A TypeName is used to uniquely identify objects within
+// a Stateful Functions application, including functions,
+// egresses, and types. TypeName's serve as an integral
+// part of identifying these objects for message delivery
+// as well as message data serialization and deserialization.
+type TypeName interface {
+	fmt.Stringer
+	GetNamespace() string
+	GetType() string
+}
+
+type typeName struct {
+	namespace      string
+	tpe            string
+	typenameString string
+}
+
+func (t typeName) String() string {
+	return t.typenameString
+}
+
+func (t typeName) GetNamespace() string {
+	return t.namespace
+}
+
+func (t typeName) GetType() string {
+	return t.tpe
+}
+
+// TypeNameFrom creates a TypeName from a canonical string
+// in the format `<namespace>/<type>`. This Function
+// assumes correctly formatted strings and will panic
+// on error. For runtime error handling please
+// see ParseTypeName.
+func TypeNameFrom(typename string) TypeName {
+	result, err := ParseTypeName(typename)
+	if err != nil {
+		panic(err)
+	}
+
+	return result
+}
+
+// ParseTypeName creates a TypeName from a canonical string
+// in the format `<namespace>/<type>`.
+func ParseTypeName(typename string) (TypeName, error) {
+	position := strings.LastIndex(typename, "/")
+	if position <= 0 || position == len(typename)-1 {
+		return nil, fmt.Errorf("%v does not conform to the <namespace>/<type> format", typename)
+	}
+
+	namespace := typename[:position]
+	name := typename[position+1:]
+
+	if namespace[len(namespace)-1] == '/' {
+		namespace = namespace[:len(namespace)-1]
+	}
+
+	return TypeNameFromParts(namespace, name)
+}
+
+func TypeNameFromParts(namespace, tpe string) (TypeName, error) {
+	if len(namespace) == 0 {
+		return nil, errors.New("namespace cannot be empty")
+	}
+
+	if len(tpe) == 0 {
+		return nil, errors.New("type cannot be empty")
+	}
+
+	return typeName{
+		namespace:      namespace,
+		tpe:            tpe,
+		typenameString: fmt.Sprintf("%s/%s", namespace, tpe),
+	}, nil
+}
+
+// An Address is the unique identity of an individual StatefulFunction,
+// containing the function's FunctionType and a unique identifier
+// within the type. The function's type denotes the type (or class) of function
+// to invoke, while the unique identifier addresses the invocation to a specific
+// function instance.
+type Address struct {
+	FunctionType TypeName
+	Id           string
+}
+
+func (a Address) String() string {
+	return fmt.Sprintf("Address(%s, %s, %s)", a.FunctionType.GetNamespace(), a.FunctionType.GetType(), a.Id)
+}
+
+func addressFromInternal(a *protocol.Address) Address {
+	name, _ := TypeNameFromParts(a.Namespace, a.Type)
+	return Address{
+		FunctionType: name,
+		Id:           a.Id,
+	}
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/typename_test.go b/statefun-sdk-go/v3/pkg/statefun/typename_test.go
new file mode 100644
index 0000000..c07cd19
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/typename_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestTypeNameParse(t *testing.T) {
+	typename, err := ParseTypeName("namespace/tpe")
+
+	assert.NoError(t, err)
+	assert.Equal(t, typename.GetNamespace(), "namespace")
+	assert.Equal(t, typename.GetType(), "tpe")
+}
+
+func TestNoNamespace(t *testing.T) {
+	_, err := ParseTypeName("/bar")
+	assert.Error(t, err)
+}
+
+func TestNoName(t *testing.T) {
+	_, err := ParseTypeName("n/")
+	assert.Error(t, err)
+}
+
+func TestNoNamespaceOrName(t *testing.T) {
+	_, err := ParseTypeName("/")
+	assert.Error(t, err)
+}
+
+func TestEmptyString(t *testing.T) {
+	_, err := ParseTypeName("")
+	assert.Error(t, err)
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/types.go b/statefun-sdk-go/v3/pkg/statefun/types.go
new file mode 100644
index 0000000..e820a3a
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/types.go
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"encoding/json"
+	"errors"
+	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+	"google.golang.org/protobuf/proto"
+	"io"
+	"log"
+)
+
+// SimpleType interface is the core abstraction used by Stateful
+// Function's type system, and consists of a few things
+// that StateFun uses to handle Message's and ValueSpec's
+//
+// 1. TypeName to identify the type.
+// 2. (De)serialization methods for marshalling and unmarshalling data
+//
+// Cross-language primitive types
+//
+// StateFun's type system has cross-language support for common primitive
+// types, such as boolean, integer (int32), long (int64), etc. These
+// primitive types have built-in SimpleType's implemented for them already
+// with predefined TypeName's.
+//
+// These primitives have standard encoding across all StateFun language
+// SDKs, so functions in various other languages (Java, Python, etc) can
+// message Golang functions by directly sending supported primitive
+// values as message arguments. Moreover, the type system is used for
+// state values as well; so you can expect that a function can safely
+// read previous state after reimplementing it in a different language.
+//
+// Common custom types
+//
+// The type system is also very easily extensible to support more complex types.
+// The Go SDK ships with predefined support for JSON and Protobuf - see MakeJsonType
+// MakeProtobufType. For other formats, it is just a matter of implementing
+// your own SimpleType with a custom typename and serializer.
+type SimpleType interface {
+	GetTypeName() TypeName
+
+	Deserialize(r io.Reader, receiver interface{}) error
+
+	Serialize(writer io.Writer, data interface{}) error
+}
+
+type PrimitiveType int
+
+const (
+	BoolType PrimitiveType = iota
+	Int32Type
+	Int64Type
+	Float32Type
+	Float64Type
+	StringType
+)
+
+var (
+	boolWrapperType    = MakeProtobufTypeWithTypeName(boolTypeName)
+	int32WrapperType   = MakeProtobufTypeWithTypeName(int32TypeName)
+	int64WrapperType   = MakeProtobufTypeWithTypeName(int64TypeName)
+	float32WrapperType = MakeProtobufTypeWithTypeName(float64TypeName)
+	float64WrapperType = MakeProtobufTypeWithTypeName(float64TypeName)
+	stringWrapperType  = MakeProtobufTypeWithTypeName(stringTypeName)
+)
+
+func (p PrimitiveType) GetTypeName() TypeName {
+	switch p {
+	case BoolType:
+		return boolTypeName
+	case Int32Type:
+		return int32TypeName
+	case Int64Type:
+		return int64TypeName
+	case Float32Type:
+		return float32TypeName
+	case Float64Type:
+		return float64TypeName
+	case StringType:
+		return stringTypeName
+	default:
+		log.Fatalf("unknown primitive type %v", p)
+		// unreachable
+		return nil
+	}
+}
+
+func (p PrimitiveType) Deserialize(r io.Reader, receiver interface{}) error {
+	switch p {
+	case BoolType:
+		switch data := receiver.(type) {
+		case *bool:
+			var wrapper protocol.BooleanWrapper
+			if err := boolWrapperType.Deserialize(r, &wrapper); err != nil {
+				return err
+			}
+
+			*data = wrapper.Value
+		default:
+			return errors.New("receiver must be of type bool or *bool")
+		}
+	case Int32Type:
+		switch data := receiver.(type) {
+		case *int32:
+			var wrapper protocol.IntWrapper
+			if err := int32WrapperType.Deserialize(r, &wrapper); err != nil {
+				return err
+			}
+
+			*data = wrapper.Value
+		default:
+			return errors.New("receiver must be of type *int32")
+		}
+	case Int64Type:
+		switch data := receiver.(type) {
+		case *int64:
+			var wrapper protocol.LongWrapper
+			if err := int64WrapperType.Deserialize(r, &wrapper); err != nil {
+				return err
+			}
+			*data = wrapper.Value
+		default:
+			return errors.New("receiver must be of type *int64")
+		}
+	case Float32Type:
+		switch data := receiver.(type) {
+		case *float32:
+			var wrapper protocol.FloatWrapper
+			if err := float32WrapperType.Deserialize(r, &wrapper); err != nil {
+				return err
+			}
+
+			*data = wrapper.Value
+		default:
+			return errors.New("receiver must be of type *float32")
+		}
+	case Float64Type:
+		switch data := receiver.(type) {
+		case *float64:
+			var wrapper protocol.DoubleWrapper
+			if err := float64WrapperType.Deserialize(r, &wrapper); err != nil {
+				return err
+			}
+
+			*data = wrapper.Value
+		default:
+			return errors.New("receiver must be of type *float64")
+		}
+	case StringType:
+		switch data := receiver.(type) {
+		case *string:
+			var wrapper protocol.StringWrapper
+			if err := stringWrapperType.Deserialize(r, &wrapper); err != nil {
+				return err
+			}
+
+			*data = wrapper.Value
+		default:
+			return errors.New("receiver must be of type *string")
+		}
+	default:
+		log.Fatalf("unknown primitive type %v", p)
+		// unreachable
+		return nil
+	}
+
+	return nil
+}
+
+func (p PrimitiveType) Serialize(writer io.Writer, data interface{}) error {
+	switch p {
+	case BoolType:
+		switch data := data.(type) {
+		case bool:
+			wrapper := protocol.BooleanWrapper{Value: data}
+			return boolWrapperType.Serialize(writer, &wrapper)
+		case *bool:
+			wrapper := protocol.BooleanWrapper{Value: *data}
+			return boolWrapperType.Serialize(writer, &wrapper)
+		default:
+			return errors.New("data must be of type bool or *bool")
+		}
+	case Int32Type:
+		switch data := data.(type) {
+		case int32:
+			wrapper := protocol.IntWrapper{Value: data}
+			return int32WrapperType.Serialize(writer, &wrapper)
+		case *int32:
+			wrapper := protocol.IntWrapper{Value: *data}
+			return int32WrapperType.Serialize(writer, &wrapper)
+		default:
+			return errors.New("data must be of type int32 or *int32")
+		}
+	case Int64Type:
+		switch data := data.(type) {
+		case int64:
+			wrapper := protocol.LongWrapper{Value: data}
+			return int64WrapperType.Serialize(writer, &wrapper)
+		case *int64:
+			wrapper := protocol.LongWrapper{Value: *data}
+			return int64WrapperType.Serialize(writer, &wrapper)
+		default:
+			return errors.New("data must be of type int64 or *int64")
+		}
+	case Float32Type:
+		switch data := data.(type) {
+		case float32:
+			wrapper := protocol.FloatWrapper{Value: data}
+			return float32WrapperType.Serialize(writer, &wrapper)
+		case *float32:
+			wrapper := protocol.FloatWrapper{Value: *data}
+			return float32WrapperType.Serialize(writer, &wrapper)
+		default:
+			return errors.New("data must be of type float32 or *float32")
+		}
+	case Float64Type:
+		switch data := data.(type) {
+		case float64:
+			wrapper := protocol.DoubleWrapper{Value: data}
+			return float64WrapperType.Serialize(writer, &wrapper)
+		case *float64:
+			wrapper := protocol.DoubleWrapper{Value: *data}
+			return float64WrapperType.Serialize(writer, &wrapper)
+		default:
+			return errors.New("data must be of type float64 or *float64")
+		}
+	case StringType:
+		switch data := data.(type) {
+		case string:
+			wrapper := protocol.StringWrapper{Value: data}
+			return stringWrapperType.Serialize(writer, &wrapper)
+		case *string:
+			wrapper := protocol.StringWrapper{Value: *data}
+			return stringWrapperType.Serialize(writer, &wrapper)
+		default:
+			return errors.New("data must be of type string or *string")
+		}
+	default:
+		log.Fatalf("unknown primitive type %v", p)
+		// unreachable
+		return nil
+	}
+}
+
+type jsonType struct {
+	typeName TypeName
+}
+
+// MakeJsonType creates  a new SimpleType with a given TypeName
+// using the standard Go JSON library.
+func MakeJsonType(name TypeName) SimpleType {
+	return jsonType{typeName: name}
+}
+
+func (j jsonType) GetTypeName() TypeName {
+	return j.typeName
+}
+
+func (j jsonType) Deserialize(r io.Reader, receiver interface{}) error {
+	return json.NewDecoder(r).Decode(receiver)
+}
+
+func (j jsonType) Serialize(writer io.Writer, data interface{}) error {
+	return json.NewEncoder(writer).Encode(data)
+}
+
+type protoType struct {
+	typeName TypeName
+}
+
+// MakeProtobufType creates a new SimpleType for the given protobuf Message.
+func MakeProtobufType(m proto.Message) SimpleType {
+	name := proto.MessageName(m)
+	tName, _ := TypeNameFromParts("type.googleapis.com", string(name))
+	return MakeProtobufTypeWithTypeName(tName)
+}
+
+// MakeProtobufTypeWithTypeName creates a new SimpleType for the
+// given protobuf Message with a custom namespace.
+func MakeProtobufTypeWithTypeName(typeName TypeName) SimpleType {
+	return protoType{
+		typeName: typeName,
+	}
+}
+
+func (p protoType) GetTypeName() TypeName {
+	return p.typeName
+}
+
+func (p protoType) Deserialize(r io.Reader, receiver interface{}) (err error) {
+	switch receiver := receiver.(type) {
+	case proto.Message:
+		data, err := io.ReadAll(r)
+		if err != nil {
+			return err
+		}
+
+		return proto.Unmarshal(data, receiver)
+	default:
+		return errors.New("receiver must implement proto.Message")
+	}
+}
+
+func (p protoType) Serialize(writer io.Writer, data interface{}) error {
+	switch data := data.(type) {
+	case proto.Message:
+		if value, err := proto.Marshal(data); err != nil {
+			return err
+		} else {
+			_, err = writer.Write(value)
+			return err
+		}
+
+	default:
+		return errors.New("data must implement proto.Message")
+	}
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/types_test.go b/statefun-sdk-go/v3/pkg/statefun/types_test.go
new file mode 100644
index 0000000..4e00da6
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/types_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"bytes"
+	"github.com/stretchr/testify/assert"
+	"math"
+	"testing"
+)
+
+func TestBoolType(t *testing.T) {
+	testBool(t, true)
+	testBool(t, false)
+}
+
+func testBool(t *testing.T, data bool) {
+	buffer := bytes.Buffer{}
+	err := BoolType.Serialize(&buffer, data)
+	assert.NoError(t, err)
+
+	var result bool
+	err = BoolType.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+	assert.NoError(t, err)
+
+	assert.Equal(t, data, result)
+}
+
+func TestIntType(t *testing.T) {
+	testInt32(t, 1)
+	testInt32(t, 1048576)
+	testInt32(t, math.MaxInt32)
+	testInt32(t, math.MinInt32)
+	testInt32(t, -1)
+}
+
+func testInt32(t *testing.T, data int32) {
+	buffer := bytes.Buffer{}
+	err := Int32Type.Serialize(&buffer, data)
+	assert.NoError(t, err)
+
+	var result int32
+	err = Int32Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+	assert.NoError(t, err)
+
+	assert.Equal(t, data, result)
+}
+
+func TestLongType(t *testing.T) {
+	testInt64(t, -1)
+	testInt64(t, 0)
+	testInt64(t, math.MinInt64)
+	testInt64(t, math.MaxInt64)
+}
+
+func testInt64(t *testing.T, data int64) {
+	buffer := bytes.Buffer{}
+	err := Int64Type.Serialize(&buffer, data)
+	assert.NoError(t, err)
+
+	var result int64
+	err = Int64Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+	assert.NoError(t, err)
+
+	assert.Equal(t, data, result)
+}
+
+func TestFloatType(t *testing.T) {
+	testFloat32(t, math.MaxFloat32)
+	testFloat32(t, math.SmallestNonzeroFloat32)
+	testFloat32(t, 2.1459)
+	testFloat32(t, -1e4)
+}
+
+func testFloat32(t *testing.T, data float32) {
+	buffer := bytes.Buffer{}
+	err := Float32Type.Serialize(&buffer, data)
+	assert.NoError(t, err)
+
+	var result float32
+	err = Float32Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+	assert.NoError(t, err)
+
+	assert.Equal(t, data, result)
+}
+
+func TestDoubleType(t *testing.T) {
+	testFloat64(t, math.MaxFloat64)
+	testFloat64(t, math.SmallestNonzeroFloat64)
+	testFloat64(t, 2.1459)
+	testFloat64(t, -1e4)
+}
+
+func testFloat64(t *testing.T, data float64) {
+	buffer := bytes.Buffer{}
+	err := Float64Type.Serialize(&buffer, data)
+	assert.NoError(t, err)
+
+	var result float64
+	err = Float64Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+	assert.NoError(t, err)
+
+	assert.Equal(t, data, result)
+}
+
+func TestStringType(t *testing.T) {
+	testString(t, "")
+	testString(t, "This is a string")
+}
+
+func testString(t *testing.T, data string) {
+	buffer := bytes.Buffer{}
+	err := StringType.Serialize(&buffer, data)
+	assert.NoError(t, err)
+
+	var result string
+	err = StringType.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+	assert.NoError(t, err)
+
+	assert.Equal(t, data, result)
+}
+
+type User struct {
+	FirstName string `json:"first_name"`
+	LastName  string `json:"last_name"`
+}
+
+func TestJsonType(t *testing.T) {
+	buffer := bytes.Buffer{}
+	userType := MakeJsonType(TypeNameFrom("org.foo.bar/UserJson"))
+
+	err := userType.Serialize(&buffer, User{"bob", "mop"})
+	assert.NoError(t, err)
+
+	var result User
+	err = userType.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+	assert.NoError(t, err)
+
+	assert.Equal(t, result, User{"bob", "mop"})
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/value_spec.go b/statefun-sdk-go/v3/pkg/statefun/value_spec.go
new file mode 100644
index 0000000..6d51cd8
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/value_spec.go
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+	"fmt"
+	"log"
+	"regexp"
+	"time"
+)
+
+type expirationType int
+
+const (
+	none expirationType = iota
+	expireAfterCall
+	expireAfterWrite
+)
+
+func (e expirationType) String() string {
+	switch e {
+	case expireAfterCall:
+		return "expire_after_call"
+	case expireAfterWrite:
+		return "expire_after_write"
+	case none:
+		return "none"
+	default:
+		panic("unknown Expiration type")
+	}
+}
+
+// Expiration Configuration
+//
+// Defines the way state can be auto expired by the runtime.
+// State Expiration (also known as TTL) can be used to keep
+// state from growing arbitrarily by assigning an Expiration
+// date to a value.
+//
+// State can be expired after a duration has passed since either
+// the last write to the state, or the last call to the Function.
+type Expiration struct {
+	expirationType
+	duration time.Duration
+}
+
+func (e Expiration) String() string {
+	return fmt.Sprintf("Expiration{mode=%v, duration=%v}", e.expirationType.String(), e.duration.String())
+}
+
+// ExpireAfterCall returns an Expiration configuration that would expire
+// a duration after the last invocation of the Function.
+func ExpireAfterCall(duration time.Duration) Expiration {
+	return Expiration{
+		expireAfterCall,
+		duration,
+	}
+}
+
+// ExpireAfterWrite returns an Expiration configuration that
+// would expire a duration after the last write.
+func ExpireAfterWrite(duration time.Duration) Expiration {
+	return Expiration{
+		expireAfterWrite,
+		duration,
+	}
+}
+
+// A ValueSpec identifies a registered persistent value of a function, which will be
+// managed by the Stateful Functions runtime for consistency and fault-tolerance. A
+// ValueSpec is registered for a function by configuring it on the function's
+// associated StatefulFunctionSpec.
+type ValueSpec struct {
+	// The given tpe of the persistent value. The tpe must be a valid
+	// identifier conforming to the following rules:
+	//
+	// 1. First character must be an alphabet letter [a-z] / [A-Z], or an underscore '_'.
+	// 2. Remaining characters can be an alphabet letter [a-z] / [A-Z], a digit [0-9], or
+	//    an underscore '-'.
+	// 3. Must not contain any spaces.
+	Name string
+
+	// The SimpleType of the persistent value. Either
+	// a built-in PrimitiveType or custom implementation.
+	ValueType SimpleType
+
+	// An optional expiration configuration.
+	Expiration Expiration
+}
+
+func (v *ValueSpec) String() string {
+	return "ValueSpec{name=" + v.Name + ", type=" + v.ValueType.GetTypeName().String() + ", expiration=" + v.Expiration.String() + "}"
+}
+
+const invalidNameMessage = `
+invalid state tpe %s. state names can only start with alphabet letters [a-z][A-Z] or an underscore '_' followed by zero or more characters that are alphanumeric or underscores
+`
+
+func validateValueSpec(s ValueSpec) error {
+	matched, err := regexp.MatchString("^[a-zA-Z_][a-zA-Z_\\d]*$", s.Name)
+	if err != nil {
+		log.Panicf("invalid regex; this is a bug: %v", err)
+	}
+
+	if !matched {
+		return fmt.Errorf(invalidNameMessage, s.Name)
+	}
+
+	return nil
+}
diff --git a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
index dc280b6..b134417 100644
--- a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
+++ b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
@@ -22,6 +22,7 @@
 
 option java_package = "org.apache.flink.statefun.sdk.egress.generated";
 option java_multiple_files = true;
+option go_package = ".;protocol";
 
 message KafkaProducerRecord {
     string key = 1;
diff --git a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
index a365443..626c7a8 100644
--- a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
+++ b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
@@ -22,6 +22,7 @@
 
 option java_package = "org.apache.flink.statefun.sdk.egress.generated";
 option java_multiple_files = true;
+option go_package = ".;protocol";
 
 message KinesisEgressRecord {
     string partition_key = 1;
diff --git a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
index ac72d7c..aeaaf06 100644
--- a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
+++ b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
@@ -22,6 +22,7 @@
 
 option java_package = "org.apache.flink.statefun.sdk.reqreply.generated";
 option java_multiple_files = true;
+option go_package = ".;protocol";
 
 // -------------------------------------------------------------------------------------------------------------------
 // Common message definitions
diff --git a/statefun-sdk-protos/src/main/protobuf/types/types.proto b/statefun-sdk-protos/src/main/protobuf/types/types.proto
index 3674546..a16f343 100644
--- a/statefun-sdk-protos/src/main/protobuf/types/types.proto
+++ b/statefun-sdk-protos/src/main/protobuf/types/types.proto
@@ -22,7 +22,7 @@
 
 option java_package = "org.apache.flink.statefun.sdk.types.generated";
 option java_multiple_files = true;
-
+option go_package = ".;protocol";
 
 // BooleanWrapper represents a StateFun primitive type of a boolean value. This is recognized as:
 // io.statefun.types/bool