HTRACE-237. Optimize htraced span receiver (cmccabe)
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
index e63d414..33908db 100644
--- a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
@@ -17,6 +17,7 @@
package org.apache.htrace.core;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -170,8 +171,11 @@
Map<String, String> traceInfoMap = span.getKVAnnotations();
if (!traceInfoMap.isEmpty()) {
jgen.writeObjectFieldStart("n");
- for (Map.Entry<String, String> e : traceInfoMap.entrySet()) {
- jgen.writeStringField(e.getKey(), e.getValue());
+ String[] keys = traceInfoMap.keySet().
+ toArray(new String[traceInfoMap.size()]);
+ Arrays.sort(keys);
+ for (String key : keys) {
+ jgen.writeStringField(key, traceInfoMap.get(key));
}
jgen.writeEndObject();
}
diff --git a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
index 7cb4aed..0869ca0 100644
--- a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
+++ b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
@@ -16,8 +16,18 @@
*/
package org.apache.htrace.util;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.TimelineAnnotation;
+
import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -88,4 +98,69 @@
Thread.sleep(periodMs);
}
}
+
+ private static long nonZeroRandomLong(Random rand) {
+ long r = 0;
+ do {
+ r = rand.nextLong();
+ } while (r == 0);
+ return r;
+ }
+
+ private static long positiveRandomLong(Random rand) {
+ long r = rand.nextLong();
+ if (r == Long.MIN_VALUE) {
+ // Math.abs can't handle this case
+ return Long.MAX_VALUE;
+ } else if (r > 0) {
+ return r;
+ } else {
+ return -r;
+ }
+ }
+
+ private static String randomString(Random rand) {
+ return new UUID(positiveRandomLong(rand),
+ positiveRandomLong(rand)).toString();
+ }
+
+ public static Span randomSpan(Random rand) {
+ MilliSpan.Builder builder = new MilliSpan.Builder();
+ builder.spanId(
+ new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand)));
+ builder.begin(positiveRandomLong(rand));
+ builder.end(positiveRandomLong(rand));
+ builder.description(randomString(rand));
+ builder.tracerId(randomString(rand));
+ int numParents = rand.nextInt(4);
+ SpanId[] parents = new SpanId[numParents];
+ for (int i = 0; i < numParents; i++) {
+ parents[i] =
+ new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand));
+ }
+ builder.parents(parents);
+ int numTraceInfos = rand.nextInt(4);
+ Map<String, String> traceInfo = new HashMap<String, String>(numTraceInfos);
+ for (int i = 0; i < numTraceInfos; i++) {
+ traceInfo.put(randomString(rand), randomString(rand));
+ }
+ builder.traceInfo(traceInfo);
+ int numTimelineAnnotations = rand.nextInt(4);
+ List<TimelineAnnotation> timeline =
+ new LinkedList<TimelineAnnotation>();
+ for (int i = 0; i < numTimelineAnnotations; i++) {
+ timeline.add(new TimelineAnnotation(positiveRandomLong(rand),
+ randomString(rand)));
+ }
+ builder.timeline(timeline);
+ return builder.build();
+ }
+
+ public static Span[] randomSpans(Random rand, int numSpans) {
+ Span[] spans = new Span[numSpans];
+ for (int i = 0; i < spans.length; i++) {
+ spans[i] = randomSpan(rand);
+ }
+ return spans;
+ }
}
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 2ac8a1e..dd3f8a3 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -114,32 +114,13 @@
func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
var w bytes.Buffer
- var err error
- for i := range req.Spans {
- var buf []byte
- buf, err = json.Marshal(req.Spans[i])
- if err != nil {
- return errors.New(fmt.Sprintf("Error serializing span: %s",
- err.Error()))
- }
- _, err = w.Write(buf)
- if err != nil {
- return errors.New(fmt.Sprintf("Error writing span: %s",
- err.Error()))
- }
- _, err = w.Write([]byte{'\n'})
- //err = io.WriteString(&w, "\n")
- if err != nil {
- return errors.New(fmt.Sprintf("Error writing: %s",
- err.Error()))
- }
+ enc := json.NewEncoder(&w)
+ err := enc.Encode(req)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error serializing span: %s",
+ err.Error()))
}
- customHeaders := make(map[string]string)
- if req.DefaultTrid != "" {
- customHeaders["htrace-trid"] = req.DefaultTrid
- }
- _, _, err = hcl.makeRestRequest("POST", "writeSpans",
- &w, customHeaders)
+ _, _, err = hcl.makeRestRequest("POST", "writeSpans", &w)
if err != nil {
return err
}
@@ -182,24 +163,19 @@
return spans, nil
}
-var EMPTY = make(map[string]string)
-
func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
- return hcl.makeRestRequest("GET", reqName, nil, EMPTY)
+ return hcl.makeRestRequest("GET", reqName, nil)
}
// Make a general JSON REST request.
// Returns the request body, the response code, and the error.
// Note: if the response code is non-zero, the error will also be non-zero.
-func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader,
- customHeaders map[string]string) ([]byte, int, error) {
+func (hcl *Client) makeRestRequest(reqType string, reqName string,
+ reqBody io.Reader) ([]byte, int, error) {
url := fmt.Sprintf("http://%s/%s",
hcl.restAddr, reqName)
req, err := http.NewRequest(reqType, url, reqBody)
req.Header.Set("Content-Type", "application/json")
- for k, v := range customHeaders {
- req.Header.Set(k, v)
- }
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rest.go b/htrace-htraced/go/src/org/apache/htrace/common/rest.go
deleted file mode 100644
index b367ed1..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/common/rest.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package common
-
-// Info returned by /server/info
-type ServerInfo struct {
- // The server release version.
- ReleaseVersion string
-
- // The git hash that this software was built with.
- GitVersion string
-}
-
-// Info returned by /server/stats
-type ServerStats struct {
- Shards []ShardStats
-}
-
-type ShardStats struct {
- Path string
-
- // The approximate number of spans present in this shard. This may be an
- // underestimate.
- ApproxNumSpans uint64
-
- // leveldb.stats information
- LevelDbStats string
-}
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 28521a5..9c7bfad 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,14 +38,39 @@
// A request to write spans to htraced.
type WriteSpansReq struct {
- DefaultTrid string
+ DefaultTrid string `json:",omitempty"`
Spans []*Span
}
+// Info returned by /server/info
+type ServerInfo struct {
+ // The server release version.
+ ReleaseVersion string
+
+ // The git hash that this software was built with.
+ GitVersion string
+}
+
// A response to a WriteSpansReq
type WriteSpansResp struct {
}
+// Info returned by /server/stats
+type ServerStats struct {
+ Shards []ShardStats
+}
+
+type ShardStats struct {
+ Path string
+
+ // The approximate number of spans present in this shard. This may be an
+ // underestimate.
+ ApproxNumSpans uint64
+
+ // leveldb.stats information
+ LevelDbStats string
+}
+
// The header which is sent over the wire for HRPC
type HrpcRequestHeader struct {
Magic uint32
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
index 9de7cee..7fb128d 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
@@ -92,7 +92,7 @@
End: 5678,
Description: "getFileDescriptors",
Parents: []SpanId{},
- TracerId: "testTracerId",
+ TracerId: "testTracerId",
}}
mh := new(codec.MsgpackHandle)
mh.WriteExt = true
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config.go b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
index b8f6c84..76af7a6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
@@ -69,7 +69,7 @@
}
func getDefaultHTracedConfDir() string {
- return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf";
+ return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf"
}
func getHTracedConfDirs(dlog io.Writer) []string {
@@ -78,7 +78,7 @@
if len(paths) < 1 {
def := getDefaultHTracedConfDir()
io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR defaulting to %s\n", def))
- return []string{ def }
+ return []string{def}
}
io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR=%s\n", confDir))
return paths
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index 8bc0c64..b5549c5 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -31,8 +31,8 @@
"org/apache/htrace/common"
"org/apache/htrace/conf"
"os"
- "time"
"strings"
+ "time"
)
var RELEASE_VERSION string
@@ -126,7 +126,7 @@
case serverInfo.FullCommand():
os.Exit(printServerInfo(hcl))
case serverStats.FullCommand():
- if (*serverStatsJson) {
+ if *serverStatsJson {
os.Exit(printServerStatsJson(hcl))
} else {
os.Exit(printServerStats(hcl))
@@ -195,7 +195,7 @@
}
fmt.Printf("HTraced server stats:\n")
fmt.Printf("%d leveldb shards.\n", len(stats.Shards))
- for i := range(stats.Shards) {
+ for i := range stats.Shards {
shard := stats.Shards[i]
fmt.Printf("==== %s ===\n", shard.Path)
fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans)
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 0595d36..9fb9920 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -953,19 +953,19 @@
}
func (store *dataStore) ServerStats() *common.ServerStats {
- serverStats := common.ServerStats {
- Shards : make([]common.ShardStats, len(store.shards)),
+ serverStats := common.ServerStats{
+ Shards: make([]common.ShardStats, len(store.shards)),
}
- for shardIdx := range(store.shards) {
+ for shardIdx := range store.shards {
shard := store.shards[shardIdx]
serverStats.Shards[shardIdx].Path = shard.path
- r := levigo.Range {
- Start : append([]byte{SPAN_ID_INDEX_PREFIX},
+ r := levigo.Range{
+ Start: append([]byte{SPAN_ID_INDEX_PREFIX},
common.INVALID_SPAN_ID.Val()...),
- Limit : append([]byte{SPAN_ID_INDEX_PREFIX + 1},
+ Limit: append([]byte{SPAN_ID_INDEX_PREFIX + 1},
common.INVALID_SPAN_ID.Val()...),
}
- vals := shard.ldb.GetApproximateSizes([]levigo.Range { r })
+ vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
serverStats.Shards[shardIdx].ApproxNumSpans = vals[0]
serverStats.Shards[shardIdx].LevelDbStats =
shard.ldb.PropertyValue("leveldb.stats")
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 97b2bca..cbfc508 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -193,37 +193,24 @@
} else {
dec = json.NewDecoder(req.Body)
}
- spans := make([]*common.Span, 0, 32)
- defaultTrid := req.Header.Get("htrace-trid")
- for {
- var span common.Span
- err := dec.Decode(&span)
- if err != nil {
- if err != io.EOF {
- writeError(hand.lg, w, http.StatusBadRequest,
- fmt.Sprintf("Error parsing spans: %s", err.Error()))
- return
- }
- break
- }
- spanIdProblem := span.Id.FindProblem()
- if spanIdProblem != "" {
- writeError(hand.lg, w, http.StatusBadRequest,
- fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
- return
- }
- if span.TracerId == "" {
- span.TracerId = defaultTrid
- }
- spans = append(spans, &span)
+ var msg common.WriteSpansReq
+ err := dec.Decode(&msg)
+ if (err != nil) && (err != io.EOF) {
+ writeError(hand.lg, w, http.StatusBadRequest,
+ fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error()))
+ return
}
hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultTrid = %s\n",
- len(spans), defaultTrid)
- for spanIdx := range spans {
+ len(msg.Spans), msg.DefaultTrid)
+ for spanIdx := range msg.Spans {
if hand.lg.DebugEnabled() {
- hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson())
+ hand.lg.Debugf("writing span %s\n", msg.Spans[spanIdx].ToJson())
}
- hand.store.WriteSpan(spans[spanIdx])
+ span := msg.Spans[spanIdx]
+ if span.TracerId == "" {
+ span.TracerId = msg.DefaultTrid
+ }
+ hand.store.WriteSpan(span)
}
}
diff --git a/htrace-htraced/pom.xml b/htrace-htraced/pom.xml
index 88fd2fc..4478dc5 100644
--- a/htrace-htraced/pom.xml
+++ b/htrace-htraced/pom.xml
@@ -86,6 +86,10 @@
<pattern>org.eclipse.jetty</pattern>
<shadedPattern>org.apache.htrace.shaded.jetty</shadedPattern>
</relocation>
+ <dependency>
+ <pattern>org.msgpack</pattern>
+ <shadedPattern>org.apache.htrace.msgpack</shadedPattern>
+ </dependency>
</relocations>
</configuration>
<goals>
@@ -200,12 +204,11 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- <!-- htraced rest client deps. -->
- <!--Is this too much? Pulls down jetty-http, jetty-server, jetty-io
- This is new-style jetty client, jetty9 and jdk7 required.
- It can do async but we will use it synchronously at first.
- Has nice tutorial: http://www.eclipse.org/jetty/documentation/9.1.5.v20140505/http-client-api.html
- -->
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ <version>0.7.0-p9</version>
+ </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java
new file mode 100644
index 0000000..29816eb
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+
+import org.apache.htrace.core.Span;
+
+/**
+ * A buffer which contains span data and is able to send it over the network.
+ *
+ * BufferManager functions are not thread-safe. You must rely on external
+ * synchronization to protect buffers from concurrent operations.
+ */
+interface BufferManager {
+ /**
+ * Write a span to this buffer.
+ *
+ * @param span The span to write.
+ *
+ * @throws IOException If the buffer doesn't have enough space to hold the
+ * new span. We will not write a partial span to the
+ * buffer in this case.
+ */
+ void writeSpan(Span span) throws IOException;
+
+ /**
+ * Get the amount of content currently in the buffer.
+ */
+ int contentLength();
+
+ /**
+ * Get the number of spans currently buffered.
+ */
+ int getNumberOfSpans();
+
+ /**
+ * Prepare the buffers to be flushed.
+ */
+ void prepare() throws IOException;
+
+ /**
+ * Flush this buffer to htraced.
+ *
+ * This is a blocking operation which will not return until the buffer is
+ * completely flushed.
+ */
+ void flush() throws IOException;
+
+ /**
+ * Clear the data in this buffer.
+ */
+ void clear();
+
+ /**
+ * Closes the buffer manager and frees all resources.
+ */
+ void close();
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
new file mode 100644
index 0000000..cdd176f
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.htrace.core.HTraceConfiguration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The configuration of the HTracedSpanReceiver.
+ *
+ * This class extracts all the relevant configuration information for
+ * HTracedSpanReceiver from the HTraceConfiguration. It performs parsing and
+ * bounds-checking for the configuration keys.
+ *
+ * It is more efficient to store the configuration as final values in this
+ * structure than to access the HTraceConfiguration object directly. This is
+ * especially true when the HTraceConfiguration object is a thin shim around a
+ * Hadoop Configuration object, which requires synchronization to access.
+ */
+class Conf {
+ private static final Log LOG = LogFactory.getLog(Conf.class);
+
+ /**
+ * Address of the htraced server.
+ */
+ final static String ADDRESS_KEY =
+ "htraced.receiver.address";
+
+ /**
+ * The minimum number of milliseconds to wait for a read or write
+ * operation on the network.
+ */
+ final static String IO_TIMEOUT_MS_KEY =
+ "htraced.receiver.io.timeout.ms";
+ final static int IO_TIMEOUT_MS_DEFAULT = 60000;
+ final static int IO_TIMEOUT_MS_MIN = 50;
+
+ /**
+ * The minimum number of milliseconds to wait for a network
+ * connection attempt.
+ */
+ final static String CONNECT_TIMEOUT_MS_KEY =
+ "htraced.receiver.connect.timeout.ms";
+ final static int CONNECT_TIMEOUT_MS_DEFAULT = 60000;
+ final static int CONNECT_TIMEOUT_MS_MIN = 50;
+
+ /**
+ * The minimum number of milliseconds to keep alive a connection when it's
+ * not in use.
+ */
+ final static String IDLE_TIMEOUT_MS_KEY =
+ "htraced.receiver.idle.timeout.ms";
+ final static int IDLE_TIMEOUT_MS_DEFAULT = 60000;
+ final static int IDLE_TIMEOUT_MS_MIN = 0;
+
+ /**
+ * Configure the retry times to use when an attempt to flush spans to
+ * htraced fails. This is configured as a comma-separated list of delay
+ * times in milliseconds. If the configured value is empty, no retries
+ * will be made.
+ */
+ final static String FLUSH_RETRY_DELAYS_KEY =
+ "htraced.flush.retry.delays.key";
+ final static String FLUSH_RETRY_DELAYS_DEFAULT =
+ "1000,30000";
+
+ /**
+ * The maximum length of time to go in between flush attempts.
+ * Once this time elapses, a flush will be triggered even if we don't
+ * have that many spans buffered.
+ */
+ final static String MAX_FLUSH_INTERVAL_MS_KEY =
+ "htraced.receiver.max.flush.interval.ms";
+ final static int MAX_FLUSH_INTERVAL_MS_DEFAULT = 60000;
+ final static int MAX_FLUSH_INTERVAL_MS_MIN = 10;
+
+ /**
+ * Whether or not to use msgpack for span serialization.
+ * If this key is false, JSON over REST will be used.
+ * If this key is true, msgpack over custom RPC will be used.
+ */
+ final static String PACKED_KEY =
+ "htraced.receiver.packed";
+ final static boolean PACKED_DEFAULT = true;
+
+ /**
+ * The size of the span buffers.
+ */
+ final static String BUFFER_SIZE_KEY =
+ "htraced.receiver.buffer.size";
+ final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024;
+ static int BUFFER_SIZE_MIN = 4 * 1024 * 1024;
+ // The maximum buffer size should not be longer than
+ // PackedBuffer.MAX_HRPC_BODY_LENGTH.
+ final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024;
+
+ /**
+ * Set the fraction of the span buffer which needs to fill up before we
+ * will automatically trigger a flush. This is a fraction, not a percentage.
+ * It is between 0 and 1.
+ */
+ final static String BUFFER_SEND_TRIGGER_FRACTION_KEY =
+ "htraced.receiver.buffer.send.trigger.fraction";
+ final static double BUFFER_SEND_TRIGGER_FRACTION_DEFAULT = 0.5;
+ final static double BUFFER_SEND_TRIGGER_FRACTION_MIN = 0.1;
+
+ /**
+ * The length of time which receiveSpan should wait for a free spot in a
+ * span buffer before giving up and dropping the span
+ */
+ final static String SPAN_DROP_TIMEOUT_MS_KEY =
+ "htraced.max.buffer.full.retry.ms.key";
+ final static int SPAN_DROP_TIMEOUT_MS_DEFAULT = 5000;
+
+ /**
+ * The length of time we should wait between displaying log messages on the
+ * rate-limited loggers.
+ */
+ final static String ERROR_LOG_PERIOD_MS_KEY =
+ "htraced.error.log.period.ms";
+ final static long ERROR_LOG_PERIOD_MS_DEFAULT = 30000L;
+
+ @JsonProperty("ioTimeoutMs")
+ final int ioTimeoutMs;
+
+ @JsonProperty("connectTimeoutMs")
+ final int connectTimeoutMs;
+
+ @JsonProperty("idleTimeoutMs")
+ final int idleTimeoutMs;
+
+ @JsonProperty("flushRetryDelays")
+ final int[] flushRetryDelays;
+
+ @JsonProperty("maxFlushIntervalMs")
+ final int maxFlushIntervalMs;
+
+ @JsonProperty("packed")
+ final boolean packed;
+
+ @JsonProperty("bufferSize")
+ final int bufferSize;
+
+ @JsonProperty("spanDropTimeoutMs")
+ final int spanDropTimeoutMs;
+
+ @JsonProperty("errorLogPeriodMs")
+ final long errorLogPeriodMs;
+
+ @JsonProperty("triggerSize")
+ final int triggerSize;
+
+ @JsonProperty("endpointStr")
+ final String endpointStr;
+
+ @JsonProperty("endpoint")
+ final InetSocketAddress endpoint;
+
+ private static int getBoundedInt(final HTraceConfiguration conf,
+ String key, int defaultValue, int minValue, int maxValue) {
+ int val = conf.getInt(key, defaultValue);
+ if (val < minValue) {
+ LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " +
+ "of " + minValue + " instead.");
+ return minValue;
+ } else if (val > maxValue) {
+ LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " +
+ "of " + maxValue + " instead.");
+ return maxValue;
+ }
+ return val;
+ }
+
+ private static long getBoundedLong(final HTraceConfiguration conf,
+ String key, long defaultValue, long minValue, long maxValue) {
+ String strVal = conf.get(key, Long.toString(defaultValue));
+ long val = 0;
+ try {
+ val = Long.parseLong(strVal);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("Bad value for '" + key +
+ "': should be long");
+ }
+ if (val < minValue) {
+ LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " +
+ "of " + minValue + " instead.");
+ return minValue;
+ } else if (val > maxValue) {
+ LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " +
+ "of " + maxValue + " instead.");
+ return maxValue;
+ }
+ return val;
+ }
+
+ private static double getBoundedDouble(final HTraceConfiguration conf,
+ String key, double defaultValue, double minValue, double maxValue) {
+ String strVal = conf.get(key, Double.toString(defaultValue));
+ double val = 0;
+ try {
+ val = Double.parseDouble(strVal);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("Bad value for '" + key +
+ "': should be double");
+ }
+ if (val < minValue) {
+ LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " +
+ "of " + minValue + " instead.");
+ return minValue;
+ }
+ if (val > maxValue) {
+ LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " +
+ "of " + maxValue + " instead.");
+ return maxValue;
+ }
+ return val;
+ }
+
+ private static int parseColonPort(String portStr) throws IOException {
+ int colonPosition = portStr.indexOf(':');
+ if (colonPosition != 0) {
+ throw new IOException("Invalid port string " + portStr);
+ }
+ int port = Integer.parseInt(portStr.substring(1), 10);
+ if ((port < 0) || (port > 65535)) {
+ throw new IOException("Invalid port number " + port);
+ }
+ return port;
+ }
+
+ /**
+ * Parse a hostname:port or ip:port pair.
+ *
+ * @param str The string to parse.
+ * @return The socket address.
+ */
+ InetSocketAddress parseHostPortPair(String str) throws IOException {
+ str = str.trim();
+ if (str.isEmpty()) {
+ throw new IOException("No hostname:port pair given.");
+ }
+ int bracketBegin = str.indexOf('[');
+ if (bracketBegin == 0) {
+ // Parse an ipv6-style address enclosed in square brackets.
+ int bracketEnd = str.indexOf(']');
+ if (bracketEnd < 0) {
+ throw new IOException("Found left bracket, but no corresponding " +
+ "right bracket, in " + str);
+ }
+ String host = str.substring(bracketBegin + 1, bracketEnd);
+ int port = parseColonPort(str.substring(bracketEnd + 1));
+ return InetSocketAddress.createUnresolved(host, port);
+ } else if (bracketBegin > 0) {
+ throw new IOException("Found a left bracket that wasn't at the " +
+ "start of the host:port pair in " + str);
+ } else {
+ int colon = str.indexOf(':');
+ if (colon <= 0) {
+ throw new IOException("No port component found in " + str);
+ }
+ String host = str.substring(0, colon);
+ int port = parseColonPort(str.substring(colon));
+ return InetSocketAddress.createUnresolved(host, port);
+ }
+ }
+
+ static int[] getIntArray(String arrayStr) {
+ String[] array = arrayStr.split(",");
+ int nonEmptyEntries = 0;
+ for (String str : array) {
+ if (!str.trim().isEmpty()) {
+ nonEmptyEntries++;
+ }
+ }
+ int[] ret = new int[nonEmptyEntries];
+ int i = 0;
+ for (String str : array) {
+ if (!str.trim().isEmpty()) {
+ ret[i++] = Integer.parseInt(str);
+ }
+ }
+ return ret;
+ }
+
+ Conf(HTraceConfiguration conf) throws IOException {
+ this.ioTimeoutMs = getBoundedInt(conf, IO_TIMEOUT_MS_KEY,
+ IO_TIMEOUT_MS_DEFAULT,
+ IO_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+ this.connectTimeoutMs = getBoundedInt(conf, CONNECT_TIMEOUT_MS_KEY,
+ CONNECT_TIMEOUT_MS_DEFAULT,
+ CONNECT_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+ this.idleTimeoutMs = getBoundedInt(conf, IDLE_TIMEOUT_MS_KEY,
+ IDLE_TIMEOUT_MS_DEFAULT,
+ IDLE_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+ this.flushRetryDelays = getIntArray(conf.get(FLUSH_RETRY_DELAYS_KEY,
+ FLUSH_RETRY_DELAYS_DEFAULT));
+ this.maxFlushIntervalMs = getBoundedInt(conf, MAX_FLUSH_INTERVAL_MS_KEY,
+ MAX_FLUSH_INTERVAL_MS_DEFAULT,
+ MAX_FLUSH_INTERVAL_MS_MIN, Integer.MAX_VALUE);
+ this.packed = conf.getBoolean(PACKED_KEY, PACKED_DEFAULT);
+ this.bufferSize = getBoundedInt(conf, BUFFER_SIZE_KEY,
+ BUFFER_SIZE_DEFAULT,
+ BUFFER_SIZE_MIN, BUFFER_SIZE_MAX);
+ double triggerFraction = getBoundedDouble(conf,
+ BUFFER_SEND_TRIGGER_FRACTION_KEY,
+ BUFFER_SEND_TRIGGER_FRACTION_DEFAULT,
+ BUFFER_SEND_TRIGGER_FRACTION_MIN, 1.0);
+ this.spanDropTimeoutMs = conf.getInt(SPAN_DROP_TIMEOUT_MS_KEY,
+ SPAN_DROP_TIMEOUT_MS_DEFAULT);
+ this.errorLogPeriodMs = getBoundedLong(conf, ERROR_LOG_PERIOD_MS_KEY,
+ ERROR_LOG_PERIOD_MS_DEFAULT, 0, Long.MAX_VALUE);
+ this.triggerSize = (int)(this.bufferSize * triggerFraction);
+ try {
+ this.endpointStr = conf.get(ADDRESS_KEY, "");
+ this.endpoint = parseHostPortPair(endpointStr);
+ } catch (IOException e) {
+ throw new IOException("Error reading " + ADDRESS_KEY + ": " +
+ e.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ try {
+ return mapper.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
deleted file mode 100644
index 643bbd5..0000000
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.impl;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanReceiver;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.util.StringContentProvider;
-import org.eclipse.jetty.http.HttpField;
-import org.eclipse.jetty.http.HttpHeader;
-import org.eclipse.jetty.http.HttpMethod;
-import org.eclipse.jetty.http.HttpStatus;
-
-/**
- * A {@link SpanReceiver} that passes Spans to htraced via REST. Implementation minimizes
- * dependencies and aims for small footprint since this client will be the guest of another,
- * the process traced.
- *
- * <p>Logs via commons-logging. Uses jetty client. Jetty has its own logging. To connect, see
- * jetty logging to commons-logging and see https://issues.apache.org/jira/browse/HADOOP-6807
- * and http://blogs.bytecode.com.au/glen/2005/06/21/getting-your-logging-working-in-jetty.html.
- *
- * <p>This client depends on the REST defined in <code>rest.go</code> in the htraced REST server.
- *
- * <p>Create an instance by doing:
- * <code>SpanReceiver receiver = new HTracedRESTReceiver(conf);</code> where conf is an instance
- * of {@link HTraceConfiguration}. See the public keys defined below for what we will look for in
- * the configuration. For example, set {@link #HTRACED_REST_URL_KEY} if
- * <code>htraced</code> is in a non-standard location. Then call
- * <code>receiver.receiveSpan(Span);</code> to send spans to an htraced
- * instance. This method returns immediately. It sends the spans in background.
- *
- * <p>TODO: Shading works?
- * TODO: Add lazy start; don't start background thread till a span gets queued.
- * TODO: Add some metrics; how many times we've run, how many spans and what size we've sent.
- */
-public class HTracedRESTReceiver extends SpanReceiver {
- private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);
-
- /**
- * The HttpClient to use for this receiver.
- */
- private final HttpClient httpClient;
-
- /**
- * The maximum number of spans to buffer.
- */
- private final int capacity;
-
- /**
- * REST URL to use writing Spans.
- */
- private final String url;
-
- /**
- * The maximum number of spans to send in a single PUT.
- */
- private final int maxToSendAtATime;
-
- /**
- * Runs background task to do the REST PUT.
- */
- private final PostSpans postSpans;
-
- /**
- * Thread for postSpans
- */
- private final Thread postSpansThread;
-
- /**
- * The connection timeout in milliseconds.
- */
- public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms";
- private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;
-
- /**
- * The idle timeout in milliseconds.
- */
- public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms";
- private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;
-
- /**
- * URL of the htraced REST server we are to talk to.
- */
- public static final String HTRACED_REST_URL_KEY = "htraced.rest.url";
- private static final String HTRACED_REST_URL_DEFAULT = "http://localhost:9095/";
-
- /**
- * Maximum size of the queue to accumulate spans in.
- * Cleared by the background thread that does the REST POST to htraced.
- */
- public static final String CLIENT_REST_QUEUE_CAPACITY_KEY = "client.rest.queue.capacity";
- private static final int CLIENT_REST_QUEUE_CAPACITY_DEFAULT = 1000000;
-
- /**
- * Period at which the background thread that does the REST POST to htraced in ms.
- */
- public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms";
- private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;
-
- /**
- * Maximum spans to post to htraced at a time.
- */
- public static final String CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY =
- "client.rest.batch.size";
- private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;
-
- /**
- * Lock protecting the PostSpans data.
- */
- private ReentrantLock lock = new ReentrantLock();
-
- /**
- * Condition variable used to wake up the PostSpans thread.
- */
- private Condition cond = lock.newCondition();
-
- /**
- * True if we should shut down.
- * Protected by the lock.
- */
- private boolean shutdown = false;
-
- /**
- * Simple bounded queue to hold spans between periodic runs of the httpclient.
- * Protected by the lock.
- */
- private final ArrayDeque<Span> spans;
-
- /**
- * Keep last time we logged we were at capacity; used to prevent flooding of logs with
- * "at capacity" messages.
- */
- private AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L);
-
- /**
- * True if we should flush as soon as possible. Protected by the lock.
- */
- private boolean mustStartFlush;
-
- /**
- * Create an HttpClient instance.
- *
- * @param connTimeout The timeout to use for connecting.
- * @param idleTimeout The idle timeout to use.
- */
- HttpClient createHttpClient(long connTimeout, long idleTimeout) {
- HttpClient httpClient = new HttpClient();
- httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
- this.getClass().getSimpleName()));
- httpClient.setConnectTimeout(connTimeout);
- httpClient.setIdleTimeout(idleTimeout);
- return httpClient;
- }
-
- /**
- * Constructor.
- * You must call {@link #close()} post construction when done.
- * @param conf
- * @throws Exception
- */
- public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception {
- int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY,
- CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
- int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY,
- CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
- this.httpClient = createHttpClient(connTimeout, idleTimeout);
- this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
- this.spans = new ArrayDeque<Span>(capacity);
- // Build up the writeSpans URL.
- URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT));
- URL url = new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans");
- this.url = url.toString();
- // Period at which we run the background thread that does the REST POST to htraced.
- int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT);
- // Maximum spans to send in one go
- this.maxToSendAtATime =
- conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);
- // Start up the httpclient.
- this.httpClient.start();
- // Start the background thread.
- this.postSpans = new PostSpans(periodInMs);
- this.postSpansThread = new Thread(postSpans);
- this.postSpansThread.setDaemon(true);
- this.postSpansThread.setName("PostSpans");
- this.postSpansThread.start();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new HTracedRESTReceiver with connTimeout=" +
- connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" +
- capacity + ", url=" + url + ", periodInMs=" + periodInMs +
- ", maxToSendAtATime=" + maxToSendAtATime);
- }
- }
-
- /**
- * POST spans runnable.
- * Run on a period. Services the passed in queue taking spans and sending them to traced via http.
- */
- private class PostSpans implements Runnable {
- private final long periodInNs;
- private final ArrayDeque<Span> spanBuf;
-
- private PostSpans(long periodInMs) {
- this.periodInNs = TimeUnit.NANOSECONDS.
- convert(periodInMs, TimeUnit.MILLISECONDS);
- this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime);
- }
-
- /**
- * The span sending thread.
- *
- * We send a batch of spans for one of two reasons: there are already
- * maxToSendAtATime spans in the buffer, or the client.rest.period.ms
- * has elapsed. The idea is that we want to strike a balance between
- * sending a lot of spans at a time, for efficiency purposes, and
- * making sure that we don't buffer spans locally for too long.
- *
- * The longer we buffer spans locally, the longer we will have to wait
- * to see the results of our client operations in the GUI, and the higher
- * the risk of losing them if the client crashes.
- */
- @Override
- public void run() {
- long waitNs;
- try {
- waitNs = periodInNs;
- while (true) {
- lock.lock();
- try {
- if (shutdown) {
- if (spans.isEmpty()) {
- LOG.debug("Shutting down PostSpans thread...");
- break;
- }
- } else {
- try {
- waitNs = cond.awaitNanos(waitNs);
- if (mustStartFlush) {
- waitNs = 0;
- mustStartFlush = false;
- }
- } catch (InterruptedException e) {
- LOG.info("Got InterruptedException");
- waitNs = 0;
- }
- }
- if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) ||
- shutdown) {
- loadSpanBuf();
- waitNs = periodInNs;
- }
- } finally {
- lock.unlock();
- }
- // Once the lock has been safely released, we can do some network
- // I/O without blocking the client process.
- if (!spanBuf.isEmpty()) {
- sendSpans();
- spanBuf.clear();
- }
- }
- } finally {
- if (httpClient != null) {
- try {
- httpClient.stop();
- } catch (Exception e) {
- LOG.error("Error shutting down httpClient", e);
- }
- }
- spans.clear();
- }
- }
-
- private void loadSpanBuf() {
- for (int loaded = 0; loaded < maxToSendAtATime; loaded++) {
- Span span = spans.pollFirst();
- if (span == null) {
- return;
- }
- spanBuf.add(span);
- }
- }
-
- private void sendSpans() {
- try {
- Request request = httpClient.newRequest(url).method(HttpMethod.POST);
- request.header(HttpHeader.CONTENT_TYPE, "application/json");
- StringBuilder bld = new StringBuilder();
- for (Span span : spanBuf) {
- bld.append(span.toJson());
- }
- request.content(new StringContentProvider(bld.toString()));
- ContentResponse response = request.send();
- if (response.getStatus() == HttpStatus.OK_200) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("POSTED " + spanBuf.size() + " spans");
- }
- } else {
- LOG.error("Status: " + response.getStatus());
- LOG.error(response.getHeaders());
- LOG.error(response.getContentAsString());
- }
- } catch (InterruptedException e) {
- LOG.error(e);
- } catch (TimeoutException e) {
- LOG.error(e);
- } catch (ExecutionException e) {
- LOG.error(e);
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- LOG.debug("Closing HTracedRESTReceiver(" + url + ").");
- lock.lock();
- try {
- this.shutdown = true;
- cond.signal();
- } finally {
- lock.unlock();
- }
- try {
- postSpansThread.join(120000);
- if (postSpansThread.isAlive()) {
- LOG.error("Timed out without closing HTracedRESTReceiver(" +
- url + ").");
- } else {
- LOG.debug("Closed HTracedRESTReceiver(" + url + ").");
- }
- } catch (InterruptedException e) {
- LOG.error("Interrupted while joining postSpans", e);
- }
- }
-
- /**
- * Start flushing the buffered spans.
- *
- * Note that even after calling this function, you will still have to wait
- * for the flush to finish happening. This function just starts the flush;
- * it does not block until it has completed. You also do not get
- * "read-after-write consistency" with htraced... the spans that are
- * written may be buffered for a short period of time prior to being
- * readable. This is not a problem for production use (since htraced is not
- * a database), but it means that most unit tests will need a loop in their
- * "can I read what I wrote" tests.
- */
- void startFlushing() {
- LOG.info("Triggering HTracedRESTReceiver flush.");
- lock.lock();
- try {
- mustStartFlush = true;
- cond.signal();
- } finally {
- lock.unlock();
- }
- }
-
- private static long WARN_TIMEOUT_MS = 300000;
-
- @Override
- public void receiveSpan(Span span) {
- boolean added = false;
- lock.lock();
- try {
- if (shutdown) {
- LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " +
- "is already shut down.");
- return;
- }
- if (spans.size() < capacity) {
- spans.add(span);
- added = true;
- if (spans.size() >= maxToSendAtATime) {
- cond.signal();
- }
- } else {
- cond.signal();
- }
- } finally {
- lock.unlock();
- }
- if (!added) {
- long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
- TimeUnit.NANOSECONDS);
- long last = lastAtCapacityWarningLog.get();
- if (now - last > WARN_TIMEOUT_MS) {
- // Only log every 5 minutes. Any more than this for a guest process
- // is obnoxious.
- if (lastAtCapacityWarningLog.compareAndSet(last, now)) {
- // If the atomic-compare-and-set succeeds, we should log. Otherwise,
- // we should assume another thread already logged and bumped up the
- // value of lastAtCapacityWarning sometime between our get and the
- // "if" statement.
- LOG.warn("There are too many HTrace spans to buffer! We have " +
- "already buffered " + capacity + " spans. Dropping spans.");
- }
- }
- }
- }
-}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java
new file mode 100644
index 0000000..f5f493c
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.htrace.core.HTraceConfiguration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+
+/**
+ * The SpanReceiver which sends spans to htraced.
+ *
+ * HTracedSpanReceiver sends trace spans out to the htraced daemon, where they
+ * are stored and indexed. It supports two forms of RPC: a JSON/HTTP form, and
+ * an HRPC/msgpack form. We will use the msgpack form when
+ * htraced.receiver.packed is set to true.
+ *
+ * HTraced buffers are several megabytes in size, and we reuse them to avoid
+ * creating extra garbage on the heap. They are flushed whenever a timeout
+ * elapses, or when they get more than a configurable percent full. We allocate
+ * two buffers so that we can continue filling one buffer while the other is
+ * being sent over the wire. The buffers store serialized spans. This is
+ * better than storing references to span objects because it minimzes the amount
+ * of pointers we have to follow during a GC. Buffers are managed by instances
+ * of BufferManager.
+ */
+public class HTracedSpanReceiver extends SpanReceiver {
+ private static final Log LOG = LogFactory.getLog(HTracedSpanReceiver.class);
+
+ private final static int MAX_CLOSING_WAIT_MS = 120000;
+
+ private final FaultInjector faultInjector;
+
+ private final Conf conf;
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private final Condition wakePostSpansThread = lock.newCondition();
+
+ private final BufferManager bufferManager[] = new BufferManager[2];
+
+ private final RateLimitedLogger flushErrorLog;
+
+ private final RateLimitedLogger spanDropLog;
+
+ private final PostSpansThread thread;
+
+ private boolean shutdown = false;
+
+ private int activeBuf = 0;
+
+ private int flushingBuf = -1;
+
+ private long lastBufferClearedTimeMs = 0;
+
+ static class FaultInjector {
+ static FaultInjector NO_OP = new FaultInjector();
+ public void handleContentLengthTrigger(int len) { }
+ public void handleThreadStart() throws Exception { }
+ public void handleFlush() throws IOException { }
+ }
+
+ public HTracedSpanReceiver(HTraceConfiguration c) throws Exception {
+ this(c, FaultInjector.NO_OP);
+ }
+
+ HTracedSpanReceiver(HTraceConfiguration c,
+ FaultInjector faultInjector) throws Exception {
+ this.faultInjector = faultInjector;
+ this.conf = new Conf(c);
+ if (this.conf.packed) {
+ for (int i = 0; i < bufferManager.length; i++) {
+ bufferManager[i] = new PackedBufferManager(conf);
+ }
+ } else {
+ for (int i = 0; i < bufferManager.length; i++) {
+ bufferManager[i] = new RestBufferManager(conf);
+ }
+ }
+ this.flushErrorLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs);
+ this.spanDropLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs);
+ this.thread = new PostSpansThread();
+ LOG.debug("Created new HTracedSpanReceiver with " + conf.toString());
+ }
+
+ @Override
+ public void receiveSpan(Span span) {
+ long startTimeMs = 0;
+ int numTries = 1;
+ while (true) {
+ lock.lock();
+ try {
+ if (shutdown) {
+ LOG.info("Unable to add span because HTracedSpanReceiver is shutting down.");
+ return;
+ }
+ Throwable exc = null;
+ try {
+ bufferManager[activeBuf].writeSpan(span);
+ int contentLength = bufferManager[activeBuf].contentLength();
+ if (contentLength > conf.triggerSize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Triggering buffer #" + activeBuf + " flush because" +
+ " buffer contains " + contentLength + " bytes, and " +
+ "triggerSize is " + conf.triggerSize);
+ }
+ faultInjector.handleContentLengthTrigger(contentLength);
+ wakePostSpansThread.signal();
+ }
+ return;
+ } catch (Exception e) {
+ exc = e;
+ } catch (Error e) {
+ exc = e;
+ }
+ if (startTimeMs == 0) {
+ startTimeMs = TimeUtil.nowMs();
+ }
+ long deltaMs = TimeUtil.deltaMs(startTimeMs, TimeUtil.nowMs());
+ if (deltaMs > conf.spanDropTimeoutMs) {
+ spanDropLog.error("Dropping a span after unsuccessfully " +
+ "attempting to add it for " + deltaMs + " ms. There is not " +
+ "enough buffer space. Please increase " + Conf.BUFFER_SIZE_KEY +
+ " or decrease the rate of spans being generated.");
+ return;
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to write span to buffer #" + activeBuf +
+ " after " + numTries + " attempt(s) and " + deltaMs + " ms" +
+ ". Buffer already has " +
+ bufferManager[activeBuf].getNumberOfSpans() + " spans.",
+ exc);
+ }
+ numTries++;
+ } finally {
+ lock.unlock();
+ }
+ try {
+ Thread.sleep(conf.spanDropTimeoutMs / 3);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ lock.lock();
+ try {
+ shutdown = true;
+ wakePostSpansThread.signal();
+ } finally {
+ lock.unlock();
+ }
+ try {
+ thread.join(MAX_CLOSING_WAIT_MS);
+ } catch (InterruptedException e) {
+ LOG.error("HTracedSpanReceiver#close was interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private class PostSpansThread extends Thread {
+ PostSpansThread() {
+ this.setDaemon(true);
+ this.setName("PostSpans");
+ this.start();
+ }
+
+ private boolean shouldWaitForCond(long timeSinceLastClearedMs) {
+ if (shutdown) {
+ // If we're shutting down, don't wait around.
+ LOG.trace("Should not wait for cond because we're shutting down.");
+ return false;
+ }
+ int contentLength = bufferManager[activeBuf].contentLength();
+ if (contentLength == 0) {
+ // If there is nothing in the buffer, there is nothing to do.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Should wait for cond because we have no data buffered " +
+ "in bufferManager " + activeBuf);
+ }
+ lastBufferClearedTimeMs = TimeUtil.nowMs();
+ return true;
+ } else if (contentLength >= conf.triggerSize) {
+ // If the active buffer is filling up, start flushing.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Should not wait for cond because we have more than " +
+ conf.triggerSize + " bytes buffered in bufferManager " +
+ activeBuf);
+ }
+ return false;
+ }
+ if (timeSinceLastClearedMs > conf.maxFlushIntervalMs) {
+ // If we have let the spans sit in the buffer for too long,
+ // start flushing.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Should not wait for cond because it has been " +
+ timeSinceLastClearedMs + " ms since our last flush, and we " +
+ "are overdue for another because maxFlushIntervalMs is " +
+ conf.maxFlushIntervalMs);
+ }
+ return false;
+ }
+ LOG.trace("Should wait for cond.");
+ return true;
+ }
+
+ @Override
+ public void run() {
+ try {
+ faultInjector.handleThreadStart();
+ LOG.debug("Starting HTracedSpanReceiver thread for " +
+ conf.endpointStr);
+ BufferManager flushBufManager = null;
+ while (true) {
+ lock.lock();
+ flushingBuf = -1;
+ try {
+ while (true) {
+ long timeSinceLastClearedMs = TimeUtil.
+ deltaMs(lastBufferClearedTimeMs, TimeUtil.nowMs());
+ if (!shouldWaitForCond(timeSinceLastClearedMs)) {
+ break;
+ }
+ long waitMs = conf.maxFlushIntervalMs -
+ Math.min(conf.maxFlushIntervalMs, TimeUtil.
+ deltaMs(TimeUtil.nowMs(), lastBufferClearedTimeMs));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting on wakePostSpansThread for " + waitMs +
+ " ms.");
+ }
+ try {
+ wakePostSpansThread.await(waitMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.info("HTraceSpanReceiver thread was interrupted.", e);
+ throw e;
+ }
+ }
+ if (shutdown && (bufferManager[activeBuf].contentLength() == 0)) {
+ LOG.debug("PostSpansThread shutting down.");
+ return;
+ }
+ flushingBuf = activeBuf;
+ flushBufManager = bufferManager[flushingBuf];
+ activeBuf = (activeBuf == 1) ? 0 : 1;
+ } finally {
+ lock.unlock();
+ }
+ doFlush(flushBufManager);
+ flushBufManager.clear();
+ lastBufferClearedTimeMs = TimeUtil.nowMs();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("setting lastBufferClearedTimeMs to " + lastBufferClearedTimeMs);
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("PostSpansThread exiting on unexpected exception", e);
+ } finally {
+ for (int i = 0; i < bufferManager.length; i++) {
+ bufferManager[i].close();
+ }
+ }
+ }
+
+ private void doFlush(BufferManager flushBufManager)
+ throws InterruptedException {
+ try {
+ flushBufManager.prepare();
+ } catch (IOException e) {
+ LOG.error("Failed to prepare buffer containing " +
+ flushBufManager.getNumberOfSpans() + " spans for " +
+ "sending to " + conf.endpointStr + " Discarding " +
+ "all spans.", e);
+ return;
+ }
+ int flushTries = 0;
+ while (true) {
+ Throwable exc;
+ try {
+ faultInjector.handleFlush();
+ flushBufManager.flush();
+ exc = null;
+ } catch (RuntimeException e) {
+ exc = e;
+ } catch (Exception e) {
+ exc = e;
+ }
+ if (exc == null) {
+ return;
+ }
+ int numSpans = flushBufManager.getNumberOfSpans();
+ String excMessage = "Failed to flush " + numSpans + " htrace " +
+ "spans to " + conf.endpointStr + " on try " + (flushTries + 1);
+ if (flushTries >= conf.flushRetryDelays.length) {
+ excMessage += ". Discarding all spans.";
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.error(excMessage, exc);
+ } else {
+ flushErrorLog.error(excMessage, exc);
+ }
+ if (flushTries >= conf.flushRetryDelays.length) {
+ return;
+ }
+ int delayMs = conf.flushRetryDelays[flushTries];
+ Thread.sleep(delayMs);
+ flushTries++;
+ }
+ }
+ }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
new file mode 100644
index 0000000..f867ad7
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.MessageBuffer;
+import org.msgpack.core.buffer.MessageBufferOutput;
+
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+
+/**
+ * A ByteBuffer which we are writing msgpack data to.
+ */
+class PackedBuffer {
+ /**
+ * A MessageBufferOutput that simply outputs to a ByteBuffer.
+ */
+ private class PackedBufferOutput implements MessageBufferOutput {
+ private MessageBuffer savedBuffer;
+
+ PackedBufferOutput() {
+ }
+
+ @Override
+ public MessageBuffer next(int bufferSize) throws IOException {
+ if (savedBuffer == null || savedBuffer.size() != bufferSize) {
+ savedBuffer = MessageBuffer.newBuffer(bufferSize);
+ }
+ MessageBuffer buffer = savedBuffer;
+ savedBuffer = null;
+ return buffer;
+ }
+
+ @Override
+ public void flush(MessageBuffer buffer) throws IOException {
+ ByteBuffer b = buffer.toByteBuffer();
+ bb.put(b);
+ savedBuffer = buffer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+ }
+
+ private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
+ private static final Charset UTF8 = StandardCharsets.UTF_8;
+ private static final byte SPANS[] = "Spans".getBytes(UTF8);
+ private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8);
+ private static final byte A[] = "a".getBytes(UTF8);
+ private static final byte B[] = "b".getBytes(UTF8);
+ private static final byte E[] = "e".getBytes(UTF8);
+ private static final byte D[] = "d".getBytes(UTF8);
+ private static final byte R[] = "r".getBytes(UTF8);
+ private static final byte P[] = "p".getBytes(UTF8);
+ private static final byte N[] = "n".getBytes(UTF8);
+ private static final byte T[] = "t".getBytes(UTF8);
+ private static final byte M[] = "m".getBytes(UTF8);
+ private static final int HRPC_MAGIC = 0x43525448;
+ static final int HRPC_REQ_FRAME_LENGTH = 20;
+ static final int HRPC_RESP_FRAME_LENGTH = 20;
+ static final int MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024;
+ static final int MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024;
+ private static final int SPAN_ID_BYTE_LENGTH = 16;
+ static final MessagePack.Config MSGPACK_CONF =
+ new MessagePack.ConfigBuilder()
+ .readBinaryAsString(false)
+ .readStringAsBinary(false)
+ .build();
+ /**
+ * The array which we are filling.
+ */
+ final ByteBuffer bb;
+
+ /**
+ * Used to tell the MessagePacker to output to our array.
+ */
+ final PackedBufferOutput out;
+
+ /**
+ * A temporary buffer for serializing span ids and other things.
+ */
+ final byte[] temp;
+
+ /**
+ * Generates msgpack output.
+ */
+ final MessagePacker packer;
+
+ /**
+ * Create a new PackedBuffer.
+ *
+ * @param bb The ByteBuffer to use to create the packed buffer.
+ */
+ PackedBuffer(ByteBuffer bb) {
+ this.bb = bb;
+ this.out = new PackedBufferOutput();
+ this.temp = new byte[SPAN_ID_BYTE_LENGTH];
+ this.packer = new MessagePacker(out, MSGPACK_CONF);
+ }
+
+ /**
+ * Write the fixed-length request frame which starts packed RPC messages.
+ */
+ static void writeReqFrame(ByteBuffer bb, int methodId, long seq, int length)
+ throws IOException {
+ int oldPos = bb.position();
+ boolean success = false;
+ try {
+ bb.order(ByteOrder.LITTLE_ENDIAN);
+ bb.putInt(HRPC_MAGIC);
+ bb.putInt(methodId);
+ bb.putLong(seq);
+ bb.putInt(length);
+ success = true;
+ } finally {
+ if (!success) {
+ bb.position(oldPos);
+ }
+ }
+ }
+
+ /**
+ * Write an 8-byte value to a byte array as little-endian.
+ */
+ private static void longToBigEndian(byte b[], int pos, long val) {
+ b[pos + 0] =(byte) ((val >> 56) & 0xff);
+ b[pos + 1] =(byte) ((val >> 48) & 0xff);
+ b[pos + 2] =(byte) ((val >> 40) & 0xff);
+ b[pos + 3] =(byte) ((val >> 32) & 0xff);
+ b[pos + 4] =(byte) ((val >> 24) & 0xff);
+ b[pos + 5] =(byte) ((val >> 16) & 0xff);
+ b[pos + 6] =(byte) ((val >> 8) & 0xff);
+ b[pos + 7] =(byte) ((val >> 0) & 0xff);
+ }
+
+ private void writeSpanId(SpanId spanId) throws IOException {
+ longToBigEndian(temp, 0, spanId.getHigh());
+ longToBigEndian(temp, 8, spanId.getLow());
+ packer.packBinaryHeader(SPAN_ID_BYTE_LENGTH);
+ packer.writePayload(temp, 0, SPAN_ID_BYTE_LENGTH);
+ }
+
+ /**
+ * Serialize a span to the given OutputStream.
+ */
+ void writeSpan(Span span) throws IOException {
+ boolean success = false;
+ int oldPos = bb.position();
+ try {
+ int mapSize = 0;
+ if (span.getSpanId().isValid()) {
+ mapSize++;
+ }
+ if (span.getStartTimeMillis() != 0) {
+ mapSize++;
+ }
+ if (span.getStopTimeMillis() != 0) {
+ mapSize++;
+ }
+ if (!span.getDescription().isEmpty()) {
+ mapSize++;
+ }
+ if (!span.getTracerId().isEmpty()) {
+ mapSize++;
+ }
+ if (span.getParents().length > 0) {
+ mapSize++;
+ }
+ if (!span.getKVAnnotations().isEmpty()) {
+ mapSize++;
+ }
+ if (!span.getTimelineAnnotations().isEmpty()) {
+ mapSize++;
+ }
+ packer.packMapHeader(mapSize);
+ if (span.getSpanId().isValid()) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(A);
+ writeSpanId(span.getSpanId());
+ }
+ if (span.getStartTimeMillis() != 0) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(B);
+ packer.packLong(span.getStartTimeMillis());
+ }
+ if (span.getStopTimeMillis() != 0) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(E);
+ packer.packLong(span.getStopTimeMillis());
+ }
+ if (!span.getDescription().isEmpty()) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(D);
+ packer.packString(span.getDescription());
+ }
+ if (!span.getTracerId().isEmpty()) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(R);
+ packer.packString(span.getTracerId());
+ }
+ if (span.getParents().length > 0) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(P);
+ packer.packArrayHeader(span.getParents().length);
+ for (int i = 0; i < span.getParents().length; i++) {
+ writeSpanId(span.getParents()[i]);
+ }
+ }
+ if (!span.getKVAnnotations().isEmpty()) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(N);
+ Map<String, String> map = span.getKVAnnotations();
+ packer.packMapHeader(map.size());
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ packer.packString(entry.getKey());
+ packer.packString(entry.getValue());
+ }
+ }
+ if (!span.getTimelineAnnotations().isEmpty()) {
+ packer.packRawStringHeader(1);
+ packer.writePayload(T);
+ List<TimelineAnnotation> list = span.getTimelineAnnotations();
+ packer.packArrayHeader(list.size());
+ for (TimelineAnnotation annotation : list) {
+ packer.packMapHeader(2);
+ packer.packRawStringHeader(1);
+ packer.writePayload(T);
+ packer.packLong(annotation.getTime());
+ packer.packRawStringHeader(1);
+ packer.writePayload(M);
+ packer.packString(annotation.getMessage());
+ }
+ }
+ packer.flush();
+ success = true;
+ } finally {
+ if (!success) {
+ // If we failed earlier, restore the old position.
+ // This is so that if we run out of space, we don't add a
+ // partial span to the buffer.
+ bb.position(oldPos);
+ }
+ }
+ }
+
+ static SpanId readSpanId(MessageUnpacker unpacker) throws IOException {
+ int alen = unpacker.unpackBinaryHeader();
+ if (alen != SPAN_ID_BYTE_LENGTH) {
+ throw new IOException("Invalid length given for spanID array. " +
+ "Expected " + SPAN_ID_BYTE_LENGTH + "; got " + alen);
+ }
+ byte[] payload = new byte[SPAN_ID_BYTE_LENGTH];
+ unpacker.readPayload(payload);
+ return new SpanId(
+ ((payload[ 7] & 0xffL) << 0) |
+ ((payload[ 6] & 0xffL) << 8) |
+ ((payload[ 5] & 0xffL) << 16) |
+ ((payload[ 4] & 0xffL) << 24) |
+ ((payload[ 3] & 0xffL) << 32) |
+ ((payload[ 2] & 0xffL) << 40) |
+ ((payload[ 1] & 0xffL) << 48) |
+ ((payload[ 0] & 0xffL) << 56),
+ ((payload[15] & 0xffL) << 0) |
+ ((payload[14] & 0xffL) << 8) |
+ ((payload[13] & 0xffL) << 16) |
+ ((payload[12] & 0xffL) << 24) |
+ ((payload[11] & 0xffL) << 32) |
+ ((payload[10] & 0xffL) << 40) |
+ ((payload[ 9] & 0xffL) << 48) |
+ ((payload[ 8] & 0xffL) << 56)
+ );
+ }
+
+ /**
+ * Read a span. Used in unit tests. Not optimized.
+ */
+ static Span readSpan(MessageUnpacker unpacker) throws IOException {
+ int numEntries = unpacker.unpackMapHeader();
+ MilliSpan.Builder builder = new MilliSpan.Builder();
+ while (--numEntries >= 0) {
+ String key = unpacker.unpackString();
+ if (key.length() != 1) {
+ throw new IOException("Unknown key " + key);
+ }
+ switch (key.charAt(0)) {
+ case 'a':
+ builder.spanId(readSpanId(unpacker));
+ break;
+ case 'b':
+ builder.begin(unpacker.unpackLong());
+ break;
+ case 'e':
+ builder.end(unpacker.unpackLong());
+ break;
+ case 'd':
+ builder.description(unpacker.unpackString());
+ break;
+ case 'r':
+ builder.tracerId(unpacker.unpackString());
+ break;
+ case 'p':
+ int numParents = unpacker.unpackArrayHeader();
+ SpanId[] parents = new SpanId[numParents];
+ for (int i = 0; i < numParents; i++) {
+ parents[i] = readSpanId(unpacker);
+ }
+ builder.parents(parents);
+ break;
+ case 'n':
+ int mapEntries = unpacker.unpackMapHeader();
+ HashMap<String, String> entries =
+ new HashMap<String, String>(mapEntries);
+ for (int i = 0; i < mapEntries; i++) {
+ String k = unpacker.unpackString();
+ String v = unpacker.unpackString();
+ entries.put(k, v);
+ }
+ builder.traceInfo(entries);
+ break;
+ case 't':
+ int listEntries = unpacker.unpackArrayHeader();
+ ArrayList<TimelineAnnotation> list =
+ new ArrayList<TimelineAnnotation>(listEntries);
+ for (int i = 0; i < listEntries; i++) {
+ int timelineObjectSize = unpacker.unpackMapHeader();
+ long time = 0;
+ String msg = "";
+ for (int j = 0; j < timelineObjectSize; j++) {
+ String tlKey = unpacker.unpackString();
+ if (tlKey.length() != 1) {
+ throw new IOException("Unknown timeline map key " + tlKey);
+ }
+ switch (tlKey.charAt(0)) {
+ case 't':
+ time = unpacker.unpackLong();
+ break;
+ case 'm':
+ msg = unpacker.unpackString();
+ break;
+ default:
+ throw new IOException("Unknown timeline map key " + tlKey);
+ }
+ }
+ list.add(new TimelineAnnotation(time, msg));
+ }
+ builder.timeline(list);
+ break;
+ default:
+ throw new IOException("Unknown key " + key);
+ }
+ }
+ return builder.build();
+ }
+
+ void beginWriteSpansRequest(String defaultPid, int numSpans)
+ throws IOException {
+ boolean success = false;
+ int oldPos = bb.position();
+ try {
+ int mapSize = 1;
+ if (defaultPid != null) {
+ mapSize++;
+ }
+ packer.packMapHeader(mapSize);
+ if (defaultPid != null) {
+ packer.packRawStringHeader(DEFAULT_PID.length);
+ packer.writePayload(DEFAULT_PID);
+ packer.packString(defaultPid);
+ }
+ packer.packRawStringHeader(SPANS.length);
+ packer.writePayload(SPANS);
+ packer.packArrayHeader(numSpans);
+ packer.flush();
+ success = true;
+ } finally {
+ if (!success) {
+ bb.position(oldPos);
+ }
+ }
+ }
+
+ /**
+ * Get the underlying ByteBuffer.
+ */
+ ByteBuffer getBuffer() {
+ return bb;
+ }
+
+ /**
+ * Reset our position in the array.
+ */
+ void reset() throws IOException {
+ packer.reset(out);
+ }
+
+ void close() {
+ try {
+ packer.close();
+ } catch (IOException e) {
+ LOG.error("Error closing MessagePacker", e);
+ }
+ }
+
+ public String toHexString() {
+ String prefix = "";
+ StringBuilder bld = new StringBuilder();
+ ByteBuffer b = bb.duplicate();
+ b.flip();
+ while (b.hasRemaining()) {
+ bld.append(String.format("%s%02x", prefix, b.get()));
+ prefix = " ";
+ }
+ return bld.toString();
+ }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java
new file mode 100644
index 0000000..8b59a72
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.htrace.core.Span;
+
+class PackedBufferManager implements BufferManager {
+ private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
+ private static final int MAX_PREQUEL_LENGTH = 2048;
+ private static final int METHOD_ID_WRITE_SPANS = 0x1;
+ private final Conf conf;
+ private final ByteBuffer frameBuffer;
+ private final PackedBuffer prequel;
+ private final PackedBuffer spans;
+ private final Selector selector;
+ private int numSpans;
+
+ PackedBufferManager(Conf conf) throws IOException {
+ this.conf = conf;
+ this.frameBuffer = ByteBuffer.allocate(PackedBuffer.HRPC_REQ_FRAME_LENGTH);
+ this.prequel = new PackedBuffer(ByteBuffer.allocate(MAX_PREQUEL_LENGTH));
+ this.spans = new PackedBuffer(ByteBuffer.allocate(conf.bufferSize));
+ this.selector = SelectorProvider.provider().openSelector();
+ clear();
+ }
+
+ @Override
+ public void writeSpan(Span span) throws IOException {
+ spans.writeSpan(span);
+ numSpans++;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("wrote " + span.toJson() + " to PackedBuffer for " +
+ conf.endpointStr + ". numSpans = " + numSpans +
+ ", buffer position = " + spans.getBuffer().position());
+ }
+ }
+
+ @Override
+ public int contentLength() {
+ return spans.getBuffer().position();
+ }
+
+ @Override
+ public int getNumberOfSpans() {
+ return numSpans;
+ }
+
+ @Override
+ public void prepare() throws IOException {
+ prequel.beginWriteSpansRequest(null, numSpans);
+ long totalLength =
+ prequel.getBuffer().position() + spans.getBuffer().position();
+ if (totalLength > PackedBuffer.MAX_HRPC_BODY_LENGTH) {
+ throw new IOException("Can't send RPC of " + totalLength + " bytes " +
+ "because it is longer than " + PackedBuffer.MAX_HRPC_BODY_LENGTH);
+ }
+ PackedBuffer.writeReqFrame(frameBuffer,
+ METHOD_ID_WRITE_SPANS, 1, (int)totalLength);
+ frameBuffer.flip();
+ prequel.getBuffer().flip();
+ spans.getBuffer().flip();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Preparing to send RPC of length " +
+ (totalLength + PackedBuffer.HRPC_REQ_FRAME_LENGTH) + " to " +
+ conf.endpointStr + ", containing " + numSpans + " spans.");
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ SelectionKey sockKey = null;
+ IOException ioe = null;
+ frameBuffer.position(0);
+ prequel.getBuffer().position(0);
+ spans.getBuffer().position(0);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Preparing to flush " + numSpans + " spans to " +
+ conf.endpointStr);
+ }
+ try {
+ sockKey = doConnect();
+ doSend(sockKey, new ByteBuffer[] {
+ frameBuffer, prequel.getBuffer(), spans.getBuffer() });
+ ByteBuffer response = prequel.getBuffer();
+ readAndValidateResponseFrame(sockKey, response,
+ 1, METHOD_ID_WRITE_SPANS);
+ } catch (IOException e) {
+ // This LOG message is only at debug level because we also log these
+ // exceptions at error level inside HTracedReceiver. The logging in
+ // HTracedReceiver is rate-limited to avoid overwhelming the client log
+ // if htraced goes down. The debug and trace logging is not
+ // rate-limited.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got exception during flush", e);
+ }
+ ioe = e;
+ } finally {
+ if (sockKey != null) {
+ sockKey.cancel();
+ try {
+ SocketChannel sock = (SocketChannel)sockKey.attachment();
+ sock.close();
+ } catch (IOException e) {
+ if (ioe != null) {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+ }
+ if (ioe != null) {
+ throw ioe;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Successfully flushed " + numSpans + " spans to " +
+ conf.endpointStr);
+ }
+ }
+
+ private long updateRemainingMs(long startMs, long timeoutMs) {
+ long deltaMs = TimeUtil.deltaMs(startMs, TimeUtil.nowMs());
+ if (deltaMs > timeoutMs) {
+ return 0;
+ }
+ return timeoutMs - deltaMs;
+ }
+
+ private SelectionKey doConnect() throws IOException {
+ SocketChannel sock = SocketChannel.open();
+ SelectionKey sockKey = null;
+ boolean success = false;
+ try {
+ if (sock.isBlocking()) {
+ sock.configureBlocking(false);
+ }
+ InetSocketAddress resolvedEndpoint =
+ new InetSocketAddress(conf.endpoint.getHostString(),
+ conf.endpoint.getPort());
+ resolvedEndpoint.getHostName(); // trigger DNS resolution
+ sock.connect(resolvedEndpoint);
+ sockKey = sock.register(selector, SelectionKey.OP_CONNECT, sock);
+ long startMs = TimeUtil.nowMs();
+ long remainingMs = conf.connectTimeoutMs;
+ while (true) {
+ selector.select(remainingMs);
+ for (SelectionKey key : selector.keys()) {
+ if (key.isConnectable()) {
+ SocketChannel s = (SocketChannel)key.attachment();
+ s.finishConnect();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Successfully connected to " + conf.endpointStr + ".");
+ }
+ success = true;
+ return sockKey;
+ }
+ }
+ remainingMs = updateRemainingMs(startMs, conf.connectTimeoutMs);
+ if (remainingMs == 0) {
+ throw new IOException("Attempt to connect to " + conf.endpointStr +
+ " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+ " ms.");
+ }
+ }
+ } finally {
+ if (!success) {
+ if (sockKey != null) {
+ sockKey.cancel();
+ }
+ sock.close();
+ }
+ }
+ }
+
+ /**
+ * Send the provided ByteBuffer objects.
+ *
+ * We use non-blocking I/O because Java does not provide write timeouts.
+ * Without a write timeout, the socket could get hung and we'd never recover.
+ * We also use the GatheringByteChannel#write method which calls the pread()
+ * system call under the covers. This ensures that even if TCP_NODELAY is on,
+ * we send the minimal number of packets.
+ */
+ private void doSend(SelectionKey sockKey, ByteBuffer[] bufs)
+ throws IOException {
+ long totalWritten = 0;
+ sockKey.interestOps(SelectionKey.OP_WRITE);
+ SocketChannel sock = (SocketChannel)sockKey.attachment();
+ long startMs = TimeUtil.nowMs();
+ long remainingMs = conf.ioTimeoutMs;
+ while (true) {
+ selector.select(remainingMs);
+ int firstBuf = 0;
+ for (SelectionKey key : selector.selectedKeys()) {
+ if (key.isWritable()) {
+ long written = sock.write(bufs, firstBuf, bufs.length - firstBuf);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Sent " + written + " bytes to " + conf.endpointStr);
+ }
+ totalWritten += written;
+ }
+ }
+ while (true) {
+ if (firstBuf == bufs.length) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Finished sending " + totalWritten + " bytes to " +
+ conf.endpointStr);
+ }
+ return;
+ }
+ if (bufs[firstBuf].remaining() > 0) {
+ break;
+ }
+ firstBuf++;
+ }
+ remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
+ if (remainingMs == 0) {
+ throw new IOException("Attempt to write to " + conf.endpointStr +
+ " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+ " ms.");
+ }
+ }
+ }
+
+ private void doRecv(SelectionKey sockKey, ByteBuffer response)
+ throws IOException {
+ sockKey.interestOps(SelectionKey.OP_READ);
+ SocketChannel sock = (SocketChannel)sockKey.attachment();
+ int totalRead = response.remaining();
+ long startMs = TimeUtil.nowMs();
+ long remainingMs = conf.ioTimeoutMs;
+ while (remainingMs > 0) {
+ selector.select(remainingMs);
+ for (SelectionKey key : selector.selectedKeys()) {
+ if (key.isReadable()) {
+ sock.read(response);
+ }
+ }
+ if (response.remaining() == 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received all " + totalRead + " bytes from " +
+ conf.endpointStr);
+ }
+ return;
+ }
+ remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received " + (totalRead - response.remaining()) +
+ " out of " + totalRead + " bytes from " + conf.endpointStr);
+ }
+ if (remainingMs == 0) {
+ throw new IOException("Attempt to write to " + conf.endpointStr +
+ " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+ " ms.");
+ }
+ }
+ }
+
+ private void readAndValidateResponseFrame(SelectionKey sockKey,
+ ByteBuffer buf, long expectedSeq, int expectedMethodId)
+ throws IOException {
+ buf.clear();
+ buf.limit(PackedBuffer.HRPC_RESP_FRAME_LENGTH);
+ doRecv(sockKey, buf);
+ buf.flip();
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+ long seq = buf.getLong();
+ if (seq != expectedSeq) {
+ throw new IOException("Expected sequence number " + expectedSeq +
+ ", but got sequence number " + seq);
+ }
+ int methodId = buf.getInt();
+ if (expectedMethodId != methodId) {
+ throw new IOException("Expected method id " + expectedMethodId +
+ ", but got " + methodId);
+ }
+ int errorLength = buf.getInt();
+ buf.getInt();
+ if ((errorLength < 0) ||
+ (errorLength > PackedBuffer.MAX_HRPC_ERROR_LENGTH)) {
+ throw new IOException("Got server error with invalid length " +
+ errorLength);
+ } else if (errorLength > 0) {
+ buf.clear();
+ buf.limit(errorLength);
+ doRecv(sockKey, buf);
+ buf.flip();
+ CharBuffer charBuf = StandardCharsets.UTF_8.decode(buf);
+ String serverErrorStr = charBuf.toString();
+ throw new IOException("Got server error " + serverErrorStr);
+ }
+ }
+
+ @Override
+ public void clear() {
+ frameBuffer.clear();
+ prequel.getBuffer().clear();
+ spans.getBuffer().clear();
+ numSpans = 0;
+ }
+
+ @Override
+ public void close() {
+ clear();
+ prequel.close();
+ spans.close();
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing selector", e);
+ }
+ }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java
new file mode 100644
index 0000000..ac42ee8
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * A logger which rate-limits its logging to a configurable level.
+ */
+class RateLimitedLogger {
+ private final Log log;
+ private final long timeoutMs;
+ private long lastLogTimeMs;
+
+ public RateLimitedLogger(Log log, long timeoutMs) {
+ this.log = log;
+ this.timeoutMs = timeoutMs;
+ synchronized (this) {
+ this.lastLogTimeMs = 0L;
+ }
+ }
+
+ public void warn(String what) {
+ long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+ TimeUnit.NANOSECONDS);
+ synchronized (this) {
+ if (now >= lastLogTimeMs + timeoutMs) {
+ log.warn(what);
+ lastLogTimeMs = now;
+ }
+ }
+ }
+
+ public void error(String what) {
+ long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+ TimeUnit.NANOSECONDS);
+ synchronized (this) {
+ if (now >= lastLogTimeMs + timeoutMs) {
+ log.error(what);
+ lastLogTimeMs = now;
+ }
+ }
+ }
+
+ public void error(String what, Throwable e) {
+ long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+ TimeUnit.NANOSECONDS);
+ synchronized (this) {
+ if (now >= lastLogTimeMs + timeoutMs) {
+ log.error(what, e);
+ lastLogTimeMs = now;
+ }
+ }
+ }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
new file mode 100644
index 0000000..2e1aa70
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.htrace.core.Span;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
+
+class RestBufferManager implements BufferManager {
+ private static final Log LOG = LogFactory.getLog(RestBufferManager.class);
+ private static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static ObjectWriter JSON_WRITER = OBJECT_MAPPER.writer();
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ private static final byte COMMA_BYTE = (byte)0x2c;
+ private static final int MAX_PREQUEL_LENGTH = 512;
+ private static final int MAX_EPILOGUE_LENGTH = 32;
+ private final Conf conf;
+ private final HttpClient httpClient;
+ private final String urlString;
+ private final ByteBuffer prequel;
+ private final ByteBuffer spans;
+ private final ByteBuffer epilogue;
+ private int numSpans;
+
+ private static class RestBufferManagerContentProvider
+ implements ContentProvider {
+ private final ByteBuffer[] bufs;
+
+ private class ByteBufferIterator implements Iterator<ByteBuffer> {
+ private int bufIdx = -1;
+
+ @Override
+ public boolean hasNext() {
+ return (bufIdx + 1) < bufs.length;
+ }
+
+ @Override
+ public ByteBuffer next() {
+ if ((bufIdx + 1) >= bufs.length) {
+ throw new NoSuchElementException();
+ }
+ bufIdx++;
+ return bufs[bufIdx];
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ RestBufferManagerContentProvider(ByteBuffer[] bufs) {
+ this.bufs = bufs;
+ }
+
+ @Override
+ public long getLength() {
+ long total = 0;
+ for (int i = 0; i < bufs.length; i++) {
+ total += bufs[i].remaining();
+ }
+ return total;
+ }
+
+ @Override
+ public Iterator<ByteBuffer> iterator() {
+ return new ByteBufferIterator();
+ }
+ }
+
+ /**
+ * Create an HttpClient instance.
+ *
+ * @param connTimeout The timeout to use for connecting.
+ * @param idleTimeout The idle timeout to use.
+ */
+ static HttpClient createHttpClient(long connTimeout, long idleTimeout) {
+ HttpClient httpClient = new HttpClient();
+ httpClient.setUserAgentField(
+ new HttpField(HttpHeader.USER_AGENT, "HTracedSpanReceiver"));
+ httpClient.setConnectTimeout(connTimeout);
+ httpClient.setIdleTimeout(idleTimeout);
+ return httpClient;
+ }
+
+ RestBufferManager(Conf conf) throws Exception {
+ this.conf = conf;
+ this.httpClient =
+ createHttpClient(conf.connectTimeoutMs, conf.idleTimeoutMs);
+ this.urlString = new URL("http", conf.endpoint.getHostName(),
+ conf.endpoint.getPort(), "/writeSpans").toString();
+ this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH);
+ this.spans = ByteBuffer.allocate(conf.bufferSize);
+ this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH);
+ clear();
+ this.httpClient.start();
+ }
+
+ @Override
+ public void writeSpan(Span span) throws IOException {
+ byte[] spanJsonBytes = JSON_WRITER.writeValueAsBytes(span);
+ if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) {
+ // Make sure we have enough space for the span JSON and a comma.
+ throw new IOException("Not enough space remaining in span buffer.");
+ }
+ spans.put(COMMA_BYTE);
+ spans.put(spanJsonBytes);
+ numSpans++;
+ }
+
+ @Override
+ public int contentLength() {
+ return Math.max(spans.position() - 1, 0);
+ }
+
+ @Override
+ public int getNumberOfSpans() {
+ return numSpans;
+ }
+
+ @Override
+ public void prepare() throws IOException {
+ String prequelString = "{\"Spans\":[";
+ prequel.put(prequelString.getBytes(UTF8));
+ prequel.flip();
+
+ spans.flip();
+
+ String epilogueString = "]}";
+ epilogue.put(epilogueString.toString().getBytes(UTF8));
+ epilogue.flip();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Preparing to send " + contentLength() + " bytes of span " +
+ "data to " + conf.endpointStr + ", containing " + numSpans +
+ " spans.");
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ // Position the buffers at the beginning.
+ prequel.position(0);
+ spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma
+ epilogue.position(0);
+
+ RestBufferManagerContentProvider contentProvider =
+ new RestBufferManagerContentProvider(
+ new ByteBuffer[] { prequel, spans, epilogue });
+ long rpcLength = contentProvider.getLength();
+ try {
+ Request request = httpClient.
+ newRequest(urlString).method(HttpMethod.POST);
+ request.header(HttpHeader.CONTENT_TYPE, "application/json");
+ request.content(contentProvider);
+ ContentResponse response = request.send();
+ if (response.getStatus() != HttpStatus.OK_200) {
+ throw new IOException("Got back error response " +
+ response.getStatus() + " from " + conf.endpointStr + "; " +
+ response.getContentAsString());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sent WriteSpansReq of length " + rpcLength + " to " + conf.endpointStr);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while sending spans via REST", e);
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out sending spans via REST", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Execution exception sending spans via REST", e);
+ }
+ }
+
+ @Override
+ public void clear() {
+ prequel.clear();
+ spans.clear();
+ epilogue.clear();
+ numSpans = 0;
+ }
+
+ @Override
+ public void close() {
+ try {
+ httpClient.stop();
+ } catch (Exception e) {
+ LOG.error("Error stopping HTracedReceiver httpClient", e);
+ }
+ }
+}
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java
new file mode 100644
index 0000000..7361c97
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.math.BigInteger;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utilities for dealing with monotonic time.
+ */
+class TimeUtil {
+ /**
+ * Returns the current monotonic time in milliseconds.
+ */
+ static long nowMs() {
+ return TimeUnit.MILLISECONDS.convert(
+ System.nanoTime(), TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * Get the approximate delta between two monotonic times.
+ *
+ * This function makes the following assumptions:
+ * 1. We read startMs from the monotonic clock prior to endMs.
+ * 2. The two times are not more than 100 years or so apart.
+ *
+ * With these two assumptions in hand, we can smooth over some of the
+ * unpleasant features of the monotonic clock:
+ * 1. It can return either positive or negative values.
+ * 2. When the number of nanoseconds reaches Long.MAX_VALUE it wraps around
+ * to Long.MIN_VALUE.
+ * 3. On some messed up systems it has been known to jump backwards every
+ * now and then. Oops. CPU core synchronization mumble mumble.
+ *
+ * @param startMs The start time.
+ * @param endMs The end time.
+ * @return The delta between the two times.
+ */
+ static long deltaMs(long startMs, long endMs) {
+ BigInteger startNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
+ convert(startMs, TimeUnit.MILLISECONDS));
+ BigInteger endNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
+ convert(endMs, TimeUnit.MILLISECONDS));
+ BigInteger deltaNs = endNs.subtract(startNs);
+ if (deltaNs.signum() >= 0) {
+ return TimeUnit.MILLISECONDS.convert(deltaNs.min(
+ BigInteger.valueOf(Long.MAX_VALUE)).longValue(), TimeUnit.NANOSECONDS);
+ }
+ deltaNs = deltaNs.negate();
+ if (deltaNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE / 2)) < 0) {
+ // If the 'startNs' is numerically less than the 'endNs', and the
+ // difference between the two is less than 100 years, it's probably
+ // just clock jitter. Certain old OSes and CPUs had monotonic clocks
+ // that could go backwards under certain conditions (ironic, given
+ // the name).
+ return 0L;
+ }
+ // Handle rollover.
+ BigInteger revDeltaNs = BigInteger.ONE.shiftLeft(64).subtract(deltaNs);
+ return TimeUnit.MILLISECONDS.convert(revDeltaNs.min(
+ BigInteger.valueOf(Long.MAX_VALUE)).longValue(), TimeUnit.NANOSECONDS);
+ }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java
new file mode 100644
index 0000000..f224f6f
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+
+/**
+ * Small util for making a data directory for tests to use when running tests. We put it up at
+ * target/test-data/UUID. Create an instance of this class per unit test run and it will take
+ * care of setting up the dirs for you. Pass what is returned here as location from which to
+ * have daemons and tests dump data.
+ */
+public class DataDir implements Closeable {
+ private final File dir;
+
+ public DataDir() throws IOException {
+ String baseDir = System.getProperty(
+ "test.data.base.dir", "target");
+ File testData = new File(new File(baseDir), "test-data");
+ this.dir = new File(testData, UUID.randomUUID().toString());
+ Files.createDirectories(this.dir.toPath());
+ }
+
+ public File get() {
+ return dir;
+ }
+
+ @Override
+ public void close() throws IOException {
+ /*for (File file : this.dir.listFiles()) {
+ file.delete();
+ }
+ Files.delete(this.dir.toPath()); */
+ }
+
+ @Override
+ public String toString() {
+ return "DataDir{" + dir.getAbsolutePath() + "}";
+ }
+}
\ No newline at end of file
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
similarity index 60%
rename from htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
rename to htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
index 3e800d2..26c1a10 100644
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.htrace.util;
+package org.apache.htrace.impl;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -24,27 +24,76 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.ProcessBuilder.Redirect;
import java.net.ServerSocket;
import java.net.Socket;
-import java.net.URL;
+import java.net.URI;
+import java.nio.file.Paths;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.Assert;
/**
* To get instance of HTraced up and running, create an instance of this class.
* Upon successful construction, htraced is running using <code>dataDir</code> as directory to
* host data (leveldbs and logs).
- * TODO: We expect to find the htraced in a very particular place. Fragile. Will break if stuff
- * moves.
*/
-public class HTracedProcess extends Process {
+class HTracedProcess extends Process {
private static final Log LOG = LogFactory.getLog(HTracedProcess.class);
+
+ static class Builder {
+ String host = "localhost";
+
+ Builder() {
+ }
+
+ Builder host(String host) {
+ this.host = host;
+ return this;
+ }
+
+ HTracedProcess build() throws Exception {
+ return new HTracedProcess(this);
+ }
+ }
+
+ /**
+ * Path to the htraced binary.
+ */
+ private final File htracedPath;
+
+ /**
+ * Temporary directory for test files.
+ */
+ private final DataDir dataDir;
+
+ /**
+ * The Java Process object for htraced.
+ */
private final Process delegate;
+ /**
+ * The HTTP host:port returned from htraced.
+ */
private final String httpAddr;
/**
+ * The HRPC host:port returned from htraced.
+ */
+ private final String hrpcAddr;
+
+ /**
+ * REST client to use to talk to htraced.
+ */
+ private final HttpClient httpClient;
+
+ /**
* Data send back from the HTraced process on the notification port.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -56,42 +105,59 @@
String httpAddr;
/**
+ * The hostname:port pair which the HTraced process uses for HRPC requests.
+ */
+ @JsonProperty("HrpcAddr")
+ String hrpcAddr;
+
+ /**
* The process ID of the HTraced process.
*/
@JsonProperty("ProcessId")
long processId;
}
- public HTracedProcess(final File binPath, final File dataDir,
- final String host) throws IOException {
+ private HTracedProcess(Builder builder) throws Exception {
+ this.htracedPath = Paths.get(
+ "target", "..", "go", "build", "htraced").toFile();
+ if (!this.htracedPath.exists()) {
+ throw new RuntimeException("No htraced binary exists at " +
+ this.htracedPath);
+ }
+ this.dataDir = new DataDir();
// Create a notifier socket bound to a random port.
ServerSocket listener = new ServerSocket(0);
boolean success = false;
Process process = null;
+ HttpClient http = null;
try {
// Use a random port for the web address. No 'scheme' yet.
- String webAddress = host + ":0";
- String logPath = new File(dataDir, "log.txt").getAbsolutePath();
+ String random = builder.host + ":0";
+ String logPath = new File(dataDir.get(), "log.txt").getAbsolutePath();
// Pass cmdline args to htraced to it uses our test dir for data.
- ProcessBuilder pb = new ProcessBuilder(binPath.toString(),
+ ProcessBuilder pb = new ProcessBuilder(htracedPath.getAbsolutePath(),
"-Dlog.level=TRACE",
"-Dlog.path=" + logPath,
- "-Dweb.address=" + webAddress,
+ "-Dweb.address=" + random,
+ "-Dhrpc.address=" + random,
"-Ddata.store.clear=true",
"-Dstartup.notification.address=localhost:" + listener.getLocalPort(),
- "-Ddata.store.directories=" + dataDir.toString());
+ "-Ddata.store.directories=" + dataDir.get().getAbsolutePath());
pb.redirectErrorStream(true);
// Inherit STDERR/STDOUT i/o; dumps on console for now. Can add logs later.
pb.inheritIO();
- pb.directory(dataDir);
+ pb.directory(dataDir.get());
//assert pb.redirectInput() == Redirect.PIPE;
//assert pb.redirectOutput().file() == dataDir;
process = pb.start();
assert process.getInputStream().read() == -1;
StartupNotificationData data = readStartupNotification(listener);
httpAddr = data.httpAddr;
+ hrpcAddr = data.hrpcAddr;
LOG.info("Started htraced process " + data.processId + " with http " +
"address " + data.httpAddr + ", logging to " + logPath);
+ http = RestBufferManager.createHttpClient(60000L, 60000L);
+ http.start();
success = true;
} finally {
if (!success) {
@@ -100,9 +166,13 @@
process.destroy();
process = null;
}
+ if (http != null) {
+ http.stop();
+ }
}
delegate = process;
listener.close();
+ httpClient = http;
}
}
@@ -149,7 +219,18 @@
}
public void destroy() {
+ try {
+ httpClient.stop();
+ } catch (Exception e) {
+ LOG.error("Error stopping httpClient", e);
+ }
delegate.destroy();
+ try {
+ dataDir.close();
+ } catch (Exception e) {
+ LOG.error("Error closing " + dataDir, e);
+ }
+ LOG.trace("Destroyed htraced process.");
}
public String toString() {
@@ -160,6 +241,10 @@
return httpAddr;
}
+ public String getHrpcAddr() {
+ return hrpcAddr;
+ }
+
/**
* Ugly but how else to do file-math?
* @param topLevel Presumes top-level of the htrace checkout.
@@ -169,4 +254,24 @@
return new File(new File(new File(new File(topLevel, "htrace-htraced"), "go"), "build"),
"htraced");
}
+
+ public String getServerInfoJson() throws Exception {
+ ContentResponse response = httpClient.GET(
+ new URI(String.format("http://%s/server/info", httpAddr)));
+ Assert.assertEquals("application/json", response.getMediaType());
+ Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
+ return response.getContentAsString();
+ }
+
+ public Span getSpan(SpanId spanId) throws Exception {
+ ContentResponse response = httpClient.GET(
+ new URI(String.format("http://%s/span/%s",
+ httpAddr, spanId.toString())));
+ Assert.assertEquals("application/json", response.getMediaType());
+ String responseJson = response.getContentAsString().trim();
+ if (responseJson.isEmpty()) {
+ return null;
+ }
+ return MilliSpan.fromJson(responseJson);
+ }
}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
deleted file mode 100644
index d52f071..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URL;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.MilliSpan;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanId;
-import org.apache.htrace.core.TracerId;
-import org.apache.htrace.util.DataDir;
-import org.apache.htrace.util.HTracedProcess;
-import org.apache.htrace.util.TestUtil;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.http.HttpStatus;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestHTracedRESTReceiver {
- private static final Log LOG =
- LogFactory.getLog(TestHTracedRESTReceiver.class);
- private URL restServerUrl;
- private DataDir dataDir;
- HTracedProcess htraced;
-
- @Before
- public void setUp() throws Exception {
- this.dataDir = new DataDir();
- File tlDir = DataDir.getTopLevelOfCheckout(this.dataDir.getDataDir());
- File pathToHTracedBinary = HTracedProcess.getPathToHTraceBinaryFromTopLevel(tlDir);
- this.htraced = new HTracedProcess(pathToHTracedBinary,
- dataDir.getDataDir(), "localhost");
- this.restServerUrl = new URL("http://" + htraced.getHttpAddr() + "/");
- }
-
- @After
- public void tearDown() throws Exception {
- if (this.htraced != null) this.htraced.destroy();
- }
-
- /**
- * Our simple version of htrace configuration for testing.
- */
- private final class TestHTraceConfiguration extends HTraceConfiguration {
- private final URL restServerUrl;
- final static String TRACER_ID = "TestHTracedRESTReceiver";
-
- public TestHTraceConfiguration(final URL restServerUrl) {
- this.restServerUrl = restServerUrl;
- }
-
- @Override
- public String get(String key) {
- return null;
- }
-
- @Override
- public String get(String key, String defaultValue) {
- if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) {
- return this.restServerUrl.toString();
- } else if (key.equals(TracerId.TRACER_ID_KEY)) {
- return TRACER_ID;
- }
- return defaultValue;
- }
- }
-
- /**
- * Make sure the REST server basically works.
- * @throws Exception
- */
- @Test (timeout = 10000)
- public void testBasicGet() throws Exception {
- HTracedRESTReceiver receiver =
- new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
- HttpClient http = receiver.createHttpClient(60000L, 60000L);
- http.start();
- try {
- // Do basic a GET /server/info against htraced
- ContentResponse response =
- http.GET(restServerUrl + "server/info");
- assertEquals("application/json", response.getMediaType());
- String content = processGET(response);
- assertTrue(content.contains("ReleaseVersion"));
- System.out.println(content);
- } finally {
- http.stop();
- receiver.close();
- }
- }
-
- private String processGET(final ContentResponse response) {
- assertTrue("" + response.getStatus(), HttpStatus.OK_200 <= response.getStatus() &&
- response.getStatus() <= HttpStatus.NO_CONTENT_204);
- return response.getContentAsString();
- }
-
- private void testSendingSpansImpl(boolean testClose) throws Exception {
- final HTracedRESTReceiver receiver =
- new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
- final int NUM_SPANS = 3;
- final HttpClient http = receiver.createHttpClient(60000, 60000);
- http.start();
- Span spans[] = new Span[NUM_SPANS];
- for (int i = 0; i < NUM_SPANS; i++) {
- MilliSpan.Builder builder = new MilliSpan.Builder().
- parents(new SpanId[] { new SpanId(1L, 1L) }).
- spanId(new SpanId(1L, i));
- if (i == NUM_SPANS - 1) {
- builder.tracerId("specialTrid");
- } else {
- builder.tracerId(TestHTraceConfiguration.TRACER_ID);
- }
- spans[i] = builder.build();
- }
- try {
- for (int i = 0; i < NUM_SPANS; i++) {
- LOG.info("receiving " + spans[i].toString());
- receiver.receiveSpan(spans[i]);
- }
- if (testClose) {
- receiver.close();
- } else {
- receiver.startFlushing();
- }
- TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
- @Override
- public Boolean get() {
- try {
- for (int i = 0; i < NUM_SPANS; i++) {
- // This is what the REST server expects when querying for a
- // span id.
- String findSpan = String.format("span/%s",
- new SpanId(1L, i).toString());
- ContentResponse response =
- http.GET(restServerUrl + findSpan);
- String content = processGET(response);
- if ((content == null) || (content.length() == 0)) {
- LOG.info("Failed to find span " + i);
- return false;
- }
- LOG.info("Got " + content + " for span " + i);
- MilliSpan dspan = MilliSpan.fromJson(content);
- assertEquals(new SpanId(1, i).toString(),
- dspan.getSpanId().toString());
- // Every span should have the tracer ID we set in the
- // configuration... except for the last span, which had
- // a custom value set.
- if (i == NUM_SPANS - 1) {
- assertEquals("specialTrid", dspan.getTracerId());
- } else {
- assertEquals(TestHTraceConfiguration.TRACER_ID,
- dspan.getTracerId());
- }
- }
- return true;
- } catch (Throwable t) {
- LOG.error("Got exception", t);
- return false;
- }
- }
- }, 10, 20000);
- } finally {
- http.stop();
- if (!testClose) {
- receiver.close();
- }
- }
- }
-
- /**
- * Send 100 spans then confirm they made it in.
- * @throws Exception
- */
- @Test (timeout = 60000)
- public void testSendingSpans() throws Exception {
- testSendingSpansImpl(false);
- }
-
- /**
- * Test that the REST receiver blocks during shutdown until all spans are sent
- * (or a long timeout elapses). Otherwise, short-lived client processes will
- * never have a chance to send all their spans and we will have incomplete
- * information.
- */
- @Test (timeout = 60000)
- public void testShutdownBlocksUntilSpanAreSent() throws Exception {
- testSendingSpansImpl(true);
- }
-}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java
new file mode 100644
index 0000000..99f00a1
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java
@@ -0,0 +1,572 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.TracerId;
+import org.apache.htrace.impl.HTracedSpanReceiver.FaultInjector;
+import org.apache.htrace.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+public class TestHTracedReceiver {
+ private static final Log LOG = LogFactory.getLog(TestHTracedReceiver.class);
+
+ @BeforeClass
+ public static void beforeClass() {
+ // Allow setting really small buffer sizes for testing purposes.
+ // We do not allow setting such small sizes in production.
+ Conf.BUFFER_SIZE_MIN = 0;
+ }
+
+ @Rule
+ public TestRule watcher = new TestWatcher() {
+ protected void starting(Description description) {
+ LOG.info("*** Starting junit test: " + description.getMethodName());
+ }
+
+ protected void finished(Description description) {
+ LOG.info("*** Finished junit test: " + description.getMethodName());
+ }
+ };
+
+ @Test(timeout = 60000)
+ public void testGetServerInfoJson() throws Exception {
+ HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ String response = ht.getServerInfoJson();
+ assertTrue(response.contains("ReleaseVersion"));
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ private void waitForSpans(final HTracedProcess ht, Span[] spans)
+ throws Exception {
+ waitForSpans(ht, spans, spans.length);
+ }
+
+ private void waitForSpans(final HTracedProcess ht, Span[] spans,
+ int numSpans) throws Exception {
+ final LinkedList<SpanId> spanIds = new LinkedList<SpanId>();
+ for (int i = 0; i < numSpans; i++) {
+ spanIds.add(spans[i].getSpanId());
+ }
+ boolean success = false;
+ try {
+ TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ for (Iterator<SpanId> iter = spanIds.iterator();
+ iter.hasNext(); ) {
+ SpanId spanId = iter.next();
+ try {
+ if (ht.getSpan(spanId) == null) {
+ return false;
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Got InterruptedException while looking for " +
+ "span ID " + spanId, e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOG.error("Got error looking for span ID " + spanId, e);
+ return false;
+ }
+ iter.remove();
+ }
+ return true;
+ }
+ }, 10, 30000);
+ success = true;
+ } finally {
+ if (!success) {
+ String prefix = "";
+ StringBuilder idStringBld = new StringBuilder();
+ for (Iterator<SpanId> iter = spanIds.iterator();
+ iter.hasNext(); ) {
+ idStringBld.append(prefix);
+ idStringBld.append(iter.next());
+ prefix = ",";
+ }
+ LOG.error("Unable to find span IDs " + idStringBld.toString());
+ }
+ }
+ }
+
+ /**
+ * Test that we can send spans via the HRPC interface.
+ */
+ @Test(timeout = 10000) //60000)
+ public void testSendSpansViaPacked() throws Exception {
+ final Random rand = new Random(123);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testSendSpansViaPacked");
+ put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+ put(Conf.PACKED_KEY, "true");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
+ put(Conf.ERROR_LOG_PERIOD_MS_KEY, "0");
+ }});
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+ Span[] spans = TestUtil.randomSpans(rand, 10);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ waitForSpans(ht, spans);
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that when the SpanReceiver is closed, we send any spans we have
+ * buffered via the HRPC interface.
+ */
+ @Test(timeout = 60000)
+ public void testSendSpansViaPackedAndClose() throws Exception {
+ final Random rand = new Random(456);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testSendSpansViaPackedAndClose");
+ put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+ put(Conf.PACKED_KEY, "true");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
+ }});
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+ Span[] spans = TestUtil.randomSpans(rand, 10);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ rcvr.close();
+ waitForSpans(ht, spans);
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that we can send spans via the REST interface.
+ */
+ @Test(timeout = 60000)
+ public void testSendSpansViaRest() throws Exception {
+ final Random rand = new Random(789);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testSendSpansViaRest");
+ put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+ put(Conf.PACKED_KEY, "false");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
+ }});
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+ Span[] spans = TestUtil.randomSpans(rand, 10);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ waitForSpans(ht, spans);
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that when the SpanReceiver is closed, we send any spans we have
+ * buffered via the REST interface.
+ */
+ @Test(timeout = 60000)
+ public void testSendSpansViaRestAndClose() throws Exception {
+ final Random rand = new Random(321);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testSendSpansViaRestAndClose");
+ put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+ put(Conf.PACKED_KEY, "false");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
+ }});
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+ Span[] spans = TestUtil.randomSpans(rand, 10);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ rcvr.close();
+ waitForSpans(ht, spans);
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ private static class Mutable<T> {
+ private T t;
+
+ Mutable(T t) {
+ this.t = t;
+ }
+
+ void set(T t) {
+ this.t = t;
+ }
+
+ T get() {
+ return this.t;
+ }
+ }
+
+ private static class TestHandleContentLengthTriggerInjector
+ extends HTracedSpanReceiver.FaultInjector {
+ final Semaphore threadStartSem = new Semaphore(0);
+ int contentLengthOnTrigger = 0;
+
+ @Override
+ public synchronized void handleContentLengthTrigger(int len) {
+ contentLengthOnTrigger = len;
+ }
+ @Override
+ public void handleThreadStart() throws Exception {
+ threadStartSem.acquire();
+ }
+
+ public synchronized int getContentLengthOnTrigger() {
+ return contentLengthOnTrigger;
+ }
+ }
+
+ /**
+ * Test that filling up one of the buffers causes us to trigger a flush and
+ * start using the other buffer, when using PackedBufferManager.
+ * This also tests that PackedBufferManager can correctly handle a buffer
+ * getting full.
+ */
+ @Test(timeout = 60000)
+ public void testFullBufferCausesPackedThreadTrigger() throws Exception {
+ final Random rand = new Random(321);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY,
+ "testFullBufferCausesPackedThreadTrigger");
+ put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+ put(Conf.PACKED_KEY, "true");
+ put(Conf.BUFFER_SIZE_KEY, "16384");
+ put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
+ }});
+ TestHandleContentLengthTriggerInjector injector =
+ new TestHandleContentLengthTriggerInjector();
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 47);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ Assert.assertTrue("The wakePostSpansThread should have been " +
+ "triggered by the spans added so far. " +
+ "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
+ injector.getContentLengthOnTrigger() > 16000);
+ injector.threadStartSem.release();
+ rcvr.close();
+ waitForSpans(ht, spans, 45);
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that filling up one of the buffers causes us to trigger a flush and
+ * start using the other buffer, when using RestBufferManager.
+ * This also tests that RestBufferManager can correctly handle a buffer
+ * getting full.
+ */
+ @Test(timeout = 60000)
+ public void testFullBufferCausesRestThreadTrigger() throws Exception {
+ final Random rand = new Random(321);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY,
+ "testFullBufferCausesRestThreadTrigger");
+ put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+ put(Conf.PACKED_KEY, "false");
+ put(Conf.BUFFER_SIZE_KEY, "16384");
+ put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
+ }});
+ TestHandleContentLengthTriggerInjector injector =
+ new TestHandleContentLengthTriggerInjector();
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 34);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ Assert.assertTrue("The wakePostSpansThread should have been " +
+ "triggered by the spans added so far. " +
+ "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
+ injector.getContentLengthOnTrigger() > 16000);
+ injector.threadStartSem.release();
+ rcvr.close();
+ waitForSpans(ht, spans, 33);
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * A FaultInjector that causes all flushes to fail until a specified
+ * number of milliseconds have passed.
+ */
+ private static class TestInjectFlushFaults
+ extends HTracedSpanReceiver.FaultInjector {
+ private long remainingFaults;
+
+ TestInjectFlushFaults(long remainingFaults) {
+ this.remainingFaults = remainingFaults;
+ }
+
+ @Override
+ public synchronized void handleFlush() throws IOException {
+ if (remainingFaults > 0) {
+ remainingFaults--;
+ throw new IOException("Injected IOException into flush " +
+ "code path.");
+ }
+ }
+ }
+
+ /**
+ * Test that even if the flush fails, the system stays stable and we can
+ * still close the span receiver.
+ */
+ @Test(timeout = 60000)
+ public void testPackedThreadHandlesFlushFailure() throws Exception {
+ final Random rand = new Random(321);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testPackedThreadHandlesFlushFailure");
+ put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+ put(Conf.PACKED_KEY, "true");
+ }});
+ TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 15);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that even if the flush fails, the system stays stable and we can
+ * still close the span receiver.
+ */
+ @Test(timeout = 60000)
+ public void testRestThreadHandlesFlushFailure() throws Exception {
+ final Random rand = new Random(321);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testRestThreadHandlesFlushFailure");
+ put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+ put(Conf.PACKED_KEY, "false");
+ }});
+ TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 15);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * A FaultInjector that causes all flushes to fail until a specified
+ * number of milliseconds have passed.
+ */
+ private static class WaitForFlushes
+ extends HTracedSpanReceiver.FaultInjector {
+ final Semaphore flushSem;
+
+ WaitForFlushes(int numFlushes) {
+ this.flushSem = new Semaphore(-numFlushes);
+ }
+
+ @Override
+ public void handleFlush() throws IOException {
+ flushSem.release();
+ }
+ }
+
+ /**
+ * Test that the packed code works when performing multiple flushes.
+ */
+ @Test(timeout = 60000)
+ public void testMultiplePackedFlushes() throws Exception {
+ final Random rand = new Random(123);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testMultiplePackedFlushes");
+ put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+ put(Conf.PACKED_KEY, "true");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
+ }});
+ WaitForFlushes injector = new WaitForFlushes(5);
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 3);
+ while (true) {
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ if (injector.flushSem.availablePermits() >= 0) {
+ break;
+ }
+ Thread.sleep(1);
+ }
+ waitForSpans(ht, spans, 3);
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that the REST code works when performing multiple flushes.
+ */
+ @Test(timeout = 60000)
+ public void testMultipleRestFlushes() throws Exception {
+ final Random rand = new Random(123);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testMultipleRestFlushes");
+ put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+ put(Conf.PACKED_KEY, "false");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
+ }});
+ WaitForFlushes injector = new WaitForFlushes(5);
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 3);
+ while (true) {
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ if (injector.flushSem.availablePermits() >= 0) {
+ break;
+ }
+ Thread.sleep(1);
+ }
+ waitForSpans(ht, spans, 3);
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that the packed code works when performing multiple flushes.
+ */
+ @Test(timeout = 60000)
+ public void testPackedRetryAfterFlushError() throws Exception {
+ final Random rand = new Random(123);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testPackedRetryAfterFlushError");
+ put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+ put(Conf.PACKED_KEY, "true");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
+ put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
+ }});
+ TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 3);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ waitForSpans(ht, spans);
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+
+ /**
+ * Test that the REST code works when performing multiple flushes.
+ */
+ @Test(timeout = 60000)
+ public void testRestRetryAfterFlushError() throws Exception {
+ final Random rand = new Random(123);
+ final HTracedProcess ht = new HTracedProcess.Builder().build();
+ try {
+ HTraceConfiguration conf = HTraceConfiguration.fromMap(
+ new HashMap<String, String>() {{
+ put(TracerId.TRACER_ID_KEY, "testRestRetryAfterFlushError");
+ put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+ put(Conf.PACKED_KEY, "false");
+ put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
+ put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
+ }});
+ TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
+ HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+ Span[] spans = TestUtil.randomSpans(rand, 3);
+ for (Span span : spans) {
+ rcvr.receiveSpan(span);
+ }
+ waitForSpans(ht, spans);
+ rcvr.close();
+ } finally {
+ ht.destroy();
+ }
+ }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java
new file mode 100644
index 0000000..bf038f1
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHTracedReceiverConf {
+ private static final Log LOG =
+ LogFactory.getLog(TestHTracedReceiverConf.class);
+
+ @Test(timeout = 60000)
+ public void testParseHostPort() throws Exception {
+ InetSocketAddress addr = new Conf(
+ HTraceConfiguration.fromKeyValuePairs(
+ Conf.ADDRESS_KEY, "example.com:8080")).endpoint;
+ Assert.assertEquals("example.com", addr.getHostName());
+ Assert.assertEquals(8080, addr.getPort());
+
+ addr = new Conf(
+ HTraceConfiguration.fromKeyValuePairs(
+ Conf.ADDRESS_KEY, "127.0.0.1:8081")).endpoint;
+ Assert.assertEquals("127.0.0.1", addr.getHostName());
+ Assert.assertEquals(8081, addr.getPort());
+
+ addr = new Conf(
+ HTraceConfiguration.fromKeyValuePairs(
+ Conf.ADDRESS_KEY, "[ff02:0:0:0:0:0:0:12]:9095")).endpoint;
+ Assert.assertEquals("ff02:0:0:0:0:0:0:12", addr.getHostName());
+ Assert.assertEquals(9095, addr.getPort());
+ }
+
+ private static void verifyFail(String hostPort) {
+ try {
+ new Conf(HTraceConfiguration.fromKeyValuePairs(
+ Conf.ADDRESS_KEY, hostPort));
+ Assert.fail("Expected bad host:port configuration " + hostPort +
+ " to fail, but it succeeded.");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testFailToParseHostPort() throws Exception {
+ verifyFail("localhost"); // no port
+ verifyFail("127.0.0.1"); // no port
+ verifyFail(":8080"); // no hostname
+ verifyFail("bob[ff02:0:0:0:0:0:0:12]:9095"); // bracket at incorrect place
+ }
+
+ @Test(timeout = 60000)
+ public void testGetIntArray() throws Exception {
+ int[] arr = Conf.getIntArray("");
+ Assert.assertEquals(0, arr.length);
+ arr = Conf.getIntArray("123");
+ Assert.assertEquals(1, arr.length);
+ Assert.assertEquals(123, arr[0]);
+ arr = Conf.getIntArray("1,2,3");
+ Assert.assertEquals(3, arr.length);
+ Assert.assertEquals(1, arr[0]);
+ Assert.assertEquals(2, arr[1]);
+ Assert.assertEquals(3, arr[2]);
+ arr = Conf.getIntArray(",-4,5,66,");
+ Assert.assertEquals(3, arr.length);
+ Assert.assertEquals(-4, arr[0]);
+ Assert.assertEquals(5, arr[1]);
+ Assert.assertEquals(66, arr[2]);
+ }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java
new file mode 100644
index 0000000..ed7d904
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+
+public class TestPackedBuffer {
+ private static final Log LOG = LogFactory.getLog(TestPackedBuffer.class);
+
+ @Test(timeout = 60000)
+ public void testWriteReqFrame() throws Exception {
+ byte[] arr = new byte[PackedBuffer.HRPC_REQ_FRAME_LENGTH];
+ ByteBuffer bb = ByteBuffer.wrap(arr);
+ PackedBuffer buf = new PackedBuffer(bb);
+ PackedBuffer.writeReqFrame(bb, 1, 123, 456);
+ Assert.assertEquals(PackedBuffer.HRPC_REQ_FRAME_LENGTH, bb.position());
+ Assert.assertEquals("48 54 52 43 " +
+ "01 00 00 00 " +
+ "7b 00 00 00 00 00 00 00 " +
+ "c8 01 00 00",
+ buf.toHexString());
+ }
+
+ @Test(timeout = 60000)
+ public void testPackSpans() throws Exception {
+ Random rand = new Random(123);
+ byte[] arr = new byte[16384];
+ ByteBuffer bb = ByteBuffer.wrap(arr);
+ bb.limit(bb.capacity());
+ PackedBuffer buf = new PackedBuffer(bb);
+ final int NUM_TEST_SPANS = 5;
+ Span[] spans = new Span[NUM_TEST_SPANS];
+ for (int i = 0; i < NUM_TEST_SPANS; i++) {
+ spans[i] = TestUtil.randomSpan(rand);
+ }
+ for (int i = 0; i < NUM_TEST_SPANS; i++) {
+ buf.writeSpan(spans[i]);
+ }
+ LOG.info("wrote " + buf.toHexString());
+ MessagePack msgpack = new MessagePack(PackedBuffer.MSGPACK_CONF);
+ MessageUnpacker unpacker = msgpack.newUnpacker(arr, 0, bb.position());
+ Span[] respans = new Span[NUM_TEST_SPANS];
+ for (int i = 0; i < NUM_TEST_SPANS; i++) {
+ respans[i] = PackedBuffer.readSpan(unpacker);
+ }
+ for (int i = 0; i < NUM_TEST_SPANS; i++) {
+ Assert.assertEquals("Failed to read back span " + i,
+ spans[i].toJson(), respans[i].toJson());
+ }
+ }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java
new file mode 100644
index 0000000..630a02a
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimeUtil {
+ /**
+ * Test that our deltaMs function can compute the time difference between any
+ * two monotonic times in milliseconds.
+ */
+ @Test(timeout = 60000)
+ public void testDeltaMs() throws Exception {
+ Assert.assertEquals(0, TimeUtil.deltaMs(0, 0));
+ Assert.assertEquals(1, TimeUtil.deltaMs(0, 1));
+ Assert.assertEquals(0, TimeUtil.deltaMs(1, 0));
+ Assert.assertEquals(10, TimeUtil.deltaMs(1000, 1010));
+ long minMs = TimeUnit.MILLISECONDS.convert(Long.MIN_VALUE,
+ TimeUnit.NANOSECONDS);
+ long maxMs = TimeUnit.MILLISECONDS.convert(Long.MAX_VALUE,
+ TimeUnit.NANOSECONDS);
+ Assert.assertEquals(10, TimeUtil.deltaMs(minMs, minMs + 10));
+ Assert.assertEquals(maxMs, TimeUtil.deltaMs(minMs, maxMs));
+ Assert.assertEquals(11, TimeUtil.deltaMs(maxMs - 10, minMs));
+ }
+}
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java b/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java
deleted file mode 100644
index 74731fa..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-/**
- * Small util for making a data directory for tests to use when running tests. We put it up at
- * target/test-data/UUID. Create an instance of this class per unit test run and it will take
- * care of setting up the dirs for you. Pass what is returned here as location from which to
- * have daemons and tests dump data.
- * TODO: Add close on exit.
- */
-public class DataDir {
- private File baseTestDir = null;
- private File testDir = null;
-
- /**
- * System property key to get base test directory value
- */
- public static final String TEST_BASE_DIRECTORY_KEY = "test.data.base.dir";
-
- /**
- * Default base directory for test output.
- */
- public static final String TEST_BASE_DIRECTORY_DEFAULT = "target";
-
- public static final String TEST_BASE_DIRECTORY_NAME = "test-data";
-
- /**
- * @return Where to write test data on local filesystem; usually
- * {@link #TEST_BASE_DIRECTORY_DEFAULT}
- * Should not be used directly by the unit tests, hence its's private.
- * Unit test will use a subdirectory of this directory.
- * @see #setupDataTestDir()
- */
- private synchronized File getBaseTestDir() {
- if (this.baseTestDir != null) return this.baseTestDir;
- String testHome = System.getProperty(TEST_BASE_DIRECTORY_KEY, TEST_BASE_DIRECTORY_DEFAULT);
- this.baseTestDir = new File(testHome, TEST_BASE_DIRECTORY_NAME);
- return this.baseTestDir;
- }
-
- /**
- * @return Absolute path to the dir created by this instance.
- * @throws IOException
- */
- public synchronized File getDataDir() throws IOException {
- if (this.testDir != null) return this.testDir;
- this.testDir = new File(getBaseTestDir(), UUID.randomUUID().toString());
- if (!this.testDir.exists()) {
- if (!this.testDir.mkdirs()) throw new IOException("Failed mkdirs for " + this.testDir);
- }
- // Return absolute path. A relative passed to htraced will have it create data dirs relative
- // to its data dir rather than in it.
- return this.testDir.getAbsoluteFile();
- }
-
- /**
- * Fragile. Ugly. Presumes paths. Best we can do for now until htraced comes local to this module
- * and is moved out of src dir.
- * @param dataDir A datadir gotten from {@link #getDataDir()}
- * @return Top-level of the checkout.
- */
- public static File getTopLevelOfCheckout(final File dataDir) {
- // Need absolute else we run out of road when dir is relative to this module.
- File absolute = dataDir.getAbsoluteFile();
- // Check we are where we think we are.
- File testDataDir = absolute.getParentFile();
- if (!testDataDir.getName().equals(TEST_BASE_DIRECTORY_NAME)) {
- throw new IllegalArgumentException(dataDir.toString());
- }
- // Do another check.
- File targetDir = testDataDir.getParentFile();
- if (!targetDir.getName().equals(TEST_BASE_DIRECTORY_DEFAULT)) {
- throw new IllegalArgumentException(dataDir.toString());
- }
- // Back up last two dirs out of the htrace-htraced dir.
- return targetDir.getParentFile().getParentFile();
- }
-}
\ No newline at end of file
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
deleted file mode 100644
index 67e3a21..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.util;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.net.URLConnection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test putting up an htraced and making sure it basically works.
- * Makes presumption about paths; where data is relative to the htraced binary, etc., encoded
- * in methods in the below.
- */
-public class TestHTracedProcess {
- private static final Log LOG =
- LogFactory.getLog(TestHTracedProcess.class);
- private DataDir testDir = null;
- private final int TIMEOUT = 10000;
-
- @Before
- public void setupTest() {
- this.testDir = new DataDir();
- }
-
- /*
- * Do a basic GET of the server info from the running htraced instance.
- */
- private String doGet(final URL url) throws IOException {
- URLConnection connection = url.openConnection();
- connection.setConnectTimeout(TIMEOUT);
- connection.setReadTimeout(TIMEOUT);
- connection.connect();
- StringBuffer sb = new StringBuffer();
- BufferedReader reader = new BufferedReader(
- new InputStreamReader(connection.getInputStream()));
- try {
- String line = null;
- while ((line = reader.readLine()) != null) {
- System.out.println(line);
- sb.append(line);
- }
- } finally {
- reader.close();
- }
- return sb.toString();
- }
-
- /**
- * Put up an htraced instance and do a Get against /server/info.
- * @throws IOException
- * @throws InterruptedException
- */
- @Test (timeout=10000)
- public void testStartStopHTraced() throws Exception {
- HTracedProcess htraced = null;
- File dataDir = this.testDir.getDataDir();
- File topLevel = DataDir.getTopLevelOfCheckout(dataDir);
- try {
- htraced = new HTracedProcess(HTracedProcess.
- getPathToHTraceBinaryFromTopLevel(topLevel),
- dataDir, "localhost");
- LOG.info("Started HTracedProcess with REST server URL " +
- htraced.getHttpAddr());
- String str = doGet(new URL(
- "http://" + htraced.getHttpAddr() + "/server/info"));
- // Assert we go something back.
- assertTrue(str.contains("ReleaseVersion"));
- // Assert that the datadir is not empty.
- } finally {
- if (htraced != null) {
- htraced.destroy();
- System.out.println("ExitValue=" + htraced.waitFor());
- }
- }
- }
-}
\ No newline at end of file