HTRACE-301. htraced: fix unit tests that aren't waiting for spans to be written, use semaphore for WrittenSpans (Colin Patrick McCabe via iwasakims)
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go
new file mode 100644
index 0000000..1d56ae9
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+	"sync"
+)
+
+// A simple lock-and-condition-variable based semaphore implementation.
+type Semaphore struct {
+	lock sync.Mutex
+	cond *sync.Cond
+	count int64
+}
+
+func NewSemaphore(count int64) *Semaphore {
+	sem := &Semaphore {
+		count:int64(count),
+	}
+	sem.cond = &sync.Cond {
+		L: &sem.lock,
+	}
+	return sem
+}
+
+func (sem *Semaphore) Post() {
+	sem.lock.Lock()
+	sem.count++
+	if sem.count > 0 {
+		sem.cond.Broadcast()
+	}
+	sem.lock.Unlock()
+}
+
+func (sem *Semaphore) Posts(amt int64) {
+	sem.lock.Lock()
+	sem.count+=amt
+	if sem.count > 0 {
+		sem.cond.Broadcast()
+	}
+	sem.lock.Unlock()
+}
+
+func (sem *Semaphore) Wait() {
+	sem.lock.Lock()
+	for {
+		if sem.count > 0 {
+			sem.count--
+			sem.lock.Unlock()
+			return
+		}
+		sem.cond.Wait()
+	}
+}
+
+func (sem *Semaphore) Waits(amt int64) {
+	var i int64
+	for i=0; i<amt; i++ {
+		sem.Wait()
+	}
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go
new file mode 100644
index 0000000..089c51b
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestSemaphoreWake(t *testing.T) {
+	var done uint32
+	sem := NewSemaphore(0)
+	go func() {
+		time.Sleep(10 * time.Nanosecond)
+		atomic.AddUint32(&done, 1)
+		sem.Post()
+	}()
+	sem.Wait()
+	doneVal := atomic.LoadUint32(&done)
+	if doneVal != 1 {
+		t.Fatalf("sem.Wait did not wait for sem.Post")
+	}
+}
+
+func TestSemaphoreCount(t *testing.T) {
+	sem := NewSemaphore(1)
+	sem.Post()
+	sem.Wait()
+	sem.Wait()
+
+	sem = NewSemaphore(-1)
+	sem.Post()
+	sem.Post()
+	sem.Wait()
+}
+
+func TestSemaphoreMultipleGoroutines(t *testing.T) {
+	var done uint32
+	sem := NewSemaphore(0)
+	sem2 := NewSemaphore(0)
+	go func() {
+		sem.Wait()
+		atomic.AddUint32(&done, 1)
+		sem2.Post()
+	}()
+	go func() {
+		time.Sleep(10 * time.Nanosecond)
+		atomic.AddUint32(&done, 1)
+		sem.Post()
+	}()
+	go func() {
+		time.Sleep(20 * time.Nanosecond)
+		atomic.AddUint32(&done, 1)
+		sem.Post()
+	}()
+	sem.Wait()
+	go func() {
+		time.Sleep(10 * time.Nanosecond)
+		atomic.AddUint32(&done, 1)
+		sem.Post()
+	}()
+	sem.Wait()
+	sem2.Wait()
+	doneVal := atomic.LoadUint32(&done)
+	if doneVal != 4 {
+		t.Fatalf("sem.Wait did not wait for sem.Posts")
+	}
+}
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index e4f2151..fae871c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -46,6 +46,7 @@
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
+	defer hcl.Close()
 	_, err = hcl.GetServerVersion()
 	if err != nil {
 		t.Fatalf("failed to call GetServerVersion: %s", err.Error())
@@ -91,7 +92,9 @@
 
 func TestClientOperations(t *testing.T) {
 	htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
-		DataDirs: make([]string, 2)}
+		DataDirs: make([]string, 2),
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		t.Fatalf("failed to create datastore: %s", err.Error())
@@ -102,6 +105,7 @@
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
+	defer hcl.Close()
 
 	// Create some random trace spans.
 	NUM_TEST_SPANS := 30
@@ -115,6 +119,7 @@
 		t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
 			err.Error())
 	}
