HTRACE-160. htraced: support continuing a query from where the client left it off by sending a previous span (cmccabe)
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
index a32909e..8c9128f 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/common/query.go
@@ -103,9 +103,18 @@
 	Val   string `val:"val"`
 }
 
+func (pred *Predicate) String() string {
+	buf, err := json.Marshal(pred)
+	if err != nil {
+		panic(err)
+	}
+	return string(buf)
+}
+
 type Query struct {
 	Predicates []Predicate `json:"pred"`
 	Lim        int         `json:"lim"`
+	Prev       *Span       `json:"prev"`
 }
 
 func (query *Query) String() string {
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
index c273ad9..b276844 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/common/span.go
@@ -130,6 +130,10 @@
 	return jbytes
 }
 
+func (span *Span) String() string {
+	return string(span.ToJson())
+}
+
 // Compute the span duration.  We ignore overflow since we never deal with negative times.
 func (span *Span) Duration() int64 {
 	return span.End - span.Begin
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
index a26779c..97af3fb 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -90,19 +90,18 @@
 	}
 }
 
-// Translate a span id into a leveldb key.
-func makeKey(tag byte, sid uint64) []byte {
-	id := uint64(sid)
+// Translate an 8-byte value into a leveldb key.
+func makeKey(tag byte, val uint64) []byte {
 	return []byte{
 		tag,
-		byte(0xff & (id >> 56)),
-		byte(0xff & (id >> 48)),
-		byte(0xff & (id >> 40)),
-		byte(0xff & (id >> 32)),
-		byte(0xff & (id >> 24)),
-		byte(0xff & (id >> 16)),
-		byte(0xff & (id >> 8)),
-		byte(0xff & (id >> 0)),
+		byte(0xff & (val >> 56)),
+		byte(0xff & (val >> 48)),
+		byte(0xff & (val >> 40)),
+		byte(0xff & (val >> 32)),
+		byte(0xff & (val >> 24)),
+		byte(0xff & (val >> 16)),
+		byte(0xff & (val >> 8)),
+		byte(0xff & (val >> 0)),
 	}
 }
 
@@ -704,7 +703,7 @@
 	}
 }
 
-func (pred *predicateData) createSource(store *dataStore) (*source, error) {
+func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) {
 	var ret *source
 	src := source{store: store,
 		pred:      pred,
@@ -726,7 +725,76 @@
 		shd := store.shards[shardIdx]
 		src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
 	}
-	searchKey := makeKey(src.keyPrefix, pred.uintKey)
+	var searchKey []byte
+	lg := store.lg
+	if prev != nil {
+		// If prev != nil, this query RPC is the continuation of a previous
+		// one.  The final result returned the last time is 'prev'.
+		//
+		// To avoid returning the same results multiple times, we adjust the
+		// predicate here.  If the predicate is on the span id field, we
+		// simply manipulate the span ID we're looking for.
+		//
+		// If the predicate is on a secondary index, we also use span ID, but
+		// in a slightly different way.  Since the secondary indices are
+		// organized as [type-code][8b-secondary-key][8b-span-id], elements
+		// with the same secondary index field are ordered by span ID.  So we
+		// create a 17-byte key incorporating the span ID from 'prev.'
+		var startId common.SpanId
+		switch (pred.Op) {
+		case common.EQUALS:
+			if pred.Field == common.SPAN_ID {
+				// This is an annoying corner case.  There can only be one
+				// result each time we do an EQUALS search for a span id.
+				// Span id is the primary key for all our spans.
+				// But for some reason someone is asking for another result.
+				// We modify the query to search for the illegal 0 span ID,
+				// which will never be present.
+				lg.Debugf("Attempted to use a continuation token with an EQUALS " +
+					"SPAN_ID query. %s.  Setting search id = 0",
+					pred.Predicate.String())
+				startId = 0
+			} else {
+				// When doing an EQUALS search on a secondary index, the
+				// results are sorted by span id.
+				startId = prev.Id + 1
+			}
+		case common.LESS_THAN_OR_EQUALS:
+			// Subtract one from the previous span id.  Since the previous
+			// start ID will never be 0 (0 is an illegal span id), we'll never
+			// wrap around when doing this.
+			startId = prev.Id - 1
+		case common.GREATER_THAN_OR_EQUALS:
+			// We can't add one to the span id, since the previous span ID
+			// might be the maximum value.  So just switch over to using
+			// GREATER_THAN.
+			pred.Op = common.GREATER_THAN
+			startId = prev.Id
+		case common.GREATER_THAN:
+			// This one is easy.
+			startId = prev.Id
+		default:
+			str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String())
+			lg.Error(str + "\n")
+			panic(str)
+		}
+		if pred.Field == common.SPAN_ID {
+			pred.uintKey = uint64(startId)
+			searchKey = makeKey(src.keyPrefix, uint64(startId))
+		} else {
+			// Start where the previous query left off.  This means adjusting
+			// our uintKey.
+			pred.uintKey, _ = pred.extractRelevantSpanData(prev)
+			searchKey = makeSecondaryKey(src.keyPrefix, pred.uintKey, uint64(startId))
+		}
+		if lg.TraceEnabled() {
+			lg.Tracef("Handling continuation token %s for %s.  startId=%d, " +
+				"pred.uintKey=%d\n", prev, pred.Predicate.String(), startId,
+				pred.uintKey)
+		}
+	} else {
+		searchKey = makeKey(src.keyPrefix, pred.uintKey)
+	}
 	for i := range src.iters {
 		src.iters[i].Seek(searchKey)
 	}
