Merge pull request #564 from ChinX/syncer

[SCB-1427] Move storage to syncer/servicecenter
diff --git a/syncer/etcd/agent.go b/syncer/etcd/agent.go
index 1330b48..a52a511 100644
--- a/syncer/etcd/agent.go
+++ b/syncer/etcd/agent.go
@@ -22,7 +22,7 @@
 	"errors"
 
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/syncer/servicecenter"
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/etcdserver/api/v3client"
 )
@@ -31,7 +31,6 @@
 type Agent struct {
 	conf    *Config
 	etcd    *embed.Etcd
-	storage servicecenter.Storage
 	readyCh chan struct{}
 	errorCh chan error
 }
@@ -84,11 +83,8 @@
 }
 
 // Storage returns etcd storage
-func (a *Agent) Storage() servicecenter.Storage {
-	if a.storage == nil {
-		a.storage = NewStorage(v3client.New(a.etcd.Server))
-	}
-	return a.storage
+func (a *Agent) Storage() *clientv3.Client {
+	return v3client.New(a.etcd.Server)
 }
 
 // Stop etcd agent
diff --git a/syncer/pkg/mock/mocksotrage/etcd.go b/syncer/pkg/mock/mocksotrage/etcd.go
new file mode 100644
index 0000000..3604a16
--- /dev/null
+++ b/syncer/pkg/mock/mocksotrage/etcd.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocksotrage
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"os"
+
+	"github.com/apache/servicecomb-service-center/syncer/etcd"
+	"github.com/coreos/etcd/clientv3"
+)
+
+const (
+	defaultName           = "etcd_mock"
+	defaultDataDir        = "mock-data/"
+	defaultListenPeerAddr = "http://127.0.0.1:30993"
+)
+
+type MockServer struct {
+	etcd *etcd.Agent
+}
+
+func NewKVServer() (svr *MockServer, err error) {
+	agent := etcd.NewAgent(defaultConfig())
+	go agent.Start(context.Background())
+	select {
+	case <-agent.Ready():
+	case err = <-agent.Error():
+	}
+	if err != nil {
+		return nil, err
+	}
+	return &MockServer{agent}, nil
+}
+
+func (m *MockServer) Storage() *clientv3.Client {
+	return m.etcd.Storage()
+}
+
+func (m *MockServer) Stop() {
+	m.etcd.Stop()
+	os.RemoveAll(defaultDataDir)
+}
+
+func defaultConfig() *etcd.Config {
+	peer, _ := url.Parse(defaultListenPeerAddr)
+	conf := etcd.DefaultConfig()
+	conf.Name = defaultName
+	conf.Dir = defaultDataDir + defaultName
+	conf.APUrls = []url.URL{*peer}
+	conf.LPUrls = []url.URL{*peer}
+	conf.InitialCluster = fmt.Sprintf("%s=%s", defaultName, defaultListenPeerAddr)
+	return conf
+}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 213ba95..afed8d0 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -116,7 +116,7 @@
 		return
 	}
 
-	s.servicecenter.SetStorage(s.etcd.Storage())
+	s.servicecenter.SetStorageEngine(s.etcd.Storage())
 
 	s.agent.RegisterEventHandler(s)
 
diff --git a/syncer/servicecenter/servicecenter.go b/syncer/servicecenter/servicecenter.go
index 63f2d29..c8a288a 100644
--- a/syncer/servicecenter/servicecenter.go
+++ b/syncer/servicecenter/servicecenter.go
@@ -23,11 +23,13 @@
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/syncer/plugins"
 	pb "github.com/apache/servicecomb-service-center/syncer/proto"
+	"github.com/apache/servicecomb-service-center/syncer/servicecenter/storage"
+	"github.com/coreos/etcd/clientv3"
 )
 
 // Store interface of servicecenter
 type Servicecenter interface {
-	SetStorage(storage Storage)
+	SetStorageEngine(engine clientv3.KV)
 	FlushData()
 	Registry(clusterName string, data *pb.SyncData)
 	Discovery() *pb.SyncData
@@ -35,16 +37,7 @@
 
 type servicecenter struct {
 	servicecenter plugins.Servicecenter
-	storage       Storage
-}
-
-type Storage interface {
-	GetData() (data *pb.SyncData)
-	UpdateData(data *pb.SyncData)
-	GetMaps() (maps pb.SyncMapping)
-	UpdateMaps(maps pb.SyncMapping)
-	GetMapByCluster(clusterName string) (mapping pb.SyncMapping)
-	UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
+	storage       storage.Storage
 }
 
 // NewServicecenter new store with endpoints
@@ -59,8 +52,8 @@
 	}, nil
 }
 
