blob: e5059f7cc18c7b138fac8e15e1c39bfdcab366f0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import java.io.IOException;
import java.io.File;
import java.net.InetSocketAddress;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * The configuration of the HTracedSpanReceiver.
 *
 * This class extracts all the relevant configuration information for
 * HTracedSpanReceiver from the HTraceConfiguration.  It performs parsing and
 * bounds-checking for the configuration keys.
 *
 * It is more efficient to store the configuration as final values in this
 * structure than to access the HTraceConfiguration object directly.  This is
 * especially true when the HTraceConfiguration object is a thin shim around a
 * Hadoop Configuration object, which requires synchronization to access.
 */
class Conf {
  private static final Log LOG = LogFactory.getLog(Conf.class);

  /**
   * Address of the htraced server.
   */
  static final String ADDRESS_KEY =
      "htraced.receiver.address";

  /**
   * The minimum number of milliseconds to wait for a read or write
   * operation on the network.
   */
  static final String IO_TIMEOUT_MS_KEY =
      "htraced.receiver.io.timeout.ms";
  static final int IO_TIMEOUT_MS_DEFAULT = 60000;
  static final int IO_TIMEOUT_MS_MIN = 50;

  /**
   * The minimum number of milliseconds to wait for a network
   * connection attempt.
   */
  static final String CONNECT_TIMEOUT_MS_KEY =
      "htraced.receiver.connect.timeout.ms";
  static final int CONNECT_TIMEOUT_MS_DEFAULT = 60000;
  static final int CONNECT_TIMEOUT_MS_MIN = 50;

  /**
   * The minimum number of milliseconds to keep alive a connection when it's
   * not in use.
   */
  static final String IDLE_TIMEOUT_MS_KEY =
      "htraced.receiver.idle.timeout.ms";
  static final int IDLE_TIMEOUT_MS_DEFAULT = 60000;
  static final int IDLE_TIMEOUT_MS_MIN = 0;

  /**
   * Configure the retry times to use when an attempt to flush spans to
   * htraced fails.  This is configured as a comma-separated list of delay
   * times in milliseconds.  If the configured value is empty, no retries
   * will be made.
   */
  static final String FLUSH_RETRY_DELAYS_KEY =
      "htraced.flush.retry.delays.key";
  static final String FLUSH_RETRY_DELAYS_DEFAULT =
      "1000,30000";

  /**
   * The maximum length of time to go in between flush attempts.
   * Once this time elapses, a flush will be triggered even if we don't
   * have that many spans buffered.
   */
  static final String MAX_FLUSH_INTERVAL_MS_KEY =
      "htraced.receiver.max.flush.interval.ms";
  static final int MAX_FLUSH_INTERVAL_MS_DEFAULT = 60000;
  static final int MAX_FLUSH_INTERVAL_MS_MIN = 10;

  /**
   * Whether or not to use msgpack for span serialization.
   * If this key is false, JSON over REST will be used.
   * If this key is true, msgpack over custom RPC will be used.
   */
  static final String PACKED_KEY =
      "htraced.receiver.packed";
  static final boolean PACKED_DEFAULT = true;

  /**
   * The size of the span buffers.
   */
  static final String BUFFER_SIZE_KEY =
      "htraced.receiver.buffer.size";
  static final int BUFFER_SIZE_DEFAULT = 16 * 1024 * 1024;
  // NOTE(review): deliberately left non-final to keep the existing interface;
  // presumably other code (e.g. tests) lowers it -- confirm before making
  // it final.
  static int BUFFER_SIZE_MIN = 4 * 1024 * 1024;
  // The maximum buffer size should not be longer than
  // PackedBuffer.MAX_HRPC_BODY_LENGTH.
  static final int BUFFER_SIZE_MAX = 32 * 1024 * 1024;

  /**
   * Set the fraction of the span buffer which needs to fill up before we
   * will automatically trigger a flush.  This is a fraction, not a percentage.
   * It is between 0 and 1.
   */
  static final String BUFFER_SEND_TRIGGER_FRACTION_KEY =
      "htraced.receiver.buffer.send.trigger.fraction";
  static final double BUFFER_SEND_TRIGGER_FRACTION_DEFAULT = 0.5;
  static final double BUFFER_SEND_TRIGGER_FRACTION_MIN = 0.1;

  /**
   * The length of time which receiveSpan should wait for a free spot in a
   * span buffer before giving up and dropping the span
   */
  static final String SPAN_DROP_TIMEOUT_MS_KEY =
      "htraced.max.buffer.full.retry.ms.key";
  static final int SPAN_DROP_TIMEOUT_MS_DEFAULT = 5000;