@@ -815,7 +883,7 @@
 			iter.Next()
 		}
 		if src.pred.satisfiedBy(span) {
-			lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx)
+			lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx)
 			src.nexts[shardIdx] = span // Found valid entry
 			return
 		} else {
@@ -861,14 +929,14 @@
 	src.iters = nil
 }
 
-func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
+func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) {
 	// Read spans from the first predicate that is indexed.
 	p := *preds
 	for i := range p {
 		pred := p[i]
 		if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
 			*preds = append(p[0:i], p[i+1:]...)
-			return pred.createSource(store)
+			return pred.createSource(store, span)
 		}
 	}
 	// If there are no predicates that are indexed, read rows in order of span id.
@@ -880,7 +948,7 @@
 	if err != nil {
 		return nil, err
 	}
-	return spanIdPredData.createSource(store)
+	return spanIdPredData.createSource(store, span)
 }
 
 func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
@@ -896,7 +964,7 @@
 	}
 	// Get a source of rows.
 	var src *source
-	src, err = store.obtainSource(&preds)
+	src, err = store.obtainSource(&preds, query.Prev)
 	if err != nil {
 		return nil, err
 	}
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
index 79a7c4f..8a14f30 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -443,3 +443,72 @@
 			"incorrect version.  But it failed with error %s\n", err.Error())
 	}
 }
+
+func TestQueriesWithContinuationTokens1(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
+		WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		panic(err)
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatal()
+	}
+	// Adding a prev value to this query excludes the first result that we
+	// would normally get.
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			common.Predicate{
+				Op:    common.GREATER_THAN,
+				Field: common.BEGIN_TIME,
+				Val:   "120",
+			},
+		},
+		Lim: 5,
+		Prev: &SIMPLE_TEST_SPANS[0],
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+	// There is only one result from an EQUALS query on SPAN_ID.
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			common.Predicate{
+				Op:    common.EQUALS,
+				Field: common.SPAN_ID,
+				Val:   "1",
+			},
+		},
+		Lim: 100,
+		Prev: &SIMPLE_TEST_SPANS[0],
+	}, []common.Span{})
+
+	// When doing a LESS_THAN_OR_EQUALS search, we still don't get back the
+	// span we pass as a continuation token. (Primary index edition).
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			common.Predicate{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.SPAN_ID,
+				Val:   "2",
+			},
+		},
+		Lim: 100,
+		Prev: &SIMPLE_TEST_SPANS[1],
+	}, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+	// When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the
+	// span we pass as a continuation token. (Secondary index edition).
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			common.Predicate{
+				Op:    common.GREATER_THAN,
+				Field: common.DURATION,
+				Val:   "0",
+			},
+		},
+		Lim: 100,
+		Prev: &SIMPLE_TEST_SPANS[1],
+	}, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
+}