-func (s *servicecenter) SetStorage(storage Storage) {
-	s.storage = storage
+func (s *servicecenter) SetStorageEngine(engine clientv3.KV) {
+	s.storage = storage.NewStorage(engine)
 }
 
 // FlushData flush data to servicecenter, update mapping data
diff --git a/syncer/servicecenter/servicecenter_test.go b/syncer/servicecenter/servicecenter_test.go
index 120b109..cef2040 100644
--- a/syncer/servicecenter/servicecenter_test.go
+++ b/syncer/servicecenter/servicecenter_test.go
@@ -57,7 +57,13 @@
 		t.Fatal(err)
 		return
 	}
-	dc.SetStorage(mocksotrage.New())
+	mockServer, err := mocksotrage.NewKVServer()
+	if err != nil {
+		t.Fatal(err)
+		return
+	}
+	defer mockServer.Stop()
+	dc.SetStorageEngine(mockServer.Storage())
 
 	mockplugin.SetGetAll(func(ctx context.Context) (data *pb.SyncData, e error) {
 		return nil, errors.New("test error")
diff --git a/syncer/etcd/storage.go b/syncer/servicecenter/storage/storage.go
similarity index 89%
rename from syncer/etcd/storage.go
rename to syncer/servicecenter/storage/storage.go
index f77af90..c6abf44 100644
--- a/syncer/etcd/storage.go
+++ b/syncer/servicecenter/storage/storage.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package etcd
+package storage
 
 import (
 	"context"
@@ -24,7 +24,6 @@
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	pb "github.com/apache/servicecomb-service-center/syncer/proto"
-	"github.com/apache/servicecomb-service-center/syncer/servicecenter"
 	"github.com/coreos/etcd/clientv3"
 	"github.com/gogo/protobuf/proto"
 )
@@ -38,15 +37,24 @@
 	instancesKey = "/syncer/v1/instances"
 )
 
+type Storage interface {
+	GetData() (data *pb.SyncData)
+	UpdateData(data *pb.SyncData)
+	GetMaps() (maps pb.SyncMapping)
+	UpdateMaps(maps pb.SyncMapping)
+	GetMapByCluster(clusterName string) (mapping pb.SyncMapping)
+	UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
+}
+
 type storage struct {
-	client *clientv3.Client
+	engine clientv3.KV
 	data   *pb.SyncData
 	lock   sync.RWMutex
 }
 
-func NewStorage(client *clientv3.Client) servicecenter.Storage {
+func NewStorage(engine clientv3.KV) Storage {
 	storage := &storage{
-		client: client,
+		engine: engine,
 		data:   &pb.SyncData{},
 	}
 	return storage
@@ -54,7 +62,7 @@
 
 // getPrefixKey Get data from etcd based on the prefix key
 func (s *storage) getPrefixKey(prefix string, handler func(key, val []byte) (next bool)) {
-	resp, err := s.client.Get(context.Background(), prefix, clientv3.WithPrefix())
+	resp, err := s.engine.Get(context.Background(), prefix, clientv3.WithPrefix())
 	if err != nil {
 		log.Errorf(err, "Get mapping from etcd failed: %s", err)
 		return
@@ -125,7 +133,7 @@
 			}
 		}
 		key := mappingsKey + "/" + entry.ClusterName + "/" + entry.OrgInstanceID
-		if _, err := s.client.Delete(context.Background(), key); err != nil {
+		if _, err := s.engine.Delete(context.Background(), key); err != nil {
 			log.Errorf(err, "Delete instance clusterName=%s instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID)
 		}
 
@@ -142,7 +150,7 @@
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
 		if err != nil {
 			log.Errorf(err, "Save service to etcd failed: %s", err)
 		}
@@ -175,7 +183,7 @@
 // DeleteServices Delete services from storage
 func (s *storage) deleteService(serviceId string) {
 	key := servicesKey + "/" + serviceId
-	_, err := s.client.Delete(context.Background(), key)
+	_, err := s.engine.Delete(context.Background(), key)
 	if err != nil {
 		log.Errorf(err, "Delete service from etcd failed: %s", err)
 	}
@@ -190,7 +198,7 @@
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
 		if err != nil {
 			log.Errorf(err, "Save instance to etcd failed: %s", err)
 		}
@@ -222,7 +230,7 @@
 
 func (s *storage) deleteInstance(instanceID string) {
 	key := instancesKey + "/" + instanceID
-	_, err := s.client.Delete(context.Background(), key)
+	_, err := s.engine.Delete(context.Background(), key)
 	if err != nil {
 		log.Errorf(err, "Delete instance from etcd failed: %s", err)
 	}
@@ -238,7 +246,7 @@
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
 		if err != nil {
 			log.Errorf(err, "Save mapping to etcd failed: %s", err)
 		}
@@ -275,7 +283,7 @@
 			log.Errorf(err, "Proto marshal failed: %s", err)
 			continue
 		}
-		_, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+		_, err = s.engine.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
 		if err != nil {
 			log.Errorf(err, "Save mapping to etcd failed: %s", err)
 			continue