Merge branch 'GSOC_HTRACE' of https://github.com/nisalanirmana/incubator-htrace
diff --git a/htrace-kudu/pom.xml b/htrace-kudu/pom.xml
new file mode 100644
index 0000000..19195e5
--- /dev/null
+++ b/htrace-kudu/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+license agreements. See the NOTICE file distributed with this work for additional
+information regarding copyright ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, either express or implied. See the License for the specific
+language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>htrace-kudu</artifactId>
+ <packaging>jar</packaging>
+
+ <parent>
+ <artifactId>htrace</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ <version>4.2.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <name>htrace-kudu</name>
+ <description>
+ htrace-kudu provides tools for sending tracing information
+ to a Kudu database for later analysis.
+ </description>
+ <url>http://incubator.apache.org/projects/htrace.html</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <kudu.version>0.9.1</kudu.version>
+ <commons.version>1.3.2</commons.version>
+ <jetty.version>9.2.13.v20150730</jetty.version>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>cdh.repo</id>
+ <name>Cloudera Repositories</name>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main</directory>
+ <includes>
+ <include>webapps/**</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <createDependencyReducedPom>${createDependencyReducedPom}</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.commons.logging</pattern>
+ <shadedPattern>org.apache.htrace.shaded.commons.logging</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <!-- explicitly define maven-deploy-plugin after other to force exec order -->
+ <artifactId>maven-deploy-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Module deps. -->
+ <dependency>
+ <groupId>org.apache.htrace</groupId>
+ <artifactId>htrace-core4</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.htrace</groupId>
+ <artifactId>htrace-core4</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <!-- Global deps. -->
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- KUDU specific deps. -->
+ <dependency>
+ <groupId>org.kududb</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.kududb</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>dist</id>
+ <build>
+ <plugins>
+ <plugin>
+ <!--Make it so assembly:single does nothing in here-->
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <skipAssembly>true</skipAssembly>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>doclint-disable</id>
+ <activation><jdk>[1.8,)</jdk></activation>
+ <properties>
+ <additionalparam>-Xdoclint:none</additionalparam>
+ </properties>
+ </profile>
+ </profiles>
+</project>
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
new file mode 100644
index 0000000..c13e7b4
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduClient.KuduClientBuilder;
+
+/**
+ * Immutable holder for Kudu connection settings. Optional settings that are
+ * {@code null} are not applied when the client is built.
+ */
+public class KuduClientConfiguration {
+  private final String host;
+  private final Integer port;
+  private final Integer workerCount;
+  private final Integer bossCount;
+  private final Boolean isStatisticsEnabled;
+  private final Long adminOperationTimeout;
+  private final Long operationTimeout;
+  private final Long socketReadTimeout;
+
+  /** Stores the given settings as-is; nothing is validated here. */
+  public KuduClientConfiguration(String host, Integer port,
+                                 Integer workerCount, Integer bossCount,
+                                 Boolean isStatisticsEnabled, Long adminOperationTimeout,
+                                 Long operationTimeout, Long socketReadTimeout) {
+    this.host = host;
+    this.port = port;
+    this.workerCount = workerCount;
+    this.bossCount = bossCount;
+    this.isStatisticsEnabled = isStatisticsEnabled;
+    this.adminOperationTimeout = adminOperationTimeout;
+    this.operationTimeout = operationTimeout;
+    this.socketReadTimeout = socketReadTimeout;
+  }
+
+  /** Builds a client for the master at {@code host:port}, applying every non-null optional setting. */
+  public KuduClient buildClient() {
+    String masterAddress = host.concat(":").concat(port.toString());
+    KuduClientBuilder clientBuilder = new KuduClientBuilder(masterAddress);
+    if (workerCount != null) {
+      clientBuilder.workerCount(workerCount);
+    }
+    if (bossCount != null) {
+      clientBuilder.bossCount(bossCount);
+    }
+    // Only an explicit false disables statistics; null and true leave the builder untouched.
+    if (Boolean.FALSE.equals(isStatisticsEnabled)) {
+      clientBuilder.disableStatistics();
+    }
+    if (adminOperationTimeout != null) {
+      clientBuilder.defaultAdminOperationTimeoutMs(adminOperationTimeout);
+    }
+    if (operationTimeout != null) {
+      clientBuilder.defaultOperationTimeoutMs(operationTimeout);
+    }
+    if (socketReadTimeout != null) {
+      clientBuilder.defaultSocketReadTimeoutMs(socketReadTimeout);
+    }
+    return clientBuilder.build();
+  }
+}
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
new file mode 100644
index 0000000..c7f7484
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+/** Configuration keys, default values and Kudu column names used by the Kudu span receiver. */
+public class KuduReceiverConstants {
+  // Column names have no configuration key; only the master address, table names and client settings do.
+  static final String KUDU_MASTER_HOST_KEY = "kudu.master.host"; // Kudu master address
+  static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
+  static final String KUDU_MASTER_PORT_KEY = "kudu.master.port";
+  static final String DEFAULT_KUDU_MASTER_PORT = "7051";
+  static final String KUDU_CLIENT_WORKER_COUNT_KEY = "kudu.client.worker.count"; // optional client settings
+  static final String KUDU_CLIENT_BOSS_COUNT_KEY = "kudu.client.boss.count";
+  static final String KUDU_CLIENT_STATISTICS_ENABLED_KEY = "kudu.client.statistics.enabled";
+  static final String KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY = "kudu.client.timeout.admin.operation";
+  static final String KUDU_CLIENT_TIMEOUT_OPERATION_KEY = "kudu.client.timeout.operation";
+  static final String KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY = "kudu.client.timeout.socket.read";
+  static final String KUDU_SPAN_TABLE_KEY = "kudu.span.table"; // table name keys and defaults
+  static final String DEFAULT_KUDU_SPAN_TABLE = "span";
+  static final String KUDU_SPAN_PARENT_TABLE_KEY = "kudu.span.parent.table";
+  static final String DEFAULT_KUDU_SPAN_PARENT_TABLE = "span.parent";
+  static final String KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY = "kudu.span.timeline.annotation.table";
+  static final String DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE = "span.timeline";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID = "trace_id"; // span table columns
+  static final String DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID = "span_id";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_START_TIME = "start_time";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME = "stop_time";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION = "description";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT = "parent";
+  static final String DEFAULT_KUDU_COLUMN_PARENT_ID_LOW = "parent_id_low"; // parent table columns
+  static final String DEFAULT_KUDU_COLUMN_PARENT_ID_HIGH = "parent_id_high";
+  static final String DEFAULT_KUDU_COLUMN_PARENT_CHILD_SPANID = "parent_child_span_id";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID = "timelineid"; // timeline table columns
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_SPANID = "spanid";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIME = "time";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE = "message";
+}
+
+
+
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
new file mode 100644
index 0000000..5db017b
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduSession;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Insert;
+import org.kududb.client.PartialRow;
+
+import java.io.IOException;
+
+/**
+ * {@link SpanReceiver} that stores spans, their parent links and their timeline
+ * annotations in three Kudu tables.
+ *
+ * <p>Connection settings and table names are read from the {@link HTraceConfiguration};
+ * column names always use the defaults in {@link KuduReceiverConstants}.
+ */
+public class KuduSpanReceiver extends SpanReceiver {
+
+  private static final Log LOG = LogFactory.getLog(KuduSpanReceiver.class);
+
+  private final KuduClientConfiguration clientConf;
+  private KuduSession session;
+  private KuduClient client;
+
+  private String table_span;
+  private String column_span_trace_id;
+  private String column_span_start_time;
+  private String column_span_stop_time;
+  private String column_span_span_id;
+  private String column_span_description;
+  private String column_span_parent;
+
+  private String table_timeline;
+  private String column_timeline_timeline_id;
+  private String column_timeline_time;
+  private String column_timeline_message;
+  private String column_timeline_span_id;
+
+  private String table_span_parent;
+  private String column_parent_id_low;
+  private String column_parent_id_high;
+  private String column_parent_child_span_id;
+
+  private KuduTable tableSpan;
+  private KuduTable tableTimeline;
+  private KuduTable tableParent;
+
+  /**
+   * Reads the connection settings and table names from {@code conf}, builds the
+   * Kudu client and session, and opens the three tables. A failure to open a
+   * table is logged as a warning instead of being thrown.
+   *
+   * @param conf configuration holding the Kudu settings
+   */
+  public KuduSpanReceiver(HTraceConfiguration conf) {
+    String masterHost = conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
+        KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST);
+    Integer masterPort = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
+        KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT));
+    // Optional settings stay null when absent, so buildClient() skips them.
+    Integer bossCount = optionalInteger(conf, KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY);
+    Integer workerCount = optionalInteger(conf, KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY);
+    String statistics = conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY);
+    Boolean isStatisticsEnabled = (statistics == null) ? null : Boolean.valueOf(statistics);
+    Long adminOperationTimeout = optionalLong(conf, KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY);
+    Long operationTimeout = optionalLong(conf, KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY);
+    Long socketReadTimeout = optionalLong(conf, KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY);
+
+    this.clientConf = new KuduClientConfiguration(masterHost, masterPort, workerCount, bossCount,
+        isStatisticsEnabled, adminOperationTimeout, operationTimeout, socketReadTimeout);
+
+    // Table names are configurable.
+    this.table_span = conf.get(KuduReceiverConstants.KUDU_SPAN_TABLE_KEY,
+        KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
+    this.table_timeline = conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY,
+        KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
+    this.table_span_parent = conf.get(KuduReceiverConstants.KUDU_SPAN_PARENT_TABLE_KEY,
+        KuduReceiverConstants.DEFAULT_KUDU_SPAN_PARENT_TABLE);
+
+    // Column names always use the built-in defaults.
+    this.column_span_trace_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID;
+    this.column_span_start_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME;
+    this.column_span_stop_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME;
+    this.column_span_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID;
+    this.column_span_description = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION;
+    this.column_span_parent = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT;
+    this.column_parent_id_low = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_LOW;
+    this.column_parent_id_high = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_HIGH;
+    this.column_parent_child_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_CHILD_SPANID;
+    this.column_timeline_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME;
+    this.column_timeline_message = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE;
+    this.column_timeline_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID;
+    this.column_timeline_timeline_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID;
+
+    this.client = clientConf.buildClient();
+    this.session = client.newSession();
+    try {
+      tableSpan = client.openTable(table_span);
+      tableTimeline = client.openTable(table_timeline);
+      tableParent = client.openTable(table_span_parent);
+    } catch (Exception ex) {
+      LOG.warn("Failed to open kudu tables to store Spans. " + ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Closes the session if it is still open and then the client. Failures are
+   * logged as warnings and never rethrown.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (session != null && !session.isClosed()) {
+        session.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to close Kudu session. " + e.getMessage());
+    }
+    // Close the client even when closing the session failed.
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to close Kudu client. " + e.getMessage());
+    }
+  }
+
+  /**
+   * Writes one span row, one parent row per parent id and one timeline row per
+   * annotation, then flushes the session. Failures are logged, never thrown.
+   *
+   * @param span the span to store
+   */
+  @Override
+  public void receiveSpan(Span span) {
+    try {
+      long spanIdLow = span.getSpanId().getLow();
+      int parentCount = span.getParents().length;
+      Insert spanInsert = tableSpan.newInsert();
+      PartialRow spanRow = spanInsert.getRow();
+      // NOTE: the trace_id column holds the span id's low half and span_id its high half.
+      spanRow.addLong(column_span_trace_id, spanIdLow);
+      spanRow.addLong(column_span_start_time, span.getStartTimeMillis());
+      spanRow.addLong(column_span_stop_time, span.getStopTimeMillis());
+      spanRow.addLong(column_span_span_id, span.getSpanId().getHigh());
+      // The "parent" flag column is true only for spans without parents.
+      spanRow.addBoolean(column_span_parent, parentCount == 0);
+      for (int i = 0; i < parentCount; i++) {
+        Insert parentInsert = tableParent.newInsert();
+        PartialRow parentRow = parentInsert.getRow();
+        parentRow.addLong(column_parent_id_low, span.getParents()[i].getLow());
+        parentRow.addLong(column_parent_id_high, span.getParents()[i].getHigh());
+        parentRow.addLong(column_parent_child_span_id, spanIdLow);
+        session.apply(parentInsert);
+      }
+      spanRow.addString(column_span_description, span.getDescription());
+      session.apply(spanInsert);
+      long annotationCounter = 0;
+      // Timeline row ids are the span id's low half plus a running counter starting at 1.
+      for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
+        annotationCounter++;
+        Insert timelineInsert = tableTimeline.newInsert();
+        PartialRow timelineRow = timelineInsert.getRow();
+        timelineRow.addLong(column_timeline_timeline_id, spanIdLow + annotationCounter);
+        timelineRow.addLong(column_timeline_time, ta.getTime());
+        timelineRow.addString(column_timeline_message, ta.getMessage());
+        timelineRow.addLong(column_timeline_span_id, spanIdLow);
+        session.apply(timelineInsert);
+      }
+    } catch (Exception ex) {
+      LOG.error("Failed to write span to Kudu backend", ex);
+    } finally {
+      try {
+        session.flush();
+      } catch (Exception ex) {
+        LOG.warn("Failed to flush Kudu session. " + ex.getMessage());
+      }
+    }
+  }
+
+  /** Returns the integer stored under {@code key}, or {@code null} when the key is absent. */
+  private static Integer optionalInteger(HTraceConfiguration conf, String key) {
+    String value = conf.get(key);
+    return (value == null) ? null : Integer.valueOf(value);
+  }
+
+  /** Returns the long stored under {@code key}, or {@code null} when the key is absent. */
+  private static Long optionalLong(HTraceConfiguration conf, String key) {
+    String value = conf.get(key);
+    return (value == null) ? null : Long.valueOf(value);
+  }
+
+}
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduClientConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduClientConstants.java
new file mode 100644
index 0000000..5945ad6
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduClientConstants.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+
+public class KuduClientConstants { // configuration keys, defaults and column names used by KuduSpanViewer
+  static final String KUDU_MASTER_HOST_KEY = "kudu.master.host"; // Kudu master address
+  static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
+  static final String KUDU_MASTER_PORT_KEY = "kudu.master.port";
+  static final String DEFAULT_KUDU_MASTER_PORT = "7051";
+  static final String KUDU_CLIENT_WORKER_COUNT_KEY = "kudu.client.worker.count"; // optional client settings
+  static final String KUDU_CLIENT_BOSS_COUNT_KEY = "kudu.client.boss.count";
+  static final String KUDU_CLIENT_STATISTICS_ENABLED_KEY = "kudu.client.statistics.enabled";
+  static final String KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY = "kudu.client.timeout.admin.operation";
+  static final String KUDU_CLIENT_TIMEOUT_OPERATION_KEY = "kudu.client.timeout.operation";
+  static final String KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY = "kudu.client.timeout.socket.read";
+  static final String DEFAULT_KUDU_SPAN_TABLE = "span"; // table names
+  static final String DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE = "span.timeline";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID = "trace_id"; // span columns
+  static final String DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID = "span_id";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_START_TIME = "start_time";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME = "stop_time";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION = "description";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT = "parent";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW = "parent_id_low";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH = "parent_id_high";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIME = "time"; // timeline columns
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE = "message";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_SPANID = "spanid";
+}
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewer.java b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewer.java
new file mode 100644
index 0000000..d3f82d9
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewer.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.impl.KuduClientConfiguration;
+import org.kududb.ColumnSchema;
+import org.kududb.Type;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.RowResult;
+import org.kududb.client.RowResultIterator;
+
+import java.io.OutputStreamWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+public class KuduSpanViewer {
+
+ private static final Log LOG = LogFactory.getLog(KuduSpanViewer.class);
+ private static final String JSON_FIELD_TRACE_ID = "trace_id"; // JSON field names; NOTE(review): their use is outside this chunk - presumably span JSON output, confirm
+ private static final String JSON_FIELD_PARENT_ID = "parent_id";
+ private static final String JSON_FIELD_START = "start";
+ private static final String JSON_FIELD_STOP = "stop";
+ private static final String JSON_FIELD_SPAN_ID = "span_id";
+ private static final String JSON_FIELD_DESCRIPTION = "description";
+ private static final String JSON_FIELD_TIMELINE = "timeline";
+ private static final String JSON_FIELD_TIMELINE_TIME = "time";
+ private static final String JSON_FIELD_TIMELINE_MESSEGE = "message"; // NOTE(review): "MESSEGE" misspells "MESSAGE"; a rename must update every use
+ private KuduClient client; // built from clientConf in the constructor
+ private KuduClientConfiguration clientConf; // connection settings read from the HTraceConfiguration
+
+
+ /**
+  * Reads the Kudu master address and the optional client settings from {@code conf}
+  * and builds the client used by the query methods. An optional setting that is
+  * missing from {@code conf} is passed on as {@code null}.
+  *
+  * @param conf configuration to read the Kudu settings from
+  */
+ public KuduSpanViewer(HTraceConfiguration conf) {
+   String hostName = conf.get(KuduClientConstants.KUDU_MASTER_HOST_KEY,
+       KuduClientConstants.DEFAULT_KUDU_MASTER_HOST);
+   Integer portNumber = Integer.valueOf(conf.get(KuduClientConstants.KUDU_MASTER_PORT_KEY,
+       KuduClientConstants.DEFAULT_KUDU_MASTER_PORT));
+
+   String bossText = conf.get(KuduClientConstants.KUDU_CLIENT_BOSS_COUNT_KEY);
+   Integer bosses = null;
+   if (bossText != null) {
+     bosses = Integer.valueOf(bossText);
+   }
+   String workerText = conf.get(KuduClientConstants.KUDU_CLIENT_WORKER_COUNT_KEY);
+   Integer workers = null;
+   if (workerText != null) {
+     workers = Integer.valueOf(workerText);
+   }
+   String statisticsText = conf.get(KuduClientConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY);
+   Boolean statisticsEnabled = null;
+   if (statisticsText != null) {
+     statisticsEnabled = Boolean.valueOf(statisticsText);
+   }
+   String adminTimeoutText = conf.get(KuduClientConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY);
+   Long adminTimeout = null;
+   if (adminTimeoutText != null) {
+     adminTimeout = Long.valueOf(adminTimeoutText);
+   }
+   String operationTimeoutText = conf.get(KuduClientConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY);
+   Long opTimeout = null;
+   if (operationTimeoutText != null) {
+     opTimeout = Long.valueOf(operationTimeoutText);
+   }
+   String socketTimeoutText = conf.get(KuduClientConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY);
+   Long socketTimeout = null;
+   if (socketTimeoutText != null) {
+     socketTimeout = Long.valueOf(socketTimeoutText);
+   }
+
+   this.clientConf = new KuduClientConfiguration(hostName,
+       portNumber,
+       workers,
+       bosses,
+       statisticsEnabled,
+       adminTimeout,
+       opTimeout,
+       socketTimeout);
+   this.client = clientConf.buildClient();
+ }
+
+ public List<Span> getSpans(long spanId) throws Exception {
+ List<Span> spans = new ArrayList<Span>();
+ List<String> spanColumns = new ArrayList<>();
+ spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+ spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
+ spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
+ spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
+ spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
+ spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH);
+ spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW);
+ KuduScanner scanner = client.newScannerBuilder(client.openTable(KuduClientConstants.DEFAULT_KUDU_SPAN_TABLE))
+ .setProjectedColumnNames(spanColumns)
+ .addPredicate(KuduPredicate
+ .newComparisonPredicate(new ColumnSchema.ColumnSchemaBuilder
+ (KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID, Type.INT64)
+ .build(), KuduPredicate.ComparisonOp.EQUAL, spanId))
+ .build();
+ MilliSpan dbSpan;
+ while (scanner.hasMoreRows()) {
+ RowResultIterator results = scanner.nextRows();
+ while (results.hasNext()) {
+ RowResult result = results.next();
+ long traceId = result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+ MilliSpan.Builder builder = new MilliSpan.Builder()
+ .spanId(new SpanId(result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID),
+ result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID)))
+ .description(result.getString(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION))
+ .begin(result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME))
+ .end(result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME));
+ if (!(result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH) == 0 &&
+ result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW) == 0)) {
+ SpanId[] parents = new SpanId[1];
+ parents[0] = new SpanId(result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH),
+ result.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW));
+ builder.parents(parents);
+ }
+ List<String> timelineColumns = new ArrayList<>();
+ timelineColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
+ timelineColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
+ KuduScanner timelineScanner = client
+ .newScannerBuilder(client.openTable(KuduClientConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE))
+ .setProjectedColumnNames(timelineColumns)
+ .addPredicate(KuduPredicate
+ .newComparisonPredicate(new ColumnSchema.ColumnSchemaBuilder
+ (KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID, Type.INT64)
+ .build(), KuduPredicate.ComparisonOp.EQUAL, traceId))
+ .build();
+ List<TimelineAnnotation> timelineList = new LinkedList<TimelineAnnotation>();
+ while (timelineScanner.hasMoreRows()) {
+ RowResultIterator timelineResults = timelineScanner.nextRows();
+ while (timelineResults.hasNext()) {
+ RowResult timelineRow = timelineResults.next();
+ timelineList.add(new TimelineAnnotation
+ (timelineRow.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME),
+ timelineRow.getString(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE)));
+ }
+ }
+ builder.timeline(timelineList);
+ dbSpan = builder.build();
+ spans.add(dbSpan);
+ }
+ }
+ return spans;
+ }
+
+ /**
+  * Scans the span table for rows whose
+  * {@code DEFAULT_KUDU_COLUMN_SPAN_PARENT} flag is {@code true} and rebuilds a
+  * span, including its timeline annotations, from each of them.
+  *
+  * @return the rebuilt spans in scan order; empty when no row matches
+  * @throws Exception if a table cannot be opened or a scan fails
+  */
+ public List<Span> getRootSpans() throws Exception {
+   List<String> spanColumns = new ArrayList<String>();
+   spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+   spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
+   spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
+   spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
+   spanColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
+   KuduScanner scanner = client
+       .newScannerBuilder(client.openTable(KuduClientConstants.DEFAULT_KUDU_SPAN_TABLE))
+       .setProjectedColumnNames(spanColumns)
+       .addPredicate(KuduPredicate.newComparisonPredicate(
+           new ColumnSchema.ColumnSchemaBuilder(
+               KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT, Type.BOOL).build(),
+           KuduPredicate.ComparisonOp.EQUAL, true))
+       .build();
+   List<Span> spans = new ArrayList<Span>();
+   try {
+     while (scanner.hasMoreRows()) {
+       RowResultIterator rows = scanner.nextRows();
+       while (rows.hasNext()) {
+         RowResult row = rows.next();
+         long traceId = row.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+         // SpanId is built as (span-id column, trace-id column), as in getSpans.
+         MilliSpan.Builder builder = new MilliSpan.Builder()
+             .spanId(new SpanId(
+                 row.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID), traceId))
+             .description(row.getString(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION))
+             .begin(row.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME))
+             .end(row.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME));
+         builder.timeline(readRootSpanTimeline(traceId));
+         spans.add(builder.build());
+       }
+     }
+   } finally {
+     // Without this, an exception while iterating leaves the scanner open.
+     scanner.close();
+   }
+   return spans;
+ }
+
+ /**
+  * Reads the timeline annotations whose
+  * {@code DEFAULT_KUDU_COLUMN_TIMELINE_SPANID} column equals {@code traceId}.
+  *
+  * @param traceId value read from the span row's trace-id column
+  * @return the matching annotations in scan order; empty when there are none
+  * @throws Exception if the timeline table cannot be opened or scanned
+  */
+ private List<TimelineAnnotation> readRootSpanTimeline(long traceId) throws Exception {
+   List<String> timelineColumns = new ArrayList<String>();
+   timelineColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
+   timelineColumns.add(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
+   KuduScanner timelineScanner = client
+       .newScannerBuilder(
+           client.openTable(KuduClientConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE))
+       .setProjectedColumnNames(timelineColumns)
+       .addPredicate(KuduPredicate.newComparisonPredicate(
+           new ColumnSchema.ColumnSchemaBuilder(
+               KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID, Type.INT64).build(),
+           KuduPredicate.ComparisonOp.EQUAL, traceId))
+       .build();
+   List<TimelineAnnotation> timeline = new LinkedList<TimelineAnnotation>();
+   try {
+     while (timelineScanner.hasMoreRows()) {
+       RowResultIterator timelineRows = timelineScanner.nextRows();
+       while (timelineRows.hasNext()) {
+         RowResult timelineRow = timelineRows.next();
+         timeline.add(new TimelineAnnotation(
+             timelineRow.getLong(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME),
+             timelineRow.getString(KuduClientConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE)));
+       }
+     }
+   } finally {
+     timelineScanner.close();
+   }
+   return timeline;
+ }
+
+ public void close() {
+ try {
+ this.client.close();
+ } catch (java.lang.Exception ex) {
+ LOG.error("Couln't close the Kudu DB client connection.", ex);
+ }
+ }
+
+
+ /**
+  * Renders a span as JSON text by delegating to
+  * {@link #appendJsonString(Span, OutputStreamWriter)}.
+  *
+  * @param span the span to serialize
+  * @return the JSON text for {@code span}
+  * @throws IOException if writing to the in-memory buffer fails
+  */
+ public static String toJsonString(final Span span) throws IOException {
+   // Encode and decode with one fixed charset. The platform default may be a
+   // charset (e.g. US-ASCII) that cannot represent every character of the
+   // description or annotation messages, which would silently corrupt them.
+   final Charset utf8 = Charset.forName("UTF-8");
+   ByteArrayOutputStream out = new ByteArrayOutputStream();
+   OutputStreamWriter writer = new OutputStreamWriter(out, utf8);
+   appendJsonString(span, writer);
+   writer.flush();
+   return new String(out.toByteArray(), utf8);
+ }
+
+ public static void appendJsonString(Span span, OutputStreamWriter writer) throws IOException {
+ writer.append("{");
+ appendField(JSON_FIELD_TRACE_ID, span.getSpanId().getLow(), writer);
+ appendField(JSON_FIELD_SPAN_ID, span.getSpanId().getHigh(), writer);
+ appendField(JSON_FIELD_DESCRIPTION, span.getDescription(), writer);
+ if (span.getParents().length != 0) {
+ appendField(JSON_FIELD_PARENT_ID, span.getParents()[0].getLow(), writer);
+ }
+ appendField(JSON_FIELD_START, span.getStartTimeMillis(), writer);
+ appendField(JSON_FIELD_STOP, span.getStopTimeMillis(), writer);
+ if (!span.getTimelineAnnotations().isEmpty()) {
+ writer.append("\"");
+ writer.append(JSON_FIELD_TIMELINE);
+ writer.append("\"");
+ writer.append(":");
+ writer.append("[");
+ for (TimelineAnnotation annotation : span.getTimelineAnnotations()) {
+ writer.append("{");
+ appendField(JSON_FIELD_TIMELINE_TIME, annotation.getTime(), writer);
+ appendField(JSON_FIELD_TIMELINE_MESSEGE, annotation.getMessage(), writer);
+ writer.append("}");
+ }
+ writer.append("]");
+ }
+ writer.append("}");
+ }
+
+ private static void appendField(String field,
+ Object value,
+ OutputStreamWriter writer) throws IOException {
+ writer.append("\"");
+ writer.append(field);
+ writer.append("\"");
+ writer.append(":");
+ appendStringValue(value.toString(), writer);
+ writer.append(",");
+ }
+
+ private static void appendStringValue(String value,
+ OutputStreamWriter writer) throws IOException {
+ writer.append("\"");
+ writer.append(value.toString());
+ writer.append("\"");
+ }
+
+}
\ No newline at end of file
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerRunner.java b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerRunner.java
new file mode 100644
index 0000000..e530c8a
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerRunner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+
+import org.apache.htrace.core.HTraceConfiguration;
+
+public class KuduSpanViewerRunner extends Thread {
+ private HTraceConfiguration conf;
+
+ public KuduSpanViewerRunner(HTraceConfiguration conf){
+ this.conf = conf;
+ }
+
+ public void run(){
+ try {
+ new KuduSpanViewerServer().run(conf);
+ }catch (java.lang.Exception ex){
+ //Ignore
+ }
+ }
+
+}
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerServer.java b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerServer.java
new file mode 100644
index 0000000..d9b6c6a
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerServer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+/**
+ * Embedded Jetty server exposing the Kudu span viewer: a default servlet at
+ * {@code /}, root spans at {@code /gettraces} and spans at
+ * {@code /getspans/*}.
+ */
+public class KuduSpanViewerServer {
+
+  private static final Log LOG = LogFactory.getLog(KuduSpanViewerServer.class);
+  public static final String HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT = "0.0.0.0:17000";
+  // run() blocks in join(), so stop() can only be useful from another thread;
+  // volatile makes the reference assigned in run() visible there.
+  private volatile Server server;
+
+  /**
+   * Stops the embedded server if {@link #run(HTraceConfiguration)} has
+   * created one; otherwise does nothing.
+   *
+   * @throws Exception if Jetty fails to stop
+   */
+  public void stop() throws Exception {
+    Server current = server;
+    if (current == null) {
+      // Do not claim a successful stop when nothing was ever started.
+      LOG.info("Embedded jetty server was not started; nothing to stop.");
+      return;
+    }
+    current.stop();
+    LOG.info("Embedded jetty server stopped successfully.");
+  }
+
+  /**
+   * Binds to {@link #HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT}, registers the
+   * viewer servlets, starts the server and then blocks in
+   * {@code Server.join()}.
+   *
+   * @param conf configuration passed on to the span and trace servlets
+   * @return 0 once {@code join()} returns
+   * @throws Exception if the address is malformed or Jetty fails to start
+   */
+  public int run(HTraceConfiguration conf) throws Exception {
+    URI uri = new URI("http://" + HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT);
+    InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
+    Server jetty = new Server(addr);
+    server = jetty;
+    ServletContextHandler root =
+        new ServletContextHandler(jetty, "/", ServletContextHandler.SESSIONS);
+    jetty.setHandler(root);
+    root.addServlet(new ServletHolder(new DefaultServlet()), "/");
+    root.addServlet(new ServletHolder(new KuduSpanViewerTracesServlet(conf)), "/gettraces");
+    root.addServlet(new ServletHolder(new KuduSpanViewerSpansServlet(conf)), "/getspans/*");
+
+    jetty.start();
+    jetty.join();
+    return 0;
+  }
+
+}
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerSpansServlet.java b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerSpansServlet.java
new file mode 100644
index 0000000..662c21b
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerSpansServlet.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+public class KuduSpanViewerSpansServlet extends HttpServlet {
+
+ private static final Log LOG = LogFactory.getLog(KuduSpanViewerSpansServlet.class);
+ public static final String PREFIX = "/getspans";
+ private static final ThreadLocal<KuduSpanViewer> kuduSpanViewer =
+ new ThreadLocal<KuduSpanViewer>() {
+ @Override
+ protected KuduSpanViewer initialValue() {
+ return null;
+ }
+ };
+ private HTraceConfiguration conf;
+
+ public KuduSpanViewerSpansServlet(HTraceConfiguration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ final String path = request.getRequestURI().substring(PREFIX.length());
+ if (path == null || path.length() == 0) {
+ response.setContentType("text/plain");
+ response.getWriter().print("Invalid input");
+ return;
+ }
+ KuduSpanViewer viewer = kuduSpanViewer.get();
+
+ if (viewer == null) {
+ viewer = new KuduSpanViewer(conf);
+ kuduSpanViewer.set(viewer);
+ }
+ Long traceid = Long.parseLong(path.substring(1));
+ response.setContentType("application/javascript");
+ PrintWriter out = response.getWriter();
+ out.print("[");
+ boolean first = true;
+ try {
+ for (Span span : viewer.getSpans(traceid)) {
+ if (first) {
+ first = false;
+ } else {
+ out.print(",");
+ }
+ out.print(KuduSpanViewer.toJsonString(span));
+ }
+ } catch (java.lang.Exception ex) {
+ LOG.error("Exception occured while retrieving spans from Kudu Backend.");
+ }
+ out.print("]");
+ }
+
+ @Override
+ public void init() throws ServletException {
+ }
+
+ @Override
+ public void destroy() {
+ KuduSpanViewer viewer = kuduSpanViewer.get();
+ if (viewer != null) {
+ viewer.close();
+ }
+ }
+}
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerTracesServlet.java b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerTracesServlet.java
new file mode 100644
index 0000000..b296c28
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/viewer/KuduSpanViewerTracesServlet.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+
+/**
+ * Servlet behind {@code /gettraces}: writes the spans returned by
+ * {@code KuduSpanViewer.getRootSpans} as a JSON array.
+ */
+public class KuduSpanViewerTracesServlet extends HttpServlet {
+  private static final Log LOG = LogFactory.getLog(KuduSpanViewerTracesServlet.class);
+  public static final String PREFIX = "/gettraces";
+  // One viewer per request thread, created lazily in doGet.
+  private static final ThreadLocal<KuduSpanViewer> kuduSpanViewer =
+      new ThreadLocal<KuduSpanViewer>();
+  private HTraceConfiguration conf;
+
+  /**
+   * @param conf configuration used to create the per-thread viewer
+   */
+  public KuduSpanViewerTracesServlet(HTraceConfiguration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Writes all root spans as a comma-separated JSON array. A backend failure
+   * is logged and the array written so far is still closed.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    KuduSpanViewer viewer = kuduSpanViewer.get();
+    if (viewer == null) {
+      viewer = new KuduSpanViewer(conf);
+      kuduSpanViewer.set(viewer);
+    }
+    response.setContentType("application/javascript");
+    PrintWriter out = response.getWriter();
+    out.print("[");
+    boolean first = true;
+    try {
+      for (Span span : viewer.getRootSpans()) {
+        if (first) {
+          first = false;
+        } else {
+          out.print(",");
+        }
+        out.print(KuduSpanViewer.toJsonString(span));
+      }
+    } catch (java.lang.Exception ex) {
+      // Keep the cause; the message alone does not say what went wrong.
+      LOG.error("Exception occured while retrieving spans from Kudu Backend.", ex);
+    }
+    out.print("]");
+  }
+
+  @Override
+  public void init() throws ServletException {
+  }
+
+  /**
+   * Closes the viewer bound to the calling thread.
+   */
+  @Override
+  public void destroy() {
+    // NOTE(review): only the viewer of the thread invoking destroy() is
+    // reachable here; viewers created on other request threads stay open.
+    KuduSpanViewer viewer = kuduSpanViewer.get();
+    if (viewer != null) {
+      viewer.close();
+    }
+  }
+}
diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
new file mode 100644
index 0000000..4ac3657
--- /dev/null
+++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.AfterClass;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.ColumnSchema;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduClient;
+import org.kududb.client.CreateTableOptions;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.RowResultIterator;
+import org.kududb.client.RowResult;
+import org.kududb.client.KuduPredicate;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Round-trip test for {@code org.apache.htrace.impl.KuduSpanReceiver}: traces
+ * a scope through the receiver, reads the stored row back from Kudu and
+ * compares it with the span that was traced.
+ *
+ * The test method is annotated with {@code @Ignore}, and the mini-cluster
+ * setup and teardown are skipped unless {@code TEST_ENABLE} is switched on.
+ */
+public class TestKuduSpanReceiver extends BaseKuduTest {
+
+  private static final String BIN_DIR_PROP = "binDir";
+  private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin";
+  // Set the Kudu binary location above and enable test execution from here.
+  private static final boolean TEST_ENABLE = false;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    if (TEST_ENABLE) {
+      System.setProperty(BIN_DIR_PROP, BIN_DIR_PROP_DEFAULT);
+      BaseKuduTest.setUpBeforeClass();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (TEST_ENABLE) {
+      BaseKuduTest.tearDownAfterClass();
+    }
+  }
+
+  /** Builds a column definition; {@code key} marks it as a primary-key column. */
+  private static ColumnSchema column(String name, Type type, boolean key) {
+    return new ColumnSchema.ColumnSchemaBuilder(name, type).key(key).build();
+  }
+
+  /** Range-partitions on the single given key column. */
+  private static CreateTableOptions rangePartitionedOn(String keyColumn) {
+    List<String> rangeKeys = new ArrayList<String>();
+    rangeKeys.add(keyColumn);
+    return new CreateTableOptions().setRangePartitionColumns(rangeKeys);
+  }
+
+  /**
+   * Creates the span, timeline-annotation and parent tables used by the test.
+   */
+  private void createTable() throws Exception {
+    KuduClient client = BaseKuduTest.syncClient;
+
+    List<ColumnSchema> spanColumns = new ArrayList<ColumnSchema>();
+    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID, Type.INT64, true));
+    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME, Type.INT64, false));
+    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME, Type.INT64, false));
+    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID, Type.INT64, false));
+    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT, Type.BOOL, false));
+    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION, Type.STRING, false));
+    client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE, new Schema(spanColumns),
+        rangePartitionedOn(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID));
+
+    List<ColumnSchema> timelineColumns = new ArrayList<ColumnSchema>();
+    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID, Type.INT64, true));
+    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME, Type.INT64, false));
+    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE, Type.STRING, false));
+    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID, Type.INT64, false));
+    client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE,
+        new Schema(timelineColumns),
+        rangePartitionedOn(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID));
+
+    List<ColumnSchema> parentColumns = new ArrayList<ColumnSchema>();
+    parentColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_LOW, Type.INT64, true));
+    parentColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_HIGH, Type.INT64, false));
+    parentColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_CHILD_SPANID, Type.INT64, false));
+    client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_PARENT_TABLE, new Schema(parentColumns),
+        rangePartitionedOn(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_LOW));
+  }
+
+  /**
+   * Traces a scope with one timeline annotation (plus a child scope) through
+   * the Kudu receiver and verifies that the stored row for the outer scope
+   * matches the traced span.
+   */
+  @Ignore
+  @Test
+  public void TestKuduSpanReceiver() throws Exception {
+    createTable();
+    try {
+      Tracer tracer = new Tracer.Builder().
+          name("testKuduSpanReceiver").
+          tracerPool(new TracerPool("testKuduSpanReceiver")).
+          conf(HTraceConfiguration.fromKeyValuePairs(
+              "sampler.classes", "AlwaysSampler",
+              "span.receiver.classes", "org.apache.htrace.impl.KuduSpanReceiver",
+              KuduReceiverConstants.KUDU_MASTER_HOST_KEY, BaseKuduTest.getMasterAddresses().split(":")[0],
+              KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]))
+          .build();
+      TraceScope scope = tracer.newScope("testKuduScope");
+      scope.addTimelineAnnotation("test");
+      Span testSpan = scope.getSpan();
+      TraceScope childScope = tracer.newScope("testKuduChildScope", new SpanId(100, 200));
+      childScope.close();
+      scope.close();
+      tracer.close();
+
+      // Two spans were written; pick the row of the outer scope explicitly
+      // instead of whichever row the scan happens to yield.
+      MilliSpan dbSpan = readSpan(BaseKuduTest.syncClient, testSpan.getSpanId().getLow());
+      Assert.assertNotNull("traced span was not found in the Kudu span table", dbSpan);
+      Assert.assertEquals(testSpan.getSpanId().getHigh(), dbSpan.getSpanId().getHigh());
+      Assert.assertEquals(testSpan.getSpanId().getLow(), dbSpan.getSpanId().getLow());
+      Assert.assertEquals(testSpan.getStartTimeMillis(), dbSpan.getStartTimeMillis());
+      Assert.assertEquals(testSpan.getStopTimeMillis(), dbSpan.getStopTimeMillis());
+      Assert.assertEquals(testSpan.getDescription(), dbSpan.getDescription());
+      Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getMessage(),
+          dbSpan.getTimelineAnnotations().get(0).getMessage());
+      Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getTime(),
+          dbSpan.getTimelineAnnotations().get(0).getTime());
+    } finally {
+      // Drop every table createTable() made, even when an assertion failed.
+      syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
+      syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
+      syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_PARENT_TABLE);
+    }
+  }
+
+  /**
+   * Rebuilds the span whose trace-id column equals {@code wantedTraceId},
+   * including its parents and timeline annotations.
+   *
+   * @return the rebuilt span, or {@code null} when no row matches
+   */
+  private MilliSpan readSpan(KuduClient client, long wantedTraceId) throws Exception {
+    List<String> spanColumns = new ArrayList<String>();
+    spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+    spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
+    spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
+    spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
+    spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
+    KuduScanner scanner = client
+        .newScannerBuilder(client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE))
+        .setProjectedColumnNames(spanColumns)
+        .build();
+    while (scanner.hasMoreRows()) {
+      RowResultIterator results = scanner.nextRows();
+      while (results.hasNext()) {
+        RowResult result = results.next();
+        long traceId = result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+        if (traceId != wantedTraceId) {
+          continue;
+        }
+        MilliSpan.Builder builder = new MilliSpan.Builder()
+            .spanId(new SpanId(
+                result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID), traceId))
+            .description(result.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION))
+            .begin(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME))
+            .end(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME));
+        List<SpanId> parents = readParents(client, traceId);
+        builder.parents(parents.toArray(new SpanId[parents.size()]));
+        builder.timeline(readTimeline(client, traceId));
+        return builder.build();
+      }
+    }
+    return null;
+  }
+
+  /** Reads the parent ids stored for the child identified by {@code traceId}. */
+  private List<SpanId> readParents(KuduClient client, long traceId) throws Exception {
+    List<String> parentColumns = new ArrayList<String>();
+    parentColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_LOW);
+    parentColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_HIGH);
+    parentColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_CHILD_SPANID);
+    KuduScanner parentScanner = client
+        .newScannerBuilder(client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_PARENT_TABLE))
+        .setProjectedColumnNames(parentColumns)
+        .addPredicate(KuduPredicate.newComparisonPredicate(
+            new ColumnSchema.ColumnSchemaBuilder(
+                KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_CHILD_SPANID, Type.INT64).build(),
+            KuduPredicate.ComparisonOp.EQUAL, traceId))
+        .build();
+    List<SpanId> parentList = new LinkedList<SpanId>();
+    while (parentScanner.hasMoreRows()) {
+      RowResultIterator parentResults = parentScanner.nextRows();
+      while (parentResults.hasNext()) {
+        RowResult parentRow = parentResults.next();
+        parentList.add(new SpanId(
+            parentRow.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_HIGH),
+            parentRow.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_PARENT_ID_LOW)));
+      }
+    }
+    return parentList;
+  }
+
+  /** Reads the timeline annotations stored for the span identified by {@code traceId}. */
+  private List<TimelineAnnotation> readTimeline(KuduClient client, long traceId) throws Exception {
+    List<String> timelineColumns = new ArrayList<String>();
+    timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
+    timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
+    KuduScanner timelineScanner = client
+        .newScannerBuilder(
+            client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE))
+        .setProjectedColumnNames(timelineColumns)
+        .addPredicate(KuduPredicate.newComparisonPredicate(
+            new ColumnSchema.ColumnSchemaBuilder(
+                KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID, Type.INT64).build(),
+            KuduPredicate.ComparisonOp.EQUAL, traceId))
+        .build();
+    List<TimelineAnnotation> timelineList = new LinkedList<TimelineAnnotation>();
+    while (timelineScanner.hasMoreRows()) {
+      RowResultIterator timelineResults = timelineScanner.nextRows();
+      while (timelineResults.hasNext()) {
+        RowResult timelineRow = timelineResults.next();
+        timelineList.add(new TimelineAnnotation(
+            timelineRow.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME),
+            timelineRow.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE)));
+      }
+    }
+    return timelineList;
+  }
+
+}
diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java
new file mode 100644
index 0000000..c32a3bd
--- /dev/null
+++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.apache.htrace.viewer.KuduSpanViewer;
+import org.junit.*;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduClient;
+import org.kududb.client.CreateTableOptions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class TestKuduSpanViewer extends BaseKuduTest {
+
+ private static final String BIN_DIR_PROP = "binDir";
+ private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin";
+ //set kudu binary location and enable test execution from here
+ private static final boolean TEST_ENABLE = false;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ if (TEST_ENABLE) {
+ System.setProperty(BIN_DIR_PROP, BIN_DIR_PROP_DEFAULT);
+ BaseKuduTest.setUpBeforeClass();
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if(TEST_ENABLE) {
+ BaseKuduTest.tearDownAfterClass();
+ }
+ }
+
+ private void createTable() throws Exception {
+ KuduClient client = BaseKuduTest.syncClient;
+ List<ColumnSchema> span_columns = new ArrayList();
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID,
+ Type.INT64)
+ .key(true)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT,
+ Type.BOOL)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION,
+ Type.STRING)
+ .build());
+
+ List<String> rangeKeys = new ArrayList<>();
+ rangeKeys.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+ Schema schema = new Schema(span_columns);
+ client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE, schema,
+ new CreateTableOptions().setRangePartitionColumns(rangeKeys));
+
+ List<ColumnSchema> timeline_columns = new ArrayList();
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder
+ (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID, Type.INT64)
+ .key(true)
+ .build());
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME,
+ Type.INT64)
+ .build());
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder
+ (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE, Type.STRING)
+ .build());
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID,
+ Type.INT64)
+ .build());
+ List<String> rangeKeysTimeline = new ArrayList<>();
+ rangeKeysTimeline.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);
+ Schema timelineSchema = new Schema(timeline_columns);
+ client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE, timelineSchema,
+ new CreateTableOptions().setRangePartitionColumns(rangeKeysTimeline));
+ }
+
+
+ @Test
+ public void testSpanToJson() {
+ SpanId[] parent = new SpanId[1];
+ parent[0] = new SpanId(1,1);
+ MilliSpan.Builder builder = new MilliSpan.Builder()
+ .parents(parent)
+ .begin(1)
+ .end(2)
+ .spanId(new SpanId(10,20))
+ .description("description");
+ List<TimelineAnnotation> timelineList = new LinkedList<TimelineAnnotation>();
+ for (int i = 0; i < 3; i++) {
+ timelineList.add(new TimelineAnnotation(i,"message" + i));
+ }
+ builder.timeline(timelineList);
+ Span span = builder.build();
+ try {
+ String json = KuduSpanViewer.toJsonString(span);
+ String expected =
+ "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\",\"parent_id\":\"1\"," +
+ "\"start\":\"1\",\"stop\":\"2\",\"timeline\":[{\"time\":\"0\",\"message\":\"message0\",}{\"time\":\"1\"," +
+ "\"message\":\"message1\",}{\"time\":\"2\",\"message\":\"message2\",}]}";
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSpanWithoutTimelineToJson() {
+ SpanId[] parent = new SpanId[1];
+ parent[0] = new SpanId(200,111);
+ MilliSpan.Builder builder = new MilliSpan.Builder()
+ .parents(parent)
+ .begin(1)
+ .end(2)
+ .spanId(new SpanId(10,20))
+ .tracerId("pid")
+ .description("description");
+ Span span = builder.build();
+ try {
+ String json = KuduSpanViewer.toJsonString(span);
+ String expected =
+ "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\"," +
+ "\"parent_id\":\"111\",\"start\":\"1\",\"stop\":\"2\",}";
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+
+ @Ignore
+ @Test
+ public void TestKuduSpanViewer() throws Exception {
+ createTable();
+ Tracer tracer = new Tracer.Builder().
+ name("testKuduSpanReceiver").
+ tracerPool(new TracerPool("testKuduSpanReceiver")).
+ conf(HTraceConfiguration.fromKeyValuePairs(
+ "sampler.classes", "AlwaysSampler",
+ "span.receiver.classes", "org.apache.htrace.impl.KuduSpanReceiver",
+ KuduReceiverConstants.KUDU_MASTER_HOST_KEY, BaseKuduTest.getMasterAddresses().split(":")[0],
+ KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]))
+ .build();
+ TraceScope scope = tracer.newScope("testKuduScope");
+ scope.addTimelineAnnotation("test");
+ Span testSpan = scope.getSpan();
+ TraceScope childScope = tracer.newScope("testKuduChildScope", new SpanId(100,200));
+ Span childScopeSpan = childScope.getSpan();
+ childScope.addTimelineAnnotation("testChild");
+ childScope.close();
+ scope.close();
+ tracer.close();
+ HTraceConfiguration conf = HTraceConfiguration
+ .fromKeyValuePairs(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
+ BaseKuduTest.getMasterAddresses().split(":")[0],
+ KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]);
+ KuduSpanViewer viewer = new KuduSpanViewer(conf);
+ List<Span> list = viewer.getRootSpans();
+ Assert.assertEquals(list.size(), 1);
+ Span span = viewer.getRootSpans().get(0);
+ try {
+ String json = KuduSpanViewer.toJsonString(span);
+ String expected = KuduSpanViewer.toJsonString(testSpan);
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ List<Span> list2 = viewer.getSpans(span.getSpanId().getHigh());
+ Assert.assertEquals(list2.size(), 2);
+ Span span2 = list2.get(0);
+ try {
+ String json = KuduSpanViewer.toJsonString(span2);
+ String expected = null;
+ if(span2.getParents().length != 0) {
+ expected = KuduSpanViewer.toJsonString(childScopeSpan);
+ } else {
+ expected = KuduSpanViewer.toJsonString(testSpan);
+ }
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ Span span3 = list2.get(1);
+ try {
+ String json = KuduSpanViewer.toJsonString(span3);
+ String expected = null;
+ if(span3.getParents().length != 0) {
+ expected = KuduSpanViewer.toJsonString(childScopeSpan);
+ } else {
+ expected = KuduSpanViewer.toJsonString(testSpan);
+ }
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+}
diff --git a/htrace-kudu/src/test/resources/log4j.properties b/htrace-kudu/src/test/resources/log4j.properties
new file mode 100644
index 0000000..416c975
--- /dev/null
+++ b/htrace-kudu/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# By default, everything at WARN or above goes to the console.
+log4j.rootLogger=WARN, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+log4j.appender.A1.ImmediateFlush=true
+# HTrace logs at INFO via the root logger's A1; naming A1 again here would print each event twice.
+log4j.logger.org.apache.htrace=INFO
diff --git a/pom.xml b/pom.xml
index 44e850a..86eab9a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,6 +35,7 @@
<module>htrace-hbase</module>
<module>htrace-flume</module>
<module>htrace-htraced</module>
+ <module>htrace-kudu</module>
</modules>
<scm>