blob: e8f484c18527b1bf2d9fe26a9574947072201f9e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcd
import (
_ "github.com/apache/servicecomb-service-center/server/plugin/pkg/tracing/buildin"
"github.com/stretchr/testify/assert"
)
import _ "github.com/apache/servicecomb-service-center/server/plugin/pkg/security/buildin"
import _ "github.com/apache/servicecomb-service-center/server/plugin/pkg/tls/buildin"
import (
context2 "context"
"fmt"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"github.com/apache/servicecomb-service-center/server/rpc"
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc/status"
)
// dialTimeout is the dial timeout handed to every EtcdClient built by the
// tests in this file.
const dialTimeout = 500 * time.Millisecond
// endpoint holds the cluster addresses found in the registry configuration
// when the package variables are initialised. Tests use it as the etcd
// endpoint and to put the configuration back after changing it.
var endpoint = registry.Configuration().ClusterAddresses
// TestInitCluster checks what InitClusterInfo derives from different
// combinations of ClusterName, ClusterAddresses and ManagerAddress: the
// resolved registry addresses and the per-cluster address table.
func TestInitCluster(t *testing.T) {
	// expectRegistry fails the given test unless the resolved registry
	// addresses, joined with commas, equal want.
	expectRegistry := func(t *testing.T, want string) {
		if got := strings.Join(registry.Configuration().RegistryAddresses(), ","); got != want {
			t.Fatalf("TestInitCluster failed, %v", registry.Configuration().RegistryAddresses())
		}
	}
	// expectCluster fails the given test unless the addresses recorded for
	// the named cluster, joined with commas, equal want.
	expectCluster := func(t *testing.T, name, want string) {
		if got := strings.Join(registry.Configuration().Clusters[name], ","); got != want {
			t.Fatalf("TestInitCluster failed, %v", registry.Configuration().Clusters)
		}
	}

	t.Run("normal", func(t *testing.T) {
		registry.Configuration().ClusterAddresses = "127.0.0.1:2379"
		registry.Configuration().InitClusterInfo()
		expectRegistry(t, "127.0.0.1:2379")
	})

	t.Run("not normal2", func(t *testing.T) {
		registry.Configuration().ClusterAddresses = "127.0.0.1:2379,127.0.0.2:2379"
		registry.Configuration().InitClusterInfo()
		expectRegistry(t, "127.0.0.1:2379,127.0.0.2:2379")
	})

	// The local cluster name appears in the address list.
	registry.Configuration().ClusterName = "sc-0"
	registry.Configuration().ClusterAddresses = "sc-0=127.0.0.1:2379,127.0.0.2:2379"
	registry.Configuration().InitClusterInfo()
	expectRegistry(t, "127.0.0.1:2379,127.0.0.2:2379")
	expectCluster(t, "sc-0", "127.0.0.1:2379,127.0.0.2:2379")

	// The local cluster name is absent from the address list.
	registry.Configuration().ClusterName = "sc-0"
	registry.Configuration().ClusterAddresses = "sc-1=127.0.0.1:2379,127.0.0.2:2379,sc-2=127.0.0.3:2379"
	registry.Configuration().InitClusterInfo()
	expectRegistry(t, "")
	expectCluster(t, "sc-1", "127.0.0.1:2379,127.0.0.2:2379")
	expectCluster(t, "sc-2", "127.0.0.3:2379")

	// The local cluster is listed together with another cluster.
	registry.Configuration().ClusterName = "sc-0"
	registry.Configuration().ClusterAddresses = "sc-0=127.0.0.1:2379,sc-1=127.0.0.3:2379,127.0.0.4:2379"
	registry.Configuration().InitClusterInfo()
	expectRegistry(t, "127.0.0.1:2379")
	expectCluster(t, "sc-1", "127.0.0.3:2379,127.0.0.4:2379")

	// A manager address is configured in addition to the cluster list.
	registry.Configuration().ClusterName = "sc-0"
	registry.Configuration().ManagerAddress = "127.0.0.1:2379,127.0.0.2:2379"
	registry.Configuration().ClusterAddresses = "sc-0=127.0.0.1:30100,sc-1=127.0.0.2:30100"
	registry.Configuration().InitClusterInfo()
	expectRegistry(t, "127.0.0.1:2379,127.0.0.2:2379")
	expectCluster(t, "sc-1", "127.0.0.2:30100")
}
// TestEtcdClient runs the basic key/value operations of EtcdClient against
// the etcd instance at endpoint: client and registry construction, PUT,
// GET (single key, prefix, range, key-only, count-only, paging), DEL and a
// bulk write of more than registry.DEFAULT_PAGE_COUNT keys.
func TestEtcdClient(t *testing.T) {
	// Reset the configuration that TestInitCluster may have changed.
	registry.Configuration().ClusterName = ""
	registry.Configuration().ManagerAddress = ""
	registry.Configuration().ClusterAddresses = endpoint
	registry.Configuration().InitClusterInfo()

	etcdc := &EtcdClient{
		Endpoints:   []string{endpoint},
		DialTimeout: dialTimeout,
	}
	err := etcdc.Initialize()
	if err != nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	defer etcdc.Close()

	// After a successful Initialize the ready channel must not block.
	select {
	case <-etcdc.Ready():
	default:
		t.Fatalf("TestEtcdClient failed")
	}

	// base test
	inst := NewRegistry()
	if inst == nil || !strings.Contains(endpoint, firstEndpoint) {
		t.Fatalf("TestEtcdClient failed, %s != %s", firstEndpoint, endpoint)
	}

	// A registry created with an unusable address must expose an error on
	// its Err channel; the configuration is restored afterwards.
	old1 := registry.Configuration().ClusterAddresses
	old2 := registry.Configuration().DialTimeout
	registry.Configuration().ClusterAddresses = "x"
	registry.Configuration().InitClusterInfo()
	registry.Configuration().DialTimeout = dialTimeout
	inst = NewRegistry()
	if inst == nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	select {
	case <-inst.(*EtcdClient).Err():
	default:
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	registry.Configuration().ClusterAddresses = old1
	registry.Configuration().InitClusterInfo()
	registry.Configuration().DialTimeout = old2

	// case: etcdc do
	// put
	resp, err := etcdc.Do(context.Background(), registry.PUT, registry.WithStrKey("/test_range/b"),
		registry.WithStrValue("b"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/b"))
	if err != nil || !resp.Succeeded || resp.Count != 1 ||
		string(resp.Kvs[0].Key) != "/test_range/b" || string(resp.Kvs[0].Value) != "b" {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	resp, err = etcdc.Do(context.Background(), registry.PUT, registry.WithStrKey("/test_range/a"),
		registry.WithStrValue("a"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/a"),
		registry.WithKeyOnly())
	if err != nil || !resp.Succeeded || resp.Count != 1 ||
		string(resp.Kvs[0].Key) != "/test_range/a" || resp.Kvs[0].Value != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/a"),
		registry.WithCountOnly())
	if err != nil || !resp.Succeeded || resp.Count != 1 || resp.Kvs != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	resp, err = etcdc.Do(context.Background(), registry.PUT, registry.WithStrKey("/test_range/c"),
		registry.WithStrValue("c"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.PUT, registry.WithStrKey("/test_range/d"),
		registry.WithStrValue("d"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.PUT, registry.WithStrKey("/test_range/dd"),
		registry.WithStrValue("dd"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	// get prefix
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/d"),
		registry.WithPrefix())
	if err != nil || !resp.Succeeded || resp.Count != 2 ||
		string(resp.Kvs[0].Key) != "/test_range/d" || string(resp.Kvs[0].Value) != "d" ||
		string(resp.Kvs[1].Key) != "/test_range/dd" || string(resp.Kvs[1].Value) != "dd" {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/d"),
		registry.WithPrefix(), registry.WithKeyOnly())
	if err != nil || !resp.Succeeded || resp.Count != 2 ||
		string(resp.Kvs[0].Key) != "/test_range/d" || resp.Kvs[0].Value != nil ||
		string(resp.Kvs[1].Key) != "/test_range/dd" || resp.Kvs[1].Value != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/d"),
		registry.WithPrefix(), registry.WithCountOnly())
	if err != nil || !resp.Succeeded || resp.Count != 2 || resp.Kvs != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	// get range
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/b"),
		registry.WithStrEndKey("/test_range/dd")) // [b, dd) !!!
	if err != nil || !resp.Succeeded || resp.Count != 3 ||
		string(resp.Kvs[0].Key) != "/test_range/b" || string(resp.Kvs[0].Value) != "b" ||
		string(resp.Kvs[1].Key) != "/test_range/c" || string(resp.Kvs[1].Value) != "c" ||
		string(resp.Kvs[2].Key) != "/test_range/d" || string(resp.Kvs[2].Value) != "d" {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/b"),
		registry.WithStrEndKey("/test_range/dd"), registry.WithKeyOnly()) // [b, dd) !!!
	if err != nil || !resp.Succeeded || resp.Count != 3 ||
		string(resp.Kvs[0].Key) != "/test_range/b" || resp.Kvs[0].Value != nil ||
		string(resp.Kvs[1].Key) != "/test_range/c" || resp.Kvs[1].Value != nil ||
		string(resp.Kvs[2].Key) != "/test_range/d" || resp.Kvs[2].Value != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/b"),
		registry.WithStrEndKey("/test_range/dd"), registry.WithCountOnly()) // [b, dd) !!!
	if err != nil || !resp.Succeeded || resp.Count != 3 || resp.Kvs != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	// get prefix paging
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/"), registry.WithPrefix(),
		registry.WithOffset(2), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 5 || len(resp.Kvs) != 2 ||
		string(resp.Kvs[0].Key) != "/test_range/c" || string(resp.Kvs[0].Value) != "c" ||
		string(resp.Kvs[1].Key) != "/test_range/d" || string(resp.Kvs[1].Value) != "d" {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/"), registry.WithPrefix(), registry.WithKeyOnly(),
		registry.WithOffset(4), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 5 || len(resp.Kvs) != 1 ||
		string(resp.Kvs[0].Key) != "/test_range/dd" || resp.Kvs[0].Value != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/d"), registry.WithPrefix(), registry.WithKeyOnly(),
		registry.WithOffset(0), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 2 || len(resp.Kvs) != 2 ||
		string(resp.Kvs[0].Key) != "/test_range/d" || resp.Kvs[0].Value != nil ||
		string(resp.Kvs[1].Key) != "/test_range/dd" || resp.Kvs[1].Value != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/"), registry.WithPrefix(), registry.WithCountOnly(),
		registry.WithOffset(2), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 5 || resp.Kvs != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/"), registry.WithPrefix(),
		registry.WithOffset(6), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 5 || len(resp.Kvs) != 0 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	// if offset < -1, just paging by limit
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/"), registry.WithPrefix(),
		registry.WithOffset(-2), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 5 || len(resp.Kvs) != 5 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	// get range paging
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/b"),
		registry.WithStrEndKey("/test_range/dd"),
		registry.WithOffset(2), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 3 || len(resp.Kvs) != 1 ||
		string(resp.Kvs[0].Key) != "/test_range/d" {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/a"),
		registry.WithStrEndKey("/test_range/dd"),
		registry.WithOffset(2), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 4 || len(resp.Kvs) != 2 ||
		string(resp.Kvs[0].Key) != "/test_range/c" ||
		string(resp.Kvs[1].Key) != "/test_range/d" {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/a"),
		registry.WithStrEndKey("/test_range/dd"), registry.WithKeyOnly(),
		registry.WithOffset(2), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 4 || len(resp.Kvs) != 2 ||
		string(resp.Kvs[0].Key) != "/test_range/c" || resp.Kvs[0].Value != nil ||
		string(resp.Kvs[1].Key) != "/test_range/d" || resp.Kvs[1].Value != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/a"),
		registry.WithStrEndKey("/test_range/dd"), registry.WithCountOnly(),
		registry.WithOffset(2), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 4 || resp.Kvs != nil {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/b"),
		registry.WithStrEndKey("/test_range/dd"),
		registry.WithOffset(5), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 3 || len(resp.Kvs) != 0 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_range/a"),
		registry.WithStrEndKey("/test_range/dd"),
		registry.WithOffset(4), registry.WithLimit(2))
	if err != nil || !resp.Succeeded || resp.Count != 4 || len(resp.Kvs) != 0 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	// delete range
	resp, err = etcdc.Do(context.Background(), registry.DEL,
		registry.WithStrKey("/test_range/b"),
		registry.WithStrEndKey("/test_range/dd")) // [b, d) !!!
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Delete failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/"),
		registry.WithPrefix())
	if err != nil || !resp.Succeeded || len(resp.Kvs) != 2 || string(resp.Kvs[1].Key) != "/test_range/dd" {
		t.Fatalf("TestEtcdClient_Delete failed, %#v", resp.Kvs)
	}

	// delete prefix
	resp, err = etcdc.Do(context.Background(), registry.DEL, registry.WithStrKey("/test_range/"),
		registry.WithPrefix())
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Delete failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET, registry.WithStrKey("/test_range/"),
		registry.WithPrefix())
	if err != nil || !resp.Succeeded || resp.Count != 0 {
		t.Fatalf("TestEtcdClient_Delete failed, %#v", err)
	}

	// large data
	var wg sync.WaitGroup
	for i := 0; i < registry.DEFAULT_PAGE_COUNT+1; i++ {
		wg.Add(1)
		v := strconv.Itoa(i)
		go func() {
			defer wg.Done()
			// Use goroutine-local results: writing the outer resp/err from
			// many goroutines at once is a data race. FailNow/Fatalf may
			// only be called from the test goroutine, so report with Errorf.
			putResp, putErr := etcdc.Do(context.Background(), registry.PUT, registry.WithStrKey("/test_page/"+v),
				registry.WithStrValue(v))
			if putErr != nil || !putResp.Succeeded {
				t.Errorf("TestEtcdClient_Do failed, %#v", putErr)
			}
		}()
	}
	wg.Wait()
	// Stop here if any concurrent write failed; the checks below depend on
	// every key having been written.
	if t.Failed() {
		t.FailNow()
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_page/"),
		registry.WithStrEndKey("/test_page/9999"))
	if err != nil || !resp.Succeeded || resp.Count != registry.DEFAULT_PAGE_COUNT+1 ||
		len(resp.Kvs) != registry.DEFAULT_PAGE_COUNT+1 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_page/"), registry.WithPrefix(), registry.WithDescendOrder())
	if err != nil || !resp.Succeeded || resp.Count != registry.DEFAULT_PAGE_COUNT+1 ||
		len(resp.Kvs) != registry.DEFAULT_PAGE_COUNT+1 ||
		string(resp.Kvs[0].Key) != "/test_page/999" {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	// delete range
	resp, err = etcdc.Do(context.Background(), registry.DEL,
		registry.WithStrKey("/test_page/"),
		registry.WithStrEndKey("/test_page/9999"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Delete failed, %#v", err)
	}
	resp, err = etcdc.Do(context.Background(), registry.GET,
		registry.WithStrKey("/test_page/"), registry.WithPrefix())
	if err != nil || !resp.Succeeded || resp.Count != 0 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
}
// TestEtcdClient_Compact calls Compact twice with the same arguments and
// expects the first call to succeed and the second one to return an error.
func TestEtcdClient_Compact(t *testing.T) {
	cli := &EtcdClient{
		Endpoints:   []string{endpoint},
		DialTimeout: dialTimeout,
	}
	if err := cli.Initialize(); err != nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	defer cli.Close()

	ctx := context.Background()

	if err := cli.Compact(ctx, 0); err != nil {
		t.Fatalf("TestEtcdClient_Compact failed")
	}

	// The repeated call is expected to be rejected.
	if err := cli.Compact(ctx, 0); err == nil {
		t.Fatalf("TestEtcdClient_Compact failed")
	}
}
// TestEtcdClient_Txn covers the transactional helpers of EtcdClient: Txn,
// PutNoOverride and TxnWithCmp, including the branch taken when the
// comparison fails.
func TestEtcdClient_Txn(t *testing.T) {
	cli := &EtcdClient{
		Endpoints:   []string{endpoint},
		DialTimeout: dialTimeout,
	}
	initErr := cli.Initialize()
	if initErr != nil {
		t.Fatalf("TestEtcdClient failed, %#v", initErr)
	}
	defer cli.Close()

	ctx := context.Background()

	// putOp builds a Put operation for the given key/value pair.
	putOp := func(key, value string) registry.PluginOp {
		return registry.PluginOp{Action: registry.Put, Key: []byte(key), Value: []byte(value)}
	}
	// valueIs builds a comparison requiring the key's value to equal value.
	valueIs := func(key, value string) registry.CompareOp {
		return registry.CompareOp{[]byte(key), registry.CMP_VALUE, registry.CMP_EQUAL, value}
	}
	putAB := func() []registry.PluginOp {
		return []registry.PluginOp{
			putOp("/test_txn/a", "a"),
			putOp("/test_txn/b", "b"),
		}
	}

	// A transaction without operations must be rejected.
	resp, err := cli.Txn(ctx, nil)
	if err == nil || resp != nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	// The first PutNoOverride creates the key; the second must not succeed.
	ok, err := cli.PutNoOverride(ctx, registry.WithStrKey("/test_txn/a"))
	if err != nil || !ok {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	ok, err = cli.PutNoOverride(ctx, registry.WithStrKey("/test_txn/a"), registry.WithStrValue("a"))
	if err != nil || ok {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	resp, err = cli.Txn(ctx, putAB())
	if err != nil || resp == nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	resp, err = cli.Do(ctx, registry.GET, registry.WithStrKey("/test_txn/"),
		registry.WithPrefix(), registry.WithCountOnly())
	if err != nil || !resp.Succeeded || resp.Count != 2 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}

	resp, err = cli.TxnWithCmp(ctx,
		putAB(),
		[]registry.CompareOp{valueIs("/test_txn/a", "a")},
		[]registry.PluginOp{
			putOp("/test_txn/c", "c"),
			putOp("/test_txn/d", "d"),
		})
	if err != nil || resp == nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	// case: range request
	resp, err = cli.TxnWithCmp(ctx,
		nil,
		[]registry.CompareOp{valueIs("/test_txn/c", "c")},
		[]registry.PluginOp{
			{Action: registry.Get, Key: []byte("/test_txn/a")},
			{Action: registry.Get, Key: []byte("/test_txn/"), Prefix: true},
		})
	if err != nil || resp == nil || resp.Succeeded || resp.Count != 3 { // a + [a,b]
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	// case: test key not exist
	resp, err = cli.TxnWithCmp(ctx,
		putAB(),
		[]registry.CompareOp{valueIs("/test_txn/c", "c")},
		[]registry.PluginOp{
			{Action: registry.Delete, Key: []byte("/test_txn/"), Prefix: true},
		})
	if err != nil || resp == nil || resp.Succeeded {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	// After the failure branch ran, no key may remain under the prefix.
	resp, err = cli.Do(ctx, registry.GET, registry.WithStrKey("/test_txn/"),
		registry.WithPrefix(), registry.WithCountOnly())
	if err != nil || !resp.Succeeded || resp.Count != 0 {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
}
// TestEtcdClient_LeaseRenew grants leases with TTLs of -1, 0 and 2, renews
// the last one, revokes it and checks that renewing a revoked lease fails.
func TestEtcdClient_LeaseRenew(t *testing.T) {
	cli := &EtcdClient{
		Endpoints:   []string{endpoint},
		DialTimeout: dialTimeout,
	}
	if initErr := cli.Initialize(); initErr != nil {
		t.Fatalf("TestEtcdClient failed, %#v", initErr)
	}
	defer cli.Close()

	ctx := context.Background()

	// Every grant, including the non-positive TTLs, must yield a lease ID.
	leaseID, err := cli.LeaseGrant(ctx, -1)
	if err != nil || leaseID == 0 {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	leaseID, err = cli.LeaseGrant(ctx, 0)
	if err != nil || leaseID == 0 {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	leaseID, err = cli.LeaseGrant(ctx, 2)
	if err != nil || leaseID == 0 {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	// Renewing the live lease reports the TTL it was granted with.
	remaining, err := cli.LeaseRenew(ctx, leaseID)
	if err != nil || remaining != 2 {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	if err = cli.LeaseRevoke(ctx, leaseID); err != nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	// A revoked lease can no longer be renewed.
	remaining, err = cli.LeaseRenew(ctx, leaseID)
	if err == nil || remaining != 0 {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
}
// TestEtcdClient_HealthCheck exercises ReOpen, SyncMembers and
// healthCheckLoop: the client is reopened against a good and a bad
// endpoint, then closed, and the test waits until requests succeed again
// while healthCheckLoop is running.
func TestEtcdClient_HealthCheck(t *testing.T) {
	etcdc := &EtcdClient{
		Endpoints:        []string{endpoint},
		DialTimeout:      dialTimeout,
		AutoSyncInterval: time.Millisecond,
	}
	err := etcdc.Initialize()
	assert.NoError(t, err)
	defer etcdc.Close()

	err = etcdc.ReOpen()
	assert.NoError(t, err)

	// Keep and call every cancel function so the timeout timers are
	// released instead of leaking until they expire.
	ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
	err = etcdc.SyncMembers(ctx)
	cancel()
	assert.NoError(t, err)

	etcdc.Endpoints = []string{"x"}
	err = etcdc.ReOpen()
	assert.Error(t, err)

	t.Run("before check", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
		defer cancel()
		err = etcdc.SyncMembers(ctx)
		assert.NoError(t, err)
	})

	etcdc.Endpoints = []string{endpoint}
	etcdc.Close()

	loopCtx, cancelLoop := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancelLoop()
	go etcdc.healthCheckLoop(loopCtx)

	// Retry until a request goes through again; the loop only ends on
	// success, so a client that never recovers is caught by the test
	// binary's own timeout.
	for {
		_, err = etcdc.Do(context.Background(), registry.GET,
			registry.WithStrKey("/test_health/"))
		if err == nil {
			break
		}
		time.Sleep(1 * time.Second)
	}
	assert.NoError(t, err)
}
// TestEtcdClient_Watch checks EtcdClient.Watch for a single key, a prefix,
// mixed put/delete transactions, a start revision and previous-KV delivery.
//
// Each watcher runs in its own goroutine and signals completion on ch. The
// watch callbacks stop the watch by returning an error whose text is
// "error". Failures detected outside the test goroutine are reported with
// t.Errorf because FailNow/Fatalf may only be called from the goroutine
// running the test function.
func TestEtcdClient_Watch(t *testing.T) {
	etcd := &EtcdClient{
		Endpoints:   []string{endpoint},
		DialTimeout: dialTimeout,
	}
	err := etcd.Initialize()
	if err != nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	defer etcd.Close()

	// Remove every key written by this test, whatever its outcome.
	defer func() {
		resp, err := etcd.Do(context.Background(), registry.DEL, registry.WithStrKey("/test_watch/"),
			registry.WithPrefix())
		if err != nil || !resp.Succeeded {
			t.Fatalf("TestEtcdClient_Do failed, %#v", err)
		}
	}()

	// Watching with an already cancelled context returns without error.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err = etcd.Watch(ctx, registry.WithStrKey("/test_watch/a"))
	if err != nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}

	ch := make(chan struct{})

	// watch a single key
	go func() {
		defer func() { ch <- struct{}{} }()
		// A goroutine-local error avoids racing with the test goroutine,
		// which keeps assigning to the outer err.
		watchErr := etcd.Watch(context.Background(), registry.WithStrKey("/test_watch/a"),
			registry.WithWatchCallback(func(message string, evt *registry.PluginResponse) error {
				if evt.Count != 1 || len(evt.Kvs) != 1 || evt.Action != registry.Put ||
					string(evt.Kvs[0].Key) != "/test_watch/a" || string(evt.Kvs[0].Value) != "a" {
					t.Errorf("TestEtcdClient failed, %#v", evt)
				}
				return fmt.Errorf("error")
			}))
		if watchErr == nil || watchErr.Error() != "error" {
			t.Errorf("TestEtcdClient failed, %#v", watchErr)
		}
	}()

	<-time.After(500 * time.Millisecond)
	resp, err := etcd.Do(context.Background(), registry.PUT, registry.WithStrKey("/test_watch/a"),
		registry.WithStrValue("a"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	<-ch

	// watch a prefix: both puts of one transaction arrive in one event
	go func() {
		defer func() { ch <- struct{}{} }()
		watchErr := etcd.Watch(context.Background(), registry.WithStrKey("/test_watch/"),
			registry.WithPrefix(),
			registry.WithWatchCallback(func(message string, evt *registry.PluginResponse) error {
				// Check the length before indexing so a short event is
				// reported as a failure instead of panicking.
				if evt.Count != 2 || len(evt.Kvs) != 2 {
					t.Errorf("TestEtcdClient failed, %#v", evt)
					return fmt.Errorf("error")
				}
				equalA := evt.Action == registry.Put && string(evt.Kvs[0].Key) == "/test_watch/a" && string(evt.Kvs[0].Value) == "a"
				// The value is read from the same entry as the key.
				equalB := evt.Action == registry.Put && string(evt.Kvs[1].Key) == "/test_watch/b" && string(evt.Kvs[1].Value) == "b"
				if !(equalA || equalB) {
					t.Errorf("TestEtcdClient failed, %#v", evt)
				}
				return fmt.Errorf("error")
			}))
		if watchErr == nil || watchErr.Error() != "error" {
			t.Errorf("TestEtcdClient failed, %#v", watchErr)
		}
	}()

	<-time.After(500 * time.Millisecond)
	resp, err = etcd.Txn(context.Background(), []registry.PluginOp{
		{Action: registry.Put, Key: []byte("/test_watch/a"), Value: []byte("a")},
		{Action: registry.Put, Key: []byte("/test_watch/b"), Value: []byte("b")},
	})
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	<-ch

	// diff action type will be split
	go func() {
		defer func() { ch <- struct{}{} }()
		var times = 3
		watchErr := etcd.Watch(context.Background(), registry.WithStrKey("/test_watch/"),
			registry.WithPrefix(),
			registry.WithWatchCallback(func(message string, evt *registry.PluginResponse) error {
				if evt.Count != 1 || len(evt.Kvs) != 1 {
					t.Errorf("TestEtcdClient failed, %#v", evt)
					// Stop the watch: further events cannot be trusted.
					return fmt.Errorf("error")
				}
				equalA := evt.Action == registry.Delete && string(evt.Kvs[0].Key) == "/test_watch/a" && evt.Kvs[0].Value == nil
				equalB := evt.Action == registry.Put && string(evt.Kvs[0].Key) == "/test_watch/b" && string(evt.Kvs[0].Value) == "b"
				equalC := evt.Action == registry.Put && string(evt.Kvs[0].Key) == "/test_watch/c" && string(evt.Kvs[0].Value) == "c"
				if !(equalA || equalB || equalC) {
					t.Errorf("TestEtcdClient failed, %#v", evt)
					return fmt.Errorf("error")
				}
				times--
				if times == 0 {
					return fmt.Errorf("error")
				}
				return nil
			}))
		if watchErr == nil || watchErr.Error() != "error" {
			t.Errorf("TestEtcdClient failed, %#v", watchErr)
		}
	}()

	<-time.After(500 * time.Millisecond)
	resp, err = etcd.Txn(context.Background(), []registry.PluginOp{
		{Action: registry.Put, Key: []byte("/test_watch/c"), Value: []byte("c")},
		{Action: registry.Delete, Key: []byte("/test_watch/a"), Value: []byte("a")},
		{Action: registry.Put, Key: []byte("/test_watch/b"), Value: []byte("b")},
	})
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	<-ch

	// watch with rev
	resp, err = etcd.Do(context.Background(), registry.DEL, registry.WithStrKey("/test_watch/c"),
		registry.WithStrValue("a"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	rev := resp.Revision
	go func() {
		defer func() { ch <- struct{}{} }()
		watchErr := etcd.Watch(context.Background(), registry.WithStrKey("/test_watch/"),
			registry.WithPrefix(),
			registry.WithRev(rev),
			registry.WithWatchCallback(func(message string, evt *registry.PluginResponse) error {
				if evt.Count != 1 || len(evt.Kvs) != 1 || evt.Action != registry.Delete ||
					string(evt.Kvs[0].Key) != "/test_watch/c" || evt.Kvs[0].Value != nil {
					t.Errorf("TestEtcdClient failed, %#v", evt)
				}
				return fmt.Errorf("error")
			}))
		if watchErr == nil || watchErr.Error() != "error" {
			t.Errorf("TestEtcdClient failed, %#v", watchErr)
		}
	}()
	<-ch

	// delete with prevKV
	go func() {
		defer func() { ch <- struct{}{} }()
		watchErr := etcd.Watch(context.Background(), registry.WithStrKey("/test_watch/"),
			registry.WithPrefix(), registry.WithPrevKv(),
			registry.WithWatchCallback(func(message string, evt *registry.PluginResponse) error {
				if len(evt.Kvs) != 1 || evt.Action != registry.Delete ||
					string(evt.Kvs[0].Key) != "/test_watch/b" || string(evt.Kvs[0].Value) != "b" {
					t.Errorf("TestEtcdClient failed, %#v", evt)
				}
				return fmt.Errorf("error")
			}))
		if watchErr == nil || watchErr.Error() != "error" {
			t.Errorf("TestEtcdClient failed, %#v", watchErr)
		}
	}()

	<-time.After(500 * time.Millisecond)
	resp, err = etcd.Do(context.Background(), registry.DEL, registry.WithStrKey("/test_watch/b"))
	if err != nil || !resp.Succeeded {
		t.Fatalf("TestEtcdClient_Do failed, %#v", err)
	}
	<-ch
}
// mockKVForPagine is a stub key/value client used by TestEtcdClient_paging.
// Only its Get method returns data; it hands out canned responses.
type mockKVForPagine struct {
	// rangeCount is 0 until the first non-count Get has been served and 1
	// afterwards.
	rangeCount int
	// countResp is returned by Get for count-only requests.
	countResp *clientv3.GetResponse
	// rangeResp1 is returned by the first non-count Get, rangeResp2 by all
	// later ones.
	rangeResp1, rangeResp2 *clientv3.GetResponse
}
// Put is a no-op stub: it ignores its arguments and returns a nil response
// and a nil error.
func (m *mockKVForPagine) Put(_ context2.Context, _, _ string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
	var none *clientv3.PutResponse
	return none, nil
}
// Get returns one of the canned responses. Count-only requests always get
// countResp. Among the other requests, the first gets rangeResp1 and every
// later one gets rangeResp2. The context and key are ignored.
func (m *mockKVForPagine) Get(_ context2.Context, _ string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
	// Apply the options to a scratch Op only to find out whether the caller
	// asked for a count.
	var applied clientv3.Op
	for i := range opts {
		opts[i](&applied)
	}

	switch {
	case applied.IsCountOnly():
		return m.countResp, nil
	case m.rangeCount == 0:
		m.rangeCount = 1
		return m.rangeResp1, nil
	default:
		return m.rangeResp2, nil
	}
}
// Delete is a no-op stub: it ignores its arguments and returns a nil
// response and a nil error.
func (m *mockKVForPagine) Delete(_ context2.Context, _ string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
	var none *clientv3.DeleteResponse
	return none, nil
}
// Compact is a no-op stub: it ignores its arguments and returns a nil
// response and a nil error.
func (m *mockKVForPagine) Compact(_ context2.Context, _ int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
	var none *clientv3.CompactResponse
	return none, nil
}
// Do is a no-op stub: it ignores the operation and returns a zero-valued
// OpResponse and a nil error.
func (m *mockKVForPagine) Do(_ context2.Context, _ clientv3.Op) (clientv3.OpResponse, error) {
	var empty clientv3.OpResponse
	return empty, nil
}
// Txn is a stub that always returns a nil transaction.
func (m *mockKVForPagine) Txn(_ context2.Context) clientv3.Txn {
	var none clientv3.Txn
	return none
}
// TestEtcdClient_paging covers the scenario in which the stored data
// shrinks while paging is in progress: the count query reports 4097 keys,
// the first page returns 4096 of them and the second page returns none.
// paging must still succeed and return the entries it did receive.
func TestEtcdClient_paging(t *testing.T) {
	// buildResp fabricates a GetResponse holding the keys of the half-open
	// range [from, to), each with an empty value. An empty or inverted
	// range gives a response with no entries.
	buildResp := func(from, to int) *clientv3.GetResponse {
		out := &clientv3.GetResponse{
			Header: &etcdserverpb.ResponseHeader{Revision: 0},
			Kvs:    make([]*mvccpb.KeyValue, 0),
			Count:  int64(to - from),
		}
		if out.Count > 0 {
			for key := from; key < to; key++ {
				out.Kvs = append(out.Kvs, &mvccpb.KeyValue{
					Key:   []byte(fmt.Sprint(key)),
					Value: []byte(""),
				})
			}
		}
		return out
	}

	kv := &mockKVForPagine{
		rangeCount: 0,
		// if count only, return 4097 kvs
		countResp: buildResp(0, 4097),
		// the first paging request, return 4096 kvs
		rangeResp1: buildResp(0, 4096),
		// the second paging request, return 0 kv
		// meaning data decreases during paging
		rangeResp2: buildResp(0, 0),
	}
	client := EtcdClient{
		Client: &clientv3.Client{KV: kv},
	}
	request := registry.PluginOp{
		Offset: -1,
		Limit:  registry.DEFAULT_PAGE_COUNT,
	}

	result, err := client.paging(context2.Background(), request)
	if err != nil {
		t.Fatalf("TestEtcdClient_paging failed, %#v", err)
	}
	if len(result.Kvs) == 0 {
		t.Fatalf("TestEtcdClient_paging failed")
	}
}
// TestNewRegistry expects Initialize to fail when the endpoint list holds
// an address ("0.0.0.0:2379") next to the configured endpoint.
func TestNewRegistry(t *testing.T) {
	cli := &EtcdClient{
		Endpoints:        []string{endpoint, "0.0.0.0:2379"},
		DialTimeout:      dialTimeout,
		AutoSyncInterval: time.Millisecond,
	}
	if err := cli.Initialize(); err == nil {
		// should be err, member list does not contain one of the endpoints.
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
}
// TestWithTLS enables SSL in the server and registry configuration, starts
// an rpc server on a local port and initialises an EtcdClient against it.
// The error returned by Initialize must be a gRPC status error.
func TestWithTLS(t *testing.T) {
	sslRoot := "../../../../../examples/service_center/ssl/"
	os.Setenv("SSL_ROOT", sslRoot)

	core.ServerInfo.Config.SslEnabled = true
	registry.Configuration().SslEnabled = true
	// Undo the configuration and environment changes when the test ends.
	defer func() {
		core.ServerInfo.Config.SslEnabled = false
		registry.Configuration().SslEnabled = false
		os.Setenv("SSL_ROOT", "")
	}()

	svr, err := rpc.NewServer("127.0.0.1:0")
	// Without this check a failed NewServer would go unnoticed and the test
	// would go on to call methods on the server it returned.
	if err != nil {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	go func() {
		svr.Serve()
	}()
	defer svr.Stop()

	etcd := &EtcdClient{
		DialTimeout: dialTimeout,
		Endpoints:   []string{svr.Listener.Addr().String()},
	}

	err = etcd.Initialize()
	// initialize the etcd client will check member list firstly,
	// so will raise an grpc error but not TLS errors.
	if _, ok := status.FromError(err); !ok {
		t.Fatalf("TestEtcdClient failed, %#v", err)
	}
	defer etcd.Close()
}