HTRACE-282. htraced: reap spans which are older than a configurable interval (cmccabe)
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 34ed15e..5e57f08 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -114,6 +114,16 @@
 
 	// Per-host Span Metrics
 	HostSpanMetrics SpanMetricsMap
+
+	// The time (in UTC milliseconds since the epoch) when the
+	// datastore was last started.
+	LastStartMs int64
+
+	// The current time (in UTC milliseconds since the epoch) on the server.
+	CurMs int64
+
+	// The total number of spans which have been reaped.
+	ReapedSpans uint64
 }
 
 type StorageDirectoryStats struct {
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time.go b/htrace-htraced/go/src/org/apache/htrace/common/time.go
new file mode 100644
index 0000000..8b4b6b8
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/time.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+	"time"
+)
+
+func TimeToUnixMs(t time.Time) int64 {
+	return t.UnixNano() / 1000000
+}
+
+func UnixMsToTime(u int64) time.Time {
+	secs := u / 1000
+	nanos := u - (secs * 1000)
+	return time.Unix(secs, nanos)
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
new file mode 100644
index 0000000..11e2733
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+	"testing"
+)
+
+func testRoundTrip(t *testing.T, u int64) {
+	tme := UnixMsToTime(u)
+	u2 := TimeToUnixMs(tme)
+	if u2 != u {
+		t.Fatalf("Error taking %d on a round trip: came back as "+
+			"%d instead.\n", u, u2)
+	}
+}
+
+func TestTimeConversions(t *testing.T) {
+	testRoundTrip(t, 0)
+	testRoundTrip(t, 1445540632000)
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index 487762b..ed809f9 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -75,6 +75,12 @@
 // The maximum number of addresses for which we will maintain metrics.
 const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
 
+// The number of milliseconds we should keep spans before discarding them.
+const HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms"
+
+// The period between updates to the span reaper
+const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms"
+
 // A host:port pair to send information to on startup.  This is used in unit
 // tests to determine the (random) port of the htraced process that has been
 // started.
@@ -92,6 +98,8 @@
 	HTRACE_LOG_LEVEL:                   "INFO",
 	HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
 	HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
+	HTRACE_SPAN_EXPIRY_MS:              fmt.Sprintf("%d", 3*24*60*60*1000),
+	HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  fmt.Sprintf("%d", 90*1000),
 }
 
 // Values to be used when creating test configurations
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index 749acdf..e7286ff 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -33,6 +33,7 @@
 	"os"
 	"sort"
 	"strings"
+	"text/tabwriter"
 	"time"
 )
 
@@ -197,8 +198,17 @@
 		fmt.Println(err.Error())
 		return EXIT_FAILURE
 	}
-	fmt.Printf("HTRACED SERVER STATS:\n")
-	fmt.Printf("%d leveldb directories.\n", len(stats.Dirs))
+	w := new(tabwriter.Writer)
+	w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(w, "HTRACED SERVER STATS\n")
+	fmt.Fprintf(w, "Datastore Start\t%s\n",
+		common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339))
+	fmt.Fprintf(w, "Server Time\t%s\n",
+		common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
+	fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+	fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
+	w.Flush()
+	fmt.Println("")
 	for i := range stats.Dirs {
 		dir := stats.Dirs[i]
 		fmt.Printf("==== %s ===\n", dir.Path)
@@ -206,7 +216,9 @@
 		stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
 		fmt.Printf("%s\n", stats)
 	}
-	fmt.Printf("HOST SPAN METRICS:\n")
+	w = new(tabwriter.Writer)
+	w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(w, "HOST SPAN METRICS\n")
 	mtxMap := stats.HostSpanMetrics
 	keys := make(sort.StringSlice, len(mtxMap))
 	i := 0
@@ -217,9 +229,10 @@
 	sort.Sort(keys)
 	for k := range keys {
 		mtx := mtxMap[keys[k]]
-		fmt.Printf("%s: written: %d, server dropped %d, client dropped %d\n",
+		fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped: %d\n",
 			keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
 	}
+	w.Flush()
 	return EXIT_SUCCESS
 }
 
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 780b6d2..5d5559a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -31,6 +31,9 @@
 	"os"
 	"strconv"
 	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
 )
 
 //
