blob: aa20f1073cbf018456fef849cbe89310045e31a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package pegasus
import (
"context"
"sync"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"github.com/apache/incubator-pegasus/go-client/session"
)
// Client manages the client sessions to the pegasus cluster specified by `Config`.
// To reuse previously established connections, it is recommended to keep a single
// client instance per program. The operations upon a client instance are thread-safe.
type Client interface {
// Close shuts the client down. The implementation in this file closes every
// table opened through the client, then its meta and replica managers.
Close() error
// OpenTable opens the pegasus table named tableName. If the table was
// opened before, the previous connection to the table is reused.
OpenTable(ctx context.Context, tableName string) (TableConnector, error)
}
// pegasusClient is the Client implementation built by newClientWithError
// and returned from NewClient.
type pegasusClient struct {
// tables holds the connectors opened so far, keyed by table name, so that
// OpenTable can hand back an existing connector instead of connecting again.
tables map[string]TableConnector
// mu protects access to tables.
mu sync.RWMutex
// metaMgr and replicaMgr are created once per client, passed to
// ConnectTable for every table that is opened, and closed in Close.
metaMgr *session.MetaManager
replicaMgr *session.ReplicaManager
}
// NewClient creates a new instance of pegasus client from cfg.
//
// When the client cannot be constructed (for instance because the configured
// meta server addresses fail to resolve), the error is handed to the pegalog
// logger's Fatal. Should that call return, NewClient returns nil.
func NewClient(cfg Config) Client {
	client, err := newClientWithError(cfg)
	if err == nil {
		return client
	}

	pegalog.GetLogger().Fatal(err)
	return nil
}
// newClientWithError builds a pegasusClient from cfg and reports failures as
// an error rather than through the logger.
//
// The configured meta server addresses are first passed through
// session.ResolveMetaAddr; if that fails, its error is returned and no client
// is created. Otherwise the client starts with an empty table cache, a
// MetaManager for the resolved addresses, and a ReplicaManager.
func newClientWithError(cfg Config) (Client, error) {
	var resolveErr error
	if cfg.MetaServers, resolveErr = session.ResolveMetaAddr(cfg.MetaServers); resolveErr != nil {
		return nil, resolveErr
	}

	client := new(pegasusClient)
	client.tables = make(map[string]TableConnector)
	client.metaMgr = session.NewMetaManager(cfg.MetaServers, session.NewNodeSession)
	client.replicaMgr = session.NewReplicaManager(session.NewNodeSession)
	return client, nil
}
// Close closes every table opened through this client, then the MetaManager,
// then the ReplicaManager.
//
// A failure to close one resource no longer prevents the remaining ones from
// being closed: all of them are attempted and the first error encountered is
// returned (tables are visited in map order, so "first" is not deterministic
// across tables). It returns nil when everything closed cleanly.
//
// A MetaManager close failure is still reported through the logger's Fatal,
// exactly as before.
func (p *pegasusClient) Close() error {
	p.mu.RLock()
	defer p.mu.RUnlock()

	var firstErr error
	// remember keeps only the earliest non-nil error so later failures do not
	// mask the one that happened first.
	remember := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	// Keep going after a failed table: returning early here would leave the
	// other tables and both managers open.
	for _, table := range p.tables {
		remember(table.Close())
	}

	if err := p.metaMgr.Close(); err != nil {
		// NOTE(review): Fatal in a library's Close is harsh; kept to preserve
		// existing behavior. If the configured logger does not terminate the
		// process, the error is still surfaced to the caller below.
		pegalog.GetLogger().Fatal("pegasus-go-client: unable to close MetaManager: ", err)
		remember(err)
	}

	remember(p.replicaMgr.Close())
	return firstErr
}
// OpenTable returns a connector for tableName.
//
// A table that was already opened through this client is served from the
// cache. Otherwise ConnectTable is called with the client's meta and replica
// managers and, on success, the new connector is cached for later calls.
// Any error is passed through WrapError with OpQueryConfig before it is
// returned.
func (p *pegasusClient) OpenTable(ctx context.Context, tableName string) (TableConnector, error) {
	// The whole lookup-or-connect sequence runs under the write lock, which
	// ensures only one goroutine is fetching the routing table at a time.
	lookupOrConnect := func() (TableConnector, error) {
		p.mu.Lock()
		defer p.mu.Unlock()

		if cached := p.findTable(tableName); cached != nil {
			return cached, nil
		}

		var conn TableConnector
		conn, connErr := ConnectTable(ctx, tableName, p.metaMgr, p.replicaMgr)
		if connErr != nil {
			return nil, connErr
		}

		p.tables[tableName] = conn
		return conn, nil
	}

	// The lock is already released when the error gets wrapped.
	table, err := lookupOrConnect()
	return table, WrapError(err, OpQueryConfig)
}
// findTable returns the cached connector for tableName, or nil when no such
// table is in the cache. It performs no locking of its own.
func (p *pegasusClient) findTable(tableName string) TableConnector {
	// Indexing a map with a missing key yields the zero value, which is nil
	// for TableConnector, so no explicit presence check is needed.
	return p.tables[tableName]
}