blob: f2c1918aea6079c0ecc22347651aa50f85ff68ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package pegasus
import (
"context"
"encoding/binary"
"fmt"
"sync/atomic"
"time"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/rrdb"
"github.com/apache/incubator-pegasus/go-client/pegalog"
)
// ScannerOptions is the options for GetScanner and GetUnorderedScanners.
type ScannerOptions struct {
	BatchSize      int    // internal buffer batch size
	StartInclusive bool   // if the startSortKey is included
	StopInclusive  bool   // if the stopSortKey is included
	HashKeyFilter  Filter // filter sent to the server and applied to hash keys
	SortKeyFilter  Filter // filter sent to the server and applied to sort keys
	NoValue        bool   // only fetch hash_key and sort_key, but not fetch value
}
// Client-side states for pegasusScanner.batchStatus. Non-negative values are
// server-side scan context ids (see onRecvScanResponse); the negative values
// below encode local states.
const (
	batchScanning     = 0  // a scan is in progress (any value >= 0 means a live context)
	batchScanFinished = -1 // Scanner's batch is finished, clean up it and switch to the status batchEmpty
	batchEmpty        = -2 // scan context has been removed
	batchRpcError     = -3 // rpc error, include ERR_SESSION_RESET,ERR_OBJECT_NOT_FOUND,ERR_INVALID_STATE, ERR_TIMEOUT
	batchUnknownError = -4 // rpc succeed, but operation encounter some unknown error in server side
)
// Scanner defines the interface of client-side scanning.
type Scanner interface {
	// Next grabs the next entry. completed is true once the whole scan has
	// finished; otherwise hashKey/sortKey/value hold the fetched entry.
	Next(ctx context.Context) (completed bool, hashKey []byte, sortKey []byte, value []byte, err error)

	// Close releases any remaining server-side scan context and marks the
	// scanner closed; subsequent Next calls return an error.
	Close() error
}
// pegasusScanner is the concrete Scanner. It walks the partitions in
// gpidSlice from the last index down to 0 and, within each partition,
// buffers entries batch by batch.
type pegasusScanner struct {
	table     *pegasusTableConnector
	startKey  *base.Blob
	stopKey   *base.Blob
	options   *ScannerOptions
	gpidSlice []*base.Gpid // partitions to scan
	hashSlice []uint64     // partition hashes, parallel to gpidSlice
	gpidIndex int          // index of gpidSlice[] and hashSlice[]
	curGpid   *base.Gpid   // current gpid
	curHash   uint64       // current partitionHash
	checkHash bool         // ask the server to validate the partition hash

	batchEntries []*KeyValue // buffered entries of the current batch
	batchIndex   int         // cursor into batchEntries; -1 means "before first"
	// batchStatus holds either a non-negative server-side scan context id or
	// one of the negative batch* sentinel states declared above.
	batchStatus int64

	isNextRunning atomic.Value // 0/1 flag rejecting concurrent Next calls
	closed        bool         // set by Close
	logger        pegalog.Logger
}
// NewScanOptions returns the default ScannerOptions: batch size 1000,
// inclusive start key, exclusive stop key, no filters, values fetched.
func NewScanOptions() *ScannerOptions {
	noFilter := Filter{Type: FilterTypeNoFilter, Pattern: nil}
	opts := &ScannerOptions{}
	opts.BatchSize = 1000
	opts.StartInclusive = true
	opts.StopInclusive = false
	opts.HashKeyFilter = noFilter
	opts.SortKeyFilter = noFilter
	opts.NoValue = false
	return opts
}
// newPegasusScannerImpl builds a pegasusScanner over the given partitions,
// scanning the key range [startKey, stopKey] according to options. The
// partitions are consumed from the last index of gpidSlice backwards.
func newPegasusScannerImpl(table *pegasusTableConnector, gpidSlice []*base.Gpid, hashSlice []uint64, options *ScannerOptions,
	startKey *base.Blob, stopKey *base.Blob, checkHash bool) Scanner {
	s := &pegasusScanner{
		table:        table,
		startKey:     startKey,
		stopKey:      stopKey,
		options:      options,
		gpidSlice:    gpidSlice,
		hashSlice:    hashSlice,
		gpidIndex:    len(gpidSlice), // decremented before first use in doNext
		checkHash:    checkHash,
		batchEntries: []*KeyValue{},
		batchIndex:   -1,
		batchStatus:  batchScanFinished, // forces partition switch on first Next
		closed:       false,
		logger:       pegalog.GetLogger(),
	}
	s.isNextRunning.Store(0)
	return s
}
// newPegasusScanner creates a Scanner over a single partition.
func newPegasusScanner(table *pegasusTableConnector, gpid *base.Gpid, partitionHash uint64, options *ScannerOptions,
	startKey *base.Blob, stopKey *base.Blob) Scanner {
	return newPegasusScannerImpl(table, []*base.Gpid{gpid}, []uint64{partitionHash}, options,
		startKey, stopKey, false)
}
// newPegasusScannerForUnorderedScanners creates a Scanner covering the whole
// key space of the given partitions, with server-side partition-hash
// validation enabled. NOTE: it overwrites the caller's options.StartInclusive
// and options.StopInclusive to cover the full range.
func newPegasusScannerForUnorderedScanners(table *pegasusTableConnector, gpidSlice []*base.Gpid, hashSlice []uint64,
	options *ScannerOptions) Scanner {
	options.StartInclusive = true
	options.StopInclusive = false
	fullRangeStart := &base.Blob{Data: []byte{0x00, 0x00}}
	fullRangeStop := &base.Blob{Data: []byte{0xFF, 0xFF}}
	return newPegasusScannerImpl(table, gpidSlice, hashSlice, options, fullRangeStart, fullRangeStop, true)
}
// Next grabs the next entry of the scan. completed is true once all
// partitions have been exhausted. A batchUnknownError state is sticky and
// requires manual resolution; a batchRpcError state is reported once and then
// reset so the next call can retry.
func (p *pegasusScanner) Next(ctx context.Context) (completed bool, hashKey []byte,
	sortKey []byte, value []byte, err error) {
	if p.batchStatus == batchUnknownError {
		// Fixed typos in the message ("unknow", "resloving").
		err = fmt.Errorf("last Next() encountered an unknown error, please retry after resolving it manually")
		return
	}
	if p.batchStatus == batchRpcError {
		err = fmt.Errorf("last Next() encountered an rpc error, it may recover after next loop")
		p.batchStatus = batchScanning
		return
	}
	if p.closed {
		err = fmt.Errorf("scanner is closed")
		return
	}
	// TODO(tangyanzhao): This method is not thread safe, should use r/w lock.
	// The Load-then-Store pair below only best-effort rejects concurrent
	// calls on Next of the same Scanner; it is not an atomic check-and-set.
	if p.isNextRunning.Load() != 0 {
		err = WrapError(fmt.Errorf("there can be no concurrent calls on Next of the same Scanner"), OpNext)
		return
	}
	p.isNextRunning.Store(1)
	defer p.isNextRunning.Store(0)
	completed, hashKey, sortKey, value, err = p.doNext(ctx)
	err = WrapError(err, OpNext)
	return
}
// doNext advances the scan state machine by one entry. It bumps the batch
// cursor; whenever the cursor runs past the buffered entries it either moves
// to the next partition (batchScanFinished), opens a scan context on the
// server (batchEmpty), or fetches the next batch of the live context
// (batchStatus >= 0). startScanPartition/nextBatch re-enter doNext on success.
func (p *pegasusScanner) doNext(ctx context.Context) (completed bool, hashKey []byte,
	sortKey []byte, value []byte, err error) {
	// until we have the valid batch
	for p.batchIndex++; p.batchIndex >= len(p.batchEntries); p.batchIndex++ {
		if p.batchStatus == batchScanFinished {
			// Current partition is done; finish entirely when none are left.
			if p.gpidIndex <= 0 {
				completed = true
				p.logger.Print(" Scanning on all partitions has been completed")
				return
			}
			// Step to the next partition and reset the batch buffer.
			p.gpidIndex--
			p.curGpid = p.gpidSlice[p.gpidIndex]
			p.curHash = p.hashSlice[p.gpidIndex]
			p.batchClear()
		} else if p.batchStatus == batchEmpty {
			// No server-side scan context yet: open one.
			return p.startScanPartition(ctx)
		} else {
			// request nextBatch
			return p.nextBatch(ctx)
		}
	}
	// batch.SortKey=<hashKey,sortKey>
	hashKey, sortKey, err = restoreSortKeyHashKey(p.batchEntries[p.batchIndex].SortKey)
	value = p.batchEntries[p.batchIndex].Value
	return
}
// batchClear resets the batch buffer and marks the scan context as removed,
// so the next doNext iteration opens a fresh scanner on the server.
func (p *pegasusScanner) batchClear() {
	p.batchIndex = -1
	p.batchStatus = batchEmpty
	p.batchEntries = []*KeyValue{}
}
// startScanPartition opens a scan context on the current partition's server
// and, on success, re-enters doNext to consume the freshly buffered batch.
func (p *pegasusScanner) startScanPartition(ctx context.Context) (completed bool, hashKey []byte,
	sortKey []byte, value []byte, err error) {
	req := rrdb.NewGetScannerRequest()

	// Resume just after the last key already returned, or start from the
	// configured start key when nothing has been buffered yet.
	if n := len(p.batchEntries); n > 0 {
		req.StartKey = &base.Blob{Data: p.batchEntries[n-1].SortKey}
		req.StartInclusive = false
	} else {
		req.StartKey = p.startKey
		req.StartInclusive = p.options.StartInclusive
	}
	req.StopKey = p.stopKey
	req.StopInclusive = p.options.StopInclusive
	req.BatchSize = int32(p.options.BatchSize)
	req.NoValue = p.options.NoValue
	// A nil Pattern leaves the blob's Data nil, same as an empty blob.
	req.HashKeyFilterType = rrdb.FilterType(p.options.HashKeyFilter.Type)
	req.HashKeyFilterPattern = &base.Blob{Data: p.options.HashKeyFilter.Pattern}
	req.SortKeyFilterType = rrdb.FilterType(p.options.SortKeyFilter.Type)
	req.SortKeyFilterPattern = &base.Blob{Data: p.options.SortKeyFilter.Pattern}
	req.ValidatePartitionHash = &p.checkHash

	replica := p.table.getPartitionByGpid(p.curGpid)
	resp, rpcErr := replica.GetScanner(ctx, p.curGpid, p.curHash, req)
	if rpcErr != nil {
		p.batchStatus = batchRpcError
		err = rpcErr
		if updateConfig, _, handleErr := p.table.handleReplicaError(rpcErr, replica); handleErr != nil {
			err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", rpcErr, updateConfig, handleErr)
		}
		return
	}
	if err = p.onRecvScanResponse(resp, nil); err == nil {
		return p.doNext(ctx)
	}
	return
}
// nextBatch fetches the next batch of the live scan context (id stored in
// batchStatus) and, on success, re-enters doNext to consume it.
func (p *pegasusScanner) nextBatch(ctx context.Context) (completed bool, hashKey []byte,
	sortKey []byte, value []byte, err error) {
	replica := p.table.getPartitionByGpid(p.curGpid)
	resp, rpcErr := replica.Scan(ctx, p.curGpid, p.curHash, &rrdb.ScanRequest{ContextID: p.batchStatus})
	if rpcErr != nil {
		p.batchStatus = batchRpcError
		err = rpcErr
		if updateConfig, _, handleErr := p.table.handleReplicaError(rpcErr, replica); handleErr != nil {
			err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", rpcErr, updateConfig, handleErr)
		}
		return
	}
	if err = p.onRecvScanResponse(resp, nil); err == nil {
		return p.doNext(ctx)
	}
	return
}
// onRecvScanResponse consumes a get_scanner/scan response. On ERR_OK (0) it
// refills batchEntries and records the server-side context id in batchStatus;
// on error code 1 (scan context removed by the server) it resets to
// batchEmpty so doNext reopens the context; any other server-side error puts
// the scanner into the sticky batchUnknownError state and is returned.
func (p *pegasusScanner) onRecvScanResponse(response *rrdb.ScanResponse, err error) error {
	if err != nil {
		// Wrap with %w instead of concatenating err into the format string:
		// concatenation would misinterpret any '%' inside the error text and
		// drop the error chain for errors.Is/As.
		return fmt.Errorf("scan failed with error: %w", err)
	}
	switch response.Error {
	case 0:
		// ERR_OK: buffer this batch. Each key is encoded as <hashKey,sortKey>.
		p.batchEntries = make([]*KeyValue, len(response.Kvs))
		for i, kv := range response.Kvs {
			p.batchEntries[i] = &KeyValue{
				SortKey: kv.Key.Data, // batch.SortKey=<hashKey,sortKey>
				Value:   kv.Value.Data,
			}
		}
		p.batchIndex = -1
		p.batchStatus = response.ContextID
	case 1:
		// scan context has been removed
		p.batchStatus = batchEmpty
	default:
		// rpc succeed, but operation encounter some error in server side
		p.batchStatus = batchUnknownError
		return base.NewRocksDBErrFromInt(response.Error)
	}
	return nil
}
// Close releases the server-side scan context, if one is still open, and
// marks the scanner as closed. The cleanup RPC is bounded to 100ms.
func (p *pegasusScanner) Close() error {
	// try to close in 100ms,
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	var err error
	// if batchScanFinished or batchEmpty, server side will clear scanner automatically
	// if not, clear scanner manually
	if p.batchStatus >= batchScanning {
		replica := p.table.getPartitionByGpid(p.curGpid)
		if err = replica.ClearScanner(ctx, p.curGpid, p.curHash, p.batchStatus); err == nil {
			p.batchStatus = batchScanFinished
		}
	}
	p.gpidIndex = 0
	p.closed = true
	return WrapError(err, OpScannerClose)
}
// restoreSortKeyHashKey splits a raw scan key of the form
// <2-byte big-endian hashKeyLen><hashKey><sortKey> back into its parts.
// The returned slices alias the input. An error is returned when the key is
// shorter than the 2-byte header, or when the encoded length is the 0xFFFF
// sentinel or exceeds the remaining bytes.
func restoreSortKeyHashKey(key []byte) (hashKey []byte, sortKey []byte, err error) {
	// len(nil) == 0, so a separate nil check is unnecessary.
	if len(key) < 2 {
		return nil, nil, fmt.Errorf("unable to restore key: %s", key)
	}
	// Convert to int BEFORE adding 2: the original computed 2+hashKeyLen in
	// uint16 arithmetic, so hashKeyLen == 0xFFFE wrapped to 0, slipped past
	// the bounds check, and key[2:0] panicked. (Uint16 already yields 16 bits;
	// the old 0xFFFF mask was redundant.)
	hashKeyLen := int(binary.BigEndian.Uint16(key[:2]))
	if hashKeyLen == 0xFFFF || 2+hashKeyLen > len(key) {
		return nil, nil, fmt.Errorf("unable to restore key, hashKey length invalid")
	}
	return key[2 : 2+hashKeyLen], key[2+hashKeyLen:], nil
}