blob: 523b7abc49f1c314f31fcdab2b371681bdd7610f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"github.com/jmhodges/levigo"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"os"
"strconv"
"strings"
"sync/atomic"
"syscall"
)
//
// The data store code for HTraced.
//
// This code stores the trace spans. We use levelDB here so that we don't have to store everything
// in memory at all times. The data is sharded across multiple levelDB databases in multiple
// directories. Normally, these multiple directories will be on multiple disk drives.
//
// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
// coming from many daemons. Durability is not as big a concern as in some data stores, since
// losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package
// for serialization. We assume that there will be many more writes than reads.
//
// TODO: implement redundancy (storing data on more than 1 drive)
// TODO: implement re-loading old span data
//
// Schema
// m -> dataStoreMetadata
// s[8-byte-big-endian-sid] -> SpanData
// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
//
const DATA_STORE_VERSION = 1
var EMPTY_BYTE_BUF []byte = []byte{}
const SPAN_ID_INDEX_PREFIX = 's'
const BEGIN_TIME_INDEX_PREFIX = 'b'
const END_TIME_INDEX_PREFIX = 'e'
const DURATION_INDEX_PREFIX = 'd'
const PARENT_ID_INDEX_PREFIX = 'p'
const INVALID_INDEX_PREFIX = 0
type Statistics struct {
NumSpansWritten uint64
}
func (stats *Statistics) IncrementWrittenSpans() {
atomic.AddUint64(&stats.NumSpansWritten, 1)
}
// Make a copy of the statistics structure, using atomic operations.
func (stats *Statistics) Copy() *Statistics {
return &Statistics{
NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten),
}
}
// Translate a span id into a leveldb key.
func makeKey(tag byte, sid int64) []byte {
id := uint64(sid)
return []byte{
tag,
byte(0xff & (id >> 56)),
byte(0xff & (id >> 48)),
byte(0xff & (id >> 40)),
byte(0xff & (id >> 32)),
byte(0xff & (id >> 24)),
byte(0xff & (id >> 16)),
byte(0xff & (id >> 8)),
byte(0xff & (id >> 0)),
}
}
func keyToInt(key []byte) int64 {
var id uint64
id = (uint64(key[0]) << 56) |
(uint64(key[1]) << 48) |
(uint64(key[2]) << 40) |
(uint64(key[3]) << 32) |
(uint64(key[4]) << 24) |
(uint64(key[5]) << 16) |
(uint64(key[6]) << 8) |
(uint64(key[7]) << 0)
return int64(id)
}
func makeSecondaryKey(tag byte, first int64, second int64) []byte {
fir := uint64(first)
sec := uint64(second)
return []byte{
tag,
byte(0xff & (fir >> 56)),
byte(0xff & (fir >> 48)),
byte(0xff & (fir >> 40)),
byte(0xff & (fir >> 32)),
byte(0xff & (fir >> 24)),
byte(0xff & (fir >> 16)),
byte(0xff & (fir >> 8)),
byte(0xff & (fir >> 0)),
byte(0xff & (sec >> 56)),
byte(0xff & (sec >> 48)),
byte(0xff & (sec >> 40)),
byte(0xff & (sec >> 32)),
byte(0xff & (sec >> 24)),
byte(0xff & (sec >> 16)),
byte(0xff & (sec >> 8)),
byte(0xff & (sec >> 0)),
}
}
// A single directory containing a levelDB instance.
type shard struct {
// The data store that this shard is part of
store *dataStore
// The LevelDB instance.
ldb *levigo.DB
// The path to the leveldb directory this shard is managing.
path string
// Incoming requests to write Spans.
incoming chan *common.Span
// The channel we will send a bool to when we exit.
exited chan bool
}
// Metadata about the DataStore.
type dataStoreMetadata struct {
// The DataStore version.
Version int32
}
// Write the metadata key to a shard.
func (shd *shard) WriteMetadata(meta *dataStoreMetadata) error {
w := new(bytes.Buffer)
encoder := gob.NewEncoder(w)
err := encoder.Encode(meta)
if err != nil {
return err
}
return shd.ldb.Put(shd.store.writeOpts, []byte("m"), w.Bytes())
}
// Process incoming spans for a shard.
func (shd *shard) processIncoming() {
lg := shd.store.lg
for {
span := <-shd.incoming
if span == nil {
lg.Infof("Shard processor for %s exiting.\n", shd.path)
shd.exited <- true
return
}
err := shd.writeSpan(span)
if err != nil {
lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
} else {
lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
}
}
}
func (shd *shard) writeSpan(span *common.Span) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
// Add SpanData to batch.
spanDataBuf := new(bytes.Buffer)
spanDataEnc := gob.NewEncoder(spanDataBuf)
err := spanDataEnc.Encode(span.SpanData)
if err != nil {
return err
}
batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes())
// Add this to the parent index.
for parentIdx := range span.Parents {
batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX,
span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF)
}
// Add to the other secondary indices.
batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, span.Begin,
span.Id.Val()), EMPTY_BYTE_BUF)
batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, span.End,
span.Id.Val()), EMPTY_BYTE_BUF)
batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, span.Duration(),
span.Id.Val()), EMPTY_BYTE_BUF)
err = shd.ldb.Write(shd.store.writeOpts, batch)
if err != nil {
return err
}
shd.store.stats.IncrementWrittenSpans()
if shd.store.WrittenSpans != nil {
shd.store.WrittenSpans <- span
}
return nil
}
func (shd *shard) FindChildren(sid int64, childIds []common.SpanId, lim int32) ([]common.SpanId, int32, error) {
searchKey := makeKey('p', sid)
iter := shd.ldb.NewIterator(shd.store.readOpts)
defer iter.Close()
iter.Seek(searchKey)
for {
if !iter.Valid() {
break
}
if lim == 0 {
break
}
key := iter.Key()
if !bytes.HasPrefix(key, searchKey) {
break
}
id := common.SpanId(keyToInt(key[9:]))
childIds = append(childIds, id)
lim--
iter.Next()
}
return childIds, lim, nil
}
// Close a shard.
func (shd *shard) Close() {
lg := shd.store.lg
shd.incoming <- nil
lg.Infof("Waiting for %s to exit...\n", shd.path)
if shd.exited != nil {
<-shd.exited
}
shd.ldb.Close()
lg.Infof("Closed %s...\n", shd.path)
}
// The Data Store.
type dataStore struct {
lg *common.Logger
// The shards which manage our LevelDB instances.
shards []*shard
// I/O statistics for all shards.
stats Statistics
// The read options to use for LevelDB.
readOpts *levigo.ReadOptions
// The write options to use for LevelDB.
writeOpts *levigo.WriteOptions
// If non-null, a channel we will send spans to once we finish writing them. This is only used
// for testing.
WrittenSpans chan *common.Span
}
func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
// Get the configuration.
clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
// If we return an error, close the store.
var err error
lg := common.NewLogger("datastore", cnf)
store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: writtenSpans}
defer func() {
if err != nil {
store.Close()
store = nil
}
}()
store.readOpts = levigo.NewReadOptions()
store.readOpts.SetFillCache(true)
store.writeOpts = levigo.NewWriteOptions()
store.writeOpts.SetSync(false)
// Open all shards
for idx := range dirs {
path := dirs[idx] + conf.PATH_SEP + "db"
err := os.MkdirAll(path, 0777)
if err != nil {
e, ok := err.(*os.PathError)
if !ok || e.Err != syscall.EEXIST {
return nil, err
}
if !clearStored {
// TODO: implement re-opening saved data
lg.Error("Error: path " + path + "already exists.")
return nil, err
} else {
err = os.RemoveAll(path)
if err != nil {
lg.Error("Failed to create " + path + ": " + err.Error())
return nil, err
}
lg.Info("Cleared " + path)
}
}
var shd *shard
shd, err = CreateShard(store, cnf, path)
if err != nil {
lg.Errorf("Error creating shard %s: %s", path, err.Error())
return nil, err
}
store.shards = append(store.shards, shd)
}
meta := &dataStoreMetadata{Version: DATA_STORE_VERSION}
for idx := range store.shards {
shd := store.shards[idx]
err := shd.WriteMetadata(meta)
if err != nil {
lg.Error("Failed to write metadata to " + store.shards[idx].path + ": " + err.Error())
return nil, err
}
shd.exited = make(chan bool, 1)
go shd.processIncoming()
}
return store, nil
}
func CreateShard(store *dataStore, cnf *conf.Config, path string) (*shard, error) {
var shd *shard
//filter := levigo.NewBloomFilter(10)
//defer filter.Close()
openOpts := levigo.NewOptions()
defer openOpts.Close()
openOpts.SetCreateIfMissing(true)
//openOpts.SetFilterPolicy(filter)
ldb, err := levigo.Open(path, openOpts)
if err != nil {
store.lg.Errorf("LevelDB failed to open %s: %s\n", path, err.Error())
return nil, err
}
defer func() {
if shd == nil {
ldb.Close()
}
}()
spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
shd = &shard{store: store, ldb: ldb, path: path,
incoming: make(chan *common.Span, spanBufferSize)}
store.lg.Infof("LevelDB opened %s\n", path)
return shd, nil
}
func (store *dataStore) GetStatistics() *Statistics {
return store.stats.Copy()
}
// Close the DataStore.
func (store *dataStore) Close() {
for idx := range store.shards {
store.shards[idx].Close()
store.shards[idx] = nil
}
if store.readOpts != nil {
store.readOpts.Close()
store.readOpts = nil
}
if store.writeOpts != nil {
store.writeOpts.Close()
store.writeOpts = nil
}
if store.lg != nil {
store.lg.Close()
store.lg = nil
}
}
// Get the index of the shard which stores the given spanId.
func (store *dataStore) getShardIndex(spanId int64) int {
return int(uint64(spanId) % uint64(len(store.shards)))
}
func (store *dataStore) WriteSpan(span *common.Span) {
store.shards[store.getShardIndex(span.Id.Val())].incoming <- span
}
func (store *dataStore) FindSpan(sid int64) *common.Span {
return store.shards[store.getShardIndex(sid)].FindSpan(sid)
}
func (shd *shard) FindSpan(sid int64) *common.Span {
lg := shd.store.lg
buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid))
if err != nil {
if strings.Index(err.Error(), "NotFound:") != -1 {
return nil
}
lg.Warnf("Shard(%s): FindSpan(%016x) error: %s\n",
shd.path, sid, err.Error())
return nil
}
var span *common.Span
span, err = shd.decodeSpan(sid, buf)
if err != nil {
lg.Errorf("Shard(%s): FindSpan(%016x) decode error: %s\n",
shd.path, sid, err.Error())
return nil
}
return span
}
func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) {
r := bytes.NewBuffer(buf)
decoder := gob.NewDecoder(r)
data := common.SpanData{}
err := decoder.Decode(&data)
if err != nil {
return nil, err
}
// Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with
// non-nil slices.
if data.Parents == nil {
data.Parents = []common.SpanId{}
}
return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
}
// Find the children of a given span id.
func (store *dataStore) FindChildren(sid int64, lim int32) []common.SpanId {
childIds := make([]common.SpanId, 0)
var err error
startIdx := store.getShardIndex(sid)
idx := startIdx
numShards := len(store.shards)
for {
if lim == 0 {
break
}
shd := store.shards[idx]
childIds, lim, err = shd.FindChildren(sid, childIds, lim)
if err != nil {
store.lg.Errorf("Shard(%s): FindChildren(%016x) error: %s\n",
shd.path, sid, err.Error())
}
idx++
if idx >= numShards {
idx = 0
}
if idx == startIdx {
break
}
}
return childIds
}
type predicateData struct {
*common.Predicate
intKey int64
strKey string
}
func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
p := predicateData{Predicate: pred}
// Parse the input value given to make sure it matches up with the field
// type.
switch pred.Field {
case common.SPAN_ID:
// Span IDs are sent as hex strings.
var id common.SpanId
if err := id.FromString(pred.Val); err != nil {
return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s",
pred.Val, err.Error()))
}
p.intKey = id.Val()
break
case common.DESCRIPTION:
// Any string is valid for a description.
p.strKey = pred.Val
break
case common.BEGIN_TIME, common.END_TIME, common.DURATION:
// Base-10 numeric fields.
v, err := strconv.ParseInt(pred.Val, 10, 64)
if err != nil {
return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s",
pred.Field, pred.Val, err.Error()))
}
p.intKey = v
break
default:
return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field))
}
// Validate the predicate operation.
switch pred.Op {
case common.EQUALS, common.LESS_THAN_OR_EQUALS, common.GREATER_THAN_OR_EQUALS:
break
case common.CONTAINS:
if p.fieldIsNumeric() {
return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+
"numeric field like '%s'", pred.Field))
}
default:
return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'",
pred.Op))
}
return &p, nil
}
// Get the index prefix for this predicate, or 0 if it is not indexed.
func (pred *predicateData) getIndexPrefix() byte {
switch pred.Field {
case common.SPAN_ID:
return SPAN_ID_INDEX_PREFIX
case common.BEGIN_TIME:
return BEGIN_TIME_INDEX_PREFIX
case common.END_TIME:
return END_TIME_INDEX_PREFIX
case common.DURATION:
return DURATION_INDEX_PREFIX
default:
return INVALID_INDEX_PREFIX
}
}
// Returns true if the predicate type is numeric.
func (pred *predicateData) fieldIsNumeric() bool {
switch pred.Field {
case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION:
return true
default:
return false
}
}
// Get the values that this predicate cares about for a given span.
func (pred *predicateData) extractRelevantSpanData(span *common.Span) (int64, string) {
switch pred.Field {
case common.SPAN_ID:
return span.Id.Val(), ""
case common.DESCRIPTION:
return 0, span.Description
case common.BEGIN_TIME:
return span.Begin, ""
case common.END_TIME:
return span.End, ""
case common.DURATION:
return span.Duration(), ""
default:
panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field))
}
}
func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
// nil is after everything.
if a == nil {
if b == nil {
return false
}
return false
} else if b == nil {
return true
}
// Compare the spans according to this predicate.
aInt, aStr := pred.extractRelevantSpanData(a)
bInt, bStr := pred.extractRelevantSpanData(b)
if pred.fieldIsNumeric() {
if pred.Op.IsDescending() {
return aInt > bInt
} else {
return aInt < bInt
}
} else {
if pred.Op.IsDescending() {
return aStr > bStr
} else {
return aStr < bStr
}
}
}
// Returns true if the predicate is satisfied by the given span.
func (pred *predicateData) satisfiedBy(span *common.Span) bool {
intVal, strVal := pred.extractRelevantSpanData(span)
if pred.fieldIsNumeric() {
switch pred.Op {
case common.EQUALS:
return intVal == pred.intKey
case common.LESS_THAN_OR_EQUALS:
return intVal <= pred.intKey
case common.GREATER_THAN_OR_EQUALS:
return intVal >= pred.intKey
default:
panic(fmt.Sprintf("unknown Op type %s should have been caught "+
"during normalization", pred.Op))
}
} else {
switch pred.Op {
case common.CONTAINS:
return strings.Contains(strVal, pred.strKey)
case common.EQUALS:
return strVal == pred.strKey
case common.LESS_THAN_OR_EQUALS:
return strVal <= pred.strKey
case common.GREATER_THAN_OR_EQUALS:
return strVal >= pred.strKey
default:
panic(fmt.Sprintf("unknown Op type %s should have been caught "+
"during normalization", pred.Op))
}
}
}
func (pred *predicateData) createSource(store *dataStore) (*source, error) {
var ret *source
src := source{store: store,
pred: pred,
iters: make([]*levigo.Iterator, 0, len(store.shards)),
nexts: make([]*common.Span, len(store.shards)),
numRead: make([]int, len(store.shards)),
keyPrefix: pred.getIndexPrefix(),
}
if src.keyPrefix == INVALID_INDEX_PREFIX {
return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
"predicate on field %s", pred.Field))
}
defer func() {
if ret == nil {
src.Close()
}
}()
for shardIdx := range store.shards {
shd := store.shards[shardIdx]
src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
}
searchKey := makeKey(src.keyPrefix, pred.intKey)
for i := range src.iters {
src.iters[i].Seek(searchKey)
}
ret = &src
return ret, nil
}
// A source of spans.
type source struct {
store *dataStore
pred *predicateData
iters []*levigo.Iterator
nexts []*common.Span
numRead []int
keyPrefix byte
}
// Fill in the entry in the 'next' array for a specific shard.
func (src *source) populateNextFromShard(shardIdx int) {
lg := src.store.lg
var err error
iter := src.iters[shardIdx]
if iter == nil {
lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
return // There are no more entries in this shard.
}
if src.nexts[shardIdx] != nil {
lg.Debugf("No need to populate shard %d\n", shardIdx)
return // We already have a valid entry for this shard.
}
for {
if !iter.Valid() {
lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
break // Can't read past end of DB
}
src.numRead[shardIdx]++
key := iter.Key()
if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s",
shardIdx, string(src.keyPrefix))
break // Can't read past end of indexed section
}
var span *common.Span
var sid int64
if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
// The span id maps to the span itself.
sid = keyToInt(key[1:])
span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
if err != nil {
lg.Debugf("Internal error decoding span %016x in shard %d: %s\n",
sid, shardIdx, err.Error())
break
}
} else {
// With a secondary index, we have to look up the span by id.
sid = keyToInt(key[9:])
span = src.store.shards[shardIdx].FindSpan(sid)
if span == nil {
lg.Debugf("Internal error rehydrating span %016x in shard %d\n",
sid, shardIdx)
break
}
}
if src.pred.Op.IsDescending() {
iter.Prev()
} else {
iter.Next()
}
if src.pred.satisfiedBy(span) {
lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx)
src.nexts[shardIdx] = span // Found valid entry
return
} else {
lg.Debugf("Span %016x from shard %d does not satisfy the predicate.\n",
sid, shardIdx)
if src.numRead[shardIdx] <= 1 && src.pred.Op.IsDescending() {
// When dealing with descending predicates, the first span we read might not satisfy
// the predicate, even though subsequent ones will. This is because the iter.Seek()
// function "moves the iterator the position of the key given or, if the key doesn't
// exist, the next key that does exist in the database." So if we're on that "next
// key" it will not satisfy the predicate, but the keys previous to it might.
continue
}
// This and subsequent entries don't satisfy predicate
break
}
}
lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
iter.Close()
src.iters[shardIdx] = nil
}
func (src *source) next() *common.Span {
for shardIdx := range src.iters {
src.populateNextFromShard(shardIdx)
}
var best *common.Span
bestIdx := -1
for shardIdx := range src.iters {
span := src.nexts[shardIdx]
if src.pred.spanPtrIsBefore(span, best) {
best = span
bestIdx = shardIdx
}
}
if bestIdx >= 0 {
src.nexts[bestIdx] = nil
}
return best
}
func (src *source) Close() {
for i := range src.iters {
if src.iters[i] != nil {
src.iters[i].Close()
}
}
src.iters = nil
}
func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
// Read spans from the first predicate that is indexed.
p := *preds
for i := range p {
pred := p[i]
if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
*preds = append(p[0:i], p[i+1:]...)
return pred.createSource(store)
}
}
// If there are no predicates that are indexed, read rows in order of span id.
spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
Field: common.SPAN_ID,
Val: "0000000000000000",
}
spanIdPredData, err := loadPredicateData(&spanIdPred)
if err != nil {
return nil, err
}
return spanIdPredData.createSource(store)
}
func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
lg := store.lg
// Parse predicate data.
var err error
preds := make([]*predicateData, len(query.Predicates))
for i := range query.Predicates {
preds[i], err = loadPredicateData(&query.Predicates[i])
if err != nil {
return nil, err
}
}
// Get a source of rows.
var src *source
src, err = store.obtainSource(&preds)
if err != nil {
return nil, err
}
defer src.Close()
lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
// Filter the spans through the remaining predicates.
ret := make([]*common.Span, 0, 32)
for {
if len(ret) >= query.Lim {
break // we hit the result size limit
}
span := src.next()
if span == nil {
break // the source has no more spans to give
}
lg.Debugf("src.next returned span %s\n", span.ToJson())
satisfied := true
for predIdx := range preds {
if !preds[predIdx].satisfiedBy(span) {
satisfied = false
break
}
}
if satisfied {
ret = append(ret, span)
}
}
return ret, nil
}