blob: 09de6e6e3e5daf3d1b7ed0a3699908136cdbde5c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package session
import (
"context"
"sync"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/rrdb"
kerrors "k8s.io/apimachinery/pkg/util/errors"
)
// ReplicaSession represents the network session between client and
// replica server.
type ReplicaSession struct {
NodeSession
}
func (rs *ReplicaSession) Get(ctx context.Context, gpid *base.Gpid, partitionHash uint64, key *base.Blob) (*rrdb.ReadResponse, error) {
args := &rrdb.RrdbGetArgs{Key: key}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_GET")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbGetResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) Put(ctx context.Context, gpid *base.Gpid, partitionHash uint64, update *rrdb.UpdateRequest) (*rrdb.UpdateResponse, error) {
args := &rrdb.RrdbPutArgs{Update: update}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_PUT")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbPutResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) Del(ctx context.Context, gpid *base.Gpid, partitionHash uint64, key *base.Blob) (*rrdb.UpdateResponse, error) {
args := &rrdb.RrdbRemoveArgs{Key: key}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_REMOVE")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbRemoveResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) MultiGet(ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.MultiGetRequest) (*rrdb.MultiGetResponse, error) {
args := &rrdb.RrdbMultiGetArgs{Request: request}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_MULTI_GET")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbMultiGetResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) MultiSet(ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.MultiPutRequest) (*rrdb.UpdateResponse, error) {
args := &rrdb.RrdbMultiPutArgs{Request: request}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_MULTI_PUT")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbMultiPutResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) MultiDelete(ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.MultiRemoveRequest) (*rrdb.MultiRemoveResponse, error) {
args := &rrdb.RrdbMultiRemoveArgs{Request: request}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_MULTI_REMOVE")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbMultiRemoveResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) TTL(ctx context.Context, gpid *base.Gpid, partitionHash uint64, key *base.Blob) (*rrdb.TTLResponse, error) {
args := &rrdb.RrdbTTLArgs{Key: key}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_TTL")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbTTLResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) GetScanner(ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.GetScannerRequest) (*rrdb.ScanResponse, error) {
args := &rrdb.RrdbGetScannerArgs{Request: request}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_GET_SCANNER")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbGetScannerResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) Scan(ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) {
args := &rrdb.RrdbScanArgs{Request: request}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_SCAN")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbScanResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) ClearScanner(ctx context.Context, gpid *base.Gpid, partitionHash uint64, contextId int64) error {
args := &rrdb.RrdbClearScannerArgs{ContextID: contextId}
_, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_CLEAR_SCANNER")
if err != nil {
return err
}
return nil
}
func (rs *ReplicaSession) CheckAndSet(ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.CheckAndSetRequest) (*rrdb.CheckAndSetResponse, error) {
args := &rrdb.RrdbCheckAndSetArgs{Request: request}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_CHECK_AND_SET")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbCheckAndSetResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) SortKeyCount(ctx context.Context, gpid *base.Gpid, partitionHash uint64, hashKey *base.Blob) (*rrdb.CountResponse, error) {
args := &rrdb.RrdbSortkeyCountArgs{HashKey: hashKey}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_SORTKEY_COUNT")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbSortkeyCountResult)
return ret.GetSuccess(), nil
}
func (rs *ReplicaSession) Incr(ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.IncrRequest) (*rrdb.IncrResponse, error) {
args := &rrdb.RrdbIncrArgs{Request: request}
result, err := rs.CallWithGpid(ctx, gpid, partitionHash, args, "RPC_RRDB_RRDB_INCR")
if err != nil {
return nil, err
}
ret, _ := result.(*rrdb.RrdbIncrResult)
return ret.GetSuccess(), nil
}
// ReplicaManager manages the pool of sessions to replica servers, so that
// different tables that locate on the same replica server can share one
// ReplicaSession, without the effort of creating a new connection.
type ReplicaManager struct {
// rpc address -> replica
replicas map[string]*ReplicaSession
sync.RWMutex
creator NodeSessionCreator
unresponsiveHandler UnresponsiveHandler
}
// UnresponsiveHandler is a callback executed when the session is in unresponsive state.
type UnresponsiveHandler func(NodeSession)
// SetUnresponsiveHandler inits the UnresponsiveHandler.
func (rm *ReplicaManager) SetUnresponsiveHandler(handler UnresponsiveHandler) {
rm.unresponsiveHandler = handler
}
// Create a new session to the replica server if no existing one.
func (rm *ReplicaManager) GetReplica(addr string) *ReplicaSession {
rm.Lock()
defer rm.Unlock()
if _, ok := rm.replicas[addr]; !ok {
r := &ReplicaSession{
NodeSession: rm.creator(addr, NodeTypeReplica),
}
withUnresponsiveHandler(r.NodeSession, rm.unresponsiveHandler)
rm.replicas[addr] = r
}
return rm.replicas[addr]
}
func NewReplicaManager(creator NodeSessionCreator) *ReplicaManager {
return &ReplicaManager{
replicas: make(map[string]*ReplicaSession),
creator: creator,
}
}
func (rm *ReplicaManager) Close() error {
rm.Lock()
defer rm.Unlock()
funcs := make([]func() error, 0, len(rm.replicas))
for _, r := range rm.replicas {
rep := r
funcs = append(funcs, func() error {
return rep.Close()
})
}
return kerrors.AggregateGoroutines(funcs...)
}
func (rm *ReplicaManager) ReplicaCount() int {
rm.RLock()
defer rm.RUnlock()
return len(rm.replicas)
}