HTRACE-160. htraced: support continuing a query from where the client left it off by sending a previous span (cmccabe)
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
index a32909e..8c9128f 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
@@ -103,9 +103,18 @@
Val string `val:"val"`
}
+func (pred *Predicate) String() string {
+ buf, err := json.Marshal(pred)
+ if err != nil {
+ panic(err)
+ }
+ return string(buf)
+}
+
type Query struct {
Predicates []Predicate `json:"pred"`
Lim int `json:"lim"`
+ Prev *Span `json:"prev"`
}
func (query *Query) String() string {
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
index c273ad9..b276844 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
@@ -130,6 +130,10 @@
return jbytes
}
+func (span *Span) String() string {
+ return string(span.ToJson())
+}
+
// Compute the span duration. We ignore overflow since we never deal with negative times.
func (span *Span) Duration() int64 {
return span.End - span.Begin
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
index a26779c..97af3fb 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -90,19 +90,18 @@
}
}
-// Translate a span id into a leveldb key.
-func makeKey(tag byte, sid uint64) []byte {
- id := uint64(sid)
+// Translate an 8-byte value into a leveldb key.
+func makeKey(tag byte, val uint64) []byte {
return []byte{
tag,
- byte(0xff & (id >> 56)),
- byte(0xff & (id >> 48)),
- byte(0xff & (id >> 40)),
- byte(0xff & (id >> 32)),
- byte(0xff & (id >> 24)),
- byte(0xff & (id >> 16)),
- byte(0xff & (id >> 8)),
- byte(0xff & (id >> 0)),
+ byte(0xff & (val >> 56)),
+ byte(0xff & (val >> 48)),
+ byte(0xff & (val >> 40)),
+ byte(0xff & (val >> 32)),
+ byte(0xff & (val >> 24)),
+ byte(0xff & (val >> 16)),
+ byte(0xff & (val >> 8)),
+ byte(0xff & (val >> 0)),
}
}
@@ -704,7 +703,7 @@
}
}
-func (pred *predicateData) createSource(store *dataStore) (*source, error) {
+func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) {
var ret *source
src := source{store: store,
pred: pred,
@@ -726,7 +725,76 @@
shd := store.shards[shardIdx]
src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
}
- searchKey := makeKey(src.keyPrefix, pred.uintKey)
+ var searchKey []byte
+ lg := store.lg
+ if prev != nil {
+ // If prev != nil, this query RPC is the continuation of a previous
+ // one. The final result returned the last time is 'prev'.
+ //
+ // To avoid returning the same results multiple times, we adjust the
+ // predicate here. If the predicate is on the span id field, we
+ // simply manipulate the span ID we're looking for.
+ //
+ // If the predicate is on a secondary index, we also use span ID, but
+ // in a slightly different way. Since the secondary indices are
+ // organized as [type-code][8b-secondary-key][8b-span-id], elements
+ // with the same secondary index field are ordered by span ID. So we
+ // create a 17-byte key incorporating the span ID from 'prev.'
+ var startId common.SpanId
+ switch (pred.Op) {
+ case common.EQUALS:
+ if pred.Field == common.SPAN_ID {
+ // This is an annoying corner case. There can only be one
+ // result each time we do an EQUALS search for a span id.
+ // Span id is the primary key for all our spans.
+ // But for some reason someone is asking for another result.
+ // We modify the query to search for the illegal 0 span ID,
+ // which will never be present.
+ lg.Debugf("Attempted to use a continuation token with an EQUALS " +
+ "SPAN_ID query. %s. Setting search id = 0",
+ pred.Predicate.String())
+ startId = 0
+ } else {
+ // When doing an EQUALS search on a secondary index, the
+ // results are sorted by span id.
+ startId = prev.Id + 1
+ }
+ case common.LESS_THAN_OR_EQUALS:
+ // Subtract one from the previous span id. Since the previous
+ // start ID will never be 0 (0 is an illegal span id), we'll never
+ // wrap around when doing this.
+ startId = prev.Id - 1
+ case common.GREATER_THAN_OR_EQUALS:
+ // We can't add one to the span id, since the previous span ID
+ // might be the maximum value. So just switch over to using
+ // GREATER_THAN.
+ pred.Op = common.GREATER_THAN
+ startId = prev.Id
+ case common.GREATER_THAN:
+ // This one is easy.
+ startId = prev.Id
+ default:
+ str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String())
+ lg.Error(str + "\n")
+ panic(str)
+ }
+ if pred.Field == common.SPAN_ID {
+ pred.uintKey = uint64(startId)
+ searchKey = makeKey(src.keyPrefix, uint64(startId))
+ } else {
+ // Start where the previous query left off. This means adjusting
+ // our uintKey.
+ pred.uintKey, _ = pred.extractRelevantSpanData(prev)
+ searchKey = makeSecondaryKey(src.keyPrefix, pred.uintKey, uint64(startId))
+ }
+ if lg.TraceEnabled() {
+ lg.Tracef("Handling continuation token %s for %s. startId=%d, " +
+ "pred.uintKey=%d\n", prev, pred.Predicate.String(), startId,
+ pred.uintKey)
+ }
+ } else {
+ searchKey = makeKey(src.keyPrefix, pred.uintKey)
+ }
for i := range src.iters {
src.iters[i].Seek(searchKey)
}
@@ -815,7 +883,7 @@
iter.Next()
}
if src.pred.satisfiedBy(span) {
- lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx)
+ lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx)
src.nexts[shardIdx] = span // Found valid entry
return
} else {
@@ -861,14 +929,14 @@
src.iters = nil
}
-func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
+func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) {
// Read spans from the first predicate that is indexed.
p := *preds
for i := range p {
pred := p[i]
if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
*preds = append(p[0:i], p[i+1:]...)
- return pred.createSource(store)
+ return pred.createSource(store, span)
}
}
// If there are no predicates that are indexed, read rows in order of span id.
@@ -880,7 +948,7 @@
if err != nil {
return nil, err
}
- return spanIdPredData.createSource(store)
+ return spanIdPredData.createSource(store, span)
}
func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
@@ -896,7 +964,7 @@
}
// Get a source of rows.
var src *source
- src, err = store.obtainSource(&preds)
+ src, err = store.obtainSource(&preds, query.Prev)
if err != nil {
return nil, err
}
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
index 79a7c4f..8a14f30 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -443,3 +443,72 @@
"incorrect version. But it failed with error %s\n", err.Error())
}
}
+
+func TestQueriesWithContinuationTokens1(t *testing.T) {
+ t.Parallel()
+ htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
+ WrittenSpans: make(chan *common.Span, 100)}
+ ht, err := htraceBld.Build()
+ if err != nil {
+ panic(err)
+ }
+ defer ht.Close()
+ createSpans(SIMPLE_TEST_SPANS, ht.Store)
+ if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+ t.Fatal()
+ }
+ // Adding a prev value to this query excludes the first result that we
+ // would normally get.
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.GREATER_THAN,
+ Field: common.BEGIN_TIME,
+ Val: "120",
+ },
+ },
+ Lim: 5,
+ Prev: &SIMPLE_TEST_SPANS[0],
+ }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+ // There is only one result from an EQUALS query on SPAN_ID.
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.EQUALS,
+ Field: common.SPAN_ID,
+ Val: "1",
+ },
+ },
+ Lim: 100,
+ Prev: &SIMPLE_TEST_SPANS[0],
+ }, []common.Span{})
+
+ // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the
+ // span we pass as a continuation token. (Primary index edition).
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.LESS_THAN_OR_EQUALS,
+ Field: common.SPAN_ID,
+ Val: "2",
+ },
+ },
+ Lim: 100,
+ Prev: &SIMPLE_TEST_SPANS[1],
+ }, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+ // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the
+ // span we pass as a continuation token. (Secondary index edition).
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.GREATER_THAN,
+ Field: common.DURATION,
+ Val: "0",
+ },
+ },
+ Lim: 100,
+ Prev: &SIMPLE_TEST_SPANS[1],
+ }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
+}