blob: 7b649145fca6cd225434fdc1bbefffcf22983717 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"fmt"
"github.com/ugorji/go/codec"
"math"
"math/rand"
htrace "org/apache/htrace/client"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"org/apache/htrace/test"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
)
// Verify that a client can fetch the version of a running htraced server.
func TestClientGetServerVersion(t *testing.T) {
	bld := &MiniHTracedBuilder{
		Name:     "TestClientGetServerVersion",
		DataDirs: make([]string, 2),
	}
	ht, err := bld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	hcl, err := htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	defer hcl.Close()
	if _, err = hcl.GetServerVersion(); err != nil {
		t.Fatalf("failed to call GetServerVersion: %s", err.Error())
	}
}
// Verify that a client can fetch debug information (stack traces and GC
// statistics) from a running htraced server, and that neither field comes
// back empty.
func TestClientGetServerDebugInfo(t *testing.T) {
	bld := &MiniHTracedBuilder{
		Name:     "TestClientGetServerDebugInfo",
		DataDirs: make([]string, 2),
	}
	ht, err := bld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	hcl, err := htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	defer hcl.Close()
	debugInfo, err := hcl.GetServerDebugInfo()
	if err != nil {
		t.Fatalf("failed to call GetServerDebugInfo: %s", err.Error())
	}
	if debugInfo.StackTraces == "" {
		t.Fatalf(`debugInfo.StackTraces == ""`)
	}
	if debugInfo.GCStats == "" {
		t.Fatalf(`debugInfo.GCStats == ""`)
	}
}
// createRandomTestSpans creates 'amount' random spans using a fixed RNG
// seed (2), so repeated runs produce the same spans.  Each span after the
// first is generated with the earlier spans [1:i] as potential relatives,
// and allSpans[0] is explicitly made the parent of allSpans[1].
// NOTE(review): indexes allSpans[0] and allSpans[1] unconditionally, so
// callers must pass amount >= 2.
func createRandomTestSpans(amount int) common.SpanSlice {
rnd := rand.New(rand.NewSource(2))
allSpans := make(common.SpanSlice, amount)
// The first span is created with no pre-existing spans to link to.
allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0])
for i := 1; i < amount; i++ {
allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i])
}
// Force a known parent/child relationship: span 0 is span 1's parent.
allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)}
return allSpans
}
// Exercises the basic client span operations against a live miniature
// htraced instance: WriteSpans, FindSpan (hit and miss), FindChildren
// (with and without children), and Query.
func TestClientOperations(t *testing.T) {
htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
DataDirs: make([]string, 2),
// Semaphore used below to wait until spans have been persisted.
WrittenSpans: common.NewSemaphore(0),
}
ht, err := htraceBld.Build()
if err != nil {
t.Fatalf("failed to create datastore: %s", err.Error())
}
defer ht.Close()
var hcl *htrace.Client
hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
defer hcl.Close()
// Create some random trace spans.
NUM_TEST_SPANS := 30
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
// Write half of the spans to htraced via the client.
err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2])
if err != nil {
t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
err.Error())
}
// Block until the datastore reports all written spans as persisted.
ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS / 2))
// Look up the first half of the spans. They should be found.
var span *common.Span
for i := 0; i < NUM_TEST_SPANS/2; i++ {
span, err = hcl.FindSpan(allSpans[i].Id)
if err != nil {
t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
}
common.ExpectSpansEqual(t, allSpans[i], span)
}
// Look up the second half of the spans. They should not be found.
for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
span, err = hcl.FindSpan(allSpans[i].Id)
if err != nil {
t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
}
if span != nil {
t.Fatalf("Unexpectedly found a span we never write to "+
"the server: FindSpan(%d) succeeded\n", i)
}
}
// Test FindChildren
// createRandomTestSpans made allSpans[0] the parent of allSpans[1].
childSpan := allSpans[1]
parentId := childSpan.Parents[0]
var children []common.SpanId
children, err = hcl.FindChildren(parentId, 1)
if err != nil {
t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
}
if len(children) != 1 {
t.Fatalf("FindChildren(%s) returned an invalid number of "+
"children: expected %d, got %d\n", parentId, 1, len(children))
}
if !children[0].Equal(childSpan.Id) {
t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+
" got %s\n", parentId, childSpan.Id, children[0])
}
// Test FindChildren on a span that has no children
childlessSpan := allSpans[NUM_TEST_SPANS/2]
children, err = hcl.FindChildren(childlessSpan.Id, 10)
if err != nil {
t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error())
}
if len(children) != 0 {
t.Fatalf("FindChildren(%d) returned an invalid number of "+
"children: expected %d, got %d\n", childlessSpan.Id, 0, len(children))
}
// Test Query
var query common.Query
query = common.Query{Lim: 10}
spans, err := hcl.Query(&query)
if err != nil {
t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
}
if len(spans) != 10 {
t.Fatalf("Query({lim: %d}) returned an invalid number of "+
"children: expected %d, got %d\n", 10, 10, len(spans))
}
}
// Tests that DumpAll streams back every span previously written to the
// datastore, in sorted order.
func TestDumpAll(t *testing.T) {
	htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
		DataDirs:     make([]string, 2),
		WrittenSpans: common.NewSemaphore(0),
		Cnf: map[string]string{
			conf.HTRACE_LOG_LEVEL: "INFO",
		},
	}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	var hcl *htrace.Client
	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	defer hcl.Close()
	NUM_TEST_SPANS := 100
	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
	// DumpAll returns spans in order, so sort our expected set to match.
	sort.Sort(allSpans)
	err = hcl.WriteSpans(allSpans)
	if err != nil {
		t.Fatalf("WriteSpans failed: %s\n", err.Error())
	}
	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
	out := make(chan *common.Span, NUM_TEST_SPANS)
	var dumpErr error
	// Fix: use a 'done' channel to synchronize the read of dumpErr below.
	// Previously the main goroutine read dumpErr as soon as 'out' was
	// closed, but the dumper goroutine stores dumpErr only after DumpAll
	// returns (which is after it closes 'out') -- a data race.
	done := make(chan struct{})
	go func() {
		dumpErr = hcl.DumpAll(3, out)
		close(done)
	}()
	var numSpans int
	nextLogTime := time.Now().Add(time.Millisecond * 5)
	for {
		span, channelOpen := <-out
		if !channelOpen {
			break
		}
		common.ExpectSpansEqual(t, allSpans[numSpans], span)
		numSpans++
		if testing.Verbose() {
			// Rate-limit progress output to one line per 5 ms.
			now := time.Now()
			if !now.Before(nextLogTime) {
				nextLogTime = now.Add(time.Millisecond * 5)
				fmt.Printf("read back %d span(s)...\n", numSpans)
			}
		}
	}
	if numSpans != len(allSpans) {
		t.Fatalf("expected to read %d spans... but only read %d\n",
			len(allSpans), numSpans)
	}
	// Wait for the dumper goroutine to finish before reading its error.
	<-done
	if dumpErr != nil {
		t.Fatalf("got dump error %s\n", dumpErr.Error())
	}
}
// Example configuration key/value pair used by TestClientGetServerConf to
// verify that server-side configuration round-trips to the client.
const EXAMPLE_CONF_KEY = "example.conf.key"
const EXAMPLE_CONF_VALUE = "foo.bar.baz"
// Tests that the client can fetch the server's configuration, and that a
// custom key set at server startup round-trips back to the client intact.
func TestClientGetServerConf(t *testing.T) {
	htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf",
		Cnf: map[string]string{
			EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE,
		},
		DataDirs: make([]string, 2)}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	var hcl *htrace.Client
	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	defer hcl.Close()
	serverCnf, err2 := hcl.GetServerConf()
	if err2 != nil {
		t.Fatalf("failed to call GetServerConf: %s", err2.Error())
	}
	if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE {
		// Fix: report the value actually returned by the server.  The
		// original message printed the expected constant
		// (EXAMPLE_CONF_VALUE), which made the failure uninformative.
		t.Fatalf("unexpected value for %s: %s",
			EXAMPLE_CONF_KEY, serverCnf[EXAMPLE_CONF_KEY])
	}
}
// Number of HRPC handler goroutines configured for the admissions-control
// and I/O-timeout tests below.
const TEST_NUM_HRPC_HANDLERS = 2
// Number of concurrent WriteSpans calls those tests issue.
const TEST_NUM_WRITESPANS = 4
// Tests that HRPC limits the number of simultaneous connections being processed.
// Tests that HRPC limits the number of simultaneous connections being processed.
func TestHrpcAdmissionsControl(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(TEST_NUM_WRITESPANS)
	var numConcurrentHrpcCalls int32
	testHooks := &hrpcTestHooks{
		HandleAdmission: func() {
			defer wg.Done()
			// Track how many HRPC calls are in flight; the count must
			// never exceed the configured handler limit.
			n := atomic.AddInt32(&numConcurrentHrpcCalls, 1)
			if n > TEST_NUM_HRPC_HANDLERS {
				// Fix: t.Fatalf must only be called from the goroutine
				// running the test function; from a server hook goroutine
				// we must use t.Errorf instead.
				t.Errorf("The number of concurrent HRPC calls went above "+
					"%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n)
			}
			time.Sleep(1 * time.Millisecond)
			n = atomic.AddInt32(&numConcurrentHrpcCalls, -1)
			if n >= TEST_NUM_HRPC_HANDLERS {
				t.Errorf("The number of concurrent HRPC calls went above "+
					"%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n+1)
			}
		},
	}
	htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl",
		DataDirs: make([]string, 2),
		Cnf: map[string]string{
			conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
		},
		WrittenSpans:  common.NewSemaphore(0),
		HrpcTestHooks: testHooks,
	}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	var hcl *htrace.Client
	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	// Fix: the original test never closed the client.
	defer hcl.Close()
	// Create some random trace spans.
	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
		go func(i int) {
			// Fix: use a goroutine-local error variable; the original
			// assigned to the shared 'err' from every goroutine, which
			// is a data race.
			if err := hcl.WriteSpans(allSpans[i : i+1]); err != nil {
				t.Errorf("WriteSpans failed: %s\n", err.Error())
			}
		}(iter)
	}
	wg.Wait()
	ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS))
}
// Tests that HRPC I/O timeouts work.
// Tests that HRPC I/O timeouts work.  The client-side test hook blocks each
// write-request body until finishClient is closed, which forces the server's
// 1 ms I/O timeout to fire.
func TestHrpcIoTimeout(t *testing.T) {
	htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout",
		DataDirs: make([]string, 2),
		Cnf: map[string]string{
			conf.HTRACE_NUM_HRPC_HANDLERS:  fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
			conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1",
		},
	}
	ht, err := htraceBld.Build()
	if err != nil {
		t.Fatalf("failed to create datastore: %s", err.Error())
	}
	defer ht.Close()
	var hcl *htrace.Client
	finishClient := make(chan interface{})
	defer func() {
		// Close the finishClient channel, if it hasn't already been closed.
		// The recover swallows the panic from a double close on the normal
		// exit path.
		defer func() { recover() }()
		close(finishClient)
	}()
	testHooks := &htrace.TestHooks{
		HandleWriteRequestBody: func() {
			<-finishClient
		},
	}
	hcl, err = htrace.NewClient(ht.ClientConf(), testHooks)
	if err != nil {
		t.Fatalf("failed to create client: %s", err.Error())
	}
	// Fix: the original test never closed the client.
	defer hcl.Close()
	// Create some random trace spans.
	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
	var wg sync.WaitGroup
	wg.Add(TEST_NUM_WRITESPANS)
	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
		go func(i int) {
			defer wg.Done()
			// Ignore the error return because there are internal retries in
			// the client which will make this succeed eventually, usually.
			// Keep in mind that we only block until we have seen
			// TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
			// we let requests through so that the test can exit cleanly.
			hcl.WriteSpans(allSpans[i : i+1])
		}(iter)
	}
	// Poll (with a short sleep) until the server has seen enough I/O errors.
	for ht.Hsv.GetNumIoErrors() < TEST_NUM_WRITESPANS {
		time.Sleep(time.Microsecond)
	}
	close(finishClient)
	wg.Wait()
}
// doWriteSpans writes N random spans to a miniature htraced instance via
// WriteSpans RPCs, batching them so no single request exceeds the maximum
// HRPC body length.  maxSpansPerRpc additionally caps the span count per
// request.  b may be nil when invoked from a plain test rather than a
// benchmark.
func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
	// Fix: use the 'name' parameter; it was previously ignored and the
	// builder name was hard-coded to "doWriteSpans".
	htraceBld := &MiniHTracedBuilder{Name: name,
		Cnf: map[string]string{
			conf.HTRACE_LOG_LEVEL:         "INFO",
			conf.HTRACE_NUM_HRPC_HANDLERS: "20",
		},
		// Start the semaphore at 1-N so a single Wait() below unblocks
		// only after all N spans have been written.
		WrittenSpans: common.NewSemaphore(int64(1 - N)),
	}
	ht, err := htraceBld.Build()
	if err != nil {
		panic(err)
	}
	defer ht.Close()
	rnd := rand.New(rand.NewSource(1))
	allSpans := make([]*common.Span, N)
	for n := 0; n < N; n++ {
		allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
	}
	// Determine how many calls to WriteSpans we should make. Each writeSpans
	// message should be small enough so that it doesn't exceed the max RPC
	// body length limit. TODO: a production-quality golang client would do
	// this internally rather than needing us to do it here in the unit test.
	bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
	reqs := make([][]*common.Span, 0, 4)
	curReq := -1
	// curReqLen starts at bodyLen so the first loop iteration opens a
	// fresh request.
	curReqLen := bodyLen
	var curReqSpans uint32
	mh := new(codec.MsgpackHandle)
	mh.WriteExt = true
	var mbuf [8192]byte
	buf := mbuf[:0]
	enc := codec.NewEncoderBytes(&buf, mh)
	for n := 0; n < N; n++ {
		span := allSpans[n]
		if (curReqSpans >= maxSpansPerRpc) ||
			(curReqLen >= bodyLen) {
			reqs = append(reqs, make([]*common.Span, 0, 16))
			curReqLen = 0
			curReq++
			curReqSpans = 0
		}
		// Encode the span only to measure its serialized size.
		buf = mbuf[:0]
		enc.ResetBytes(&buf)
		err := enc.Encode(span)
		if err != nil {
			panic(fmt.Sprintf("Error encoding span %s: %s\n",
				span.String(), err.Error()))
		}
		bufLen := len(buf)
		if bufLen > (bodyLen / 5) {
			panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen))
		}
		curReqLen += bufLen
		reqs[curReq] = append(reqs[curReq], span)
		curReqSpans++
	}
	ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs))
	var hcl *htrace.Client
	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
	if err != nil {
		panic(fmt.Sprintf("failed to create client: %s", err.Error()))
	}
	defer hcl.Close()
	// Reset the timer to avoid including the time required to create new
	// random spans in the benchmark total.
	if b != nil {
		b.ResetTimer()
	}
	// Write many random spans.
	for reqIdx := range reqs {
		go func(i int) {
			// Fix: use a goroutine-local error; the original assigned to
			// the shared 'err' from every goroutine, a data race.
			if err := hcl.WriteSpans(reqs[i]); err != nil {
				panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
					i, err.Error()))
			}
		}(reqIdx)
	}
	// Wait for all the spans to be written.
	ht.Store.WrittenSpans.Wait()
}
// This is a test of how quickly we can create new spans via WriteSpans RPCs.
// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
// Unlike that benchmark, it sends the spans via RPC.
// Suggested flags for running this:
// -tags unsafe -cpu 16 -benchtime=1m
func BenchmarkWriteSpans(b *testing.B) {
// math.MaxUint32 effectively disables the per-RPC span cap, so requests
// are limited only by the maximum HRPC body length.
doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
}
// Non-benchmark variant of BenchmarkWriteSpans: writes 3000 spans with at
// most 1000 spans per WriteSpans request.
func TestWriteSpansRpcs(t *testing.T) {
doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
}