Merge pull request #1076 from wenxuwan/move_etcd_to_gost
move etcd client to gost
diff --git a/go.mod b/go.mod
index 35f179c..609f646 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,8 @@
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.3
- github.com/dubbogo/gost v1.11.2
+ github.com/dubbogo/gost v1.11.3
+ github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.4.0
github.com/fsnotify/fsnotify v1.4.9
github.com/go-co-op/gocron v0.1.1
@@ -49,5 +50,5 @@
github.com/envoyproxy/go-control-plane => github.com/envoyproxy/go-control-plane v0.8.0
github.com/shirou/gopsutil => github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880
go.etcd.io/bbolt v1.3.4 => github.com/coreos/bbolt v1.3.3
- google.golang.org/grpc => google.golang.org/grpc v1.26.0
+ google.golang.org/grpc v1.33.1 => google.golang.org/grpc v1.26.0
)
diff --git a/go.sum b/go.sum
index 090ef8b..d8f40ef 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,4 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
@@ -124,6 +125,8 @@
github.com/circonus-labs/circonusllhist v0.1.3 h1:TJH+oke8D16535+jHExHj4nQvzlZrj7ug5D7I/orNUA=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
@@ -177,8 +180,8 @@
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
-github.com/dubbogo/gost v1.11.2 h1:NanyHmvzE1HrgI2T9H/jE/N1wkxFEj+IbM1A4RT9H7Q=
-github.com/dubbogo/gost v1.11.2/go.mod h1:3QQEj50QOhkWTERT785YZ5ZxIRGNdR11FCLP7FzHsMc=
+github.com/dubbogo/gost v1.11.3 h1:PSP9KQyuRJugmPLqC18MFgoIL0g1G4n/9FTKgQYjjbE=
+github.com/dubbogo/gost v1.11.3/go.mod h1:3QQEj50QOhkWTERT785YZ5ZxIRGNdR11FCLP7FzHsMc=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
@@ -187,8 +190,9 @@
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
-github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
+github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk=
+github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633 h1:H2pdYOb3KQ1/YsqVWoWNLQO+fusocsw354rqGTZtAgw=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
@@ -506,6 +510,7 @@
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linode/linodego v0.7.1 h1:4WZmMpSA2NRwlPZcc0+4Gyn7rr99Evk9bnr0B3gXRKE=
github.com/linode/linodego v0.7.1/go.mod h1:ga11n3ivecUrPCHN0rANxKmfWBJVkOXfLMZinAbj2sY=
+github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
@@ -837,6 +842,7 @@
golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -899,6 +905,7 @@
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -958,9 +965,11 @@
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@@ -999,6 +1008,7 @@
google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.13.0 h1:Q3Ui3V3/CVinFWFiW39Iw0kMuVrRzYX0wN6OPFp0lTA=
google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -1019,8 +1029,23 @@
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884 h1:fiNLklpBwWK1mth30Hlwk+fcdBmIALlgF5iy77O37Ig=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
+google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
+google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
+google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -1067,6 +1092,7 @@
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go
index 1939b91..604e6da 100644
--- a/metadata/report/etcd/report.go
+++ b/metadata/report/etcd/report.go
@@ -23,6 +23,10 @@
)
import (
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
+)
+
+import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
@@ -30,7 +34,6 @@
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
- "github.com/apache/dubbo-go/remoting/etcdv3"
)
const DEFAULT_ROOT = "dubbo"
@@ -43,7 +46,7 @@
// etcdMetadataReport is the implementation of MetadataReport based etcd
type etcdMetadataReport struct {
- client *etcdv3.Client
+ client *gxetcd.Client
root string
}
@@ -121,7 +124,7 @@
func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
timeout, _ := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
addresses := strings.Split(url.Location, ",")
- client, err := etcdv3.NewClient(etcdv3.MetadataETCDV3Client, addresses, timeout, 1)
+ client, err := gxetcd.NewClient(gxetcd.MetadataETCDV3Client, addresses, timeout, 1)
if err != nil {
logger.Errorf("Could not create etcd metadata report. URL: %s,error:{%v}", url.String(), err)
return nil
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index 46ba6b6..c51d1fc 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -26,6 +26,7 @@
)
import (
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)
@@ -50,7 +51,7 @@
type etcdV3Registry struct {
registry.BaseRegistry
cltLock sync.Mutex
- client *etcdv3.Client
+ client *gxetcd.Client
listenerLock sync.RWMutex
listener *etcdv3.EventListener
dataListener *dataListener
@@ -58,12 +59,12 @@
}
// Client gets the etcdv3 client
-func (r *etcdV3Registry) Client() *etcdv3.Client {
+func (r *etcdV3Registry) Client() *gxetcd.Client {
return r.client
}
// SetClient sets the etcdv3 client
-func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
+func (r *etcdV3Registry) SetClient(client *gxetcd.Client) {
r.client = client
}
@@ -88,9 +89,9 @@
if err := etcdv3.ValidateClient(
r,
- etcdv3.WithName(etcdv3.RegistryETCDV3Client),
- etcdv3.WithTimeout(timeout),
- etcdv3.WithEndpoints(strings.Split(url.Location, ",")...),
+ gxetcd.WithName(gxetcd.RegistryETCDV3Client),
+ gxetcd.WithTimeout(timeout),
+ gxetcd.WithEndpoints(strings.Split(url.Location, ",")...),
); err != nil {
return nil, err
}
diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go
index c9c3d43..5e3e98f 100644
--- a/registry/etcdv3/service_discovery.go
+++ b/registry/etcdv3/service_discovery.go
@@ -26,6 +26,7 @@
import (
gxset "github.com/dubbogo/gost/container/set"
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
perrors "github.com/pkg/errors"
@@ -56,7 +57,7 @@
// descriptor is a short string about the basic information of this instance
descriptor string
// client is current Etcdv3 client
- client *etcdv3.Client
+ client *gxetcd.Client
// serviceInstance is current serviceInstance
serviceInstance *registry.ServiceInstance
// services is when register or update will add service name
@@ -307,9 +308,9 @@
logger.Infof("etcd address is: %v,timeout is:%s", remoteConfig.Address, timeout.String())
client := etcdv3.NewServiceDiscoveryClient(
- etcdv3.WithName(etcdv3.RegistryETCDV3Client),
- etcdv3.WithTimeout(timeout),
- etcdv3.WithEndpoints(strings.Split(remoteConfig.Address, ",")...),
+ gxetcd.WithName(gxetcd.RegistryETCDV3Client),
+ gxetcd.WithTimeout(timeout),
+ gxetcd.WithEndpoints(strings.Split(remoteConfig.Address, ",")...),
)
descriptor := fmt.Sprintf("etcd-service-discovery[%s]", remoteConfig.Address)
diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go
index 34ee31b..3acdbc4 100644
--- a/remoting/etcdv3/client.go
+++ b/remoting/etcdv3/client.go
@@ -18,110 +18,42 @@
package etcdv3
import (
- "context"
- "sync"
- "time"
-)
-
-import (
- "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/clientv3/concurrency"
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
- "google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
-const (
- // ConnDelay connection delay
- ConnDelay = 3
- // MaxFailTimes max failure times
- MaxFailTimes = 15
- // RegistryETCDV3Client client name
- RegistryETCDV3Client = "etcd registry"
- // metadataETCDV3Client client name
- MetadataETCDV3Client = "etcd metadata"
-)
-
-var (
- // Defines related errors
- ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
- ErrKVPairNotFound = perrors.New("k/v pair not found")
-)
-
-// nolint
-type Options struct {
- name string
- endpoints []string
- client *Client
- timeout time.Duration
- heartbeat int // heartbeat second
-}
-
-// Option will define a function of handling Options
-type Option func(*Options)
-
-// WithEndpoints sets etcd client endpoints
-func WithEndpoints(endpoints ...string) Option {
- return func(opt *Options) {
- opt.endpoints = endpoints
- }
-}
-
-// WithName sets etcd client name
-func WithName(name string) Option {
- return func(opt *Options) {
- opt.name = name
- }
-}
-
-// WithTimeout sets etcd client timeout
-func WithTimeout(timeout time.Duration) Option {
- return func(opt *Options) {
- opt.timeout = timeout
- }
-}
-
-// WithHeartbeat sets etcd client heartbeat
-func WithHeartbeat(heartbeat int) Option {
- return func(opt *Options) {
- opt.heartbeat = heartbeat
- }
-}
-
// ValidateClient validates client and sets options
-func ValidateClient(container clientFacade, opts ...Option) error {
- options := &Options{
- heartbeat: 1, // default heartbeat
- }
+func ValidateClient(container clientFacade, opts ...gxetcd.Option) error {
+ options := &gxetcd.Options{}
for _, opt := range opts {
opt(options)
}
-
lock := container.ClientLock()
lock.Lock()
defer lock.Unlock()
// new Client
if container.Client() == nil {
- newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
+ newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
- options.name, options.endpoints, options.timeout, err)
- return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
+ options.Name, options.Endpoints, options.Timeout, err)
+ return perrors.WithMessagef(err, "new client (address:%+v)", options.Endpoints)
}
container.SetClient(newClient)
}
// Client lose connection with etcd server
- if container.Client().rawClient == nil {
- newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
+ if container.Client().GetRawClient() == nil {
+ newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
- options.name, options.endpoints, options.timeout, err)
- return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
+ options.Name, options.Endpoints, options.Timeout, err)
+ return perrors.WithMessagef(err, "new client (address:%+v)", options.Endpoints)
}
container.SetClient(newClient)
}
@@ -130,368 +62,18 @@
}
// nolint
-func NewServiceDiscoveryClient(opts ...Option) *Client {
- options := &Options{
- heartbeat: 1, // default heartbeat
+func NewServiceDiscoveryClient(opts ...gxetcd.Option) *gxetcd.Client {
+ options := &gxetcd.Options{
+ Heartbeat: 1, // default heartbeat
}
for _, opt := range opts {
opt(options)
}
- newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
+ newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat)
if err != nil {
logger.Errorf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
- options.name, options.endpoints, options.timeout, err)
+ options.Name, options.Endpoints, options.Timeout, err)
}
return newClient
}
-
-// Client represents etcd client Configuration
-type Client struct {
- lock sync.RWMutex
-
- // these properties are only set once when they are started.
- name string
- endpoints []string
- timeout time.Duration
- heartbeat int
-
- ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg
- cancel context.CancelFunc // cancel the ctx, all watcher will stopped
- rawClient *clientv3.Client
-
- exit chan struct{}
- Wait sync.WaitGroup
-}
-
-// nolint
-func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) {
- ctx, cancel := context.WithCancel(context.Background())
- rawClient, err := clientv3.New(clientv3.Config{
- Context: ctx,
- Endpoints: endpoints,
- DialTimeout: timeout,
- DialOptions: []grpc.DialOption{grpc.WithBlock()},
- })
- if err != nil {
- return nil, perrors.WithMessage(err, "new raw client block connect to server")
- }
-
- c := &Client{
- name: name,
- timeout: timeout,
- endpoints: endpoints,
- heartbeat: heartbeat,
-
- ctx: ctx,
- cancel: cancel,
- rawClient: rawClient,
-
- exit: make(chan struct{}),
- }
-
- if err := c.maintenanceStatus(); err != nil {
- return nil, perrors.WithMessage(err, "client maintenance status")
- }
- return c, nil
-}
-
-// NOTICE: need to get the lock before calling this method
-func (c *Client) clean() {
- // close raw client
- c.rawClient.Close()
-
- // cancel ctx for raw client
- c.cancel()
-
- // clean raw client
- c.rawClient = nil
-}
-
-func (c *Client) stop() bool {
- select {
- case <-c.exit:
- return true
- default:
- close(c.exit)
- }
- return false
-}
-
-// nolint
-func (c *Client) Close() {
- if c == nil {
- return
- }
-
- // stop the client
- c.stop()
-
- // wait client maintenance status stop
- c.Wait.Wait()
-
- c.lock.Lock()
- defer c.lock.Unlock()
- if c.rawClient != nil {
- c.clean()
- }
- logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)
-}
-
-func (c *Client) maintenanceStatus() error {
- s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat))
- if err != nil {
- return perrors.WithMessage(err, "new session with server")
- }
-
- // must add wg before go maintenance status goroutine
- c.Wait.Add(1)
- go c.maintenanceStatusLoop(s)
- return nil
-}
-
-func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
- defer func() {
- c.Wait.Done()
- logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name)
- }()
-
- for {
- select {
- case <-c.Done():
- // Client be stopped, will clean the client hold resources
- return
- case <-s.Done():
- logger.Warn("etcd server stopped")
- c.lock.Lock()
- // when etcd server stopped, cancel ctx, stop all watchers
- c.clean()
- // when connection lose, stop client, trigger reconnect to etcd
- c.stop()
- c.lock.Unlock()
- return
- }
- }
-}
-
-// if k not exist will put k/v in etcd, otherwise return nil
-func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return ErrNilETCDV3Client
- }
-
- _, err := c.rawClient.Txn(c.ctx).
- If(clientv3.Compare(clientv3.Version(k), "<", 1)).
- Then(clientv3.OpPut(k, v, opts...)).
- Commit()
- return err
-}
-
-// if k not exist will put k/v in etcd
-// if k is already exist in etcd, replace it
-func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return ErrNilETCDV3Client
- }
-
- _, err := c.rawClient.Txn(c.ctx).
- If(clientv3.Compare(clientv3.Version(k), "!=", -1)).
- Then(clientv3.OpPut(k, v, opts...)).
- Commit()
- return err
-}
-
-func (c *Client) delete(k string) error {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return ErrNilETCDV3Client
- }
-
- _, err := c.rawClient.Delete(c.ctx, k)
- return err
-}
-
-func (c *Client) get(k string) (string, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return "", ErrNilETCDV3Client
- }
-
- resp, err := c.rawClient.Get(c.ctx, k)
- if err != nil {
- return "", err
- }
-
- if len(resp.Kvs) == 0 {
- return "", ErrKVPairNotFound
- }
-
- return string(resp.Kvs[0].Value), nil
-}
-
-// nolint
-func (c *Client) CleanKV() error {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return ErrNilETCDV3Client
- }
-
- _, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix())
- return err
-}
-
-func (c *Client) getChildren(k string) ([]string, []string, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return nil, nil, ErrNilETCDV3Client
- }
-
- resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix())
- if err != nil {
- return nil, nil, err
- }
-
- if len(resp.Kvs) == 0 {
- return nil, nil, ErrKVPairNotFound
- }
-
- kList := make([]string, 0, len(resp.Kvs))
- vList := make([]string, 0, len(resp.Kvs))
- for _, kv := range resp.Kvs {
- kList = append(kList, string(kv.Key))
- vList = append(vList, string(kv.Value))
- }
- return kList, vList, nil
-}
-
-func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return nil, ErrNilETCDV3Client
- }
-
- return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil
-}
-
-func (c *Client) watch(k string) (clientv3.WatchChan, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return nil, ErrNilETCDV3Client
- }
-
- return c.rawClient.Watch(c.ctx, k), nil
-}
-
-func (c *Client) keepAliveKV(k string, v string) error {
- c.lock.RLock()
- defer c.lock.RUnlock()
-
- if c.rawClient == nil {
- return ErrNilETCDV3Client
- }
-
- // make lease time longer, since 1 second is too short
- lease, err := c.rawClient.Grant(c.ctx, int64(30*time.Second.Seconds()))
- if err != nil {
- return perrors.WithMessage(err, "grant lease")
- }
-
- keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
- if err != nil || keepAlive == nil {
- if _, revokeErr := c.rawClient.Revoke(c.ctx, lease.ID); revokeErr != nil {
- logger.Warnf("rawClient.Revoke() = error:%v", revokeErr)
- }
- if err != nil {
- return perrors.WithMessage(err, "keep alive lease")
- } else {
- return perrors.New("keep alive lease")
- }
- }
-
- _, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID))
- return perrors.WithMessage(err, "put k/v with lease")
-}
-
-// nolint
-func (c *Client) Done() <-chan struct{} {
- return c.exit
-}
-
-// nolint
-func (c *Client) Valid() bool {
- select {
- case <-c.exit:
- return false
- default:
- }
-
- c.lock.RLock()
- defer c.lock.RUnlock()
- return c.rawClient != nil
-}
-
-// nolint
-func (c *Client) Create(k string, v string) error {
- err := c.put(k, v)
- return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
-}
-
-// Update key value ...
-func (c *Client) Update(k, v string) error {
- err := c.update(k, v)
- return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
-}
-
-// nolint
-func (c *Client) Delete(k string) error {
- err := c.delete(k)
- return perrors.WithMessagef(err, "delete k/v (key %s)", k)
-}
-
-// RegisterTemp registers a temporary node
-func (c *Client) RegisterTemp(k, v string) error {
- err := c.keepAliveKV(k, v)
- return perrors.WithMessagef(err, "keepalive kv (key %s)", k)
-}
-
-// GetChildrenKVList gets children kv list by @k
-func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
- kList, vList, err := c.getChildren(k)
- return kList, vList, perrors.WithMessagef(err, "get key children (key %s)", k)
-}
-
-// Get gets value by @k
-func (c *Client) Get(k string) (string, error) {
- v, err := c.get(k)
- return v, perrors.WithMessagef(err, "get key value (key %s)", k)
-}
-
-// Watch watches on spec key
-func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
- wc, err := c.watch(k)
- return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k)
-}
-
-// WatchWithPrefix watches on spec prefix
-func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
- wc, err := c.watchWithPrefix(prefix)
- return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
-}
diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go
deleted file mode 100644
index 4f4fa21..0000000
--- a/remoting/etcdv3/client_test.go
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcdv3
-
-import (
- "net/url"
- "os"
- "path"
- "reflect"
- "strings"
- "sync"
- "testing"
- "time"
-)
-
-import (
- "github.com/coreos/etcd/embed"
- "github.com/coreos/etcd/mvcc/mvccpb"
- perrors "github.com/pkg/errors"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/suite"
- "google.golang.org/grpc/connectivity"
-)
-
-const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd"
-
-// tests dataset
-var tests = []struct {
- input struct {
- k string
- v string
- }
-}{
- {input: struct {
- k string
- v string
- }{k: "name", v: "scott.wang"}},
- {input: struct {
- k string
- v string
- }{k: "namePrefix", v: "prefix.scott.wang"}},
- {input: struct {
- k string
- v string
- }{k: "namePrefix1", v: "prefix1.scott.wang"}},
- {input: struct {
- k string
- v string
- }{k: "age", v: "27"}},
-}
-
-// test dataset prefix
-const prefix = "name"
-
-type ClientTestSuite struct {
- suite.Suite
-
- etcdConfig struct {
- name string
- endpoints []string
- timeout time.Duration
- heartbeat int
- }
-
- etcd *embed.Etcd
-
- client *Client
-}
-
-// start etcd server
-func (suite *ClientTestSuite) SetupSuite() {
- t := suite.T()
-
- DefaultListenPeerURLs := "http://localhost:2382"
- DefaultListenClientURLs := "http://localhost:2381"
- lpurl, _ := url.Parse(DefaultListenPeerURLs)
- lcurl, _ := url.Parse(DefaultListenClientURLs)
- cfg := embed.NewConfig()
- cfg.LPUrls = []url.URL{*lpurl}
- cfg.LCUrls = []url.URL{*lcurl}
- cfg.Dir = defaultEtcdV3WorkDir
- e, err := embed.StartEtcd(cfg)
- if err != nil {
- t.Fatal(err)
- }
- select {
- case <-e.Server.ReadyNotify():
- t.Log("Server is ready!")
- case <-time.After(60 * time.Second):
- e.Server.Stop() // trigger a shutdown
- t.Logf("Server took too long to start!")
- }
-
- suite.etcd = e
-}
-
-// stop etcd server
-func (suite *ClientTestSuite) TearDownSuite() {
- suite.etcd.Close()
- if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
- suite.FailNow(err.Error())
- }
-}
-
-func (suite *ClientTestSuite) setUpClient() *Client {
- c, err := NewClient(suite.etcdConfig.name,
- suite.etcdConfig.endpoints,
- suite.etcdConfig.timeout,
- suite.etcdConfig.heartbeat)
- if err != nil {
- suite.T().Fatal(err)
- }
- return c
-}
-
-// set up a client for suite
-func (suite *ClientTestSuite) SetupTest() {
- c := suite.setUpClient()
- err := c.CleanKV()
- suite.Nil(err)
- suite.client = c
-}
-
-func (suite *ClientTestSuite) TestClientClose() {
- c := suite.client
- t := suite.T()
-
- defer c.Close()
- if c.rawClient.ActiveConnection().GetState() != connectivity.Ready {
- t.Fatal(suite.client.rawClient.ActiveConnection().GetState())
- }
-}
-
-func (suite *ClientTestSuite) TestClientValid() {
- c := suite.client
- t := suite.T()
-
- if !c.Valid() {
- t.Fatal("client is not valid")
- }
- c.Close()
- if suite.client.Valid() != false {
- t.Fatal("client is valid")
- }
-}
-
-func (suite *ClientTestSuite) TestClientDone() {
- c := suite.client
-
- go func() {
- time.Sleep(2 * time.Second)
- c.Close()
- }()
-
- c.Wait.Wait()
-
- if c.Valid() {
- suite.T().Fatal("client should be invalid then")
- }
-}
-
-func (suite *ClientTestSuite) TestClientCreateKV() {
- tests := tests
-
- c := suite.client
- t := suite.T()
-
- defer suite.client.Close()
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
- expect := tc.input.v
-
- if err := c.Create(k, v); err != nil {
- t.Fatal(err)
- }
-
- value, err := c.Get(k)
- if err != nil {
- t.Fatal(err)
- }
-
- if value != expect {
- t.Fatalf("expect %v but get %v", expect, value)
- }
-
- }
-}
-
-func (suite *ClientTestSuite) TestClientDeleteKV() {
- tests := tests
- c := suite.client
- t := suite.T()
-
- defer c.Close()
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
- expect := ErrKVPairNotFound
-
- if err := c.Create(k, v); err != nil {
- t.Fatal(err)
- }
-
- if err := c.Delete(k); err != nil {
- t.Fatal(err)
- }
-
- _, err := c.Get(k)
- if perrors.Cause(err) == expect {
- continue
- }
-
- if err != nil {
- t.Fatal(err)
- }
- }
-}
-
-func (suite *ClientTestSuite) TestClientGetChildrenKVList() {
- tests := tests
-
- c := suite.client
- t := suite.T()
-
- var expectKList []string
- var expectVList []string
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
-
- if strings.Contains(k, prefix) {
- expectKList = append(expectKList, k)
- expectVList = append(expectVList, v)
- }
-
- if err := c.Create(k, v); err != nil {
- t.Fatal(err)
- }
- }
-
- kList, vList, err := c.GetChildrenKVList(prefix)
- if err != nil {
- t.Fatal(err)
- }
-
- if reflect.DeepEqual(expectKList, kList) && reflect.DeepEqual(expectVList, vList) {
- return
- }
-
- t.Fatalf("expect keylist %v but got %v expect valueList %v but got %v ", expectKList, kList, expectVList, vList)
-}
-
-func (suite *ClientTestSuite) TestClientWatch() {
- tests := tests
-
- c := suite.client
- t := suite.T()
-
- wg := sync.WaitGroup{}
- wg.Add(1)
-
- go func() {
- defer wg.Done()
-
- wc, err := c.watch(prefix)
- if err != nil {
- t.Error(err)
- }
-
- events := make([]mvccpb.Event, 0)
- var eCreate, eDelete mvccpb.Event
-
- for e := range wc {
- for _, event := range e.Events {
- events = append(events, (mvccpb.Event)(*event))
- if event.Type == mvccpb.PUT {
- eCreate = (mvccpb.Event)(*event)
- }
- if event.Type == mvccpb.DELETE {
- eDelete = (mvccpb.Event)(*event)
- }
- t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
- }
- }
-
- assert.Equal(t, 2, len(events))
- assert.Contains(t, events, eCreate)
- assert.Contains(t, events, eDelete)
- }()
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
-
- if err := c.Create(k, v); err != nil {
- t.Fatal(err)
- }
-
- if err := c.delete(k); err != nil {
- t.Fatal(err)
- }
- }
-
- c.Close()
-
- wg.Wait()
-}
-
-func (suite *ClientTestSuite) TestClientRegisterTemp() {
- c := suite.client
- observeC := suite.setUpClient()
- t := suite.T()
-
- wg := sync.WaitGroup{}
- wg.Add(1)
-
- go func() {
- defer wg.Done()
-
- completePath := path.Join("scott", "wang")
- wc, err := observeC.watch(completePath)
- if err != nil {
- t.Error(err)
- }
-
- events := make([]mvccpb.Event, 0)
- var eCreate, eDelete mvccpb.Event
-
- for e := range wc {
- for _, event := range e.Events {
- events = append(events, (mvccpb.Event)(*event))
- if event.Type == mvccpb.DELETE {
- eDelete = (mvccpb.Event)(*event)
- t.Logf("complete key (%s) is delete", completePath)
- observeC.Close()
- break
- }
- eCreate = (mvccpb.Event)(*event)
- t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
- }
- }
-
- assert.Equal(t, 2, len(events))
- assert.Contains(t, events, eCreate)
- assert.Contains(t, events, eDelete)
- }()
-
- err := c.RegisterTemp("scott/wang", "test")
- if err != nil {
- t.Fatal(err)
- }
-
- time.Sleep(2 * time.Second)
- c.Close()
-
- wg.Wait()
-}
-
-func TestClientSuite(t *testing.T) {
- suite.Run(t, &ClientTestSuite{
- etcdConfig: struct {
- name string
- endpoints []string
- timeout time.Duration
- heartbeat int
- }{
- name: "test",
- endpoints: []string{"localhost:2381"},
- timeout: time.Second,
- heartbeat: 1,
- },
- })
-}
diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go
index 7a17691..8bd1e3a 100644
--- a/remoting/etcdv3/facade.go
+++ b/remoting/etcdv3/facade.go
@@ -24,6 +24,7 @@
import (
"github.com/apache/dubbo-getty"
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)
@@ -34,8 +35,8 @@
)
type clientFacade interface {
- Client() *Client
- SetClient(*Client)
+ Client() *gxetcd.Client
+ SetClient(client *gxetcd.Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup // for wait group control, etcd client listener & etcd client container
Done() chan struct{} // for etcd client control
@@ -60,9 +61,9 @@
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
- clientName := RegistryETCDV3Client
+ clientName := gxetcd.RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
- endpoints := r.Client().endpoints
+ endpoints := r.Client().GetEndPoints()
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()
@@ -74,13 +75,14 @@
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
- case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
+ case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * gxetcd.ConnDelay)): // avoid connect frequent
}
err = ValidateClient(
r,
- WithName(clientName),
- WithEndpoints(endpoints...),
- WithTimeout(timeout),
+ gxetcd.WithName(clientName),
+ gxetcd.WithEndpoints(endpoints...),
+ gxetcd.WithTimeout(timeout),
+ gxetcd.WithHeartbeat(1),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoints, perrors.WithStack(err))
@@ -88,8 +90,8 @@
break
}
failTimes++
- if MaxFailTimes <= failTimes {
- failTimes = MaxFailTimes
+ if gxetcd.MaxFailTimes <= failTimes {
+ failTimes = gxetcd.MaxFailTimes
}
}
}
diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go
index 23ee727..e4ae694 100644
--- a/remoting/etcdv3/listener.go
+++ b/remoting/etcdv3/listener.go
@@ -25,6 +25,7 @@
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)
@@ -35,14 +36,14 @@
// nolint
type EventListener struct {
- client *Client
+ client *gxetcd.Client
keyMapLock sync.RWMutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
// NewEventListener returns a EventListener instance
-func NewEventListener(client *Client) *EventListener {
+func NewEventListener(client *gxetcd.Client) *EventListener {
return &EventListener{
client: client,
keyMap: make(map[string]struct{}),
@@ -69,7 +70,7 @@
return false
// client ctx stop
- case <-l.client.ctx.Done():
+ case <-l.client.GetCtx().Done():
logger.Warnf("etcd client ctx cancel")
return false
@@ -147,7 +148,7 @@
return
// client ctx stop
- case <-l.client.ctx.Done():
+ case <-l.client.GetCtx().Done():
logger.Warnf("etcd client ctx cancel")
return
@@ -191,7 +192,7 @@
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
- keyList, valueList, err := l.client.getChildren(key)
+ keyList, valueList, err := l.client.GetChildren(key)
if err != nil {
logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}
diff --git a/remoting/etcdv3/listener_test.go b/remoting/etcdv3/listener_test.go
index cfd8bff..6117123 100644
--- a/remoting/etcdv3/listener_test.go
+++ b/remoting/etcdv3/listener_test.go
@@ -18,10 +18,15 @@
package etcdv3
import (
+ "net/url"
+ "os"
+ "testing"
"time"
)
import (
+ "github.com/coreos/etcd/embed"
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
"github.com/stretchr/testify/assert"
)
@@ -29,6 +34,8 @@
"github.com/apache/dubbo-go/remoting"
)
+const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd"
+
var changedData = `
dubbo.consumer.request_timeout=3s
dubbo.consumer.connect_timeout=5s
@@ -51,7 +58,40 @@
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
-func (suite *ClientTestSuite) TestListener() {
+var etcd *embed.Etcd
+
+func SetUpEtcdServer(t *testing.T) {
+ var err error
+ DefaultListenPeerURLs := "http://localhost:2382"
+ DefaultListenClientURLs := "http://localhost:2381"
+ lpurl, _ := url.Parse(DefaultListenPeerURLs)
+ lcurl, _ := url.Parse(DefaultListenClientURLs)
+ cfg := embed.NewConfig()
+ cfg.LPUrls = []url.URL{*lpurl}
+ cfg.LCUrls = []url.URL{*lcurl}
+ cfg.Dir = defaultEtcdV3WorkDir
+ etcd, err = embed.StartEtcd(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ case <-etcd.Server.ReadyNotify():
+ t.Log("Server is ready!")
+ case <-time.After(60 * time.Second):
+ etcd.Server.Stop() // trigger a shutdown
+ t.Logf("Server took too long to start!")
+ }
+}
+
+func ClearEtcdServer(t *testing.T) {
+ etcd.Close()
+ if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
+ t.Fail()
+ }
+}
+
+func TestListener(t *testing.T) {
+
tests := []struct {
input struct {
k string
@@ -63,9 +103,9 @@
v string
}{k: "/dubbo", v: changedData}},
}
-
- c := suite.client
- t := suite.T()
+ SetUpEtcdServer(t)
+ c, err := gxetcd.NewClient("test", []string{"localhost:2381"}, time.Second, 1)
+ assert.NoError(t, err)
listener := NewEventListener(c)
dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
@@ -84,11 +124,12 @@
}
msg := <-dataListener.rc
assert.Equal(t, changedData, msg.Content)
+ ClearEtcdServer(t)
}
type mockDataListener struct {
eventList []remoting.Event
- client *Client
+ client *gxetcd.Client
changedData string
rc chan remoting.Event