@@ -138,10 +141,83 @@
 				mtx.Clear()
 			}
 			shd.store.msink.UpdateMetrics(mtxMap)
+			shd.pruneExpired()
 		}
 	}
 }
 
+func (shd *shard) pruneExpired() {
+	lg := shd.store.rpr.lg
+	src, err := CreateReaperSource(shd)
+	if err != nil {
+		lg.Errorf("Error creating reaper source for shd(%s): %s\n",
+			shd.path, err.Error())
+		return
+	}
+	var totalReaped uint64
+	defer func() {
+		src.Close()
+		if totalReaped > 0 {
+			atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped)
+		}
+	}()
+	urdate := s2u64(shd.store.rpr.GetReaperDate())
+	for {
+		span := src.next()
+		if span == nil {
+			lg.Debugf("After reaping %d span(s), no more found in shard %s "+
+				"to reap.\n", totalReaped, shd.path)
+			return
+		}
+		begin := s2u64(span.Begin)
+		if begin >= urdate {
+			lg.Debugf("After reaping %d span(s), the remaining spans in "+
+				"shard %s are new enough to be kept\n",
+				totalReaped, shd.path)
+			return
+		}
+		err = shd.DeleteSpan(span)
+		if err != nil {
+			lg.Errorf("Error deleting span %s from shd(%s): %s\n",
+				span.String(), shd.path, err.Error())
+			return
+		}
+		if lg.TraceEnabled() {
+			lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path)
+		}
+		totalReaped++
+	}
+}
+
+// Delete a span from the shard.  Note that leveldb may retain the data until
+// compaction(s) remove it.
+func (shd *shard) DeleteSpan(span *common.Span) error {
+	batch := levigo.NewWriteBatch()
+	defer batch.Close()
+	primaryKey :=
+		append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
+	batch.Delete(primaryKey)
+	for parentIdx := range span.Parents {
+		key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
+			span.Parents[parentIdx].Val()...), span.Id.Val()...)
+		batch.Delete(key)
+	}
+	beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
+		u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
+	batch.Delete(beginTimeKey)
+	endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
+		u64toSlice(s2u64(span.End))...), span.Id.Val()...)
+	batch.Delete(endTimeKey)
+	durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
+		u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
+	batch.Delete(durationKey)
+	err := shd.ldb.Write(shd.store.writeOpts, batch)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 // Convert a signed 64-bit number into an unsigned 64-bit number.  We flip the
 // highest bit, so that negative input values map to unsigned numbers which are
 // less than non-negative input values.
@@ -251,6 +327,107 @@
 	lg.Infof("Closed %s...\n", shd.path)
 }
 
