blob: 8d0c78691c19620fcd64efdaf034e4df8a8e2255 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package store
import (
"net/url"
"strings"
"time"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"github.com/pkg/errors"
"go.uber.org/multierr"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/plugins/resources/k8s"
"github.com/apache/dubbo-kubernetes/pkg/config/plugins/resources/mysql"
"github.com/apache/dubbo-kubernetes/pkg/config/plugins/resources/zookeeper"
config_types "github.com/apache/dubbo-kubernetes/pkg/config/types"
)
var _ config.Config = &StoreConfig{}
type StoreType = string
const (
KubernetesStore StoreType = "kubernetes"
MemoryStore StoreType = "memory"
MyStore StoreType = "mysql"
Traditional StoreType = "traditional"
)
// StoreConfig defines Resource Store configuration
type StoreConfig struct {
// Type of Store used in the Control Plane. Can be either "kubernetes", "postgres" or "memory"
Type StoreType `json:"type" envconfig:"dubbo_store_type"`
// Kubernetes Store configuration
Kubernetes *k8s.KubernetesStoreConfig `json:"kubernetes"`
// Zookeeper Store configuration
Zookeeper *zookeeper.ZookeeperStoreConfig `json:"zookeeper"`
// Mysql Store configuration
Mysql *mysql.MysqlStoreConfig `json:"mysql"`
Traditional Registry `json:"traditional"`
// Cache configuration
Cache CacheStoreConfig `json:"cache"`
// Upsert configuration
Upsert UpsertConfig `json:"upsert"`
// UnsafeDelete skips validation of resource delete.
// For example you don't have to delete all Dataplane objects before you delete a Mesh
UnsafeDelete bool `json:"unsafeDelete" envconfig:"dubbo_store_unsafe_delete"`
}
func DefaultStoreConfig() *StoreConfig {
return &StoreConfig{
Type: KubernetesStore,
Kubernetes: k8s.DefaultKubernetesStoreConfig(),
Cache: DefaultCacheStoreConfig(),
Upsert: DefaultUpsertConfig(),
Mysql: DefaultMysqlConfig(),
Traditional: DefaultTraditionalConfig(),
}
}
func (s *StoreConfig) Sanitize() {
s.Kubernetes.Sanitize()
s.Cache.Sanitize()
}
func (s *StoreConfig) PostProcess() error {
return multierr.Combine(
s.Kubernetes.PostProcess(),
s.Cache.PostProcess(),
)
}
func (s *StoreConfig) Validate() error {
switch s.Type {
case KubernetesStore:
if err := s.Kubernetes.Validate(); err != nil {
return errors.Wrap(err, "Kubernetes validation failed")
}
case MemoryStore:
return nil
case MyStore:
default:
return errors.Errorf("Type should be either %s or %s", KubernetesStore, MemoryStore)
}
if err := s.Cache.Validate(); err != nil {
return errors.Wrap(err, "Cache validation failed")
}
return nil
}
var _ config.Config = &CacheStoreConfig{}
type CacheStoreConfig struct {
config.BaseConfig
Enabled bool `json:"enabled" envconfig:"dubbo_store_cache_enabled"`
ExpirationTime config_types.Duration `json:"expirationTime" envconfig:"dubbo_store_cache_expiration_time"`
}
func DefaultCacheStoreConfig() CacheStoreConfig {
return CacheStoreConfig{
Enabled: true,
ExpirationTime: config_types.Duration{Duration: time.Second},
}
}
func DefaultUpsertConfig() UpsertConfig {
return UpsertConfig{
ConflictRetryBaseBackoff: config_types.Duration{Duration: 200 * time.Millisecond},
ConflictRetryMaxTimes: 10,
ConflictRetryJitterPercent: 30,
}
}
func DefaultMysqlConfig() *mysql.MysqlStoreConfig {
return &mysql.MysqlStoreConfig{
MysqlDsn: "127.0.0.1:6379",
}
}
func DefaultTraditionalConfig() Registry {
return Registry{
ConfigCenter: "zookeeper://127.0.0.1:2181",
Registry: AddressConfig{
Address: "zookeeper://127.0.0.1:2181",
},
MetadataReport: AddressConfig{
Address: "zookeeper://127.0.0.1:2181",
},
}
}
type UpsertConfig struct {
config.BaseConfig
// Base time for exponential backoff on upsert (get and update) operations when retry is enabled
ConflictRetryBaseBackoff config_types.Duration `json:"conflictRetryBaseBackoff" envconfig:"dubbo_store_upsert_conflict_retry_base_backoff"`
// Max retries on upsert (get and update) operation when retry is enabled
ConflictRetryMaxTimes uint `json:"conflictRetryMaxTimes" envconfig:"dubbo_store_upsert_conflict_retry_max_times"`
// Percentage of jitter. For example: if backoff is 20s, and this value 10, the backoff will be between 18s and 22s.
ConflictRetryJitterPercent uint `json:"conflictRetryJitterPercent" envconfig:"dubbo_store_upsert_conflict_retry_jitter_percent"`
}
func (u *UpsertConfig) Validate() error {
if u.ConflictRetryBaseBackoff.Duration < 0 {
return errors.New("RetryBaseBackoff cannot be lower than 0")
}
return nil
}
var _ config.Config = &UpsertConfig{}
type Registry struct {
ConfigCenter string `json:"config_center,omitempty"`
MetadataReport AddressConfig `json:"metadata_report,omitempty"`
Registry AddressConfig `json:"registry,omitempty"`
}
func (r *Registry) Sanitize() {}
func (r *Registry) Validate() error {
return nil
}
func (r *Registry) PostProcess() error {
return nil
}
var _ config.Config = &Registry{}
type AddressConfig struct {
Address string `json:"address,omitempty"`
Url *url.URL `json:"-"`
}
func (a *AddressConfig) Sanitize() {}
var _ config.Config = &AddressConfig{}
func (a *AddressConfig) PostProcess() error {
return nil
}
func (a *AddressConfig) Validate() error {
return nil
}
func (c *AddressConfig) GetProtocol() string {
return c.Url.Scheme
}
func (c *AddressConfig) GetAddress() string {
return c.Url.Host
}
func (c *AddressConfig) GetUrlMap() url.Values {
urlMap := url.Values{}
urlMap.Set(constant.ConfigNamespaceKey, c.param("namespace", ""))
urlMap.Set(constant.ConfigGroupKey, c.param(constant.GroupKey, "dubbo"))
urlMap.Set(constant.MetadataReportGroupKey, c.param(constant.GroupKey, "dubbo"))
urlMap.Set(constant.ClientNameKey, clientNameID(c.Url.Scheme, c.Url.Host))
return urlMap
}
func (c *AddressConfig) param(key string, defaultValue string) string {
param := c.Url.Query().Get(key)
if len(param) > 0 {
return param
}
return defaultValue
}
func (c *AddressConfig) ToURL() (*common.URL, error) {
return common.NewURL(c.GetAddress(),
common.WithProtocol(c.GetProtocol()),
common.WithParams(c.GetUrlMap()),
common.WithParamsValue("registry", c.GetProtocol()),
common.WithUsername(c.param("username", "")),
common.WithPassword(c.param("password", "")),
)
}
func clientNameID(protocol, address string) string {
return strings.Join([]string{protocol, address}, "-")
}