+	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS/2))
 
 	// Look up the first half of the spans.  They should be found.
 	var span *common.Span
@@ -181,7 +186,12 @@
 
 func TestDumpAll(t *testing.T) {
 	htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
-		DataDirs: make([]string, 2)}
+		DataDirs: make([]string, 2),
+		WrittenSpans: common.NewSemaphore(0),
+		Cnf: map[string]string{
+			conf.HTRACE_LOG_LEVEL: "INFO",
+		},
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		t.Fatalf("failed to create datastore: %s", err.Error())
@@ -192,6 +202,7 @@
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
+	defer hcl.Close()
 
 	NUM_TEST_SPANS := 100
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
@@ -202,7 +213,8 @@
 	if err != nil {
 		t.Fatalf("WriteSpans failed: %s\n", err.Error())
 	}
-	out := make(chan *common.Span, 50)
+	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
+	out := make(chan *common.Span, NUM_TEST_SPANS)
 	var dumpErr error
 	go func() {
 		dumpErr = hcl.DumpAll(3, out)
@@ -253,6 +265,7 @@
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
+	defer hcl.Close()
 	serverCnf, err2 := hcl.GetServerConf()
 	if err2 != nil {
 		t.Fatalf("failed to call GetServerConf: %s", err2.Error())
@@ -293,7 +306,7 @@
 		Cnf: map[string]string{
 			conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
 		},
-		WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS),
+		WrittenSpans: common.NewSemaphore(0),
 		HrpcTestHooks: testHooks,
 	}
 	ht, err := htraceBld.Build()
@@ -319,9 +332,7 @@
 		}(iter)
 	}
 	wg.Wait()
-	for i := 0; i < TEST_NUM_WRITESPANS; i++ {
-		<-ht.Store.WrittenSpans
-	}
+	ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS))
 }
 
 // Tests that HRPC I/O timeouts work.
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index a4bb320..8cd1526 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -108,8 +108,8 @@
 	// A channel for incoming heartbeats
 	heartbeats chan interface{}
 
-	// The channel we will send a bool to when we exit.
-	exited chan bool
+	// Tracks whether the shard goroutine has exited.
+	exited sync.WaitGroup
 
 	// Per-address metrics
 	mtxMap ServerSpanMetricsMap
@@ -123,7 +123,7 @@
 	lg := shd.store.lg
 	defer func() {
 		lg.Infof("Shard processor for %s exiting.\n", shd.path)
-		shd.exited <- true
+		shd.exited.Done()
 	}()
 	for {
 		select {
@@ -289,8 +289,7 @@
 	}
 	shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
 	if shd.store.WrittenSpans != nil {
-		shd.store.lg.Errorf("WATERMELON: Sending span to shd.store.WrittenSpans\n")
-		shd.store.WrittenSpans <- span
+		shd.store.WrittenSpans.Post()
 	}
 	return nil
 }
@@ -325,9 +324,7 @@
 	lg := shd.store.lg
 	shd.incoming <- nil
 	lg.Infof("Waiting for %s to exit...\n", shd.path)
-	if shd.exited != nil {
-		<-shd.exited
-	}
+	shd.exited.Wait()
 	shd.ldb.Close()
 	lg.Infof("Closed %s...\n", shd.path)
 }
@@ -345,8 +342,8 @@
 	// A channel used to send heartbeats to the reaper
 	heartbeats chan interface{}
 
-	// A channel used to block until the reaper goroutine has exited.
-	exited chan interface{}
+	// Tracks whether the reaper goroutine has exited
+	exited sync.WaitGroup
 
 	// The lock protecting reaper data.
 	lock sync.Mutex
@@ -363,7 +360,6 @@
 		lg:           common.NewLogger("reaper", cnf),
 		spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
 		heartbeats:   make(chan interface{}, 1),
-		exited:       make(chan interface{}),
 	}
 	if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
 		rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
