| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package pegasus |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "math" |
| "sync" |
| "time" |
| |
| "github.com/apache/incubator-pegasus/go-client/idl/base" |
| "github.com/apache/incubator-pegasus/go-client/idl/replication" |
| "github.com/apache/incubator-pegasus/go-client/idl/rrdb" |
| "github.com/apache/incubator-pegasus/go-client/pegalog" |
| "github.com/apache/incubator-pegasus/go-client/pegasus/op" |
| "github.com/apache/incubator-pegasus/go-client/session" |
| "gopkg.in/tomb.v2" |
| kerrors "k8s.io/apimachinery/pkg/util/errors" |
| ) |
| |
// KeyValue is the type of the key-value pairs returned by MultiGet and MultiGetRange.
| type KeyValue struct { |
| SortKey, Value []byte |
| } |
| |
| // CompositeKey is a composition of HashKey and SortKey. |
| type CompositeKey struct { |
| HashKey, SortKey []byte |
| } |
| |
// MultiGetOptions is the options for MultiGet and MultiGetRange. See DefaultMultiGetOptions for the default values.
| type MultiGetOptions struct { |
| StartInclusive bool |
| StopInclusive bool |
| SortKeyFilter Filter |
| |
	// MaxFetchCount and MaxFetchSize limit the size of the returned result.
| |
| // Max count of k-v pairs to be fetched. MaxFetchCount <= 0 means no limit. |
| MaxFetchCount int |
| |
| // Max size of k-v pairs to be fetched. MaxFetchSize <= 0 means no limit. |
| MaxFetchSize int |
| |
	// Whether to query in reverse order (by descending sort key).
| Reverse bool |
| |
	// Whether to retrieve only the sort keys, without values.
	// Enabling this option reduces network load and improves RPC latency.
| NoValue bool |
| } |
| |
| // DefaultMultiGetOptions defines the defaults of MultiGetOptions. |
| var DefaultMultiGetOptions = &MultiGetOptions{ |
| StartInclusive: true, |
| StopInclusive: false, |
| SortKeyFilter: Filter{ |
| Type: FilterTypeNoFilter, |
| Pattern: nil, |
| }, |
| MaxFetchCount: 100, |
| MaxFetchSize: 100000, |
| NoValue: false, |
| } |
| |
// TableConnector is used to communicate with a single Pegasus table.
| type TableConnector interface { |
| // Get retrieves the entry for `hashKey` + `sortKey`. |
| // Returns nil if no entry matches. |
| // `hashKey` : CAN'T be nil or empty. |
| // `sortKey` : CAN'T be nil but CAN be empty. |
| Get(ctx context.Context, hashKey []byte, sortKey []byte) ([]byte, error) |
| |
	// Set sets the entry for `hashKey` + `sortKey` to `value`.
	// SetTTL additionally attaches a time-to-live to the entry; calling Set, or SetTTL
	// with `ttl` == 0, stores the entry without expiration.
| // `hashKey` : CAN'T be nil or empty. |
| // `sortKey` / `value` : CAN'T be nil but CAN be empty. |
| Set(ctx context.Context, hashKey []byte, sortKey []byte, value []byte) error |
| SetTTL(ctx context.Context, hashKey []byte, sortKey []byte, value []byte, ttl time.Duration) error |
| |
| // Delete the entry for `hashKey` + `sortKey`. |
| // `hashKey` : CAN'T be nil or empty. |
| // `sortKey` : CAN'T be nil but CAN be empty. |
| Del(ctx context.Context, hashKey []byte, sortKey []byte) error |
| |
	// MultiGet/MultiGetOpt retrieve multiple entries for `hashKey` + `sortKeys[i]` atomically in one operation.
| // MultiGet is identical to MultiGetOpt except that the former uses DefaultMultiGetOptions as `options`. |
| // |
	// If `sortKeys` is empty or nil, all entries under `hashKey` will be retrieved.
| // `hashKey` : CAN'T be nil or empty. |
| // `sortKeys[i]` : CAN'T be nil but CAN be empty. |
| // |
| // The returned key-value pairs are sorted by sort key in ascending order. |
| // Returns nil if no entries match. |
| // Returns true if all data is fetched, false if only partial data is fetched. |
| // |
| MultiGet(ctx context.Context, hashKey []byte, sortKeys [][]byte) ([]*KeyValue, bool, error) |
| MultiGetOpt(ctx context.Context, hashKey []byte, sortKeys [][]byte, options *MultiGetOptions) ([]*KeyValue, bool, error) |
| |
	// MultiGetRange retrieves multiple entries under `hashKey` within the range (`startSortKey`, `stopSortKey`),
| // atomically in one operation. |
| // |
	// startSortKey: nil or len(startSortKey) == 0 means start from the beginning.
	// stopSortKey: nil or len(stopSortKey) == 0 means stop at the end.
| // `hashKey` : CAN'T be nil. |
| // |
| // The returned key-value pairs are sorted by sort keys in ascending order. |
| // Returns nil if no entries match. |
| // Returns true if all data is fetched, false if only partial data is fetched. |
| // |
| MultiGetRange(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte) ([]*KeyValue, bool, error) |
| MultiGetRangeOpt(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte, options *MultiGetOptions) ([]*KeyValue, bool, error) |
| |
	// MultiSet sets multiple entries for `hashKey` + `sortKeys[i]` atomically in one operation.
| // `hashKey` / `sortKeys` / `values` : CAN'T be nil or empty. |
| // `sortKeys[i]` / `values[i]` : CAN'T be nil but CAN be empty. |
| MultiSet(ctx context.Context, hashKey []byte, sortKeys [][]byte, values [][]byte) error |
| MultiSetOpt(ctx context.Context, hashKey []byte, sortKeys [][]byte, values [][]byte, ttl time.Duration) error |
| |
	// MultiDel deletes multiple entries under `hashKey` atomically in one operation.
| // `hashKey` / `sortKeys` : CAN'T be nil or empty. |
| // `sortKeys[i]` : CAN'T be nil but CAN be empty. |
| MultiDel(ctx context.Context, hashKey []byte, sortKeys [][]byte) error |
| |
	// TTL returns the TTL (time-to-live) in seconds: -1 if no TTL is set; -2 if the entry doesn't exist.
| // `hashKey` : CAN'T be nil or empty. |
| // `sortKey` : CAN'T be nil but CAN be empty. |
| TTL(ctx context.Context, hashKey []byte, sortKey []byte) (int, error) |
| |
	// Exist checks whether the entry for `hashKey` + `sortKey` exists.
| // `hashKey`: CAN'T be nil or empty. |
| Exist(ctx context.Context, hashKey []byte, sortKey []byte) (bool, error) |
| |
	// GetScanner returns a Scanner over the entries between `startSortKey` and `stopSortKey`
	// within `hashKey`; whether the bounds are inclusive is controlled by `options`.
	// startSortKey: nil or len(startSortKey) == 0 means start from the beginning.
	// stopSortKey: nil or len(stopSortKey) == 0 means stop at the end.
	// `hashKey`: CAN'T be nil or empty.
| GetScanner(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte, options *ScannerOptions) (Scanner, error) |
| |
	// GetUnorderedScanners returns Scanners that together cover all data of the table;
	// the number of scanners will be no more than `maxSplitCount`.
| GetUnorderedScanners(ctx context.Context, maxSplitCount int, options *ScannerOptions) ([]Scanner, error) |
| |
	// CheckAndSet atomically checks the entry for `hashKey` + `checkSortKey` against
	// `checkType` and `checkOperand`, and sets `hashKey` + `setSortKey` to `setValue`
	// if and only if the check passes.
	// The sort key for checking and the sort key for setting can be the same or different.
	//
	// `checkSortKey`: The sort key for checking.
	// `setSortKey`: The sort key for setting.
	// `checkOperand`: The operand of the check; its meaning depends on `checkType`.
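	//
	// A hedged usage sketch (`tb`, `ctx`, the keys and `checkType` are illustrative;
	// `checkType` stands for whichever CheckType constant matches the wanted condition):
	//
	//	res, err := tb.CheckAndSet(ctx, []byte("hash"), []byte("k1"), checkType, nil,
	//		[]byte("k2"), []byte("v2"), &CheckAndSetOptions{ReturnCheckValue: true})
	//	if err == nil && res.SetSucceed {
	//		// the check passed and "hash" + "k2" now holds "v2"
	//	}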
| CheckAndSet(ctx context.Context, hashKey []byte, checkSortKey []byte, checkType CheckType, |
| checkOperand []byte, setSortKey []byte, setValue []byte, options *CheckAndSetOptions) (*CheckAndSetResult, error) |
| |
	// SortKeyCount returns the count of sort keys under `hashKey`.
| // `hashKey`: CAN'T be nil or empty. |
| SortKeyCount(ctx context.Context, hashKey []byte) (int64, error) |
| |
	// Incr atomically increments the value of `hashKey` + `sortKey` by `increment` and
	// returns the new value.
	// `hashKey` / `sortKey` : CAN'T be nil or empty
| Incr(ctx context.Context, hashKey []byte, sortKey []byte, increment int64) (int64, error) |
| |
	// BatchGet gets values from a batch of CompositeKeys. Internally it distributes
	// each key into a Get call and waits until all of them return.
	//
	// `keys`: CAN'T be nil or empty, and the hash key of each element can't be nil or empty either.
	// The returned values follow the order of the input keys, i.e. `keys[i] => values[i]`.
	// If keys[i] is not found, or its Get fails, values[i] is set to nil.
	//
	// Returns a non-nil `err` if any of the Get calls fails; this does not mean that all calls failed.
| // |
| // NOTE: this operation is not guaranteed to be atomic |
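	//
	// A minimal sketch (`tb` and `ctx` are illustrative):
	//
	//	values, err := tb.BatchGet(ctx, []CompositeKey{
	//		{HashKey: []byte("h1"), SortKey: []byte("s1")},
	//		{HashKey: []byte("h2"), SortKey: []byte("s2")},
	//	})
	//	// values[0] belongs to ("h1","s1") and values[1] to ("h2","s2");
	//	// a nil entry means the key was missing or its Get call failed.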
| BatchGet(ctx context.Context, keys []CompositeKey) (values [][]byte, err error) |
| |
	// Deprecated: dangerous operation. The table is managed by the client and should be
	// closed only once, when the client is exiting. This operation kills all table-related loops.
| Close() error |
| } |
| |
| type pegasusTableConnector struct { |
| meta *session.MetaManager |
| replica *session.ReplicaManager |
| |
| logger pegalog.Logger |
| |
| tableName string |
| appID int32 |
| parts []*replicaNode |
| mu sync.RWMutex |
| |
| confUpdateCh chan bool |
| tom tomb.Tomb |
| } |
| |
| type replicaNode struct { |
| session *session.ReplicaSession |
| pconf *replication.PartitionConfiguration |
| } |
| |
// ConnectTable queries the configuration of the given table and sets up connections to
// the replicas on which the table is located.
| func ConnectTable(ctx context.Context, tableName string, meta *session.MetaManager, replica *session.ReplicaManager) (TableConnector, error) { |
| p := &pegasusTableConnector{ |
| tableName: tableName, |
| meta: meta, |
| replica: replica, |
| confUpdateCh: make(chan bool, 1), |
| logger: pegalog.GetLogger(), |
| } |
| |
	// If a session becomes unresponsive, the TableConnector automatically triggers
	// an update of the routing table.
| p.replica.SetUnresponsiveHandler(func(n session.NodeSession) { |
| p.tryConfUpdate(errors.New("session unresponsive for long"), n) |
| }) |
| |
| if err := p.updateConf(ctx); err != nil { |
| return nil, err |
| } |
| |
| p.tom.Go(p.loopForAutoUpdate) |
| return p, nil |
| } |
| |
// updateConf updates the configuration of this table.
| func (p *pegasusTableConnector) updateConf(ctx context.Context) error { |
| resp, err := p.meta.QueryConfig(ctx, p.tableName) |
| if err == nil { |
| err = p.handleQueryConfigResp(resp) |
| } |
| if err != nil { |
| return fmt.Errorf("failed to connect table(%s): %s", p.tableName, err) |
| } |
| return nil |
| } |
| |
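// isPartitionValid reports whether the partition count returned by the meta server is
// acceptable given the currently known count: either no routing table is loaded yet
// (oldCount == 0), the count is unchanged, or it has doubled/halved, which can
// legitimately happen across a partition split. For example 4 -> 8 is accepted while
// 4 -> 6 is rejected.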
| func isPartitionValid(oldCount int, respCount int) bool { |
| return oldCount == 0 || oldCount == respCount || oldCount*2 == respCount || oldCount == respCount*2 |
| } |
| |
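// handleQueryConfigResp validates the query-config response from the meta server and
// rebuilds the routing table: p.parts keeps one replicaNode per partition, holding the
// session of that partition's primary replica. A partition with a negative ballot is a
// child partition that is not ready during a partition split; it gets a nil session so
// that requests fall back to its parent partition.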
| func (p *pegasusTableConnector) handleQueryConfigResp(resp *replication.QueryCfgResponse) error { |
| if resp.Err.Errno != base.ERR_OK.String() { |
| return errors.New(resp.Err.Errno) |
| } |
| if resp.PartitionCount == 0 || len(resp.Partitions) != int(resp.PartitionCount) || !isPartitionValid(len(p.parts), int(resp.PartitionCount)) { |
| return fmt.Errorf("invalid table configuration: response [%v]", resp) |
| } |
| |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| |
| p.appID = resp.AppID |
| |
| if len(resp.Partitions) != len(p.parts) { |
| p.logger.Printf("table[%s] partition count update from %d to %d", p.tableName, len(p.parts), len(resp.Partitions)) |
| p.parts = make([]*replicaNode, len(resp.Partitions)) |
| } |
| |
| // TODO(wutao1): make sure PartitionIndex are continuous |
| for _, pconf := range resp.Partitions { |
| if pconf == nil || (pconf.Ballot >= 0 && (pconf.Primary == nil || pconf.Primary.GetRawAddress() == 0)) { |
| return fmt.Errorf("unable to resolve routing table [appid: %d]: [%v]", p.appID, pconf) |
| } |
| |
| var s *session.ReplicaSession |
| if pconf.Ballot >= 0 { |
| s = p.replica.GetReplica(pconf.Primary.GetAddress()) |
| } else { |
			// The table is undergoing partition split and this child partition is not ready yet,
			// so requests to it should be redirected to its parent partition.
			// This can happen when the meta server is queried during a partition split.
| s = nil |
| } |
| |
| r := &replicaNode{ |
| pconf: pconf, |
| session: s, |
| } |
| p.parts[pconf.Pid.PartitionIndex] = r |
| } |
| return nil |
| } |
| |
| func validateHashKey(hashKey []byte) error { |
| if hashKey == nil { |
| return fmt.Errorf("InvalidParameter: hashkey must not be nil") |
| } |
| if len(hashKey) == 0 { |
| return fmt.Errorf("InvalidParameter: hashkey must not be empty") |
| } |
| if len(hashKey) > math.MaxUint16 { |
| return fmt.Errorf("InvalidParameter: length of hashkey (%d) must be less than %d", len(hashKey), math.MaxUint16) |
| } |
| return nil |
| } |
| |
| func validateCompositeKeys(keys []CompositeKey) error { |
| if keys == nil { |
| return fmt.Errorf("InvalidParameter: CompositeKeys must not be nil") |
| } |
| if len(keys) == 0 { |
| return fmt.Errorf("InvalidParameter: CompositeKeys must not be empty") |
| } |
| return nil |
| } |
| |
// WrapError wraps internal errors to ensure that all errors returned by the public
// interfaces are of type *PError.
| func WrapError(err error, op OpType) error { |
| if err != nil { |
| if pe, ok := err.(*PError); ok { |
| pe.Op = op |
| return pe |
| } |
| return &PError{ |
| Err: err, |
| Op: op, |
| } |
| } |
| return nil |
| } |
| |
| func (p *pegasusTableConnector) wrapPartitionError(err error, gpid *base.Gpid, replica *session.ReplicaSession, opType OpType) error { |
| err = WrapError(err, opType) |
| if err == nil { |
| return nil |
| } |
| perr := err.(*PError) |
| if perr.Err != nil { |
| perr.Err = fmt.Errorf("%s [%s, %s, table=%s]", perr.Err, gpid, replica, p.tableName) |
| } else { |
| perr.Err = fmt.Errorf("[%s, %s, table=%s]", gpid, replica, p.tableName) |
| } |
| return perr |
| } |
| |
| func (p *pegasusTableConnector) Get(ctx context.Context, hashKey []byte, sortKey []byte) ([]byte, error) { |
| res, err := p.runPartitionOp(ctx, hashKey, &op.Get{HashKey: hashKey, SortKey: sortKey}, OpGet) |
| if err != nil { |
| return nil, err |
| } |
| if res == nil { // indicates the record is not found |
| return nil, nil |
| } |
| return res.([]byte), err |
| } |
| |
| func (p *pegasusTableConnector) SetTTL(ctx context.Context, hashKey []byte, sortKey []byte, value []byte, ttl time.Duration) error { |
| req := &op.Set{HashKey: hashKey, SortKey: sortKey, Value: value, TTL: ttl} |
| _, err := p.runPartitionOp(ctx, hashKey, req, OpSet) |
| return err |
| } |
| |
| func (p *pegasusTableConnector) Set(ctx context.Context, hashKey []byte, sortKey []byte, value []byte) error { |
| return p.SetTTL(ctx, hashKey, sortKey, value, 0) |
| } |
| |
| func (p *pegasusTableConnector) Del(ctx context.Context, hashKey []byte, sortKey []byte) error { |
| req := &op.Del{HashKey: hashKey, SortKey: sortKey} |
| _, err := p.runPartitionOp(ctx, hashKey, req, OpDel) |
| return err |
| } |
| |
| func setRequestByOption(options *MultiGetOptions, request *rrdb.MultiGetRequest) { |
| request.MaxKvCount = int32(options.MaxFetchCount) |
| request.MaxKvSize = int32(options.MaxFetchSize) |
| request.StartInclusive = options.StartInclusive |
| request.StopInclusive = options.StopInclusive |
| request.SortKeyFilterType = rrdb.FilterType(options.SortKeyFilter.Type) |
| request.SortKeyFilterPattern = &base.Blob{Data: options.SortKeyFilter.Pattern} |
| request.Reverse = options.Reverse |
| request.NoValue = options.NoValue |
| } |
| |
| func (p *pegasusTableConnector) MultiGetOpt(ctx context.Context, hashKey []byte, sortKeys [][]byte, options *MultiGetOptions) ([]*KeyValue, bool, error) { |
| req := &op.MultiGet{HashKey: hashKey, SortKeys: sortKeys, Req: rrdb.NewMultiGetRequest()} |
| setRequestByOption(options, req.Req) |
| res, err := p.runPartitionOp(ctx, hashKey, req, OpMultiGet) |
| if err != nil { |
| return nil, false, err |
| } |
| return extractMultiGetResult(res.(*op.MultiGetResult)) |
| } |
| |
| func (p *pegasusTableConnector) MultiGet(ctx context.Context, hashKey []byte, sortKeys [][]byte) ([]*KeyValue, bool, error) { |
| return p.MultiGetOpt(ctx, hashKey, sortKeys, DefaultMultiGetOptions) |
| } |
| |
| func (p *pegasusTableConnector) MultiGetRangeOpt(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte, options *MultiGetOptions) ([]*KeyValue, bool, error) { |
| req := &op.MultiGet{HashKey: hashKey, StartSortkey: startSortKey, StopSortkey: stopSortKey, Req: rrdb.NewMultiGetRequest()} |
| setRequestByOption(options, req.Req) |
| res, err := p.runPartitionOp(ctx, hashKey, req, OpMultiGetRange) |
| if err != nil { |
| return nil, false, err |
| } |
| return extractMultiGetResult(res.(*op.MultiGetResult)) |
| } |
| |
| func extractMultiGetResult(res *op.MultiGetResult) ([]*KeyValue, bool, error) { |
| if len(res.KVs) == 0 { |
| return nil, res.AllFetched, nil |
| } |
| kvs := make([]*KeyValue, len(res.KVs)) |
| for i, blobKv := range res.KVs { |
| kvs[i] = &KeyValue{ |
| SortKey: blobKv.Key.Data, |
| Value: blobKv.Value.Data, |
| } |
| } |
| return kvs, res.AllFetched, nil |
| } |
| |
| func (p *pegasusTableConnector) MultiGetRange(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte) ([]*KeyValue, bool, error) { |
| return p.MultiGetRangeOpt(ctx, hashKey, startSortKey, stopSortKey, DefaultMultiGetOptions) |
| } |
| |
| func (p *pegasusTableConnector) MultiSet(ctx context.Context, hashKey []byte, sortKeys [][]byte, values [][]byte) error { |
| return p.MultiSetOpt(ctx, hashKey, sortKeys, values, 0) |
| } |
| |
| func (p *pegasusTableConnector) MultiSetOpt(ctx context.Context, hashKey []byte, sortKeys [][]byte, values [][]byte, ttl time.Duration) error { |
| req := &op.MultiSet{HashKey: hashKey, SortKeys: sortKeys, Values: values, TTL: ttl} |
| _, err := p.runPartitionOp(ctx, hashKey, req, OpMultiSet) |
| return err |
| } |
| |
| func (p *pegasusTableConnector) MultiDel(ctx context.Context, hashKey []byte, sortKeys [][]byte) error { |
| _, err := p.runPartitionOp(ctx, hashKey, &op.MultiDel{HashKey: hashKey, SortKeys: sortKeys}, OpMultiDel) |
| return err |
| } |
| |
// TTL returns the TTL in seconds: -1 means no TTL is set; -2 means the entry is not found.
| func (p *pegasusTableConnector) TTL(ctx context.Context, hashKey []byte, sortKey []byte) (int, error) { |
| res, err := p.runPartitionOp(ctx, hashKey, &op.TTL{HashKey: hashKey, SortKey: sortKey}, OpTTL) |
	if err != nil {
		return 0, err
	}
	return res.(int), nil
| } |
| |
| func (p *pegasusTableConnector) Exist(ctx context.Context, hashKey []byte, sortKey []byte) (bool, error) { |
| ttl, err := p.TTL(ctx, hashKey, sortKey) |
| if err == nil { |
| if ttl == -2 { |
| return false, nil |
| } |
| return true, nil |
| } |
| return false, WrapError(err, OpExist) |
| } |
| |
| func (p *pegasusTableConnector) GetScanner(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte, |
| options *ScannerOptions) (Scanner, error) { |
| scanner, err := func() (Scanner, error) { |
| if err := validateHashKey(hashKey); err != nil { |
| return nil, err |
| } |
| |
| start := encodeHashKeySortKey(hashKey, startSortKey) |
| var stop *base.Blob |
| if len(stopSortKey) == 0 { |
| stop = encodeHashKeySortKey(hashKey, []byte{0xFF, 0xFF}) // []byte{0xFF, 0xFF} means the max sortKey value |
| options.StopInclusive = false |
| } else { |
| stop = encodeHashKeySortKey(hashKey, stopSortKey) |
| } |
| |
| if options.SortKeyFilter.Type == FilterTypeMatchPrefix { |
| prefixStartBlob := encodeHashKeySortKey(hashKey, options.SortKeyFilter.Pattern) |
| |
| // if the prefixStartKey generated by pattern is greater than the startKey, start from the prefixStartKey |
| if bytes.Compare(prefixStartBlob.Data, start.Data) > 0 { |
| start = prefixStartBlob |
| options.StartInclusive = true |
| } |
| |
| prefixStop := encodeNextBytesByKeys(hashKey, options.SortKeyFilter.Pattern) |
| |
| // if the prefixStopKey generated by pattern is less than the stopKey, end to the prefixStopKey |
| if bytes.Compare(prefixStop.Data, stop.Data) <= 0 { |
| stop = prefixStop |
| options.StopInclusive = false |
| } |
| } |
| |
| cmp := bytes.Compare(start.Data, stop.Data) |
| if cmp < 0 || (cmp == 0 && options.StartInclusive && options.StopInclusive) { |
| gpid, partitionHash, err := p.getGpid(start.Data) |
			if err != nil {
| return nil, err |
| } |
| return newPegasusScanner(p, gpid, partitionHash, options, start, stop), nil |
| } |
| return nil, fmt.Errorf("the scanning interval MUST NOT BE EMPTY") |
| }() |
| return scanner, WrapError(err, OpGetScanner) |
| } |
| |
| func (p *pegasusTableConnector) GetUnorderedScanners(ctx context.Context, maxSplitCount int, |
| options *ScannerOptions) ([]Scanner, error) { |
| scanners, err := func() ([]Scanner, error) { |
| if maxSplitCount <= 0 { |
| return nil, fmt.Errorf("invalid maxSplitCount: %d", maxSplitCount) |
| } |
| allGpid := p.getAllGpid() |
| total := len(allGpid) |
| |
| var split int // the actual split count |
| if total < maxSplitCount { |
| split = total |
| } else { |
| split = maxSplitCount |
| } |
| scanners := make([]Scanner, split) |
| |
		// k: the smallest integer such that k*split >= total, i.e. ceil(total/split)
| k := 1 |
| for ; k*split < total; k++ { |
| } |
| left := total - k*(split-1) |
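		// For example, with total = 8 partitions and maxSplitCount = 3: split = 3,
		// k = 3 (the first k with k*split >= total), and left = 8 - 3*2 = 2, so the
		// first scanner covers 2 partitions and the other two cover 3 partitions each.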
| |
| sliceLen := 0 |
| id := 0 |
| for i := 0; i < split; i++ { |
| if i == 0 { |
| sliceLen = left |
| } else { |
| sliceLen = k |
| } |
| gpidSlice := make([]*base.Gpid, sliceLen) |
| hashSlice := make([]uint64, sliceLen) |
| for j := 0; j < sliceLen; j++ { |
| gpidSlice[j] = allGpid[id] |
| hashSlice[j] = uint64(id) |
| id++ |
| } |
| scanners[i] = newPegasusScannerForUnorderedScanners(p, gpidSlice, hashSlice, options) |
| } |
| return scanners, nil |
| }() |
| return scanners, WrapError(err, OpGetUnorderedScanners) |
| } |
| |
| func (p *pegasusTableConnector) CheckAndSet(ctx context.Context, hashKey []byte, checkSortKey []byte, checkType CheckType, |
| checkOperand []byte, setSortKey []byte, setValue []byte, options *CheckAndSetOptions) (*CheckAndSetResult, error) { |
| |
| if options == nil { |
| options = &CheckAndSetOptions{} |
| } |
| request := rrdb.NewCheckAndSetRequest() |
| request.CheckType = rrdb.CasCheckType(checkType) |
| request.CheckOperand = &base.Blob{Data: checkOperand} |
| request.CheckSortKey = &base.Blob{Data: checkSortKey} |
| request.HashKey = &base.Blob{Data: hashKey} |
| request.SetExpireTsSeconds = int32(options.SetValueTTLSeconds) |
| request.SetSortKey = &base.Blob{Data: setSortKey} |
| request.SetValue = &base.Blob{Data: setValue} |
| request.ReturnCheckValue = options.ReturnCheckValue |
| if !bytes.Equal(checkSortKey, setSortKey) { |
| request.SetDiffSortKey = true |
| } else { |
| request.SetDiffSortKey = false |
| } |
| |
| req := &op.CheckAndSet{Req: request} |
| res, err := p.runPartitionOp(ctx, hashKey, req, OpCheckAndSet) |
| if err != nil { |
| return nil, err |
| } |
| casRes := res.(*op.CheckAndSetResult) |
| return &CheckAndSetResult{ |
| SetSucceed: casRes.SetSucceed, |
| CheckValue: casRes.CheckValue, |
| CheckValueExist: casRes.CheckValueExist, |
| CheckValueReturned: casRes.CheckValueReturned, |
| }, nil |
| } |
| |
| func (p *pegasusTableConnector) SortKeyCount(ctx context.Context, hashKey []byte) (int64, error) { |
| res, err := p.runPartitionOp(ctx, hashKey, &op.SortKeyCount{HashKey: hashKey}, OpSortKeyCount) |
| if err != nil { |
| return 0, err |
| } |
| return res.(int64), nil |
| } |
| |
| func (p *pegasusTableConnector) Incr(ctx context.Context, hashKey []byte, sortKey []byte, increment int64) (int64, error) { |
| req := &op.Incr{HashKey: hashKey, SortKey: sortKey, Increment: increment} |
| res, err := p.runPartitionOp(ctx, hashKey, req, OpIncr) |
| if err != nil { |
| return 0, err |
| } |
| return res.(int64), nil |
| } |
| |
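// runPartitionOp validates the request, locates the partition owning `hashKey` via the
// CRC64 hash of the hash key, and runs the operation against that partition's primary
// replica, retrying with fail-over and triggering a routing-table update when the error
// suggests the local configuration is stale.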
| func (p *pegasusTableConnector) runPartitionOp(ctx context.Context, hashKey []byte, req op.Request, optype OpType) (interface{}, error) { |
| // validate arguments |
| if err := req.Validate(); err != nil { |
| return 0, WrapError(err, optype) |
| } |
| partitionHash := crc64Hash(hashKey) |
| gpid, part := p.getPartition(partitionHash) |
| res, err := retryFailOver(ctx, func() (confUpdated bool, result interface{}, retry bool, err error) { |
| result, err = req.Run(ctx, gpid, partitionHash, part) |
| confUpdated, retry, err = p.handleReplicaError(err, part) |
| return |
| }) |
| return res, p.wrapPartitionError(err, gpid, part, optype) |
| } |
| |
| func (p *pegasusTableConnector) BatchGet(ctx context.Context, keys []CompositeKey) (values [][]byte, err error) { |
| v, err := func() ([][]byte, error) { |
| if err := validateCompositeKeys(keys); err != nil { |
| return nil, err |
| } |
| |
| values = make([][]byte, len(keys)) |
| funcs := make([]func() error, 0, len(keys)) |
| for i := 0; i < len(keys); i++ { |
| idx := i |
| funcs = append(funcs, func() (subErr error) { |
| key := keys[idx] |
| values[idx], subErr = p.Get(ctx, key.HashKey, key.SortKey) |
| if subErr != nil { |
| values[idx] = nil |
| return subErr |
| } |
| return nil |
| }) |
| } |
| return values, kerrors.AggregateGoroutines(funcs...) |
| }() |
| return v, WrapError(err, OpBatchGet) |
| } |
| |
| func (p *pegasusTableConnector) isPartitionValid(index int) bool { |
| if index < 0 || index >= len(p.parts) { |
| return false |
| } |
| return p.parts[index].pconf.Ballot > 0 |
| } |
| |
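// getPartitionIndex maps a partition hash to a partition index. If that partition is not
// valid (typically a child partition that is not yet ready during a partition split), the
// request falls back to the parent partition at index-partitionCount/2. For example, with
// 8 partitions freshly split from 4, an unready partition 6 is served by partition 2.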
| func (p *pegasusTableConnector) getPartitionIndex(partitionHash uint64) int32 { |
| partitionCount := int32(len(p.parts)) |
| index := int32(partitionHash % uint64(partitionCount)) |
| if !p.isPartitionValid(int(index)) { |
		p.logger.Printf("table [%s] partition[%d] is not valid now, requests will be sent to partition[%d]", p.tableName, index, index-partitionCount/2)
| index -= partitionCount / 2 |
| } |
| return index |
| } |
| |
| func (p *pegasusTableConnector) getPartition(partitionHash uint64) (*base.Gpid, *session.ReplicaSession) { |
| p.mu.RLock() |
| defer p.mu.RUnlock() |
| |
| gpid := &base.Gpid{ |
| Appid: p.appID, |
| PartitionIndex: p.getPartitionIndex(partitionHash), |
| } |
| part := p.parts[gpid.PartitionIndex].session |
| |
| return gpid, part |
| } |
| |
| func (p *pegasusTableConnector) getPartitionByGpid(gpid *base.Gpid) *session.ReplicaSession { |
| p.mu.RLock() |
| defer p.mu.RUnlock() |
| return p.parts[gpid.PartitionIndex].session |
| } |
| |
| func (p *pegasusTableConnector) Close() error { |
| p.tom.Kill(errors.New("table closed")) |
| return nil |
| } |
| |
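// handleReplicaError classifies an error returned by a replica RPC: it reports whether
// the routing table should be refreshed (confUpdate) and whether the operation is worth
// retrying (retry), and enriches well-known error codes with a human-readable hint.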
| func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.ReplicaSession) (bool, bool, error) { |
| if err != nil { |
| confUpdate := false |
| retry := false |
| |
| switch err { |
| case base.ERR_OK: |
| // should not happen |
| return false, false, nil |
| |
| case base.ERR_TIMEOUT: |
| case context.DeadlineExceeded: |
| confUpdate = true |
| case context.Canceled: |
			// client-side cancellation does not trigger a configuration update
| |
| case base.ERR_NOT_ENOUGH_MEMBER: |
| case base.ERR_CAPACITY_EXCEEDED: |
| |
| case base.ERR_BUSY: |
| // throttled by server, skip confUpdate |
| case base.ERR_SPLITTING: |
| // table is executing partition split, skip confUpdate |
| case base.ERR_DISK_INSUFFICIENT: |
| // server disk space is insufficient, skip confUpdate |
| |
| default: |
| confUpdate = true |
| retry = true |
| } |
| |
| switch err { |
| case base.ERR_BUSY: |
| err = errors.New(err.Error() + " Rate of requests exceeds the throughput limit") |
| case base.ERR_INVALID_STATE: |
| err = errors.New(err.Error() + " The target replica is not primary") |
| retry = false |
| case base.ERR_OBJECT_NOT_FOUND: |
| err = errors.New(err.Error() + " The replica server doesn't serve this partition") |
| case base.ERR_SPLITTING: |
| err = errors.New(err.Error() + " The table is executing partition split") |
| case base.ERR_PARENT_PARTITION_MISUSED: |
			err = errors.New(err.Error() + " The table has finished partition split, will update config")
| retry = false |
| case base.ERR_DISK_INSUFFICIENT: |
| err = errors.New(err.Error() + " The server disk space is insufficient") |
| } |
| |
| if confUpdate { |
| // we need to check if there's newer configuration. |
| p.tryConfUpdate(err, replica) |
| } |
| |
| return confUpdate, retry, err |
| } |
| return false, false, nil |
| } |
| |
| // tryConfUpdate makes an attempt to update table configuration by querying meta server. |
| func (p *pegasusTableConnector) tryConfUpdate(err error, replica session.NodeSession) { |
| select { |
| case p.confUpdateCh <- true: |
| p.logger.Printf("trigger configuration update of table [%s] due to RPC failure [%s] to %s", p.tableName, err, replica) |
| default: |
| } |
| } |
| |
| func (p *pegasusTableConnector) loopForAutoUpdate() error { |
| for { |
| select { |
| case <-p.confUpdateCh: |
| p.selfUpdate() |
| case <-p.tom.Dying(): |
| return nil |
| } |
| |
| // sleep a while |
| select { |
| case <-time.After(time.Second): |
| case <-p.tom.Dying(): |
| return nil |
| } |
| } |
| } |
| |
| func (p *pegasusTableConnector) selfUpdate() bool { |
| // ignore the returned error |
| ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) |
| defer cancel() |
| |
| if err := p.updateConf(ctx); err != nil { |
| p.logger.Printf("self update failed [table: %s]: %s", p.tableName, err.Error()) |
| } |
| |
| // flush confUpdateCh |
| select { |
| case <-p.confUpdateCh: |
| default: |
| } |
| |
| return true |
| } |
| |
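// getGpid derives the partition id and the partition hash from a Pegasus-encoded key:
// the first two bytes are the big-endian length of the hash key, followed by the hash
// key and then the sort key. The partition hash is computed over the hash key only
// (or over the rest of the key when the length prefix is 0). For example, for
// []byte{0x00, 0x02, 'h', 'k', 's'} the hash is computed over "hk".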
| func (p *pegasusTableConnector) getGpid(key []byte) (*base.Gpid, uint64, error) { |
| if key == nil || len(key) < 2 { |
| return nil, 0, fmt.Errorf("unable to getGpid by key: %s", key) |
| } |
| |
| hashKeyLen := 0xFFFF & binary.BigEndian.Uint16(key[:2]) |
| if hashKeyLen != 0xFFFF && int(2+hashKeyLen) <= len(key) { |
| gpid := &base.Gpid{Appid: p.appID} |
| var partitionHash uint64 |
| if hashKeyLen == 0 { |
| partitionHash = crc64Hash(key[2:]) |
| } else { |
| partitionHash = crc64Hash(key[2 : hashKeyLen+2]) |
| } |
| gpid.PartitionIndex = int32(partitionHash % uint64(len(p.parts))) |
| return gpid, partitionHash, nil |
| |
| } |
| return nil, 0, fmt.Errorf("unable to getGpid, hashKey length invalid") |
| } |
| |
| func (p *pegasusTableConnector) getAllGpid() []*base.Gpid { |
| p.mu.RLock() |
| defer p.mu.RUnlock() |
| count := len(p.parts) |
| ret := make([]*base.Gpid, count) |
| for i := 0; i < count; i++ { |
| ret[i] = &base.Gpid{Appid: p.appID, PartitionIndex: int32(i)} |
| } |
| return ret |
| } |