HTRACE-88. Add REST query API to htraced (cmccabe)
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go b/htrace-core/src/go/src/org/apache/htrace/client/client.go
index fbcdcc6..52fe78e 100644
--- a/htrace-core/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go
@@ -106,6 +106,25 @@
 	return spanIds, nil
 }
 
+// Query marshals the query to JSON, sends it to the "query" REST endpoint
+// with a GET request, and decodes the returned bytes as a JSON array of spans.
+func (hcl *Client) Query(query *common.Query) ([]common.Span, error) {
+	in, err := json.Marshal(query)
+	if err != nil {
+		return nil, fmt.Errorf("Error marshalling query: %s", err.Error())
+	}
+	// The int result of makeRestRequest is not needed here.
+	out, _, err := hcl.makeRestRequest("GET", "query", bytes.NewReader(in))
+	if err != nil {
+		return nil, err
+	}
+	var spans []common.Span
+	if err = json.Unmarshal(out, &spans); err != nil {
+		return nil, fmt.Errorf("Error unmarshalling results: %s", err.Error())
+	}
+	return spans, nil
+}
+
 func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
 	return hcl.makeRestRequest("GET", reqName, nil)
 }
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query.go b/htrace-core/src/go/src/org/apache/htrace/common/query.go
new file mode 100644
index 0000000..0c909a1
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/common/query.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+	"encoding/json"
+)
+
+//
+// Represents queries that can be sent to htraced.
+//
+// Each query consists of a set of predicates that will be 'AND'ed together to
+// return a set of spans.  Predicates contain an operation, a field, and a
+// value.
+//
+// For example, a query might be "return the first 100 spans between 5:00pm
+// and 5:01pm".  This query would have two predicates: time greater than or
+// equal to 5:00pm, and time less than or equal to 5:01pm.
+// In HTrace, times are always expressed in milliseconds since the Epoch.
+// So this would become:
+// { "lim" : 100, "pred" : [
+//   { "op" : "ge", "field" : "begin", "val" : "1234" },
+//   { "op" : "le", "field" : "begin", "val" : "5678" }
+// ] }
+//
+// Where '1234' and '5678' were replaced by times since the epoch in
+// milliseconds.
+//
+
+type Op string // Op names the comparison a Predicate applies, as spelled in JSON.
+
+const (
+	CONTAINS               Op = "cn" // substring match; rejected for numeric fields
+	EQUALS                 Op = "eq" // field value == predicate value
+	LESS_THAN_OR_EQUALS    Op = "le" // field value <= predicate value
+	GREATER_THAN_OR_EQUALS Op = "ge" // field value >= predicate value
+)
+
+func (op Op) IsDescending() bool {
+	return op == LESS_THAN_OR_EQUALS // only "le" is treated as descending
+}
+
+type Field string // Field names the span attribute a Predicate tests, as spelled in JSON.
+
+const (
+	SPAN_ID     Field = "spanid"      // the span's id
+	DESCRIPTION Field = "description" // the span's description string
+	BEGIN_TIME  Field = "begin"       // the span's begin time
+	END_TIME    Field = "end"         // the span's end time
+	DURATION    Field = "duration"    // end time minus begin time
+)
+
+type Predicate struct { // one condition of a Query: <Field> <Op> <Val>
+	Op    Op     `json:"op"`    // comparison to apply
+	Field Field  `json:"field"` // span attribute to test
+	Val   string `json:"val"`   // value to compare against, always carried as a string
+}
+
+type Query struct {
+	Predicates []Predicate `json:"pred"` // conditions that a span must all satisfy
+	Lim        int         `json:"lim"`  // maximum number of spans to return
+}
+
+func (query *Query) String() string {
+	encoded, marshalErr := json.Marshal(query)
+	if marshalErr == nil {
+		return string(encoded) // the query rendered as JSON text
+	}
+	panic(marshalErr)
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go b/htrace-core/src/go/src/org/apache/htrace/common/span.go
index 36e716a..64975d2 100644
--- a/htrace-core/src/go/src/org/apache/htrace/common/span.go
+++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go
@@ -80,7 +80,11 @@
 	if b[len(b)-1] != DOUBLE_QUOTE {
 		return errors.New("Expected spanID to end with a string quote.")
 	}
-	v, err := strconv.ParseUint(string(b[1:len(b)-1]), 16, 64)
+	return id.FromString(string(b[1 : len(b)-1]))
+}
+
+func (id *SpanId) FromString(str string) error {
+	v, err := strconv.ParseUint(str, 16, 64)
 	if err != nil {
 		return err
 	}
@@ -111,3 +115,8 @@
 	}
 	return jbytes
 }