+type Reaper struct {
+	// The logger used by the reaper
+	lg *common.Logger
+
+	// The number of milliseconds to keep spans around, in milliseconds.
+	spanExpiryMs int64
+
+	// The oldest date for which we'll keep spans.
+	reaperDate int64
+
+	// A channel used to send heartbeats to the reaper
+	heartbeats chan interface{}
+
+	// A channel used to block until the reaper goroutine has exited.
+	exited chan interface{}
+
+	// The lock protecting reaper data.
+	lock sync.Mutex
+
+	// The reaper heartbeater
+	hb *Heartbeater
+
+	// The total number of spans which have been reaped.
+	ReapedSpans uint64
+}
+
+func NewReaper(cnf *conf.Config) *Reaper {
+	rpr := &Reaper{
+		lg:           common.NewLogger("reaper", cnf),
+		spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
+		heartbeats:   make(chan interface{}, 1),
+		exited:       make(chan interface{}),
+	}
+	rpr.hb = NewHeartbeater("ReaperHeartbeater",
+		cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
+	go rpr.run()
+	rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
+		name:       "reaper",
+		targetChan: rpr.heartbeats,
+	})
+	return rpr
+}
+
+func (rpr *Reaper) run() {
+	defer func() {
+		rpr.lg.Info("Exiting Reaper goroutine.\n")
+		rpr.exited <- nil
+	}()
+
+	for {
+		_, isOpen := <-rpr.heartbeats
+		if !isOpen {
+			return
+		}
+		rpr.handleHeartbeat()
+	}
+}
+
+func (rpr *Reaper) handleHeartbeat() {
+	// TODO: check dataStore fullness
+	now := common.TimeToUnixMs(time.Now().UTC())
+	d, updated := func() (int64, bool) {
+		rpr.lock.Lock()
+		defer rpr.lock.Unlock()
+		newReaperDate := now - rpr.spanExpiryMs
+		if newReaperDate > rpr.reaperDate {
+			rpr.reaperDate = newReaperDate
+			return rpr.reaperDate, true
+		} else {
+			return rpr.reaperDate, false
+		}
+	}()
+	if rpr.lg.DebugEnabled() {
+		if updated {
+			rpr.lg.Debugf("Updating UTC reaper date to %s.\n",
+				common.UnixMsToTime(d).Format(time.RFC3339))
+		} else {
+			rpr.lg.Debugf("Not updating previous reaperDate of %s.\n",
+				common.UnixMsToTime(d).Format(time.RFC3339))
+		}
+	}
+}
+
+func (rpr *Reaper) GetReaperDate() int64 {
+	rpr.lock.Lock()
+	defer rpr.lock.Unlock()
+	return rpr.reaperDate
+}
+
+func (rpr *Reaper) SetReaperDate(rdate int64) {
+	rpr.lock.Lock()
+	defer rpr.lock.Unlock()
+	rpr.reaperDate = rdate
+}
+
+func (rpr *Reaper) Shutdown() {
+	rpr.hb.Shutdown()
+	close(rpr.heartbeats)
+	<-rpr.exited
+}
+
 // The Data Store.
 type dataStore struct {
 	lg *common.Logger
@@ -273,6 +450,12 @@
 
 	// The heartbeater which periodically asks shards to update the MetricsSink.
 	hb *Heartbeater
+
+	// The reaper for this datastore
+	rpr *Reaper
+
+	// When this datastore was started (in UTC milliseconds since the epoch)
+	startMs int64
 }
 
 func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
@@ -327,6 +510,8 @@
 			targetChan: shd.heartbeats,
 		})
 	}
+	store.rpr = NewReaper(cnf)
+	store.startMs = common.TimeToUnixMs(time.Now().UTC())
 	return store, nil
 }
 
@@ -456,6 +641,10 @@
 		store.shards[idx].Close()
 		store.shards[idx] = nil
 	}
+	if store.rpr != nil {
+		store.rpr.Shutdown()
+		store.rpr = nil
+	}
 	if store.msink != nil {
 		store.msink.Shutdown()
 		store.msink = nil
@@ -502,8 +691,8 @@
 	var span *common.Span
 	span, err = shd.decodeSpan(sid, buf)
 	if err != nil {
-		lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s\n",
-			shd.path, sid.String(), err.Error())
+		lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding [%s]\n",
+			shd.path, sid.String(), err.Error(), hex.EncodeToString(buf))
 		return nil
 	}
 	return span
@@ -704,6 +893,7 @@
 	var ret *source
 	src := source{store: store,
 		pred:      pred,
+		shards:    make([]*shard, len(store.shards)),
 		iters:     make([]*levigo.Iterator, 0, len(store.shards)),
 		nexts:     make([]*common.Span, len(store.shards)),
 		numRead:   make([]int, len(store.shards)),
@@ -720,6 +910,7 @@
 	}()
 	for shardIdx := range store.shards {
 		shd := store.shards[shardIdx]
+		src.shards[shardIdx] = shd
 		src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
 	}
 	var searchKey []byte
