blob: 8b446e22f19979ff52ac56c697a8bf1b548a8f24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import org.apache.htrace.core.Tracer;
import org.apache.htrace.core.TracerPool;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.MilliSpan;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.TimelineAnnotation;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.Assert;
import org.junit.AfterClass;
import org.kududb.Schema;
import org.kududb.Type;
import org.kududb.ColumnSchema;
import org.kududb.client.BaseKuduTest;
import org.kududb.client.KuduClient;
import org.kududb.client.CreateTableOptions;
import org.kududb.client.KuduScanner;
import org.kududb.client.RowResultIterator;
import org.kududb.client.RowResult;
import org.kududb.client.KuduPredicate;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
 * Integration test that sends one span through {@code KuduSpanReceiver} and
 * reads it back from the Kudu span table to compare it with the original.
 *
 * <p>The test is disabled twice over: the test method carries {@code @Ignore},
 * and the cluster setup/teardown only runs when {@link #TEST_ENABLE} is
 * {@code true}. To run it, set the Kudu binary location, flip
 * {@link #TEST_ENABLE} and remove the {@code @Ignore} annotation.
 */
public class TestKuduSpanReceiver extends BaseKuduTest {
  private static final String BIN_DIR_PROP = "binDir";
  private static final String BIN_DIR_PROP_DEFAULT = "./build/release/bin";
  // Set the Kudu binary location above and enable test execution from here.
  private static final boolean TEST_ENABLE = false;

  /**
   * Publishes the Kudu binary directory as a system property and delegates to
   * {@link BaseKuduTest#setUpBeforeClass()}, but only when the test is enabled.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    if (TEST_ENABLE) {
      System.setProperty(BIN_DIR_PROP, BIN_DIR_PROP_DEFAULT);
      BaseKuduTest.setUpBeforeClass();
    }
  }

  /**
   * Delegates to {@link BaseKuduTest#tearDownAfterClass()} when the test is enabled.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (TEST_ENABLE) {
      BaseKuduTest.tearDownAfterClass();
    }
  }

  /**
   * Builds a column schema with the given name and type.
   *
   * @param name  column name
   * @param type  Kudu column type
   * @param isKey whether the column is flagged as a key column
   * @return the built column schema
   */
  private static ColumnSchema column(String name, Type type, boolean isKey) {
    ColumnSchema.ColumnSchemaBuilder builder = new ColumnSchema.ColumnSchemaBuilder(name, type);
    // Only touch the key flag for key columns so non-key columns keep the builder's default.
    if (isKey) {
      builder.key(true);
    }
    return builder.build();
  }

  /**
   * Creates the span table and the timeline annotation table, each
   * range-partitioned on its single key column.
   */
  private void createTable() throws Exception {
    KuduClient client = BaseKuduTest.syncClient;

    List<ColumnSchema> spanColumns = new ArrayList<>();
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID, Type.INT64, true));
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME, Type.INT64, false));
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME, Type.INT64, false));
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID, Type.INT64, false));
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID, Type.STRING, false));
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID, Type.INT64, false));
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT, Type.BOOL, false));
    spanColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION, Type.STRING, false));
    List<String> spanRangeKeys = new ArrayList<>();
    spanRangeKeys.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
    client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE, new Schema(spanColumns),
        new CreateTableOptions().setRangePartitionColumns(spanRangeKeys));

    List<ColumnSchema> timelineColumns = new ArrayList<>();
    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID, Type.INT64, true));
    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME, Type.INT64, false));
    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE, Type.STRING, false));
    timelineColumns.add(column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID, Type.INT64, false));
    List<String> timelineRangeKeys = new ArrayList<>();
    timelineRangeKeys.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);
    client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE,
        new Schema(timelineColumns),
        new CreateTableOptions().setRangePartitionColumns(timelineRangeKeys));
  }

  /**
   * Reads the timeline annotations whose span-id column equals {@code traceId}.
   *
   * @param client  Kudu client used for the scan
   * @param traceId value read from the span row's trace-id column
   * @return the matching annotations, in scan order
   */
  private static List<TimelineAnnotation> readTimeline(KuduClient client, long traceId)
      throws Exception {
    List<String> projection = new ArrayList<>();
    projection.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
    projection.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
    // NOTE(review): the timeline span-id column is filtered by the span row's trace-id value,
    // as in the original test; confirm this matches what KuduSpanReceiver writes.
    KuduScanner timelineScanner = client
        .newScannerBuilder(
            client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE))
        .setProjectedColumnNames(projection)
        .addPredicate(KuduPredicate.newComparisonPredicate(
            column(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID, Type.INT64, false),
            KuduPredicate.ComparisonOp.EQUAL, traceId))
        .build();

    List<TimelineAnnotation> annotations = new ArrayList<>();
    while (timelineScanner.hasMoreRows()) {
      RowResultIterator timelineResults = timelineScanner.nextRows();
      while (timelineResults.hasNext()) {
        // Advance the timeline iterator itself; reading from the span iterator here
        // would never make progress on this loop's own condition.
        RowResult timelineRow = timelineResults.next();
        annotations.add(new TimelineAnnotation(
            timelineRow.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME),
            timelineRow.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE)));
      }
    }
    return annotations;
  }

  /**
   * Traces a single scope through a tracer configured with
   * {@code org.apache.htrace.impl.KuduSpanReceiver}, then scans the span table
   * and checks that the stored span id, start/stop times and description match
   * the span that was produced. Both tables are dropped afterwards, whether or
   * not the assertions pass.
   */
  @Ignore
  @Test
  public void TestKuduSpanReceiver() throws Exception {
    createTable();
    try {
      // Master address is split on ':' into the host and port configuration values.
      String[] masterHostAndPort = BaseKuduTest.getMasterAddresses().split(":");
      Tracer tracer = new Tracer.Builder()
          .name("testKuduSpanReceiver")
          .tracerPool(new TracerPool("testKuduSpanReceiver"))
          .conf(HTraceConfiguration.fromKeyValuePairs(
              "sampler.classes", "AlwaysSampler",
              "span.receiver.classes", "org.apache.htrace.impl.KuduSpanReceiver",
              KuduReceiverConstants.KUDU_MASTER_HOST_KEY, masterHostAndPort[0],
              KuduReceiverConstants.KUDU_MASTER_PORT_KEY, masterHostAndPort[1]))
          .build();
      TraceScope scope = tracer.newScope("testKuduScope");
      Span testSpan = scope.getSpan();
      scope.close();
      tracer.close();

      KuduClient client = BaseKuduTest.syncClient;
      List<String> spanProjection = new ArrayList<>();
      spanProjection.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
      spanProjection.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
      spanProjection.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
      spanProjection.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
      spanProjection.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
      KuduScanner scanner = client
          .newScannerBuilder(client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE))
          .setProjectedColumnNames(spanProjection)
          .build();

      MilliSpan dbSpan = null;
      while (scanner.hasMoreRows()) {
        RowResultIterator results = scanner.nextRows();
        // Only the first row of each batch is rebuilt into a span; the scanner is still
        // drained batch by batch, so the last batch's first row is the one compared.
        if (results.hasNext()) {
          RowResult result = results.next();
          long traceId = result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
          dbSpan = new MilliSpan.Builder()
              .spanId(new SpanId(traceId,
                  result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID)))
              .description(
                  result.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION))
              .begin(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME))
              .end(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME))
              .timeline(readTimeline(client, traceId))
              .build();
        }
      }

      // Fail with a readable message instead of a NullPointerException when nothing was stored.
      Assert.assertNotNull("no span row was found in the Kudu span table", dbSpan);
      Assert.assertEquals(testSpan.getSpanId().getHigh(), dbSpan.getSpanId().getHigh());
      Assert.assertEquals(testSpan.getSpanId().getLow(), dbSpan.getSpanId().getLow());
      Assert.assertEquals(testSpan.getStartTimeMillis(), dbSpan.getStartTimeMillis());
      Assert.assertEquals(testSpan.getStopTimeMillis(), dbSpan.getStopTimeMillis());
      Assert.assertEquals(testSpan.getDescription(), dbSpan.getDescription());
    } finally {
      // Drop both tables even when an assertion above fails.
      syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
      syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
    }
  }
}