/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.trace;
import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
import static org.apache.phoenix.metrics.MetricInfo.END;
import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
import static org.apache.phoenix.metrics.MetricInfo.PARENT;
import static org.apache.phoenix.metrics.MetricInfo.SPAN;
import static org.apache.phoenix.metrics.MetricInfo.START;
import static org.apache.phoenix.metrics.MetricInfo.TAG;
import static org.apache.phoenix.metrics.MetricInfo.TRACE;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.TimelineAnnotation;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.metrics.MetricInfo;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Sink for the trace spans pushed into the queue by {@link TraceSpanReceiver}. The class
 * instantiates a thread pool of configurable size, which pulls data from the queue and writes
 * it to the Phoenix trace table in batches. Both the thread pool size and the batch commit
 * size are configurable.
 */
public class TraceWriter {
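// A minimal usage sketch. The table name and the pool/batch sizes below are illustrative
// values, not defaults mandated by this class:
//
//   TraceWriter writer = new TraceWriter("SYSTEM.TRACING_STATS", 4, 100);
//   writer.start(); // schedules FlushMetrics tasks on a daemon thread pool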
private static final Logger LOGGER = LoggerFactory.getLogger(TraceWriter.class);
private static final String VARIABLE_VALUE = "?";
private static final Joiner COLUMN_JOIN = Joiner.on(".");
static final String TAG_FAMILY = "tags";
/**
 * Number of tags stored for this row
 */
static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count");
static final String ANNOTATION_FAMILY = "annotations";
static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count");
/**
* Join strings on a comma
*/
private static final Joiner COMMAS = Joiner.on(',');
private String tableName;
private int batchSize;
private int numThreads;
private TraceSpanReceiver traceSpanReceiver;
protected ScheduledExecutorService executor;
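/**
 * @param tableName name of the Phoenix table the spans are written to
 * @param numThreads number of writer threads to schedule
 * @param batchSize number of spans to buffer before each commit
 */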
public TraceWriter(String tableName, int numThreads, int batchSize) {
this.batchSize = batchSize;
this.numThreads = numThreads;
this.tableName = tableName;
}
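/**
 * Look up the {@link TraceSpanReceiver} and, if one is available, schedule one
 * {@link FlushMetrics} task per configured thread on a daemon thread pool, each running
 * at a fixed 10 second interval.
 */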
public void start() {
traceSpanReceiver = getTraceSpanReceiver();
if (traceSpanReceiver == null) {
LOGGER.warn(
"No receiver has been initialized for TraceWriter. Traces will not be written.");
LOGGER.warn("Restart Phoenix to try again.");
return;
}
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setDaemon(true).setNameFormat("PHOENIX-METRICS-WRITER");
executor = Executors.newScheduledThreadPool(this.numThreads, builder.build());
for (int i = 0; i < this.numThreads; i++) {
executor.scheduleAtFixedRate(new FlushMetrics(), 0, 10, TimeUnit.SECONDS);
}
LOGGER.info("Writing tracing metrics to phoenix table");
}
@VisibleForTesting
protected TraceSpanReceiver getTraceSpanReceiver() {
return Tracing.getTraceSpanReceiver();
}
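/**
 * Task that drains spans from the {@link TraceSpanReceiver} queue and upserts them into
 * the trace table. Each instance holds its own Phoenix connection and commits once
 * {@code batchSize} spans have been buffered.
 */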
public class FlushMetrics implements Runnable {
private Connection conn;
private int counter = 0;
public FlushMetrics() {
conn = getConnection(tableName);
}
@Override
public void run() {
if (conn == null) return;
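// drain the receiver's queue: a null span ends this pass, and a full batch
// triggers an intermediate commit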
while (!traceSpanReceiver.isSpanAvailable()) {
Span span = traceSpanReceiver.getSpan();
if (null == span) break;
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Span received: " + span.toJson());
}
addToBatch(span);
counter++;
if (counter >= batchSize) {
commitBatch(conn);
counter = 0;
}
}
}
private void addToBatch(Span span) {
String stmt = "UPSERT INTO " + tableName + " (";
// accumulate the column names and values that make up the upsert
List<String> keys = new ArrayList<String>();
List<Object> values = new ArrayList<Object>();
// we need to keep variable values in a separate list since they may contain spaces,
// which causes the parser to fail. Instead, we bind them after the statement is
// prepared
List<String> variableValues = new ArrayList<String>();
keys.add(TRACE.columnName);
values.add(span.getTraceId());
keys.add(DESCRIPTION.columnName);
values.add(VARIABLE_VALUE);
variableValues.add(span.getDescription());
keys.add(SPAN.traceName);
values.add(span.getSpanId());
keys.add(PARENT.traceName);
values.add(span.getParentId());
keys.add(START.traceName);
values.add(span.getStartTimeMillis());
keys.add(END.traceName);
values.add(span.getStopTimeMillis());
int annotationCount = 0;
int tagCount = 0;
// add the span's timeline annotations as tags. They arrive in the order received, so
// the tag index preserves that order
for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
addDynamicEntry(keys, values, variableValues, TAG_FAMILY,
Long.toString(ta.getTime()), ta.getMessage(), TAG, tagCount);
tagCount++;
}
// add the annotations. We assume they are serialized as strings and integers, but that
// can change in the future
Map<byte[], byte[]> annotations = span.getKVAnnotations();
for (Map.Entry<byte[], byte[]> annotation : annotations.entrySet()) {
Pair<String, String> val =
TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue());
addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, val.getFirst(),
val.getSecond(), ANNOTATION, annotationCount);
annotationCount++;
}
// add the tag and annotation counts, now that we know them
keys.add(TAG_COUNT);
values.add(tagCount);
keys.add(ANNOTATION_COUNT);
values.add(annotationCount);
// compile the statement together
stmt += COMMAS.join(keys);
stmt += ") VALUES (" + COMMAS.join(values) + ")";
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Logging metrics to phoenix table via: " + stmt);
LOGGER.trace("With tags: " + variableValues);
}
try {
PreparedStatement ps = conn.prepareStatement(stmt);
// bind everything that wouldn't (or may not) parse if inlined in the statement
int index = 1;
for (String tag : variableValues) {
ps.setString(index++, tag);
}
// Not going through the standard route of using statement.execute() as that code path
// is blocked if the metadata hasn't been upgraded to the new minor release.
MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
MutationState newState = plan.execute();
state.join(newState);
} catch (SQLException e) {
LOGGER.error("Could not write metric: \n" + span + " to prepared statement:\n" + stmt,
e);
}
}
}
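/**
 * Build the name of a dynamic column from its family, base column name and index, e.g.
 * {@code tags.t0} for the first tag column (assuming a base column name of {@code t}).
 */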
public static String getDynamicColumnName(String family, String column, int count) {
return COLUMN_JOIN.join(family, column) + count;
}
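/**
 * Add a dynamic column (declared inline as VARCHAR) for a single tag or annotation. The
 * value is routed through {@code variableValues} so it can be bound as a parameter
 * rather than inlined into the statement text.
 */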
private void addDynamicEntry(List<String> keys, List<Object> values,
List<String> variableValues, String family, String desc, String value,
MetricInfo metric, int count) {
// <family><.dynColumn><count> <VARCHAR>
keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR");
// build the annotation value
String val = desc + " - " + value;
values.add(VARIABLE_VALUE);
variableValues.add(val);
}
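/**
 * Open a server-side Phoenix connection with tracing disabled, so the writer's own
 * upserts are not themselves traced, and create the trace table if it does not exist.
 * @return the connection, or {@code null} if it could not be created
 */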
protected Connection getConnection(String tableName) {
try {
// create the phoenix connection
Properties props = new Properties();
props.setProperty(QueryServices.TRACING_FREQ_ATTRIB, Tracing.Frequency.NEVER.getKey());
Configuration conf = HBaseConfiguration.create();
Connection conn = QueryUtil.getConnectionOnServer(props, conf);
if (!traceTableExists(conn, tableName)) {
createTable(conn, tableName);
}
LOGGER.info(
"Created new connection for tracing " + conn.toString() + " Table: " + tableName);
return conn;
} catch (Exception e) {
LOGGER.error("Tracing will NOT be pursued. New connection failed for tracing Table: "
+ tableName,
e);
LOGGER.error("Restart Phoenix to retry.");
return null;
}
}
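/**
 * @return {@code true} if the trace table already exists in the Phoenix metadata
 */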
protected boolean traceTableExists(Connection conn, String traceTableName) throws SQLException {
try {
PhoenixRuntime.getTable(conn, traceTableName);
return true;
} catch (TableNotFoundException e) {
return false;
}
}
/**
 * Create the trace table with the given name if it does not already exist.
 * @param conn connection to use when creating the table
 * @param table name of the table to create
 * @throws SQLException if any Phoenix operation fails
 */
protected void createTable(Connection conn, String table) throws SQLException {
// only primary-key columns can be marked non-null
String ddl =
"create table if not exists " + table + "( " + TRACE.columnName
+ " bigint not null, " + PARENT.columnName + " bigint not null, "
+ SPAN.columnName + " bigint not null, " + DESCRIPTION.columnName
+ " varchar, " + START.columnName + " bigint, " + END.columnName
+ " bigint, " + HOSTNAME.columnName + " varchar, " + TAG_COUNT
+ " smallint, " + ANNOTATION_COUNT + " smallint"
+ " CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", "
+ PARENT.columnName + ", " + SPAN.columnName + "))\n" +
// We have a config parameter that can be set so that tables are
// transactional by default. Even if that's set, we don't want this system
// table to be created as a transactional table, so we explicitly make it
// non-transactional.
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
PreparedStatement stmt = conn.prepareStatement(ddl);
stmt.execute();
}
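/**
 * Commit the current batch of upserts, logging (but otherwise swallowing) any failure.
 */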
protected void commitBatch(Connection conn) {
try {
conn.commit();
} catch (SQLException e) {
LOGGER.error(
"Unable to commit traces on conn: " + conn.toString() + " to table: " + tableName,
e);
}
}
}