@@ -372,6 +368,7 @@
 	}
 	rpr.hb = NewHeartbeater("ReaperHeartbeater",
 		cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
+	rpr.exited.Add(1)
 	go rpr.run()
 	rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
 		name:       "reaper",
@@ -390,7 +387,7 @@
 func (rpr *Reaper) run() {
 	defer func() {
 		rpr.lg.Info("Exiting Reaper goroutine.\n")
-		rpr.exited <- nil
+		rpr.exited.Done()
 	}()
 
 	for {
@@ -442,7 +439,6 @@
 func (rpr *Reaper) Shutdown() {
 	rpr.hb.Shutdown()
 	close(rpr.heartbeats)
-	<-rpr.exited
 }
 
 // The Data Store.
@@ -458,9 +454,9 @@
 	// The write options to use for LevelDB.
 	writeOpts *levigo.WriteOptions
 
-	// If non-null, a channel we will send spans to once we finish writing them.  This is only used
-	// for testing.
-	WrittenSpans chan *common.Span
+	// If non-null, a semaphore we will increment once for each span we receive.
+	// Used for testing.
+	WrittenSpans *common.Semaphore
 
 	// The metrics sink.
 	msink *MetricsSink
@@ -475,7 +471,7 @@
 	startMs int64
 }
 
-func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
+func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) {
 	// Get the configuration.
 	clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
 	dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
@@ -513,10 +509,10 @@
 	store.rpr = NewReaper(cnf)
 	for idx := range store.shards {
 		shd := store.shards[idx]
-		shd.exited = make(chan bool, 1)
 		shd.heartbeats = make(chan interface{}, 1)
 		shd.mtxMap = make(ServerSpanMetricsMap)
 		shd.maxMtx = store.msink.maxMtx
+		shd.exited.Add(1)
 		go shd.processIncoming()
 	}
 	store.hb = NewHeartbeater("DatastoreHeartbeater",
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 0443834..d9f4a0a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -31,6 +31,7 @@
 	"sort"
 	"strings"
 	"testing"
+	"time"
 )
 
 // Test creating and tearing down a datastore.
@@ -74,14 +75,11 @@
 func createSpans(spans []common.Span, store *dataStore) {
 	for idx := range spans {
 		store.WriteSpan(&IncomingSpan{
-			Addr: "127.0.0.1:1234",
+			Addr: "127.0.0.1",
 			Span: &spans[idx],
 		})
 	}
-	// Wait the spans to be created
-	for i := 0; i < len(spans); i++ {
-		<-store.WrittenSpans
-	}
+	store.WrittenSpans.Waits(int64(len(spans)))
 }
 
 // Test creating a datastore and adding some spans.
@@ -91,7 +89,8 @@
 		Cnf: map[string]string{
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		WrittenSpans: make(chan *common.Span, 100)}
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		panic(err)
@@ -99,12 +98,6 @@
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
 
-	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1:1234": &common.SpanMetrics{
-			Written: uint64(len(SIMPLE_TEST_SPANS)),
-		},
-	})
-
 	span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
 	if span == nil {
 		t.Fatal()
@@ -160,7 +153,8 @@
 		Cnf: map[string]string{
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		WrittenSpans: make(chan *common.Span, 100)}
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		panic(err)
@@ -168,7 +162,7 @@
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
 	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1:1234": &common.SpanMetrics{
+		"127.0.0.1": &common.SpanMetrics{
 			Written: uint64(len(SIMPLE_TEST_SPANS)),
 		},
 	})
@@ -191,7 +185,8 @@
 		Cnf: map[string]string{
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		WrittenSpans: make(chan *common.Span, 100)}
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		panic(err)
@@ -199,7 +194,7 @@
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
 	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1:1234": &common.SpanMetrics{
+		"127.0.0.1": &common.SpanMetrics{
 			Written: uint64(len(SIMPLE_TEST_SPANS)),
 		},
 	})
@@ -248,7 +243,8 @@
 		Cnf: map[string]string{
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		WrittenSpans: make(chan *common.Span, 100)}
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		panic(err)
@@ -256,7 +252,7 @@
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
 	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1:1234": &common.SpanMetrics{
