blob: 0832f6048ed2095d5fbccfa030a9c8cc238cf369 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package pegasus
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
)
// TestPegasusClient_OpenTable opens the table "temp" twice and expects both
// calls to return equal connectors, checks that the table is then visible
// through pegasusClient.findTable, and finally verifies that opening an
// unknown table fails with an ERR_OBJECT_NOT_FOUND error.
//
// NOTE(review): presumably requires meta servers listening on ports
// 34601-34603 that host a table named "temp" — confirm against the test setup.
func TestPegasusClient_OpenTable(t *testing.T) {
	defer leaktest.Check(t)()

	cfg := Config{
		MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
	}

	client := NewClient(cfg)
	defer client.Close()

	tb1, err := client.OpenTable(context.Background(), "temp")
	assert.NoError(t, err)
	assert.NotNil(t, tb1)

	tb2, err := client.OpenTable(context.Background(), "temp")
	assert.NoError(t, err)
	assert.NotNil(t, tb2)

	// must reuse previous connection
	assert.Equal(t, tb1, tb2)

	// Guard the type assertion: calling findTable on a nil *pegasusClient
	// would panic instead of reporting a regular test failure.
	pclient, ok := client.(*pegasusClient)
	if assert.True(t, ok, "client is expected to be a *pegasusClient") {
		assert.NotNil(t, pclient.findTable("temp"))
	}

	tb, err := client.OpenTable(context.Background(), "table_not_exists")
	// assert.Error does not abort the test, so only inspect the message when
	// an error was actually returned.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "ERR_OBJECT_NOT_FOUND")
	}
	assert.Nil(t, tb)
}
// TestPegasusClient_OpenTableTimeout points the client at a meta server
// address that should not be listening and expects OpenTable to return an
// error and a nil table once the 2-second context deadline has expired.
func TestPegasusClient_OpenTableTimeout(t *testing.T) {
	defer leaktest.Check(t)()

	// make sure the port 8801 is not opened on your computer.
	cfg := Config{
		MetaServers: []string{"0.0.0.0:8801"},
	}

	client := NewClient(cfg)
	// Deferred so the client is released on every exit path, matching the
	// other tests in this file; it still runs before the leak check above.
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	tb, err := client.OpenTable(ctx, "temp")
	// testify expects (expected, actual); this order keeps failure output
	// correctly labelled.
	assert.Equal(t, context.DeadlineExceeded, ctx.Err())
	assert.Nil(t, tb)
	assert.NotNil(t, err)
}
// TestPegasusClient_ConcurrentOpenSameTable issues 100 concurrent OpenTable
// calls for the same table and asserts that every call succeeds and that all
// returned connectors are equal to the first one.
//
// NOTE(review): the intent is presumably that concurrent opens of one table
// share a single query to the meta server; the test only checks connector
// equality and does not count meta queries.
func TestPegasusClient_ConcurrentOpenSameTable(t *testing.T) {
	defer leaktest.Check(t)()

	cfg := Config{
		MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
	}

	client := NewClient(cfg)
	defer client.Close()

	const openTableQueries = 100
	tblist := make([]TableConnector, openTableQueries)

	var wg sync.WaitGroup
	wg.Add(openTableQueries)
	for i := range tblist {
		// Pass the index explicitly so each goroutine writes to its own slot
		// regardless of the Go version's loop-variable semantics.
		go func(idx int) {
			defer wg.Done()

			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()

			tb, err := client.OpenTable(ctx, "temp")
			assert.NoError(t, err)
			tblist[idx] = tb
		}(i)
	}
	wg.Wait()

	// all tables returned by OpenTable must be the same one
	first := tblist[0]
	for _, tb := range tblist[1:] {
		assert.Equal(t, first, tb)
	}
}
// TestPegasusClient_ConcurrentMetaQueries is a smoke test for concurrent RPCs:
// it launches 100 goroutines that each try to open a differently named table
// ("table_not_exist0" .. "table_not_exist99") with a one-second timeout and
// expects every call to return an error.
func TestPegasusClient_ConcurrentMetaQueries(t *testing.T) {
	defer leaktest.Check(t)()

	cfg := Config{
		MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
	}

	client := NewClient(cfg)
	defer client.Close()

	const metaQueries = 100

	var wg sync.WaitGroup
	wg.Add(metaQueries)
	for i := 0; i < metaQueries; i++ {
		// The id is passed as an argument so each goroutine queries its own
		// table name.
		go func(id int) {
			defer wg.Done()

			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()

			_, err := client.OpenTable(ctx, "table_not_exist"+fmt.Sprint(id))
			assert.NotNil(t, err)
		}(i)
	}
	wg.Wait()
}
func TestPegasusClient_New(t *testing.T) {
c, err := newClientWithError(Config{
MetaServers: []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"},
})
assert.Nil(t, err)
_ = c.Close()
c, err = newClientWithError(Config{
MetaServers: []string{"127abc"},
})
assert.NotNil(t, err)
assert.Nil(t, c)
_, err = newClientWithError(Config{
MetaServers: []string{},
})
assert.NotNil(t, err)
}