blob: 57e53642e7d34b5b0c25fae500058015cf50161c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statefun
import (
"fmt"
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal"
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
"sync"
)
// An AddressScopedStorage is used for reading and writing persistent
// values that are managed by the Stateful Functions runtime for
// fault-tolerance and consistency.
//
// All access to the storage is scoped to the current function instance,
// identified by the instance's Address. This means that within an
// invocation, function instances may only access its own persisted
// values through this storage.
type AddressScopedStorage interface {
// Get returnss the values of the provided ValueSpec, scoped to the
// current invoked Address and stores the result in the value
// pointed to by receiver. The method will return false
// if there is no value for the spec in storage
// so callers can differentiate between missing and
// the types zero value.
Get(spec ValueSpec, receiver interface{}) (exists bool)
// Set updates the value for the provided ValueSpec, scoped
// to the current invoked Address.
Set(spec ValueSpec, value interface{})
// Remove deletes the prior value set for the the provided
// ValueSpec, scoped to the current invoked Address.
//
// After removing the value, calling Get for the same
// spec under the same Address will return false.
Remove(spec ValueSpec)
}
type storage struct {
mutex sync.RWMutex
cells map[string]*internal.Cell
}
type storageFactory interface {
getStorage() *storage
getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec
}
func newStorageFactory(
batch *protocol.ToFunction_InvocationBatchRequest,
specs map[string]*protocol.FromFunction_PersistedValueSpec,
) storageFactory {
storage := &storage{
cells: make(map[string]*internal.Cell, len(specs)),
}
states := make(map[string]*protocol.FromFunction_PersistedValueSpec, len(specs))
for k, v := range specs {
states[k] = v
}
if batch.State != nil {
for _, state := range batch.State {
spec, exists := states[state.StateName]
if !exists {
continue
}
delete(states, state.StateName)
storage.cells[state.StateName] = internal.NewCell(state, spec.TypeTypename)
}
}
if len(states) > 0 {
var missing = make([]*protocol.FromFunction_PersistedValueSpec, 0, len(states))
for _, spec := range states {
missing = append(missing, spec)
}
return MissingSpecs(missing)
} else {
return storage
}
}
func (s *storage) getStorage() *storage {
return s
}
func (s *storage) getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec {
return nil
}
func (s *storage) Get(spec ValueSpec, receiver interface{}) bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
cell, ok := s.cells[spec.Name]
if !ok {
panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
}
if !cell.HasValue() {
return false
}
cell.SeekToBeginning()
if err := spec.ValueType.Deserialize(cell, receiver); err != nil {
panic(fmt.Errorf("failed to deserialize persisted value `%s`: %w", spec.Name, err))
}
return true
}
func (s *storage) Set(spec ValueSpec, value interface{}) {
s.mutex.Lock()
defer s.mutex.Unlock()
cell, ok := s.cells[spec.Name]
if !ok {
panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
}
err := spec.ValueType.Serialize(cell, value)
if err != nil {
panic(fmt.Errorf("failed to serialize %s: %w", spec.Name, err))
}
}
func (s *storage) Remove(spec ValueSpec) {
s.mutex.Lock()
defer s.mutex.Unlock()
cell, ok := s.cells[spec.Name]
if !ok {
panic(fmt.Errorf("unregistered ValueSpec %s", spec.Name))
}
cell.Delete()
}
func (s *storage) getStateMutations() []*protocol.FromFunction_PersistedValueMutation {
mutations := make([]*protocol.FromFunction_PersistedValueMutation, 0, len(s.cells))
for name, cell := range s.cells {
if mutation := cell.GetStateMutation(name); mutation != nil {
mutations = append(mutations, mutation)
}
}
return mutations
}
type MissingSpecs []*protocol.FromFunction_PersistedValueSpec
func (m MissingSpecs) getStorage() *storage {
return nil
}
func (m MissingSpecs) getMissingSpecs() []*protocol.FromFunction_PersistedValueSpec {
return m
}