HTRACE-282. htraced: reap spans which are older than a configurable interval (cmccabe)
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 34ed15e..5e57f08 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -114,6 +114,16 @@
// Per-host Span Metrics
HostSpanMetrics SpanMetricsMap
+
+ // The time (in UTC milliseconds since the epoch) when the
+ // datastore was last started.
+ LastStartMs int64
+
+ // The current time (in UTC milliseconds since the epoch) on the server.
+ CurMs int64
+
+ // The total number of spans which have been reaped.
+ ReapedSpans uint64
}
type StorageDirectoryStats struct {
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time.go b/htrace-htraced/go/src/org/apache/htrace/common/time.go
new file mode 100644
index 0000000..8b4b6b8
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/time.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+ "time"
+)
+
+func TimeToUnixMs(t time.Time) int64 {
+ return t.UnixNano() / 1000000
+}
+
+func UnixMsToTime(u int64) time.Time {
+ secs := u / 1000
+ nanos := u - (secs * 1000)
+ return time.Unix(secs, nanos)
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
new file mode 100644
index 0000000..11e2733
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+ "testing"
+)
+
+func testRoundTrip(t *testing.T, u int64) {
+ tme := UnixMsToTime(u)
+ u2 := TimeToUnixMs(tme)
+ if u2 != u {
+ t.Fatalf("Error taking %d on a round trip: came back as "+
+ "%d instead.\n", u, u2)
+ }
+}
+
+func TestTimeConversions(t *testing.T) {
+ testRoundTrip(t, 0)
+ testRoundTrip(t, 1445540632000)
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index 487762b..ed809f9 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -75,6 +75,12 @@
// The maximum number of addresses for which we will maintain metrics.
const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
+// The number of milliseconds we should keep spans before discarding them.
+const HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms"
+
+// The period between updates to the span reaper
+const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms"
+
// A host:port pair to send information to on startup. This is used in unit
// tests to determine the (random) port of the htraced process that has been
// started.
@@ -92,6 +98,8 @@
HTRACE_LOG_LEVEL: "INFO",
HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000",
+ HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 3*24*60*60*1000),
+ HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000),
}
// Values to be used when creating test configurations
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index 749acdf..e7286ff 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -33,6 +33,7 @@
"os"
"sort"
"strings"
+ "text/tabwriter"
"time"
)
@@ -197,8 +198,17 @@
fmt.Println(err.Error())
return EXIT_FAILURE
}
- fmt.Printf("HTRACED SERVER STATS:\n")
- fmt.Printf("%d leveldb directories.\n", len(stats.Dirs))
+ w := new(tabwriter.Writer)
+ w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+ fmt.Fprintf(w, "HTRACED SERVER STATS\n")
+ fmt.Fprintf(w, "Datastore Start\t%s\n",
+ common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339))
+ fmt.Fprintf(w, "Server Time\t%s\n",
+ common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
+ fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+ fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
+ w.Flush()
+ fmt.Println("")
for i := range stats.Dirs {
dir := stats.Dirs[i]
fmt.Printf("==== %s ===\n", dir.Path)
@@ -206,7 +216,9 @@
stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
fmt.Printf("%s\n", stats)
}
- fmt.Printf("HOST SPAN METRICS:\n")
+ w = new(tabwriter.Writer)
+ w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+ fmt.Fprintf(w, "HOST SPAN METRICS\n")
mtxMap := stats.HostSpanMetrics
keys := make(sort.StringSlice, len(mtxMap))
i := 0
@@ -217,9 +229,10 @@
sort.Sort(keys)
for k := range keys {
mtx := mtxMap[keys[k]]
- fmt.Printf("%s: written: %d, server dropped %d, client dropped %d\n",
+ fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped: %d\n",
keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
}
+ w.Flush()
return EXIT_SUCCESS
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 780b6d2..5d5559a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -31,6 +31,9 @@
"os"
"strconv"
"strings"
+ "sync"
+ "sync/atomic"
+ "time"
)
//
@@ -138,10 +141,83 @@
mtx.Clear()
}
shd.store.msink.UpdateMetrics(mtxMap)
+ shd.pruneExpired()
}
}
}
+func (shd *shard) pruneExpired() {
+ lg := shd.store.rpr.lg
+ src, err := CreateReaperSource(shd)
+ if err != nil {
+ lg.Errorf("Error creating reaper source for shd(%s): %s\n",
+ shd.path, err.Error())
+ return
+ }
+ var totalReaped uint64
+ defer func() {
+ src.Close()
+ if totalReaped > 0 {
+ atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped)
+ }
+ }()
+ urdate := s2u64(shd.store.rpr.GetReaperDate())
+ for {
+ span := src.next()
+ if span == nil {
+ lg.Debugf("After reaping %d span(s), no more found in shard %s "+
+ "to reap.\n", totalReaped, shd.path)
+ return
+ }
+ begin := s2u64(span.Begin)
+ if begin >= urdate {
+ lg.Debugf("After reaping %d span(s), the remaining spans in "+
+ "shard %s are new enough to be kept\n",
+ totalReaped, shd.path)
+ return
+ }
+ err = shd.DeleteSpan(span)
+ if err != nil {
+ lg.Errorf("Error deleting span %s from shd(%s): %s\n",
+ span.String(), shd.path, err.Error())
+ return
+ }
+ if lg.TraceEnabled() {
+ lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path)
+ }
+ totalReaped++
+ }
+}
+
+// Delete a span from the shard. Note that leveldb may retain the data until
+// compaction(s) remove it.
+func (shd *shard) DeleteSpan(span *common.Span) error {
+ batch := levigo.NewWriteBatch()
+ defer batch.Close()
+ primaryKey :=
+ append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
+ batch.Delete(primaryKey)
+ for parentIdx := range span.Parents {
+ key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
+ span.Parents[parentIdx].Val()...), span.Id.Val()...)
+ batch.Delete(key)
+ }
+ beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
+ u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
+ batch.Delete(beginTimeKey)
+ endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
+ u64toSlice(s2u64(span.End))...), span.Id.Val()...)
+ batch.Delete(endTimeKey)
+ durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
+ u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
+ batch.Delete(durationKey)
+ err := shd.ldb.Write(shd.store.writeOpts, batch)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the
// highest bit, so that negative input values map to unsigned numbers which are
// less than non-negative input values.
@@ -251,6 +327,107 @@
lg.Infof("Closed %s...\n", shd.path)
}
+type Reaper struct {
+ // The logger used by the reaper
+ lg *common.Logger
+
+ // The number of milliseconds to keep spans around, in milliseconds.
+ spanExpiryMs int64
+
+ // The oldest date for which we'll keep spans.
+ reaperDate int64
+
+ // A channel used to send heartbeats to the reaper
+ heartbeats chan interface{}
+
+ // A channel used to block until the reaper goroutine has exited.
+ exited chan interface{}
+
+ // The lock protecting reaper data.
+ lock sync.Mutex
+
+ // The reaper heartbeater
+ hb *Heartbeater
+
+ // The total number of spans which have been reaped.
+ ReapedSpans uint64
+}
+
+func NewReaper(cnf *conf.Config) *Reaper {
+ rpr := &Reaper{
+ lg: common.NewLogger("reaper", cnf),
+ spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
+ heartbeats: make(chan interface{}, 1),
+ exited: make(chan interface{}),
+ }
+ rpr.hb = NewHeartbeater("ReaperHeartbeater",
+ cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
+ go rpr.run()
+ rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
+ name: "reaper",
+ targetChan: rpr.heartbeats,
+ })
+ return rpr
+}
+
+func (rpr *Reaper) run() {
+ defer func() {
+ rpr.lg.Info("Exiting Reaper goroutine.\n")
+ rpr.exited <- nil
+ }()
+
+ for {
+ _, isOpen := <-rpr.heartbeats
+ if !isOpen {
+ return
+ }
+ rpr.handleHeartbeat()
+ }
+}
+
+func (rpr *Reaper) handleHeartbeat() {
+ // TODO: check dataStore fullness
+ now := common.TimeToUnixMs(time.Now().UTC())
+ d, updated := func() (int64, bool) {
+ rpr.lock.Lock()
+ defer rpr.lock.Unlock()
+ newReaperDate := now - rpr.spanExpiryMs
+ if newReaperDate > rpr.reaperDate {
+ rpr.reaperDate = newReaperDate
+ return rpr.reaperDate, true
+ } else {
+ return rpr.reaperDate, false
+ }
+ }()
+ if rpr.lg.DebugEnabled() {
+ if updated {
+ rpr.lg.Debugf("Updating UTC reaper date to %s.\n",
+ common.UnixMsToTime(d).Format(time.RFC3339))
+ } else {
+ rpr.lg.Debugf("Not updating previous reaperDate of %s.\n",
+ common.UnixMsToTime(d).Format(time.RFC3339))
+ }
+ }
+}
+
+func (rpr *Reaper) GetReaperDate() int64 {
+ rpr.lock.Lock()
+ defer rpr.lock.Unlock()
+ return rpr.reaperDate
+}
+
+func (rpr *Reaper) SetReaperDate(rdate int64) {
+ rpr.lock.Lock()
+ defer rpr.lock.Unlock()
+ rpr.reaperDate = rdate
+}
+
+func (rpr *Reaper) Shutdown() {
+ rpr.hb.Shutdown()
+ close(rpr.heartbeats)
+ <-rpr.exited
+}
+
// The Data Store.
type dataStore struct {
lg *common.Logger
@@ -273,6 +450,12 @@
// The heartbeater which periodically asks shards to update the MetricsSink.
hb *Heartbeater
+
+ // The reaper for this datastore
+ rpr *Reaper
+
+ // When this datastore was started (in UTC milliseconds since the epoch)
+ startMs int64
}
func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
@@ -327,6 +510,8 @@
targetChan: shd.heartbeats,
})
}
+ store.rpr = NewReaper(cnf)
+ store.startMs = common.TimeToUnixMs(time.Now().UTC())
return store, nil
}
@@ -456,6 +641,10 @@
store.shards[idx].Close()
store.shards[idx] = nil
}
+ if store.rpr != nil {
+ store.rpr.Shutdown()
+ store.rpr = nil
+ }
if store.msink != nil {
store.msink.Shutdown()
store.msink = nil
@@ -502,8 +691,8 @@
var span *common.Span
span, err = shd.decodeSpan(sid, buf)
if err != nil {
- lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s\n",
- shd.path, sid.String(), err.Error())
+ lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding [%s]\n",
+ shd.path, sid.String(), err.Error(), hex.EncodeToString(buf))
return nil
}
return span
@@ -704,6 +893,7 @@
var ret *source
src := source{store: store,
pred: pred,
+ shards: make([]*shard, len(store.shards)),
iters: make([]*levigo.Iterator, 0, len(store.shards)),
nexts: make([]*common.Span, len(store.shards)),
numRead: make([]int, len(store.shards)),
@@ -720,6 +910,7 @@
}()
for shardIdx := range store.shards {
shd := store.shards[shardIdx]
+ src.shards[shardIdx] = shd
src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
}
var searchKey []byte
@@ -804,12 +995,41 @@
type source struct {
store *dataStore
pred *predicateData
+ shards []*shard
iters []*levigo.Iterator
nexts []*common.Span
numRead []int
keyPrefix byte
}
+func CreateReaperSource(shd *shard) (*source, error) {
+ store := shd.store
+ p := &common.Predicate{
+ Op: common.GREATER_THAN_OR_EQUALS,
+ Field: common.BEGIN_TIME,
+ Val: common.INVALID_SPAN_ID.String(),
+ }
+ pred, err := loadPredicateData(p)
+ if err != nil {
+ return nil, err
+ }
+ src := &source{
+ store: store,
+ pred: pred,
+ shards: []*shard{shd},
+ iters: make([]*levigo.Iterator, 1),
+ nexts: make([]*common.Span, 1),
+ numRead: make([]int, 1),
+ keyPrefix: pred.getIndexPrefix(),
+ }
+ iter := shd.ldb.NewIterator(store.readOpts)
+ src.iters[0] = iter
+ searchKey := append(append([]byte{src.keyPrefix}, pred.key...),
+ pred.key...)
+ iter.Seek(searchKey)
+ return src, nil
+}
+
// Return true if this operation may require skipping the first result we get back from leveldb.
func mayRequireOneSkip(op common.Op) bool {
switch op {
@@ -834,24 +1054,25 @@
lg := src.store.lg
var err error
iter := src.iters[shardIdx]
+ shdPath := src.shards[shardIdx].path
if iter == nil {
- lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
+ lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath)
return // There are no more entries in this shard.
}
if src.nexts[shardIdx] != nil {
- lg.Debugf("No need to populate shard %d\n", shardIdx)
+ lg.Debugf("No need to populate shard %s\n", shdPath)
return // We already have a valid entry for this shard.
}
for {
if !iter.Valid() {
- lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
+ lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath)
break // Can't read past end of DB
}
src.numRead[shardIdx]++
key := iter.Key()
if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
- lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n",
- shardIdx, string(src.keyPrefix))
+ lg.Debugf("Can't populate: Iterator for shard %s does not have prefix %s\n",
+ shdPath, string(src.keyPrefix))
break // Can't read past end of indexed section
}
var span *common.Span
@@ -859,19 +1080,19 @@
if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
// The span id maps to the span itself.
sid = common.SpanId(key[1:17])
- span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
+ span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value())
if err != nil {
- lg.Debugf("Internal error decoding span %s in shard %d: %s\n",
- sid.String(), shardIdx, err.Error())
+ lg.Debugf("Internal error decoding span %s in shard %s: %s\n",
+ sid.String(), shdPath, err.Error())
break
}
} else {
// With a secondary index, we have to look up the span by id.
sid = common.SpanId(key[9:25])
- span = src.store.shards[shardIdx].FindSpan(sid)
+ span = src.shards[shardIdx].FindSpan(sid)
if span == nil {
- lg.Debugf("Internal error rehydrating span %s in shard %d\n",
- sid.String(), shardIdx)
+ lg.Debugf("Internal error rehydrating span %s in shard %s\n",
+ sid.String(), shdPath)
break
}
}
@@ -881,12 +1102,12 @@
iter.Next()
}
if src.pred.satisfiedBy(span) {
- lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx)
+ lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath)
src.nexts[shardIdx] = span // Found valid entry
return
} else {
- lg.Debugf("Span %s from shard %d does not satisfy the predicate.\n",
- sid.String(), shardIdx)
+ lg.Debugf("Span %s from shard %s does not satisfy the predicate.\n",
+ sid.String(), shdPath)
if src.numRead[shardIdx] <= 1 && mayRequireOneSkip(src.pred.Op) {
continue
}
@@ -894,13 +1115,13 @@
break
}
}
- lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+ lg.Debugf("Closing iterator for shard %s.\n", shdPath)
iter.Close()
src.iters[shardIdx] = nil
}
func (src *source) next() *common.Span {
- for shardIdx := range src.iters {
+ for shardIdx := range src.shards {
src.populateNextFromShard(shardIdx)
}
var best *common.Span
@@ -1017,5 +1238,8 @@
shard.ldb.PropertyValue("leveldb.stats"))
}
serverStats.HostSpanMetrics = store.msink.AccessTotals()
+ serverStats.LastStartMs = store.startMs
+ serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
+ serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
return &serverStats
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
index ea4b053..49a21ee 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
@@ -21,8 +21,8 @@
import (
"org/apache/htrace/common"
- "time"
"sync"
+ "time"
)
type Heartbeater struct {
@@ -84,6 +84,10 @@
}
func (hb *Heartbeater) run() {
+ defer func() {
+ hb.lg.Debugf("%s: exiting.\n", hb.String())
+ hb.wg.Done()
+ }()
period := time.Duration(hb.periodMs) * time.Millisecond
for {
periodEnd := time.Now().Add(period)
@@ -99,8 +103,6 @@
select {
case tgt, open := <-hb.req:
if !open {
- defer hb.wg.Done()
- hb.lg.Debugf("%s: exiting.\n", hb.String())
return
}
hb.targets = append(hb.targets, *tgt)
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
new file mode 100644
index 0000000..7aef1d1
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "org/apache/htrace/test"
+ "testing"
+ "time"
+)
+
+func TestReapingOldSpans(t *testing.T) {
+ const NUM_TEST_SPANS = 20
+ testSpans := make([]*common.Span, NUM_TEST_SPANS)
+ rnd := rand.New(rand.NewSource(2))
+ now := common.TimeToUnixMs(time.Now().UTC())
+ for i := range testSpans {
+ testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i])
+ testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i)
+ testSpans[i].Description = fmt.Sprintf("Span%02d", i)
+ }
+ htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans",
+ Cnf: map[string]string{
+ conf.HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 60*60*1000),
+ conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
+ WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS),
+ DataDirs: make([]string, 2),
+ }
+ ht, err := htraceBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error())
+ }
+ for i := range testSpans {
+ ht.Store.WriteSpan(&IncomingSpan{
+ Addr: "127.0.0.1:1234",
+ Span: testSpans[i],
+ })
+ }
+ // Wait the spans to be created
+ for i := 0; i < len(testSpans); i++ {
+ <-ht.Store.WrittenSpans
+ }
+ // Set a reaper date that will remove all the spans except final one.
+ ht.Store.rpr.SetReaperDate(now)
+
+ common.WaitFor(5*time.Minute, time.Millisecond, func() bool {
+ for i := 0; i < NUM_TEST_SPANS-1; i++ {
+ span := ht.Store.FindSpan(testSpans[i].Id)
+ if span != nil {
+ ht.Store.lg.Debugf("Waiting for %s to be removed...\n",
+ testSpans[i].Description)
+ return false
+ }
+ }
+ span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id)
+ if span == nil {
+ ht.Store.lg.Debugf("Did not expect %s to be removed\n",
+ testSpans[NUM_TEST_SPANS-1].Description)
+ return false
+ }
+ return true
+ })
+ defer ht.Close()
+}