+
+// Duration returns End minus Begin.  Overflow is ignored since we never deal with negative times.
+func (span *Span) Duration() int64 {
+	return span.End - span.Begin
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
index 40678bd..523b7ab 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -22,10 +22,13 @@
 import (
 	"bytes"
 	"encoding/gob"
+	"errors"
+	"fmt"
 	"github.com/jmhodges/levigo"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
 	"os"
+	"strconv"
 	"strings"
 	"sync/atomic"
 	"syscall"
@@ -49,14 +52,23 @@
 // Schema
 // m -> dataStoreMetadata
 // s[8-byte-big-endian-sid] -> SpanData
+// b[8-byte-big-endian-begin-time][8-byte-big-endian-sid] -> {}
+// e[8-byte-big-endian-end-time][8-byte-big-endian-sid] -> {}
+// d[8-byte-big-endian-duration][8-byte-big-endian-sid] -> {}
 // p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
-// t[8-byte-big-endian-time][8-byte-big-endian-child-sid] -> {}
 //
 
 const DATA_STORE_VERSION = 1
 
 var EMPTY_BYTE_BUF []byte = []byte{}
 
+const SPAN_ID_INDEX_PREFIX = 's'    // primary records: span id -> SpanData
+const BEGIN_TIME_INDEX_PREFIX = 'b' // secondary index on begin time
+const END_TIME_INDEX_PREFIX = 'e'   // secondary index on end time
+const DURATION_INDEX_PREFIX = 'd'   // secondary index on duration
+const PARENT_ID_INDEX_PREFIX = 'p'  // secondary index on parent span id
+const INVALID_INDEX_PREFIX = 0      // returned by getIndexPrefix for unindexed fields
+
 type Statistics struct {
 	NumSpansWritten uint64
 }
@@ -190,15 +202,21 @@
 	if err != nil {
 		return err
 	}
-	batch.Put(makeKey('s', span.Id.Val()), spanDataBuf.Bytes())
+	batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes())
 
 	// Add this to the parent index.
 	for parentIdx := range span.Parents {
-		batch.Put(makeSecondaryKey('p', span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF)
+		batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX,
+			span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF)
 	}
 
-	// Add this to the timeline index.
-	batch.Put(makeSecondaryKey('t', span.Begin, span.Id.Val()), EMPTY_BYTE_BUF)
+	// Add to the other secondary indices.
+	batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, span.Begin,
+		span.Id.Val()), EMPTY_BYTE_BUF)
+	batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, span.End,
+		span.Id.Val()), EMPTY_BYTE_BUF)
+	batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, span.Duration(),
+		span.Id.Val()), EMPTY_BYTE_BUF)
 
 	err = shd.ldb.Write(shd.store.writeOpts, batch)
 	if err != nil {
@@ -407,21 +425,30 @@
 			shd.path, sid, err.Error())
 		return nil
 	}
-	r := bytes.NewBuffer(buf)
-	decoder := gob.NewDecoder(r)
-	data := common.SpanData{}
-	err = decoder.Decode(&data)
+	var span *common.Span
+	span, err = shd.decodeSpan(sid, buf)
 	if err != nil {
 		lg.Errorf("Shard(%s): FindSpan(%016x) decode error: %s\n",
 			shd.path, sid, err.Error())
 		return nil
 	}
+	return span
+}
+
+func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) {
+	r := bytes.NewBuffer(buf) // buf is expected to hold gob-encoded SpanData
+	decoder := gob.NewDecoder(r)
+	data := common.SpanData{}
+	err := decoder.Decode(&data)
+	if err != nil {
+		return nil, err // reporting the failure is left to the caller
+	}
 	// Gob encoding translates empty slices to nil.  Reverse this so that we're always dealing with
 	// non-nil slices.
 	if data.Parents == nil {
 		data.Parents = []common.SpanId{}
 	}
-	return &common.Span{Id: common.SpanId(sid), SpanData: data}
+	return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
 }
 
 // Find the children of a given span id.
@@ -453,5 +480,371 @@
 	return childIds
 }
 
