Merge pull request #829 from zouyx/feature/to1.5.4
Fix: For release bug fix version - v1.5.4
diff --git a/filter/filter_impl/generic_filter.go b/filter/filter_impl/generic_filter.go
index d385054..36b4b13 100644
--- a/filter/filter_impl/generic_filter.go
+++ b/filter/filter_impl/generic_filter.go
@@ -21,6 +21,7 @@
"context"
"reflect"
"strings"
+ "time"
)
import (
@@ -93,13 +94,21 @@
if t.Kind() == reflect.Struct {
result := make(map[string]interface{}, t.NumField())
for i := 0; i < t.NumField(); i++ {
- if v.Field(i).Kind() == reflect.Struct || v.Field(i).Kind() == reflect.Slice || v.Field(i).Kind() == reflect.Map {
- if v.Field(i).CanInterface() {
- setInMap(result, t.Field(i), struct2MapAll(v.Field(i).Interface()))
+ field := t.Field(i)
+ value := v.Field(i)
+ kind := value.Kind()
+ if kind == reflect.Struct || kind == reflect.Slice || kind == reflect.Map {
+ if value.CanInterface() {
+ tmp := value.Interface()
+ if _, ok := tmp.(time.Time); ok {
+ setInMap(result, field, tmp)
+ continue
+ }
+ setInMap(result, field, struct2MapAll(tmp))
}
} else {
- if v.Field(i).CanInterface() {
- setInMap(result, t.Field(i), v.Field(i).Interface())
+ if value.CanInterface() {
+ setInMap(result, field, value.Interface())
}
}
}
diff --git a/filter/filter_impl/generic_filter_test.go b/filter/filter_impl/generic_filter_test.go
index e407332..40cf743 100644
--- a/filter/filter_impl/generic_filter_test.go
+++ b/filter/filter_impl/generic_filter_test.go
@@ -20,6 +20,7 @@
import (
"reflect"
"testing"
+ "time"
)
import (
@@ -38,6 +39,8 @@
Xx string `m:"xx"`
} `m:"xxYy"`
} `m:"caCa"`
+ DaDa time.Time
+ EeEe int
}
testData.AaAa = "1"
testData.BaBa = "1"
@@ -45,6 +48,8 @@
testData.CaCa.AaAa = "2"
testData.CaCa.XxYy.xxXx = "3"
testData.CaCa.XxYy.Xx = "3"
+ testData.DaDa = time.Date(2020, 10, 29, 2, 34, 0, 0, time.Local)
+ testData.EeEe = 100
m := struct2MapAll(testData).(map[string]interface{})
assert.Equal(t, "1", m["aaAa"].(string))
assert.Equal(t, "1", m["baBa"].(string))
@@ -53,6 +58,8 @@
assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"]).Kind())
assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"].(map[string]interface{})["xxYy"]).Kind())
+ assert.Equal(t, "2020-10-29 02:34:00", m["daDa"].(time.Time).Format("2006-01-02 15:04:05"))
+ assert.Equal(t, 100, m["eeEe"].(int))
}
type testStruct struct {
diff --git a/go.mod b/go.mod
index 2ac1f85..317431f 100644
--- a/go.mod
+++ b/go.mod
@@ -4,12 +4,12 @@
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
- github.com/alibaba/sentinel-golang v0.6.1
+ github.com/alibaba/sentinel-golang v0.6.2
github.com/apache/dubbo-getty v1.3.10
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.3.0
- github.com/dubbogo/go-zookeeper v1.0.1
+ github.com/dubbogo/go-zookeeper v1.0.2
github.com/dubbogo/gost v1.9.1
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.0.0
diff --git a/go.sum b/go.sum
index fa3b0d8..20d5345 100644
--- a/go.sum
+++ b/go.sum
@@ -91,8 +91,8 @@
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
-github.com/alibaba/sentinel-golang v0.6.1 h1:Pxyw2X7ryklvToF40KG9l4uuO90jRZA2MWb8Z3d1wPo=
-github.com/alibaba/sentinel-golang v0.6.1/go.mod h1:5jemKdyCQCKVf+quEia53fo9a17OSe+wnl9HX2NbNpc=
+github.com/alibaba/sentinel-golang v0.6.2 h1:1OjjpljJbNKWp9p5RJKxOqS1gHGZPUWPlCcokv5xYJs=
+github.com/alibaba/sentinel-golang v0.6.2/go.mod h1:5jemKdyCQCKVf+quEia53fo9a17OSe+wnl9HX2NbNpc=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190808125512-07798873deee/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
@@ -208,8 +208,8 @@
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
-github.com/dubbogo/go-zookeeper v1.0.1 h1:irLzvOsDOTNsN8Sv9tvYYxVu6DCQfLtziZQtUHmZgz8=
-github.com/dubbogo/go-zookeeper v1.0.1/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
+github.com/dubbogo/go-zookeeper v1.0.2 h1:xmEnPL8SlCe3/+J5ZR9e8qE35LmFVYe8VVpDakjNM4A=
+github.com/dubbogo/go-zookeeper v1.0.2/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.1 h1:0/PPFo13zPbjt4Ia0zYWMFi3C6rAe9X7O1J2Iv+BHNM=
github.com/dubbogo/gost v1.9.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
diff --git a/metadata/service/exporter/configurable/exporter.go b/metadata/service/exporter/configurable/exporter.go
index 5a930c5..75e52d8 100644
--- a/metadata/service/exporter/configurable/exporter.go
+++ b/metadata/service/exporter/configurable/exporter.go
@@ -19,6 +19,7 @@
import (
"context"
+ "errors"
"sync"
)
@@ -46,12 +47,18 @@
}
// Export will export the metadataService
-func (exporter *MetadataServiceExporter) Export() error {
+func (exporter *MetadataServiceExporter) Export(url *common.URL) error {
if !exporter.IsExported() {
serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background())
serviceConfig.Protocol = constant.DEFAULT_PROTOCOL
+ if url == nil || url.SubURL == nil {
+ return errors.New("metadata server url is nil, pls check your configuration")
+ }
serviceConfig.Protocols = map[string]*config.ProtocolConfig{
- constant.DEFAULT_PROTOCOL: generateMetadataProtocol(),
+ constant.DEFAULT_PROTOCOL: {
+ Name: url.SubURL.Protocol,
+ Port: url.SubURL.Port,
+ },
}
serviceConfig.InterfaceName = constant.METADATA_SERVICE_NAME
// identify this is a golang server
@@ -95,11 +102,3 @@
defer exporter.lock.RUnlock()
return exporter.ServiceConfig != nil && exporter.ServiceConfig.IsExport()
}
-
-// generateMetadataProtocol will return a default ProtocolConfig
-func generateMetadataProtocol() *config.ProtocolConfig {
- return &config.ProtocolConfig{
- Name: constant.DEFAULT_PROTOCOL,
- Port: "20000",
- }
-}
diff --git a/metadata/service/exporter/configurable/exporter_test.go b/metadata/service/exporter/configurable/exporter_test.go
index b304b91..ceda255 100644
--- a/metadata/service/exporter/configurable/exporter_test.go
+++ b/metadata/service/exporter/configurable/exporter_test.go
@@ -26,6 +26,7 @@
)
import (
+ "github.com/apache/dubbo-go/common"
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/config"
_ "github.com/apache/dubbo-go/filter/filter_impl"
@@ -55,12 +56,23 @@
mockInitProviderWithSingleRegistry()
metadataService, _ := inmemory.NewMetadataService()
exported := NewMetadataServiceExporter(metadataService)
- assert.Equal(t, false, exported.IsExported())
- assert.NoError(t, exported.Export())
- assert.Equal(t, true, exported.IsExported())
- assert.Regexp(t, "dubbo://:20000/MetadataService*", exported.GetExportedURLs()[0].String())
- exported.Unexport()
- assert.Equal(t, false, exported.IsExported())
+
+ t.Run("configurableExporterUrlNil", func(t *testing.T) {
+ assert.Equal(t, false, exported.IsExported())
+ assert.Error(t, exported.Export(nil), "metadata server url is nil, pls check your configuration")
+ })
+
+ t.Run("configurableExporter", func(t *testing.T) {
+ registryURL, _ := common.NewURL("service-discovery://localhost:12345")
+ subURL, _ := common.NewURL("dubbo://localhost:20003")
+ registryURL.SubURL = &subURL
+ assert.Equal(t, false, exported.IsExported())
+ assert.NoError(t, exported.Export(®istryURL))
+ assert.Equal(t, true, exported.IsExported())
+ assert.Regexp(t, "dubbo://:20003/MetadataService*", exported.GetExportedURLs()[0].String())
+ exported.Unexport()
+ assert.Equal(t, false, exported.IsExported())
+ })
}
// mockInitProviderWithSingleRegistry will init a mocked providerConfig
diff --git a/metadata/service/exporter/exporter.go b/metadata/service/exporter/exporter.go
index cfdef3a..33ceaca 100644
--- a/metadata/service/exporter/exporter.go
+++ b/metadata/service/exporter/exporter.go
@@ -23,7 +23,7 @@
// MetadataServiceExporter will export & unexport the metadata service, get exported url, and return is exported or not
type MetadataServiceExporter interface {
- Export() error
+ Export(url *common.URL) error
Unexport()
GetExportedURLs() []*common.URL
IsExported() bool
diff --git a/registry/base_registry.go b/registry/base_registry.go
index ad1a3b6..797ffb2 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -326,7 +326,16 @@
}
host += ":" + c.Port
- rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
+ //delete empty param key
+ for key, val := range params {
+ if len(val) > 0 && val[0] == "" {
+ params.Del(key)
+ }
+ }
+
+ s, _ := url.QueryUnescape(params.Encode())
+ rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+
// Print your own registration service providers.
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String())
logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
diff --git a/registry/consul/registry.go b/registry/consul/registry.go
index c425c5e..b92e335 100644
--- a/registry/consul/registry.go
+++ b/registry/consul/registry.go
@@ -36,7 +36,8 @@
)
const (
- registryConnDelay = 3
+ registryConnDelay = 3
+ registryDestroyDefaultTimeout = time.Second * 3
)
func init() {
@@ -187,5 +188,25 @@
// Destroy consul registry center
func (r *consulRegistry) Destroy() {
+ if r.URL != nil {
+ done := make(chan struct{}, 1)
+ go func() {
+ defer func() {
+ if e := recover(); e != nil {
+ logger.Errorf("consulRegistry destory with panic: %v", e)
+ }
+ done <- struct{}{}
+ }()
+ if err := r.UnRegister(*r.URL); err != nil {
+ logger.Errorf("consul registry unregister with err: %s", err.Error())
+ }
+ }()
+ select {
+ case <-done:
+ logger.Infof("consulRegistry unregister done")
+ case <-time.After(registryDestroyDefaultTimeout):
+ logger.Errorf("consul unregister timeout")
+ }
+ }
close(r.done)
}
diff --git a/registry/consul/registry_test.go b/registry/consul/registry_test.go
index 94718f5..b300f75 100644
--- a/registry/consul/registry_test.go
+++ b/registry/consul/registry_test.go
@@ -55,3 +55,19 @@
assert.NoError(suite.t, err)
suite.listener = listener
}
+
+func (suite *consulRegistryTestSuite) testDestroy() {
+ consumerRegistryUrl := newConsumerRegistryUrl(registryHost, registryPort)
+ consumerRegistry, _ := newConsulRegistry(consumerRegistryUrl)
+ consulRegistryImp := consumerRegistry.(*consulRegistry)
+ assert.True(suite.t, consulRegistryImp.IsAvailable())
+ consulRegistryImp.Destroy()
+ assert.False(suite.t, consulRegistryImp.IsAvailable())
+
+ consumerRegistry, _ = newConsulRegistry(consumerRegistryUrl)
+ consulRegistryImp = consumerRegistry.(*consulRegistry)
+ consulRegistryImp.URL = nil
+ assert.True(suite.t, consulRegistryImp.IsAvailable())
+ consulRegistryImp.Destroy()
+ assert.False(suite.t, consulRegistryImp.IsAvailable())
+}
diff --git a/registry/consul/utils_test.go b/registry/consul/utils_test.go
index 939352d..0e5bffe 100644
--- a/registry/consul/utils_test.go
+++ b/registry/consul/utils_test.go
@@ -163,6 +163,7 @@
suite.testListener(remoting.EventTypeAdd)
suite.testUnregister()
suite.testListener(remoting.EventTypeDel)
+ suite.testDestroy()
}
// subscribe -> register -> unregister
@@ -183,6 +184,7 @@
suite.testListener(remoting.EventTypeAdd)
suite.testUnregister()
suite.testListener(remoting.EventTypeDel)
+ suite.testDestroy()
}
func TestConsulRegistry(t *testing.T) {
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index 9cbc494..f3cc379 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -91,7 +91,7 @@
r,
etcdv3.WithName(etcdv3.RegistryETCDV3Client),
etcdv3.WithTimeout(timeout),
- etcdv3.WithEndpoints(url.Location),
+ etcdv3.WithEndpoints(strings.Split(url.Location, ",")...),
); err != nil {
return nil, err
}
diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go
index dceaa99..e8d4aea 100644
--- a/registry/etcdv3/service_discovery.go
+++ b/registry/etcdv3/service_discovery.go
@@ -19,6 +19,7 @@
import (
"fmt"
+ "strings"
"sync"
"time"
)
@@ -313,7 +314,7 @@
client := etcdv3.NewServiceDiscoveryClient(
etcdv3.WithName(etcdv3.RegistryETCDV3Client),
etcdv3.WithTimeout(timeout),
- etcdv3.WithEndpoints(remoteConfig.Address),
+ etcdv3.WithEndpoints(strings.Split(remoteConfig.Address, ",")...),
)
descriptor := fmt.Sprintf("etcd-service-discovery[%s]", remoteConfig.Address)
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index 7576804..4db2c5a 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -75,7 +75,7 @@
func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
- tryInitMetadataService()
+ tryInitMetadataService(url)
serviceDiscovery, err := creatServiceDiscovery(url)
if err != nil {
@@ -642,7 +642,7 @@
// tryInitMetadataService will try to initialize metadata service
// TODO (move to somewhere)
-func tryInitMetadataService() {
+func tryInitMetadataService(url *common.URL) {
ms, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
if err != nil {
@@ -662,7 +662,7 @@
expt := configurable.NewMetadataServiceExporter(ms)
- err = expt.Export()
+ err = expt.Export(url)
if err != nil {
logger.Errorf("could not export the metadata service", err)
}
diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go
index 52b1cce..614ba9a 100644
--- a/remoting/etcdv3/facade.go
+++ b/remoting/etcdv3/facade.go
@@ -63,7 +63,7 @@
r.ClientLock().Lock()
clientName := RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
- endpoint := r.GetUrl().Location
+ endpoints := r.Client().endpoints
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()
@@ -80,11 +80,11 @@
err = ValidateClient(
r,
WithName(clientName),
- WithEndpoints(endpoint),
+ WithEndpoints(endpoints...),
WithTimeout(timeout),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
- endpoint, perrors.WithStack(err))
+ endpoints, perrors.WithStack(err))
if err == nil && r.RestartCallBack() {
break
}
diff --git a/remoting/getty/pool.go b/remoting/getty/pool.go
index 464cff9..a072432 100644
--- a/remoting/getty/pool.go
+++ b/remoting/getty/pool.go
@@ -385,6 +385,7 @@
if d := now - conn.getActive(); d > p.ttl {
p.remove(conn)
go conn.close()
+ num = len(p.conns)
continue
}
conn.updateActive(now) //update active time
diff --git a/remoting/getty/pool_test.go b/remoting/getty/pool_test.go
new file mode 100644
index 0000000..1115a49
--- /dev/null
+++ b/remoting/getty/pool_test.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+func TestGetConnFromPool(t *testing.T) {
+ var rpcClient Client
+
+ clientPoll := newGettyRPCClientConnPool(&rpcClient, 1, time.Duration(5*time.Second))
+
+ var conn1 gettyRPCClient
+ conn1.active = time.Now().Unix()
+ clientPoll.put(&conn1)
+ assert.Equal(t, 1, len(clientPoll.conns))
+
+ var conn2 gettyRPCClient
+ conn2.active = time.Now().Unix()
+ clientPoll.put(&conn2)
+ assert.Equal(t, 1, len(clientPoll.conns))
+ conn, err := clientPoll.get()
+ assert.Nil(t, err)
+ assert.Equal(t, &conn1, conn)
+ time.Sleep(6 * time.Second)
+ conn, err = clientPoll.get()
+ assert.Nil(t, conn)
+ assert.Nil(t, err)
+ assert.Equal(t, 0, len(clientPoll.conns))
+}