// blob: 46c324a700f91fe41a8d327aa42b574edc379810 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanReceiver;
import org.apache.htrace.core.TimelineAnnotation;
import org.kududb.client.KuduClient;
import org.kududb.client.KuduSession;
import org.kududb.client.KuduTable;
import org.kududb.client.Insert;
import org.kududb.client.PartialRow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Span receiver that buffers received spans in a bounded in-memory queue and
 * writes them to Kudu tables from a fixed pool of background worker threads.
 *
 * <p>Table and column names, queue capacity, batch size and thread count are
 * all read from the {@link HTraceConfiguration} passed to the constructor.
 */
public class KuduSpanReceiver extends SpanReceiver {
private static final Log LOG = LogFactory.getLog(KuduSpanReceiver.class);
// Seconds close() waits for the worker pool to terminate.
private static final int SHUTDOWN_TIMEOUT = 30;
// Number of consecutive failed writes after which a failed batch is no longer
// put back on the queue.
private static final int MAX_ERRORS = 10;
// Bounded hand-off buffer between receiveSpan() callers and the writer threads.
private final BlockingQueue<Span> queue;
// Cleared by close(); receiveSpan() stops enqueueing and workers exit once the
// queue is empty.
private final AtomicBoolean running = new AtomicBoolean(true);
// Used by the workers to build their KuduClient.
private final KuduClientConfiguration clientConf;
// Upper bound on the number of spans a worker takes from the queue per batch.
private final int maxSpanBatchSize;
// Creates daemon threads named "kuduSpanReceiver-<n>" for the worker pool.
private final ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicLong receiverIndex = new AtomicLong(0);
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName(String.format("kuduSpanReceiver-%d",
receiverIndex.getAndIncrement()));
return thread;
}
};
// Worker pool running the WriteSpanRunnable instances.
private ExecutorService service;
// Name of the Kudu table spans are inserted into, followed by its column names.
private String table_span;
private String column_span_trace_id;
private String column_span_start_time;
private String column_span_stop_time;
private String column_span_span_id;
private String column_span_process_id;
private String column_span_parent_id;
private String column_span_description;
private String column_span_parent;
// Name of the Kudu table timeline annotations are inserted into, followed by
// its column names.
private String table_timeline;
private String column_timeline_timeline_id;
private String column_timeline_time;
private String column_timeline_message;
private String column_timeline_span_id;
/**
 * Creates the receiver, reads all settings from {@code conf} and starts the
 * worker threads that write queued spans to Kudu.
 *
 * <p>Optional client settings (boss/worker count, statistics flag, timeouts)
 * are passed to {@link KuduClientConfiguration} as {@code null} when the key is
 * not configured.
 *
 * @param conf HTrace configuration; keys and defaults come from
 *             {@code KuduReceiverConstants}
 * @throws NumberFormatException if a numeric client setting is present but
 *                               cannot be parsed
 */
public KuduSpanReceiver(HTraceConfiguration conf) {
  String masterHost = conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST);
  String masterPort = conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT);
  Integer bossCount = optionalInteger(conf, KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY);
  Integer workerCount = optionalInteger(conf, KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY);
  Boolean isStatisticsEnabled =
      optionalBoolean(conf, KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY);
  Long adminOperationTimeout =
      optionalLong(conf, KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY);
  Long operationTimeout =
      optionalLong(conf, KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY);
  Long socketReadTimeout =
      optionalLong(conf, KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY);
  this.clientConf = new KuduClientConfiguration(masterHost,
      masterPort,
      workerCount,
      bossCount,
      isStatisticsEnabled,
      adminOperationTimeout,
      operationTimeout,
      socketReadTimeout);

  this.queue = new ArrayBlockingQueue<Span>(conf.getInt(KuduReceiverConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY,
      KuduReceiverConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE));

  // Span table and its columns.
  this.table_span = conf.get(KuduReceiverConstants.KUDU_SPAN_TABLE_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
  this.column_span_trace_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_TRACE_ID_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
  this.column_span_start_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_START_TIME_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
  this.column_span_stop_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_STOP_TIME_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
  this.column_span_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_SPAN_ID_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
  this.column_span_process_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PROCESS_ID_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID);
  this.column_span_parent_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_ID_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID);
  this.column_span_description = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_DESCRIPTION_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
  this.column_span_parent = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT);

  // Timeline annotation table and its columns.
  this.table_timeline = conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
  this.column_timeline_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIME_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
  this.column_timeline_message = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_MESSAGE_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
  this.column_timeline_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_SPANID_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID);
  this.column_timeline_timeline_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIMELINEID_KEY,
      KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);

  this.maxSpanBatchSize = conf.getInt(KuduReceiverConstants.MAX_SPAN_BATCH_SIZE_KEY,
      KuduReceiverConstants.DEFAULT_MAX_SPAN_BATCH_SIZE);

  // Workers are started last so that every field they read is already assigned.
  int numThreads = conf.getInt(KuduReceiverConstants.NUM_PARALLEL_THREADS_KEY,
      KuduReceiverConstants.DEFAULT_NUM_PARALLEL_THREADS);
  this.service = Executors.newFixedThreadPool(numThreads, threadFactory);
  for (int i = 0; i < numThreads; i++) {
    this.service.submit(new KuduSpanReceiver.WriteSpanRunnable());
  }
}

/** Returns the value of {@code key} parsed as an Integer, or null when the key is not set. */
private static Integer optionalInteger(HTraceConfiguration conf, String key) {
  String raw = conf.get(key);
  return raw == null ? null : Integer.valueOf(raw);
}

