blob: d1ed3eaa63d475d362308a2496c5ec1b1f3fd267 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package plugin
import (
"context"
"fmt"
"math/rand/v2"
"time"
"github.com/apache/answer/internal/entity"
"github.com/segmentfault/pacman/cache"
"github.com/segmentfault/pacman/log"
"xorm.io/builder"
"xorm.io/xorm"
)
// Error variables for KV storage operations
var (
// ErrKVKeyNotFound is returned when the requested key does not exist in the KV storage
ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage")
// ErrKVGroupEmpty is returned when a required group name is empty
ErrKVGroupEmpty = fmt.Errorf("group name is empty")
// ErrKVKeyEmpty is returned when a required key name is empty
ErrKVKeyEmpty = fmt.Errorf("key name is empty")
// ErrKVKeyAndGroupEmpty is returned when both key and group names are empty
ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty")
// ErrKVTransactionFailed is returned when a KV storage transaction operation fails
ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed")
)
// KVParams is the parameters for KV storage operations
type KVParams struct {
Group string
Key string
Value string
Page int
PageSize int
}
// KVOperator provides methods to interact with the key-value storage system for plugins
type KVOperator struct {
data *Data
session *xorm.Session
pluginSlugName string
cacheTTL time.Duration
}
// KVStorageOption defines a function type that configures a KVOperator
type KVStorageOption func(*KVOperator)
// WithCacheTTL is the option to set the cache TTL; the default value is 30 minutes.
// If ttl is less than 0, the cache will not be used
func WithCacheTTL(ttl time.Duration) KVStorageOption {
return func(kv *KVOperator) {
kv.cacheTTL = ttl
}
}
// Option is used to set the options for the KV storage
func (kv *KVOperator) Option(opts ...KVStorageOption) {
for _, opt := range opts {
opt(kv)
}
}
func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) {
session := kv.session
cleanup := func() {}
if session == nil {
session = kv.data.DB.NewSession().Context(ctx)
cleanup = func() {
if session != nil {
session.Close()
}
}
}
return session, cleanup
}
func (kv *KVOperator) getCacheKey(params KVParams) string {
return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", kv.pluginSlugName, params.Group, params.Key)
}
func (kv *KVOperator) setCache(ctx context.Context, params KVParams) {
if kv.cacheTTL < 0 {
return
}
ttl := kv.cacheTTL
if ttl > 10 {
ttl += time.Duration(float64(ttl) * 0.1 * (1 - rand.Float64()))
}
cacheKey := kv.getCacheKey(params)
if err := kv.data.Cache.SetString(ctx, cacheKey, params.Value, ttl); err != nil {
log.Warnf("cache set failed: %v, key: %s", err, cacheKey)
}
}
func (kv *KVOperator) getCache(ctx context.Context, params KVParams) (string, bool, error) {
if kv.cacheTTL < 0 {
return "", false, nil
}
cacheKey := kv.getCacheKey(params)
return kv.data.Cache.GetString(ctx, cacheKey)
}
func (kv *KVOperator) cleanCache(ctx context.Context, params KVParams) {
if kv.cacheTTL < 0 {
return
}
if err := kv.data.Cache.Del(ctx, kv.getCacheKey(params)); err != nil {
log.Warnf("Failed to delete cache for key %s: %v", params.Key, err)
}
}
// Get retrieves a value from KV storage by group and key.
// Returns the value as a string or an error if the key is not found.
func (kv *KVOperator) Get(ctx context.Context, params KVParams) (string, error) {
if params.Key == "" {
return "", ErrKVKeyEmpty
}
if value, exist, err := kv.getCache(ctx, params); err == nil && exist {
return value, nil
}
// query
data := entity.PluginKVStorage{}
query, cleanup := kv.getSession(ctx)
defer cleanup()
query.Where(builder.Eq{
"plugin_slug_name": kv.pluginSlugName,
"`group`": params.Group,
"`key`": params.Key,
})
has, err := query.Get(&data)
if err != nil {
return "", err
}
if !has {
return "", ErrKVKeyNotFound
}
params.Value = data.Value
kv.setCache(ctx, params)
return data.Value, nil
}
// Set stores a value in KV storage with the specified group and key.
// Updates the value if it already exists.
func (kv *KVOperator) Set(ctx context.Context, params KVParams) error {
if params.Key == "" {
return ErrKVKeyEmpty
}
query, cleanup := kv.getSession(ctx)
defer cleanup()
data := &entity.PluginKVStorage{
PluginSlugName: kv.pluginSlugName,
Group: params.Group,
Key: params.Key,
Value: params.Value,
}
kv.cleanCache(ctx, params)
affected, err := query.Where(builder.Eq{
"plugin_slug_name": kv.pluginSlugName,
"`group`": params.Group,
"`key`": params.Key,
}).Cols("value").Update(data)
if err != nil {
return err
}
if affected == 0 {
_, err = query.Insert(data)
if err != nil {
return err
}
}
return nil
}
// Del removes values from KV storage by group and/or key.
// If both group and key are provided, only that specific entry is deleted.
// If only group is provided, all entries in that group are deleted.
// At least one of group or key must be provided.
func (kv *KVOperator) Del(ctx context.Context, params KVParams) error {
if params.Key == "" && params.Group == "" {
return ErrKVKeyAndGroupEmpty
}
kv.cleanCache(ctx, params)
session, cleanup := kv.getSession(ctx)
defer cleanup()
session.Where(builder.Eq{
"plugin_slug_name": kv.pluginSlugName,
})
if params.Group != "" {
session.Where(builder.Eq{"`group`": params.Group})
}
if params.Key != "" {
session.Where(builder.Eq{"`key`": params.Key})
}
_, err := session.Delete(&entity.PluginKVStorage{})
return err
}
// GetByGroup retrieves all key-value pairs for a specific group with pagination support.
// Returns a map of keys to values or an error if the group is empty or not found.
func (kv *KVOperator) GetByGroup(ctx context.Context, params KVParams) (map[string]string, error) {
if params.Group == "" {
return nil, ErrKVGroupEmpty
}
if params.Page < 1 {
params.Page = 1
}
if params.PageSize < 1 {
params.PageSize = 10
}
query, cleanup := kv.getSession(ctx)
defer cleanup()
var items []entity.PluginKVStorage
err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": params.Group}).
Limit(params.PageSize, (params.Page-1)*params.PageSize).
OrderBy("id ASC").
Find(&items)
if err != nil {
return nil, err
}
result := make(map[string]string, len(items))
for _, item := range items {
result[item.Key] = item.Value
}
return result, nil
}
// Tx executes a function within a transaction context. If the KVOperator already has a session,
// it will use that session. Otherwise, it creates a new transaction session.
// The transaction will be committed if the function returns nil, or rolled back if it returns an error.
func (kv *KVOperator) Tx(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error) error {
var (
txKv = kv
shouldCommit bool
)
if kv.session == nil {
session := kv.data.DB.NewSession().Context(ctx)
if err := session.Begin(); err != nil {
session.Close()
return fmt.Errorf("%w: begin transaction failed: %v", ErrKVTransactionFailed, err)
}
defer func() {
if !shouldCommit {
if rollbackErr := session.Rollback(); rollbackErr != nil {
log.Errorf("rollback failed: %v", rollbackErr)
}
}
session.Close()
}()
txKv = &KVOperator{
session: session,
data: kv.data,
pluginSlugName: kv.pluginSlugName,
}
shouldCommit = true
}
if err := fn(ctx, txKv); err != nil {
return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err)
}
if shouldCommit {
if err := txKv.session.Commit(); err != nil {
return fmt.Errorf("%w: commit failed: %v", ErrKVTransactionFailed, err)
}
}
return nil
}
// KVStorage defines the interface for plugins that need data storage capabilities
type KVStorage interface {
Info() Info
SetOperator(operator *KVOperator)
}
var (
CallKVStorage,
registerKVStorage = MakePlugin[KVStorage](true)
)
// NewKVOperator creates a new KV storage operator with the specified database engine, cache and plugin name.
// It returns a KVOperator instance that can be used to interact with the plugin's storage.
func NewKVOperator(db *xorm.Engine, cache cache.Cache, pluginSlugName string) *KVOperator {
return &KVOperator{
data: &Data{DB: db, Cache: cache},
pluginSlugName: pluginSlugName,
cacheTTL: 30 * time.Minute,
}
}