+		"127.0.0.1": &common.SpanMetrics{
 			Written: uint64(len(SIMPLE_TEST_SPANS)),
 		},
 	})
@@ -305,18 +301,15 @@
 		Cnf: map[string]string{
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		WrittenSpans: make(chan *common.Span, 100)}
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		panic(err)
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1:1234": &common.SpanMetrics{
-			Written: uint64(len(SIMPLE_TEST_SPANS)),
-		},
-	})
+
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
 			common.Predicate{
@@ -355,13 +348,20 @@
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000",
 			conf.HTRACE_LOG_LEVEL: "INFO",
 		},
-		WrittenSpans: make(chan *common.Span, b.N)}
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
-		panic(err)
+		b.Fatalf("Error creating MiniHTraced: %s\n", err.Error())
 	}
-	defer ht.Close()
-	rnd := rand.New(rand.NewSource(1))
+	ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N)
+	defer func() {
+		if r := recover(); r != nil {
+			ht.Store.lg.Infof("panic: %s\n", r.(error))
+		}
+		ht.Close()
+	}()
+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
 	allSpans := make([]*common.Span, b.N)
 	for n := range(allSpans) {
 		allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
@@ -379,9 +379,7 @@
 		})
 	}
 	// Wait for all the spans to be written.
-	for n := 0; n < b.N; n++ {
-		<-ht.Store.WrittenSpans
-	}
+	ht.Store.WrittenSpans.Waits(int64(b.N))
 	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
 		"127.0.0.1": &common.SpanMetrics{
 			Written: uint64(b.N), // should be less than?
@@ -394,7 +392,10 @@
 		Cnf: map[string]string{
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
+		DataDirs: make([]string, 2),
+		KeepDataDirsOnClose: true,
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		t.Fatalf("failed to create datastore: %s", err.Error())
@@ -424,6 +425,7 @@
 	if err != nil {
 		t.Fatalf("WriteSpans failed: %s\n", err.Error())
 	}
+	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
 
 	// Look up the spans we wrote.
 	var span *common.Span
@@ -494,7 +496,8 @@
 		Cnf: map[string]string{
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		WrittenSpans: make(chan *common.Span, 100)}
+		WrittenSpans: common.NewSemaphore(0),
+	}
 	ht, err := htraceBld.Build()
 	if err != nil {
 		panic(err)
@@ -502,7 +505,7 @@
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
 	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1:1234": &common.SpanMetrics{
+		"127.0.0.1": &common.SpanMetrics{
 			Written: uint64(len(SIMPLE_TEST_SPANS)),
 		},
 	})
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index 353beae..667a17b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -53,8 +53,8 @@
 	// If true, we will keep the data dirs around after MiniHTraced#Close
 	KeepDataDirsOnClose bool
 
-	// If non-null, the WrittenSpans channel to use when creating the DataStore.
-	WrittenSpans chan *common.Span
+	// If non-null, the WrittenSpans semaphore to use when creating the DataStore.
+	WrittenSpans *common.Semaphore
 
 	// The test hooks to use for the HRPC server
 	HrpcTestHooks *hrpcTestHooks
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
index 7aef1d1..dcc916a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -45,7 +45,7 @@
 			conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  "1",
 			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
 		},
-		WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS),
+		WrittenSpans: common.NewSemaphore(0),
 		DataDirs:     make([]string, 2),
 	}
 	ht, err := htraceBld.Build()
@@ -54,14 +54,12 @@
 	}
 	for i := range testSpans {
 		ht.Store.WriteSpan(&IncomingSpan{
-			Addr: "127.0.0.1:1234",
+			Addr: "127.0.0.1",
 			Span: testSpans[i],
 		})
 	}
 	// Wait the spans to be created
-	for i := 0; i < len(testSpans); i++ {
-		<-ht.Store.WrittenSpans
-	}
+	ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
 	// Set a reaper date that will remove all the spans except final one.
 	ht.Store.rpr.SetReaperDate(now)