Merge pull request #564 from ChinX/syncer
[SCB-1427] Move storage to syncer/servicecenter
diff --git a/syncer/etcd/agent.go b/syncer/etcd/agent.go
index 1330b48..a52a511 100644
--- a/syncer/etcd/agent.go
+++ b/syncer/etcd/agent.go
@@ -22,7 +22,7 @@
"errors"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/syncer/servicecenter"
+ "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/etcdserver/api/v3client"
)
@@ -31,7 +31,6 @@
type Agent struct {
conf *Config
etcd *embed.Etcd
- storage servicecenter.Storage
readyCh chan struct{}
errorCh chan error
}
@@ -84,11 +83,8 @@
}
// Storage returns etcd storage
-func (a *Agent) Storage() servicecenter.Storage {
- if a.storage == nil {
- a.storage = NewStorage(v3client.New(a.etcd.Server))
- }
- return a.storage
+func (a *Agent) Storage() *clientv3.Client {
+ return v3client.New(a.etcd.Server)
}
// Stop etcd agent
diff --git a/syncer/pkg/mock/mocksotrage/etcd.go b/syncer/pkg/mock/mocksotrage/etcd.go
new file mode 100644
index 0000000..3604a16
--- /dev/null
+++ b/syncer/pkg/mock/mocksotrage/etcd.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocksotrage
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "os"
+
+ "github.com/apache/servicecomb-service-center/syncer/etcd"
+ "github.com/coreos/etcd/clientv3"
+)
+
+const (
+ defaultName = "etcd_mock"
+ defaultDataDir = "mock-data/"
+ defaultListenPeerAddr = "http://127.0.0.1:30993"
+)
+
+type MockServer struct {
+ etcd *etcd.Agent
+}
+
+func NewKVServer() (svr *MockServer, err error) {
+ agent := etcd.NewAgent(defaultConfig())
+ go agent.Start(context.Background())
+ select {
+ case <-agent.Ready():
+ case err = <-agent.Error():
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &MockServer{agent}, nil
+}
+
+func (m *MockServer) Storage() *clientv3.Client {
+ return m.etcd.Storage()
+}
+
+func (m *MockServer) Stop() {
+ m.etcd.Stop()
+ os.RemoveAll(defaultDataDir)
+}
+
+func defaultConfig() *etcd.Config {
+ peer, _ := url.Parse(defaultListenPeerAddr)
+ conf := etcd.DefaultConfig()
+ conf.Name = defaultName
+ conf.Dir = defaultDataDir + defaultName
+ conf.APUrls = []url.URL{*peer}
+ conf.LPUrls = []url.URL{*peer}
+ conf.InitialCluster = fmt.Sprintf("%s=%s", defaultName, defaultListenPeerAddr)
+ return conf
+}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 213ba95..afed8d0 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -116,7 +116,7 @@
return
}
- s.servicecenter.SetStorage(s.etcd.Storage())
+ s.servicecenter.SetStorageEngine(s.etcd.Storage())
s.agent.RegisterEventHandler(s)
diff --git a/syncer/servicecenter/servicecenter.go b/syncer/servicecenter/servicecenter.go
index 63f2d29..c8a288a 100644
--- a/syncer/servicecenter/servicecenter.go
+++ b/syncer/servicecenter/servicecenter.go
@@ -23,11 +23,13 @@
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/plugins"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
+ "github.com/apache/servicecomb-service-center/syncer/servicecenter/storage"
+ "github.com/coreos/etcd/clientv3"
)
// Store interface of servicecenter
type Servicecenter interface {
- SetStorage(storage Storage)
+ SetStorageEngine(engine clientv3.KV)
FlushData()
Registry(clusterName string, data *pb.SyncData)
Discovery() *pb.SyncData
@@ -35,16 +37,7 @@
type servicecenter struct {
servicecenter plugins.Servicecenter
- storage Storage
-}
-
-type Storage interface {
- GetData() (data *pb.SyncData)
- UpdateData(data *pb.SyncData)
- GetMaps() (maps pb.SyncMapping)
- UpdateMaps(maps pb.SyncMapping)
- GetMapByCluster(clusterName string) (mapping pb.SyncMapping)
- UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
+ storage storage.Storage
}
// NewServicecenter new store with endpoints
@@ -59,8 +52,8 @@
}, nil
}
-func (s *servicecenter) SetStorage(storage Storage) {
- s.storage = storage
+func (s *servicecenter) SetStorageEngine(engine clientv3.KV) {
+ s.storage = storage.NewStorage(engine)
}
// FlushData flush data to servicecenter, update mapping data
diff --git a/syncer/servicecenter/servicecenter_test.go b/syncer/servicecenter/servicecenter_test.go
index 120b109..cef2040 100644
--- a/syncer/servicecenter/servicecenter_test.go
+++ b/syncer/servicecenter/servicecenter_test.go
@@ -57,7 +57,13 @@
t.Fatal(err)
return
}
- dc.SetStorage(mocksotrage.New())
+ mockServer, err := mocksotrage.NewKVServer()
+ if err != nil {
+ t.Fatal(err)
+ return
+ }
+ defer mockServer.Stop()
+ dc.SetStorageEngine(mockServer.Storage())
mockplugin.SetGetAll(func(ctx context.Context) (data *pb.SyncData, e error) {
return nil, errors.New("test error")
diff --git a/syncer/etcd/storage.go b/syncer/servicecenter/storage/storage.go
similarity index 89%
rename from syncer/etcd/storage.go
rename to syncer/servicecenter/storage/storage.go
index f77af90..c6abf44 100644
--- a/syncer/etcd/storage.go
+++ b/syncer/servicecenter/storage/storage.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package etcd
+package storage
import (
"context"
@@ -24,7 +24,6 @@
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
- "github.com/apache/servicecomb-service-center/syncer/servicecenter"
"github.com/coreos/etcd/clientv3"
"github.com/gogo/protobuf/proto"
)
@@ -38,15 +37,24 @@
instancesKey = "/syncer/v1/instances"
)
+type Storage interface {
+ GetData() (data *pb.SyncData)
+ UpdateData(data *pb.SyncData)
+ GetMaps() (maps pb.SyncMapping)
+ UpdateMaps(maps pb.SyncMapping)
+ GetMapByCluster(clusterName string) (mapping pb.SyncMapping)
+ UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
+}
+
type storage struct {
- client *clientv3.Client
+ engine clientv3.KV
data *pb.SyncData
lock sync.RWMutex
}
-func NewStorage(client *clientv3.Client) servicecenter.Storage {
+func NewStorage(engine clientv3.KV) Storage {
storage := &storage{
- client: client,
+ engine: engine,
data: &pb.SyncData{},
}
return storage
@@ -54,7 +62,7 @@
// getPrefixKey Get data from etcd based on the prefix key
func (s *storage) getPrefixKey(prefix string, handler func(key, val []byte) (next bool)) {
- resp, err := s.client.Get(context.Background(), prefix, clientv3.WithPrefix())
+ resp, err := s.engine.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
log.Errorf(err, "Get mapping from etcd failed: %s", err)
return
@@ -125,7 +133,7 @@
}
}
key := mappingsKey + "/" + entry.ClusterName + "/" + entry.OrgInstanceID
- if _, err := s.client.Delete(context.Background(), key); err != nil {
+ if _, err := s.engine.Delete(context.Background(), key); err != nil {
log.Errorf(err, "Delete instance clusterName=%s instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID)
}
@@ -142,7 +150,7 @@
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+ _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
if err != nil {
log.Errorf(err, "Save service to etcd failed: %s", err)
}
@@ -175,7 +183,7 @@
// DeleteServices Delete services from storage
func (s *storage) deleteService(serviceId string) {
key := servicesKey + "/" + serviceId
- _, err := s.client.Delete(context.Background(), key)
+ _, err := s.engine.Delete(context.Background(), key)
if err != nil {
log.Errorf(err, "Delete service from etcd failed: %s", err)
}
@@ -190,7 +198,7 @@
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+ _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
if err != nil {
log.Errorf(err, "Save instance to etcd failed: %s", err)
}
@@ -222,7 +230,7 @@
func (s *storage) deleteInstance(instanceID string) {
key := instancesKey + "/" + instanceID
- _, err := s.client.Delete(context.Background(), key)
+ _, err := s.engine.Delete(context.Background(), key)
if err != nil {
log.Errorf(err, "Delete instance from etcd failed: %s", err)
}
@@ -238,7 +246,7 @@
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+ _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
if err != nil {
log.Errorf(err, "Save mapping to etcd failed: %s", err)
}
@@ -275,7 +283,7 @@
log.Errorf(err, "Proto marshal failed: %s", err)
continue
}
- _, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+ _, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
if err != nil {
log.Errorf(err, "Save mapping to etcd failed: %s", err)
continue