blob: b4016748ea8d29f3997317ced4f82a226373dc5e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package pegasus
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"sort"
"sync"
"testing"
"time"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"github.com/apache/incubator-pegasus/go-client/rpc"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
)
// This is the integration test of the client. Please start the pegasus onebox
// before you running the tests.
func testSingleKeyOperations(t *testing.T, tb TableConnector, hashKey []byte, sortKey []byte, value []byte) {
// read after write
assert.Nil(t, tb.Del(context.Background(), hashKey, sortKey))
assert.Nil(t, tb.Set(context.Background(), hashKey, sortKey, value))
result, err := tb.Get(context.Background(), hashKey, sortKey)
assert.Nil(t, err)
assert.Equal(t, value, result)
exist, err := tb.Exist(context.Background(), hashKey, sortKey)
assert.Nil(t, err)
assert.Equal(t, true, exist)
// ensure GET a non-existed entry returns a nil value
assert.Nil(t, tb.Del(context.Background(), hashKey, sortKey))
result = nil
result, err = tb.Get(context.Background(), hashKey, sortKey)
assert.Nil(t, err)
assert.Nil(t, result)
exist, err = tb.Exist(context.Background(), hashKey, sortKey)
assert.Nil(t, err)
assert.Equal(t, false, exist)
// === ttl === //
ttl, err := tb.TTL(context.Background(), hashKey, sortKey)
assert.Nil(t, err)
assert.Equal(t, ttl, -2)
assert.Nil(t, tb.Set(context.Background(), hashKey, sortKey, value))
ttl, err = tb.TTL(context.Background(), hashKey, sortKey)
assert.Nil(t, err)
assert.Equal(t, ttl, -1)
assert.Nil(t, tb.SetTTL(context.Background(), hashKey, sortKey, value, time.Second*10))
ttl, err = tb.TTL(context.Background(), hashKey, sortKey)
assert.Nil(t, err)
assert.Condition(t, func() bool {
// pegasus server may return a ttl slightly different
// from the value we set.
return ttl <= 11 && ttl >= 9
})
// test with invalid ttl
assert.Error(t, tb.SetTTL(context.Background(), hashKey, sortKey, value, -10))
assert.Nil(t, tb.Del(context.Background(), hashKey, sortKey))
}
var testingCfg = Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
}
func TestPegasusTableConnector_SingleKeyOperations(t *testing.T) {
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
// run sequentially
for i := 0; i < 10; i++ {
hashKey := []byte(fmt.Sprintf("h%d", i))
sortKey := []byte(fmt.Sprintf("s%d", i))
value := []byte(fmt.Sprintf("v%d", i))
testSingleKeyOperations(t, tb, hashKey, sortKey, value)
}
// run concurrently
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
id := i
go func() {
hashKey := []byte(fmt.Sprintf("h%d", id))
sortKey := []byte(fmt.Sprintf("s%d", id))
value := []byte(fmt.Sprintf("v%d", id))
testSingleKeyOperations(t, tb, hashKey, sortKey, value)
wg.Done()
}()
}
wg.Wait()
}
func TestPegasusTableConnector_EmptyInput(t *testing.T) {
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
// Get
_, err = tb.Get(context.Background(), nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
_, err = tb.Get(context.Background(), []byte{}, nil)
assert.Contains(t, err.Error(), "hashkey must not be empty")
_, err = tb.Get(context.Background(), []byte("h1"), nil)
assert.Contains(t, err.Error(), "sortkey must not be nil")
_, err = tb.Get(context.Background(), []byte("h1"), []byte(""))
assert.Nil(t, err)
// Set
err = tb.SetTTL(context.Background(), nil, nil, nil, 0)
assert.Contains(t, err.Error(), "hashkey must not be nil")
err = tb.Set(context.Background(), []byte{}, nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be empty")
err = tb.SetTTL(context.Background(), []byte("h1"), nil, []byte(""), 0)
assert.Contains(t, err.Error(), "sortkey must not be nil")
err = tb.SetTTL(context.Background(), []byte("h1"), []byte(""), nil, 0)
assert.Contains(t, err.Error(), "value must not be nil")
err = tb.SetTTL(context.Background(), []byte("h1"), []byte(""), []byte(""), 0)
assert.Nil(t, err)
// Del
err = tb.Del(context.Background(), nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
err = tb.Del(context.Background(), []byte{}, nil)
assert.Contains(t, err.Error(), "hashkey must not be empty")
err = tb.Del(context.Background(), []byte("h1"), nil)
assert.Contains(t, err.Error(), "sortkey must not be nil")
err = tb.Del(context.Background(), []byte("h1"), []byte(""))
assert.Nil(t, err)
// MultiGet
_, _, err = tb.MultiGet(context.Background(), nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
_, _, err = tb.MultiGetOpt(context.Background(), []byte{}, nil, &MultiGetOptions{})
assert.Contains(t, err.Error(), "hashkey must not be empty")
_, _, err = tb.MultiGet(context.Background(), []byte("h1"), nil)
assert.Nil(t, err)
_, _, err = tb.MultiGetOpt(context.Background(), []byte("h1"), [][]byte{}, &MultiGetOptions{})
assert.Nil(t, err)
_, _, err = tb.MultiGetOpt(context.Background(), []byte("h1"), [][]byte{nil}, &MultiGetOptions{})
assert.Contains(t, err.Error(), "sortkeys[0] must not be nil")
// MultiGetRange
_, _, err = tb.MultiGetRange(context.Background(), nil, nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
_, _, err = tb.MultiGetRangeOpt(context.Background(), []byte{}, nil, nil, &MultiGetOptions{})
assert.Contains(t, err.Error(), "hashkey must not be empty")
// MultiSet
err = tb.MultiSet(context.Background(), nil, nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
err = tb.MultiSetOpt(context.Background(), []byte{}, nil, nil, 0)
assert.Contains(t, err.Error(), "hashkey must not be empty")
err = tb.MultiSetOpt(context.Background(), []byte("h1"), [][]byte{[]byte("s1")}, nil, 0)
assert.Contains(t, err.Error(), "values must not be nil")
err = tb.MultiSetOpt(context.Background(), []byte("h1"), [][]byte{[]byte("s1")}, [][]byte{}, 0)
assert.Contains(t, err.Error(), "values must not be empty")
err = tb.MultiSetOpt(context.Background(), []byte("h1"), [][]byte{[]byte("s1")}, [][]byte{nil}, 0)
assert.Contains(t, err.Error(), "values[0] must not be nil")
err = tb.MultiSet(context.Background(), []byte("h1"), nil, [][]byte{[]byte("v1")})
assert.Contains(t, err.Error(), "sortkeys must not be nil")
err = tb.MultiSetOpt(context.Background(), []byte("h1"), [][]byte{}, [][]byte{[]byte("v1")}, 0)
assert.Contains(t, err.Error(), "sortkeys must not be empty")
err = tb.MultiSetOpt(context.Background(), []byte("h1"), [][]byte{nil}, [][]byte{[]byte("v1")}, 0)
assert.Contains(t, err.Error(), "sortkeys[0] must not be nil")
err = tb.MultiSetOpt(context.Background(), []byte("h1"), [][]byte{[]byte("")}, [][]byte{[]byte("v1")}, 0)
assert.Nil(t, err)
// MultiDel
err = tb.MultiDel(context.Background(), nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
err = tb.MultiDel(context.Background(), []byte{}, nil)
assert.Contains(t, err.Error(), "hashkey must not be empty")
err = tb.MultiDel(context.Background(), []byte("h1"), nil)
assert.Contains(t, err.Error(), "sortkeys must not be nil")
err = tb.MultiDel(context.Background(), []byte("h1"), [][]byte{})
assert.Contains(t, err.Error(), "sortkeys must not be empty")
err = tb.MultiDel(context.Background(), []byte("h1"), [][]byte{nil})
assert.Contains(t, err.Error(), "sortkeys[0] must not be nil")
// TTL
_, err = tb.TTL(context.Background(), nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
_, err = tb.TTL(context.Background(), []byte{}, nil)
assert.Contains(t, err.Error(), "hashkey must not be empty")
_, err = tb.TTL(context.Background(), []byte("h1"), nil)
assert.Contains(t, err.Error(), "sortkey must not be nil")
_, err = tb.TTL(context.Background(), []byte("h1"), []byte(""))
assert.Nil(t, err)
}
func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
defer leaktest.Check(t)()
ptb := &pegasusTableConnector{
tableName: "temp",
meta: nil,
replica: nil,
confUpdateCh: make(chan bool, 1),
logger: pegalog.GetLogger(),
}
confUpdate, retry, err := ptb.handleReplicaError(nil, nil) // no error
assert.NoError(t, err)
assert.False(t, confUpdate)
assert.False(t, retry)
confUpdate, retry, err = ptb.handleReplicaError(errors.New("not nil"), nil) // unknown error
<-ptb.confUpdateCh // must trigger confUpdate
assert.Error(t, err)
assert.True(t, confUpdate)
assert.True(t, retry)
confUpdate, retry, err = ptb.handleReplicaError(base.ERR_OBJECT_NOT_FOUND, nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.True(t, retry)
confUpdate, retry, err = ptb.handleReplicaError(base.ERR_INVALID_STATE, nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.False(t, retry)
confUpdate, retry, err = ptb.handleReplicaError(base.ERR_PARENT_PARTITION_MISUSED, nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.False(t, retry)
confUpdate, retry, err = ptb.handleReplicaError(context.DeadlineExceeded, nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.False(t, retry)
{ // Ensure: The following errors should not trigger configuration update
errorTypes := []error{base.ERR_TIMEOUT, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}
for _, err := range errorTypes {
channelEmpty := false
confUpdate, retry, err = ptb.handleReplicaError(err, nil)
select {
case <-ptb.confUpdateCh:
default:
channelEmpty = true
}
assert.True(t, channelEmpty)
assert.Error(t, err)
assert.False(t, confUpdate)
assert.False(t, retry)
}
}
}
func TestPegasusTableConnector_ValidateHashKey(t *testing.T) {
hashKey := []byte(nil)
assert.NotNil(t, validateHashKey(hashKey))
hashKey = make([]byte, 0)
assert.NotNil(t, validateHashKey(hashKey))
hashKey = make([]byte, math.MaxUint16+1)
assert.NotNil(t, validateHashKey(hashKey))
}
func TestPegasusTableConnector_HandleInvalidQueryConfigResp(t *testing.T) {
defer leaktest.Check(t)()
partitionCount := 8
p := &pegasusTableConnector{
tableName: "temp",
parts: make([]*replicaNode, partitionCount),
}
{
resp := replication.NewQueryCfgResponse()
resp.Err = &base.ErrorCode{Errno: "ERR_BUSY"}
err := p.handleQueryConfigResp(resp)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "ERR_BUSY")
}
{
resp := replication.NewQueryCfgResponse()
resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
err := p.handleQueryConfigResp(resp)
assert.NotNil(t, err)
resp.Partitions = make([]*replication.PartitionConfiguration, 10)
resp.PartitionCount = 5
err = p.handleQueryConfigResp(resp)
assert.NotNil(t, err)
assert.Equal(t, partitionCount, len(p.parts))
}
{
resp := replication.NewQueryCfgResponse()
resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
resp.Partitions = make([]*replication.PartitionConfiguration, 6)
resp.PartitionCount = 6
err := p.handleQueryConfigResp(resp)
assert.NotNil(t, err)
assert.Equal(t, partitionCount, len(p.parts))
}
{
resp := replication.NewQueryCfgResponse()
resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
resp.Partitions = make([]*replication.PartitionConfiguration, 2)
resp.PartitionCount = 2
err := p.handleQueryConfigResp(resp)
assert.NotNil(t, err)
assert.Equal(t, partitionCount, len(p.parts))
}
}
func TestPegasusTableConnector_QueryConfigRespWhileStartSplit(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
ptb, _ := tb.(*pegasusTableConnector)
partitionCount := len(ptb.parts)
resp := replication.NewQueryCfgResponse()
resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
resp.AppID = ptb.appID
resp.PartitionCount = int32(partitionCount * 2)
resp.Partitions = make([]*replication.PartitionConfiguration, partitionCount*2)
for i := 0; i < partitionCount*2; i++ {
if i < partitionCount {
resp.Partitions[i] = ptb.parts[i].pconf
} else {
conf := replication.NewPartitionConfiguration()
conf.Ballot = -1
conf.Pid = &base.Gpid{ptb.appID, int32(i)}
resp.Partitions[i] = conf
}
}
err = ptb.handleQueryConfigResp(resp)
assert.Nil(t, err)
assert.Equal(t, partitionCount*2, len(ptb.parts))
ptb.Close()
}
func TestPegasusTableConnector_QueryConfigRespWhileCancelSplit(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
ptb, _ := tb.(*pegasusTableConnector)
partitionCount := len(ptb.parts)
nodes := make([]*replicaNode, partitionCount*2)
for i := 0; i < partitionCount*2; i++ {
if i < partitionCount {
nodes[i] = ptb.parts[i]
} else {
conf := replication.NewPartitionConfiguration()
conf.Ballot = -1
conf.Pid = &base.Gpid{ptb.appID, int32(i)}
nodes[i] = &replicaNode{nil, conf}
}
}
resp := replication.NewQueryCfgResponse()
resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
resp.AppID = ptb.appID
resp.PartitionCount = int32(partitionCount)
resp.Partitions = make([]*replication.PartitionConfiguration, partitionCount)
for i := 0; i < partitionCount; i++ {
resp.Partitions[i] = ptb.parts[i].pconf
}
ptb.parts = make([]*replicaNode, partitionCount*2)
ptb.parts = nodes
err = ptb.handleQueryConfigResp(resp)
assert.Nil(t, err)
assert.Equal(t, partitionCount, len(ptb.parts))
ptb.Close()
}
func TestPegasusTableConnector_Close(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()
// Ensure: Closing table doesn't close the connections.
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
ptb, _ := tb.(*pegasusTableConnector)
err = tb.Set(context.Background(), []byte("a"), []byte("a"), []byte("a"))
assert.Nil(t, err)
ptb.Close()
_, r := ptb.getPartition(crc64Hash([]byte("a")))
assert.Equal(t, r.ConnState(), rpc.ConnStateReady)
}
func TestPegasusTableConnector_GetPartitionIndex(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
ptb, _ := tb.(*pegasusTableConnector)
// hashKey = 'a', partitionCount = 8, target index is 4
targetIndex := 4
partitionHash := crc64Hash([]byte("a"))
gpid, _ := ptb.getPartition(partitionHash)
assert.Equal(t, gpid.PartitionIndex, int32(targetIndex))
}
func TestPegasusTableConnector_GetPartitionIndexRedirct(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
ptb, _ := tb.(*pegasusTableConnector)
partitionCount := len(ptb.parts)
nodes := make([]*replicaNode, partitionCount*2)
for i := 0; i < partitionCount*2; i++ {
if i < partitionCount {
nodes[i] = ptb.parts[i]
} else {
conf := replication.NewPartitionConfiguration()
conf.Ballot = -1
conf.Pid = &base.Gpid{ptb.appID, int32(i)}
nodes[i] = &replicaNode{nil, conf}
}
}
ptb.parts = make([]*replicaNode, partitionCount*2)
ptb.parts = nodes
// hashKey = 'a', partitionCount = 16, target index is 12
// But partition is invalid, it should redirect to parent partition, index is 4
targetIndex := 4
partitionHash := crc64Hash([]byte("a"))
gpid, _ := ptb.getPartition(partitionHash)
assert.Equal(t, gpid.PartitionIndex, int32(targetIndex))
}
func TestPegasusTableConnector_MultiKeyOperations(t *testing.T) {
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
testMultiKeyOperations(t, tb)
}
func testMultiKeyOperations(t *testing.T, tb TableConnector) {
hashKey := []byte("h1")
sortKeys := make([][]byte, 10)
values := make([][]byte, 10)
for i := 0; i < 10; i++ {
// make sortKeys sorted.
sidBuf := []byte(fmt.Sprintf("%d", i))
var sidWithLeadingZero bytes.Buffer
for k := 0; k < 20-len(sidBuf); k++ {
sidWithLeadingZero.WriteByte('0')
}
sidWithLeadingZero.Write(sidBuf)
sortKeys[i] = sidWithLeadingZero.Bytes()
values[i] = []byte(fmt.Sprintf("v%d", i))
}
// clear keyspace
results, allFetched, err := tb.MultiGetRange(context.Background(), hashKey, nil, nil)
assert.NoError(t, err)
assert.True(t, allFetched)
for _, result := range results {
assert.NoError(t, tb.Del(context.Background(), hashKey, result.SortKey))
}
count, err := tb.SortKeyCount(context.Background(), hashKey)
assert.NoError(t, err)
assert.Equal(t, count, int64(0))
// empty database
results, allFetched, err = tb.MultiGet(context.Background(), hashKey, sortKeys)
assert.NoError(t, err)
assert.Nil(t, results)
assert.True(t, allFetched)
results, allFetched, err = tb.MultiGetRange(context.Background(), hashKey, nil, nil)
assert.NoError(t, err)
assert.Nil(t, results)
assert.True(t, allFetched)
// === read after write === //
assert.NoError(t, tb.MultiSet(context.Background(), hashKey, sortKeys, values))
results, allFetched, err = tb.MultiGet(context.Background(), hashKey, sortKeys)
assert.NoError(t, err)
assert.Equal(t, len(results), len(values))
for i, result := range results {
assert.Equal(t, result.Value, values[i])
assert.Equal(t, result.SortKey, sortKeys[i])
}
assert.True(t, allFetched)
count, err = tb.SortKeyCount(context.Background(), hashKey)
assert.NoError(t, err)
assert.Equal(t, count, int64(len(sortKeys)))
// test StartInclusive & StopInclusive
results, allFetched, err = tb.MultiGetRangeOpt(context.Background(), hashKey, sortKeys[0], sortKeys[len(sortKeys)-1],
&MultiGetOptions{StartInclusive: true, StopInclusive: true})
assert.NoError(t, err)
assert.Equal(t, len(results), len(values))
for i, result := range results {
assert.Equal(t, result.Value, values[i])
assert.Equal(t, result.SortKey, sortKeys[i])
}
assert.True(t, allFetched)
results, allFetched, err = tb.MultiGetRangeOpt(context.Background(), hashKey, sortKeys[0], sortKeys[len(sortKeys)-1],
&MultiGetOptions{StartInclusive: false, StopInclusive: false})
assert.NoError(t, err)
assert.Equal(t, len(results), len(values)-2) // exclude start and stop
for i, result := range results {
assert.Equal(t, result.Value, values[i+1])
assert.Equal(t, result.SortKey, sortKeys[i+1])
}
assert.True(t, allFetched)
// test MaxFetchCount
results, allFetched, err = tb.MultiGetOpt(context.Background(), hashKey, sortKeys, &MultiGetOptions{MaxFetchCount: 4})
assert.NoError(t, err)
assert.Equal(t, len(results), 4)
assert.False(t, allFetched)
// test MaxFetchSize
results, allFetched, err = tb.MultiGetOpt(context.Background(), hashKey, sortKeys, &MultiGetOptions{MaxFetchSize: len(values[0])})
assert.NoError(t, err)
assert.Equal(t, len(results), 1)
assert.False(t, allFetched)
// ensure passing nil to `sortKeys` in MultiGet retrieves all entries
results, allFetched, err = tb.MultiGetOpt(context.Background(), hashKey, nil, &MultiGetOptions{})
assert.NoError(t, err)
assert.Equal(t, len(results), len(sortKeys))
assert.True(t, allFetched)
// test Reverse
results, allFetched, err = tb.MultiGetRangeOpt(context.Background(), hashKey, nil, nil, &MultiGetOptions{Reverse: true, MaxFetchCount: 1})
assert.NoError(t, err)
assert.Equal(t, allFetched, false)
assert.Equal(t, results, []*KeyValue{
{SortKey: sortKeys[len(sortKeys)-1], Value: values[len(sortKeys)-1]},
})
// test NoValue
results, allFetched, err = tb.MultiGetRangeOpt(context.Background(), hashKey, nil, nil, &MultiGetOptions{NoValue: true, MaxFetchCount: 1})
assert.NoError(t, err)
assert.False(t, allFetched)
assert.Equal(t, results, []*KeyValue{
{SortKey: sortKeys[0], Value: []byte("")},
})
// === ttl === //
assert.NoError(t, tb.MultiSetOpt(context.Background(), hashKey, sortKeys, values, 10*time.Second))
for _, sortKey := range sortKeys {
ttl, err := tb.TTL(context.Background(), hashKey, sortKey)
assert.NoError(t, err)
assert.Condition(t, func() bool {
// pegasus server may return a ttl slightly different
// from the value we set.
return ttl <= 11 && ttl >= 9
})
}
// test with invalid ttl
assert.Error(t, tb.MultiSetOpt(context.Background(), hashKey, sortKeys, values, -1*time.Second))
}
func TestPegasusTableConnector_CheckAndSet(t *testing.T) {
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
{ // CheckTypeValueNotExist
// if (h1, s1) not exists, insert (s1, v1)
err := tb.Del(context.Background(), []byte("h1"), []byte("s1"))
assert.Nil(t, err)
res, err := tb.CheckAndSet(context.Background(), []byte("h1"), []byte("s1"), CheckTypeValueNotExist, []byte(""), []byte("s1"), []byte("v1"),
&CheckAndSetOptions{ReturnCheckValue: true})
assert.Nil(t, err)
assert.Equal(t, res.SetSucceed, true)
assert.Equal(t, res.CheckValueReturned, true)
assert.Equal(t, res.CheckValueExist, false)
// since (h1, s1) exists, insertion of (s1, v1) failed
res, err = tb.CheckAndSet(context.Background(), []byte("h1"), []byte("s1"), CheckTypeValueNotExist, []byte(""), []byte("s1"), []byte("v1"),
&CheckAndSetOptions{ReturnCheckValue: true})
assert.Nil(t, err)
assert.Equal(t, res.SetSucceed, false)
assert.Equal(t, res.CheckValueReturned, true)
assert.Equal(t, res.CheckValueExist, true)
assert.Equal(t, res.CheckValue, []byte("v1"))
}
{ // CheckTypeValueExist
// if (h1, s1) exists, insert (s1, v1)
// this op will failed since there's no such entry.
assert.Nil(t, tb.Del(context.Background(), []byte("h1"), []byte("s1")))
res, err := tb.CheckAndSet(context.Background(), []byte("h1"), []byte("s1"), CheckTypeValueExist, []byte(""), []byte("s1"), []byte("v1"),
&CheckAndSetOptions{ReturnCheckValue: true})
assert.Nil(t, err)
assert.Equal(t, res.SetSucceed, false)
assert.Equal(t, res.CheckValueReturned, true)
assert.Equal(t, res.CheckValueExist, false)
assert.Nil(t, tb.Set(context.Background(), []byte("h1"), []byte("s1"), []byte("v1")))
res, err = tb.CheckAndSet(context.Background(), []byte("h1"), []byte("s1"), CheckTypeValueExist, []byte(""), []byte("s1"), []byte("v2"),
&CheckAndSetOptions{ReturnCheckValue: true})
assert.Nil(t, err)
assert.Equal(t, res.SetSucceed, true)
assert.Equal(t, res.CheckValueReturned, true)
assert.Equal(t, res.CheckValueExist, true)
assert.Equal(t, res.CheckValue, []byte("v1"))
value, err := tb.Get(context.Background(), []byte("h1"), []byte("s1"))
assert.Nil(t, err)
assert.Equal(t, value, []byte("v2"))
// set ttl to 10 if value exists
ttl, err := tb.TTL(context.Background(), []byte("h1"), []byte("s1"))
assert.Nil(t, err)
assert.Equal(t, ttl, -1) // ttl is not set
res, err = tb.CheckAndSet(context.Background(), []byte("h1"), []byte("s1"), CheckTypeValueExist, []byte(""), []byte("s1"), []byte("v3"),
&CheckAndSetOptions{SetValueTTLSeconds: 10})
assert.Nil(t, err)
assert.Equal(t, res.SetSucceed, true)
assert.Equal(t, res.CheckValueReturned, false)
assert.Equal(t, res.CheckValueExist, false) // no check value returned
ttl, err = tb.TTL(context.Background(), []byte("h1"), []byte("s1"))
assert.Nil(t, err)
assert.Condition(t, func() bool {
return ttl >= 9 && ttl <= 11
})
}
{ // check sortkey and set sortkey are different
results, _, err := tb.MultiGetRange(context.Background(), []byte("h1"), nil, nil)
assert.Nil(t, err)
for _, result := range results {
assert.Nil(t, tb.Del(context.Background(), []byte("h1"), result.SortKey))
}
assert.Nil(t, tb.Set(context.Background(), []byte("h1"), []byte("s1"), []byte("v1")))
res, err := tb.CheckAndSet(context.Background(), []byte("h1"), []byte("s1"), CheckTypeValueExist, []byte(""), []byte("s2"), []byte("v2"),
&CheckAndSetOptions{ReturnCheckValue: true})
assert.Nil(t, err)
assert.Equal(t, res.SetSucceed, true)
assert.Equal(t, res.CheckValueReturned, true)
assert.Equal(t, res.CheckValueExist, true)
assert.Equal(t, res.CheckValue, []byte("v1"))
count, err := tb.SortKeyCount(context.Background(), []byte("h1"))
assert.Nil(t, err)
assert.Equal(t, count, int64(2))
value, err := tb.Get(context.Background(), []byte("h1"), []byte("s1"))
assert.Nil(t, err)
assert.Equal(t, value, []byte("v1"))
value, err = tb.Get(context.Background(), []byte("h1"), []byte("s2"))
assert.Nil(t, err)
assert.Equal(t, value, []byte("v2"))
}
// test with invalid ttl
{
_, err := tb.CheckAndSet(context.Background(), []byte("h1"), []byte("s1"), CheckTypeValueExist, []byte(""), []byte("s2"), []byte("v2"),
&CheckAndSetOptions{SetValueTTLSeconds: -1})
assert.Error(t, err)
}
// TODO(wutao1): add tests for other check type
}
func TestPegasusTableConnector_Incr(t *testing.T) {
defer leaktest.Check(t)()
concurrency := 3
times := 20
{
client := NewClient(testingCfg)
tb, err := client.OpenTable(context.Background(), "temp")
assert.NoError(t, err)
err = tb.Del(context.Background(), []byte("idx_hash"), []byte("idx_sort"))
assert.NoError(t, err)
_ = tb.Close()
_ = client.Close()
}
sortedIDs := make([]int64, 0, times*concurrency)
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
client := NewClient(testingCfg)
tb, err := client.OpenTable(context.Background(), "temp")
assert.NoError(t, err)
ids := make([]int64, 0, times)
for i := 0; i < times; i++ {
value, err := tb.Incr(context.Background(), []byte("idx_hash"), []byte("idx_sort"), 1)
assert.NoError(t, err)
ids = append(ids, value)
}
mu.Lock()
sortedIDs = append(sortedIDs, ids...)
mu.Unlock()
_ = tb.Close()
_ = client.Close()
}()
}
wg.Wait()
sort.Slice(sortedIDs, func(i, j int) bool {
return sortedIDs[i] < sortedIDs[j]
})
for i := 0; i < times*concurrency; i++ {
assert.Equal(t, int64(i+1), sortedIDs[i])
}
}
func TestPegasusTableConnector_BatchGet(t *testing.T) {
defer leaktest.Check(t)()
client := NewClient(testingCfg)
defer client.Close()
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
err = tb.Del(context.Background(), []byte("h1"), []byte("s1"))
assert.Nil(t, err)
err = tb.Del(context.Background(), []byte("h2"), []byte("s2"))
assert.Nil(t, err)
err = tb.Del(context.Background(), []byte("h3"), []byte("s3"))
assert.Nil(t, err)
err = tb.Set(context.Background(), []byte("h1"), []byte("s1"), []byte("v1"))
assert.Nil(t, err)
err = tb.Set(context.Background(), []byte("h2"), []byte("s2"), []byte("v2"))
assert.Nil(t, err)
err = tb.Set(context.Background(), []byte("h3"), []byte("s3"), []byte("v3"))
assert.Nil(t, err)
keys := []CompositeKey{{HashKey: []byte("h1"), SortKey: []byte("s1")},
{HashKey: []byte("h2"), SortKey: []byte("s2")},
{HashKey: []byte("h3"), SortKey: []byte("s3")}}
values, err := tb.BatchGet(context.Background(), keys)
assert.Nil(t, err)
assert.Equal(t, values, [][]byte{[]byte("v1"), []byte("v2"), []byte("v3")})
values, err = tb.BatchGet(context.Background(), nil)
assert.Nil(t, values)
assert.Equal(t, err.Error(),
"pegasus BATCH_GET failed: InvalidParameter: CompositeKeys must not be nil")
values, err = tb.BatchGet(context.Background(), []CompositeKey{{HashKey: []byte{}, SortKey: nil}, {HashKey: nil, SortKey: nil}})
assert.Equal(t, values, [][]byte{nil, nil})
if err.Error() != "pegasus BATCH_GET failed: [pegasus GET failed: InvalidParameter: hashkey must not be empty, pegasus GET failed: InvalidParameter: hashkey must not be nil]" &&
err.Error() != "pegasus BATCH_GET failed: [pegasus GET failed: InvalidParameter: hashkey must not be nil, pegasus GET failed: InvalidParameter: hashkey must not be empty]" {
assert.NotNil(t, nil) // ordering of the errors is indefinite
}
}