-//func (store *dataStore) FindByTimeRange(startTime int64, endTime int64, lim int32) []int64 {
-//}
+type predicateData struct { // a query predicate plus its parsed comparison key
+	*common.Predicate
+	intKey int64  // parsed Val when the field is a span id, time or duration
+	strKey string // Val itself when the field is the description
+}
+
+// loadPredicateData validates a query predicate and returns it together with
+// its comparison key.  The value must parse according to the field's type
+// (hex for span ids, base-10 for times and durations, any string for
+// descriptions), and the operation must be a known one; CONTAINS is refused
+// on numeric fields.
+func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
+	parsed := &predicateData{Predicate: pred}
+
+	// Step 1: turn the textual value into the key used for comparisons.
+	switch pred.Field {
+	case common.SPAN_ID:
+		// Span IDs are sent as hex strings.
+		var spanId common.SpanId
+		if err := spanId.FromString(pred.Val); err != nil {
+			return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s",
+				pred.Val, err.Error()))
+		}
+		parsed.intKey = spanId.Val()
+	case common.DESCRIPTION:
+		// Any string is valid for a description.
+		parsed.strKey = pred.Val
+	case common.BEGIN_TIME, common.END_TIME, common.DURATION:
+		num, err := strconv.ParseInt(pred.Val, 10, 64)
+		if err != nil {
+			return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s",
+				pred.Field, pred.Val, err.Error()))
+		}
+		parsed.intKey = num
+	default:
+		return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field))
+	}
+
+	// Step 2: make sure the operation exists and fits the field type.
+	switch pred.Op {
+	case common.EQUALS, common.LESS_THAN_OR_EQUALS, common.GREATER_THAN_OR_EQUALS:
+		// Valid for every field type.
+	case common.CONTAINS:
+		if parsed.fieldIsNumeric() {
+			return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+
+				"numeric field like '%s'", pred.Field))
+		}
+	default:
+		return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'",
+			pred.Op))
+	}
+
+	return parsed, nil
+}
+
+// getIndexPrefix maps the predicate's field to its index key prefix (0 if none).
+func (pred *predicateData) getIndexPrefix() byte {
+	var prefix byte = INVALID_INDEX_PREFIX
+	switch pred.Field {
+	case common.SPAN_ID:
+		prefix = SPAN_ID_INDEX_PREFIX
+	case common.BEGIN_TIME:
+		prefix = BEGIN_TIME_INDEX_PREFIX
+	case common.END_TIME:
+		prefix = END_TIME_INDEX_PREFIX
+	case common.DURATION:
+		prefix = DURATION_INDEX_PREFIX
+	}
+	return prefix
+}
+
+// fieldIsNumeric reports whether the predicate's field is compared as a 64-bit
+// integer (span id, begin time, end time or duration) rather than as a string.
+func (pred *predicateData) fieldIsNumeric() bool {
+	field := pred.Field
+	if field == common.SPAN_ID || field == common.BEGIN_TIME {
+		return true
+	}
+	return field == common.END_TIME || field == common.DURATION
+}
+
+// extractRelevantSpanData picks out the span value that this predicate tests.
+func (pred *predicateData) extractRelevantSpanData(span *common.Span) (int64, string) {
+	if pred.Field == common.DESCRIPTION {
+		return 0, span.Description
+	}
+	switch pred.Field {
+	case common.SPAN_ID:
+		return span.Id.Val(), ""
+	case common.BEGIN_TIME:
+		return span.Begin, ""
+	case common.END_TIME:
+		return span.End, ""
+	case common.DURATION:
+		return span.Duration(), ""
+	}
+	panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field))
+}
+
+// spanPtrIsBefore reports whether span a should be returned ahead of span b.
+// Spans are compared on the predicate's field: smaller values first, or
+// larger values first when the operation is descending.  A nil span comes
+// after every non-nil span, and is never before anything.
+func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
+	if a == nil {
+		return false
+	}
+	if b == nil {
+		return true
+	}
+	// Both spans exist, so compare the values the predicate cares about.
+	aInt, aStr := pred.extractRelevantSpanData(a)
+	bInt, bStr := pred.extractRelevantSpanData(b)
+	numeric, descending := pred.fieldIsNumeric(), pred.Op.IsDescending()
+	// Strict comparisons: spans with equal values are not before one another.
+	switch {
+	case numeric && descending:
+		return aInt > bInt
+	case numeric:
+		return aInt < bInt
+	case descending:
+		return aStr > bStr
+	default:
+		return aStr < bStr
+	}
+}
+
+// satisfiedBy reports whether the span passes this predicate.  Numeric fields
+// support EQUALS, LESS_THAN_OR_EQUALS and GREATER_THAN_OR_EQUALS; string fields
+// additionally support CONTAINS.  Any other combination panics, because
+// loadPredicateData is expected to have rejected it already.
+func (pred *predicateData) satisfiedBy(span *common.Span) bool {
+	intVal, strVal := pred.extractRelevantSpanData(span)
+	if !pred.fieldIsNumeric() {
+		switch pred.Op {
+		case common.CONTAINS:
+			return strings.Contains(strVal, pred.strKey)
+		case common.EQUALS:
+			return strVal == pred.strKey
+		case common.LESS_THAN_OR_EQUALS:
+			return strVal <= pred.strKey
+		case common.GREATER_THAN_OR_EQUALS:
+			return strVal >= pred.strKey
+		}
+		panic(fmt.Sprintf("unknown Op type %s should have been caught "+
+			"during normalization", pred.Op))
+	}
+	switch pred.Op {
+	case common.EQUALS:
+		return intVal == pred.intKey
+	case common.LESS_THAN_OR_EQUALS:
+		return intVal <= pred.intKey
+	case common.GREATER_THAN_OR_EQUALS:
+		return intVal >= pred.intKey
+	}
+	panic(fmt.Sprintf("unknown Op type %s should have been caught "+
+		"during normalization", pred.Op))
+}
+
+// createSource opens an iterator per shard and seeks it to this predicate's key.
+func (pred *predicateData) createSource(store *dataStore) (*source, error) {
+	var result *source
+	src := source{store: store,
+		pred:      pred,
+		iters:     make([]*levigo.Iterator, 0, len(store.shards)),
+		nexts:     make([]*common.Span, len(store.shards)),
+		numRead:   make([]int, len(store.shards)),
+		keyPrefix: pred.getIndexPrefix(),
+	}
+	if src.keyPrefix == INVALID_INDEX_PREFIX {
+		return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
+			"predicate on field %s", pred.Field))
+	}
+	defer func() {
+		if result == nil {
+			src.Close() // only reached if we unwind before handing src out
+		}
+	}()
+	for _, shd := range store.shards {
+		src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
+	}
+	searchKey := makeKey(src.keyPrefix, pred.intKey)
+	for _, iter := range src.iters {
+		iter.Seek(searchKey)
+	}
+	result = &src
+	return result, nil
+}
+
+// A source yields spans from all shards by walking one index per shard.
+type source struct {
+	store     *dataStore         // data store whose shards are read
+	pred      *predicateData     // predicate whose index is being scanned
+	iters     []*levigo.Iterator // one per shard; set to nil once the shard is exhausted
+	nexts     []*common.Span     // per shard: the next candidate span, or nil
+	numRead   []int              // per shard: number of index entries visited
+	keyPrefix byte               // prefix of the index being scanned
+}
+
+// populateNextFromShard makes sure nexts[shardIdx] holds the shard's next matching span, if any.
+func (src *source) populateNextFromShard(shardIdx int) {
+	lg := src.store.lg
+	var err error
+	iter := src.iters[shardIdx]
+	if iter == nil {
+		lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
+		return // There are no more entries in this shard.
+	}
+	if src.nexts[shardIdx] != nil {
+		lg.Debugf("No need to populate shard %d\n", shardIdx)
+		return // We already have a valid entry for this shard.
+	}
+	for {
+		if !iter.Valid() { // NOTE(review): presumably also hit when a descending Seek lands past the last key, yielding no results; confirm
+			lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
+			break // Can't read past end of DB
+		}
+		src.numRead[shardIdx]++ // counts every index entry visited, including rejected ones
+		key := iter.Key()
+		if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
+			lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s",
+				shardIdx, string(src.keyPrefix))
+			break // Can't read past end of indexed section
+		}
+		var span *common.Span
+		var sid int64
+		if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
+			// The span id maps to the span itself.
+			sid = keyToInt(key[1:]) // skip the 1-byte prefix
+			span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
+			if err != nil {
+				lg.Debugf("Internal error decoding span %016x in shard %d: %s\n",
+					sid, shardIdx, err.Error())
+				break // gives up on this shard: falls through to the iterator close below
+			}
+		} else {
+			// With a secondary index, we have to look up the span by id.
+			sid = keyToInt(key[9:]) // skip the 1-byte prefix and the 8-byte indexed value
+			span = src.store.shards[shardIdx].FindSpan(sid)
+			if span == nil {
+				lg.Debugf("Internal error rehydrating span %016x in shard %d\n",
+					sid, shardIdx)
+				break // gives up on this shard: falls through to the iterator close below
+			}
+		}
+		if src.pred.Op.IsDescending() { // step before testing, so the next call resumes after this entry
+			iter.Prev()
+		} else {
+			iter.Next()
+		}
+		if src.pred.satisfiedBy(span) {
+			lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx)
+			src.nexts[shardIdx] = span // Found valid entry
+			return
+		} else {
+			lg.Debugf("Span %016x from shard %d does not satisfy the predicate.\n",
+				sid, shardIdx)
+			if src.numRead[shardIdx] <= 1 && src.pred.Op.IsDescending() {
+				// When dealing with descending predicates, the first span we read might not satisfy
+				// the predicate, even though subsequent ones will.  This is because the iter.Seek()
+				// function "moves the iterator the position of the key given or, if the key doesn't
+				// exist, the next key that does exist in the database."  So if we're on that "next
+				// key" it will not satisfy the predicate, but the keys previous to it might.
+				continue
+			}
+			// This and subsequent entries don't satisfy predicate
+			break
+		}
+	}
+	lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+	iter.Close()
+	src.iters[shardIdx] = nil // marks the shard as exhausted for later calls
+}
+
+// next returns the pending span that sorts first across all shards, or nil
+// when no shard has anything left to offer.
+func (src *source) next() *common.Span {
+	for idx := range src.iters {
+		src.populateNextFromShard(idx)
+	}
+	winner := -1
+	var chosen *common.Span
+	for idx := range src.iters {
+		if candidate := src.nexts[idx]; src.pred.spanPtrIsBefore(candidate, chosen) {
+			chosen, winner = candidate, idx
+		}
+	}
+	if winner >= 0 {
+		src.nexts[winner] = nil // consumed; refilled on the next call
+	}
+	return chosen
+}
+
+func (src *source) Close() {
+	for _, iter := range src.iters {
+		if iter != nil {
+			iter.Close() // exhausted shards were already closed and set to nil
+		}
+	}
+	src.iters = nil // makes a repeated Close a no-op
+}
+
+// obtainSource builds the span source from the first indexed predicate, which
+// is removed from *preds; with none indexed it scans by span id from id 0.
+func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
+	all := *preds
+	for idx, candidate := range all {
+		if candidate.getIndexPrefix() == INVALID_INDEX_PREFIX {
+			continue
+		}
+		*preds = append(all[:idx], all[idx+1:]...)
+		return candidate.createSource(store)
+	}
+	fallback, err := loadPredicateData(&common.Predicate{
+		Op:    common.GREATER_THAN_OR_EQUALS,
+		Field: common.SPAN_ID,
+		Val:   "0000000000000000",
+	})
+	if err != nil {
+		return nil, err
+	}
+	return fallback.createSource(store)
+}
+
+// HandleQuery returns up to query.Lim spans that satisfy every predicate of
+// the query.  Spans are read from the source chosen by obtainSource and then
+// checked against the predicates that the source does not already enforce.
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
+	lg := store.lg
+	// Validate and parse each predicate up front.
+	preds := make([]*predicateData, len(query.Predicates))
+	for i := range query.Predicates {
+		parsed, err := loadPredicateData(&query.Predicates[i])
+		if err != nil {
+			return nil, err
+		}
+		preds[i] = parsed
+	}
+	// Pick the span source; this may remove one predicate from preds.
+	src, err := store.obtainSource(&preds)
+	if err != nil {
+		return nil, err
+	}
+	defer src.Close()
+	lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
+
+	// Collect spans until the limit is reached or the source runs dry.
+	ret := make([]*common.Span, 0, 32)
+	for len(ret) < query.Lim {
+		span := src.next()
+		if span == nil {
+			break
+		}
+		lg.Debugf("src.next returned span %s\n", span.ToJson())
+		matches := true
+		// The source enforces only its own predicate; test the rest here.
+		for _, pred := range preds {
+			if !pred.satisfiedBy(span) {
+				matches = false
+				break
+			}
+		}
+		if matches {
+			ret = append(ret, span)
+		}
+	}
+	return ret, nil
+}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
index f0449fe..3330723 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -20,6 +20,8 @@
 package main
 
 import (
+	"bytes"
+	"encoding/json"
 	"math/rand"
 	"org/apache/htrace/common"
 	"org/apache/htrace/test"
@@ -116,6 +118,159 @@
 	}
 }
 
