HTRACE-301. htraced: fix unit tests that aren't waiting for spans to be written, use semaphore for WrittenSpans (Colin Patrick McCabe via iwasakims)
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go
new file mode 100644
index 0000000..1d56ae9
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+ "sync"
+)
+
+// A simple lock-and-condition-variable based semaphore implementation.
+type Semaphore struct {
+ lock sync.Mutex
+ cond *sync.Cond
+ count int64
+}
+
+func NewSemaphore(count int64) *Semaphore {
+ sem := &Semaphore {
+ count:int64(count),
+ }
+ sem.cond = &sync.Cond {
+ L: &sem.lock,
+ }
+ return sem
+}
+
+func (sem *Semaphore) Post() {
+ sem.lock.Lock()
+ sem.count++
+ if sem.count > 0 {
+ sem.cond.Broadcast()
+ }
+ sem.lock.Unlock()
+}
+
+func (sem *Semaphore) Posts(amt int64) {
+ sem.lock.Lock()
+ sem.count+=amt
+ if sem.count > 0 {
+ sem.cond.Broadcast()
+ }
+ sem.lock.Unlock()
+}
+
+func (sem *Semaphore) Wait() {
+ sem.lock.Lock()
+ for {
+ if sem.count > 0 {
+ sem.count--
+ sem.lock.Unlock()
+ return
+ }
+ sem.cond.Wait()
+ }
+}
+
+func (sem *Semaphore) Waits(amt int64) {
+ var i int64
+ for i=0; i<amt; i++ {
+ sem.Wait()
+ }
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go
new file mode 100644
index 0000000..089c51b
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestSemaphoreWake(t *testing.T) {
+ var done uint32
+ sem := NewSemaphore(0)
+ go func() {
+ time.Sleep(10 * time.Nanosecond)
+ atomic.AddUint32(&done, 1)
+ sem.Post()
+ }()
+ sem.Wait()
+ doneVal := atomic.LoadUint32(&done)
+ if doneVal != 1 {
+ t.Fatalf("sem.Wait did not wait for sem.Post")
+ }
+}
+
+func TestSemaphoreCount(t *testing.T) {
+ sem := NewSemaphore(1)
+ sem.Post()
+ sem.Wait()
+ sem.Wait()
+
+ sem = NewSemaphore(-1)
+ sem.Post()
+ sem.Post()
+ sem.Wait()
+}
+
+func TestSemaphoreMultipleGoroutines(t *testing.T) {
+ var done uint32
+ sem := NewSemaphore(0)
+ sem2 := NewSemaphore(0)
+ go func() {
+ sem.Wait()
+ atomic.AddUint32(&done, 1)
+ sem2.Post()
+ }()
+ go func() {
+ time.Sleep(10 * time.Nanosecond)
+ atomic.AddUint32(&done, 1)
+ sem.Post()
+ }()
+ go func() {
+ time.Sleep(20 * time.Nanosecond)
+ atomic.AddUint32(&done, 1)
+ sem.Post()
+ }()
+ sem.Wait()
+ go func() {
+ time.Sleep(10 * time.Nanosecond)
+ atomic.AddUint32(&done, 1)
+ sem.Post()
+ }()
+ sem.Wait()
+ sem2.Wait()
+ doneVal := atomic.LoadUint32(&done)
+ if doneVal != 4 {
+ t.Fatalf("sem.Wait did not wait for sem.Posts")
+ }
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index e4f2151..fae871c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -46,6 +46,7 @@
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
+ defer hcl.Close()
_, err = hcl.GetServerVersion()
if err != nil {
t.Fatalf("failed to call GetServerVersion: %s", err.Error())
@@ -91,7 +92,9 @@
func TestClientOperations(t *testing.T) {
htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
- DataDirs: make([]string, 2)}
+ DataDirs: make([]string, 2),
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
t.Fatalf("failed to create datastore: %s", err.Error())
@@ -102,6 +105,7 @@
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
+ defer hcl.Close()
// Create some random trace spans.
NUM_TEST_SPANS := 30
@@ -115,6 +119,7 @@
t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
err.Error())
}
+ ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS/2))
// Look up the first half of the spans. They should be found.
var span *common.Span
@@ -181,7 +186,12 @@
func TestDumpAll(t *testing.T) {
htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
- DataDirs: make([]string, 2)}
+ DataDirs: make([]string, 2),
+ WrittenSpans: common.NewSemaphore(0),
+ Cnf: map[string]string{
+ conf.HTRACE_LOG_LEVEL: "INFO",
+ },
+ }
ht, err := htraceBld.Build()
if err != nil {
t.Fatalf("failed to create datastore: %s", err.Error())
@@ -192,6 +202,7 @@
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
+ defer hcl.Close()
NUM_TEST_SPANS := 100
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
@@ -202,7 +213,8 @@
if err != nil {
t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
- out := make(chan *common.Span, 50)
+ ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
+ out := make(chan *common.Span, NUM_TEST_SPANS)
var dumpErr error
go func() {
dumpErr = hcl.DumpAll(3, out)
@@ -253,6 +265,7 @@
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
+ defer hcl.Close()
serverCnf, err2 := hcl.GetServerConf()
if err2 != nil {
t.Fatalf("failed to call GetServerConf: %s", err2.Error())
@@ -293,7 +306,7 @@
Cnf: map[string]string{
conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
},
- WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS),
+ WrittenSpans: common.NewSemaphore(0),
HrpcTestHooks: testHooks,
}
ht, err := htraceBld.Build()
@@ -319,9 +332,7 @@
}(iter)
}
wg.Wait()
- for i := 0; i < TEST_NUM_WRITESPANS; i++ {
- <-ht.Store.WrittenSpans
- }
+ ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS))
}
// Tests that HRPC I/O timeouts work.
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index a4bb320..8cd1526 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -108,8 +108,8 @@
// A channel for incoming heartbeats
heartbeats chan interface{}
- // The channel we will send a bool to when we exit.
- exited chan bool
+ // Tracks whether the shard goroutine has exited.
+ exited sync.WaitGroup
// Per-address metrics
mtxMap ServerSpanMetricsMap
@@ -123,7 +123,7 @@
lg := shd.store.lg
defer func() {
lg.Infof("Shard processor for %s exiting.\n", shd.path)
- shd.exited <- true
+ shd.exited.Done()
}()
for {
select {
@@ -289,8 +289,7 @@
}
shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
if shd.store.WrittenSpans != nil {
- shd.store.lg.Errorf("WATERMELON: Sending span to shd.store.WrittenSpans\n")
- shd.store.WrittenSpans <- span
+ shd.store.WrittenSpans.Post()
}
return nil
}
@@ -325,9 +324,7 @@
lg := shd.store.lg
shd.incoming <- nil
lg.Infof("Waiting for %s to exit...\n", shd.path)
- if shd.exited != nil {
- <-shd.exited
- }
+ shd.exited.Wait()
shd.ldb.Close()
lg.Infof("Closed %s...\n", shd.path)
}
@@ -345,8 +342,8 @@
// A channel used to send heartbeats to the reaper
heartbeats chan interface{}
- // A channel used to block until the reaper goroutine has exited.
- exited chan interface{}
+ // Tracks whether the reaper goroutine has exited
+ exited sync.WaitGroup
// The lock protecting reaper data.
lock sync.Mutex
@@ -363,7 +360,6 @@
lg: common.NewLogger("reaper", cnf),
spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
heartbeats: make(chan interface{}, 1),
- exited: make(chan interface{}),
}
if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
@@ -372,6 +368,7 @@
}
rpr.hb = NewHeartbeater("ReaperHeartbeater",
cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
+ rpr.exited.Add(1)
go rpr.run()
rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
name: "reaper",
@@ -390,7 +387,7 @@
func (rpr *Reaper) run() {
defer func() {
rpr.lg.Info("Exiting Reaper goroutine.\n")
- rpr.exited <- nil
+ rpr.exited.Done()
}()
for {
@@ -442,7 +439,6 @@
func (rpr *Reaper) Shutdown() {
rpr.hb.Shutdown()
close(rpr.heartbeats)
- <-rpr.exited
}
// The Data Store.
@@ -458,9 +454,9 @@
// The write options to use for LevelDB.
writeOpts *levigo.WriteOptions
- // If non-null, a channel we will send spans to once we finish writing them. This is only used
- // for testing.
- WrittenSpans chan *common.Span
+ // If non-null, a semaphore we will increment once for each span we receive.
+ // Used for testing.
+ WrittenSpans *common.Semaphore
// The metrics sink.
msink *MetricsSink
@@ -475,7 +471,7 @@
startMs int64
}
-func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
+func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) {
// Get the configuration.
clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
@@ -513,10 +509,10 @@
store.rpr = NewReaper(cnf)
for idx := range store.shards {
shd := store.shards[idx]
- shd.exited = make(chan bool, 1)
shd.heartbeats = make(chan interface{}, 1)
shd.mtxMap = make(ServerSpanMetricsMap)
shd.maxMtx = store.msink.maxMtx
+ shd.exited.Add(1)
go shd.processIncoming()
}
store.hb = NewHeartbeater("DatastoreHeartbeater",
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 0443834..d9f4a0a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -31,6 +31,7 @@
"sort"
"strings"
"testing"
+ "time"
)
// Test creating and tearing down a datastore.
@@ -74,14 +75,11 @@
func createSpans(spans []common.Span, store *dataStore) {
for idx := range spans {
store.WriteSpan(&IncomingSpan{
- Addr: "127.0.0.1:1234",
+ Addr: "127.0.0.1",
Span: &spans[idx],
})
}
- // Wait the spans to be created
- for i := 0; i < len(spans); i++ {
- <-store.WrittenSpans
- }
+ store.WrittenSpans.Waits(int64(len(spans)))
}
// Test creating a datastore and adding some spans.
@@ -91,7 +89,8 @@
Cnf: map[string]string{
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- WrittenSpans: make(chan *common.Span, 100)}
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
panic(err)
@@ -99,12 +98,6 @@
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1:1234": &common.SpanMetrics{
- Written: uint64(len(SIMPLE_TEST_SPANS)),
- },
- })
-
span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
if span == nil {
t.Fatal()
@@ -160,7 +153,8 @@
Cnf: map[string]string{
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- WrittenSpans: make(chan *common.Span, 100)}
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
panic(err)
@@ -168,7 +162,7 @@
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1:1234": &common.SpanMetrics{
+ "127.0.0.1": &common.SpanMetrics{
Written: uint64(len(SIMPLE_TEST_SPANS)),
},
})
@@ -191,7 +185,8 @@
Cnf: map[string]string{
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- WrittenSpans: make(chan *common.Span, 100)}
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
panic(err)
@@ -199,7 +194,7 @@
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1:1234": &common.SpanMetrics{
+ "127.0.0.1": &common.SpanMetrics{
Written: uint64(len(SIMPLE_TEST_SPANS)),
},
})
@@ -248,7 +243,8 @@
Cnf: map[string]string{
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- WrittenSpans: make(chan *common.Span, 100)}
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
panic(err)
@@ -256,7 +252,7 @@
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1:1234": &common.SpanMetrics{
+ "127.0.0.1": &common.SpanMetrics{
Written: uint64(len(SIMPLE_TEST_SPANS)),
},
})
@@ -305,18 +301,15 @@
Cnf: map[string]string{
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- WrittenSpans: make(chan *common.Span, 100)}
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
panic(err)
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1:1234": &common.SpanMetrics{
- Written: uint64(len(SIMPLE_TEST_SPANS)),
- },
- })
+
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
common.Predicate{
@@ -355,13 +348,20 @@
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000",
conf.HTRACE_LOG_LEVEL: "INFO",
},
- WrittenSpans: make(chan *common.Span, b.N)}
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
- panic(err)
+ b.Fatalf("Error creating MiniHTraced: %s\n", err.Error())
}
- defer ht.Close()
- rnd := rand.New(rand.NewSource(1))
+ ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N)
+ defer func() {
+ if r := recover(); r != nil {
+ ht.Store.lg.Infof("panic: %s\n", r.(error))
+ }
+ ht.Close()
+ }()
+ rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
allSpans := make([]*common.Span, b.N)
for n := range(allSpans) {
allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
@@ -379,9 +379,7 @@
})
}
// Wait for all the spans to be written.
- for n := 0; n < b.N; n++ {
- <-ht.Store.WrittenSpans
- }
+ ht.Store.WrittenSpans.Waits(int64(b.N))
waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
"127.0.0.1": &common.SpanMetrics{
Written: uint64(b.N), // should be less than?
@@ -394,7 +392,10 @@
Cnf: map[string]string{
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
+ DataDirs: make([]string, 2),
+ KeepDataDirsOnClose: true,
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
t.Fatalf("failed to create datastore: %s", err.Error())
@@ -424,6 +425,7 @@
if err != nil {
t.Fatalf("WriteSpans failed: %s\n", err.Error())
}
+ ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
// Look up the spans we wrote.
var span *common.Span
@@ -494,7 +496,8 @@
Cnf: map[string]string{
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- WrittenSpans: make(chan *common.Span, 100)}
+ WrittenSpans: common.NewSemaphore(0),
+ }
ht, err := htraceBld.Build()
if err != nil {
panic(err)
@@ -502,7 +505,7 @@
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1:1234": &common.SpanMetrics{
+ "127.0.0.1": &common.SpanMetrics{
Written: uint64(len(SIMPLE_TEST_SPANS)),
},
})
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index 353beae..667a17b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -53,8 +53,8 @@
// If true, we will keep the data dirs around after MiniHTraced#Close
KeepDataDirsOnClose bool
- // If non-null, the WrittenSpans channel to use when creating the DataStore.
- WrittenSpans chan *common.Span
+ // If non-null, the WrittenSpans semaphore to use when creating the DataStore.
+ WrittenSpans *common.Semaphore
// The test hooks to use for the HRPC server
HrpcTestHooks *hrpcTestHooks
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
index 7aef1d1..dcc916a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -45,7 +45,7 @@
conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1",
conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
},
- WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS),
+ WrittenSpans: common.NewSemaphore(0),
DataDirs: make([]string, 2),
}
ht, err := htraceBld.Build()
@@ -54,14 +54,12 @@
}
for i := range testSpans {
ht.Store.WriteSpan(&IncomingSpan{
- Addr: "127.0.0.1:1234",
+ Addr: "127.0.0.1",
Span: testSpans[i],
})
}
// Wait the spans to be created
- for i := 0; i < len(testSpans); i++ {
- <-ht.Store.WrittenSpans
- }
+ ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
// Set a reaper date that will remove all the spans except final one.
ht.Store.rpr.SetReaperDate(now)