@@ -804,12 +995,41 @@
 type source struct {
 	store     *dataStore
 	pred      *predicateData
+	shards    []*shard
 	iters     []*levigo.Iterator
 	nexts     []*common.Span
 	numRead   []int
 	keyPrefix byte
 }
 
+func CreateReaperSource(shd *shard) (*source, error) {
+	store := shd.store
+	p := &common.Predicate{
+		Op:    common.GREATER_THAN_OR_EQUALS,
+		Field: common.BEGIN_TIME,
+		Val:   common.INVALID_SPAN_ID.String(),
+	}
+	pred, err := loadPredicateData(p)
+	if err != nil {
+		return nil, err
+	}
+	src := &source{
+		store:     store,
+		pred:      pred,
+		shards:    []*shard{shd},
+		iters:     make([]*levigo.Iterator, 1),
+		nexts:     make([]*common.Span, 1),
+		numRead:   make([]int, 1),
+		keyPrefix: pred.getIndexPrefix(),
+	}
+	iter := shd.ldb.NewIterator(store.readOpts)
+	src.iters[0] = iter
+	searchKey := append(append([]byte{src.keyPrefix}, pred.key...),
+		pred.key...)
+	iter.Seek(searchKey)
+	return src, nil
+}
+
 // Return true if this operation may require skipping the first result we get back from leveldb.
 func mayRequireOneSkip(op common.Op) bool {
 	switch op {
@@ -834,24 +1054,25 @@
 	lg := src.store.lg
 	var err error
 	iter := src.iters[shardIdx]
+	shdPath := src.shards[shardIdx].path
 	if iter == nil {
-		lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
+		lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath)
 		return // There are no more entries in this shard.
 	}
 	if src.nexts[shardIdx] != nil {
-		lg.Debugf("No need to populate shard %d\n", shardIdx)
+		lg.Debugf("No need to populate shard %s\n", shdPath)
 		return // We already have a valid entry for this shard.
 	}
 	for {
 		if !iter.Valid() {
-			lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
+			lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath)
 			break // Can't read past end of DB
 		}
 		src.numRead[shardIdx]++
 		key := iter.Key()
 		if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
-			lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n",
-				shardIdx, string(src.keyPrefix))
+			lg.Debugf("Can't populate: Iterator for shard %s does not have prefix %s\n",
+				shdPath, string(src.keyPrefix))
 			break // Can't read past end of indexed section
 		}
 		var span *common.Span
@@ -859,19 +1080,19 @@
 		if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
 			// The span id maps to the span itself.
 			sid = common.SpanId(key[1:17])
-			span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
+			span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value())
 			if err != nil {
-				lg.Debugf("Internal error decoding span %s in shard %d: %s\n",
-					sid.String(), shardIdx, err.Error())
+				lg.Debugf("Internal error decoding span %s in shard %s: %s\n",
+					sid.String(), shdPath, err.Error())
 				break
 			}
 		} else {
 			// With a secondary index, we have to look up the span by id.
 			sid = common.SpanId(key[9:25])
-			span = src.store.shards[shardIdx].FindSpan(sid)
+			span = src.shards[shardIdx].FindSpan(sid)
 			if span == nil {
-				lg.Debugf("Internal error rehydrating span %s in shard %d\n",
-					sid.String(), shardIdx)
+				lg.Debugf("Internal error rehydrating span %s in shard %s\n",
+					sid.String(), shdPath)
 				break
 			}
 		}
@@ -881,12 +1102,12 @@
 			iter.Next()
 		}
 		if src.pred.satisfiedBy(span) {
-			lg.Debugf("Populated valid span %v from shard %d.\n", sid, shardIdx)
+			lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath)
 			src.nexts[shardIdx] = span // Found valid entry
 			return
 		} else {
-			lg.Debugf("Span %s from shard %d does not satisfy the predicate.\n",
-				sid.String(), shardIdx)
+			lg.Debugf("Span %s from shard %s does not satisfy the predicate.\n",
+				sid.String(), shdPath)
 			if src.numRead[shardIdx] <= 1 && mayRequireOneSkip(src.pred.Op) {
 				continue
 			}
@@ -894,13 +1115,13 @@
 			break
 		}
 	}