+// testQuery runs the query against the store of ht and checks that the
+// returned spans, encoded as JSON, equal the JSON encoding of expectedSpans.
+// A failing query or encoding step aborts the test immediately.
+func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
+	expectedSpans []common.Span) {
+	spans, err := ht.Store.HandleQuery(query)
+	if err != nil {
+		t.Fatalf("First query failed: %s\n", err.Error())
+	}
+	var expectedBuf, spansBuf bytes.Buffer
+	if err = json.NewEncoder(&expectedBuf).Encode(expectedSpans); err != nil {
+		t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error())
+	}
+	if err = json.NewEncoder(&spansBuf).Encode(spans); err != nil {
+		t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error())
+	}
+	// Comparing the encodings sidesteps the []common.Span vs []*common.Span
+	// difference between the expectation and the query result.
+	t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
+		len(expectedSpans))
+	common.ExpectStrEqual(t, expectedBuf.String(), spansBuf.String())
+}
+
+// TestSimpleQuery checks a single indexed predicate: begin time >= 125.
+func TestSimpleQuery(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+		WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("Failed to build MiniHTraced: %v\n", err)
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("Expected at least %d spans to have been written\n", len(SIMPLE_TEST_SPANS))
+	}
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.GREATER_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "125",
+			},
+		},
+		Lim: 5,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+}
+
+// TestQueries2 covers begin time <= 125 alone and with a description match, plus a description-only query.
+func TestQueries2(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("Failed to build MiniHTraced: %v\n", err)
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("Expected at least %d spans to have been written\n", len(SIMPLE_TEST_SPANS))
+	}
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "125",
+			},
+		},
+		Lim: 5,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "125",
+			},
+			{
+				Op:    common.EQUALS,
+				Field: common.DESCRIPTION,
+				Val:   "getFileDescriptors",
+			},
+		},
+		Lim: 2,
+	}, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.EQUALS,
+				Field: common.DESCRIPTION,
+				Val:   "getFileDescriptors",
+			},
+		},
+		Lim: 2,
+	}, []common.Span{SIMPLE_TEST_SPANS[0]})
+}
+
+// TestQueries3 covers CONTAINS on the description and span id <= queries.
+func TestQueries3(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("Failed to build MiniHTraced: %v\n", err)
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("Expected at least %d spans to have been written\n", len(SIMPLE_TEST_SPANS))
+	}
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.CONTAINS,
+				Field: common.DESCRIPTION,
+				Val:   "Fd",
+			},
+			{
+				Op:    common.GREATER_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "100",
+			},
+		},
+		Lim: 5,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.SPAN_ID,
+				Val:   "0",
+			},
+		},
+		Lim: 200,
+	}, []common.Span{})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.SPAN_ID,
+				Val:   "2",
+			},
+		},
+		Lim: 200,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+}
+
 func BenchmarkDatastoreWrites(b *testing.B) {
 	htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
 		WrittenSpans: make(chan *common.Span, b.N)}
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index efc89e1..39e5744 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -141,7 +141,9 @@
 	children := hand.store.FindChildren(sid, lim)
 	jbytes, err := json.Marshal(children)
 	if err != nil {
-		panic(err)
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Error marshalling children: %s", err.Error()))
+		return
 	}
 	w.Write(jbytes)
 }
