adding review changes according to Colin
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
index 25d6fc4..c13e7b4 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
@@ -23,7 +23,7 @@
public class KuduClientConfiguration {
private final String host;
- private final String port;
+ private final Integer port;
private final Integer workerCount;
private final Integer bossCount;
private final Boolean isStatisticsEnabled;
@@ -32,7 +32,7 @@
private final Long socketReadTimeout;
public KuduClientConfiguration(String host,
- String port,
+ Integer port,
Integer workerCount,
Integer bossCount,
Boolean isStatisticsEnabled,
@@ -52,7 +52,7 @@
public KuduClient buildClient() {
KuduClientBuilder builder = new KuduClient
- .KuduClientBuilder(host.concat(":").concat(port));
+ .KuduClientBuilder(host.concat(":").concat(port.toString()));
if (workerCount != null) {
builder.workerCount(workerCount);
}
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
index be98311..805ec80 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
@@ -23,39 +23,21 @@
static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
static final String KUDU_MASTER_PORT_KEY = "kudu.master.port";
static final String DEFAULT_KUDU_MASTER_PORT = "7051";
- static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = "kudu.span.queue.size";
- static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000;
static final String KUDU_SPAN_TABLE_KEY = "kudu.span.table";
static final String DEFAULT_KUDU_SPAN_TABLE = "span";
static final String KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY = "kudu.span.timeline.annotation.table";
static final String DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE = "span.timeline";
- static final String MAX_SPAN_BATCH_SIZE_KEY = "kudu.batch.size";
- static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
- static final String NUM_PARALLEL_THREADS_KEY = "kudu.num.threads";
- static final int DEFAULT_NUM_PARALLEL_THREADS = 1;
- static final String KUDU_COLUMN_SPAN_TRACE_ID_KEY = "kudu.column.span.traceid";
static final String DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID = "trace_id";
- static final String KUDU_COLUMN_SPAN_START_TIME_KEY = "kudu.column.span.starttime";
static final String DEFAULT_KUDU_COLUMN_SPAN_START_TIME = "start_time";
- static final String KUDU_COLUMN_SPAN_STOP_TIME_KEY = "kudu.column.span.stoptime";
static final String DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME = "stop_time";
- static final String KUDU_COLUMN_SPAN_SPAN_ID_KEY = "kudu.column.span.spanid";
static final String DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID = "span_id";
- static final String KUDU_COLUMN_SPAN_PROCESS_ID_KEY = "kudu.column.span.processid";
- static final String DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID = "process_id";
- static final String KUDU_COLUMN_SPAN_PARENT_ID_KEY = "kudu.column.span.parentid";
- static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID = "parent_id";
- static final String KUDU_COLUMN_SPAN_DESCRIPTION_KEY = "kudu.column.span.description";
+ static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW = "parent_id_low";
+ static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH = "parent_id_high";
static final String DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION = "description";
- static final String KUDU_COLUMN_SPAN_PARENT_KEY = "kudu.column.span.parent";
static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT = "parent";
- static final String KUDU_COLUMN_TIMELINE_TIME_KEY = "kudu.column.timeline.time";
static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIME = "time";
- static final String KUDU_COLUMN_TIMELINE_MESSAGE_KEY = "kudu.column.timeline.message";
static final String DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE = "message";
- static final String KUDU_COLUMN_TIMELINE_SPANID_KEY = "kudu.column.timeline.spanid";
static final String DEFAULT_KUDU_COLUMN_TIMELINE_SPANID = "spanid";
- static final String KUDU_COLUMN_TIMELINE_TIMELINEID_KEY = "kudu.column.timeline.timelineid";
static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID = "timelineid";
static final String KUDU_CLIENT_WORKER_COUNT_KEY = "kudu.client.worker.count";
static final String KUDU_CLIENT_BOSS_COUNT_KEY = "kudu.client.boss.count";
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
index 46c324a..745f24d 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
@@ -30,49 +30,22 @@
import org.kududb.client.PartialRow;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
public class KuduSpanReceiver extends SpanReceiver {
private static final Log LOG = LogFactory.getLog(KuduSpanReceiver.class);
- private static final int SHUTDOWN_TIMEOUT = 30;
- private static final int MAX_ERRORS = 10;
- private final BlockingQueue<Span> queue;
- private final AtomicBoolean running = new AtomicBoolean(true);
private final KuduClientConfiguration clientConf;
- private final int maxSpanBatchSize;
- private final ThreadFactory threadFactory = new ThreadFactory() {
- private final AtomicLong receiverIndex = new AtomicLong(0);
-
- @Override
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable);
- thread.setDaemon(true);
- thread.setName(String.format("kuduSpanReceiver-%d",
- receiverIndex.getAndIncrement()));
- return thread;
- }
- };
-
- private ExecutorService service;
+ private KuduSession session;
+ private KuduClient client;
private String table_span;
private String column_span_trace_id;
private String column_span_start_time;
private String column_span_stop_time;
private String column_span_span_id;
- private String column_span_process_id;
- private String column_span_parent_id;
+ private String column_span_parent_id_low;
+ private String column_span_parent_id_high;
private String column_span_description;
private String column_span_parent;
@@ -85,7 +58,7 @@
public KuduSpanReceiver(HTraceConfiguration conf) {
String masterHost;
- String masterPort;
+ Integer masterPort;
Integer workerCount;
Integer bossCount;
Boolean isStatisticsEnabled;
@@ -95,8 +68,8 @@
masterHost = conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST);
- masterPort = conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT);
+ masterPort = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
+ KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT));
if (conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY) != null) {
bossCount = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY));
@@ -137,186 +110,86 @@
adminOperationTimeout,
operationTimeout,
socketReadTimeout);
-
- this.queue = new ArrayBlockingQueue<Span>(conf.getInt(KuduReceiverConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY,
- KuduReceiverConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE));
-
+ //table names made configurable
this.table_span = conf.get(KuduReceiverConstants.KUDU_SPAN_TABLE_KEY, KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
- this.table_timeline= conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY,
+ this.table_timeline = conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY,
KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
-
- this.column_span_trace_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_TRACE_ID_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
- this.column_span_start_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_START_TIME_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
- this.column_span_stop_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_STOP_TIME_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
- this.column_span_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_SPAN_ID_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
- this.column_span_process_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PROCESS_ID_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID);
- this.column_span_parent_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_ID_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID);
- this.column_span_description = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_DESCRIPTION_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
- this.column_span_parent = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT);
- this.column_timeline_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIME_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
- this.column_timeline_message = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_MESSAGE_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
- this.column_timeline_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_SPANID_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID);
- this.column_timeline_timeline_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIMELINEID_KEY,
- KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);
-
- this.maxSpanBatchSize = conf.getInt(KuduReceiverConstants.MAX_SPAN_BATCH_SIZE_KEY,
- KuduReceiverConstants.DEFAULT_MAX_SPAN_BATCH_SIZE);
- if (this.service != null) {
- this.service.shutdownNow();
- this.service = null;
- }
- int numThreads = conf.getInt(KuduReceiverConstants.NUM_PARALLEL_THREADS_KEY,
- KuduReceiverConstants.DEFAULT_NUM_PARALLEL_THREADS);
- this.service = Executors.newFixedThreadPool(numThreads, threadFactory);
- for (int i = 0; i < numThreads; i++) {
- this.service.submit(new KuduSpanReceiver.WriteSpanRunnable());
+ //default column names have used
+ this.column_span_trace_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID;
+ this.column_span_start_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME;
+ this.column_span_stop_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME;
+ this.column_span_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID;
+ this.column_span_parent_id_low = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW;
+ this.column_span_parent_id_high = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH;
+ this.column_span_description = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION;
+ this.column_span_parent = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT;
+ this.column_timeline_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME;
+ this.column_timeline_message = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE;
+ this.column_timeline_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID;
+ this.column_timeline_timeline_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID;
+ //kudu backend session initialization
+ if (this.session == null) {
+ if (this.client == null) {
+ client = clientConf.buildClient();
+ }
+ session = client.newSession();
}
}
@Override
public void close() throws IOException {
- running.set(false);
- service.shutdown();
try {
- if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
- LOG.error("Timeout " + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
- " reached while shutting worker threads which process enqued spans." +
- " Enqueued spans which are left in blocking queue is dropped.");
+ if (this.session != null) {
+ if (this.session.isClosed()) {
+ this.session.close();
+ }
+ this.client.close();
}
- } catch (InterruptedException e) {
- LOG.warn("Interrupted exception occured while terminating thread service executor.", e);
+ } catch (java.lang.Exception e) {
+ LOG.warn("Failed to close Kudu session. " + e.getMessage());
}
}
@Override
public void receiveSpan(Span span) {
- if (running.get()) {
+ try {
+ KuduTable tableSpan = client.openTable(table_span);
+ Insert spanInsert = tableSpan.newInsert();
+ PartialRow spanRow = spanInsert.getRow();
+ spanRow.addLong(column_span_trace_id, span.getSpanId().getLow());
+ spanRow.addLong(column_span_start_time, span.getStartTimeMillis());
+ spanRow.addLong(column_span_stop_time, span.getStopTimeMillis());
+ spanRow.addLong(column_span_span_id, span.getSpanId().getHigh());
+ if (span.getParents().length == 0) {
+ spanRow.addLong(column_span_parent_id_low, 0);
+ spanRow.addLong(column_span_parent_id_high, 0);
+ spanRow.addBoolean(column_span_parent, true);
+ } else if (span.getParents().length > 0) {
+ spanRow.addLong(column_span_parent_id_low, span.getParents()[0].getLow());
+ spanRow.addLong(column_span_parent_id_high, span.getParents()[0].getHigh());
+ spanRow.addBoolean(column_span_parent, false);
+ }
+ spanRow.addString(column_span_description, span.getDescription());
+ session.apply(spanInsert);
+ long annotationCounter = 0;
+ for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
+ annotationCounter++;
+ KuduTable tableTimeline = client.openTable(table_timeline);
+ Insert timelineInsert = tableTimeline.newInsert();
+ PartialRow timelineRow = timelineInsert.getRow();
+ timelineRow.addLong(column_timeline_timeline_id, span.getSpanId().getLow() + annotationCounter);
+ timelineRow.addLong(column_timeline_time, ta.getTime());
+ timelineRow.addString(column_timeline_message, ta.getMessage());
+ timelineRow.addLong(column_timeline_span_id, span.getSpanId().getLow());
+ session.apply(timelineInsert);
+ }
+ } catch (java.lang.Exception ex) {
+ LOG.error("Failed to write span to Kudu backend", ex);
+ } finally {
try {
- this.queue.add(span);
- } catch (IllegalStateException e) {
- LOG.error("Error trying to enqueue span ("
- + span.getDescription()
- + ") to the queue. Blocking Queue is currently reached its capacity.");
- }
- }
- }
-
- private class WriteSpanRunnable implements Runnable {
-
- private KuduSession session;
- private KuduClient client;
-
- @Override
- public void run() {
- List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
- long errorCount = 0;
- while (running.get() || queue.size() > 0) {
- Span firstSpan = null;
- try {
- firstSpan = queue.poll(1, TimeUnit.SECONDS);
- if (firstSpan != null) {
- dequeuedSpans.add(firstSpan);
- queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted Exception occurred while polling to " +
- "retrieve first span from blocking queue");
- }
- startSession();
- if (dequeuedSpans.isEmpty()) {
- try {
- this.session.flush();
- } catch (java.lang.Exception e) {
- LOG.error("Failed to flush writes to Kudu.");
- closeSession();
- }
- continue;
- }
- try {
- for (Span span : dequeuedSpans) {
- KuduTable tableSpan = client.openTable(table_span);
- Insert spanInsert = tableSpan.newInsert();
- PartialRow spanRow = spanInsert.getRow();
- spanRow.addLong(column_span_trace_id,span.getSpanId().getHigh());
- spanRow.addLong(column_span_start_time,span.getStartTimeMillis());
- spanRow.addLong(column_span_stop_time,span.getStopTimeMillis());
- spanRow.addLong(column_span_span_id,span.getSpanId().getLow());
- spanRow.addString(column_span_process_id,span.getTracerId());
- if (span.getParents().length == 0) {
- spanRow.addLong(column_span_parent_id,0);
- spanRow.addBoolean(column_span_parent,false);
- } else if (span.getParents().length > 0) {
- spanRow.addLong(column_span_parent_id,span.getParents()[0].getLow());
- spanRow.addBoolean(column_span_parent,true);
- }
- spanRow.addString(column_span_description,span.getDescription());
- this.session.apply(spanInsert);
- long annotationCounter = 0;
- for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
- annotationCounter++;
- KuduTable tableTimeline = client.openTable(table_timeline);
- Insert timelineInsert = tableTimeline.newInsert();
- PartialRow timelineRow = timelineInsert.getRow();
- timelineRow.addLong(column_timeline_timeline_id,span.getSpanId().getHigh()+annotationCounter);
- timelineRow.addLong(column_timeline_time,ta.getTime());
- timelineRow.addString(column_timeline_message,ta.getMessage());
- timelineRow.addLong(column_timeline_span_id,span.getSpanId().getHigh());
- this.session.apply(timelineInsert);
- }
- }
- dequeuedSpans.clear();
- errorCount = 0;
- } catch (Exception e) {
- errorCount += 1;
- if (errorCount < MAX_ERRORS) {
- try {
- queue.addAll(dequeuedSpans);
- } catch (IllegalStateException ex) {
- LOG.error("Exception occured while writing spans kudu datastore. " +
- "Trying to re-enqueue de-queued spans to blocking queue for writing but failed. " +
- "Dropped " + dequeuedSpans.size() + " dequeued span(s) which were due written" +
- "into kudu datastore");
- }
- }
- closeSession();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- LOG.error("Interrupted Exception occurred while allowing kudu to re-stabilized");
- }
- }
- }
- closeSession();
- }
-
- private void closeSession() {
- try {
- if (this.session != null) {
- this.session.close();
- this.session = null;
- }
- } catch (java.lang.Exception e) {
- LOG.warn("Failed to close Kudu session. " + e.getMessage());
- }
- }
-
- private void startSession() {
- if (this.session == null) {
- if (this.client == null) {
- client = clientConf.buildClient();
- }
- session = client.newSession();
+ session.flush();
+ } catch (java.lang.Exception ex) {
+ //Ignore
}
}
}
diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
index 8b446e2..c13970d 100644
--- a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
+++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
@@ -48,7 +48,7 @@
public class TestKuduSpanReceiver extends BaseKuduTest {
private static final String BIN_DIR_PROP = "binDir";
- private static final String BIN_DIR_PROP_DEFAULT = "./build/release/bin";
+ private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin";
//set kudu binary location and enable test execution from here
private static final boolean TEST_ENABLE = false;
@@ -83,10 +83,10 @@
span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID,
Type.INT64)
.build());
- span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID,
- Type.STRING)
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH,
+ Type.INT64)
.build());
- span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID,
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW,
Type.INT64)
.build());
span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT,
@@ -137,6 +137,7 @@
KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]))
.build();
TraceScope scope = tracer.newScope("testKuduScope");
+ scope.addTimelineAnnotation("test");
Span testSpan = scope.getSpan();
scope.close();
tracer.close();
@@ -147,6 +148,8 @@
spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
+ spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH);
+ spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW);
KuduScanner scanner = client.newScannerBuilder(client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE))
.setProjectedColumnNames(spanColumns)
.build();
@@ -157,11 +160,18 @@
RowResult result = results.next();
long traceId = result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
MilliSpan.Builder builder = new MilliSpan.Builder()
- .spanId(new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID),
- result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID)))
+ .spanId(new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID),
+ result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID)))
.description(result.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION))
.begin(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME))
.end(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME));
+ if (!(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH) == 0 &&
+ result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW) == 0)) {
+ SpanId[] parents = new SpanId[1];
+ parents[0] = new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH),
+ result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW));
+ builder.parents(parents);
+ }
List<String> timelineColumns = new ArrayList<>();
timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
@@ -177,7 +187,7 @@
while (timelineScanner.hasMoreRows()) {
RowResultIterator timelineResults = timelineScanner.nextRows();
while (timelineResults.hasNext()) {
- RowResult timelineRow = results.next();
+ RowResult timelineRow = timelineResults.next();
timelineList.add(new TimelineAnnotation
(timelineRow.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME),
timelineRow.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE)));
@@ -193,6 +203,10 @@
Assert.assertEquals(testSpan.getStartTimeMillis(), dbSpan.getStartTimeMillis());
Assert.assertEquals(testSpan.getStopTimeMillis(), dbSpan.getStopTimeMillis());
Assert.assertEquals(testSpan.getDescription(), dbSpan.getDescription());
+ Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getMessage(),
+ dbSpan.getTimelineAnnotations().get(0).getMessage());
+ Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getTime(),
+ dbSpan.getTimelineAnnotations().get(0).getTime());
syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
}
diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java
new file mode 100644
index 0000000..7dd2807
--- /dev/null
+++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.apache.htrace.viewer.KuduSpanViewer;
+import org.junit.*;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduClient;
+import org.kududb.client.CreateTableOptions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class TestKuduSpanViewer extends BaseKuduTest {
+
+ private static final String BIN_DIR_PROP = "binDir";
+ private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin";
+ //set kudu binary location and enable test execution from here
+ private static final boolean TEST_ENABLE = false;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ if (TEST_ENABLE) {
+ System.setProperty(BIN_DIR_PROP, BIN_DIR_PROP_DEFAULT);
+ BaseKuduTest.setUpBeforeClass();
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if(TEST_ENABLE) {
+ BaseKuduTest.tearDownAfterClass();
+ }
+ }
+
+ private void createTable() throws Exception {
+ KuduClient client = BaseKuduTest.syncClient;
+ List<ColumnSchema> span_columns = new ArrayList();
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID,
+ Type.INT64)
+ .key(true)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH,
+ Type.INT64)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT,
+ Type.BOOL)
+ .build());
+ span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION,
+ Type.STRING)
+ .build());
+
+ List<String> rangeKeys = new ArrayList<>();
+ rangeKeys.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+ Schema schema = new Schema(span_columns);
+ client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE, schema,
+ new CreateTableOptions().setRangePartitionColumns(rangeKeys));
+
+ List<ColumnSchema> timeline_columns = new ArrayList();
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder
+ (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID, Type.INT64)
+ .key(true)
+ .build());
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME,
+ Type.INT64)
+ .build());
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder
+ (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE, Type.STRING)
+ .build());
+ timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID,
+ Type.INT64)
+ .build());
+ List<String> rangeKeysTimeline = new ArrayList<>();
+ rangeKeysTimeline.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);
+ Schema timelineSchema = new Schema(timeline_columns);
+ client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE, timelineSchema,
+ new CreateTableOptions().setRangePartitionColumns(rangeKeysTimeline));
+ }
+
+
+ @Test
+ public void testSpanToJson() {
+ SpanId[] parent = new SpanId[1];
+ parent[0] = new SpanId(1,1);
+ MilliSpan.Builder builder = new MilliSpan.Builder()
+ .parents(parent)
+ .begin(1)
+ .end(2)
+ .spanId(new SpanId(10,20))
+ .description("description");
+ List<TimelineAnnotation> timelineList = new LinkedList<TimelineAnnotation>();
+ for (int i = 0; i < 3; i++) {
+ timelineList.add(new TimelineAnnotation(i,"message" + i));
+ }
+ builder.timeline(timelineList);
+ Span span = builder.build();
+ try {
+ String json = KuduSpanViewer.toJsonString(span);
+ String expected =
+ "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\",\"parent_id\":\"1\"," +
+ "\"start\":\"1\",\"stop\":\"2\",\"timeline\":[{\"time\":\"0\",\"message\":\"message0\",}{\"time\":\"1\"," +
+ "\"message\":\"message1\",}{\"time\":\"2\",\"message\":\"message2\",}]}";
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSpanWithoutTimelineToJson() {
+ SpanId[] parent = new SpanId[1];
+ parent[0] = new SpanId(200,111);
+ MilliSpan.Builder builder = new MilliSpan.Builder()
+ .parents(parent)
+ .begin(1)
+ .end(2)
+ .spanId(new SpanId(10,20))
+ .tracerId("pid")
+ .description("description");
+ Span span = builder.build();
+ try {
+ String json = KuduSpanViewer.toJsonString(span);
+ String expected =
+ "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\"," +
+ "\"parent_id\":\"111\",\"start\":\"1\",\"stop\":\"2\",}";
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+
+ @Ignore
+ @Test
+ public void TestKuduSpanViewer() throws Exception {
+ createTable();
+ Tracer tracer = new Tracer.Builder().
+ name("testKuduSpanReceiver").
+ tracerPool(new TracerPool("testKuduSpanReceiver")).
+ conf(HTraceConfiguration.fromKeyValuePairs(
+ "sampler.classes", "AlwaysSampler",
+ "span.receiver.classes", "org.apache.htrace.impl.KuduSpanReceiver",
+ KuduReceiverConstants.KUDU_MASTER_HOST_KEY, BaseKuduTest.getMasterAddresses().split(":")[0],
+ KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]))
+ .build();
+ TraceScope scope = tracer.newScope("testKuduScope");
+ scope.addTimelineAnnotation("test");
+ Span testSpan = scope.getSpan();
+ TraceScope childScope = tracer.newScope("testKuduChildScope", new SpanId(100,200));
+ Span childScopeSpan = childScope.getSpan();
+ childScope.addTimelineAnnotation("testChild");
+ childScope.close();
+ scope.close();
+ tracer.close();
+ HTraceConfiguration conf = HTraceConfiguration
+ .fromKeyValuePairs(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
+ BaseKuduTest.getMasterAddresses().split(":")[0],
+ KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]);
+ KuduSpanViewer viewer = new KuduSpanViewer(conf);
+ List<Span> list = viewer.getRootSpans();
+ Assert.assertEquals(list.size(), 1);
+ Span span = viewer.getRootSpans().get(0);
+ try {
+ String json = KuduSpanViewer.toJsonString(span);
+ String expected = KuduSpanViewer.toJsonString(testSpan);
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ List<Span> list2 = viewer.getSpans(span.getSpanId().getHigh());
+ Assert.assertEquals(list2.size(), 2);
+ Span span2 = list2.get(0);
+ try {
+ String json = KuduSpanViewer.toJsonString(span2);
+ String expected = null;
+ if(span2.getParents().length != 0) {
+ expected = KuduSpanViewer.toJsonString(childScopeSpan);
+ } else {
+ expected = KuduSpanViewer.toJsonString(testSpan);
+ }
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ Span span3 = list2.get(1);
+ try {
+ String json = KuduSpanViewer.toJsonString(span3);
+ String expected = null;
+ if(span3.getParents().length != 0) {
+ expected = KuduSpanViewer.toJsonString(childScopeSpan);
+ } else {
+ expected = KuduSpanViewer.toJsonString(testSpan);
+ }
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+}