HTRACE-328. htraced continues scanning in some cases even when no more results are possible (Colin Patrick McCabe via iwasakims)
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 596b652..82fb7b5 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -44,11 +44,11 @@
//
// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
// coming from many daemons. Durability is not as big a concern as in some data stores, since
-// losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package
+// losing a little bit of trace data if htraced goes down is not critical. We use msgpack
// for serialization. We assume that there will be many more writes than reads.
//
// Schema
-// m -> dataStoreVersion
+// w -> ShardInfo
// s[8-byte-big-endian-sid] -> SpanData
// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
@@ -890,6 +890,19 @@
SATISFIED = iota
)
+func (r satisfiedByReturn) String() string {
+ switch (r) {
+ case NOT_SATISFIED:
+ return "NOT_SATISFIED"
+ case NOT_YET_SATISFIED:
+ return "NOT_YET_SATISFIED"
+ case SATISFIED:
+ return "SATISFIED"
+ default:
+ return "(unknown)"
+ }
+}
+
// Determine whether the predicate is satisfied by the given span.
func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn {
val := pred.extractRelevantSpanData(span)
@@ -910,7 +923,7 @@
if bytes.Compare(val, pred.key) <= 0 {
return SATISFIED
} else {
- return NOT_SATISFIED
+ return NOT_YET_SATISFIED
}
case common.GREATER_THAN_OR_EQUALS:
if bytes.Compare(val, pred.key) >= 0 {
@@ -920,9 +933,7 @@
}
case common.GREATER_THAN:
cmp := bytes.Compare(val, pred.key)
- if cmp < 0 {
- return NOT_SATISFIED
- } else if cmp == 0 {
+ if cmp <= 0 {
return NOT_YET_SATISFIED
} else {
return SATISFIED
@@ -1143,17 +1154,16 @@
iter.Next()
}
ret = src.pred.satisfiedBy(span)
- switch ret {
- case NOT_SATISFIED:
- break // This and subsequent entries don't satisfy predicate
- case SATISFIED:
+ if ret == SATISFIED {
if lg.DebugEnabled() {
lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath)
}
src.nexts[shardIdx] = span // Found valid entry
return
- case NOT_YET_SATISFIED:
- continue // try again
+ }
+ if ret == NOT_SATISFIED {
+ // This and subsequent entries don't satisfy predicate
+ break
}
}
lg.Debugf("Closing iterator for shard %s.\n", shdPath)
@@ -1209,6 +1219,18 @@
src.iters = nil
}
+func (src *source) getStats() string {
+ ret := fmt.Sprintf("Source stats: pred = %s", src.pred.String())
+ prefix := ". "
+ for shardIdx := range src.shards {
+ next := fmt.Sprintf("%sRead %d spans from %s", prefix,
+ src.numRead[shardIdx], src.shards[shardIdx].path)
+ prefix = ", "
+ ret = ret + next
+ }
+ return ret
+}
+
func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) {
// Read spans from the first predicate that is indexed.
p := *preds
@@ -1231,7 +1253,7 @@
return spanIdPredData.createSource(store, span)
}
-func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error, []int) {
lg := store.lg
// Parse predicate data.
var err error
@@ -1239,14 +1261,14 @@
for i := range query.Predicates {
preds[i], err = loadPredicateData(&query.Predicates[i])
if err != nil {
- return nil, err
+ return nil, err, nil
}
}
// Get a source of rows.
var src *source
src, err = store.obtainSource(&preds, query.Prev)
if err != nil {
- return nil, err
+ return nil, err, nil
}
defer src.Close()
if lg.DebugEnabled() {
@@ -1254,13 +1276,25 @@
}
// Filter the spans through the remaining predicates.
- ret := make([]*common.Span, 0, 32)
+ reserved := 32
+ if query.Lim < reserved {
+ reserved = query.Lim
+ }
+ ret := make([]*common.Span, 0, reserved)
for {
if len(ret) >= query.Lim {
+ if lg.DebugEnabled() {
+ lg.Debugf("HandleQuery %s: hit query limit after obtaining " +
+ "%d results. %s\n.", query, query.Lim, src.getStats())
+ }
break // we hit the result size limit
}
span := src.next()
if span == nil {
+ if lg.DebugEnabled() {
+ lg.Debugf("HandleQuery %s: found %d result(s), which are " +
+ "all that exist. %s\n", query, len(ret), src.getStats())
+ }
break // the source has no more spans to give
}
if lg.DebugEnabled() {
@@ -1277,7 +1311,7 @@
ret = append(ret, span)
}
}
- return ret, nil
+ return ret, nil, src.numRead
}
func (store *dataStore) ServerStats() *common.ServerStats {
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index a693874..281ee2d 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -28,6 +28,7 @@
"org/apache/htrace/conf"
"org/apache/htrace/test"
"os"
+ "reflect"
"sort"
"testing"
"time"
@@ -122,10 +123,15 @@
}
func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
- expectedSpans []common.Span) {
- spans, err := ht.Store.HandleQuery(query)
+ expectedSpans []common.Span) {
+ testQueryExt(t, ht, query, expectedSpans, nil)
+}
+
+func testQueryExt(t *testing.T, ht *MiniHTraced, query *common.Query,
+ expectedSpans []common.Span, expectedNumScanned []int) {
+ spans, err, numScanned := ht.Store.HandleQuery(query)
if err != nil {
- t.Fatalf("First query failed: %s\n", err.Error())
+ t.Fatalf("Query %s failed: %s\n", query.String(), err.Error())
}
expectedBuf := new(bytes.Buffer)
dec := json.NewEncoder(expectedBuf)
@@ -142,6 +148,12 @@
t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
len(expectedSpans))
common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes()))
+ if expectedNumScanned != nil {
+ if !reflect.DeepEqual(expectedNumScanned, numScanned) {
+ t.Fatalf("Invalid values for numScanned: got %v, expected %v\n",
+ expectedNumScanned, numScanned)
+ }
+ }
}
// Test queries on the datastore.
@@ -721,3 +733,29 @@
Prev: &SIMPLE_TEST_SPANS[1],
}, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
}
+
+func TestQueryRowsScanned(t *testing.T) {
+ t.Parallel()
+ htraceBld := &MiniHTracedBuilder{Name: "TestQueryRowsScanned",
+ WrittenSpans: common.NewSemaphore(0),
+ }
+ ht, err := htraceBld.Build()
+ if err != nil {
+ panic(err)
+ }
+ defer ht.Close()
+ createSpans(SIMPLE_TEST_SPANS, ht.Store)
+ assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
+ testQueryExt(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.EQUALS,
+ Field: common.SPAN_ID,
+ Val: common.TestId("00000000000000000000000000000001").String(),
+ },
+ },
+ Lim: 100,
+ Prev: nil,
+ }, []common.Span{SIMPLE_TEST_SPANS[0]},
+ []int{2, 1})
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 74ec0cf..eabeee7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -278,7 +278,7 @@
return
}
var results []*common.Span
- results, err = hand.store.HandleQuery(&query)
+ results, err, _ = hand.store.HandleQuery(&query)
if err != nil {
writeError(hand.lg, w, http.StatusInternalServerError,
fmt.Sprintf("Internal error processing query %s: %s",