blob: 4df40acd2dda9a67083ae1ab6db5e3dd19dc5c55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package server
import (
"context"
"errors"
"strconv"
"testing"
"github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/syncer/config"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
"github.com/apache/servicecomb-service-center/syncer/serf"
"github.com/go-chassis/cari/discovery"
"github.com/stretchr/testify/assert"
)
var s Server
func TestServer_IncrementPull(t *testing.T) {
confCreate()
s.channelMap = setChannel()
t.Run("increment when address exist", func(t *testing.T) {
iPReq := pb.IncrementPullRequest{
Addr: "1",
Length: 1,
}
syncData, err := s.IncrementPull(context.Background(), &iPReq)
assert.NoError(t, err, "no error when DeclareDataLength")
assert.NotEmpty(t, syncData, "no increase")
})
}
func TestServer_DeclareDataLength(t *testing.T) {
s.channelMap = setChannel()
t.Run("when address exist", func(t *testing.T) {
dReq := pb.DeclareRequest{
Addr: "3",
}
declareResp, err := s.DeclareDataLength(context.Background(), &dReq)
assert.NoError(t, err, "error when DeclareDataLength")
assert.Empty(t, err, "declareResp.SyncDataLength is empty")
assert.NotZero(t, declareResp.SyncDataLength, "declareResp.SyncDataLength is empty")
})
}
func TestService_incrementUserEvent(t *testing.T) {
t.Run("increment event fail", func(t *testing.T) {
//membersCreate()
svr := defaultServer()
s.serf = svr
confCreate()
result := s.incrementUserEvent([]byte("servicecenter"))
assert.Error(t, errors.New("members is nil"), "increment event fail when members is nil")
assert.False(t, result, "increment event fail with cluster name servicecenter")
})
}
func setChannel() map[string]chan *dump.WatchInstanceChangedEvent {
var channelMap = make(map[string]chan *dump.WatchInstanceChangedEvent)
var ch1 = make(chan *dump.WatchInstanceChangedEvent, 1000)
var ch2 = make(chan *dump.WatchInstanceChangedEvent, 1000)
var ch3 = make(chan *dump.WatchInstanceChangedEvent, 1000)
channelMap["1"] = ch1
channelMap["2"] = ch2
channelMap["3"] = ch3
var event1 = instanceAndServiceCreate(1)
for _, ch := range channelMap {
select {
case ch <- event1:
default:
}
}
return channelMap
}
func confCreate() {
tlsMount := config.Mount{
Enabled: false,
Name: "servicecenter",
}
tlsMount1 := config.Mount{
Enabled: false,
Name: "syncer",
}
listener := config.Listener{
BindAddr: "0.0.0.0:30190",
AdvertiseAddr: "",
RPCAddr: "0.0.0.0:30191",
PeerAddr: "127.0.0.1:30192",
TLSMount: tlsMount1,
}
registry := config.Registry{
Address: "http://127.0.0.1:30100",
Plugin: "servicecenter",
TLSMount: tlsMount,
}
join := config.Join{
Enabled: false,
Address: "127.0.0.1:30190",
RetryMax: 3,
RetryInterval: "30s",
}
lable := config.Label{
Key: "interval",
Value: "30s",
}
lables := append([]config.Label{}, lable)
task := config.Task{
Kind: "ticker",
Params: lables,
}
ciphers := []string{"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_RSA_WITH_AES_128_GCM_SHA256"}
tlsConfig1 := config.TLSConfig{
Name: "syncer",
VerifyPeer: true,
MinVersion: "TLSv1.2",
Passphrase: "",
CAFile: "./certs/trust.cer",
CertFile: "./certs/server.cer",
KeyFile: "./certs/server_key.pem",
Ciphers: ciphers,
}
tlsConfig2 := config.TLSConfig{
Name: "servicecenter",
VerifyPeer: false,
CAFile: "./certs/trust.cer",
CertFile: "./certs/server.cer",
KeyFile: "./certs/server_key.pem",
}
tlsConfigs := []*config.TLSConfig{&tlsConfig1, &tlsConfig2}
var conf = config.Config{
Mode: "signle",
Node: "syncer-node",
Cluster: "syncer-cluster",
DataDir: "./syncer-data/",
Listener: listener,
Join: join,
Task: task,
Registry: registry,
TLSConfigs: tlsConfigs,
}
s.conf = &conf
}
func instanceAndServiceCreate(i int) *dump.WatchInstanceChangedEvent {
var event = new(dump.WatchInstanceChangedEvent)
status := []string{"UNKNOWN", "UP", "DOWN"}
var ss = new(dump.Microservice)
var sv = new(discovery.MicroService)
sv.AppId = "serviceApp" + strconv.FormatInt(int64(i), 10)
sv.Environment = "env"
sv.ServiceId = "a59f99611a6945677a21f28c0aeb05abb" + strconv.FormatInt(int64(i/2), 10)
sv.Status = status[i%3]
sv.Version = "1.0.0"
var sk = new(dump.KV)
sk.Key = "/cse-sr/ms/files/default/default/" + sv.ServiceId
sk.Rev = int64(i)
ss.Value = sv
ss.KV = sk
is := new(dump.Instance)
insStatus := []string{"UNKNOWN", "UP", "STARTING", "DOWN", "OUTOFSERVICE"}
healthCheckModes := []string{"UNKNOWN", "PUSH", "PULL"}
healthCheck := discovery.HealthCheck{
Mode: healthCheckModes[i%3],
Interval: 30,
Times: 30,
}
var iv = new(discovery.MicroServiceInstance)
iv.HostName = "provider_demo" + strconv.FormatInt(int64(i), 10)
iv.Endpoints = []string{"rest://127.0.0.1:8080"}
iv.InstanceId = "5e1140fc232111eb9bb600acc8c56b5b" + strconv.FormatInt(int64(i/2), 10)
iv.HealthCheck = &healthCheck
if i == 10 {
iv.ServiceId = "a59f99611a6945677a21f28c0aeb05abb" + strconv.FormatInt(int64(i), 10)
} else {
iv.ServiceId = "a59f99611a6945677a21f28c0aeb05abb" + strconv.FormatInt(int64(i/2), 10)
}
iv.Status = insStatus[i%5]
iv.Version = "1.0.0"
var ik = new(dump.KV)
ik.Key = "/cse-sr/inst/files/default/default/" + sv.ServiceId + iv.InstanceId
ik.Rev = int64(i)
is.KV = ik
is.Value = iv
is.Rev = int64(i)
event.Instance = is
event.Service = ss
return event
}
func defaultServer() *serf.Server {
return serf.NewServer(
"",
serf.WithNode("syncer-test"),
serf.WithBindAddr("127.0.0.1"),
serf.WithBindPort(35151),
)
}