Supports synchronization in a single direction
diff --git a/syncer/pkg/utils/atomic_bool.go b/syncer/pkg/utils/atomic_bool.go
new file mode 100644
index 0000000..4336033
--- /dev/null
+++ b/syncer/pkg/utils/atomic_bool.go
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+// AtomicBool struct
+type AtomicBool struct {
+ m sync.Mutex
+ status uint32
+}
+
+// NewAtomicBool returns an atomic bool
+func NewAtomicBool(b bool) *AtomicBool {
+ var status uint32
+ if b {
+ status = 1
+ }
+ return &AtomicBool{status: status}
+}
+
+// Bool returns a bool value
+func (a *AtomicBool) Bool() bool {
+ return atomic.LoadUint32(&a.status)&1 == 1
+}
+
+// DoToReverse Do something and reverse the status
+func (a *AtomicBool) DoToReverse(when bool, fn func()) {
+ if a.Bool() != when {
+ return
+ }
+
+ a.m.Lock()
+ fn()
+ atomic.StoreUint32(&a.status, a.status^1)
+ a.m.Unlock()
+}
diff --git a/syncer/pkg/utils/atomic_bool_test.go b/syncer/pkg/utils/atomic_bool_test.go
new file mode 100644
index 0000000..e4455aa
--- /dev/null
+++ b/syncer/pkg/utils/atomic_bool_test.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestAtomicBool(t *testing.T) {
+ b := NewAtomicBool(false)
+ assert.False(t, b.Bool())
+
+ b.DoToReverse(false, func() {})
+ assert.True(t, b.Bool())
+
+ b.DoToReverse(true, func() {})
+ assert.False(t, b.Bool())
+
+ nb := NewAtomicBool(true)
+ assert.True(t, nb.Bool())
+
+ nb.DoToReverse(false, func() {})
+ assert.True(t, nb.Bool())
+
+ nb.DoToReverse(true, func() {})
+ assert.False(t, nb.Bool())
+}
diff --git a/syncer/server/convert.go b/syncer/server/convert.go
index 1f56762..45e3024 100644
--- a/syncer/server/convert.go
+++ b/syncer/server/convert.go
@@ -29,6 +29,7 @@
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins"
"github.com/apache/servicecomb-service-center/syncer/serf"
+ "github.com/apache/servicecomb-service-center/syncer/task"
)
func convertSerfConfig(c *config.Config) *serf.Config {
@@ -64,16 +65,12 @@
return conf
}
-func convertTickerInterval(c *config.Config) int {
- strNum := ""
+func convertTaskOptions(c *config.Config) []task.Option {
+ opts := make([]task.Option, 0, len(c.Task.Params))
for _, label := range c.Task.Params {
- if label.Key == "interval" {
- strNum = label.Value
- break
- }
+ opts = append(opts, task.WithAddKV(label.Key, label.Value))
}
- interval, _ := time.ParseDuration(strNum)
- return int(interval.Seconds())
+ return opts
}
func convertSCConfigOption(c *config.Config) []plugins.SCConfigOption {
diff --git a/syncer/server/server.go b/syncer/server/server.go
index e5eafdf..17a2b60 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -31,15 +31,19 @@
"github.com/apache/servicecomb-service-center/syncer/etcd"
"github.com/apache/servicecomb-service-center/syncer/grpc"
"github.com/apache/servicecomb-service-center/syncer/pkg/syssig"
- "github.com/apache/servicecomb-service-center/syncer/pkg/ticker"
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins"
"github.com/apache/servicecomb-service-center/syncer/serf"
"github.com/apache/servicecomb-service-center/syncer/servicecenter"
+ "github.com/apache/servicecomb-service-center/syncer/task"
// import plugins
_ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka"
_ "github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter"
+
+ // import task
+ _ "github.com/apache/servicecomb-service-center/syncer/task/idle"
+ _ "github.com/apache/servicecomb-service-center/syncer/task/ticker"
)
var stopChanErr = errors.New("stopped syncer by stopCh")
@@ -61,8 +65,8 @@
// Syncer configuration
conf *config.Config
- // Ticker for Syncer
- tick *ticker.TaskTicker
+ // task for discovery
+ task task.Tasker
// Wrap the servicecenter
servicecenter servicecenter.Servicecenter
@@ -126,7 +130,12 @@
s.servicecenter.SetStorageEngine(s.etcd.Storage())
s.agent.RegisterEventHandler(s)
- gopool.Go(s.tick.Start)
+
+ s.task.Handle(func() {
+ s.tickHandler(ctx)
+ })
+
+ s.task.Run(ctx)
log.Info("start service done")
@@ -138,10 +147,6 @@
// Stop Syncer Server
func (s *Server) Stop() {
- if s.tick != nil {
- s.tick.Stop()
- }
-
if s.agent != nil {
// removes the serf eventHandler
s.agent.DeregisterEventHandler(s)
@@ -192,7 +197,11 @@
s.etcdConf = convertEtcdConfig(s.conf)
s.etcd = etcd.NewAgent(s.etcdConf)
- s.tick = ticker.NewTaskTicker(convertTickerInterval(s.conf), s.tickHandler)
+ s.task, err = task.GenerateTasker(s.conf.Task.Kind, convertTaskOptions(s.conf)...)
+ if err != nil {
+ log.Errorf(err, "Create tasker failed, %s", err)
+ return
+ }
s.servicecenter, err = servicecenter.NewServicecenter(convertSCConfigOption(s.conf)...)
if err != nil {
diff --git a/syncer/task/idle/idle.go b/syncer/task/idle/idle.go
new file mode 100644
index 0000000..dbf7fa3
--- /dev/null
+++ b/syncer/task/idle/idle.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package idle
+
+import (
+ "context"
+
+ "github.com/apache/servicecomb-service-center/syncer/task"
+)
+
+const (
+ // TaskName task name
+ TaskName = "idle"
+)
+
+func init() {
+ task.RegisterTasker(TaskName, NewIdle)
+}
+
+// Idle struct
+type Idle struct{}
+
+// NewIdle returns an idle task
+func NewIdle(params map[string]string) (task.Tasker, error) {
+ return &Idle{}, nil
+}
+
+// Run task
+func (t *Idle) Run(ctx context.Context) {}
+
+// Handle task trigger
+func (t *Idle) Handle(handler func()) {}
diff --git a/syncer/task/idle/idle_test.go b/syncer/task/idle/idle_test.go
new file mode 100644
index 0000000..01fdbd6
--- /dev/null
+++ b/syncer/task/idle/idle_test.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package idle
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestIdle(t *testing.T) {
+ task, err := NewIdle(map[string]string{})
+ assert.Nil(t, err)
+
+ task.Handle(func() {})
+
+ task.Run(context.Background())
+
+}
diff --git a/syncer/task/task.go b/syncer/task/task.go
new file mode 100644
index 0000000..e2f9bd2
--- /dev/null
+++ b/syncer/task/task.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package task
+
+import (
+ "context"
+ "errors"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+)
+
+var taskMgr = map[string]generator{}
+
+type generator func(params map[string]string) (Tasker, error)
+
+// Tasker interface
+type Tasker interface {
+ Run(ctx context.Context)
+ Handle(handler func())
+}
+
+// RegisterTasker register an tasker to manager
+func RegisterTasker(name string, fn generator) {
+ if _, ok := taskMgr[name]; ok {
+ log.Warnf("task generator is already exist, name = %s", name)
+ }
+ taskMgr[name] = fn
+}
+
+// GenerateTasker generate an tasker by name from manager
+func GenerateTasker(name string, ops ...Option) (Tasker, error) {
+ fn, ok := taskMgr[name]
+ if !ok {
+ err := errors.New("trigger generator is not found")
+ log.Errorf(err, "name = %s", name)
+ return nil, err
+ }
+ return fn(toMap(ops...))
+}
+
+// Option task option
+type Option func(map[string]string)
+
+// WithAddKV wrap the key and value to an option
+func WithAddKV(key, value string) Option {
+ return func(m map[string]string) { m[key] = value }
+}
+
+func toMap(ops ...Option) map[string]string {
+ m := make(map[string]string, len(ops))
+ for _, op := range ops {
+ op(m)
+ }
+ return m
+}
diff --git a/syncer/task/task_test.go b/syncer/task/task_test.go
new file mode 100644
index 0000000..7c63dd9
--- /dev/null
+++ b/syncer/task/task_test.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package task
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// empty struct
+type empty struct{}
+
+// newEmpty returns an empty task
+func newEmpty(params map[string]string) (Tasker, error) {
+ return &empty{}, nil
+}
+
+// Run task
+func (t *empty) Run(ctx context.Context) {}
+
+// Handle task trigger
+func (t *empty) Handle(handler func()) {}
+
+func TestTasker(t *testing.T) {
+ taskName := "empty"
+ _, err := GenerateTasker(taskName)
+ assert.NotNil(t, err)
+
+ RegisterTasker(taskName, newEmpty)
+ RegisterTasker(taskName, newEmpty)
+
+ _, err = GenerateTasker(taskName, WithAddKV("test", "test"))
+ assert.Nil(t, err)
+}
diff --git a/syncer/task/ticker/ticker.go b/syncer/task/ticker/ticker.go
new file mode 100644
index 0000000..34b6ac9
--- /dev/null
+++ b/syncer/task/ticker/ticker.go
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ticker
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ "github.com/apache/servicecomb-service-center/syncer/task"
+ "github.com/pkg/errors"
+)
+
+const (
+ TaskName = "ticker"
+ intervalKey = "interval"
+)
+
+func init() {
+ task.RegisterTasker(TaskName, NewTicker)
+}
+
+// Ticker struct
+type Ticker struct {
+ interval time.Duration
+ handler func()
+ running *utils.AtomicBool
+ ticker *time.Ticker
+}
+
+// NewTicker returns a ticker as a tasker
+func NewTicker(params map[string]string) (task.Tasker, error) {
+ val, ok := params[intervalKey]
+ if !ok {
+ return nil, errors.New("ticker: param interval notfound")
+ }
+
+ interval, err := time.ParseDuration(val)
+ if err != nil {
+ return nil, errors.Wrap(err, "ticker: parse interval duration failed")
+ }
+
+ return &Ticker{
+ interval: interval,
+ running: utils.NewAtomicBool(false),
+ }, nil
+}
+
+// Run ticker task
+func (t *Ticker) Run(ctx context.Context) {
+ t.running.DoToReverse(false, func() {
+ t.ticker = time.NewTicker(t.interval)
+ t.handler()
+ go t.wait(ctx)
+ })
+}
+
+// Handle ticker task
+func (t *Ticker) Handle(handler func()) {
+ t.handler = handler
+}
+
+func (t *Ticker) wait(ctx context.Context) {
+ for {
+ select {
+ case <-t.ticker.C:
+ t.handler()
+ case <-ctx.Done():
+ t.stop()
+ return
+ }
+ }
+}
+
+// stop ticker task
+func (t *Ticker) stop() {
+ t.running.DoToReverse(true, func() {
+ if t.ticker != nil {
+ t.ticker.Stop()
+ }
+ log.Info("ticker stop")
+ })
+
+ log.Info("ticker done")
+}
diff --git a/syncer/task/ticker/ticker_test.go b/syncer/task/ticker/ticker_test.go
new file mode 100644
index 0000000..f2847de
--- /dev/null
+++ b/syncer/task/ticker/ticker_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ticker
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestTicker(t *testing.T) {
+ params := map[string]string{}
+ _, err := NewTicker(params)
+ assert.NotNil(t, err)
+
+ params[intervalKey] = "1ams"
+ _, err = NewTicker(params)
+ assert.NotNil(t, err)
+
+ params[intervalKey] = "1s"
+
+ task, err := NewTicker(params)
+ assert.Nil(t, err)
+
+ isRunning := false
+ stopped := make(chan struct{})
+ task.Handle(func() {
+ if isRunning {
+ close(stopped)
+ } else {
+ isRunning = true
+ }
+ })
+
+ ctx, cancel := context.WithCancel(context.Background())
+ go task.Run(ctx)
+
+ <-stopped
+ assert.True(t, isRunning)
+ ctx.Done()
+ cancel()
+ time.Sleep(time.Second)
+}