refactored and added unit test cases
diff --git a/htrace-kudu/pom.xml b/htrace-kudu/pom.xml
index 60db72d..58140a6 100644
--- a/htrace-kudu/pom.xml
+++ b/htrace-kudu/pom.xml
@@ -18,7 +18,7 @@
<parent>
<artifactId>htrace</artifactId>
<groupId>org.apache.htrace</groupId>
- <version>4.1.0-incubating-SNAPSHOT</version>
+ <version>4.2.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -31,8 +31,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <kudu.version>1.0.0-SNAPSHOT</kudu.version>
- <hadoop.version>2.6.0-cdh5.4.7</hadoop.version>
+ <kudu.version>0.9.1</kudu.version>
+ <commons.version>1.3.2</commons.version>
<createDependencyReducedPom>true</createDependencyReducedPom>
</properties>
@@ -147,7 +147,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
<!-- KUDU specific deps. -->
<dependency>
<groupId>org.kududb</groupId>
@@ -155,9 +154,16 @@
<version>${kudu.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
+ <groupId>org.kududb</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons.version}</version>
</dependency>
</dependencies>
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/KuduHTraceConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/KuduHTraceConfiguration.java
deleted file mode 100644
index 4a5e740..0000000
--- a/htrace-kudu/src/main/java/org/apache/htrace/KuduHTraceConfiguration.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.htrace;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.htrace.core.HTraceConfiguration;
-
-public class KuduHTraceConfiguration extends HTraceConfiguration {
-
- public static final String KEY_PREFIX = "kudu.";
- private final Configuration conf;
-
- public KuduHTraceConfiguration(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public String get(String key) {
- return conf.get(KEY_PREFIX + key);
- }
-
- @Override
- public String get(String key, String defaultValue) {
- return conf.get(KEY_PREFIX + key, defaultValue);
-
- }
-
- @Override
- public boolean getBoolean(String key, boolean defaultValue) {
- return conf.getBoolean(KEY_PREFIX + key, defaultValue);
- }
-
- @Override
- public int getInt(String key, int defaultValue) {
- return conf.getInt(KEY_PREFIX + key, defaultValue);
- }
-}
\ No newline at end of file
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
index 4e99efc..c58dbb2 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
@@ -24,12 +24,12 @@
private String host;
private String port;
- private int workerCount;
- private int bossCount;
- private boolean isStatisticsEnabled;
- private long adminOperationTimeout;
- private long operationTimeout;
- private long socketReadTimeout;
+ private Integer workerCount;
+ private Integer bossCount;
+ private Boolean isStatisticsEnabled;
+ private Long adminOperationTimeout;
+ private Long operationTimeout;
+ private Long socketReadTimeout;
public KuduClientConfiguration(String host, String port) {
this.host = host;
@@ -63,22 +63,22 @@
public KuduClient buildClient() {
KuduClientBuilder builder = new KuduClient
.KuduClientBuilder(host.concat(":").concat(port));
- if (Integer.valueOf(workerCount) != null) {
+ if (workerCount != null) {
builder.workerCount(workerCount);
}
- if (Integer.valueOf(bossCount) != null) {
+ if (bossCount != null) {
builder.bossCount(bossCount);
}
- if (!Boolean.valueOf(isStatisticsEnabled)) {
+ if (isStatisticsEnabled != null && isStatisticsEnabled == false) {
builder.disableStatistics();
}
- if (Long.valueOf(adminOperationTimeout) != null) {
+ if (adminOperationTimeout != null) {
builder.defaultAdminOperationTimeoutMs(adminOperationTimeout);
}
- if (Long.valueOf(operationTimeout) != null) {
+ if (operationTimeout != null) {
builder.defaultOperationTimeoutMs(operationTimeout);
}
- if (Long.valueOf(socketReadTimeout) != null) {
+ if (socketReadTimeout != null) {
builder.defaultSocketReadTimeoutMs(socketReadTimeout);
}
return builder.build();
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
similarity index 98%
rename from htrace-kudu/src/main/java/org/apache/htrace/impl/KuduConstants.java
rename to htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
index 4092b8b..a605dfe 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduConstants.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
@@ -17,7 +17,7 @@
package org.apache.htrace.impl;
-public class KuduConstants {
+public class KuduReceiverConstants {
public static final String KUDU_MASTER_HOST_KEY = "htrace.kudu.master.host";
public static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
index da7aca3..ed3e093 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
@@ -74,44 +74,50 @@
public KuduSpanReceiver(HTraceConfiguration conf) {
this.clientConf =
- new KuduClientConfiguration(conf.get(KuduConstants.KUDU_MASTER_HOST_KEY,
- KuduConstants.DEFAULT_KUDU_MASTER_HOST),
- conf.get(KuduConstants.KUDU_MASTER_PORT_KEY,
- KuduConstants.DEFAULT_KUDU_MASTER_PORT));
- this.clientConf.setBossCount(conf.getInt(KuduConstants.KUDU_CLIENT_BOSS_COUNT_KEY,
- Integer.valueOf(null)));
- this.clientConf.setWorkerCount(conf.getInt(KuduConstants.KUDU_CLIENT_WORKER_COUNT_KEY,
- Integer.valueOf(null)));
- this.clientConf.setIsStatisticsEnabled(conf.getBoolean(KuduConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY,
- Boolean.valueOf(null)));
- this.clientConf
- .setAdminOperationTimeout(Long.valueOf(conf.get(KuduConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY,
- String.valueOf(null))));
- this.clientConf
- .setOperationTimeout(Long.valueOf(conf.get(KuduConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY,
- String.valueOf(null))));
- this.clientConf
- .setSocketReadTimeout(Long.valueOf(conf.get(KuduConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY,
- String.valueOf(null))));
- this.queue = new ArrayBlockingQueue<Span>(conf.getInt(KuduConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY,
- KuduConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE));
- this.table = conf.get(KuduConstants.KUDU_TABLE_KEY, KuduConstants.DEFAULT_KUDU_TABLE);
- this.column_span_id = conf.get(KuduConstants.KUDU_COLUMN_SPAN_ID_KEY,
- KuduConstants.DEFAULT_KUDU_COLUMN_SPAN_ID);
- this.column_span = conf.get(KuduConstants.KUDU_COLUMN_SPAN_KEY,
- KuduConstants.DEFAULT_KUDU_COLUMN_SPAN);
- this.column_root_span = conf.get(KuduConstants.KUDU_COLUMN_ROOT_SPAN_KEY,
- KuduConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN);
- this.column_root_span_start_time = conf.get(KuduConstants.KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY,
- KuduConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME);
- this.maxSpanBatchSize = conf.getInt(KuduConstants.MAX_SPAN_BATCH_SIZE_KEY,
- KuduConstants.DEFAULT_MAX_SPAN_BATCH_SIZE);
+ new KuduClientConfiguration(conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
+ KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST),
+ conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
+ KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT));
+ if (conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY) != null) {
+ this.clientConf.setBossCount(Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY)));
+ }
+ if (conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY) != null) {
+ this.clientConf.setWorkerCount(Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY)));
+ }
+ if (conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY) != null) {
+ this.clientConf.setIsStatisticsEnabled(Boolean.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY)));
+ }
+ if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY) != null) {
+ this.clientConf
+ .setAdminOperationTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY)));
+ }
+ if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY) != null) {
+ this.clientConf
+ .setOperationTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY)));
+ }
+ if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY) != null) {
+ this.clientConf
+ .setSocketReadTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY)));
+ }
+ this.queue = new ArrayBlockingQueue<Span>(conf.getInt(KuduReceiverConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY,
+ KuduReceiverConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE));
+ this.table = conf.get(KuduReceiverConstants.KUDU_TABLE_KEY, KuduReceiverConstants.DEFAULT_KUDU_TABLE);
+ this.column_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_ID_KEY,
+ KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_ID);
+ this.column_span = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_KEY,
+ KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN);
+ this.column_root_span = conf.get(KuduReceiverConstants.KUDU_COLUMN_ROOT_SPAN_KEY,
+ KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN);
+ this.column_root_span_start_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY,
+ KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME);
+ this.maxSpanBatchSize = conf.getInt(KuduReceiverConstants.MAX_SPAN_BATCH_SIZE_KEY,
+ KuduReceiverConstants.DEFAULT_MAX_SPAN_BATCH_SIZE);
if (this.service != null) {
this.service.shutdownNow();
this.service = null;
}
- int numThreads = conf.getInt(KuduConstants.NUM_PARALLEL_THREADS_KEY,
- KuduConstants.DEFAULT_NUM_PARALLEL_THREADS);
+ int numThreads = conf.getInt(KuduReceiverConstants.NUM_PARALLEL_THREADS_KEY,
+ KuduReceiverConstants.DEFAULT_NUM_PARALLEL_THREADS);
this.service = Executors.newFixedThreadPool(numThreads, threadFactory);
for (int i = 0; i < numThreads; i++) {
this.service.submit(new KuduSpanReceiver.WriteSpanRunnable());
diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
new file mode 100644
index 0000000..99a2839
--- /dev/null
+++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.protobuf.generated.SpanProtos;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.ColumnSchema;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduClient;
+import org.kududb.client.CreateTableOptions;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.RowResultIterator;
+import org.kududb.client.RowResult;
+
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestKuduSpanReceiver extends BaseKuduTest {
+
+ private static final String BIN_DIR_PROP = "binDir";
+ private static final String BIN_DIR_PROP_DEFAULT = "/home/djkevincr/poc/incubator-kudu/build/release/bin";
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ System.setProperty(BIN_DIR_PROP, BIN_DIR_PROP_DEFAULT);
+ BaseKuduTest.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ BaseKuduTest.tearDownAfterClass();
+ }
+
+ private void createTable() throws Exception {
+ KuduClient client = BaseKuduTest.syncClient;
+ List<ColumnSchema> columns = new ArrayList(4);
+ columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_ID,
+ Type.BINARY)
+ .key(true)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN,
+ Type.BINARY)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME,
+ Type.BINARY)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN,
+ Type.BINARY)
+ .build());
+ List<String> rangeKeys = new ArrayList<>();
+ rangeKeys.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_ID);
+
+ Schema schema = new Schema(columns);
+ client.createTable(KuduReceiverConstants.DEFAULT_KUDU_TABLE, schema,
+ new CreateTableOptions().setRangePartitionColumns(rangeKeys));
+ }
+
+ @Ignore
+ @Test
+ public void TestKuduSpanReceiver() throws Exception {
+ createTable();
+ Tracer tracer = new Tracer.Builder().
+ name("testKuduSpanReceiver").
+ tracerPool(new TracerPool("testKuduSpanReceiver")).
+ conf(HTraceConfiguration.fromKeyValuePairs(
+ "sampler.classes", "AlwaysSampler",
+ "span.receiver.classes", "org.apache.htrace.impl.KuduSpanReceiver",
+ KuduReceiverConstants.KUDU_MASTER_HOST_KEY, BaseKuduTest.getMasterAddresses().split(":")[0],
+ KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]))
+ .build();
+ TraceScope scope = tracer.newScope("testKuduScope");
+ Span testSpan = scope.getSpan();
+ SpanProtos.Span dbSpan = null;
+ scope.close();
+ tracer.close();
+ KuduClient client = BaseKuduTest.syncClient;
+ List<String> projectColumns = new ArrayList<>(1);
+ projectColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN);
+ KuduScanner scanner = client.newScannerBuilder(client.openTable(KuduReceiverConstants.DEFAULT_KUDU_TABLE))
+ .setProjectedColumnNames(projectColumns)
+ .build();
+ while (scanner.hasMoreRows()) {
+ RowResultIterator results = scanner.nextRows();
+ while (results.hasNext()) {
+ RowResult result = results.next();
+ ByteArrayInputStream in = new
+ ByteArrayInputStream(result.getBinaryCopy(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN));
+ dbSpan = SpanProtos.Span.parseFrom(in);
+ break;
+ }
+ }
+ Assert.assertEquals(testSpan.getSpanId().getHigh(), dbSpan.getTraceId());
+ Assert.assertEquals(testSpan.getSpanId().getLow(), dbSpan.getSpanId());
+ Assert.assertEquals(testSpan.getStartTimeMillis(), dbSpan.getStart());
+ Assert.assertEquals(testSpan.getStopTimeMillis(), dbSpan.getStop());
+ Assert.assertEquals(testSpan.getDescription(), dbSpan.getDescription());
+ syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_TABLE);
+ }
+
+}