  /**
   * The length of time we should wait between displaying log messages on the
   * rate-limited loggers.
   */
  static final String ERROR_LOG_PERIOD_MS_KEY =
      "htraced.error.log.period.ms";
  static final long ERROR_LOG_PERIOD_MS_DEFAULT = 30000L;

  /**
   * The path of the dropped spans log.  The default is a file named
   * "htraceDropped" inside the directory named by the java.io.tmpdir system
   * property, or inside /tmp if that property is not set.
   */
  static final String DROPPED_SPANS_LOG_PATH_KEY =
      "htraced.dropped.spans.log.path";
  static final String DROPPED_SPANS_LOG_PATH_DEFAULT =
      new File(System.getProperty("java.io.tmpdir", "/tmp"), "htraceDropped").
          getAbsolutePath();

  /**
   * The maximum size of the dropped spans log (presumably in bytes -- the
   * unit is not established in this file).
   */
  static final String DROPPED_SPANS_LOG_MAX_SIZE_KEY =
      "htraced.dropped.spans.log.max.size";
  static final long DROPPED_SPANS_LOG_MAX_SIZE_DEFAULT = 1024L * 1024L;

  @JsonProperty("ioTimeoutMs")
  final int ioTimeoutMs;

  @JsonProperty("connectTimeoutMs")
  final int connectTimeoutMs;

  @JsonProperty("idleTimeoutMs")
  final int idleTimeoutMs;

  @JsonProperty("flushRetryDelays")
  final int[] flushRetryDelays;

  @JsonProperty("maxFlushIntervalMs")
  final int maxFlushIntervalMs;

  @JsonProperty("packed")
  final boolean packed;

  @JsonProperty("bufferSize")
  final int bufferSize;

  @JsonProperty("spanDropTimeoutMs")
  final int spanDropTimeoutMs;

  @JsonProperty("errorLogPeriodMs")
  final long errorLogPeriodMs;

  /**
   * bufferSize multiplied by the configured send trigger fraction, truncated
   * to an int.
   */
  @JsonProperty("triggerSize")
  final int triggerSize;

  /**
   * The raw value configured for ADDRESS_KEY.
   */
  @JsonProperty("endpointStr")
  final String endpointStr;

  /**
   * The unresolved socket address parsed from endpointStr.
   */
  @JsonProperty("endpoint")
  final InetSocketAddress endpoint;

  @JsonProperty("droppedSpansLogPath")
  final String droppedSpansLogPath;

  @JsonProperty("droppedSpansLogMaxSize")
  final long droppedSpansLogMaxSize;