/** Returns the value of {@code key} parsed as a Long, or null when the key is not set. */
private static Long optionalLong(HTraceConfiguration conf, String key) {
  String raw = conf.get(key);
  return raw == null ? null : Long.valueOf(raw);
}

/** Returns the value of {@code key} parsed as a Boolean, or null when the key is not set. */
private static Boolean optionalBoolean(HTraceConfiguration conf, String key) {
  String raw = conf.get(key);
  return raw == null ? null : Boolean.valueOf(raw);
}
/**
 * Stops accepting new spans and waits up to {@code SHUTDOWN_TIMEOUT} seconds
 * for the worker threads to drain the queue and terminate.
 *
 * <p>If the wait times out, an error is logged and any spans still queued are
 * abandoned. If the calling thread is interrupted while waiting, the wait is
 * cut short and the thread's interrupt status is restored so callers further
 * up the stack can observe it.
 *
 * @throws IOException declared by the overridden method; not thrown by this
 *                     implementation
 */
@Override
public void close() throws IOException {
  running.set(false);
  service.shutdown();
  try {
    if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
      LOG.error("Timeout " + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
          " reached while shutting worker threads which process enqued spans." +
          " Enqueued spans which are left in blocking queue is dropped.");
    }
  } catch (InterruptedException e) {
    LOG.warn("Interrupted exception occured while terminating thread service executor.", e);
    // Catching InterruptedException clears the flag; set it again for the caller.
    Thread.currentThread().interrupt();
  }
}
/**
 * Queues {@code span} for asynchronous writing to Kudu.
 *
 * <p>The span is silently ignored once the receiver has been closed. When the
 * queue is full the span is dropped and an error is logged; this method never
 * blocks the caller.
 *
 * @param span the span to enqueue
 */
@Override
public void receiveSpan(Span span) {
  if (!running.get()) {
    return;
  }
  // offer() reports a full queue through its return value, so no exception is
  // needed for the expected "queue is full" case.
  if (!this.queue.offer(span)) {
    LOG.error("Error trying to enqueue span ("
        + span.getDescription()
        + ") to the queue. Blocking Queue is currently reached its capacity.");
  }
}
private class WriteSpanRunnable implements Runnable {
private KuduSession session;
private KuduClient client;
@Override
public void run() {
List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
long errorCount = 0;
while (running.get() || queue.size() > 0) {
Span firstSpan = null;
try {
firstSpan = queue.poll(1, TimeUnit.SECONDS);
if (firstSpan != null) {
dequeuedSpans.add(firstSpan);
queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
}
} catch (InterruptedException ie) {
LOG.error("Interrupted Exception occurred while polling to " +
"retrieve first span from blocking queue");
}
startSession();
if (dequeuedSpans.isEmpty()) {
try {
this.session.flush();
} catch (java.lang.Exception e) {
LOG.error("Failed to flush writes to Kudu.");
closeSession();
}
continue;
}
try {
for (Span span : dequeuedSpans) {
KuduTable tableSpan = client.openTable(table_span);
Insert spanInsert = tableSpan.newInsert();
PartialRow spanRow = spanInsert.getRow();
spanRow.addLong(column_span_trace_id,span.getSpanId().getHigh());
spanRow.addLong(column_span_start_time,span.getStartTimeMillis());
spanRow.addLong(column_span_stop_time,span.getStopTimeMillis());
spanRow.addLong(column_span_span_id,span.getSpanId().getLow());
spanRow.addString(column_span_process_id,span.getTracerId());
if (span.getParents().length == 0) {
spanRow.addLong(column_span_parent_id,0);
spanRow.addBoolean(column_span_parent,false);
} else if (span.getParents().length > 0) {
spanRow.addLong(column_span_parent_id,span.getParents()[0].getLow());
spanRow.addBoolean(column_span_parent,true);
}
spanRow.addString(column_span_description,span.getDescription());
this.session.apply(spanInsert);
long annotationCounter = 0;
for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
annotationCounter++;
KuduTable tableTimeline = client.openTable(table_timeline);
Insert timelineInsert = tableTimeline.newInsert();
PartialRow timelineRow = timelineInsert.getRow();
timelineRow.addLong(column_timeline_timeline_id,span.getSpanId().getHigh()+annotationCounter);
timelineRow.addLong(column_timeline_time,ta.getTime());
timelineRow.addString(column_timeline_message,ta.getMessage());
timelineRow.addLong(column_timeline_span_id,span.getSpanId().getHigh());
this.session.apply(timelineInsert);
}
}
dequeuedSpans.clear();
errorCount = 0;
} catch (Exception e) {
errorCount += 1;
if (errorCount < MAX_ERRORS) {
try {
queue.addAll(dequeuedSpans);
} catch (IllegalStateException ex) {
LOG.error("Exception occured while writing spans kudu datastore. " +
"Trying to re-enqueue de-queued spans to blocking queue for writing but failed. " +
"Dropped " + dequeuedSpans.size() + " dequeued span(s) which were due written" +
"into kudu datastore");
}
}
closeSession();
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
LOG.error("Interrupted Exception occurred while allowing kudu to re-stabilized");
}
}
}
closeSession();
}
private void closeSession() {
try {
if (this.session != null) {
this.session.close();
this.session = null;
}
} catch (java.lang.Exception e) {
LOG.warn("Failed to close Kudu session. " + e.getMessage());
}
}
private void startSession() {
if (this.session == null) {
if (this.client == null) {
client = clientConf.buildClient();
}
session = client.newSession();
}
}
}
}