-	lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+	lg.Debugf("Closing iterator for shard %s.\n", shdPath)
 	iter.Close()
 	src.iters[shardIdx] = nil
 }
 
 func (src *source) next() *common.Span {
-	for shardIdx := range src.iters {
+	for shardIdx := range src.shards {
 		src.populateNextFromShard(shardIdx)
 	}
 	var best *common.Span
@@ -1017,5 +1238,8 @@
 			shard.ldb.PropertyValue("leveldb.stats"))
 	}
 	serverStats.HostSpanMetrics = store.msink.AccessTotals()
+	serverStats.LastStartMs = store.startMs
+	serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
+	serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
 	return &serverStats
 }
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
index ea4b053..49a21ee 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
@@ -21,8 +21,8 @@
 
 import (
 	"org/apache/htrace/common"
-	"time"
 	"sync"
+	"time"
 )
 
 type Heartbeater struct {
@@ -84,6 +84,10 @@
 }
 
 func (hb *Heartbeater) run() {
+	defer func() {
+		hb.lg.Debugf("%s: exiting.\n", hb.String())
+		hb.wg.Done()
+	}()
 	period := time.Duration(hb.periodMs) * time.Millisecond
 	for {
 		periodEnd := time.Now().Add(period)
@@ -99,8 +103,6 @@
 			select {
 			case tgt, open := <-hb.req:
 				if !open {
-					defer hb.wg.Done()
-					hb.lg.Debugf("%s: exiting.\n", hb.String())
 					return
 				}
 				hb.targets = append(hb.targets, *tgt)
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
new file mode 100644
index 0000000..7aef1d1
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"org/apache/htrace/test"
+	"testing"
+	"time"
+)
+
+func TestReapingOldSpans(t *testing.T) {
+	const NUM_TEST_SPANS = 20
+	testSpans := make([]*common.Span, NUM_TEST_SPANS)
+	rnd := rand.New(rand.NewSource(2))
+	now := common.TimeToUnixMs(time.Now().UTC())
+	for i := range testSpans {
+		testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i])
+		testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i)
+		testSpans[i].Description = fmt.Sprintf("Span%02d", i)
+	}
+	htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans",
+		Cnf: map[string]string{
+			conf.HTRACE_SPAN_EXPIRY_MS:              fmt.Sprintf("%d", 60*60*1000),
+			conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  "1",
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
+		WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS),
+		DataDirs:     make([]string, 2),
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error())
+	}
+	for i := range testSpans {
+		ht.Store.WriteSpan(&IncomingSpan{
+			Addr: "127.0.0.1:1234",
+			Span: testSpans[i],
+		})
+	}
+	// Wait the spans to be created
+	for i := 0; i < len(testSpans); i++ {
+		<-ht.Store.WrittenSpans
+	}
+	// Set a reaper date that will remove all the spans except final one.
+	ht.Store.rpr.SetReaperDate(now)
+
+	common.WaitFor(5*time.Minute, time.Millisecond, func() bool {
+		for i := 0; i < NUM_TEST_SPANS-1; i++ {
+			span := ht.Store.FindSpan(testSpans[i].Id)
+			if span != nil {
+				ht.Store.lg.Debugf("Waiting for %s to be removed...\n",
+					testSpans[i].Description)
+				return false
+			}
+		}
+		span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id)
+		if span == nil {
+			ht.Store.lg.Debugf("Did not expect %s to be removed\n",
+				testSpans[NUM_TEST_SPANS-1].Description)
+			return false
+		}
+		return true
+	})
+	defer ht.Close()
+}