blob: f3479c834566bfc1b005e155a82ecfacf4ab8b3c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
"io"
)
// smallBufferSize is the capacity given to a freshly allocated buffer
// when the requested size does not exceed it, so that small follow-up
// writes can reuse the same storage.
// This is the same constant as used in bytes.Buffer.
const smallBufferSize = 64
// Cell is a mutable, persisted value backed by a byte buffer that
// holds the serialized form of the value.
// This struct is not thread safe.
type Cell struct {
	buf          []byte // unread contents are the bytes buf[off : len(buf)].
	off          int    // read cursor: reads start at &buf[off], writes always begin from buf[0]
	mutated      bool   // tracks whether the cell has been written to or deleted
	hasValue     bool   // tracks whether the cell currently holds a valid value
	typeTypeName string // the typename of the type whose serialized contents are stored in the cell
}
// NewCell creates and initializes a new Cell using state's value as its
// initial contents. The new Cell takes ownership of state, and the
// caller should not use state after this call.
//
// The returned Cell holds a value only when state carries a StateValue
// whose HasValue flag is set; a nil state, a nil StateValue, or a
// StateValue flagged as having no value all produce an empty Cell for
// which HasValue reports false. typeTypeName is recorded as the typename
// attached to any mutation later produced from the Cell.
func NewCell(state *protocol.ToFunction_PersistedValue, typeTypeName string) *Cell {
	c := &Cell{
		typeTypeName: typeTypeName,
	}

	// Nothing to adopt: either there is no state at all, or the typed
	// value explicitly says it is absent. Treating the latter as present
	// would surface an empty byte slice as if it were a stored value.
	if state == nil || state.StateValue == nil || !state.StateValue.HasValue {
		return c
	}

	c.hasValue = true
	c.buf = state.StateValue.Value
	return c
}
// SeekToBeginning rewinds the read cursor to the first byte of the
// underlying buffer. The following Read therefore starts over from
// the beginning, no matter how much an earlier Read consumed.
func (c *Cell) SeekToBeginning() {
	c.off = 0
}
// Read copies up to len(p) unread bytes into p and advances the read
// cursor past them. It returns the number of bytes copied
// (0 <= n <= len(p)). Reads are resumable: each call continues where
// the previous one stopped. Once no unread bytes remain, Read returns
// io.EOF, except for a zero-length p, which yields (0, nil). This is
// the behavior the go standard library expects from a reader.
// Users of Cell are required to call SeekToBeginning before the first
// read to ensure reads always begin at the start of the buffer.
func (c *Cell) Read(p []byte) (n int, err error) {
	if !c.empty() {
		// Hand out as much of the unread tail as fits into p.
		n = copy(p, c.buf[c.off:])
		c.off += n
		return n, nil
	}

	// Buffer exhausted: only a non-empty destination observes EOF.
	if len(p) != 0 {
		return 0, io.EOF
	}
	return 0, nil
}
// Write stores p as the new contents of the cell and returns len(p).
// Unlike standard implementations of Write, it never appends:
// whatever the cell held before is overwritten. The cell is marked
// as mutated and as holding a value, and the read cursor is moved
// back to the start of the buffer.
func (c *Cell) Write(p []byte) (n int, err error) {
	// Make the buffer exactly len(p) long, then overwrite from buf[0].
	c.sizeBufferForCapacity(len(p))
	written := copy(c.buf, p)

	c.off = 0
	c.hasValue = true
	c.mutated = true
	return written, nil
}
// sizeBufferForCapacity makes the buffer exactly n bytes long. An
// existing buffer with enough capacity is re-sliced in place, which
// avoids an allocation. Otherwise a new buffer is allocated, with at
// least smallBufferSize capacity when n is small.
func (c *Cell) sizeBufferForCapacity(n int) {
	switch {
	case c.buf != nil && n <= cap(c.buf):
		// Enough room already: reuse the existing storage.
		c.buf = c.buf[:n]
	case n <= smallBufferSize:
		// Small request: over-allocate so later small writes fit too.
		c.buf = make([]byte, n, smallBufferSize)
	default:
		c.buf = make([]byte, n)
	}
}
// Delete clears the cell: the buffer is truncated to zero length, the
// read cursor is reset, and the cell is flagged as mutated and as no
// longer holding a value. The underlying storage is kept so that
// future writes can reuse it.
func (c *Cell) Delete() {
	// Truncate rather than drop, so the capacity survives.
	c.buf = c.buf[:0]
	c.off = 0

	c.hasValue = false
	c.mutated = true
}
// HasValue reports whether the cell currently holds a valid value.
// For a cell that was created without a value or cleared by Delete
// the result is false, and calls to Read will consume 0 bytes.
func (c *Cell) HasValue() bool {
	return c.hasValue
}
// GetStateMutation converts the final state of the Cell into a
// FromFunction_PersistedValueMutation registered under name.
// It returns nil when the cell was never mutated. A mutated cell
// without a value yields a DELETE mutation; a mutated cell with a
// value yields a MODIFY mutation carrying the cell's typename and bytes.
// The new FromFunction_PersistedValueMutation takes ownership of the
// underlying buffer and the cell should not be used after this
// function returns.
func (c *Cell) GetStateMutation(name string) *protocol.FromFunction_PersistedValueMutation {
	if !c.mutated {
		return nil
	}

	if !c.hasValue {
		return &protocol.FromFunction_PersistedValueMutation{
			MutationType: protocol.FromFunction_PersistedValueMutation_DELETE,
			StateName:    name,
		}
	}

	return &protocol.FromFunction_PersistedValueMutation{
		MutationType: protocol.FromFunction_PersistedValueMutation_MODIFY,
		StateName:    name,
		StateValue: &protocol.TypedValue{
			Typename: c.typeTypeName,
			HasValue: true,
			Value:    c.buf, // handed over without copying
		},
	}
}
// empty reports whether there are no unread bytes left, i.e. the
// read cursor has reached (or passed) the end of the buffer.
func (c *Cell) empty() bool {
	return c.off >= len(c.buf)
}