HTRACE-280. htraced: add metrics about total spans added and dropped per address (cmccabe)
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process.go b/htrace-htraced/go/src/org/apache/htrace/common/process.go
index 419d6fe..aad6ca1 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/process.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/process.go
@@ -69,7 +69,7 @@
sigQuitChan := make(chan os.Signal, 1)
signal.Notify(sigQuitChan, syscall.SIGQUIT)
go func() {
- bufSize := 1<<20
+ bufSize := 1 << 20
buf := make([]byte, bufSize)
for {
<-sigQuitChan
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
index 7609133..d3f5a56 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
@@ -42,7 +42,7 @@
os.Exit(0)
}
helper := exec.Command(os.Args[0], "-test.run=TestSignals", "--")
- helper.Env = []string { HTRACED_TEST_HELPER_PROCESS + "=1" }
+ helper.Env = []string{HTRACED_TEST_HELPER_PROCESS + "=1"}
stdoutPipe, err := helper.StdoutPipe()
if err != nil {
panic(fmt.Sprintf("Failed to open pipe to process stdout: %s",
@@ -77,7 +77,7 @@
}
t.Logf("Saw 'Terminating on signal: SIGINT'. " +
"Helper goroutine exiting.\n")
- done<-nil
+ done <- nil
}()
scanner := bufio.NewScanner(stderrPipe)
for scanner.Scan() {
@@ -97,9 +97,9 @@
// Run the helper process which TestSignals spawns.
func runHelperProcess() {
- cnfMap := map[string]string {
+ cnfMap := map[string]string{
conf.HTRACE_LOG_LEVEL: "TRACE",
- conf.HTRACE_LOG_PATH: "", // log to stdout
+ conf.HTRACE_LOG_PATH: "", // log to stdout
}
cnfBld := conf.Builder{Values: cnfMap, Defaults: conf.DEFAULTS}
cnf, err := cnfBld.Build()
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 9c7bfad..34ed15e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,8 +38,10 @@
// A request to write spans to htraced.
type WriteSpansReq struct {
- DefaultTrid string `json:",omitempty"`
- Spans []*Span
+ Addr string // This gets filled in by the RPC layer.
+ DefaultTrid string `json:",omitempty"`
+ Spans []*Span
+ ClientDropped uint64 `json:",omitempty"`
}
// Info returned by /server/info
@@ -55,22 +57,6 @@
type WriteSpansResp struct {
}
-// Info returned by /server/stats
-type ServerStats struct {
- Shards []ShardStats
-}
-
-type ShardStats struct {
- Path string
-
- // The approximate number of spans present in this shard. This may be an
- // underestimate.
- ApproxNumSpans uint64
-
- // leveldb.stats information
- LevelDbStats string
-}
-
// The header which is sent over the wire for HRPC
type HrpcRequestHeader struct {
Magic uint32
@@ -104,3 +90,39 @@
return METHOD_ID_NONE
}
}
+
+type SpanMetrics struct {
+ // The total number of spans written to HTraced.
+ Written uint64
+
+ // The total number of spans dropped by the server.
+ ServerDropped uint64
+
+ // The total number of spans dropped by the client. Note that this number
+ // is tracked on the client itself and doesn't get updated if the client
+ // can't contact the server.
+ ClientDropped uint64
+}
+
+// A map from network address strings to SpanMetrics structures.
+type SpanMetricsMap map[string]*SpanMetrics
+
+// Info returned by /server/stats
+type ServerStats struct {
+ // Statistics for each shard (directory)
+ Dirs []StorageDirectoryStats
+
+ // Per-host Span Metrics
+ HostSpanMetrics SpanMetricsMap
+}
+
+type StorageDirectoryStats struct {
+ Path string
+
+ // The approximate number of spans present in this shard. This may be an
+ // underestimate.
+ ApproxNumSpans uint64
+
+ // leveldb.stats information
+ LevelDbStats string
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index ccb09e0..487762b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -68,6 +68,13 @@
// The log level to use for the logs in htrace.
const HTRACE_LOG_LEVEL = "log.level"
+// The period between metrics heartbeats. This is the approximate interval at which we will
+// update global metrics.
+const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms"
+
+// The maximum number of addresses for which we will maintain metrics.
+const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
+
// A host:port pair to send information to on startup. This is used in unit
// tests to determine the (random) port of the htraced process that has been
// started.
@@ -83,4 +90,15 @@
HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
HTRACE_LOG_PATH: "",
HTRACE_LOG_LEVEL: "INFO",
+ HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
+ HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000",
+}
+
+// Values to be used when creating test configurations
+func TEST_VALUES() map[string]string {
+ return map[string]string{
+ HTRACE_HRPC_ADDRESS: ":0", // use a random port for the HRPC server
+ HTRACE_LOG_LEVEL: "TRACE", // show all log messages in tests
+ HTRACE_WEB_ADDRESS: ":0", // use a random port for the REST server
+ }
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index f6972bb..749acdf 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -31,6 +31,7 @@
"org/apache/htrace/common"
"org/apache/htrace/conf"
"os"
+ "sort"
"strings"
"time"
)
@@ -196,15 +197,29 @@
fmt.Println(err.Error())
return EXIT_FAILURE
}
- fmt.Printf("HTraced server stats:\n")
- fmt.Printf("%d leveldb shards.\n", len(stats.Shards))
- for i := range stats.Shards {
- shard := stats.Shards[i]
- fmt.Printf("==== %s ===\n", shard.Path)
- fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans)
- stats := strings.Replace(shard.LevelDbStats, "\\n", "\n", -1)
+ fmt.Printf("HTRACED SERVER STATS:\n")
+ fmt.Printf("%d leveldb directories.\n", len(stats.Dirs))
+ for i := range stats.Dirs {
+ dir := stats.Dirs[i]
+ fmt.Printf("==== %s ===\n", dir.Path)
+ fmt.Printf("Approximate number of spans: %d\n", dir.ApproxNumSpans)
+ stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
fmt.Printf("%s\n", stats)
}
+ fmt.Printf("HOST SPAN METRICS:\n")
+ mtxMap := stats.HostSpanMetrics
+ keys := make(sort.StringSlice, len(mtxMap))
+ i := 0
+ for k, _ := range mtxMap {
+ keys[i] = k
+ i++
+ }
+ sort.Sort(keys)
+ for k := range keys {
+ mtx := mtxMap[keys[k]]
+ fmt.Printf("%s: written: %d, server dropped %d, client dropped %d\n",
+ keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
+ }
return EXIT_SUCCESS
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 540e688..ca0a425 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -210,7 +210,7 @@
func TestClientGetServerConf(t *testing.T) {
htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf",
- Cnf: map[string]string {
+ Cnf: map[string]string{
EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE,
},
DataDirs: make([]string, 2)}
@@ -230,6 +230,6 @@
}
if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE {
t.Fatalf("unexpected value for %s: %s",
- EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
+ EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
}
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 9fb9920..780b6d2 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -31,7 +31,6 @@
"os"
"strconv"
"strings"
- "sync/atomic"
)
//
@@ -77,19 +76,12 @@
const PARENT_ID_INDEX_PREFIX = 'p'
const INVALID_INDEX_PREFIX = 0
-type Statistics struct {
- NumSpansWritten uint64
-}
+type IncomingSpan struct {
+ // The address that the span was sent from.
+ Addr string
-func (stats *Statistics) IncrementWrittenSpans() {
- atomic.AddUint64(&stats.NumSpansWritten, 1)
-}
-
-// Make a copy of the statistics structure, using atomic operations.
-func (stats *Statistics) Copy() *Statistics {
- return &Statistics{
- NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten),
- }
+ // The span.
+ *common.Span
}
// A single directory containing a levelDB instance.
@@ -104,27 +96,48 @@
path string
// Incoming requests to write Spans.
- incoming chan *common.Span
+ incoming chan *IncomingSpan
+
+ // A channel for incoming heartbeats
+ heartbeats chan interface{}
// The channel we will send a bool to when we exit.
exited chan bool
+
+ // Per-address metrics
+ mtxMap ServerSpanMetricsMap
+
+ // The maximum number of metrics to allow in our map
+ maxMtx int
}
// Process incoming spans for a shard.
func (shd *shard) processIncoming() {
lg := shd.store.lg
+ defer func() {
+ lg.Infof("Shard processor for %s exiting.\n", shd.path)
+ shd.exited <- true
+ }()
for {
- span := <-shd.incoming
- if span == nil {
- lg.Infof("Shard processor for %s exiting.\n", shd.path)
- shd.exited <- true
- return
- }
- err := shd.writeSpan(span)
- if err != nil {
- lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
- } else {
- lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+ select {
+ case span := <-shd.incoming:
+ if span == nil {
+ return
+ }
+ err := shd.writeSpan(span)
+ if err != nil {
+ lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
+ } else {
+ lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+ }
+ case <-shd.heartbeats:
+ lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
+ mtxMap := make(ServerSpanMetricsMap)
+ for addr, mtx := range shd.mtxMap {
+ mtxMap[addr] = mtx.Clone()
+ mtx.Clear()
+ }
+ shd.store.msink.UpdateMetrics(mtxMap)
}
}
}
@@ -150,15 +163,19 @@
byte(0xff & (val >> 0))}
}
-func (shd *shard) writeSpan(span *common.Span) error {
+func (shd *shard) writeSpan(ispan *IncomingSpan) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
// Add SpanData to batch.
spanDataBuf := new(bytes.Buffer)
spanDataEnc := gob.NewEncoder(spanDataBuf)
+ span := ispan.Span
err := spanDataEnc.Encode(span.SpanData)
if err != nil {
+ shd.store.lg.Errorf("Error encoding span %s: %s\n",
+ span.String(), err.Error())
+ shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
return err
}
primaryKey :=
@@ -185,9 +202,12 @@
err = shd.ldb.Write(shd.store.writeOpts, batch)
if err != nil {
+ shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n",
+ span.String(), shd.path, err.Error())
+ shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
return err
}
- shd.store.stats.IncrementWrittenSpans()
+ shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
if shd.store.WrittenSpans != nil {
shd.store.WrittenSpans <- span
}
@@ -238,9 +258,6 @@
// The shards which manage our LevelDB instances.
shards []*shard
- // I/O statistics for all shards.
- stats Statistics
-
// The read options to use for LevelDB.
readOpts *levigo.ReadOptions
@@ -250,6 +267,12 @@
// If non-null, a channel we will send spans to once we finish writing them. This is only used
// for testing.
WrittenSpans chan *common.Span
+
+ // The metrics sink.
+ msink *MetricsSink
+
+ // The heartbeater which periodically asks shards to update the MetricsSink.
+ hb *Heartbeater
}
func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
@@ -286,11 +309,24 @@
}
store.shards = append(store.shards, shd)
}
+ store.msink = NewMetricsSink(cnf)
for idx := range store.shards {
shd := store.shards[idx]
shd.exited = make(chan bool, 1)
+ shd.heartbeats = make(chan interface{}, 1)
+ shd.mtxMap = make(ServerSpanMetricsMap)
+ shd.maxMtx = store.msink.maxMtx
go shd.processIncoming()
}
+ store.hb = NewHeartbeater("DatastoreHeartbeater",
+ cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg)
+ for shdIdx := range store.shards {
+ shd := store.shards[shdIdx]
+ store.hb.AddHeartbeatTarget(&HeartbeatTarget{
+ name: fmt.Sprintf("shard(%s)", shd.path),
+ targetChan: shd.heartbeats,
+ })
+ }
return store, nil
}
@@ -372,7 +408,7 @@
}
spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
shd = &shard{store: store, ldb: ldb, path: path,
- incoming: make(chan *common.Span, spanBufferSize)}
+ incoming: make(chan *IncomingSpan, spanBufferSize)}
return shd, nil
}
@@ -406,16 +442,24 @@
return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
}
-func (store *dataStore) GetStatistics() *Statistics {
- return store.stats.Copy()
+func (store *dataStore) GetSpanMetrics() common.SpanMetricsMap {
+ return store.msink.AccessTotals()
}
// Close the DataStore.
func (store *dataStore) Close() {
+ if store.hb != nil {
+ store.hb.Shutdown()
+ store.hb = nil
+ }
for idx := range store.shards {
store.shards[idx].Close()
store.shards[idx] = nil
}
+ if store.msink != nil {
+ store.msink.Shutdown()
+ store.msink = nil
+ }
if store.readOpts != nil {
store.readOpts.Close()
store.readOpts = nil
@@ -435,7 +479,7 @@
return int(sid.Hash32() % uint32(len(store.shards)))
}
-func (store *dataStore) WriteSpan(span *common.Span) {
+func (store *dataStore) WriteSpan(span *IncomingSpan) {
store.shards[store.getShardIndex(span.Id)].incoming <- span
}
@@ -954,11 +998,11 @@
func (store *dataStore) ServerStats() *common.ServerStats {
serverStats := common.ServerStats{
- Shards: make([]common.ShardStats, len(store.shards)),
+ Dirs: make([]common.StorageDirectoryStats, len(store.shards)),
}
for shardIdx := range store.shards {
shard := store.shards[shardIdx]
- serverStats.Shards[shardIdx].Path = shard.path
+ serverStats.Dirs[shardIdx].Path = shard.path
r := levigo.Range{
Start: append([]byte{SPAN_ID_INDEX_PREFIX},
common.INVALID_SPAN_ID.Val()...),
@@ -966,11 +1010,12 @@
common.INVALID_SPAN_ID.Val()...),
}
vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
- serverStats.Shards[shardIdx].ApproxNumSpans = vals[0]
- serverStats.Shards[shardIdx].LevelDbStats =
+ serverStats.Dirs[shardIdx].ApproxNumSpans = vals[0]
+ serverStats.Dirs[shardIdx].LevelDbStats =
shard.ldb.PropertyValue("leveldb.stats")
store.lg.Infof("shard.ldb.PropertyValue(leveldb.stats)=%s\n",
shard.ldb.PropertyValue("leveldb.stats"))
}
+ serverStats.HostSpanMetrics = store.msink.AccessTotals()
return &serverStats
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 0caa509..50d2891 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -73,10 +73,13 @@
func createSpans(spans []common.Span, store *dataStore) {
for idx := range spans {
- store.WriteSpan(&spans[idx])
+ store.WriteSpan(&IncomingSpan{
+ Addr: "127.0.0.1:1234",
+ Span: &spans[idx],
+ })
}
// Wait the spans to be created
- for i := 0; i < 3; i++ {
+ for i := 0; i < len(spans); i++ {
<-store.WrittenSpans
}
}
@@ -85,6 +88,9 @@
func TestDatastoreWriteAndRead(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
WrittenSpans: make(chan *common.Span, 100)}
ht, err := htraceBld.Build()
if err != nil {
@@ -92,9 +98,13 @@
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
+
+ waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+ "127.0.0.1:1234": &common.SpanMetrics{
+ Written: uint64(len(SIMPLE_TEST_SPANS)),
+ },
+ })
+
span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
if span == nil {
t.Fatal()
@@ -147,6 +157,9 @@
func TestSimpleQuery(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
WrittenSpans: make(chan *common.Span, 100)}
ht, err := htraceBld.Build()
if err != nil {
@@ -154,9 +167,12 @@
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
+ waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+ "127.0.0.1:1234": &common.SpanMetrics{
+ Written: uint64(len(SIMPLE_TEST_SPANS)),
+ },
+ })
+
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
common.Predicate{
@@ -172,6 +188,9 @@
func TestQueries2(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
WrittenSpans: make(chan *common.Span, 100)}
ht, err := htraceBld.Build()
if err != nil {
@@ -179,9 +198,11 @@
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
+ waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+ "127.0.0.1:1234": &common.SpanMetrics{
+ Written: uint64(len(SIMPLE_TEST_SPANS)),
+ },
+ })
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
common.Predicate{
@@ -224,6 +245,9 @@
func TestQueries3(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
WrittenSpans: make(chan *common.Span, 100)}
ht, err := htraceBld.Build()
if err != nil {
@@ -231,9 +255,11 @@
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
+ waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+ "127.0.0.1:1234": &common.SpanMetrics{
+ Written: uint64(len(SIMPLE_TEST_SPANS)),
+ },
+ })
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
common.Predicate{
@@ -276,6 +302,9 @@
func TestQueries4(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
WrittenSpans: make(chan *common.Span, 100)}
ht, err := htraceBld.Build()
if err != nil {
@@ -283,9 +312,11 @@
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
+ waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+ "127.0.0.1:1234": &common.SpanMetrics{
+ Written: uint64(len(SIMPLE_TEST_SPANS)),
+ },
+ })
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
common.Predicate{
@@ -320,6 +351,9 @@
func BenchmarkDatastoreWrites(b *testing.B) {
htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
WrittenSpans: make(chan *common.Span, b.N)}
ht, err := htraceBld.Build()
if err != nil {
@@ -331,22 +365,28 @@
// Write many random spans.
for n := 0; n < b.N; n++ {
span := test.NewRandomSpan(rnd, allSpans[0:n])
- ht.Store.WriteSpan(span)
+ ht.Store.WriteSpan(&IncomingSpan{
+ Addr: "127.0.0.1:1234",
+ Span: span,
+ })
allSpans[n] = span
}
// Wait for all the spans to be written.
for n := 0; n < b.N; n++ {
<-ht.Store.WrittenSpans
}
- spansWritten := ht.Store.GetStatistics().NumSpansWritten
- if spansWritten < uint64(b.N) {
- b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d",
- b.N, spansWritten)
- }
+ waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+ "127.0.0.1:1234": &common.SpanMetrics{
+ Written: uint64(b.N), // should be less than?
+ },
+ })
}
func TestReloadDataStore(t *testing.T) {
htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
ht, err := htraceBld.Build()
if err != nil {
@@ -444,6 +484,9 @@
func TestQueriesWithContinuationTokens1(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
+ Cnf: map[string]string{
+ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ },
WrittenSpans: make(chan *common.Span, 100)}
ht, err := htraceBld.Build()
if err != nil {
@@ -451,9 +494,11 @@
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
+ waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+ "127.0.0.1:1234": &common.SpanMetrics{
+ Written: uint64(len(SIMPLE_TEST_SPANS)),
+ },
+ })
// Adding a prev value to this query excludes the first result that we
// would normally get.
testQuery(t, ht, &common.Query{
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
new file mode 100644
index 0000000..140b50d
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "org/apache/htrace/common"
+ "time"
+)
+
+type Heartbeater struct {
+ // The name of this heartbeater
+ name string
+
+ // How long to sleep between heartbeats, in milliseconds.
+ periodMs int64
+
+ // The logger to use.
+ lg *common.Logger
+
+ // The channels to send the heartbeat on.
+ targets []HeartbeatTarget
+
+ // Incoming requests to the heartbeater. When this is closed, the
+ // heartbeater will exit.
+ req chan *HeartbeatTarget
+}
+
+type HeartbeatTarget struct {
+ // The name of the heartbeat target.
+ name string
+
+ // The channel for the heartbeat target.
+ targetChan chan interface{}
+}
+
+func (tgt *HeartbeatTarget) String() string {
+ return tgt.name
+}
+
+func NewHeartbeater(name string, periodMs int64, lg *common.Logger) *Heartbeater {
+ hb := &Heartbeater{
+ name: name,
+ periodMs: periodMs,
+ lg: lg,
+ targets: make([]HeartbeatTarget, 0, 4),
+ req: make(chan *HeartbeatTarget),
+ }
+ go hb.run()
+ return hb
+}
+
+func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) {
+ hb.req <- tgt
+}
+
+func (hb *Heartbeater) Shutdown() {
+ close(hb.req)
+}
+
+func (hb *Heartbeater) String() string {
+ return hb.name
+}
+
+func (hb *Heartbeater) run() {
+ period := time.Duration(hb.periodMs) * time.Millisecond
+ for {
+ periodEnd := time.Now().Add(period)
+ for {
+ timeToWait := periodEnd.Sub(time.Now())
+ if timeToWait <= 0 {
+ break
+ } else if timeToWait > period {
+ // Smooth over jitter or clock changes
+ timeToWait = period
+ periodEnd = time.Now().Add(period)
+ }
+ select {
+ case tgt, open := <-hb.req:
+ if !open {
+ hb.lg.Debugf("%s: exiting.\n", hb.String())
+ return
+ }
+ hb.targets = append(hb.targets, *tgt)
+ hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String())
+ case <-time.After(timeToWait):
+ }
+ }
+ for targetIdx := range hb.targets {
+ select {
+ case hb.targets[targetIdx].targetChan <- nil:
+ default:
+ // We failed to send a heartbeat because the other goroutine was busy and
+ // hasn't cleared the previous one from its channel. This could indicate a
+ // stuck goroutine.
+ hb.lg.Infof("%s: could not send heartbeat to %s.\n",
+ hb.String(), hb.targets[targetIdx])
+ }
+ }
+ }
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
new file mode 100644
index 0000000..cbde7fc
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "testing"
+ "time"
+)
+
+func TestHeartbeaterStartupShutdown(t *testing.T) {
+ cnfBld := conf.Builder{
+ Values: conf.TEST_VALUES(),
+ Defaults: conf.DEFAULTS,
+ }
+ cnf, err := cnfBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create conf: %s", err.Error())
+ }
+ lg := common.NewLogger("heartbeater", cnf)
+ hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
+ if hb.String() != "ExampleHeartbeater" {
+ t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+ }
+ hb.Shutdown()
+}
+
+// The number of milliseconds between heartbeats
+const HEARTBEATER_PERIOD = 5
+
+// The number of heartbeats to send in the test.
+const NUM_TEST_HEARTBEATS = 3
+
+func TestHeartbeaterSendsHeartbeats(t *testing.T) {
+ cnfBld := conf.Builder{
+ Values: conf.TEST_VALUES(),
+ Defaults: conf.DEFAULTS,
+ }
+ cnf, err := cnfBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create conf: %s", err.Error())
+ }
+ lg := common.NewLogger("heartbeater", cnf)
+ // The minimum amount of time which the heartbeater test should take
+ MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * HEARTBEATER_PERIOD)
+ duration := MINIMUM_TEST_DURATION
+ for duration <= MINIMUM_TEST_DURATION {
+ start := time.Now()
+ testHeartbeaterSendsHeartbeatsImpl(t, lg)
+ end := time.Now()
+ duration = end.Sub(start)
+ lg.Debugf("Measured duration: %v; minimum expected duration: %v\n",
+ duration, MINIMUM_TEST_DURATION)
+ }
+}
+
+func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
+ hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
+ if hb.String() != "ExampleHeartbeater" {
+ t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+ }
+ testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
+ gotAllHeartbeats := make(chan bool)
+ hb.AddHeartbeatTarget(&HeartbeatTarget{
+ name: "ExampleHeartbeatTarget",
+ targetChan: testChan,
+ })
+ go func() {
+ for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+ <-testChan
+ }
+ gotAllHeartbeats <- true
+ for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+ _, open := <-testChan
+ if !open {
+ return
+ }
+ }
+ }()
+ <-gotAllHeartbeats
+ hb.Shutdown()
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 354d064..49587bb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -32,6 +32,7 @@
"net/rpc"
"org/apache/htrace/common"
"org/apache/htrace/conf"
+ "reflect"
)
// Handles HRPC calls
@@ -109,9 +110,10 @@
}
func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+ remoteAddr := cdc.conn.RemoteAddr()
if cdc.lg.TraceEnabled() {
cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
- cdc.length, cdc.conn.RemoteAddr())
+ cdc.length, remoteAddr)
}
mh := new(codec.MsgpackHandle)
mh.WriteExt = true
@@ -119,11 +121,16 @@
err := dec.Decode(body)
if err != nil {
return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+
- "body from %s: %s", cdc.conn.RemoteAddr(), err.Error()))
+ "body from %s: %s", remoteAddr, err.Error()))
}
if cdc.lg.TraceEnabled() {
cdc.lg.Tracef("Read body from %s: %s\n",
- cdc.conn.RemoteAddr(), asJson(&body))
+ remoteAddr, asJson(&body))
+ }
+ val := reflect.ValueOf(body)
+ addr := val.Elem().FieldByName("Addr")
+ if addr.IsValid() {
+ addr.SetString(remoteAddr.String())
}
return nil
}
@@ -203,7 +210,10 @@
if hand.lg.TraceEnabled() {
hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
}
- hand.store.WriteSpan(span)
+ hand.store.WriteSpan(&IncomingSpan{
+ Addr: req.Addr,
+ Span: span,
+ })
}
return nil
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
new file mode 100644
index 0000000..672f5f6
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "encoding/json"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "sync"
+)
+
+//
+// The Metrics Sink for HTraced.
+//
+// The Metrics sink keeps track of metrics for the htraced daemon.
+// It is important to have good metrics so that we can properly manager htraced. In particular, we
+// need to know what rate we are receiving spans at, the main places spans came from. If spans
+// were dropped because of a high sampling rates, we need to know which part of the system dropped
+// them so that we can adjust the sampling rate there.
+//
+
+type ServerSpanMetrics struct {
+ // The total number of spans written to HTraced.
+ Written uint64
+
+ // The total number of spans dropped by the server.
+ ServerDropped uint64
+}
+
+func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
+ return &ServerSpanMetrics{
+ Written: spm.Written,
+ ServerDropped: spm.ServerDropped,
+ }
+}
+
+func (spm *ServerSpanMetrics) String() string {
+ jbytes, err := json.Marshal(*spm)
+ if err != nil {
+ panic(err)
+ }
+ return string(jbytes)
+}
+
+func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
+ spm.Written += ospm.Written
+ spm.ServerDropped += ospm.ServerDropped
+}
+
+func (spm *ServerSpanMetrics) Clear() {
+ spm.Written = 0
+ spm.ServerDropped = 0
+}
+
+// A map from network address strings to ServerSpanMetrics structures.
+type ServerSpanMetricsMap map[string]*ServerSpanMetrics
+
+func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
+ lg *common.Logger) {
+ mtx := smtxMap[addr]
+ if mtx == nil {
+ mtx = &ServerSpanMetrics{}
+ smtxMap[addr] = mtx
+ }
+ mtx.ServerDropped++
+ smtxMap.Prune(maxMtx, lg)
+}
+
+func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
+ lg *common.Logger) {
+ mtx := smtxMap[addr]
+ if mtx == nil {
+ mtx = &ServerSpanMetrics{}
+ smtxMap[addr] = mtx
+ }
+ mtx.Written++
+ smtxMap.Prune(maxMtx, lg)
+}
+
+func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
+ if len(smtxMap) >= maxMtx {
+ // Delete a random entry
+ for k := range smtxMap {
+ lg.Warnf("Evicting metrics entry for addr %s "+
+ "because there are more than %d addrs.\n", k, maxMtx)
+ delete(smtxMap, k)
+ return
+ }
+ }
+}
+
+type AccessReq struct {
+ mtxMap common.SpanMetricsMap
+ done chan interface{}
+}
+
+type MetricsSink struct {
+ // The total span metrics.
+ smtxMap ServerSpanMetricsMap
+
+ // A channel of incoming shard metrics.
+ // When this is shut down, the MetricsSink will exit.
+ updateReqs chan ServerSpanMetricsMap
+
+ // A channel of incoming requests for shard metrics.
+ accessReqs chan *AccessReq
+
+ // This will be closed when the MetricsSink has exited.
+ exited chan interface{}
+
+ // The logger used by this MetricsSink.
+ lg *common.Logger
+
+ // The maximum number of metrics totals we will maintain.
+ maxMtx int
+
+ // The number of spans which each client has self-reported that it has
+ // dropped.
+ clientDroppedMap map[string]uint64
+
+ // Lock protecting clientDropped
+ clientDroppedLock sync.Mutex
+}
+
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+ mcl := MetricsSink{
+ smtxMap: make(ServerSpanMetricsMap),
+ updateReqs: make(chan ServerSpanMetricsMap, 128),
+ accessReqs: make(chan *AccessReq),
+ exited: make(chan interface{}),
+ lg: common.NewLogger("metrics", cnf),
+ maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+ clientDroppedMap: make(map[string]uint64),
+ }
+ go mcl.run()
+ return &mcl
+}
+
+func (msink *MetricsSink) run() {
+ lg := msink.lg
+ defer func() {
+ lg.Info("MetricsSink: stopping service goroutine.\n")
+ close(msink.exited)
+ }()
+ lg.Tracef("MetricsSink: starting.\n")
+ for {
+ select {
+ case updateReq, open := <-msink.updateReqs:
+ if !open {
+ lg.Trace("MetricsSink: shutting down cleanly.\n")
+ return
+ }
+ for addr, umtx := range updateReq {
+ smtx := msink.smtxMap[addr]
+ if smtx == nil {
+ smtx = &ServerSpanMetrics{}
+ msink.smtxMap[addr] = smtx
+ }
+ smtx.Add(umtx)
+ if lg.TraceEnabled() {
+ lg.Tracef("MetricsSink: updated %s to %s\n", addr, smtx.String())
+ }
+ }
+ msink.smtxMap.Prune(msink.maxMtx, lg)
+ case accessReq := <-msink.accessReqs:
+ msink.handleAccessReq(accessReq)
+ }
+ }
+}
+
+func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
+ msink.lg.Debug("MetricsSink: accessing global metrics.\n")
+ msink.clientDroppedLock.Lock()
+ defer func() {
+ msink.clientDroppedLock.Unlock()
+ close(accessReq.done)
+ }()
+ for addr, smtx := range msink.smtxMap {
+ accessReq.mtxMap[addr] = &common.SpanMetrics{
+ Written: smtx.Written,
+ ServerDropped: smtx.ServerDropped,
+ ClientDropped: msink.clientDroppedMap[addr],
+ }
+ }
+}
+
+func (msink *MetricsSink) AccessTotals() common.SpanMetricsMap {
+ accessReq := &AccessReq{
+ mtxMap: make(common.SpanMetricsMap),
+ done: make(chan interface{}),
+ }
+ msink.accessReqs <- accessReq
+ <-accessReq.done
+ return accessReq.mtxMap
+}
+
+func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) {
+ msink.updateReqs <- mtxMap
+}
+
+func (msink *MetricsSink) Shutdown() {
+ close(msink.updateReqs)
+ <-msink.exited
+}
+
+func (msink *MetricsSink) UpdateClientDropped(client string, clientDropped uint64) {
+ msink.clientDroppedLock.Lock()
+ defer msink.clientDroppedLock.Unlock()
+ msink.clientDroppedMap[client] = clientDropped
+ if len(msink.clientDroppedMap) >= msink.maxMtx {
+ // Delete a random entry
+ for k := range msink.clientDroppedMap {
+ delete(msink.clientDroppedMap, k)
+ return
+ }
+ }
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
new file mode 100644
index 0000000..c90d1da
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "reflect"
+ "testing"
+ "time"
+)
+
+func TestMetricsSinkStartupShutdown(t *testing.T) {
+ cnfBld := conf.Builder{
+ Values: conf.TEST_VALUES(),
+ Defaults: conf.DEFAULTS,
+ }
+ cnf, err := cnfBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create conf: %s", err.Error())
+ }
+ msink := NewMetricsSink(cnf)
+ msink.Shutdown()
+}
+
+func TestAddSpanMetrics(t *testing.T) {
+ a := &ServerSpanMetrics{
+ Written: 100,
+ ServerDropped: 200,
+ }
+ b := &ServerSpanMetrics{
+ Written: 500,
+ ServerDropped: 100,
+ }
+ a.Add(b)
+ if a.Written != 600 {
+ t.Fatalf("SpanMetrics#Add failed to update #Written")
+ }
+ if a.ServerDropped != 300 {
+ t.Fatalf("SpanMetrics#Add failed to update #Dropped")
+ }
+ if b.Written != 500 {
+ t.Fatalf("SpanMetrics#Add updated b#Written")
+ }
+ if b.ServerDropped != 100 {
+ t.Fatalf("SpanMetrics#Add updated b#Dropped")
+ }
+}
+
+func compareTotals(a, b common.SpanMetricsMap) bool {
+ for k, v := range a {
+ if !reflect.DeepEqual(v, b[k]) {
+ return false
+ }
+ }
+ for k, v := range b {
+ if !reflect.DeepEqual(v, a[k]) {
+ return false
+ }
+ }
+ return true
+}
+
+func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
+ for {
+ time.Sleep(1 * time.Millisecond)
+ totals := msink.AccessTotals()
+ if compareTotals(totals, expectedTotals) {
+ return
+ }
+ }
+}
+
+func TestMetricsSinkMessages(t *testing.T) {
+ cnfBld := conf.Builder{
+ Values: conf.TEST_VALUES(),
+ Defaults: conf.DEFAULTS,
+ }
+ cnf, err := cnfBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create conf: %s", err.Error())
+ }
+ msink := NewMetricsSink(cnf)
+ totals := msink.AccessTotals()
+ if len(totals) != 0 {
+ t.Fatalf("Expected no data in the MetricsSink to start with.")
+ }
+ msink.UpdateMetrics(ServerSpanMetricsMap{
+ "192.168.0.100": &ServerSpanMetrics{
+ Written: 20,
+ ServerDropped: 10,
+ },
+ })
+ waitForMetrics(msink, common.SpanMetricsMap{
+ "192.168.0.100": &common.SpanMetrics{
+ Written: 20,
+ ServerDropped: 10,
+ },
+ })
+ msink.UpdateMetrics(ServerSpanMetricsMap{
+ "192.168.0.100": &ServerSpanMetrics{
+ Written: 200,
+ ServerDropped: 100,
+ },
+ })
+ msink.UpdateMetrics(ServerSpanMetricsMap{
+ "192.168.0.100": &ServerSpanMetrics{
+ Written: 1000,
+ ServerDropped: 1000,
+ },
+ })
+ waitForMetrics(msink, common.SpanMetricsMap{
+ "192.168.0.100": &common.SpanMetrics{
+ Written: 1220,
+ ServerDropped: 1110,
+ },
+ })
+ msink.UpdateMetrics(ServerSpanMetricsMap{
+ "192.168.0.200": &ServerSpanMetrics{
+ Written: 200,
+ ServerDropped: 100,
+ },
+ })
+ waitForMetrics(msink, common.SpanMetricsMap{
+ "192.168.0.100": &common.SpanMetrics{
+ Written: 1220,
+ ServerDropped: 1110,
+ },
+ "192.168.0.200": &common.SpanMetrics{
+ Written: 200,
+ ServerDropped: 100,
+ },
+ })
+ msink.Shutdown()
+}
+
+func TestMetricsSinkMessagesEviction(t *testing.T) {
+ cnfBld := conf.Builder{
+ Values: conf.TEST_VALUES(),
+ Defaults: conf.DEFAULTS,
+ }
+ cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+ cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1"
+ cnf, err := cnfBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create conf: %s", err.Error())
+ }
+ msink := NewMetricsSink(cnf)
+ msink.UpdateMetrics(ServerSpanMetricsMap{
+ "192.168.0.100": &ServerSpanMetrics{
+ Written: 20,
+ ServerDropped: 10,
+ },
+ "192.168.0.101": &ServerSpanMetrics{
+ Written: 20,
+ ServerDropped: 10,
+ },
+ "192.168.0.102": &ServerSpanMetrics{
+ Written: 20,
+ ServerDropped: 10,
+ },
+ })
+ for {
+ totals := msink.AccessTotals()
+ if len(totals) == 2 {
+ break
+ }
+ }
+ msink.Shutdown()
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index a54f2cb..c2300c4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -90,11 +90,15 @@
}
}
}
+ // Copy the default test configuration values.
+ for k, v := range conf.TEST_VALUES() {
+ _, hasVal := bld.Cnf[k]
+ if !hasVal {
+ bld.Cnf[k] = v
+ }
+ }
bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
- bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST server
- bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the HRPC server
- bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE"
cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
cnf, err := cnfBld.Build()
if err != nil {
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 16c3a75..eca3f08 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -90,7 +90,7 @@
type serverConfHandler struct {
cnf *conf.Config
- lg *common.Logger
+ lg *common.Logger
}
func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -233,9 +233,13 @@
if spanIdProblem != "" {
hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
} else {
- hand.store.WriteSpan(span)
+ hand.store.WriteSpan(&IncomingSpan{
+ Addr: req.RemoteAddr,
+ Span: span,
+ })
}
}
+ hand.store.msink.UpdateClientDropped(req.RemoteAddr, msg.ClientDropped)
}
type queryHandler struct {