[FLINK-18810][sdk] Golang remote functions SDK
diff --git a/pom.xml b/pom.xml
index e963671..5c6b963 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
<module>statefun-sdk-protos</module>
<module>statefun-sdk-java</module>
<module>statefun-sdk-python</module>
+ <module>statefun-sdk-go</module>
<module>statefun-kafka-io</module>
<module>statefun-kinesis-io</module>
<module>statefun-flink</module>
@@ -228,18 +229,20 @@
<!-- Generated content -->
<exclude>**/target/**</exclude>
<exclude>**/_build/**</exclude>
- <exclude>docs/static/font-awesome/**</exclude>
- <exclude>docs/resources/**</exclude>
- <exclude>docs/public/**</exclude>
- <exclude>docs/themes/book/**</exclude>
- <exclude>docs/assets/github.css</exclude>
- <exclude>docs/static/js/anchor.min.js</exclude>
+ <exclude>docs/static/font-awesome/**</exclude>
+ <exclude>docs/resources/**</exclude>
+ <exclude>docs/public/**</exclude>
+ <exclude>docs/themes/book/**</exclude>
+ <exclude>docs/assets/github.css</exclude>
+ <exclude>docs/static/js/anchor.min.js</exclude>
<!-- Generated code -->
<exclude>**/generated/**</exclude>
<!-- Bundled license files -->
<exclude>**/LICENSE*</exclude>
<!-- Python venv -->
<exclude>**/venv/**</exclude>
+ <!-- Generated lock file -->
+ <exclude>**/go.sum/**</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/statefun-sdk-go/generate-dev-protos.sh b/statefun-sdk-go/generate-dev-protos.sh
new file mode 100644
index 0000000..690c0f9
--- /dev/null
+++ b/statefun-sdk-go/generate-dev-protos.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+CURR_DIR=`pwd`
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+SDK_PROTOS_DIR="${BASE_DIR}/../statefun-sdk-protos/src/main/protobuf"
+
+
+cd ${BASE_DIR}
+find ${SDK_PROTOS_DIR} -type f -name "*proto" -exec cp {} . \;
+protoc *proto --go_out=v3/internal/protocol/
+rm *proto
+cd ${CURR_DIR}
\ No newline at end of file
diff --git a/statefun-sdk-go/go.mod b/statefun-sdk-go/go.mod
new file mode 100644
index 0000000..a8b54b1
--- /dev/null
+++ b/statefun-sdk-go/go.mod
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module github.com/apache/flink-statefun/statefun-sdk-go
+
+go 1.16
+
+require (
+ github.com/stretchr/testify v1.7.0
+ google.golang.org/protobuf v1.26.0
+)
diff --git a/statefun-sdk-go/go.sum b/statefun-sdk-go/go.sum
new file mode 100644
index 0000000..cc352a6
--- /dev/null
+++ b/statefun-sdk-go/go.sum
@@ -0,0 +1,19 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/statefun-sdk-go/pom.xml b/statefun-sdk-go/pom.xml
new file mode 100644
index 0000000..4d15061
--- /dev/null
+++ b/statefun-sdk-go/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>statefun-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>3.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>statefun-sdk-go</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/statefun-sdk-go/run-unit-tests.sh b/statefun-sdk-go/run-unit-tests.sh
new file mode 100755
index 0000000..1070e9b
--- /dev/null
+++ b/statefun-sdk-go/run-unit-tests.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if ! command -v go &> /dev/null
+then
+ echo "Could not find go compiler; skipping tests"
+ exit 0
+fi
+
+CURR_DIR=`pwd`
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+GO_SDK_DIR="${BASE_DIR}/../statefun-sdk-go/"
+
+
+cd ${GO_SDK_DIR}
+go test ./...
+cd ${CURR_DIR}
\ No newline at end of file
diff --git a/statefun-sdk-go/v3/pkg/statefun/cancellation.go b/statefun-sdk-go/v3/pkg/statefun/cancellation.go
new file mode 100644
index 0000000..4747b95
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/cancellation.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "errors"
+ "fmt"
+)
+
+// CancellationToken tags a delayed message send with statefun.SendAfterWithCancellationToken.
+// It can then be used to cancel said message on a best effort basis with statefun.CancelDelayedMessage.
+// The underlying string token can be retrieved by invoking Token().
+type CancellationToken interface {
+ fmt.Stringer
+
+ // Token returns the underlying string
+ // used to create the CancellationToken.
+ Token() string
+
+ // prevents external implementations
+ // of the interface.
+ internal()
+}
+
+type token string
+
+func (t token) String() string {
+ return "CancellationToken(" + string(t) + ")"
+}
+
+func (t token) Token() string {
+ return string(t)
+}
+
+func (t token) internal() {}
+
+// NewCancellationToken creates a new cancellation token or
+// returns an error if the token is invalid.
+func NewCancellationToken(t string) (CancellationToken, error) {
+ if len(t) == 0 {
+ return nil, errors.New("cancellation token cannot be empty")
+ }
+ return token(t), nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/cancellation_test.go b/statefun-sdk-go/v3/pkg/statefun/cancellation_test.go
new file mode 100644
index 0000000..a6e49b7
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/cancellation_test.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestNewCancellationToken(t *testing.T) {
+ _, err := NewCancellationToken("")
+ assert.Error(t, err, "empty strings should fail token validation")
+
+ token, err := NewCancellationToken("token")
+ assert.NoError(t, err, "failed to validate token")
+ assert.Equal(t, "token", token.Token(), "failed to return correct token")
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/context.go b/statefun-sdk-go/v3/pkg/statefun/context.go
new file mode 100644
index 0000000..677cfcd
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/context.go
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "context"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "sync"
+ "time"
+)
+
+// A Context contains information about the current function invocation, such as the invoked
+// function instance's and caller's Address. It is also used for side effects as a result of
+// the invocation such as send messages to other functions or egresses, and provides access to
+// AddressScopedStorage scoped to the current Address. This type is also a context.Context
+// and can be used to ensure any spawned go routines do not outlive the current function
+// invocation.
+type Context interface {
+ context.Context
+
+ // Self is the current invoked function instance's Address.
+ Self() Address
+
+ // Caller is the caller function instance's Address, if applicable. This is nil
+ // if the message was sent to this function via an ingress.
+ Caller() *Address
+
+ // Send forwards out a MessageBuilder to another function.
+ Send(message MessageBuilder)
+
+ // SendAfter forwards out a MessageBuilder to another function, after a specified time.Duration delay.
+ SendAfter(delay time.Duration, message MessageBuilder)
+
+ // SendAfterWithCancellationToken forwards out a MessageBuilder to another function,
+ // after a specified time.Duration delay. The message is tagged with a non-empty,
+ //unique token to attach to this message, to be used for message cancellation
+ SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder)
+
+ // CancelDelayedMessage cancels a delayed message (a message that was send via SendAfterWithCancellationToken).
+ // NOTE: this is a best-effort operation, since the message might have been already delivered.
+ // If the message was delivered, this is a no-op operation.
+ CancelDelayedMessage(token CancellationToken)
+
+ // SendEgress forwards out an EgressBuilder to an egress.
+ SendEgress(egress EgressBuilder)
+
+ // Storage returns the AddressScopedStorage, providing access to stored values scoped to the
+ // current invoked function instance's Address (which is obtainable using Self()).
+ Storage() AddressScopedStorage
+}
+
+type statefunContext struct {
+ sync.Mutex
+ context.Context
+ self Address
+ caller *Address
+ storage *storage
+ response *protocol.FromFunction_InvocationResponse
+}
+
+func (s *statefunContext) Storage() AddressScopedStorage {
+ return s.storage
+}
+
+func (s *statefunContext) Self() Address {
+ return s.self
+}
+
+func (s *statefunContext) Caller() *Address {
+ return s.caller
+}
+
+func (s *statefunContext) Send(message MessageBuilder) {
+ msg, err := message.ToMessage()
+
+ if err != nil {
+ panic(err)
+ }
+
+ invocation := &protocol.FromFunction_Invocation{
+ Target: msg.target,
+ Argument: msg.typedValue,
+ }
+
+ s.Lock()
+ s.response.OutgoingMessages = append(s.response.OutgoingMessages, invocation)
+ s.Unlock()
+}
+
+func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) {
+ msg, err := message.ToMessage()
+
+ if err != nil {
+ panic(err)
+ }
+
+ invocation := &protocol.FromFunction_DelayedInvocation{
+ Target: msg.target,
+ Argument: msg.typedValue,
+ DelayInMs: delay.Milliseconds(),
+ }
+
+ s.Lock()
+ s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
+ s.Unlock()
+}
+
+func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) {
+ msg, err := message.ToMessage()
+
+ if err != nil {
+ panic(err)
+ }
+
+ invocation := &protocol.FromFunction_DelayedInvocation{
+ CancellationToken: token.Token(),
+ Target: msg.target,
+ Argument: msg.typedValue,
+ DelayInMs: delay.Milliseconds(),
+ }
+
+ s.Lock()
+ s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
+ s.Unlock()
+}
+
+func (s *statefunContext) CancelDelayedMessage(token CancellationToken) {
+ invocation := &protocol.FromFunction_DelayedInvocation{
+ IsCancellationRequest: true,
+ CancellationToken: token.Token(),
+ }
+
+ s.Lock()
+ s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation)
+ s.Unlock()
+}
+
+func (s *statefunContext) SendEgress(egress EgressBuilder) {
+ msg, err := egress.toEgressMessage()
+
+ if err != nil {
+ panic(err)
+ }
+
+ s.Lock()
+ s.response.OutgoingEgresses = append(s.response.OutgoingEgresses, msg)
+ s.Unlock()
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/context_test.go b/statefun-sdk-go/v3/pkg/statefun/context_test.go
new file mode 100644
index 0000000..18b3d1c
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/context_test.go
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/proto"
+ "testing"
+ "time"
+)
+
+func TestStatefunContext_Send(t *testing.T) {
+ context := createContext()
+
+ msg := MessageBuilder{
+ Target: Address{
+ FunctionType: TypeNameFrom("example/func"),
+ Id: "0",
+ },
+ Value: "hello",
+ }
+
+ context.Send(msg)
+ outgoing := context.response.GetOutgoingMessages()
+
+ assert.Equal(t, 1, len(outgoing), "incorrect number of outgoing messages")
+ assert.Equal(t, "example", outgoing[0].Target.Namespace, "incorrect target namespace")
+ assert.Equal(t, "func", outgoing[0].Target.Type, "incorrect target type")
+ assert.Equal(t, "0", outgoing[0].Target.Id, "incorrect target id")
+ assert.Equal(t, stringTypeName.String(), outgoing[0].Argument.Typename, "incorrect typename set for message")
+ assert.True(t, outgoing[0].Argument.HasValue, "argument does not have value")
+}
+
+func TestStatefunContext_SendAfter(t *testing.T) {
+ context := createContext()
+
+ msg := MessageBuilder{
+ Target: Address{
+ FunctionType: TypeNameFrom("example/func"),
+ Id: "0",
+ },
+ Value: "hello",
+ }
+
+ context.SendAfter(time.Duration(1)*time.Millisecond, msg)
+ delayed := context.response.GetDelayedInvocations()
+
+ assert.Equal(t, 1, len(delayed), "incorrect number of delayed messages")
+ assert.Equal(t, "example", delayed[0].Target.Namespace, "incorrect target namespace")
+ assert.Equal(t, "func", delayed[0].Target.Type, "incorrect target type")
+ assert.Equal(t, "0", delayed[0].Target.Id, "incorrect target id")
+ assert.Equal(t, int64(1), delayed[0].DelayInMs, "incorrect delay")
+ assert.Equal(t, "", delayed[0].CancellationToken, "set cancellation token")
+ assert.False(t, delayed[0].IsCancellationRequest, "delayed message should not be a cancellation request")
+ assert.Equal(t, stringTypeName.String(), delayed[0].Argument.Typename, "incorrect typename set for message")
+ assert.True(t, delayed[0].Argument.HasValue, "argument does not have value")
+}
+
+func TestStatefunContext_SendAfterWithCancellationTokenMessage(t *testing.T) {
+ context := createContext()
+
+ msg := MessageBuilder{
+ Target: Address{
+ FunctionType: TypeNameFrom("example/func"),
+ Id: "0",
+ },
+ Value: "hello",
+ }
+
+ token, err := NewCancellationToken("token")
+ assert.NoError(t, err, "failed to create token")
+
+ context.SendAfterWithCancellationToken(time.Duration(1)*time.Millisecond, token, msg)
+ delayed := context.response.GetDelayedInvocations()
+
+ assert.Equal(t, 1, len(delayed), "incorrect number of delayed messages")
+ assert.Equal(t, "example", delayed[0].Target.Namespace, "incorrect target namespace")
+ assert.Equal(t, "func", delayed[0].Target.Type, "incorrect target type")
+ assert.Equal(t, "0", delayed[0].Target.Id, "incorrect target id")
+ assert.Equal(t, int64(1), delayed[0].DelayInMs, "incorrect delay")
+ assert.Equal(t, token.Token(), delayed[0].CancellationToken, "failed to set cancellation token")
+ assert.False(t, delayed[0].IsCancellationRequest, "delayed message should not be a cancellation request")
+ assert.Equal(t, stringTypeName.String(), delayed[0].Argument.Typename, "incorrect typename set for message")
+ assert.True(t, delayed[0].Argument.HasValue, "argument does not have value")
+}
+
+func TestStatefunContext_CancelDelayedMessage(t *testing.T) {
+ context := createContext()
+ token, err := NewCancellationToken("token")
+ assert.NoError(t, err, "failed to create token")
+
+ context.CancelDelayedMessage(token)
+ delayed := context.response.GetDelayedInvocations()
+ assert.Equal(t, 1, len(delayed), "incorrect number of delayed messages")
+ assert.Equal(t, token.Token(), delayed[0].CancellationToken, "failed to set cancellation token")
+ assert.True(t, delayed[0].IsCancellationRequest, "delayed message should be a cancellation request")
+}
+
+func TestStatefunContext_SendEgress_Kafka(t *testing.T) {
+ context := createContext()
+
+ kafka := &KafkaEgressBuilder{
+ Target: TypeNameFrom("example/kafka"),
+ Topic: "topic",
+ Key: "key",
+ Value: "value",
+ }
+
+ context.SendEgress(kafka)
+ egress := context.response.GetOutgoingEgresses()
+
+ assert.Equal(t, 1, len(egress), "incorrect number of egress messages")
+ assert.Equal(t, "example", egress[0].EgressNamespace, "incorrect target namespace")
+ assert.Equal(t, "kafka", egress[0].EgressType, "incorrect target type")
+ assert.Equal(t, kafkaTypeName, egress[0].Argument.Typename, "incorrect typename")
+
+ kafkaRecord := protocol.KafkaProducerRecord{}
+ assert.NoError(t, proto.Unmarshal(egress[0].Argument.Value, &kafkaRecord), "failed to deserialize kafka record")
+ assert.Equal(t, "topic", kafkaRecord.Topic, "incorrect kafka topic")
+ assert.Equal(t, "key", kafkaRecord.Key, "incorrect kafka key")
+}
+
+func TestStatefunContext_SendEgress_Kinesis(t *testing.T) {
+ context := createContext()
+
+ kafka := &KinesisEgressBuilder{
+ Target: TypeNameFrom("example/kinesis"),
+ Stream: "stream",
+ PartitionKey: "key",
+ Value: "value",
+ }
+
+ context.SendEgress(kafka)
+ egress := context.response.GetOutgoingEgresses()
+
+ assert.Equal(t, 1, len(egress), "incorrect number of egress messages")
+ assert.Equal(t, "example", egress[0].EgressNamespace, "incorrect target namespace")
+ assert.Equal(t, "kinesis", egress[0].EgressType, "incorrect target type")
+ assert.Equal(t, kinesisTypeName, egress[0].Argument.Typename, "incorrect typename")
+
+ kinesis := protocol.KinesisEgressRecord{}
+ assert.NoError(t, proto.Unmarshal(egress[0].Argument.Value, &kinesis), "failed to deserialize kinesis record")
+ assert.Equal(t, "stream", kinesis.Stream, "incorrect kinesis stream")
+ assert.Equal(t, "key", kinesis.PartitionKey, "incorrect kinesis key")
+}
+
+// creates a context with the minimal state to
+// run tests.
+func createContext() *statefunContext {
+ return &statefunContext{
+ response: &protocol.FromFunction_InvocationResponse{},
+ }
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/egress.go b/statefun-sdk-go/v3/pkg/statefun/egress.go
new file mode 100644
index 0000000..20e7644
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/egress.go
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "google.golang.org/protobuf/proto"
+)
+
+const (
+ kafkaTypeName = "type.googleapis.com/io.statefun.sdk.egress.KafkaProducerRecord"
+ kinesisTypeName = "type.googleapis.com/io.statefun.sdk.egress.KinesisEgressRecord"
+)
+
+type EgressBuilder interface {
+ toEgressMessage() (*protocol.FromFunction_EgressMessage, error)
+}
+
+// KafkaEgressBuilder builds a message that can be emitted to a Kafka generic egress.
+// If a ValueType is provided, then Value will be serialized according to the
+// provided ValueType's serializer. Otherwise we will try to convert Value to bytes
+// if it is one of:
+// - utf-8 string
+// - []bytes
+// - an int (as defined by Kafka's serialization format)
+// - float (as defined by Kafka's serialization format)
+type KafkaEgressBuilder struct {
+ // The TypeName as specified in module.yaml
+ Target TypeName
+
+ // The Kafka destination topic for that record
+ Topic string
+
+ // The utf8 encoded string key to produce (can be empty)
+ Key string
+
+ // The value to produce
+ Value interface{}
+
+ // An optional hint to this values type
+ ValueType SimpleType
+}
+
+func (k KafkaEgressBuilder) isEnvelope() {}
+
+func (k KafkaEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
+ if k.Target == nil {
+ return nil, errors.New("an egress record requires a Target")
+ }
+ if k.Topic == "" {
+ return nil, errors.New("A Kafka record requires a topic")
+ }
+
+ if k.Value == nil {
+ return nil, errors.New("A Kafka record requires a value")
+ }
+
+ buffer := bytes.Buffer{}
+ if k.ValueType != nil {
+ if err := k.ValueType.Serialize(&buffer, k.Value); err != nil {
+ return nil, err
+ }
+ } else {
+ switch value := k.Value.(type) {
+ case string:
+ _ = StringType.Serialize(&buffer, value)
+ case []byte:
+ buffer.Write(value)
+ case int, int32, int64, float32, float64:
+ if err := binary.Write(&buffer, binary.BigEndian, value); err != nil {
+ return nil, err
+ }
+ default:
+ return nil, errors.New("unable to convert value to bytes")
+ }
+ }
+
+ kafka := protocol.KafkaProducerRecord{
+ Key: k.Key,
+ ValueBytes: buffer.Bytes(),
+ Topic: k.Topic,
+ }
+
+ value, err := proto.Marshal(&kafka)
+ if err != nil {
+ return nil, err
+ }
+
+ return &protocol.FromFunction_EgressMessage{
+ EgressNamespace: k.Target.GetNamespace(),
+ EgressType: k.Target.GetType(),
+ Argument: &protocol.TypedValue{
+ Typename: kafkaTypeName,
+ HasValue: true,
+ Value: value,
+ },
+ }, nil
+}
+
+// KinesisEgressBuilder builds a message that can be emitted to a Kinesis generic egress.
+// If a ValueType is provided, then Value will be serialized according to the
+// provided ValueType's serializer. Otherwise we will try to convert Value to bytes
+// if it is one of:
+// - utf-8 string
+// - []byte
+type KinesisEgressBuilder struct {
+ // The TypeName as specified in module.yaml
+ Target TypeName
+
+ // The Kinesis destination stream for that record
+ Stream string
+
+ // The value to produce
+ Value interface{}
+
+ // An optional hint to this value type
+ ValueType SimpleType
+
+ // The utf8 encoded string partition key to use
+ PartitionKey string
+
+ // A utf8 encoded string explicit hash key to use (can be empty)
+ ExplicitHashKey string
+}
+
+func (k KinesisEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
+ if k.Target == nil {
+ return nil, errors.New("an egress record requires a Target")
+ } else if k.Stream == "" {
+ return nil, errors.New("missing destination Kinesis stream")
+ } else if k.Value == nil {
+ return nil, errors.New("missing value")
+ } else if k.PartitionKey == "" {
+ return nil, errors.New("missing partition key")
+ }
+
+ buffer := bytes.Buffer{}
+ if k.ValueType != nil {
+ if err := k.ValueType.Serialize(&buffer, k.Value); err != nil {
+ return nil, err
+ }
+ } else {
+ switch value := k.Value.(type) {
+ case string:
+ _ = StringType.Serialize(&buffer, value)
+ case []byte:
+ buffer.Write(value)
+ default:
+ return nil, errors.New("unable to convert value to bytes")
+ }
+ }
+
+ kinesis := protocol.KinesisEgressRecord{
+ PartitionKey: k.PartitionKey,
+ ValueBytes: buffer.Bytes(),
+ Stream: k.Stream,
+ ExplicitHashKey: k.ExplicitHashKey,
+ }
+
+ value, err := proto.Marshal(&kinesis)
+ if err != nil {
+ return nil, err
+ }
+
+ return &protocol.FromFunction_EgressMessage{
+ EgressNamespace: k.Target.GetNamespace(),
+ EgressType: k.Target.GetType(),
+ Argument: &protocol.TypedValue{
+ Typename: kinesisTypeName,
+ HasValue: true,
+ Value: value,
+ },
+ }, nil
+}
+
+// GenericEgressBuilder create a generic egress record. For Kafka
+// and Kinesis see KafkaEgressBuilder and
+// KinesisEgressBuilder respectively
+type GenericEgressBuilder struct {
+ // The TypeName as specified when registered
+ Target TypeName
+
+ // The value to produce
+ Value interface{}
+
+ // The values type
+ ValueType SimpleType
+}
+
+func (g GenericEgressBuilder) toEgressMessage() (*protocol.FromFunction_EgressMessage, error) {
+ if g.Target == nil {
+ return nil, errors.New("an egress record requires a Target")
+ } else if g.ValueType == nil {
+ return nil, errors.New("missing value type")
+ } else if g.Value == nil {
+ return nil, errors.New("missing value")
+ }
+
+ buffer := bytes.Buffer{}
+ if err := g.ValueType.Serialize(&buffer, g.Value); err != nil {
+ return nil, err
+ }
+
+ return &protocol.FromFunction_EgressMessage{
+ EgressNamespace: g.Target.GetNamespace(),
+ EgressType: g.Target.GetType(),
+ Argument: &protocol.TypedValue{
+ Typename: g.ValueType.GetTypeName().String(),
+ HasValue: true,
+ Value: buffer.Bytes(),
+ },
+ }, nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/handler.go b/statefun-sdk-go/v3/pkg/statefun/handler.go
new file mode 100644
index 0000000..92e6f91
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/handler.go
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "google.golang.org/protobuf/proto"
+ "log"
+ "net/http"
+)
+
+// StatefulFunctions is a registry for multiple StatefulFunction's. A RequestReplyHandler
+// can be created from the registry that understands how to dispatch
+// invocation requests to the registered functions as well as encode
+// side-effects (e.g., sending messages to other functions or updating
+// values in storage) as the response.
+type StatefulFunctions interface {
+
+ // WithSpec registers a StatefulFunctionSpec, which will be
+ // used to build the runtime function. It returns an error
+ // if the specification is invalid and the handler
+ // fails to register the function.
+ WithSpec(spec StatefulFunctionSpec) error
+
+ // AsHandler creates a RequestReplyHandler from the registered
+ // function specs.
+ AsHandler() RequestReplyHandler
+}
+
+// The RequestReplyHandler processes messages
+// from the runtime, invokes functions, and encodes
+// side effects. The handler implements http.Handler
+// so it can easily be embedded in standard Go server
+// frameworks.
+type RequestReplyHandler interface {
+ http.Handler
+
+ // Invoke method provides compliance with AWS Lambda handler
+ Invoke(ctx context.Context, payload []byte) ([]byte, error)
+}
+
+// StatefulFunctionsBuilder creates a new StatefulFunctions registry.
+func StatefulFunctionsBuilder() StatefulFunctions {
+ return &handler{
+ module: map[TypeName]StatefulFunction{},
+ stateSpecs: map[TypeName]map[string]*protocol.FromFunction_PersistedValueSpec{},
+ }
+}
+
+type handler struct {
+ module map[TypeName]StatefulFunction
+ stateSpecs map[TypeName]map[string]*protocol.FromFunction_PersistedValueSpec
+}
+
+func (h *handler) WithSpec(spec StatefulFunctionSpec) error {
+ log.Printf("registering Stateful Function %v]n", spec.FunctionType)
+ if _, exists := h.module[spec.FunctionType]; exists {
+ err := fmt.Errorf("failed to register Stateful Function %s, there is already a spec registered under that type", spec.FunctionType)
+ log.Printf(err.Error())
+ return err
+ }
+
+ if spec.Function == nil {
+ err := fmt.Errorf("failed to register Stateful Function %s, the Function instance cannot be nil", spec.FunctionType)
+ log.Printf(err.Error())
+ return err
+ }
+
+ valueSpecs := make(map[string]*protocol.FromFunction_PersistedValueSpec, len(spec.States))
+
+ for _, state := range spec.States {
+ log.Printf("registering state specification %v\n", state)
+ if err := validateValueSpec(state); err != nil {
+ err := fmt.Errorf("failed to register Stateful Function %s: %w", spec.FunctionType, err)
+ log.Printf(err.Error())
+ return err
+ }
+
+ expiration := &protocol.FromFunction_ExpirationSpec{}
+ switch state.Expiration.expirationType {
+ case none:
+ expiration.Mode = protocol.FromFunction_ExpirationSpec_NONE
+ case expireAfterWrite:
+ expiration.Mode = protocol.FromFunction_ExpirationSpec_AFTER_WRITE
+ expiration.ExpireAfterMillis = state.Expiration.duration.Milliseconds()
+ case expireAfterCall:
+ expiration.Mode = protocol.FromFunction_ExpirationSpec_AFTER_INVOKE
+ expiration.ExpireAfterMillis = state.Expiration.duration.Milliseconds()
+ }
+
+ valueSpecs[state.Name] = &protocol.FromFunction_PersistedValueSpec{
+ StateName: state.Name,
+ ExpirationSpec: expiration,
+ TypeTypename: state.ValueType.GetTypeName().String(),
+ }
+ }
+
+ h.module[spec.FunctionType] = spec.Function
+ h.stateSpecs[spec.FunctionType] = valueSpecs
+
+ return nil
+}
+
+func (h *handler) AsHandler() RequestReplyHandler {
+ return h
+}
+
+func (h *handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
+ if request.Method != "POST" {
+ http.Error(writer, "invalid request method", http.StatusMethodNotAllowed)
+ return
+ }
+
+ contentType := request.Header.Get("Content-type")
+ if contentType != "" && contentType != "application/octet-stream" {
+ http.Error(writer, "invalid content type", http.StatusUnsupportedMediaType)
+ return
+ }
+
+ if request.Body == nil || request.ContentLength == 0 {
+ http.Error(writer, "empty request body", http.StatusBadRequest)
+ return
+ }
+
+ buffer := bytes.Buffer{}
+ if _, err := buffer.ReadFrom(request.Body); err != nil {
+ http.Error(writer, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ response, err := h.Invoke(request.Context(), buffer.Bytes())
+ if err != nil {
+ log.Printf(err.Error())
+ http.Error(writer, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ _, _ = writer.Write(response)
+}
+
+func (h *handler) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
+ toFunction := protocol.ToFunction{}
+ if err := proto.Unmarshal(payload, &toFunction); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal ToFunction: %w", err)
+ }
+
+ fromFunction, err := h.invoke(ctx, &toFunction)
+ if err != nil {
+ return nil, err
+ }
+
+ return proto.Marshal(fromFunction)
+}
+
+func (h *handler) invoke(ctx context.Context, toFunction *protocol.ToFunction) (from *protocol.FromFunction, err error) {
+ batch := toFunction.GetInvocation()
+ self := addressFromInternal(batch.Target)
+ function, exists := h.module[self.FunctionType]
+
+ defer func() {
+ if r := recover(); r != nil {
+ switch r := r.(type) {
+ case error:
+ err = fmt.Errorf("failed to execute invocation for %s: %w", batch.Target, r)
+ default:
+ log.Fatal(r)
+ }
+ }
+ }()
+
+ if !exists {
+ return nil, fmt.Errorf("unknown function type %s", self.FunctionType)
+ }
+
+ storageFactory := newStorageFactory(batch, h.stateSpecs[self.FunctionType])
+
+ if missing := storageFactory.getMissingSpecs(); missing != nil {
+ log.Printf("missing state specs for function type %v", self)
+ for _, spec := range missing {
+ log.Printf("registering missing specs %v", spec)
+ }
+ return &protocol.FromFunction{
+ Response: &protocol.FromFunction_IncompleteInvocationContext_{
+ IncompleteInvocationContext: &protocol.FromFunction_IncompleteInvocationContext{
+ MissingValues: missing,
+ },
+ },
+ }, nil
+ }
+
+ storage := storageFactory.getStorage()
+ response := &protocol.FromFunction_InvocationResponse{}
+
+ for _, invocation := range batch.Invocations {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ sContext := statefunContext{
+ self: self,
+ storage: storage,
+ response: response,
+ }
+
+ var cancel context.CancelFunc
+ sContext.Context, cancel = context.WithCancel(ctx)
+
+ var caller Address
+ if invocation.Caller != nil {
+ caller = addressFromInternal(invocation.Caller)
+ }
+ sContext.caller = &caller
+ msg := Message{
+ target: batch.Target,
+ typedValue: invocation.Argument,
+ }
+ err = function.Invoke(&sContext, msg)
+ cancel()
+
+ if err != nil {
+ return
+ }
+ }
+ }
+
+ response.StateMutations = storage.getStateMutations()
+ from = &protocol.FromFunction{
+ Response: &protocol.FromFunction_InvocationResult{
+ InvocationResult: response,
+ },
+ }
+
+ return
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/cell.go b/statefun-sdk-go/v3/pkg/statefun/internal/cell.go
new file mode 100644
index 0000000..f3479c8
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/cell.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "io"
+)
+
+// smallBufferSize is an initial allocation minimal capacity.
+// this is the same constant as used in bytes.Buffer.
+const smallBufferSize = 64
+
+// Cell is a mutable, persisted value.
+// This struct is not thread safe.
+type Cell struct {
+ buf []byte // contents are the bytes buf[off : len(buf)].
+ off int // read at &buf[off], writes always begin from buf[0]
+ mutated bool // tracker if the cell has been mutated
+ hasValue bool // tracker if the cell has a valid value
+ typeTypeName string // the typename of the type whose serialized contents are stored in the cell
+}
+
+// NewCell creates and initializes a new Cell using state's value as its
+// initial contents. The new Cell takes ownership of state, and the
+// caller should not use state after this call.
+func NewCell(state *protocol.ToFunction_PersistedValue, typeTypeName string) *Cell {
+ c := &Cell{
+ typeTypeName: typeTypeName,
+ }
+
+ if state.StateValue != nil {
+ c.hasValue = true
+ c.buf = state.StateValue.Value
+ }
+
+ return c
+}
+
+// SeekToBeginning resets the cell so the next
+// read starts from the beginning of the underlying
+// buffer, regardless of where the last read left off
+func (c *Cell) SeekToBeginning() {
+ c.off = 0
+}
+
+// Read reads up to len(p) bytes into p. It returns the number of bytes
+// read (0 <= n <= len(p)) and any error encountered. Read is resumable
+// and returns EOF when there are no more bytes to read. This behavior
+// is required for Cell to interoperate with the go standard library.
+// Users of Cell are required to call SeekToBeginning, before the first
+// read to ensure reads always begin at the start of the buffer.
+func (c *Cell) Read(p []byte) (n int, err error) {
+ if c.empty() {
+ if len(p) == 0 {
+ return 0, nil
+ }
+ return 0, io.EOF
+ }
+ n = copy(p, c.buf[c.off:])
+ c.off += n
+ return n, nil
+}
+
+// Write writes the given slice into the cell.
+// Unlike standard implementations of Write,
+// cells always overwrite any existing data.
+func (c *Cell) Write(p []byte) (n int, err error) {
+ c.mutated = true
+ c.hasValue = true
+
+ c.sizeBufferForCapacity(len(p))
+
+ c.off = 0
+ return copy(c.buf, p), nil
+}
+
+// sizeBufferForCapacity resizes the buffer to guarantee space for n bytes,
+// attempting to first re-slice the underlying buffer to avoid allocations.
+func (c *Cell) sizeBufferForCapacity(n int) {
+ if c.buf != nil && n <= cap(c.buf) {
+ c.buf = c.buf[:n]
+ } else if n <= smallBufferSize {
+ c.buf = make([]byte, n, smallBufferSize)
+ } else {
+ c.buf = make([]byte, n)
+ }
+}
+
+// Delete marks the value to be deleted and resets the buffer to be empty,
+// but it retains the underlying storage for use by future writes.
+func (c *Cell) Delete() {
+ c.mutated = true
+ c.hasValue = false
+ c.buf = c.buf[:0]
+ c.off = 0
+}
+
+// HasValue returns true if the cell contains a valid value,
+// if the value is false, calls to Read will consume 0 bytes
+func (c *Cell) HasValue() bool {
+ return c.hasValue
+}
+
+// GetStateMutation turns the final Cell into a FromFunction_PersistedValueMutation.
+// The new FromFunction_PersistedValueMutation takes ownership of the underlying
+// buffer and the cell should not be used after this function returns.
+func (c *Cell) GetStateMutation(name string) *protocol.FromFunction_PersistedValueMutation {
+ if !c.mutated {
+ return nil
+ }
+
+ mutation := &protocol.FromFunction_PersistedValueMutation{
+ MutationType: protocol.FromFunction_PersistedValueMutation_DELETE,
+ StateName: name,
+ }
+
+ if c.hasValue {
+ mutation.MutationType = protocol.FromFunction_PersistedValueMutation_MODIFY
+
+ mutation.StateValue = &protocol.TypedValue{
+ Typename: c.typeTypeName,
+ HasValue: true,
+ Value: c.buf,
+ }
+ }
+
+ return mutation
+}
+
+// empty reports whether the unread portion of the buffer is empty.
+func (c *Cell) empty() bool { return len(c.buf) <= c.off }
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/cell_test.go b/statefun-sdk-go/v3/pkg/statefun/internal/cell_test.go
new file mode 100644
index 0000000..198c74a
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/cell_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "testing"
+)
+
+func TestCellReadWrite(t *testing.T) {
+ cell := NewCell(&protocol.ToFunction_PersistedValue{
+ StateName: "state",
+ }, "typename")
+
+ data := []byte{0, 1, 0, 1}
+ n, err := cell.Write(data)
+
+ assert.NoError(t, err, "unexpected error writing data")
+ assert.Equal(t, len(data), n, "unexpected number of bytes written")
+
+ read := make([]byte, 4)
+
+ cell.SeekToBeginning()
+ n, err = cell.Read(read)
+
+ assert.NoError(t, err, "unexpected error reading data")
+ assert.Equal(t, len(read), n, "unexpected number of bytes read")
+
+ assert.Equal(t, data, read, "unexpected bytes read from cell")
+
+ cell.SeekToBeginning()
+ allBytes, _ := io.ReadAll(cell)
+
+ assert.Equal(t, len(allBytes), n, "unexpected number of bytes read")
+ assert.Equal(t, data, allBytes, "unexpected bytes read from cell")
+
+ secondRead := make([]byte, 4)
+
+ cell.SeekToBeginning()
+ n, err = cell.Read(secondRead)
+
+ assert.NoError(t, err, "unexpected error reading data")
+ assert.Equal(t, len(secondRead), n, "unexpected number of bytes read")
+
+ assert.Equal(t, data, secondRead, "unexpected bytes read from cell")
+
+ cell.SeekToBeginning()
+ allBytes, _ = io.ReadAll(cell)
+
+ assert.Equal(t, len(allBytes), n, "unexpected number of bytes read")
+ assert.Equal(t, data, allBytes, "unexpected bytes read from cell")
+}
+
+func TestCellReadWriteFromInitial(t *testing.T) {
+ initial := []byte{0, 1, 0, 1}
+ cell := NewCell(&protocol.ToFunction_PersistedValue{
+ StateName: "state",
+ StateValue: &protocol.TypedValue{
+ Typename: "typename",
+ HasValue: true,
+ Value: initial,
+ },
+ }, "typename")
+
+ read := make([]byte, 4)
+ n, err := cell.Read(read)
+
+ assert.NoError(t, err, "unexpected error reading data")
+ assert.Equal(t, len(read), n, "unexpected number of bytes read")
+
+ assert.Equal(t, initial, read, "unexpected bytes read from cell")
+
+ data := []byte{0, 0, 0, 1, 1, 1}
+ n, err = cell.Write(data)
+
+ assert.NoError(t, err, "unexpected error writing data")
+ assert.Equal(t, len(data), n, "unexpected number of bytes written")
+
+ secondRead := make([]byte, 6)
+ n, err = cell.Read(secondRead)
+
+ assert.NoError(t, err, "unexpected error reading data")
+ assert.Equal(t, len(secondRead), n, "unexpected number of bytes read")
+
+ assert.Equal(t, data, secondRead, "unexpected bytes read from cell")
+}
+
+func TestCell_EmptyWithNoValue(t *testing.T) {
+ initial := []byte{0, 1, 0, 1}
+ cell := NewCell(&protocol.ToFunction_PersistedValue{
+ StateName: "state",
+ StateValue: &protocol.TypedValue{
+ Typename: "typename",
+ HasValue: true,
+ Value: initial,
+ },
+ }, "typename")
+
+ assert.True(t, cell.HasValue())
+ assert.False(t, cell.empty(), "cells with a value should not be empty before any read")
+
+ cell.Delete()
+ assert.False(t, cell.HasValue(), "cells that have been deleted should not have values")
+ assert.True(t, cell.empty(), "cells that have been deleted should be empty")
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kafka-egress.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kafka-egress.pb.go
new file mode 100644
index 0000000..7beb156
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kafka-egress.pb.go
@@ -0,0 +1,183 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.26.0
+// protoc v3.14.0
+// source: kafka-egress.proto
+
+package protocol
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type KafkaProducerRecord struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+ ValueBytes []byte `protobuf:"bytes,2,opt,name=value_bytes,json=valueBytes,proto3" json:"value_bytes,omitempty"`
+ Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
+}
+
+func (x *KafkaProducerRecord) Reset() {
+ *x = KafkaProducerRecord{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_kafka_egress_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *KafkaProducerRecord) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*KafkaProducerRecord) ProtoMessage() {}
+
+func (x *KafkaProducerRecord) ProtoReflect() protoreflect.Message {
+ mi := &file_kafka_egress_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use KafkaProducerRecord.ProtoReflect.Descriptor instead.
+func (*KafkaProducerRecord) Descriptor() ([]byte, []int) {
+ return file_kafka_egress_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *KafkaProducerRecord) GetKey() string {
+ if x != nil {
+ return x.Key
+ }
+ return ""
+}
+
+func (x *KafkaProducerRecord) GetValueBytes() []byte {
+ if x != nil {
+ return x.ValueBytes
+ }
+ return nil
+}
+
+func (x *KafkaProducerRecord) GetTopic() string {
+ if x != nil {
+ return x.Topic
+ }
+ return ""
+}
+
+var File_kafka_egress_proto protoreflect.FileDescriptor
+
+var file_kafka_egress_proto_rawDesc = []byte{
+ 0x0a, 0x12, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x16, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75,
+ 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5e, 0x0a, 0x13,
+ 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x52, 0x65, 0x63,
+ 0x6f, 0x72, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x62,
+ 0x79, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18,
+ 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x3e, 0x0a, 0x2e,
+ 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b,
+ 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x65, 0x67,
+ 0x72, 0x65, 0x73, 0x73, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x50, 0x01,
+ 0x5a, 0x0a, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_kafka_egress_proto_rawDescOnce sync.Once
+ file_kafka_egress_proto_rawDescData = file_kafka_egress_proto_rawDesc
+)
+
+func file_kafka_egress_proto_rawDescGZIP() []byte {
+ file_kafka_egress_proto_rawDescOnce.Do(func() {
+ file_kafka_egress_proto_rawDescData = protoimpl.X.CompressGZIP(file_kafka_egress_proto_rawDescData)
+ })
+ return file_kafka_egress_proto_rawDescData
+}
+
+var file_kafka_egress_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
+var file_kafka_egress_proto_goTypes = []interface{}{
+ (*KafkaProducerRecord)(nil), // 0: io.statefun.sdk.egress.KafkaProducerRecord
+}
+var file_kafka_egress_proto_depIdxs = []int32{
+ 0, // [0:0] is the sub-list for method output_type
+ 0, // [0:0] is the sub-list for method input_type
+ 0, // [0:0] is the sub-list for extension type_name
+ 0, // [0:0] is the sub-list for extension extendee
+ 0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_kafka_egress_proto_init() }
+func file_kafka_egress_proto_init() {
+ if File_kafka_egress_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_kafka_egress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*KafkaProducerRecord); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_kafka_egress_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 1,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_kafka_egress_proto_goTypes,
+ DependencyIndexes: file_kafka_egress_proto_depIdxs,
+ MessageInfos: file_kafka_egress_proto_msgTypes,
+ }.Build()
+ File_kafka_egress_proto = out.File
+ file_kafka_egress_proto_rawDesc = nil
+ file_kafka_egress_proto_goTypes = nil
+ file_kafka_egress_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kinesis-egress.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kinesis-egress.pb.go
new file mode 100644
index 0000000..078987d
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/kinesis-egress.pb.go
@@ -0,0 +1,195 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.26.0
+// protoc v3.14.0
+// source: kinesis-egress.proto
+
+package protocol
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type KinesisEgressRecord struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ PartitionKey string `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"`
+ ValueBytes []byte `protobuf:"bytes,2,opt,name=value_bytes,json=valueBytes,proto3" json:"value_bytes,omitempty"`
+ Stream string `protobuf:"bytes,3,opt,name=stream,proto3" json:"stream,omitempty"`
+ ExplicitHashKey string `protobuf:"bytes,4,opt,name=explicit_hash_key,json=explicitHashKey,proto3" json:"explicit_hash_key,omitempty"`
+}
+
+func (x *KinesisEgressRecord) Reset() {
+ *x = KinesisEgressRecord{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_kinesis_egress_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *KinesisEgressRecord) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*KinesisEgressRecord) ProtoMessage() {}
+
+func (x *KinesisEgressRecord) ProtoReflect() protoreflect.Message {
+ mi := &file_kinesis_egress_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use KinesisEgressRecord.ProtoReflect.Descriptor instead.
+func (*KinesisEgressRecord) Descriptor() ([]byte, []int) {
+ return file_kinesis_egress_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *KinesisEgressRecord) GetPartitionKey() string {
+ if x != nil {
+ return x.PartitionKey
+ }
+ return ""
+}
+
+func (x *KinesisEgressRecord) GetValueBytes() []byte {
+ if x != nil {
+ return x.ValueBytes
+ }
+ return nil
+}
+
+func (x *KinesisEgressRecord) GetStream() string {
+ if x != nil {
+ return x.Stream
+ }
+ return ""
+}
+
+func (x *KinesisEgressRecord) GetExplicitHashKey() string {
+ if x != nil {
+ return x.ExplicitHashKey
+ }
+ return ""
+}
+
+var File_kinesis_egress_proto protoreflect.FileDescriptor
+
+var file_kinesis_egress_proto_rawDesc = []byte{
+ 0x0a, 0x14, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x2d, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73,
+ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x16, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65,
+ 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x22, 0x9f,
+ 0x01, 0x0a, 0x13, 0x4b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73,
+ 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74,
+ 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70,
+ 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x76,
+ 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c,
+ 0x52, 0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x16, 0x0a, 0x06,
+ 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74,
+ 0x72, 0x65, 0x61, 0x6d, 0x12, 0x2a, 0x0a, 0x11, 0x65, 0x78, 0x70, 0x6c, 0x69, 0x63, 0x69, 0x74,
+ 0x5f, 0x68, 0x61, 0x73, 0x68, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x0f, 0x65, 0x78, 0x70, 0x6c, 0x69, 0x63, 0x69, 0x74, 0x48, 0x61, 0x73, 0x68, 0x4b, 0x65, 0x79,
+ 0x42, 0x3e, 0x0a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66,
+ 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64,
+ 0x6b, 0x2e, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74,
+ 0x65, 0x64, 0x50, 0x01, 0x5a, 0x0a, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
+ 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_kinesis_egress_proto_rawDescOnce sync.Once
+ file_kinesis_egress_proto_rawDescData = file_kinesis_egress_proto_rawDesc
+)
+
+func file_kinesis_egress_proto_rawDescGZIP() []byte {
+ file_kinesis_egress_proto_rawDescOnce.Do(func() {
+ file_kinesis_egress_proto_rawDescData = protoimpl.X.CompressGZIP(file_kinesis_egress_proto_rawDescData)
+ })
+ return file_kinesis_egress_proto_rawDescData
+}
+
+var file_kinesis_egress_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
+var file_kinesis_egress_proto_goTypes = []interface{}{
+ (*KinesisEgressRecord)(nil), // 0: io.statefun.sdk.egress.KinesisEgressRecord
+}
+var file_kinesis_egress_proto_depIdxs = []int32{
+ 0, // [0:0] is the sub-list for method output_type
+ 0, // [0:0] is the sub-list for method input_type
+ 0, // [0:0] is the sub-list for extension type_name
+ 0, // [0:0] is the sub-list for extension extendee
+ 0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_kinesis_egress_proto_init() }
+func file_kinesis_egress_proto_init() {
+ if File_kinesis_egress_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_kinesis_egress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*KinesisEgressRecord); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_kinesis_egress_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 1,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_kinesis_egress_proto_goTypes,
+ DependencyIndexes: file_kinesis_egress_proto_depIdxs,
+ MessageInfos: file_kinesis_egress_proto_msgTypes,
+ }.Build()
+ File_kinesis_egress_proto = out.File
+ file_kinesis_egress_proto_rawDesc = nil
+ file_kinesis_egress_proto_goTypes = nil
+ file_kinesis_egress_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/request-reply.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/request-reply.pb.go
new file mode 100644
index 0000000..ef5085d
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/request-reply.pb.go
@@ -0,0 +1,1602 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
+// source: request-reply.proto
+
+package protocol
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type FromFunction_PersistedValueMutation_MutationType int32
+
+const (
+ FromFunction_PersistedValueMutation_DELETE FromFunction_PersistedValueMutation_MutationType = 0
+ FromFunction_PersistedValueMutation_MODIFY FromFunction_PersistedValueMutation_MutationType = 1
+)
+
+// Enum value maps for FromFunction_PersistedValueMutation_MutationType.
+var (
+ FromFunction_PersistedValueMutation_MutationType_name = map[int32]string{
+ 0: "DELETE",
+ 1: "MODIFY",
+ }
+ FromFunction_PersistedValueMutation_MutationType_value = map[string]int32{
+ "DELETE": 0,
+ "MODIFY": 1,
+ }
+)
+
+func (x FromFunction_PersistedValueMutation_MutationType) Enum() *FromFunction_PersistedValueMutation_MutationType {
+ p := new(FromFunction_PersistedValueMutation_MutationType)
+ *p = x
+ return p
+}
+
+func (x FromFunction_PersistedValueMutation_MutationType) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (FromFunction_PersistedValueMutation_MutationType) Descriptor() protoreflect.EnumDescriptor {
+ return file_request_reply_proto_enumTypes[0].Descriptor()
+}
+
+func (FromFunction_PersistedValueMutation_MutationType) Type() protoreflect.EnumType {
+ return &file_request_reply_proto_enumTypes[0]
+}
+
+func (x FromFunction_PersistedValueMutation_MutationType) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use FromFunction_PersistedValueMutation_MutationType.Descriptor instead.
+func (FromFunction_PersistedValueMutation_MutationType) EnumDescriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 0, 0}
+}
+
+type FromFunction_ExpirationSpec_ExpireMode int32
+
+const (
+ FromFunction_ExpirationSpec_NONE FromFunction_ExpirationSpec_ExpireMode = 0
+ FromFunction_ExpirationSpec_AFTER_WRITE FromFunction_ExpirationSpec_ExpireMode = 1
+ FromFunction_ExpirationSpec_AFTER_INVOKE FromFunction_ExpirationSpec_ExpireMode = 2
+)
+
+// Enum value maps for FromFunction_ExpirationSpec_ExpireMode.
+var (
+ FromFunction_ExpirationSpec_ExpireMode_name = map[int32]string{
+ 0: "NONE",
+ 1: "AFTER_WRITE",
+ 2: "AFTER_INVOKE",
+ }
+ FromFunction_ExpirationSpec_ExpireMode_value = map[string]int32{
+ "NONE": 0,
+ "AFTER_WRITE": 1,
+ "AFTER_INVOKE": 2,
+ }
+)
+
+func (x FromFunction_ExpirationSpec_ExpireMode) Enum() *FromFunction_ExpirationSpec_ExpireMode {
+ p := new(FromFunction_ExpirationSpec_ExpireMode)
+ *p = x
+ return p
+}
+
+func (x FromFunction_ExpirationSpec_ExpireMode) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (FromFunction_ExpirationSpec_ExpireMode) Descriptor() protoreflect.EnumDescriptor {
+ return file_request_reply_proto_enumTypes[1].Descriptor()
+}
+
+func (FromFunction_ExpirationSpec_ExpireMode) Type() protoreflect.EnumType {
+ return &file_request_reply_proto_enumTypes[1]
+}
+
+func (x FromFunction_ExpirationSpec_ExpireMode) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use FromFunction_ExpirationSpec_ExpireMode.Descriptor instead.
+func (FromFunction_ExpirationSpec_ExpireMode) EnumDescriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 5, 0}
+}
+
+// An Address is the unique identity of an individual StatefulFunction, containing
+// a function's type and an unique identifier within the type. The function's
+// type denotes the "class" of function to invoke, while the unique identifier addresses the
+// invocation to a specific function instance.
+type Address struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"`
+ Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"`
+ Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
+}
+
+func (x *Address) Reset() {
+ *x = Address{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Address) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Address) ProtoMessage() {}
+
+func (x *Address) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Address.ProtoReflect.Descriptor instead.
+func (*Address) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Address) GetNamespace() string {
+ if x != nil {
+ return x.Namespace
+ }
+ return ""
+}
+
+func (x *Address) GetType() string {
+ if x != nil {
+ return x.Type
+ }
+ return ""
+}
+
+func (x *Address) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+type TypedValue struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Typename string `protobuf:"bytes,1,opt,name=typename,proto3" json:"typename,omitempty"`
+ // has_value is set to differentiate a zero length value bytes explicitly set,
+ // or a non existing value.
+ HasValue bool `protobuf:"varint,2,opt,name=has_value,json=hasValue,proto3" json:"has_value,omitempty"`
+ Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *TypedValue) Reset() {
+ *x = TypedValue{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *TypedValue) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TypedValue) ProtoMessage() {}
+
+func (x *TypedValue) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use TypedValue.ProtoReflect.Descriptor instead.
+func (*TypedValue) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *TypedValue) GetTypename() string {
+ if x != nil {
+ return x.Typename
+ }
+ return ""
+}
+
+func (x *TypedValue) GetHasValue() bool {
+ if x != nil {
+ return x.HasValue
+ }
+ return false
+}
+
+func (x *TypedValue) GetValue() []byte {
+ if x != nil {
+ return x.Value
+ }
+ return nil
+}
+
+// The following section contains all the message types that are sent
+// from Flink to a remote function.
+type ToFunction struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Types that are assignable to Request:
+ // *ToFunction_Invocation_
+ Request isToFunction_Request `protobuf_oneof:"request"`
+}
+
+func (x *ToFunction) Reset() {
+ *x = ToFunction{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ToFunction) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction) ProtoMessage() {}
+
+func (x *ToFunction) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction.ProtoReflect.Descriptor instead.
+func (*ToFunction) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{2}
+}
+
+func (m *ToFunction) GetRequest() isToFunction_Request {
+ if m != nil {
+ return m.Request
+ }
+ return nil
+}
+
+func (x *ToFunction) GetInvocation() *ToFunction_InvocationBatchRequest {
+ if x, ok := x.GetRequest().(*ToFunction_Invocation_); ok {
+ return x.Invocation
+ }
+ return nil
+}
+
+type isToFunction_Request interface {
+ isToFunction_Request()
+}
+
+type ToFunction_Invocation_ struct {
+ Invocation *ToFunction_InvocationBatchRequest `protobuf:"bytes,100,opt,name=invocation,proto3,oneof"`
+}
+
+func (*ToFunction_Invocation_) isToFunction_Request() {}
+
+// The following section contains messages sent from a remote function back to Flink.
+type FromFunction struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Response sent from the function, as a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.
+ // It can be one of the following types:
+ // - io.statefun.sdk.reqreply.FromFunction.InvocationResponse
+ // - io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext
+ //
+ // Types that are assignable to Response:
+ // *FromFunction_InvocationResult
+ // *FromFunction_IncompleteInvocationContext_
+ Response isFromFunction_Response `protobuf_oneof:"response"`
+}
+
+func (x *FromFunction) Reset() {
+ *x = FromFunction{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction) ProtoMessage() {}
+
+func (x *FromFunction) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction.ProtoReflect.Descriptor instead.
+func (*FromFunction) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3}
+}
+
+func (m *FromFunction) GetResponse() isFromFunction_Response {
+ if m != nil {
+ return m.Response
+ }
+ return nil
+}
+
+func (x *FromFunction) GetInvocationResult() *FromFunction_InvocationResponse {
+ if x, ok := x.GetResponse().(*FromFunction_InvocationResult); ok {
+ return x.InvocationResult
+ }
+ return nil
+}
+
+func (x *FromFunction) GetIncompleteInvocationContext() *FromFunction_IncompleteInvocationContext {
+ if x, ok := x.GetResponse().(*FromFunction_IncompleteInvocationContext_); ok {
+ return x.IncompleteInvocationContext
+ }
+ return nil
+}
+
+type isFromFunction_Response interface {
+ isFromFunction_Response()
+}
+
+type FromFunction_InvocationResult struct {
+ InvocationResult *FromFunction_InvocationResponse `protobuf:"bytes,100,opt,name=invocation_result,json=invocationResult,proto3,oneof"`
+}
+
+type FromFunction_IncompleteInvocationContext_ struct {
+ IncompleteInvocationContext *FromFunction_IncompleteInvocationContext `protobuf:"bytes,101,opt,name=incomplete_invocation_context,json=incompleteInvocationContext,proto3,oneof"`
+}
+
+func (*FromFunction_InvocationResult) isFromFunction_Response() {}
+
+func (*FromFunction_IncompleteInvocationContext_) isFromFunction_Response() {}
+
+// PersistedValue represents a PersistedValue's value that is managed by Flink on behalf of a remote function.
+type ToFunction_PersistedValue struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The unique name of the persisted state.
+ StateName string `protobuf:"bytes,1,opt,name=state_name,json=stateName,proto3" json:"state_name,omitempty"`
+ // The serialized state value
+ StateValue *TypedValue `protobuf:"bytes,2,opt,name=state_value,json=stateValue,proto3" json:"state_value,omitempty"`
+}
+
+func (x *ToFunction_PersistedValue) Reset() {
+ *x = ToFunction_PersistedValue{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ToFunction_PersistedValue) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction_PersistedValue) ProtoMessage() {}
+
+func (x *ToFunction_PersistedValue) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[4]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction_PersistedValue.ProtoReflect.Descriptor instead.
+func (*ToFunction_PersistedValue) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{2, 0}
+}
+
+func (x *ToFunction_PersistedValue) GetStateName() string {
+ if x != nil {
+ return x.StateName
+ }
+ return ""
+}
+
+func (x *ToFunction_PersistedValue) GetStateValue() *TypedValue {
+ if x != nil {
+ return x.StateValue
+ }
+ return nil
+}
+
+// Invocation represents a remote function call, it associated with an (optional) return address,
+// and an argument.
+type ToFunction_Invocation struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The address of the function that requested the invocation (possibly absent)
+ Caller *Address `protobuf:"bytes,1,opt,name=caller,proto3" json:"caller,omitempty"`
+ // The invocation argument (aka the message sent to the target function)
+ Argument *TypedValue `protobuf:"bytes,2,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *ToFunction_Invocation) Reset() {
+ *x = ToFunction_Invocation{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ToFunction_Invocation) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction_Invocation) ProtoMessage() {}
+
+func (x *ToFunction_Invocation) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[5]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction_Invocation.ProtoReflect.Descriptor instead.
+func (*ToFunction_Invocation) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{2, 1}
+}
+
+func (x *ToFunction_Invocation) GetCaller() *Address {
+ if x != nil {
+ return x.Caller
+ }
+ return nil
+}
+
+func (x *ToFunction_Invocation) GetArgument() *TypedValue {
+ if x != nil {
+ return x.Argument
+ }
+ return nil
+}
+
+// InvocationBatchRequest represents a request to invoke a remote function. It is always associated with a target
+// address (the function to invoke), and a list of values for registered state.
+type ToFunction_InvocationBatchRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The address of the function to invoke
+ Target *Address `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"`
+ // A list of PersistedValues that were registered as a persisted state.
+ State []*ToFunction_PersistedValue `protobuf:"bytes,2,rep,name=state,proto3" json:"state,omitempty"`
+ // A non empty (at least one) list of invocations
+ Invocations []*ToFunction_Invocation `protobuf:"bytes,3,rep,name=invocations,proto3" json:"invocations,omitempty"`
+}
+
+func (x *ToFunction_InvocationBatchRequest) Reset() {
+ *x = ToFunction_InvocationBatchRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ToFunction_InvocationBatchRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ToFunction_InvocationBatchRequest) ProtoMessage() {}
+
+func (x *ToFunction_InvocationBatchRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[6]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ToFunction_InvocationBatchRequest.ProtoReflect.Descriptor instead.
+func (*ToFunction_InvocationBatchRequest) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{2, 2}
+}
+
+func (x *ToFunction_InvocationBatchRequest) GetTarget() *Address {
+ if x != nil {
+ return x.Target
+ }
+ return nil
+}
+
+func (x *ToFunction_InvocationBatchRequest) GetState() []*ToFunction_PersistedValue {
+ if x != nil {
+ return x.State
+ }
+ return nil
+}
+
+func (x *ToFunction_InvocationBatchRequest) GetInvocations() []*ToFunction_Invocation {
+ if x != nil {
+ return x.Invocations
+ }
+ return nil
+}
+
+// MutatePersistedValueCommand represents a command sent from a remote function to Flink,
+// requesting a change to a persisted value.
+type FromFunction_PersistedValueMutation struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ MutationType FromFunction_PersistedValueMutation_MutationType `protobuf:"varint,1,opt,name=mutation_type,json=mutationType,proto3,enum=io.statefun.sdk.reqreply.FromFunction_PersistedValueMutation_MutationType" json:"mutation_type,omitempty"`
+ StateName string `protobuf:"bytes,2,opt,name=state_name,json=stateName,proto3" json:"state_name,omitempty"`
+ StateValue *TypedValue `protobuf:"bytes,3,opt,name=state_value,json=stateValue,proto3" json:"state_value,omitempty"`
+}
+
+func (x *FromFunction_PersistedValueMutation) Reset() {
+ *x = FromFunction_PersistedValueMutation{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[7]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_PersistedValueMutation) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_PersistedValueMutation) ProtoMessage() {}
+
+func (x *FromFunction_PersistedValueMutation) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[7]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_PersistedValueMutation.ProtoReflect.Descriptor instead.
+func (*FromFunction_PersistedValueMutation) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 0}
+}
+
+func (x *FromFunction_PersistedValueMutation) GetMutationType() FromFunction_PersistedValueMutation_MutationType {
+ if x != nil {
+ return x.MutationType
+ }
+ return FromFunction_PersistedValueMutation_DELETE
+}
+
+func (x *FromFunction_PersistedValueMutation) GetStateName() string {
+ if x != nil {
+ return x.StateName
+ }
+ return ""
+}
+
+func (x *FromFunction_PersistedValueMutation) GetStateValue() *TypedValue {
+ if x != nil {
+ return x.StateValue
+ }
+ return nil
+}
+
+// Invocation represents a remote function call, it associated with a (mandatory) target address,
+// and an argument.
+type FromFunction_Invocation struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The target function to invoke
+ Target *Address `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"`
+ // The invocation argument (aka the message sent to the target function)
+ Argument *TypedValue `protobuf:"bytes,2,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *FromFunction_Invocation) Reset() {
+ *x = FromFunction_Invocation{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[8]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_Invocation) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_Invocation) ProtoMessage() {}
+
+func (x *FromFunction_Invocation) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[8]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_Invocation.ProtoReflect.Descriptor instead.
+func (*FromFunction_Invocation) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 1}
+}
+
+func (x *FromFunction_Invocation) GetTarget() *Address {
+ if x != nil {
+ return x.Target
+ }
+ return nil
+}
+
+func (x *FromFunction_Invocation) GetArgument() *TypedValue {
+ if x != nil {
+ return x.Argument
+ }
+ return nil
+}
+
+// DelayedInvocation represents a delayed remote function call with a target address, an argument
+// and a delay in milliseconds, after which this message to be sent.
+type FromFunction_DelayedInvocation struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // a boolean value (default false) that indicates rather this is a regular delayed message, or (true) a message
+ // cancellation request.
+ // in case of a regular delayed message all other fields are expected to be preset, otherwise only the
+ // cancellation_token is expected
+ IsCancellationRequest bool `protobuf:"varint,10,opt,name=is_cancellation_request,json=isCancellationRequest,proto3" json:"is_cancellation_request,omitempty"`
+ // an optional cancellation token that can be used to request the "unsending" of a delayed message.
+ CancellationToken string `protobuf:"bytes,11,opt,name=cancellation_token,json=cancellationToken,proto3" json:"cancellation_token,omitempty"`
+ // the amount of milliseconds to wait before sending this message
+ DelayInMs int64 `protobuf:"varint,1,opt,name=delay_in_ms,json=delayInMs,proto3" json:"delay_in_ms,omitempty"`
+ // the target address to send this message to
+ Target *Address `protobuf:"bytes,2,opt,name=target,proto3" json:"target,omitempty"`
+ // the invocation argument
+ Argument *TypedValue `protobuf:"bytes,3,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *FromFunction_DelayedInvocation) Reset() {
+ *x = FromFunction_DelayedInvocation{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[9]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_DelayedInvocation) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_DelayedInvocation) ProtoMessage() {}
+
+func (x *FromFunction_DelayedInvocation) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[9]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_DelayedInvocation.ProtoReflect.Descriptor instead.
+func (*FromFunction_DelayedInvocation) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 2}
+}
+
+func (x *FromFunction_DelayedInvocation) GetIsCancellationRequest() bool {
+ if x != nil {
+ return x.IsCancellationRequest
+ }
+ return false
+}
+
+func (x *FromFunction_DelayedInvocation) GetCancellationToken() string {
+ if x != nil {
+ return x.CancellationToken
+ }
+ return ""
+}
+
+func (x *FromFunction_DelayedInvocation) GetDelayInMs() int64 {
+ if x != nil {
+ return x.DelayInMs
+ }
+ return 0
+}
+
+func (x *FromFunction_DelayedInvocation) GetTarget() *Address {
+ if x != nil {
+ return x.Target
+ }
+ return nil
+}
+
+func (x *FromFunction_DelayedInvocation) GetArgument() *TypedValue {
+ if x != nil {
+ return x.Argument
+ }
+ return nil
+}
+
+// EgressMessage an argument to forward to an egress.
+// An egress is identified by a namespace and type (see EgressIdentifier SDK class).
+// The argument is an io.statefun.sdk.reqreply.TypedValue.
+type FromFunction_EgressMessage struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The target egress namespace
+ EgressNamespace string `protobuf:"bytes,1,opt,name=egress_namespace,json=egressNamespace,proto3" json:"egress_namespace,omitempty"`
+ // The target egress type
+ EgressType string `protobuf:"bytes,2,opt,name=egress_type,json=egressType,proto3" json:"egress_type,omitempty"`
+ // egress argument
+ Argument *TypedValue `protobuf:"bytes,3,opt,name=argument,proto3" json:"argument,omitempty"`
+}
+
+func (x *FromFunction_EgressMessage) Reset() {
+ *x = FromFunction_EgressMessage{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[10]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_EgressMessage) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_EgressMessage) ProtoMessage() {}
+
+func (x *FromFunction_EgressMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[10]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_EgressMessage.ProtoReflect.Descriptor instead.
+func (*FromFunction_EgressMessage) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 3}
+}
+
+func (x *FromFunction_EgressMessage) GetEgressNamespace() string {
+ if x != nil {
+ return x.EgressNamespace
+ }
+ return ""
+}
+
+func (x *FromFunction_EgressMessage) GetEgressType() string {
+ if x != nil {
+ return x.EgressType
+ }
+ return ""
+}
+
+func (x *FromFunction_EgressMessage) GetArgument() *TypedValue {
+ if x != nil {
+ return x.Argument
+ }
+ return nil
+}
+
+// InvocationResponse represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest
+// it contains a list of state mutation to preform as a result of computing this batch, and a list of outgoing messages.
+type FromFunction_InvocationResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ StateMutations []*FromFunction_PersistedValueMutation `protobuf:"bytes,1,rep,name=state_mutations,json=stateMutations,proto3" json:"state_mutations,omitempty"`
+ OutgoingMessages []*FromFunction_Invocation `protobuf:"bytes,2,rep,name=outgoing_messages,json=outgoingMessages,proto3" json:"outgoing_messages,omitempty"`
+ DelayedInvocations []*FromFunction_DelayedInvocation `protobuf:"bytes,3,rep,name=delayed_invocations,json=delayedInvocations,proto3" json:"delayed_invocations,omitempty"`
+ OutgoingEgresses []*FromFunction_EgressMessage `protobuf:"bytes,4,rep,name=outgoing_egresses,json=outgoingEgresses,proto3" json:"outgoing_egresses,omitempty"`
+}
+
+func (x *FromFunction_InvocationResponse) Reset() {
+ *x = FromFunction_InvocationResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[11]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_InvocationResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_InvocationResponse) ProtoMessage() {}
+
+func (x *FromFunction_InvocationResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[11]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_InvocationResponse.ProtoReflect.Descriptor instead.
+func (*FromFunction_InvocationResponse) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 4}
+}
+
+func (x *FromFunction_InvocationResponse) GetStateMutations() []*FromFunction_PersistedValueMutation {
+ if x != nil {
+ return x.StateMutations
+ }
+ return nil
+}
+
+func (x *FromFunction_InvocationResponse) GetOutgoingMessages() []*FromFunction_Invocation {
+ if x != nil {
+ return x.OutgoingMessages
+ }
+ return nil
+}
+
+func (x *FromFunction_InvocationResponse) GetDelayedInvocations() []*FromFunction_DelayedInvocation {
+ if x != nil {
+ return x.DelayedInvocations
+ }
+ return nil
+}
+
+func (x *FromFunction_InvocationResponse) GetOutgoingEgresses() []*FromFunction_EgressMessage {
+ if x != nil {
+ return x.OutgoingEgresses
+ }
+ return nil
+}
+
+// ExpirationSpec represents TTL (Time-To-Live) configuration for persisted states.
+type FromFunction_ExpirationSpec struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Mode FromFunction_ExpirationSpec_ExpireMode `protobuf:"varint,1,opt,name=mode,proto3,enum=io.statefun.sdk.reqreply.FromFunction_ExpirationSpec_ExpireMode" json:"mode,omitempty"`
+ ExpireAfterMillis int64 `protobuf:"varint,2,opt,name=expire_after_millis,json=expireAfterMillis,proto3" json:"expire_after_millis,omitempty"`
+}
+
+func (x *FromFunction_ExpirationSpec) Reset() {
+ *x = FromFunction_ExpirationSpec{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[12]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_ExpirationSpec) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_ExpirationSpec) ProtoMessage() {}
+
+func (x *FromFunction_ExpirationSpec) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[12]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_ExpirationSpec.ProtoReflect.Descriptor instead.
+func (*FromFunction_ExpirationSpec) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 5}
+}
+
+func (x *FromFunction_ExpirationSpec) GetMode() FromFunction_ExpirationSpec_ExpireMode {
+ if x != nil {
+ return x.Mode
+ }
+ return FromFunction_ExpirationSpec_NONE
+}
+
+func (x *FromFunction_ExpirationSpec) GetExpireAfterMillis() int64 {
+ if x != nil {
+ return x.ExpireAfterMillis
+ }
+ return 0
+}
+
+// PersistedValueSpec represents specifications of a function's persisted value state.
+type FromFunction_PersistedValueSpec struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ StateName string `protobuf:"bytes,1,opt,name=state_name,json=stateName,proto3" json:"state_name,omitempty"`
+ ExpirationSpec *FromFunction_ExpirationSpec `protobuf:"bytes,2,opt,name=expiration_spec,json=expirationSpec,proto3" json:"expiration_spec,omitempty"`
+ TypeTypename string `protobuf:"bytes,3,opt,name=type_typename,json=typeTypename,proto3" json:"type_typename,omitempty"`
+}
+
+func (x *FromFunction_PersistedValueSpec) Reset() {
+ *x = FromFunction_PersistedValueSpec{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[13]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_PersistedValueSpec) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_PersistedValueSpec) ProtoMessage() {}
+
+func (x *FromFunction_PersistedValueSpec) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[13]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_PersistedValueSpec.ProtoReflect.Descriptor instead.
+func (*FromFunction_PersistedValueSpec) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 6}
+}
+
+func (x *FromFunction_PersistedValueSpec) GetStateName() string {
+ if x != nil {
+ return x.StateName
+ }
+ return ""
+}
+
+func (x *FromFunction_PersistedValueSpec) GetExpirationSpec() *FromFunction_ExpirationSpec {
+ if x != nil {
+ return x.ExpirationSpec
+ }
+ return nil
+}
+
+func (x *FromFunction_PersistedValueSpec) GetTypeTypename() string {
+ if x != nil {
+ return x.TypeTypename
+ }
+ return ""
+}
+
+// IncompleteInvocationContext represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest,
+// which should be used as the response if the InvocationBatchRequest provided incomplete information about the
+// invocation, e.g. insufficient state values were provided.
+type FromFunction_IncompleteInvocationContext struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ MissingValues []*FromFunction_PersistedValueSpec `protobuf:"bytes,1,rep,name=missing_values,json=missingValues,proto3" json:"missing_values,omitempty"`
+}
+
+func (x *FromFunction_IncompleteInvocationContext) Reset() {
+ *x = FromFunction_IncompleteInvocationContext{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_request_reply_proto_msgTypes[14]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FromFunction_IncompleteInvocationContext) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FromFunction_IncompleteInvocationContext) ProtoMessage() {}
+
+func (x *FromFunction_IncompleteInvocationContext) ProtoReflect() protoreflect.Message {
+ mi := &file_request_reply_proto_msgTypes[14]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FromFunction_IncompleteInvocationContext.ProtoReflect.Descriptor instead.
+func (*FromFunction_IncompleteInvocationContext) Descriptor() ([]byte, []int) {
+ return file_request_reply_proto_rawDescGZIP(), []int{3, 7}
+}
+
+func (x *FromFunction_IncompleteInvocationContext) GetMissingValues() []*FromFunction_PersistedValueSpec {
+ if x != nil {
+ return x.MissingValues
+ }
+ return nil
+}
+
+var File_request_reply_proto protoreflect.FileDescriptor
+
+var file_request_reply_proto_rawDesc = []byte{
+ 0x0a, 0x13, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2d, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x18, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66,
+ 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x22,
+ 0x4b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61,
+ 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e,
+ 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02,
+ 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x5b, 0x0a, 0x0a,
+ 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x79,
+ 0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x79,
+ 0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x68, 0x61, 0x73, 0x5f, 0x76, 0x61,
+ 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x68, 0x61, 0x73, 0x56, 0x61,
+ 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xee, 0x04, 0x0a, 0x0a, 0x54, 0x6f,
+ 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5d, 0x0a, 0x0a, 0x69, 0x6e, 0x76, 0x6f,
+ 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x64, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x69,
+ 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72,
+ 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x6f, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69,
+ 0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74,
+ 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x69, 0x6e, 0x76,
+ 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x76, 0x0a, 0x0e, 0x50, 0x65, 0x72, 0x73, 0x69,
+ 0x73, 0x74, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61,
+ 0x74, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73,
+ 0x74, 0x61, 0x74, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x45, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74,
+ 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e,
+ 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e,
+ 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61,
+ 0x6c, 0x75, 0x65, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a,
+ 0x89, 0x01, 0x0a, 0x0a, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39,
+ 0x0a, 0x06, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21,
+ 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b,
+ 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
+ 0x73, 0x52, 0x06, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x40, 0x0a, 0x08, 0x61, 0x72, 0x67,
+ 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x69, 0x6f,
+ 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65,
+ 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75,
+ 0x65, 0x52, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0xf1, 0x01, 0x0a, 0x16,
+ 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x39, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74,
+ 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c,
+ 0x79, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65,
+ 0x74, 0x12, 0x49, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
+ 0x32, 0x33, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+ 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x6f, 0x46, 0x75,
+ 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x64,
+ 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x51, 0x0a, 0x0b,
+ 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28,
+ 0x0b, 0x32, 0x2f, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e,
+ 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x6f, 0x46,
+ 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69,
+ 0x6f, 0x6e, 0x52, 0x0b, 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42,
+ 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xac, 0x10, 0x0a, 0x0c, 0x46,
+ 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x68, 0x0a, 0x11, 0x69,
+ 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74,
+ 0x18, 0x64, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74,
+ 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c,
+ 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49,
+ 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x65, 0x48, 0x00, 0x52, 0x10, 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52,
+ 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x88, 0x01, 0x0a, 0x1d, 0x69, 0x6e, 0x63, 0x6f, 0x6d, 0x70,
+ 0x6c, 0x65, 0x74, 0x65, 0x5f, 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
+ 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x18, 0x65, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e,
+ 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e,
+ 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e,
+ 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65,
+ 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78,
+ 0x74, 0x48, 0x00, 0x52, 0x1b, 0x69, 0x6e, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x49,
+ 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74,
+ 0x1a, 0x97, 0x02, 0x0a, 0x16, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x64, 0x56, 0x61,
+ 0x6c, 0x75, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x6f, 0x0a, 0x0d, 0x6d,
+ 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0e, 0x32, 0x4a, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e,
+ 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72,
+ 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73, 0x69,
+ 0x73, 0x74, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f,
+ 0x6e, 0x2e, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0c,
+ 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1d, 0x0a, 0x0a,
+ 0x73, 0x74, 0x61, 0x74, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x09, 0x73, 0x74, 0x61, 0x74, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x45, 0x0a, 0x0b, 0x73,
+ 0x74, 0x61, 0x74, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+ 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65,
+ 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x56, 0x61, 0x6c,
+ 0x75, 0x65, 0x22, 0x26, 0x0a, 0x0c, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79,
+ 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x00, 0x12, 0x0a,
+ 0x0a, 0x06, 0x4d, 0x4f, 0x44, 0x49, 0x46, 0x59, 0x10, 0x01, 0x1a, 0x89, 0x01, 0x0a, 0x0a, 0x49,
+ 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x06, 0x74, 0x61, 0x72,
+ 0x67, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x73,
+ 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72,
+ 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x52, 0x06, 0x74, 0x61,
+ 0x72, 0x67, 0x65, 0x74, 0x12, 0x40, 0x0a, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74,
+ 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c,
+ 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x61, 0x72,
+ 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x97, 0x02, 0x0a, 0x11, 0x44, 0x65, 0x6c, 0x61, 0x79,
+ 0x65, 0x64, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x36, 0x0a, 0x17,
+ 0x69, 0x73, 0x5f, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
+ 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x69,
+ 0x73, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61,
+ 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x11, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f,
+ 0x6b, 0x65, 0x6e, 0x12, 0x1e, 0x0a, 0x0b, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x5f, 0x69, 0x6e, 0x5f,
+ 0x6d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x49,
+ 0x6e, 0x4d, 0x73, 0x12, 0x39, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75,
+ 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x41,
+ 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x40,
+ 0x0a, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+ 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65,
+ 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74,
+ 0x1a, 0x9d, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x6e, 0x61, 0x6d,
+ 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x65, 0x67,
+ 0x72, 0x65, 0x73, 0x73, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1f, 0x0a,
+ 0x0b, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x0a, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x40,
+ 0x0a, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x24, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+ 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65,
+ 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x61, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74,
+ 0x1a, 0xaa, 0x03, 0x0a, 0x12, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x74, 0x65,
+ 0x5f, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b,
+ 0x32, 0x3d, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+ 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d,
+ 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74,
+ 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52,
+ 0x0e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12,
+ 0x5e, 0x0a, 0x11, 0x6f, 0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x5f, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x69, 0x6f, 0x2e,
+ 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71,
+ 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69,
+ 0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x10, 0x6f,
+ 0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12,
+ 0x69, 0x0a, 0x13, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x5f, 0x69, 0x6e, 0x76, 0x6f, 0x63,
+ 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x38, 0x2e, 0x69,
+ 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72,
+ 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63,
+ 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x49, 0x6e, 0x76, 0x6f,
+ 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x49,
+ 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x61, 0x0a, 0x11, 0x6f, 0x75,
+ 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18,
+ 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65,
+ 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79,
+ 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x67,
+ 0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x10, 0x6f, 0x75, 0x74,
+ 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x1a, 0xd1, 0x01,
+ 0x0a, 0x0e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63,
+ 0x12, 0x54, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x40,
+ 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b,
+ 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d, 0x46, 0x75,
+ 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f,
+ 0x6e, 0x53, 0x70, 0x65, 0x63, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65, 0x4d, 0x6f, 0x64, 0x65,
+ 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65,
+ 0x5f, 0x61, 0x66, 0x74, 0x65, 0x72, 0x5f, 0x6d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x03, 0x52, 0x11, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x41, 0x66, 0x74, 0x65, 0x72,
+ 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x22, 0x39, 0x0a, 0x0a, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65,
+ 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x4f, 0x4e, 0x45, 0x10, 0x00, 0x12, 0x0f,
+ 0x0a, 0x0b, 0x41, 0x46, 0x54, 0x45, 0x52, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x10, 0x01, 0x12,
+ 0x10, 0x0a, 0x0c, 0x41, 0x46, 0x54, 0x45, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x4f, 0x4b, 0x45, 0x10,
+ 0x02, 0x1a, 0xb8, 0x01, 0x0a, 0x12, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x64, 0x56,
+ 0x61, 0x6c, 0x75, 0x65, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74,
+ 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74,
+ 0x61, 0x74, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x5e, 0x0a, 0x0f, 0x65, 0x78, 0x70, 0x69, 0x72,
+ 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x70, 0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x35, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73,
+ 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46, 0x72, 0x6f, 0x6d,
+ 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74,
+ 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0e, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74,
+ 0x69, 0x6f, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x12, 0x23, 0x0a, 0x0d, 0x74, 0x79, 0x70, 0x65, 0x5f,
+ 0x74, 0x79, 0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c,
+ 0x74, 0x79, 0x70, 0x65, 0x54, 0x79, 0x70, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x1a, 0x7f, 0x0a, 0x1b,
+ 0x49, 0x6e, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x49, 0x6e, 0x76, 0x6f, 0x63, 0x61,
+ 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x60, 0x0a, 0x0e, 0x6d,
+ 0x69, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x69, 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75,
+ 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x2e, 0x46,
+ 0x72, 0x6f, 0x6d, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x50, 0x65, 0x72, 0x73,
+ 0x69, 0x73, 0x74, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0d,
+ 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x42, 0x0a, 0x0a,
+ 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x0a, 0x30, 0x6f, 0x72, 0x67,
+ 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74,
+ 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x65, 0x71, 0x72, 0x65,
+ 0x70, 0x6c, 0x79, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x50, 0x01, 0x5a,
+ 0x0a, 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_request_reply_proto_rawDescOnce sync.Once
+ file_request_reply_proto_rawDescData = file_request_reply_proto_rawDesc
+)
+
+func file_request_reply_proto_rawDescGZIP() []byte {
+ file_request_reply_proto_rawDescOnce.Do(func() {
+ file_request_reply_proto_rawDescData = protoimpl.X.CompressGZIP(file_request_reply_proto_rawDescData)
+ })
+ return file_request_reply_proto_rawDescData
+}
+
+var file_request_reply_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
+var file_request_reply_proto_msgTypes = make([]protoimpl.MessageInfo, 15)
+var file_request_reply_proto_goTypes = []interface{}{
+ (FromFunction_PersistedValueMutation_MutationType)(0), // 0: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.MutationType
+ (FromFunction_ExpirationSpec_ExpireMode)(0), // 1: io.statefun.sdk.reqreply.FromFunction.ExpirationSpec.ExpireMode
+ (*Address)(nil), // 2: io.statefun.sdk.reqreply.Address
+ (*TypedValue)(nil), // 3: io.statefun.sdk.reqreply.TypedValue
+ (*ToFunction)(nil), // 4: io.statefun.sdk.reqreply.ToFunction
+ (*FromFunction)(nil), // 5: io.statefun.sdk.reqreply.FromFunction
+ (*ToFunction_PersistedValue)(nil), // 6: io.statefun.sdk.reqreply.ToFunction.PersistedValue
+ (*ToFunction_Invocation)(nil), // 7: io.statefun.sdk.reqreply.ToFunction.Invocation
+ (*ToFunction_InvocationBatchRequest)(nil), // 8: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest
+ (*FromFunction_PersistedValueMutation)(nil), // 9: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation
+ (*FromFunction_Invocation)(nil), // 10: io.statefun.sdk.reqreply.FromFunction.Invocation
+ (*FromFunction_DelayedInvocation)(nil), // 11: io.statefun.sdk.reqreply.FromFunction.DelayedInvocation
+ (*FromFunction_EgressMessage)(nil), // 12: io.statefun.sdk.reqreply.FromFunction.EgressMessage
+ (*FromFunction_InvocationResponse)(nil), // 13: io.statefun.sdk.reqreply.FromFunction.InvocationResponse
+ (*FromFunction_ExpirationSpec)(nil), // 14: io.statefun.sdk.reqreply.FromFunction.ExpirationSpec
+ (*FromFunction_PersistedValueSpec)(nil), // 15: io.statefun.sdk.reqreply.FromFunction.PersistedValueSpec
+ (*FromFunction_IncompleteInvocationContext)(nil), // 16: io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext
+}
+var file_request_reply_proto_depIdxs = []int32{
+ 8, // 0: io.statefun.sdk.reqreply.ToFunction.invocation:type_name -> io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest
+ 13, // 1: io.statefun.sdk.reqreply.FromFunction.invocation_result:type_name -> io.statefun.sdk.reqreply.FromFunction.InvocationResponse
+ 16, // 2: io.statefun.sdk.reqreply.FromFunction.incomplete_invocation_context:type_name -> io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext
+ 3, // 3: io.statefun.sdk.reqreply.ToFunction.PersistedValue.state_value:type_name -> io.statefun.sdk.reqreply.TypedValue
+ 2, // 4: io.statefun.sdk.reqreply.ToFunction.Invocation.caller:type_name -> io.statefun.sdk.reqreply.Address
+ 3, // 5: io.statefun.sdk.reqreply.ToFunction.Invocation.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+ 2, // 6: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.target:type_name -> io.statefun.sdk.reqreply.Address
+ 6, // 7: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.state:type_name -> io.statefun.sdk.reqreply.ToFunction.PersistedValue
+ 7, // 8: io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.invocations:type_name -> io.statefun.sdk.reqreply.ToFunction.Invocation
+ 0, // 9: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.mutation_type:type_name -> io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.MutationType
+ 3, // 10: io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation.state_value:type_name -> io.statefun.sdk.reqreply.TypedValue
+ 2, // 11: io.statefun.sdk.reqreply.FromFunction.Invocation.target:type_name -> io.statefun.sdk.reqreply.Address
+ 3, // 12: io.statefun.sdk.reqreply.FromFunction.Invocation.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+ 2, // 13: io.statefun.sdk.reqreply.FromFunction.DelayedInvocation.target:type_name -> io.statefun.sdk.reqreply.Address
+ 3, // 14: io.statefun.sdk.reqreply.FromFunction.DelayedInvocation.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+ 3, // 15: io.statefun.sdk.reqreply.FromFunction.EgressMessage.argument:type_name -> io.statefun.sdk.reqreply.TypedValue
+ 9, // 16: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.state_mutations:type_name -> io.statefun.sdk.reqreply.FromFunction.PersistedValueMutation
+ 10, // 17: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.outgoing_messages:type_name -> io.statefun.sdk.reqreply.FromFunction.Invocation
+ 11, // 18: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.delayed_invocations:type_name -> io.statefun.sdk.reqreply.FromFunction.DelayedInvocation
+ 12, // 19: io.statefun.sdk.reqreply.FromFunction.InvocationResponse.outgoing_egresses:type_name -> io.statefun.sdk.reqreply.FromFunction.EgressMessage
+ 1, // 20: io.statefun.sdk.reqreply.FromFunction.ExpirationSpec.mode:type_name -> io.statefun.sdk.reqreply.FromFunction.ExpirationSpec.ExpireMode
+ 14, // 21: io.statefun.sdk.reqreply.FromFunction.PersistedValueSpec.expiration_spec:type_name -> io.statefun.sdk.reqreply.FromFunction.ExpirationSpec
+ 15, // 22: io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext.missing_values:type_name -> io.statefun.sdk.reqreply.FromFunction.PersistedValueSpec
+ 23, // [23:23] is the sub-list for method output_type
+ 23, // [23:23] is the sub-list for method input_type
+ 23, // [23:23] is the sub-list for extension type_name
+ 23, // [23:23] is the sub-list for extension extendee
+ 0, // [0:23] is the sub-list for field type_name
+}
+
+func init() { file_request_reply_proto_init() }
+func file_request_reply_proto_init() {
+ if File_request_reply_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_request_reply_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Address); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*TypedValue); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ToFunction); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ToFunction_PersistedValue); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ToFunction_Invocation); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ToFunction_InvocationBatchRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_PersistedValueMutation); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_Invocation); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_DelayedInvocation); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_EgressMessage); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_InvocationResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_ExpirationSpec); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_PersistedValueSpec); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_request_reply_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FromFunction_IncompleteInvocationContext); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ file_request_reply_proto_msgTypes[2].OneofWrappers = []interface{}{
+ (*ToFunction_Invocation_)(nil),
+ }
+ file_request_reply_proto_msgTypes[3].OneofWrappers = []interface{}{
+ (*FromFunction_InvocationResult)(nil),
+ (*FromFunction_IncompleteInvocationContext_)(nil),
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_request_reply_proto_rawDesc,
+ NumEnums: 2,
+ NumMessages: 15,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_request_reply_proto_goTypes,
+ DependencyIndexes: file_request_reply_proto_depIdxs,
+ EnumInfos: file_request_reply_proto_enumTypes,
+ MessageInfos: file_request_reply_proto_msgTypes,
+ }.Build()
+ File_request_reply_proto = out.File
+ file_request_reply_proto_rawDesc = nil
+ file_request_reply_proto_goTypes = nil
+ file_request_reply_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/internal/protocol/types.pb.go b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/types.pb.go
new file mode 100644
index 0000000..5b54383
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/internal/protocol/types.pb.go
@@ -0,0 +1,486 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
+// source: types.proto
+
+package protocol
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// BooleanWrapper represents a StateFun primitive type of a boolean value. This is recognized as:
+// io.statefun.types/bool
+type BooleanWrapper struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value bool `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *BooleanWrapper) Reset() {
+ *x = BooleanWrapper{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_types_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *BooleanWrapper) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*BooleanWrapper) ProtoMessage() {}
+
+func (x *BooleanWrapper) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use BooleanWrapper.ProtoReflect.Descriptor instead.
+func (*BooleanWrapper) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *BooleanWrapper) GetValue() bool {
+ if x != nil {
+ return x.Value
+ }
+ return false
+}
+
+// IntWrapper represents a StateFun primitive type of an signed 32 bit integer value. This is recognized as:
+// io.statefun.types/int
+type IntWrapper struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value int32 `protobuf:"fixed32,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *IntWrapper) Reset() {
+ *x = IntWrapper{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_types_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *IntWrapper) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IntWrapper) ProtoMessage() {}
+
+func (x *IntWrapper) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use IntWrapper.ProtoReflect.Descriptor instead.
+func (*IntWrapper) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *IntWrapper) GetValue() int32 {
+ if x != nil {
+ return x.Value
+ }
+ return 0
+}
+
+// FloatWrapper represents a StateFun primitive type of a signed float value. This is recognized as:
+// io.statefun.types/float
+type FloatWrapper struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value float32 `protobuf:"fixed32,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *FloatWrapper) Reset() {
+ *x = FloatWrapper{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_types_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FloatWrapper) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FloatWrapper) ProtoMessage() {}
+
+func (x *FloatWrapper) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FloatWrapper.ProtoReflect.Descriptor instead.
+func (*FloatWrapper) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *FloatWrapper) GetValue() float32 {
+ if x != nil {
+ return x.Value
+ }
+ return 0
+}
+
+// LongWrapper represents a StateFun primitive type of a signed 64 bit long value. This is recognized as:
+// io.statefun.types/long
+type LongWrapper struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value int64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *LongWrapper) Reset() {
+ *x = LongWrapper{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_types_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *LongWrapper) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*LongWrapper) ProtoMessage() {}
+
+func (x *LongWrapper) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use LongWrapper.ProtoReflect.Descriptor instead.
+func (*LongWrapper) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *LongWrapper) GetValue() int64 {
+ if x != nil {
+ return x.Value
+ }
+ return 0
+}
+
+// DoubleWrapper represents a StateFun primitive type of a double value. This is recognized as:
+// io.statefun.types/double
+type DoubleWrapper struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *DoubleWrapper) Reset() {
+ *x = DoubleWrapper{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_types_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *DoubleWrapper) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*DoubleWrapper) ProtoMessage() {}
+
+func (x *DoubleWrapper) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[4]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use DoubleWrapper.ProtoReflect.Descriptor instead.
+func (*DoubleWrapper) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *DoubleWrapper) GetValue() float64 {
+ if x != nil {
+ return x.Value
+ }
+ return 0
+}
+
+// StringWrapper represents a StateFun string. This is recognized as:
+// io.statefun.types/string
+type StringWrapper struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *StringWrapper) Reset() {
+ *x = StringWrapper{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_types_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *StringWrapper) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*StringWrapper) ProtoMessage() {}
+
+func (x *StringWrapper) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[5]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use StringWrapper.ProtoReflect.Descriptor instead.
+func (*StringWrapper) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *StringWrapper) GetValue() string {
+ if x != nil {
+ return x.Value
+ }
+ return ""
+}
+
+var File_types_proto protoreflect.FileDescriptor
+
+var file_types_proto_rawDesc = []byte{
+ 0x0a, 0x0b, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15, 0x69,
+ 0x6f, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x74,
+ 0x79, 0x70, 0x65, 0x73, 0x22, 0x26, 0x0a, 0x0e, 0x42, 0x6f, 0x6f, 0x6c, 0x65, 0x61, 0x6e, 0x57,
+ 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x22, 0x0a, 0x0a,
+ 0x49, 0x6e, 0x74, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61,
+ 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
+ 0x22, 0x24, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x61, 0x74, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72,
+ 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x02, 0x52,
+ 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x23, 0x0a, 0x0b, 0x4c, 0x6f, 0x6e, 0x67, 0x57, 0x72,
+ 0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x10, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x44,
+ 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05,
+ 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x57, 0x72, 0x61, 0x70,
+ 0x70, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x3d, 0x0a, 0x2d, 0x6f, 0x72, 0x67,
+ 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74,
+ 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73,
+ 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x50, 0x01, 0x5a, 0x0a, 0x2e, 0x3b,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_types_proto_rawDescOnce sync.Once
+ file_types_proto_rawDescData = file_types_proto_rawDesc
+)
+
+func file_types_proto_rawDescGZIP() []byte {
+ file_types_proto_rawDescOnce.Do(func() {
+ file_types_proto_rawDescData = protoimpl.X.CompressGZIP(file_types_proto_rawDescData)
+ })
+ return file_types_proto_rawDescData
+}
+
+var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_types_proto_goTypes = []interface{}{
+ (*BooleanWrapper)(nil), // 0: io.statefun.sdk.types.BooleanWrapper
+ (*IntWrapper)(nil), // 1: io.statefun.sdk.types.IntWrapper
+ (*FloatWrapper)(nil), // 2: io.statefun.sdk.types.FloatWrapper
+ (*LongWrapper)(nil), // 3: io.statefun.sdk.types.LongWrapper
+ (*DoubleWrapper)(nil), // 4: io.statefun.sdk.types.DoubleWrapper
+ (*StringWrapper)(nil), // 5: io.statefun.sdk.types.StringWrapper
+}
+var file_types_proto_depIdxs = []int32{
+ 0, // [0:0] is the sub-list for method output_type
+ 0, // [0:0] is the sub-list for method input_type
+ 0, // [0:0] is the sub-list for extension type_name
+ 0, // [0:0] is the sub-list for extension extendee
+ 0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_types_proto_init() }
+func file_types_proto_init() {
+ if File_types_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_types_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*BooleanWrapper); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_types_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*IntWrapper); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_types_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FloatWrapper); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_types_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*LongWrapper); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_types_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*DoubleWrapper); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_types_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*StringWrapper); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_types_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 6,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_types_proto_goTypes,
+ DependencyIndexes: file_types_proto_depIdxs,
+ MessageInfos: file_types_proto_msgTypes,
+ }.Build()
+ File_types_proto = out.File
+ file_types_proto_rawDesc = nil
+ file_types_proto_goTypes = nil
+ file_types_proto_depIdxs = nil
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/message.go b/statefun-sdk-go/v3/pkg/statefun/message.go
new file mode 100644
index 0000000..dff5fdf
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/message.go
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+)
+
+type MessageBuilder struct {
+ Target Address
+ Value interface{}
+ ValueType SimpleType
+}
+
+func (m MessageBuilder) ToMessage() (Message, error) {
+ if m.Target == (Address{}) {
+ return Message{}, errors.New("a message must have a non-empty target")
+ }
+
+ if m.Value == nil {
+ return Message{}, errors.New("a message cannot have a nil value")
+ }
+
+ if m.ValueType == nil {
+ switch m.Value.(type) {
+ case int:
+ return Message{}, errors.New("ambiguous integer type; please specify int32 or int64")
+ case bool, *bool:
+ m.ValueType = BoolType
+ case int32, *int32:
+ m.ValueType = Int32Type
+ case int64, *int64:
+ m.ValueType = Int64Type
+ case float32, *float32:
+ m.ValueType = Float32Type
+ case float64, *float64:
+ m.ValueType = Float64Type
+ case string, *string:
+ m.ValueType = StringType
+ default:
+ return Message{}, errors.New("message contains non-primitive type, please supply a non-nil SimpleType")
+ }
+ }
+
+ buffer := bytes.Buffer{}
+ err := m.ValueType.Serialize(&buffer, m.Value)
+ if err != nil {
+ return Message{}, err
+ }
+
+ return Message{
+ target: &protocol.Address{
+ Namespace: m.Target.FunctionType.GetNamespace(),
+ Type: m.Target.FunctionType.GetType(),
+ Id: m.Target.Id,
+ },
+ typedValue: &protocol.TypedValue{
+ Typename: m.ValueType.GetTypeName().String(),
+ HasValue: true,
+ Value: buffer.Bytes(),
+ },
+ }, nil
+}
+
+type Message struct {
+ target *protocol.Address
+ typedValue *protocol.TypedValue
+}
+
+func (m *Message) IsBool() bool {
+ return m.Is(BoolType)
+}
+
+func (m *Message) AsBool() bool {
+ var receiver bool
+ if err := BoolType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+ panic(fmt.Errorf("failed to deserialize message: %w", err))
+ }
+ return receiver
+}
+
+func (m *Message) IsInt32() bool {
+ return m.Is(Int32Type)
+}
+
+func (m *Message) AsInt32() int32 {
+ var receiver int32
+ if err := Int32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+ panic(fmt.Errorf("failed to deserialize message: %w", err))
+ }
+ return receiver
+}
+
+func (m *Message) IsInt64() bool {
+ return m.Is(Int64Type)
+}
+
+func (m *Message) AsInt64() int64 {
+ var receiver int64
+ if err := Int64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+ panic(fmt.Errorf("failed to deserialize message: %w", err))
+ }
+ return receiver
+}
+
+func (m *Message) IsFloat32() bool {
+ return m.Is(Float32Type)
+}
+
+func (m *Message) AsFloat32() float32 {
+ var receiver float32
+ if err := Float32Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+ panic(fmt.Errorf("failed to deserialize message: %w", err))
+ }
+ return receiver
+}
+
+func (m *Message) IsFloat64() bool {
+ return m.Is(Float64Type)
+}
+
+func (m *Message) AsFloat64() float64 {
+ var receiver float64
+ if err := Float64Type.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+ panic(fmt.Errorf("failed to deserialize message: %w", err))
+ }
+ return receiver
+}
+
+func (m *Message) IsString() bool {
+ return m.Is(StringType)
+}
+
+func (m *Message) AsString() string {
+ var receiver string
+ if err := StringType.Deserialize(bytes.NewReader(m.typedValue.Value), &receiver); err != nil {
+ panic(fmt.Errorf("failed to deserialize message: %w", err))
+ }
+
+ return receiver
+}
+
+func (m *Message) Is(t SimpleType) bool {
+ return t.GetTypeName().String() == m.typedValue.Typename
+}
+
+func (m *Message) As(t SimpleType, receiver interface{}) error {
+ return t.Deserialize(bytes.NewReader(m.typedValue.Value), receiver)
+}
+
+func (m *Message) ValueTypeName() TypeName {
+ return TypeNameFrom(m.typedValue.Typename)
+}
+
+func (m *Message) RawValue() []byte {
+ return m.typedValue.Value
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/message_test.go b/statefun-sdk-go/v3/pkg/statefun/message_test.go
new file mode 100644
index 0000000..2b18894
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/message_test.go
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestBasicIntMessage(t *testing.T) {
+ typename, err := ParseTypeName("foo/bar")
+ assert.NoError(t, err)
+
+ message, err := MessageBuilder{
+ Target: Address{
+ FunctionType: typename,
+ Id: "a",
+ },
+ Value: int32(1),
+ }.ToMessage()
+
+ assert.NoError(t, err)
+ assert.True(t, message.IsInt32())
+
+ value := message.AsInt32()
+ assert.Equal(t, value, int32(1))
+}
+
+func TestMessageWithType(t *testing.T) {
+ typename, err := ParseTypeName("foo/bar")
+ assert.NoError(t, err)
+
+ message, err := MessageBuilder{
+ Target: Address{
+ FunctionType: typename,
+ Id: "a",
+ },
+ Value: float32(5.0),
+ ValueType: Float32Type,
+ }.ToMessage()
+
+ assert.NoError(t, err)
+ assert.True(t, message.IsFloat32())
+
+ value := message.AsFloat32()
+ assert.Equal(t, value, float32(5.0))
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/stateful_function.go b/statefun-sdk-go/v3/pkg/statefun/stateful_function.go
new file mode 100644
index 0000000..de95be3
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/stateful_function.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+// A StatefulFunction is a user-defined function that can be invoked with a given input.
+// This is the primitive building block for a Stateful Functions application.
+//
+// Concept
+//
+// Each individual StatefulFunction is an uniquely invokable "instance" of a registered
+// StatefulFunctionSpec. Each instance is identified by an Address, representing the
+// function's unique id (a string) within its type. From a user's perspective, it would seem as if
+// for each unique function id, there exists a stateful instance of the function that is always
+// available to be invoked within a Stateful Functions application.
+//
+// Invoking a StatefulFunction
+//
+// An individual StatefulFunction can be invoked with arbitrary input from any another
+// StatefulFunction (including itself), or routed from ingresses. To invoke a
+// StatefulFunction, the caller simply needs to know the Address of the target function.
+//
+// As a result of invoking a StatefulFunction, the function may continue to invoke other
+// functions, access persisted values, or send messages to egresses.
+//
+// Persistent State
+//
+// Each individual StatefulFunction may have persistent values written to storage that is
+// maintained by the system, providing consistent exactly-once and fault-tolerant guarantees. Please
+// see docs in ValueSpec and AddressScopedStorage for an overview of how to
+// register persistent values and access the storage.
+type StatefulFunction interface {
+
+ // Invoke is the method called for each message. The passed Context
+ // is canceled as soon as Invoke returns as a signal to
+ // any spawned go routines. The method may return
+ // an Error to signal the invocation failed and should
+ // be reattempted.
+ Invoke(ctx Context, message Message) error
+}
+
+// StatefulFunctionSpec for a Stateful Function, identifiable
+// by a unique TypeName.
+type StatefulFunctionSpec struct {
+ // The unique TypeName associated
+ // the StatefulFunction being defined.
+ FunctionType TypeName
+
+ // A slice of registered ValueSpec's that will be used
+ // by this function. A function may only access values
+ // that have been eagerly registered as part of its spec.
+ States []ValueSpec
+
+ // The physical StatefulFunction instance.
+ Function StatefulFunction
+}
+
+// The StatefulFunctionPointer type is an adapter to allow the use of
+// ordinary functions as StatefulFunction's. If f is a function
+// with the appropriate signature, StatefulFunctionPointer(f) is a
+// StatefulFunction that calls f.
+type StatefulFunctionPointer func(Context, Message) error
+
+func (s StatefulFunctionPointer) Invoke(ctx Context, message Message) error {
+ return s(ctx, message)
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/storage.go b/statefun-sdk-go/v3/pkg/statefun/storage.go
new file mode 100644
index 0000000..57e5364
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/storage.go
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "fmt"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "sync"
+)
+
+// An AddressScopedStorage is used for reading and writing persistent
+// values that are managed by the Stateful Functions runtime for
+// fault-tolerance and consistency.
+//
+// All access to the storage is scoped to the current function instance,
+// identified by the instance's Address. This means that within an
+// invocation, function instances may only access its own persisted
+// values through this storage.
+type AddressScopedStorage interface {
+
+ // Get returnss the values of the provided ValueSpec, scoped to the
+ // current invoked Address and stores the result in the value
+ // pointed to by receiver. The method will return false
+ // if there is no value for the spec in storage
+ // so callers can differentiate between missing and
+ // the types zero value.
+ Get(spec ValueSpec, receiver interface{}) (exists bool)
+
+ // Set updates the value for the provided ValueSpec, scoped
+ // to the current invoked Address.
+ Set(spec ValueSpec, value interface{})
+
+ // Remove deletes the prior value set for the the provided
+ // ValueSpec, scoped to the current invoked Address.
+ //
+ // After removing the value, calling Get for the same
+ // spec under the same Address will return false.
+ Remove(spec ValueSpec)
+}
+
+type storage struct {
+ mutex sync.RWMutex
+ cells map[string]*internal.Cell
+}
+
+type storageFactory interface {
+ getStorage() *storage
+
+ getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec
+}
+
+func newStorageFactory(
+ batch *protocol.ToFunction_InvocationBatchRequest,
+ specs map[string]*protocol.FromFunction_PersistedValueSpec,
+) storageFactory {
+ storage := &storage{
+ cells: make(map[string]*internal.Cell, len(specs)),
+ }
+
+ states := make(map[string]*protocol.FromFunction_PersistedValueSpec, len(specs))
+ for k, v := range specs {
+ states[k] = v
+ }
+
+ if batch.State != nil {
+ for _, state := range batch.State {
+ spec, exists := states[state.StateName]
+ if !exists {
+ continue
+ }
+
+ delete(states, state.StateName)
+
+ storage.cells[state.StateName] = internal.NewCell(state, spec.TypeTypename)
+ }
+ }
+
+ if len(states) > 0 {
+ var missing = make([]*protocol.FromFunction_PersistedValueSpec, 0, len(states))
+ for _, spec := range states {
+ missing = append(missing, spec)
+ }
+
+ return MissingSpecs(missing)
+ } else {
+ return storage
+ }
+}
+
+func (s *storage) getStorage() *storage {
+ return s
+}
+
+func (s *storage) getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec {
+ return nil
+}
+
+func (s *storage) Get(spec ValueSpec, receiver interface{}) bool {
+ s.mutex.RLock()
+ defer s.mutex.RUnlock()
+
+ cell, ok := s.cells[spec.Name]
+ if !ok {
+ panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
+ }
+
+ if !cell.HasValue() {
+ return false
+ }
+
+ cell.SeekToBeginning()
+ if err := spec.ValueType.Deserialize(cell, receiver); err != nil {
+ panic(fmt.Errorf("failed to deserialize persisted value `%s`: %w", spec.Name, err))
+ }
+
+ return true
+}
+
+func (s *storage) Set(spec ValueSpec, value interface{}) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ cell, ok := s.cells[spec.Name]
+ if !ok {
+ panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
+ }
+
+ err := spec.ValueType.Serialize(cell, value)
+ if err != nil {
+ panic(fmt.Errorf("failed to serialize %s: %w", spec.Name, err))
+ }
+}
+
+func (s *storage) Remove(spec ValueSpec) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ cell, ok := s.cells[spec.Name]
+ if !ok {
+ panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
+ }
+
+ cell.Delete()
+}
+
+func (s *storage) getStateMutations() []*protocol.FromFunction_PersistedValueMutation {
+ mutations := make([]*protocol.FromFunction_PersistedValueMutation, 0, len(s.cells))
+ for name, cell := range s.cells {
+ if mutation := cell.GetStateMutation(name); mutation != nil {
+ mutations = append(mutations, mutation)
+ }
+ }
+
+ return mutations
+}
+
+type MissingSpecs []*protocol.FromFunction_PersistedValueSpec
+
+func (m MissingSpecs) getStorage() *storage {
+ return nil
+}
+
+func (m MissingSpecs) getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec {
+ return m
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/typename.go b/statefun-sdk-go/v3/pkg/statefun/typename.go
new file mode 100644
index 0000000..8491dc5
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/typename.go
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "errors"
+ "fmt"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "strings"
+)
+
+var (
+ boolTypeName = TypeNameFrom("io.statefun.types/bool")
+ int32TypeName = TypeNameFrom("io.statefun.types/int")
+ int64TypeName = TypeNameFrom("io.statefun.types/long")
+ float32TypeName = TypeNameFrom("io.statefun.types/float")
+ float64TypeName = TypeNameFrom("io.statefun.types/double")
+ stringTypeName = TypeNameFrom("io.statefun.types/string")
+)
+
+// A TypeName is used to uniquely identify objects within
+// a Stateful Functions application, including functions,
+// egresses, and types. TypeName's serve as an integral
+// part of identifying these objects for message delivery
+// as well as message data serialization and deserialization.
+type TypeName interface {
+ fmt.Stringer
+ GetNamespace() string
+ GetType() string
+}
+
+type typeName struct {
+ namespace string
+ tpe string
+ typenameString string
+}
+
+func (t typeName) String() string {
+ return t.typenameString
+}
+
+func (t typeName) GetNamespace() string {
+ return t.namespace
+}
+
+func (t typeName) GetType() string {
+ return t.tpe
+}
+
+// TypeNameFrom creates a TypeName from a canonical string
+// in the format `<namespace>/<type>`. This Function
+// assumes correctly formatted strings and will panic
+// on error. For runtime error handling please
+// see ParseTypeName.
+func TypeNameFrom(typename string) TypeName {
+ result, err := ParseTypeName(typename)
+ if err != nil {
+ panic(err)
+ }
+
+ return result
+}
+
+// ParseTypeName creates a TypeName from a canonical string
+// in the format `<namespace>/<type>`.
+func ParseTypeName(typename string) (TypeName, error) {
+ position := strings.LastIndex(typename, "/")
+ if position <= 0 || position == len(typename)-1 {
+ return nil, fmt.Errorf("%v does not conform to the <namespace>/<type> format", typename)
+ }
+
+ namespace := typename[:position]
+ name := typename[position+1:]
+
+ if namespace[len(namespace)-1] == '/' {
+ namespace = namespace[:len(namespace)-1]
+ }
+
+ return TypeNameFromParts(namespace, name)
+}
+
+func TypeNameFromParts(namespace, tpe string) (TypeName, error) {
+ if len(namespace) == 0 {
+ return nil, errors.New("namespace cannot be empty")
+ }
+
+ if len(tpe) == 0 {
+ return nil, errors.New("type cannot be empty")
+ }
+
+ return typeName{
+ namespace: namespace,
+ tpe: tpe,
+ typenameString: fmt.Sprintf("%s/%s", namespace, tpe),
+ }, nil
+}
+
+// An Address is the unique identity of an individual StatefulFunction,
+// containing the function's FunctionType and a unique identifier
+// within the type. The function's type denotes the type (or class) of function
+// to invoke, while the unique identifier addresses the invocation to a specific
+// function instance.
+type Address struct {
+ FunctionType TypeName
+ Id string
+}
+
+func (a Address) String() string {
+ return fmt.Sprintf("Address(%s, %s, %s)", a.FunctionType.GetNamespace(), a.FunctionType.GetType(), a.Id)
+}
+
+func addressFromInternal(a *protocol.Address) Address {
+ name, _ := TypeNameFromParts(a.Namespace, a.Type)
+ return Address{
+ FunctionType: name,
+ Id: a.Id,
+ }
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/typename_test.go b/statefun-sdk-go/v3/pkg/statefun/typename_test.go
new file mode 100644
index 0000000..c07cd19
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/typename_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestTypeNameParse(t *testing.T) {
+ typename, err := ParseTypeName("namespace/tpe")
+
+ assert.NoError(t, err)
+ assert.Equal(t, typename.GetNamespace(), "namespace")
+ assert.Equal(t, typename.GetType(), "tpe")
+}
+
+func TestNoNamespace(t *testing.T) {
+ _, err := ParseTypeName("/bar")
+ assert.Error(t, err)
+}
+
+func TestNoName(t *testing.T) {
+ _, err := ParseTypeName("n/")
+ assert.Error(t, err)
+}
+
+func TestNoNamespaceOrName(t *testing.T) {
+ _, err := ParseTypeName("/")
+ assert.Error(t, err)
+}
+
+func TestEmptyString(t *testing.T) {
+ _, err := ParseTypeName("")
+ assert.Error(t, err)
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/types.go b/statefun-sdk-go/v3/pkg/statefun/types.go
new file mode 100644
index 0000000..e820a3a
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/types.go
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "encoding/json"
+ "errors"
+ "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+ "google.golang.org/protobuf/proto"
+ "io"
+ "log"
+)
+
+// SimpleType interface is the core abstraction used by Stateful
+// Function's type system, and consists of a few things
+// that StateFun uses to handle Message's and ValueSpec's
+//
+// 1. TypeName to identify the type.
+// 2. (De)serialization methods for marshalling and unmarshalling data
+//
+// Cross-language primitive types
+//
+// StateFun's type system has cross-language support for common primitive
+// types, such as boolean, integer (int32), long (int64), etc. These
+// primitive types have built-in SimpleType's implemented for them already
+// with predefined TypeName's.
+//
+// These primitives have standard encoding across all StateFun language
+// SDKs, so functions in various other languages (Java, Python, etc) can
+// message Golang functions by directly sending supported primitive
+// values as message arguments. Moreover, the type system is used for
+// state values as well; so you can expect that a function can safely
+// read previous state after reimplementing it in a different language.
+//
+// Common custom types
+//
+// The type system is also very easily extensible to support more complex types.
+// The Go SDK ships with predefined support for JSON and Protobuf - see MakeJsonType
+// MakeProtobufType. For other formats, it is just a matter of implementing
+// your own SimpleType with a custom typename and serializer.
+type SimpleType interface {
+ GetTypeName() TypeName
+
+ Deserialize(r io.Reader, receiver interface{}) error
+
+ Serialize(writer io.Writer, data interface{}) error
+}
+
+type PrimitiveType int
+
+const (
+ BoolType PrimitiveType = iota
+ Int32Type
+ Int64Type
+ Float32Type
+ Float64Type
+ StringType
+)
+
+var (
+ boolWrapperType = MakeProtobufTypeWithTypeName(boolTypeName)
+ int32WrapperType = MakeProtobufTypeWithTypeName(int32TypeName)
+ int64WrapperType = MakeProtobufTypeWithTypeName(int64TypeName)
+ float32WrapperType = MakeProtobufTypeWithTypeName(float64TypeName)
+ float64WrapperType = MakeProtobufTypeWithTypeName(float64TypeName)
+ stringWrapperType = MakeProtobufTypeWithTypeName(stringTypeName)
+)
+
+func (p PrimitiveType) GetTypeName() TypeName {
+ switch p {
+ case BoolType:
+ return boolTypeName
+ case Int32Type:
+ return int32TypeName
+ case Int64Type:
+ return int64TypeName
+ case Float32Type:
+ return float32TypeName
+ case Float64Type:
+ return float64TypeName
+ case StringType:
+ return stringTypeName
+ default:
+ log.Fatalf("unknown primitive type %v", p)
+ // unreachable
+ return nil
+ }
+}
+
+func (p PrimitiveType) Deserialize(r io.Reader, receiver interface{}) error {
+ switch p {
+ case BoolType:
+ switch data := receiver.(type) {
+ case *bool:
+ var wrapper protocol.BooleanWrapper
+ if err := boolWrapperType.Deserialize(r, &wrapper); err != nil {
+ return err
+ }
+
+ *data = wrapper.Value
+ default:
+ return errors.New("receiver must be of type bool or *bool")
+ }
+ case Int32Type:
+ switch data := receiver.(type) {
+ case *int32:
+ var wrapper protocol.IntWrapper
+ if err := int32WrapperType.Deserialize(r, &wrapper); err != nil {
+ return err
+ }
+
+ *data = wrapper.Value
+ default:
+ return errors.New("receiver must be of type *int32")
+ }
+ case Int64Type:
+ switch data := receiver.(type) {
+ case *int64:
+ var wrapper protocol.LongWrapper
+ if err := int64WrapperType.Deserialize(r, &wrapper); err != nil {
+ return err
+ }
+ *data = wrapper.Value
+ default:
+ return errors.New("receiver must be of type *int64")
+ }
+ case Float32Type:
+ switch data := receiver.(type) {
+ case *float32:
+ var wrapper protocol.FloatWrapper
+ if err := float32WrapperType.Deserialize(r, &wrapper); err != nil {
+ return err
+ }
+
+ *data = wrapper.Value
+ default:
+ return errors.New("receiver must be of type *float32")
+ }
+ case Float64Type:
+ switch data := receiver.(type) {
+ case *float64:
+ var wrapper protocol.DoubleWrapper
+ if err := float64WrapperType.Deserialize(r, &wrapper); err != nil {
+ return err
+ }
+
+ *data = wrapper.Value
+ default:
+ return errors.New("receiver must be of type *float64")
+ }
+ case StringType:
+ switch data := receiver.(type) {
+ case *string:
+ var wrapper protocol.StringWrapper
+ if err := stringWrapperType.Deserialize(r, &wrapper); err != nil {
+ return err
+ }
+
+ *data = wrapper.Value
+ default:
+ return errors.New("receiver must be of type *string")
+ }
+ default:
+ log.Fatalf("unknown primitive type %v", p)
+ // unreachable
+ return nil
+ }
+
+ return nil
+}
+
+func (p PrimitiveType) Serialize(writer io.Writer, data interface{}) error {
+ switch p {
+ case BoolType:
+ switch data := data.(type) {
+ case bool:
+ wrapper := protocol.BooleanWrapper{Value: data}
+ return boolWrapperType.Serialize(writer, &wrapper)
+ case *bool:
+ wrapper := protocol.BooleanWrapper{Value: *data}
+ return boolWrapperType.Serialize(writer, &wrapper)
+ default:
+ return errors.New("data must be of type bool or *bool")
+ }
+ case Int32Type:
+ switch data := data.(type) {
+ case int32:
+ wrapper := protocol.IntWrapper{Value: data}
+ return int32WrapperType.Serialize(writer, &wrapper)
+ case *int32:
+ wrapper := protocol.IntWrapper{Value: *data}
+ return int32WrapperType.Serialize(writer, &wrapper)
+ default:
+ return errors.New("data must be of type int32 or *int32")
+ }
+ case Int64Type:
+ switch data := data.(type) {
+ case int64:
+ wrapper := protocol.LongWrapper{Value: data}
+ return int64WrapperType.Serialize(writer, &wrapper)
+ case *int64:
+ wrapper := protocol.LongWrapper{Value: *data}
+ return int64WrapperType.Serialize(writer, &wrapper)
+ default:
+ return errors.New("data must be of type int64 or *int64")
+ }
+ case Float32Type:
+ switch data := data.(type) {
+ case float32:
+ wrapper := protocol.FloatWrapper{Value: data}
+ return float32WrapperType.Serialize(writer, &wrapper)
+ case *float32:
+ wrapper := protocol.FloatWrapper{Value: *data}
+ return float32WrapperType.Serialize(writer, &wrapper)
+ default:
+ return errors.New("data must be of type float32 or *float32")
+ }
+ case Float64Type:
+ switch data := data.(type) {
+ case float64:
+ wrapper := protocol.DoubleWrapper{Value: data}
+ return float64WrapperType.Serialize(writer, &wrapper)
+ case *float64:
+ wrapper := protocol.DoubleWrapper{Value: *data}
+ return float64WrapperType.Serialize(writer, &wrapper)
+ default:
+ return errors.New("data must be of type float64 or *float64")
+ }
+ case StringType:
+ switch data := data.(type) {
+ case string:
+ wrapper := protocol.StringWrapper{Value: data}
+ return stringWrapperType.Serialize(writer, &wrapper)
+ case *string:
+ wrapper := protocol.StringWrapper{Value: *data}
+ return stringWrapperType.Serialize(writer, &wrapper)
+ default:
+ return errors.New("data must be of type string or *string")
+ }
+ default:
+ log.Fatalf("unknown primitive type %v", p)
+ // unreachable
+ return nil
+ }
+}
+
+type jsonType struct {
+ typeName TypeName
+}
+
+// MakeJsonType creates a new SimpleType with a given TypeName
+// using the standard Go JSON library.
+func MakeJsonType(name TypeName) SimpleType {
+ return jsonType{typeName: name}
+}
+
+func (j jsonType) GetTypeName() TypeName {
+ return j.typeName
+}
+
+func (j jsonType) Deserialize(r io.Reader, receiver interface{}) error {
+ return json.NewDecoder(r).Decode(receiver)
+}
+
+func (j jsonType) Serialize(writer io.Writer, data interface{}) error {
+ return json.NewEncoder(writer).Encode(data)
+}
+
+type protoType struct {
+ typeName TypeName
+}
+
+// MakeProtobufType creates a new SimpleType for the given protobuf Message.
+func MakeProtobufType(m proto.Message) SimpleType {
+ name := proto.MessageName(m)
+ tName, _ := TypeNameFromParts("type.googleapis.com", string(name))
+ return MakeProtobufTypeWithTypeName(tName)
+}
+
+// MakeProtobufTypeWithTypeName creates a new SimpleType for the
+// given protobuf Message with a custom namespace.
+func MakeProtobufTypeWithTypeName(typeName TypeName) SimpleType {
+ return protoType{
+ typeName: typeName,
+ }
+}
+
+func (p protoType) GetTypeName() TypeName {
+ return p.typeName
+}
+
+func (p protoType) Deserialize(r io.Reader, receiver interface{}) (err error) {
+ switch receiver := receiver.(type) {
+ case proto.Message:
+ data, err := io.ReadAll(r)
+ if err != nil {
+ return err
+ }
+
+ return proto.Unmarshal(data, receiver)
+ default:
+ return errors.New("receiver must implement proto.Message")
+ }
+}
+
+func (p protoType) Serialize(writer io.Writer, data interface{}) error {
+ switch data := data.(type) {
+ case proto.Message:
+ if value, err := proto.Marshal(data); err != nil {
+ return err
+ } else {
+ _, err = writer.Write(value)
+ return err
+ }
+
+ default:
+ return errors.New("data must implement proto.Message")
+ }
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/types_test.go b/statefun-sdk-go/v3/pkg/statefun/types_test.go
new file mode 100644
index 0000000..4e00da6
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/types_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "bytes"
+ "github.com/stretchr/testify/assert"
+ "math"
+ "testing"
+)
+
+func TestBoolType(t *testing.T) {
+ testBool(t, true)
+ testBool(t, false)
+}
+
+func testBool(t *testing.T, data bool) {
+ buffer := bytes.Buffer{}
+ err := BoolType.Serialize(&buffer, data)
+ assert.NoError(t, err)
+
+ var result bool
+ err = BoolType.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+ assert.NoError(t, err)
+
+ assert.Equal(t, data, result)
+}
+
+func TestIntType(t *testing.T) {
+ testInt32(t, 1)
+ testInt32(t, 1048576)
+ testInt32(t, math.MaxInt32)
+ testInt32(t, math.MinInt32)
+ testInt32(t, -1)
+}
+
+func testInt32(t *testing.T, data int32) {
+ buffer := bytes.Buffer{}
+ err := Int32Type.Serialize(&buffer, data)
+ assert.NoError(t, err)
+
+ var result int32
+ err = Int32Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+ assert.NoError(t, err)
+
+ assert.Equal(t, data, result)
+}
+
+func TestLongType(t *testing.T) {
+ testInt64(t, -1)
+ testInt64(t, 0)
+ testInt64(t, math.MinInt64)
+ testInt64(t, math.MaxInt64)
+}
+
+func testInt64(t *testing.T, data int64) {
+ buffer := bytes.Buffer{}
+ err := Int64Type.Serialize(&buffer, data)
+ assert.NoError(t, err)
+
+ var result int64
+ err = Int64Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+ assert.NoError(t, err)
+
+ assert.Equal(t, data, result)
+}
+
+func TestFloatType(t *testing.T) {
+ testFloat32(t, math.MaxFloat32)
+ testFloat32(t, math.SmallestNonzeroFloat32)
+ testFloat32(t, 2.1459)
+ testFloat32(t, -1e4)
+}
+
+func testFloat32(t *testing.T, data float32) {
+ buffer := bytes.Buffer{}
+ err := Float32Type.Serialize(&buffer, data)
+ assert.NoError(t, err)
+
+ var result float32
+ err = Float32Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+ assert.NoError(t, err)
+
+ assert.Equal(t, data, result)
+}
+
+func TestDoubleType(t *testing.T) {
+ testFloat64(t, math.MaxFloat64)
+ testFloat64(t, math.SmallestNonzeroFloat64)
+ testFloat64(t, 2.1459)
+ testFloat64(t, -1e4)
+}
+
+func testFloat64(t *testing.T, data float64) {
+ buffer := bytes.Buffer{}
+ err := Float64Type.Serialize(&buffer, data)
+ assert.NoError(t, err)
+
+ var result float64
+ err = Float64Type.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+ assert.NoError(t, err)
+
+ assert.Equal(t, data, result)
+}
+
+func TestStringType(t *testing.T) {
+ testString(t, "")
+ testString(t, "This is a string")
+}
+
+func testString(t *testing.T, data string) {
+ buffer := bytes.Buffer{}
+ err := StringType.Serialize(&buffer, data)
+ assert.NoError(t, err)
+
+ var result string
+ err = StringType.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+ assert.NoError(t, err)
+
+ assert.Equal(t, data, result)
+}
+
+type User struct {
+ FirstName string `json:"first_name"`
+ LastName string `json:"last_name"`
+}
+
+func TestJsonType(t *testing.T) {
+ buffer := bytes.Buffer{}
+ userType := MakeJsonType(TypeNameFrom("org.foo.bar/UserJson"))
+
+ err := userType.Serialize(&buffer, User{"bob", "mop"})
+ assert.NoError(t, err)
+
+ var result User
+ err = userType.Deserialize(bytes.NewReader(buffer.Bytes()), &result)
+ assert.NoError(t, err)
+
+ assert.Equal(t, result, User{"bob", "mop"})
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/value_spec.go b/statefun-sdk-go/v3/pkg/statefun/value_spec.go
new file mode 100644
index 0000000..6d51cd8
--- /dev/null
+++ b/statefun-sdk-go/v3/pkg/statefun/value_spec.go
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statefun
+
+import (
+ "fmt"
+ "log"
+ "regexp"
+ "time"
+)
+
+type expirationType int
+
+const (
+ none expirationType = iota
+ expireAfterCall
+ expireAfterWrite
+)
+
+func (e expirationType) String() string {
+ switch e {
+ case expireAfterCall:
+ return "expire_after_call"
+ case expireAfterWrite:
+ return "expire_after_write"
+ case none:
+ return "none"
+ default:
+ panic("unknown Expiration type")
+ }
+}
+
+// Expiration Configuration
+//
+// Defines the way state can be auto expired by the runtime.
+// State Expiration (also known as TTL) can be used to keep
+// state from growing arbitrarily by assigning an Expiration
+// date to a value.
+//
+// State can be expired after a duration has passed since either
+// the last write to the state, or the last call to the Function.
+type Expiration struct {
+ expirationType
+ duration time.Duration
+}
+
+func (e Expiration) String() string {
+ return fmt.Sprintf("Expiration{mode=%v, duration=%v}", e.expirationType.String(), e.duration.String())
+}
+
+// ExpireAfterCall returns an Expiration configuration that would expire
+// a duration after the last invocation of the Function.
+func ExpireAfterCall(duration time.Duration) Expiration {
+ return Expiration{
+ expireAfterCall,
+ duration,
+ }
+}
+
+// ExpireAfterWrite returns an Expiration configuration that
+// would expire a duration after the last write.
+func ExpireAfterWrite(duration time.Duration) Expiration {
+ return Expiration{
+ expireAfterWrite,
+ duration,
+ }
+}
+
+// A ValueSpec identifies a registered persistent value of a function, which will be
+// managed by the Stateful Functions runtime for consistency and fault-tolerance. A
+// ValueSpec is registered for a function by configuring it on the function's
+// associated StatefulFunctionSpec.
+type ValueSpec struct {
+ // The given tpe of the persistent value. The tpe must be a valid
+ // identifier conforming to the following rules:
+ //
+ // 1. First character must be an alphabet letter [a-z] / [A-Z], or an underscore '_'.
+ // 2. Remaining characters can be an alphabet letter [a-z] / [A-Z], a digit [0-9], or
+ // an underscore '-'.
+ // 3. Must not contain any spaces.
+ Name string
+
+ // The SimpleType of the persistent value. Either
+ // a built-in PrimitiveType or custom implementation.
+ ValueType SimpleType
+
+ // An optional expiration configuration.
+ Expiration Expiration
+}
+
+func (v *ValueSpec) String() string {
+ return "ValueSpec{name=" + v.Name + ", type=" + v.ValueType.GetTypeName().String() + ", expiration=" + v.Expiration.String() + "}"
+}
+
+const invalidNameMessage = `
+invalid state tpe %s. state names can only start with alphabet letters [a-z][A-Z] or an underscore '_' followed by zero or more characters that are alphanumeric or underscores
+`
+
+func validateValueSpec(s ValueSpec) error {
+ matched, err := regexp.MatchString("^[a-zA-Z_][a-zA-Z_\\d]*$", s.Name)
+ if err != nil {
+ log.Panicf("invalid regex; this is a bug: %v", err)
+ }
+
+ if !matched {
+ return fmt.Errorf(invalidNameMessage, s.Name)
+ }
+
+ return nil
+}
diff --git a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
index dc280b6..b134417 100644
--- a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
+++ b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
@@ -22,6 +22,7 @@
option java_package = "org.apache.flink.statefun.sdk.egress.generated";
option java_multiple_files = true;
+option go_package = ".;protocol";
message KafkaProducerRecord {
string key = 1;
diff --git a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
index a365443..626c7a8 100644
--- a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
+++ b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
@@ -22,6 +22,7 @@
option java_package = "org.apache.flink.statefun.sdk.egress.generated";
option java_multiple_files = true;
+option go_package = ".;protocol";
message KinesisEgressRecord {
string partition_key = 1;
diff --git a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
index ac72d7c..aeaaf06 100644
--- a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
+++ b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
@@ -22,6 +22,7 @@
option java_package = "org.apache.flink.statefun.sdk.reqreply.generated";
option java_multiple_files = true;
+option go_package = ".;protocol";
// -------------------------------------------------------------------------------------------------------------------
// Common message definitions
diff --git a/statefun-sdk-protos/src/main/protobuf/types/types.proto b/statefun-sdk-protos/src/main/protobuf/types/types.proto
index 3674546..a16f343 100644
--- a/statefun-sdk-protos/src/main/protobuf/types/types.proto
+++ b/statefun-sdk-protos/src/main/protobuf/types/types.proto
@@ -22,7 +22,7 @@
option java_package = "org.apache.flink.statefun.sdk.types.generated";
option java_multiple_files = true;
-
+option go_package = ".;protocol";
// BooleanWrapper represents a StateFun primitive type of a boolean value. This is recognized as:
// io.statefun.types/bool