@@ -173,6 +175,42 @@
 	}
 }
 
+type queryHandler struct { // HTTP handler for span queries; embeds dataStoreHandler
+	dataStoreHandler
+}
+
+// ServeHTTP answers a query request.  The request body must hold a JSON
+// common.Query; the matching spans are written back as a JSON array.  The
+// result limit comes from the query's own "lim" field, so no separate "lim"
+// request field is demanded (its value was parsed and never used).
+func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	var query common.Query
+	dec := json.NewDecoder(req.Body)
+	err := dec.Decode(&query)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing query: %s", err.Error()))
+		return
+	}
+	var results []*common.Span
+	results, err = hand.store.HandleQuery(&query)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Internal error processing query %s: %s",
+				query.String(), err.Error()))
+		return
+	}
+	var jbytes []byte
+	jbytes, err = json.Marshal(results)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Error marshalling results: %s", err.Error()))
+		return
+	}
+	w.Write(jbytes)
+}
+
 type defaultServeHandler struct {
 	lg *common.Logger
 }
@@ -225,6 +263,9 @@
 		store: store, lg: rsv.lg}}
 	r.Handle("/writeSpans", writeSpansH).Methods("POST")
 
+	queryH := &queryHandler{dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg}}
+	r.Handle("/query", queryH).Methods("GET") // queryH reports errors through its lg, so lg must be set
+
 	span := r.PathPrefix("/span").Subrouter()
 	findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg}}
 	span.Handle("/{id}", findSidH).Methods("GET")