  /**
   * Read an int configuration value and clamp it to [minValue, maxValue].
   * A warning is logged whenever the configured value has to be clamped.
   *
   * @param conf          The configuration to read from.
   * @param key           The configuration key.
   * @param defaultValue  The value to use when the key is not set.
   * @param minValue      The smallest value which will be returned.
   * @param maxValue      The largest value which will be returned.
   * @return              The configured value, clamped to the bounds.
   */
  private static int getBoundedInt(final HTraceConfiguration conf,
        String key, int defaultValue, int minValue, int maxValue) {
    int val = conf.getInt(key, defaultValue);
    if (val < minValue) {
      LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " +
          "of " + minValue + " instead.");
      return minValue;
    } else if (val > maxValue) {
      LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " +
          "of " + maxValue + " instead.");
      return maxValue;
    }
    return val;
  }

  /**
   * Read a long configuration value and clamp it to [minValue, maxValue].
   * A warning is logged whenever the configured value has to be clamped.
   *
   * @param conf          The configuration to read from.
   * @param key           The configuration key.
   * @param defaultValue  The value to use when the key is not set.
   * @param minValue      The smallest value which will be returned.
   * @param maxValue      The largest value which will be returned.
   * @return              The configured value, clamped to the bounds.
   * @throws IllegalArgumentException  If the configured value can't be
   *                                   parsed as a long.
   */
  private static long getBoundedLong(final HTraceConfiguration conf,
        String key, long defaultValue, long minValue, long maxValue) {
    String strVal = conf.get(key, Long.toString(defaultValue));
    long val;
    try {
      val = Long.parseLong(strVal);
    } catch (NumberFormatException nfe) {
      // Keep the parse failure as the cause so the offending text is not lost.
      throw new IllegalArgumentException("Bad value for '" + key +
          "': should be long", nfe);
    }
    if (val < minValue) {
      LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " +
          "of " + minValue + " instead.");
      return minValue;
    } else if (val > maxValue) {
      LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " +
          "of " + maxValue + " instead.");
      return maxValue;
    }
    return val;
  }

  /**
   * Read a double configuration value and clamp it to [minValue, maxValue].
   * A warning is logged whenever the configured value has to be clamped.
   *
   * @param conf          The configuration to read from.
   * @param key           The configuration key.
   * @param defaultValue  The value to use when the key is not set.
   * @param minValue      The smallest value which will be returned.
   * @param maxValue      The largest value which will be returned.
   * @return              The configured value, clamped to the bounds.
   * @throws IllegalArgumentException  If the configured value can't be
   *                                   parsed as a double, or is NaN.
   */
  private static double getBoundedDouble(final HTraceConfiguration conf,
        String key, double defaultValue, double minValue, double maxValue) {
    String strVal = conf.get(key, Double.toString(defaultValue));
    double val;
    try {
      val = Double.parseDouble(strVal);
    } catch (NumberFormatException nfe) {
      // Keep the parse failure as the cause so the offending text is not lost.
      throw new IllegalArgumentException("Bad value for '" + key +
          "': should be double", nfe);
    }
    // Double.parseDouble accepts "NaN".  Every comparison against NaN is
    // false, so without this check NaN would slip past both bounds below.
    if (Double.isNaN(val)) {
      throw new IllegalArgumentException("Bad value for '" + key +
          "': should be double");
    }
    if (val < minValue) {
      LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " +
          "of " + minValue + " instead.");
      return minValue;
    }
    if (val > maxValue) {
      LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " +
          "of " + maxValue + " instead.");
      return maxValue;
    }
    return val;
  }

  /**
   * Parse the ":port" suffix of a host:port pair.
   *
   * @param portStr  The string to parse.  It must start with a colon which
   *                 is followed by a decimal port number.
   * @return         The port number, between 0 and 65535 inclusive.
   * @throws IOException  If the string does not start with a colon, the
   *                      port is not a decimal integer, or the port is out
   *                      of range.
   */
  private static int parseColonPort(String portStr) throws IOException {
    if (!portStr.startsWith(":")) {
      throw new IOException("Invalid port string " + portStr);
    }
    int port;
    try {
      port = Integer.parseInt(portStr.substring(1), 10);
    } catch (NumberFormatException e) {
      // Report a malformed port the same way as every other address error,
      // rather than letting an unchecked exception escape.
      throw new IOException("Invalid port string " + portStr, e);
    }
    if ((port < 0) || (port > 65535)) {
      throw new IOException("Invalid port number " + port);
    }
    return port;
  }

  /**
   * Parse a hostname:port or ip:port pair.  An IPv6 address must be enclosed
   * in square brackets, as in [::1]:9075.
   *
   * @param str  The string to parse.
   * @return     The socket address.  It is created unresolved; no name
   *             lookup is performed here.
   * @throws IOException  If the string is empty or is not a well-formed
   *                      host:port pair.
   */
  InetSocketAddress parseHostPortPair(String str) throws IOException {
    str = str.trim();
    if (str.isEmpty()) {
      throw new IOException("No hostname:port pair given.");
    }
    int bracketBegin = str.indexOf('[');
    if (bracketBegin == 0) {
      // Parse an ipv6-style address enclosed in square brackets.
      int bracketEnd = str.indexOf(']');
      if (bracketEnd < 0) {
        throw new IOException("Found left bracket, but no corresponding " +
            "right bracket, in " + str);
      }
      String host = str.substring(bracketBegin + 1, bracketEnd);
      int port = parseColonPort(str.substring(bracketEnd + 1));
      return InetSocketAddress.createUnresolved(host, port);
    } else if (bracketBegin > 0) {
      throw new IOException("Found a left bracket that wasn't at the " +
          "start of the host:port pair in " + str);
    } else {
      int colon = str.indexOf(':');
      // colon == 0 would mean an empty host name, so reject that as well.
      if (colon <= 0) {
        throw new IOException("No port component found in " + str);
      }
      String host = str.substring(0, colon);
      int port = parseColonPort(str.substring(colon));
      return InetSocketAddress.createUnresolved(host, port);
    }
  }

  /**
   * Parse a comma-separated list of integers.  Whitespace around each entry
   * is ignored, and entries which are empty or contain only whitespace are
   * skipped.
   *
   * @param arrayStr  The string to parse.
   * @return          The parsed integers, in the order they were given.
   *                  The array is empty if there were no non-empty entries.
   * @throws NumberFormatException  If a non-empty entry is not an integer.
   */
  static int[] getIntArray(String arrayStr) {
    String[] tokens = arrayStr.split(",");
    int[] parsed = new int[tokens.length];
    int count = 0;
    for (String token : tokens) {
      // Parse the trimmed text, so that "1000, 30000" is accepted.
      String trimmed = token.trim();
      if (!trimmed.isEmpty()) {
        parsed[count++] = Integer.parseInt(trimmed);
      }
    }
    if (count == parsed.length) {
      return parsed;
    }
    // Some entries were skipped; shrink the result to the entries we kept.
    int[] ret = new int[count];
    System.arraycopy(parsed, 0, ret, 0, count);
    return ret;
  }

  /**
   * Build the receiver configuration from an HTraceConfiguration.
   *
   * @param conf  The configuration to read from.
   * @throws IOException  If the configured htraced address can't be parsed.
   * @throws IllegalArgumentException  If a long or double valued key holds
   *                                   text which can't be parsed.
   */
  Conf(HTraceConfiguration conf) throws IOException {
    this.ioTimeoutMs = getBoundedInt(conf, IO_TIMEOUT_MS_KEY,
              IO_TIMEOUT_MS_DEFAULT,
              IO_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
    this.connectTimeoutMs = getBoundedInt(conf, CONNECT_TIMEOUT_MS_KEY,
              CONNECT_TIMEOUT_MS_DEFAULT,
              CONNECT_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
    this.idleTimeoutMs = getBoundedInt(conf, IDLE_TIMEOUT_MS_KEY,
              IDLE_TIMEOUT_MS_DEFAULT,
              IDLE_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
    this.flushRetryDelays = getIntArray(conf.get(FLUSH_RETRY_DELAYS_KEY,
              FLUSH_RETRY_DELAYS_DEFAULT));
    this.maxFlushIntervalMs = getBoundedInt(conf, MAX_FLUSH_INTERVAL_MS_KEY,
              MAX_FLUSH_INTERVAL_MS_DEFAULT,
              MAX_FLUSH_INTERVAL_MS_MIN, Integer.MAX_VALUE);
    this.packed = conf.getBoolean(PACKED_KEY, PACKED_DEFAULT);
    this.bufferSize = getBoundedInt(conf, BUFFER_SIZE_KEY,
              BUFFER_SIZE_DEFAULT,
              BUFFER_SIZE_MIN, BUFFER_SIZE_MAX);
    double triggerFraction = getBoundedDouble(conf,
              BUFFER_SEND_TRIGGER_FRACTION_KEY,
              BUFFER_SEND_TRIGGER_FRACTION_DEFAULT,
              BUFFER_SEND_TRIGGER_FRACTION_MIN, 1.0);
    this.spanDropTimeoutMs = conf.getInt(SPAN_DROP_TIMEOUT_MS_KEY,
              SPAN_DROP_TIMEOUT_MS_DEFAULT);
    this.errorLogPeriodMs = getBoundedLong(conf, ERROR_LOG_PERIOD_MS_KEY,
              ERROR_LOG_PERIOD_MS_DEFAULT, 0, Long.MAX_VALUE);
    this.triggerSize = (int)(this.bufferSize * triggerFraction);
    this.endpointStr = conf.get(ADDRESS_KEY, "");
    try {
      this.endpoint = parseHostPortPair(endpointStr);
    } catch (IOException e) {
      // Name the offending key, and keep the original exception as the cause.
      throw new IOException("Error reading " + ADDRESS_KEY + ": " +
          e.getMessage(), e);
    }
    this.droppedSpansLogPath = conf.get(
        DROPPED_SPANS_LOG_PATH_KEY, DROPPED_SPANS_LOG_PATH_DEFAULT);
    this.droppedSpansLogMaxSize = getBoundedLong(conf,
        DROPPED_SPANS_LOG_MAX_SIZE_KEY, DROPPED_SPANS_LOG_MAX_SIZE_DEFAULT,
        0, Long.MAX_VALUE);
  }

  /**
   * Render this configuration as a JSON string, using the JsonProperty
   * annotations on the fields.
   *
   * @return  The JSON representation of this object.
   * @throws RuntimeException  If Jackson fails to serialize this object.
   */
  @Override
  public String toString() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
    try {
      return mapper.writeValueAsString(this);
    } catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }
}