blob: cdd29ab2e483647ed90f37032e10d174f7583e39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package delegate
import (
"encoding/json"
"runtime/debug"
"sync"
"time"
)
import (
"github.com/go-co-op/gocron"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config/instance"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
)
const (
// defaultMetadataReportRetryTimes is the default maximum number of retry runs,
// used when the report URL does not set constant.RETRY_TIMES_KEY.
defaultMetadataReportRetryTimes int64 = 100
// defaultMetadataReportRetryPeriod is the default interval between retry runs, in seconds,
// used when the report URL does not set constant.RETRY_PERIOD_KEY.
defaultMetadataReportRetryPeriod int64 = 3
// defaultMetadataReportCycleReport is the default for whether all stored metadata is
// re-published periodically, used when the report URL does not set constant.CYCLE_REPORT_KEY.
defaultMetadataReportCycleReport bool = true
)
// metadataReportRetry bundles a gocron scheduler with the counters that bound
// how long the retry job for failed metadata reports keeps running.
type metadataReportRetry struct {
// retryPeriod is the interval between two runs of the retry job, in seconds.
retryPeriod int64
// retryLimit is the maximum number of runs; once the counter exceeds it the scheduler is cleared.
retryLimit int64
// scheduler runs the retry job.
scheduler *gocron.Scheduler
// job is the retry job registered on scheduler.
job *gocron.Job
// retryCounter counts how many times the retry job has run.
retryCounter *atomic.Int64
// retryTimesIfNonFail is the number of runs after which the scheduler is cleared
// when a run finds no failed report left to retry.
retryTimesIfNonFail int64
}
// newMetadataReportRetry builds a metadataReportRetry whose job is registered
// to run every retryPeriod seconds. Each run increments the retry counter and
// clears the scheduler when either
//   - the counter exceeds retryLimit, or
//   - retryFunc returns true and the counter exceeds retryTimesIfNonFail
//     (fixed at 600 here).
//
// retryFunc is only invoked while the counter is within retryLimit.
// The scheduler is created but not started by this function. The error
// returned by the job registration is passed through together with the
// constructed value.
func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) {
	mrr := &metadataReportRetry{
		retryPeriod:         retryPeriod,
		retryLimit:          retryLimit,
		scheduler:           gocron.NewScheduler(time.UTC),
		retryCounter:        atomic.NewInt64(0),
		retryTimesIfNonFail: 600,
	}

	retryTask := func() {
		mrr.retryCounter.Inc()
		logger.Infof("start to retry task for metadata report. retry times: %v", mrr.retryCounter.Load())
		switch {
		case mrr.retryCounter.Load() > mrr.retryLimit:
			// Retry budget exhausted: stop scheduling further runs.
			mrr.scheduler.Clear()
		case retryFunc() && mrr.retryCounter.Load() > mrr.retryTimesIfNonFail:
			mrr.scheduler.Clear() // may not interrupt the running job
		}
	}

	period := uint64(mrr.retryPeriod)
	registered, err := mrr.scheduler.Every(period).Seconds().Do(retryTask)
	mrr.job = registered
	return mrr, err
}
// startRetryTask sets the scheduler's start time to 500ms from now and then
// starts the scheduler so the retry job begins running.
// NOTE(review): this is invoked after every failed store; confirm that the
// gocron version in use tolerates repeated Start calls on one scheduler.
func (mrr *metadataReportRetry) startRetryTask() {
	firstRun := time.Now().Add(500 * time.Millisecond)
	mrr.scheduler.StartAt(firstRun)
	mrr.scheduler.Start()
}
// MetadataReport is a delegate in front of the configured metadata report instance.
// It keeps every stored definition in memory and remembers the ones whose
// store attempt failed so that they can be retried.
type MetadataReport struct {
// reportUrl is the metadata report URL obtained from instance.GetMetadataReportUrl.
reportUrl common.URL
// syncReport selects synchronous delegation; it is read from constant.SYNC_REPORT_KEY and defaults to false.
syncReport bool
// metadataReportRetry schedules the retry of failed reports.
metadataReportRetry *metadataReportRetry
// failedReports holds the definitions whose store attempt failed, keyed by identifier.
failedReports map[*identifier.MetadataIdentifier]interface{}
// failedReportsLock guards failedReports.
failedReportsLock sync.RWMutex
// allMetadataReports stores all the metadata report records in memory.
allMetadataReports map[*identifier.MetadataIdentifier]interface{}
// allMetadataReportsLock guards allMetadataReports.
allMetadataReportsLock sync.RWMutex
}
// NewMetadataReport creates a MetadataReport configured from the metadata
// report URL returned by instance.GetMetadataReportUrl.
//
// It always prepares (but does not start) the retry scheduler, using the
// retry period and retry times from the URL or their defaults. When
// constant.CYCLE_REPORT_KEY is true (the default) it also starts a second
// scheduler that re-publishes every stored metadata record once a day.
//
// It returns an error when either job cannot be registered on its scheduler.
func NewMetadataReport() (*MetadataReport, error) {
	url := instance.GetMetadataReportUrl()
	bmr := &MetadataReport{
		reportUrl:          url,
		syncReport:         url.GetParamBool(constant.SYNC_REPORT_KEY, false),
		failedReports:      make(map[*identifier.MetadataIdentifier]interface{}, 4),
		allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4),
	}

	mrr, err := newMetadataReportRetry(
		url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod),
		url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes),
		bmr.retry,
	)
	if err != nil {
		return nil, err
	}
	bmr.metadataReportRetry = mrr

	if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) {
		publishAll := func() {
			logger.Infof("start to publish all metadata in metadata report %s.", url.String())
			// Copy the records under the read lock and release it before
			// publishing: in sync mode the store path takes the write lock on
			// allMetadataReportsLock from this same goroutine, which would
			// deadlock if the read lock were still held.
			bmr.allMetadataReportsLock.RLock()
			snapshot := make(map[*identifier.MetadataIdentifier]interface{}, len(bmr.allMetadataReports))
			for id, definer := range bmr.allMetadataReports {
				snapshot[id] = definer
			}
			bmr.allMetadataReportsLock.RUnlock()
			bmr.doHandlerMetadataCollection(snapshot)
		}

		scheduler := gocron.NewScheduler(time.UTC)
		if _, err = scheduler.Every(1).Day().Do(publishAll); err != nil {
			return nil, err
		}
		scheduler.StartAt(time.Now().Add(500 * time.Millisecond))
		scheduler.Start()
	}
	return bmr, nil
}
// retry re-submits every report currently recorded as failed.
// It returns true when there was nothing to retry and false otherwise.
//
// The failed reports are copied under the read lock and the lock is released
// before they are re-submitted: in sync mode the store path takes the write
// lock on failedReportsLock from this same goroutine, which would deadlock if
// the read lock were still held.
func (mr *MetadataReport) retry() bool {
	mr.failedReportsLock.RLock()
	pending := make(map[*identifier.MetadataIdentifier]interface{}, len(mr.failedReports))
	for id, definer := range mr.failedReports {
		pending[id] = definer
	}
	mr.failedReportsLock.RUnlock()

	return mr.doHandlerMetadataCollection(pending)
}
// StoreProviderMetadata stores the provider-side service definition through
// the metadata report. When syncReport is set the store runs on the calling
// goroutine; otherwise it runs in a new goroutine and this method returns
// immediately.
func (mr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) {
	if mr.syncReport {
		mr.storeMetadataTask(common.PROVIDER, identifier, definer)
		// Return here so the definition is not stored a second time asynchronously.
		return
	}
	go mr.storeMetadataTask(common.PROVIDER, identifier, definer)
}
// storeMetadataTask records definer in the in-memory collections, marshals it
// to JSON and hands it to the metadata report instance for the given role
// (common.PROVIDER or common.CONSUMER; any other role stores nothing remotely).
//
// Failures are signalled internally by panicking; the deferred handler
// recovers any panic raised after it is installed, puts the entry back into
// failedReports, starts the retry task and logs the cause with a stack trace.
// Nothing is returned to the caller.
func (mr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) {
	logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer)

	// Remember the latest definition for the periodic re-publish.
	mr.allMetadataReportsLock.Lock()
	mr.allMetadataReports[identifier] = definer
	mr.allMetadataReportsLock.Unlock()

	// A fresh attempt supersedes any earlier failure for this identifier.
	mr.failedReportsLock.Lock()
	delete(mr.failedReports, identifier)
	mr.failedReportsLock.Unlock()

	// payload holds the JSON-marshaled definition; the deferred handler reads
	// whatever value it has at the time of the panic.
	var payload []byte
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		mr.failedReportsLock.Lock()
		mr.failedReports[identifier] = definer
		mr.failedReportsLock.Unlock()
		mr.metadataReportRetry.startRetryTask()
		logger.Errorf("Failed to put provider metadata %v in %v, cause: %v\n%s\n",
			identifier, string(payload), cause, string(debug.Stack()))
	}()

	var err error
	if payload, err = json.Marshal(definer); err != nil {
		logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %+v", err)
		panic(err)
	}

	report := instance.GetMetadataReportInstance()
	switch role {
	case common.PROVIDER:
		err = report.StoreProviderMetadata(identifier, string(payload))
	case common.CONSUMER:
		err = report.StoreConsumerMetadata(identifier, string(payload))
	}
	if err != nil {
		logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %+v", err)
		panic(err)
	}
}
// StoreConsumerMetadata stores the consumer-side service definition through
// the metadata report. When syncReport is set the store runs on the calling
// goroutine; otherwise it runs in a new goroutine and this method returns
// immediately.
func (mr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) {
	if mr.syncReport {
		mr.storeMetadataTask(common.CONSUMER, identifier, definer)
		// Return here so the definition is not stored a second time asynchronously.
		return
	}
	go mr.storeMetadataTask(common.CONSUMER, identifier, definer)
}
// SaveServiceMetadata delegates to the metadata report instance to save the
// service metadata for identifier.
//
// In sync mode the instance's error is returned. In async mode the call runs
// in a new goroutine, nil is returned immediately, and a failure is logged
// rather than silently dropped.
func (mr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
	report := instance.GetMetadataReportInstance()
	if mr.syncReport {
		return report.SaveServiceMetadata(identifier, url)
	}
	go func() {
		if err := report.SaveServiceMetadata(identifier, url); err != nil {
			logger.Errorf("save service metadata for %v asynchronously failed, cause: %+v", identifier, err)
		}
	}()
	return nil
}
// RemoveServiceMetadata delegates to the metadata report instance to remove
// the service metadata for identifier.
//
// In sync mode the instance's error is returned. In async mode the call runs
// in a new goroutine, nil is returned immediately, and a failure is logged
// rather than silently dropped.
func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error {
	report := instance.GetMetadataReportInstance()
	if mr.syncReport {
		return report.RemoveServiceMetadata(identifier)
	}
	go func() {
		if err := report.RemoveServiceMetadata(identifier); err != nil {
			logger.Errorf("remove service metadata for %v asynchronously failed, cause: %+v", identifier, err)
		}
	}()
	return nil
}
// GetExportedURLs asks the metadata report instance for the exported URLs
// recorded under identifier and returns its result unchanged.
func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
	return instance.GetMetadataReportInstance().GetExportedURLs(identifier)
}
// SaveSubscribedData serializes urls to a JSON array of their string forms and
// delegates to the metadata report instance to save it under identifier.
//
// It returns a wrapped error when the JSON encoding fails. In sync mode the
// instance's error is returned. In async mode the call runs in a new
// goroutine, nil is returned immediately, and a failure is logged rather than
// silently dropped.
func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
	urlStrList := make([]string, 0, len(urls))
	for _, url := range urls {
		urlStrList = append(urlStrList, url.String())
	}
	bytes, err := json.Marshal(urlStrList)
	if err != nil {
		return perrors.WithMessage(err, "Could not convert the array to json")
	}
	payload := string(bytes)

	report := instance.GetMetadataReportInstance()
	if mr.syncReport {
		return report.SaveSubscribedData(identifier, payload)
	}
	go func() {
		if err := report.SaveSubscribedData(identifier, payload); err != nil {
			logger.Errorf("save subscribed data for %v asynchronously failed, cause: %+v", identifier, err)
		}
	}()
	return nil
}
// GetSubscribedURLs asks the metadata report instance for the subscribed URLs
// recorded under identifier and returns its result unchanged.
//
// The receiver is a pointer, like the other methods of MetadataReport, so that
// calling it never copies the mutexes held by the struct.
func (mr *MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
	report := instance.GetMetadataReportInstance()
	return report.GetSubscribedURLs(identifier)
}
// GetServiceDefinition asks the metadata report instance for the service
// definition recorded under identifier and returns its result unchanged.
//
// The receiver is a pointer, like the other methods of MetadataReport, so that
// calling it never copies the mutexes held by the struct.
func (mr *MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) (string, error) {
	report := instance.GetMetadataReportInstance()
	return report.GetServiceDefinition(identifier)
}
// doHandlerMetadataCollection re-submits every entry of metadataMap through
// StoreProviderMetadata or StoreConsumerMetadata, chosen by the identifier's
// Side. Entries whose Side is neither provider nor consumer are ignored.
//
// It returns true when metadataMap is empty (nothing to do) and false
// otherwise.
//
// Stored values are checked before use: a provider entry must implement
// definition.ServiceDefiner (the type StoreProviderMetadata accepts) and a
// consumer entry must be a map[string]string. An entry of any other type is
// logged and skipped instead of panicking inside the scheduler job.
func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool {
	if len(metadataMap) == 0 {
		return true
	}

	providerSide := common.RoleType(common.PROVIDER).Role()
	consumerSide := common.RoleType(common.CONSUMER).Role()

	for id, stored := range metadataMap {
		switch id.Side {
		case providerSide:
			definer, ok := stored.(definition.ServiceDefiner)
			if !ok {
				logger.Errorf("skip provider metadata %v: unexpected definition type %T", id, stored)
				continue
			}
			mr.StoreProviderMetadata(id, definer)
		case consumerSide:
			params, ok := stored.(map[string]string)
			if !ok {
				logger.Errorf("skip consumer metadata %v: unexpected definition type %T", id, stored)
				continue
			}
			mr.StoreConsumerMetadata(id, params)
		}
	}
	return false
}