AMBARI-23804 : Refine AMS HBase region splitting calculation based on UUID work.
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
index 56a28dc..d09d4bb 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
@@ -17,7 +17,6 @@
*/
package org.apache.ambari.metrics.core.timeline;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
@@ -35,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -111,15 +109,18 @@
private synchronized void initializeSubsystem() {
if (!isInitialized) {
hBaseAccessor = new PhoenixHBaseAccessor(null);
- // Initialize schema
- hBaseAccessor.initMetricSchema();
- // Initialize metadata from store
+
+ // Initialize metadata
try {
metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
} catch (MalformedURLException | URISyntaxException e) {
throw new ExceptionInInitializerError("Unable to initialize metadata manager");
}
metricMetadataManager.initializeMetadata();
+
+ // Initialize metric schema
+ hBaseAccessor.initMetricSchema();
+
// Initialize policies before TTL update
hBaseAccessor.initPoliciesAndTTL();
// Start HA service
@@ -395,6 +396,10 @@
return metricsFunctions;
}
+ public void putMetricsSkipCache(TimelineMetrics metrics) throws SQLException, IOException {
+ hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, true);
+ }
+
@Override
public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
// Error indicated by the Sql exception
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index 040df1b..dec7850 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -20,6 +20,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ambari.metrics.core.timeline.FunctionUtils.findMetricFunctions;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.BLOCKING_STORE_FILES_KEY;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
@@ -27,41 +28,32 @@
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DATE_TIERED_COMPACTION_POLICY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.FIFO_COMPACTION_POLICY_CLASS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_COMPACTION_CLASS_KEY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_ENGINE_CLASS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TRANSIENT_METRIC_PATTERNS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_TRANSIENT_TABLE_TTL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_TRANSIENT_METRICS_TABLE_SQL;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
@@ -132,6 +124,7 @@
import org.apache.ambari.metrics.core.timeline.source.InternalMetricsSource;
import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -201,21 +194,8 @@
private TimelineMetricsAggregatorSink aggregatorSink;
private final int cacheCommitInterval;
private final boolean skipBlockCacheForAggregatorsEnabled;
- private final String timelineMetricsTablesDurability;
- private final String timelineMetricsPrecisionTableDurability;
private TimelineMetricMetadataManager metadataManagerInstance;
- static final String HSTORE_COMPACTION_CLASS_KEY =
- "hbase.hstore.defaultengine.compactionpolicy.class";
- static final String HSTORE_ENGINE_CLASS =
- "hbase.hstore.engine.class";
- static final String FIFO_COMPACTION_POLICY_CLASS =
- "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
- static final String DATE_TIERED_COMPACTION_POLICY =
- "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine";
- static final String BLOCKING_STORE_FILES_KEY =
- "hbase.hstore.blockingStoreFiles";
-
private Map<String, Integer> tableTTL = new HashMap<>();
private final TimelineMetricConfiguration configuration;
@@ -259,11 +239,9 @@
this.cacheCommitInterval = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "3"));
this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false);
- this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY, "");
- this.timelineMetricsPrecisionTableDurability = metricsConf.get(TIMELINE_METRICS_PRECISION_TABLE_DURABILITY, "");
tableTTL.put(METRICS_RECORD_TABLE_NAME, metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400)); // 1 day
- tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.getInt(CONTAINER_METRICS_TTL, 30 * 86400)); // 30 days
+ tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.getInt(CONTAINER_METRICS_TTL, 14 * 86400)); // 30 days
tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400)); //7 days
tableTTL.put(METRICS_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.getInt(HOST_HOUR_TABLE_TTL, 30 * 86400)); //30 days
tableTTL.put(METRICS_AGGREGATE_DAILY_TABLE_NAME, metricsConf.getInt(HOST_DAILY_TABLE_TTL, 365 * 86400)); //1 year
@@ -470,7 +448,7 @@
return mapper.readValue(json, metricValuesTypeRef);
}
- private Connection getConnectionRetryingOnException() throws SQLException, InterruptedException {
+ public Connection getConnectionRetryingOnException() throws SQLException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try{
@@ -511,6 +489,9 @@
protected void initMetricSchema() {
Connection conn = null;
Statement stmt = null;
+ PreparedStatement pStmt = null;
+ TimelineMetricSplitPointComputer splitPointComputer = new TimelineMetricSplitPointComputer(
+ metricsConf, hbaseConf, metadataManagerInstance);
String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
@@ -521,21 +502,6 @@
conn = getConnectionRetryingOnException();
stmt = conn.createStatement();
- // Metadata
- String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL,
- encoding, compression);
- stmt.executeUpdate(metadataSql);
- stmt.executeUpdate(ALTER_METRICS_METADATA_TABLE);
-
- String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL,
- encoding, compression);
- stmt.executeUpdate(hostedAppSql);
-
- //Host Instances table
- String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL,
- encoding, compression);
- stmt.executeUpdate(hostedInstancesSql);
-
// Container Metrics
stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
@@ -543,13 +509,15 @@
// Host level
String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
- stmt.executeUpdate(precisionSql);
+ pStmt = prepareCreateMetricsTableStatement(conn, precisionSql, splitPointComputer.getPrecisionSplitPoints());
+ pStmt.executeUpdate();
String hostMinuteAggregrateSql = String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding,
tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME),
compression);
- stmt.executeUpdate(hostMinuteAggregrateSql);
+ pStmt = prepareCreateMetricsTableStatement(conn, hostMinuteAggregrateSql, splitPointComputer.getHostAggregateSplitPoints());
+ pStmt.executeUpdate();
stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding,
@@ -565,8 +533,9 @@
METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding,
tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME),
compression);
+ pStmt = prepareCreateMetricsTableStatement(conn, aggregateSql, splitPointComputer.getClusterAggregateSplitPoints());
+ pStmt.executeUpdate();
- stmt.executeUpdate(aggregateSql);
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding,
tableTTL.get(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME),
@@ -603,6 +572,13 @@
// Ignore
}
}
+ if (pStmt != null) {
+ try {
+ pStmt.close();
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
if (conn != null) {
try {
conn.close();
@@ -613,7 +589,7 @@
}
}
- protected void initPoliciesAndTTL() {
+ void initPoliciesAndTTL() {
Admin hBaseAdmin = null;
try {
hBaseAdmin = dataSource.getHBaseAdmin();
@@ -622,9 +598,13 @@
}
TableName[] tableNames = null;
+ TableName[] containerMetricsTableName = null;
+
if (hBaseAdmin != null) {
try {
tableNames = hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN, false);
+ containerMetricsTableName = hBaseAdmin.listTableNames(CONTAINER_METRICS_TABLE_NAME, false);
+ tableNames = (TableName[]) ArrayUtils.addAll(tableNames, containerMetricsTableName);
} catch (IOException e) {
LOG.warn("Unable to get table names from HBaseAdmin for setting policies.", e);
return;
@@ -708,72 +688,44 @@
}
private boolean setDurabilityForTable(String tableName, TableDescriptorBuilder tableDescriptor) {
-
- boolean modifyTable = false;
-
- if (METRIC_TRANSIENT_TABLE_NAME.equalsIgnoreCase(tableName)) {
- tableDescriptor.setDurability(Durability.SKIP_WAL);
- modifyTable = true;
- } else if (METRICS_RECORD_TABLE_NAME.equals(tableName)) {
- if (!timelineMetricsPrecisionTableDurability.isEmpty()) {
- LOG.info("Setting WAL option " + timelineMetricsPrecisionTableDurability + " for table : " + tableName);
- boolean validDurability = true;
- if ("SKIP_WAL".equals(timelineMetricsPrecisionTableDurability)) {
- tableDescriptor.setDurability(Durability.SKIP_WAL);
- } else if ("SYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
- tableDescriptor.setDurability(Durability.SYNC_WAL);
- } else if ("ASYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
- tableDescriptor.setDurability(Durability.ASYNC_WAL);
- } else if ("FSYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
- tableDescriptor.setDurability(Durability.FSYNC_WAL);
- } else {
- LOG.info("Unknown value for " + TIMELINE_METRICS_PRECISION_TABLE_DURABILITY + " : " + timelineMetricsPrecisionTableDurability);
- validDurability = false;
- }
- if (validDurability) {
- modifyTable = true;
- }
+ String tableDurability = metricsConf.get("timeline.metrics." + tableName + ".durability", "");
+ if (StringUtils.isNotEmpty(tableDurability)) {
+ LOG.info("Setting WAL option " + tableDurability + " for table : " + tableName);
+ boolean validDurability = true;
+ if ("SKIP_WAL".equals(tableDurability)) {
+ tableDescriptor.setDurability(Durability.SKIP_WAL);
+ } else if ("SYNC_WAL".equals(tableDurability)) {
+ tableDescriptor.setDurability(Durability.SYNC_WAL);
+ } else if ("ASYNC_WAL".equals(tableDurability)) {
+ tableDescriptor.setDurability(Durability.ASYNC_WAL);
+ } else if ("FSYNC_WAL".equals(tableDurability)) {
+ tableDescriptor.setDurability(Durability.FSYNC_WAL);
+ } else {
+ LOG.info("Unknown value for durability : " + tableDurability);
+ validDurability = false;
}
- } else {
- if (!timelineMetricsTablesDurability.isEmpty()) {
- LOG.info("Setting WAL option " + timelineMetricsTablesDurability + " for table : " + tableName);
- boolean validDurability = true;
- if ("SKIP_WAL".equals(timelineMetricsTablesDurability)) {
- tableDescriptor.setDurability(Durability.SKIP_WAL);
- } else if ("SYNC_WAL".equals(timelineMetricsTablesDurability)) {
- tableDescriptor.setDurability(Durability.SYNC_WAL);
- } else if ("ASYNC_WAL".equals(timelineMetricsTablesDurability)) {
- tableDescriptor.setDurability(Durability.ASYNC_WAL);
- } else if ("FSYNC_WAL".equals(timelineMetricsTablesDurability)) {
- tableDescriptor.setDurability(Durability.FSYNC_WAL);
- } else {
- LOG.info("Unknown value for " + TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY + " : " + timelineMetricsTablesDurability);
- validDurability = false;
- }
- if (validDurability) {
- modifyTable = true;
- }
- }
+ return validDurability;
}
- return modifyTable;
+ return false;
}
+
private boolean setCompactionPolicyForTable(String tableName, TableDescriptorBuilder tableDescriptorBuilder) {
boolean modifyTable = false;
- String compactionPolicyKey = metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY,
- HSTORE_ENGINE_CLASS);
- String compactionPolicyClass = metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS,
- DATE_TIERED_COMPACTION_POLICY);
- int blockingStoreFiles = hbaseConf.getInt(TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES, 60);
+ String keyConfig = "timeline.metrics." + tableName + ".compaction.policy.key";
+ String policyConfig = "timeline.metrics." + tableName + ".compaction.policy";
+ String storeFilesConfig = "timeline.metrics." + tableName + ".blocking.store.files";
- if (tableName.equals(METRICS_RECORD_TABLE_NAME) || tableName.equalsIgnoreCase(METRIC_TRANSIENT_TABLE_NAME)) {
- compactionPolicyKey = metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY,
- HSTORE_COMPACTION_CLASS_KEY);
- compactionPolicyClass = metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS,
- FIFO_COMPACTION_POLICY_CLASS);
- blockingStoreFiles = hbaseConf.getInt(TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES, 1000);
+ String compactionPolicyKey = metricsConf.get(keyConfig, HSTORE_ENGINE_CLASS);
+ String compactionPolicyClass = metricsConf.get(policyConfig, DATE_TIERED_COMPACTION_POLICY);
+ int blockingStoreFiles = hbaseConf.getInt(storeFilesConfig, 60);
+
+ if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
+ compactionPolicyKey = metricsConf.get(keyConfig, HSTORE_COMPACTION_CLASS_KEY);
+ compactionPolicyClass = metricsConf.get(policyConfig, FIFO_COMPACTION_POLICY_CLASS);
+ blockingStoreFiles = hbaseConf.getInt(storeFilesConfig, 1000);
}
if (StringUtils.isEmpty(compactionPolicyKey) || StringUtils.isEmpty(compactionPolicyClass)) {
@@ -781,46 +733,54 @@
modifyTable = setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, 300);
} else {
tableDescriptorBuilder.setValue(compactionPolicyKey, compactionPolicyClass);
- tableDescriptorBuilder.removeValue(HSTORE_ENGINE_CLASS.getBytes());
- tableDescriptorBuilder.removeValue(HSTORE_COMPACTION_CLASS_KEY.getBytes());
setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, blockingStoreFiles);
modifyTable = true;
}
+ if (!compactionPolicyKey.equals(HSTORE_ENGINE_CLASS)) {
+ tableDescriptorBuilder.removeValue(HSTORE_ENGINE_CLASS.getBytes());
+ }
+ if (!compactionPolicyKey.equals(HSTORE_COMPACTION_CLASS_KEY)) {
+ tableDescriptorBuilder.removeValue(HSTORE_COMPACTION_CLASS_KEY.getBytes());
+ }
+
return modifyTable;
}
private boolean setHbaseBlockingStoreFiles(TableDescriptorBuilder tableDescriptor, String tableName, int value) {
- int blockingStoreFiles = hbaseConf.getInt(HBASE_BLOCKING_STORE_FILES, value);
- if (blockingStoreFiles != value) {
- blockingStoreFiles = value;
- tableDescriptor.setValue(BLOCKING_STORE_FILES_KEY, String.valueOf(value));
- LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
- " = " + blockingStoreFiles + " for " + tableName);
- return true;
- }
- return false;
+ tableDescriptor.setValue(BLOCKING_STORE_FILES_KEY, String.valueOf(value));
+ LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
+ " = " + value + " for " + tableName);
+ return true;
}
- protected String getSplitPointsStr(String splitPoints) {
- if (StringUtils.isEmpty(splitPoints.trim())) {
+
+ private PreparedStatement prepareCreateMetricsTableStatement(Connection connection,
+ String sql,
+ List<byte[]> splitPoints) throws SQLException {
+
+ String createTableWithSplitPointsSql = sql + getSplitPointsStr(splitPoints.size());
+ LOG.info(createTableWithSplitPointsSql);
+ PreparedStatement statement = connection.prepareStatement(createTableWithSplitPointsSql);
+ for (int i = 1; i <= splitPoints.size(); i++) {
+ statement.setBytes(i, splitPoints.get(i - 1));
+ }
+ return statement;
+ }
+
+ private String getSplitPointsStr(int numSplits) {
+ if (numSplits <= 0) {
return "";
}
- String[] points = splitPoints.split(",");
- if (points.length > 0) {
- StringBuilder sb = new StringBuilder(" SPLIT ON ");
- sb.append("(");
- for (String point : points) {
- sb.append("'");
- sb.append(point.trim());
- sb.append("'");
- sb.append(",");
- }
- sb.deleteCharAt(sb.length() - 1);
- sb.append(")");
- return sb.toString();
+ StringBuilder sb = new StringBuilder(" SPLIT ON ");
+ sb.append("(");
+ for (int i = 0; i < numSplits; i++) {
+ sb.append("?");
+ sb.append(",");
}
- return "";
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(")");
+ return sb.toString();
}
/**
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
index 6ec2c6b..393d4a3 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
@@ -237,12 +237,6 @@
public static final String WATCHER_MAX_FAILURES =
"timeline.metrics.service.watcher.max.failures";
- public static final String PRECISION_TABLE_SPLIT_POINTS =
- "timeline.metrics.host.aggregate.splitpoints";
-
- public static final String AGGREGATE_TABLE_SPLIT_POINTS =
- "timeline.metrics.cluster.aggregate.splitpoints";
-
public static final String AGGREGATORS_SKIP_BLOCK_CACHE =
"timeline.metrics.aggregators.skip.blockcache.enabled";
@@ -261,12 +255,6 @@
public static final String TIMELINE_METRICS_SINK_COLLECTION_PERIOD =
"timeline.metrics.sink.collection.period";
- public static final String TIMELINE_METRICS_PRECISION_TABLE_DURABILITY =
- "timeline.metrics.precision.table.durability";
-
- public static final String TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY =
- "timeline.metrics.aggregate.tables.durability";
-
public static final String TIMELINE_METRICS_WHITELIST_ENABLED =
"timeline.metrics.whitelisting.enabled";
@@ -285,33 +273,9 @@
public static final String TIMELINE_METRICS_APPS_WHITELIST =
"timeline.metrics.apps.whitelist";
- public static final String HBASE_BLOCKING_STORE_FILES =
- "hbase.hstore.blockingStoreFiles";
-
- public static final String DEFAULT_TOPN_HOSTS_LIMIT =
- "timeline.metrics.default.topn.hosts.limit";
-
public static final String TIMELINE_METRIC_AGGREGATION_SQL_FILTERS =
"timeline.metrics.cluster.aggregation.sql.filters";
- public static final String TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY =
- "timeline.metrics.hbase.aggregate.table.compaction.policy.key";
-
- public static final String TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS =
- "timeline.metrics.hbase.aggregate.table.compaction.policy.class";
-
- public static final String TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES =
- "timeline.metrics.aggregate.table.hbase.hstore.blockingStoreFiles";
-
- public static final String TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY =
- "timeline.metrics.hbase.precision.table.compaction.policy.key";
-
- public static final String TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS =
- "timeline.metrics.hbase.precision.table.compaction.policy.class";
-
- public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
- "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
-
public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
"timeline.metrics.support.multiple.clusters";
@@ -346,6 +310,9 @@
public static final String TRANSIENT_METRIC_PATTERNS = "timeline.metrics.transient.metric.patterns";
+ public static final String TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS = "timeline.metrics.initial.configured.master.components";
+ public static final String TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS = "timeline.metrics.initial.configured.slave.components";
+
public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
@@ -353,7 +320,13 @@
public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";
-
+
+ public static final String HSTORE_COMPACTION_CLASS_KEY = "hbase.hstore.defaultengine.compactionpolicy.class";
+ public static final String HSTORE_ENGINE_CLASS = "hbase.hstore.engine.class";
+ public static final String FIFO_COMPACTION_POLICY_CLASS = "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
+ public static final String DATE_TIERED_COMPACTION_POLICY = "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine";
+ public static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles";
+
private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration metricsSslConf;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputer.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputer.java
new file mode 100644
index 0000000..89bb843
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputer.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS;
+
+public class TimelineMetricSplitPointComputer {
+
+ private static final Log LOG = LogFactory.getLog(TimelineMetricSplitPointComputer.class);
+ private Set<String> masterComponents = new HashSet<>();
+ private Set<String> slaveComponents = new HashSet<>();
+
+ private static final int MINIMUM_PRECISION_TABLE_REGIONS = 4;
+ private static final int MINIMUM_AGGREGATE_TABLE_REGIONS = 2;
+ private static final int OTHER_TABLE_STATIC_REGIONS = 8;
+ private static final int SLAVE_EQUIDISTANT_POINTS = 50;
+ private static final int MASTER_EQUIDISTANT_POINTS = 5;
+
+ private List<byte[]> precisionSplitPoints = new ArrayList<>();
+ private List<byte[]> aggregateSplitPoints = new ArrayList<>();
+
+ public TimelineMetricSplitPointComputer(Configuration metricsConf,
+ Configuration hbaseConf,
+ TimelineMetricMetadataManager timelineMetricMetadataManager) {
+
+ String componentsString = metricsConf.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "");
+ if (StringUtils.isNotEmpty(componentsString)) {
+ masterComponents.addAll(Arrays.asList(componentsString.split(",")));
+ }
+
+ componentsString = metricsConf.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "");
+ if (StringUtils.isNotEmpty(componentsString)) {
+ slaveComponents.addAll(Arrays.asList(componentsString.split(",")));
+ }
+
+ double hbaseTotalHeapsize = metricsConf.getDouble("hbase_total_heapsize", 1024*1024*1024);
+ double hbaseMemstoreUpperLimit = hbaseConf.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5);
+ double hbaseMemstoreFlushSize = hbaseConf.getDouble("hbase.hregion.memstore.flush.size", 134217728);
+
+ computeSplitPoints(hbaseTotalHeapsize, hbaseMemstoreUpperLimit, hbaseMemstoreFlushSize, timelineMetricMetadataManager);
+ }
+
+
+ private void computeSplitPoints(double hbaseTotalHeapsize,
+ double hbaseMemstoreUpperLimit,
+ double hbaseMemstoreFlushSize,
+ TimelineMetricMetadataManager timelineMetricMetadataManager) {
+
+ double memstoreMaxMemory = hbaseMemstoreUpperLimit * hbaseTotalHeapsize;
+ int maxInMemoryRegions = (int) ((memstoreMaxMemory / hbaseMemstoreFlushSize) - OTHER_TABLE_STATIC_REGIONS);
+
+ int targetPrecisionTableRegionCount = MINIMUM_PRECISION_TABLE_REGIONS;
+ int targetAggregateTableRegionCount = MINIMUM_AGGREGATE_TABLE_REGIONS;
+
+ if (maxInMemoryRegions > 2) {
+ targetPrecisionTableRegionCount = Math.max(4, (int)(0.70 * maxInMemoryRegions));
+ targetAggregateTableRegionCount = Math.max(2, (int)(0.15 * maxInMemoryRegions));
+ }
+
+ List<MetricApp> metricList = new ArrayList<>();
+
+ for (String component : masterComponents) {
+ metricList.addAll(getSortedMetricListForSplitPoint(component, false));
+ }
+
+ for (String component : slaveComponents) {
+ metricList.addAll(getSortedMetricListForSplitPoint(component, true));
+ }
+
+ int totalMetricLength = metricList.size();
+
+ if (targetPrecisionTableRegionCount > 1) {
+ int idx = (int) Math.ceil(totalMetricLength / targetPrecisionTableRegionCount);
+ int index = idx;
+ for (int i = 0; i < targetPrecisionTableRegionCount; i++) {
+ if (index < totalMetricLength - 1) {
+ MetricApp metricAppService = metricList.get(index);
+ byte[] uuid = timelineMetricMetadataManager.getUuid(
+ new TimelineClusterMetric(metricAppService.metricName, metricAppService.appId, null, -1),
+ true);
+ precisionSplitPoints.add(uuid);
+ index += idx;
+ }
+ }
+ }
+
+ if (targetAggregateTableRegionCount > 1) {
+ int idx = (int) Math.ceil(totalMetricLength / targetAggregateTableRegionCount);
+ int index = idx;
+ for (int i = 0; i < targetAggregateTableRegionCount; i++) {
+ if (index < totalMetricLength - 1) {
+ MetricApp metricAppService = metricList.get(index);
+ byte[] uuid = timelineMetricMetadataManager.getUuid(
+ new TimelineClusterMetric(metricAppService.metricName, metricAppService.appId, null, -1),
+ true);
+ aggregateSplitPoints.add(uuid);
+ index += idx;
+ }
+ }
+ }
+ }
+
+ private List<MetricApp> getSortedMetricListForSplitPoint(String component, boolean isSlave) {
+
+ String appId = getAppId(component);
+ List<MetricApp> metricList = new ArrayList<>();
+
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = getClass().getClassLoader();
+ }
+
+ String strLine;
+ BufferedReader bufferedReader;
+
+ try (InputStream inputStream = classLoader.getResourceAsStream("metrics_def/" + appId.toUpperCase() + ".dat")) {
+
+ if (inputStream != null) {
+ bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ LOG.info("Found split point candidate metrics for : " + appId);
+
+ while ((strLine = bufferedReader.readLine()) != null) {
+ metricList.add(new MetricApp(strLine.trim(), appId));
+ }
+ } else {
+ LOG.info("Split point candidate metrics not found for : " + appId);
+ }
+ } catch (Exception e) {
+ LOG.info("Error reading split point candidate metrics for component : " + component);
+ LOG.error(e);
+ }
+
+ if (isSlave) {
+ return getEquidistantMetrics(metricList, SLAVE_EQUIDISTANT_POINTS);
+ } else {
+ return getEquidistantMetrics(metricList, MASTER_EQUIDISTANT_POINTS);
+ }
+ }
+
+ private List<MetricApp> getEquidistantMetrics(List<MetricApp> metrics, int distance) {
+ List<MetricApp> selectedMetricApps = new ArrayList<>();
+
+ int idx = metrics.size() / distance;
+ if (idx == 0) {
+ return metrics;
+ }
+
+ int index = idx;
+ for (int i = 0; i < distance; i++) {
+ selectedMetricApps.add(metrics.get(index - 1));
+ index += idx;
+ }
+ return selectedMetricApps;
+ }
+
+
+ public List<byte[]> getPrecisionSplitPoints() {
+ return precisionSplitPoints;
+ }
+
+ public List<byte[]> getClusterAggregateSplitPoints() {
+ return aggregateSplitPoints;
+ }
+
+ public List<byte[]> getHostAggregateSplitPoints() {
+ return aggregateSplitPoints;
+ }
+
+ private String getAppId(String component) {
+
+ if (component.equalsIgnoreCase("METRICS_COLLECTOR")) {
+ return "ams-hbase";
+ }
+
+ if (component.equalsIgnoreCase("METRICS_MONITOR")) {
+ return "HOST";
+ }
+ return component;
+ }
+}
+
+class MetricApp implements Comparable{
+ String metricName;
+ String appId;
+
+ MetricApp(String metricName, String appId) {
+ this.metricName = metricName;
+ if (appId.startsWith("hbase")) {
+ this.appId = "hbase";
+ } else {
+ this.appId = appId;
+ }
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ MetricApp that = (MetricApp)o;
+
+ int metricCompare = metricName.compareTo(that.metricName);
+ if (metricCompare != 0) {
+ return metricCompare;
+ }
+
+ return appId.compareTo(that.appId);
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
index 0ab7929..ba7ce44 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
@@ -45,13 +45,13 @@
private static int failures = 0;
private final TimelineMetricConfiguration configuration;
- private TimelineMetricStore timelineMetricStore;
+ private HBaseTimelineMetricsService timelineMetricStore;
//used to call timelineMetricStore blocking methods with timeout
private ExecutorService executor = Executors.newSingleThreadExecutor();
- public TimelineMetricStoreWatcher(TimelineMetricStore timelineMetricStore,
+ public TimelineMetricStoreWatcher(HBaseTimelineMetricsService timelineMetricStore,
TimelineMetricConfiguration configuration) {
this.timelineMetricStore = timelineMetricStore;
this.configuration = configuration;
@@ -100,7 +100,7 @@
Callable<TimelineMetric> task = new Callable<TimelineMetric>() {
public TimelineMetric call() throws Exception {
- timelineMetricStore.putMetrics(metrics);
+ timelineMetricStore.putMetricsSkipCache(metrics);
TimelineMetrics timelineMetrics = timelineMetricStore.getTimelineMetrics(
Collections.singletonList(FAKE_METRIC_NAME), Collections.singletonList(FAKE_HOSTNAME),
FAKE_APP_ID, null, startTime - delay * 2 * 1000,
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
index 1ca5bc0..737c2ff 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
+import java.sql.Connection;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -38,9 +40,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.uuid.MetricUuidGenStrategy;
import org.apache.ambari.metrics.core.timeline.uuid.MD5UuidGenStrategy;
+import org.apache.ambari.metrics.core.timeline.uuid.Murmur3HashUuidGenStrategy;
+import org.apache.ambari.metrics.core.timeline.uuid.TimelineMetricUuid;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.ArrayUtils;
@@ -53,13 +58,19 @@
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
-import org.apache.ambari.metrics.core.timeline.uuid.HashBasedUuidGenStrategy;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TRANSIENT_METRIC_PATTERNS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaRegexFromSqlRegex;
@@ -67,18 +78,18 @@
private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class);
// Cache all metadata on retrieval
private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>();
- private final Map<String, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>();
+ private final Map<TimelineMetricUuid, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>();
// Map to lookup apps on a host
private final Map<String, TimelineMetricHostMetadata> HOSTED_APPS_MAP = new ConcurrentHashMap<>();
- private final Map<String, String> uuidHostMap = new ConcurrentHashMap<>();
+ private final Map<TimelineMetricUuid, String> uuidHostMap = new ConcurrentHashMap<>();
private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>();
// Sync only when needed
AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false);
AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false);
- private MetricUuidGenStrategy uuidGenStrategy = new HashBasedUuidGenStrategy();
+ private MetricUuidGenStrategy uuidGenStrategy = new Murmur3HashUuidGenStrategy();
public static final int TIMELINE_METRIC_UUID_LENGTH = 16;
- public static final int HOSTNAME_UUID_LENGTH = 16;
+ public static int HOSTNAME_UUID_LENGTH = 4;
//Transient metric patterns. No UUID management and aggregation for such metrics.
private List<String> transientMetricPatterns = new ArrayList<>();
@@ -120,7 +131,54 @@
* Initialize Metadata from the store
*/
public void initializeMetadata() {
- metricMetadataSync = new TimelineMetricMetadataSync(this);
+
+ //Create metadata schema
+ Connection conn = null;
+ Statement stmt = null;
+
+ String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
+ String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
+
+ try {
+ LOG.info("Initializing metrics metadata schema...");
+ conn = hBaseAccessor.getConnectionRetryingOnException();
+ stmt = conn.createStatement();
+
+ // Metadata
+ String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL,
+ encoding, compression);
+ stmt.executeUpdate(metadataSql);
+
+ String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL,
+ encoding, compression);
+ stmt.executeUpdate(hostedAppSql);
+
+ //Host Instances table
+ String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL,
+ encoding, compression);
+ stmt.executeUpdate(hostedInstancesSql);
+ } catch (SQLException | InterruptedException sql) {
+ LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql);
+ throw new MetricsSystemInitializationException(
+ "Error creating Metrics Metadata Schema in HBase using Phoenix.", sql);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+
+ metricMetadataSync = new TimelineMetricMetadataSync(this);
// Schedule the executor to sync to store
executorService.scheduleWithFixedDelay(metricMetadataSync,
metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
@@ -335,14 +393,26 @@
for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) {
TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key);
if (timelineMetricMetadata != null && timelineMetricMetadata.getUuid() != null) {
- uuidKeyMap.put(new String(timelineMetricMetadata.getUuid()), key);
+ uuidKeyMap.put(new TimelineMetricUuid(timelineMetricMetadata.getUuid()), key);
+ }
+ }
+
+ if (!HOSTED_APPS_MAP.isEmpty()) {
+ Map.Entry<String, TimelineMetricHostMetadata> entry = HOSTED_APPS_MAP.entrySet().iterator().next();
+ TimelineMetricHostMetadata timelineMetricHostMetadata = entry.getValue();
+ if (timelineMetricHostMetadata.getUuid() != null && timelineMetricHostMetadata.getUuid().length == 16) {
+ HOSTNAME_UUID_LENGTH = 16;
+ uuidGenStrategy = new MD5UuidGenStrategy();
+ } else {
+ HOSTNAME_UUID_LENGTH = 4;
+ uuidGenStrategy = new Murmur3HashUuidGenStrategy();
}
}
for (String host : HOSTED_APPS_MAP.keySet()) {
TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(host);
if (timelineMetricHostMetadata != null && timelineMetricHostMetadata.getUuid() != null) {
- uuidHostMap.put(new String(timelineMetricHostMetadata.getUuid()), host);
+ uuidHostMap.put(new TimelineMetricUuid(timelineMetricHostMetadata.getUuid()), host);
}
}
}
@@ -354,11 +424,11 @@
*/
private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) {
String strategy = configuration.get(TIMELINE_METRICS_UUID_GEN_STRATEGY, "");
- if ("hash".equalsIgnoreCase(strategy)) {
- return new HashBasedUuidGenStrategy();
+ if ("md5".equalsIgnoreCase(strategy)){
+ return new MD5UuidGenStrategy();
} else {
//Default
- return new MD5UuidGenStrategy();
+ return new Murmur3HashUuidGenStrategy();
}
}
@@ -379,14 +449,13 @@
}
if (!createIfNotPresent) {
- LOG.warn("UUID not found for " + hostname + ", createIfNotPresent is false");
+ LOG.debug("UUID not found for " + hostname + ", createIfNotPresent is false");
return null;
}
byte[] uuid = uuidGenStrategy.computeUuid(hostname, HOSTNAME_UUID_LENGTH);
- String uuidStr = new String(uuid);
- if (uuidHostMap.containsKey(uuidStr)) {
- LOG.error("Duplicate key computed for " + hostname +", Collides with " + uuidHostMap.get(uuidStr));
+ if (uuidHostMap.containsKey(new TimelineMetricUuid(uuid))) {
+ LOG.error("Duplicate key computed for " + hostname +", Collides with " + uuidHostMap.get(uuid));
return null;
}
@@ -395,7 +464,7 @@
HOSTED_APPS_MAP.put(hostname, timelineMetricHostMetadata);
}
timelineMetricHostMetadata.setUuid(uuid);
- uuidHostMap.put(uuidStr, hostname);
+ uuidHostMap.put(new TimelineMetricUuid(uuid), hostname);
return uuid;
}
@@ -420,17 +489,16 @@
}
if (!createIfNotPresent) {
- LOG.warn("UUID not found for " + key + ", createIfNotPresent is false");
+ LOG.debug("UUID not found for " + key + ", createIfNotPresent is false");
return null;
}
- byte[] uuid = uuidGenStrategy.computeUuid(timelineClusterMetric, TIMELINE_METRIC_UUID_LENGTH);
+ byte[] uuidBytes = uuidGenStrategy.computeUuid(timelineClusterMetric, TIMELINE_METRIC_UUID_LENGTH);
- String uuidStr = new String(uuid);
- if (uuidKeyMap.containsKey(uuidStr) && !uuidKeyMap.get(uuidStr).equals(key)) {
- TimelineMetricMetadataKey collidingKey = (TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr);
- LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid + ") computed for " + timelineClusterMetric.toString()
- + ", Collides with " + collidingKey.toString());
+ TimelineMetricUuid uuid = new TimelineMetricUuid(uuidBytes);
+ if (uuidKeyMap.containsKey(uuid) && !uuidKeyMap.get(uuid).equals(key)) {
+ TimelineMetricMetadataKey collidingKey = uuidKeyMap.get(uuid);
+ LOG.error("Duplicate key " + uuid + " computed for " + timelineClusterMetric + ", Collides with " + collidingKey);
return null;
}
@@ -442,10 +510,10 @@
METADATA_CACHE.put(key, timelineMetricMetadata);
}
- timelineMetricMetadata.setUuid(uuid);
+ timelineMetricMetadata.setUuid(uuid.uuid);
timelineMetricMetadata.setIsPersisted(false);
- uuidKeyMap.put(uuidStr, key);
- return uuid;
+ uuidKeyMap.put(uuid, key);
+ return uuid.uuid;
}
/**
@@ -484,14 +552,14 @@
return metricUuid;
}
- public String getMetricNameFromUuid(byte[] uuid) {
+ public String getMetricNameFromUuid(byte[] uuid) {
byte[] metricUuid = uuid;
if (uuid.length == TIMELINE_METRIC_UUID_LENGTH + HOSTNAME_UUID_LENGTH) {
metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH);
}
- TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid));
+ TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(metricUuid));
return key != null ? key.getMetricName() : null;
}
@@ -506,11 +574,11 @@
}
if (uuid.length == TIMELINE_METRIC_UUID_LENGTH) {
- TimelineMetricMetadataKey key = uuidKeyMap.get(new String(uuid));
+ TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(uuid));
return key != null ? new TimelineMetric(key.metricName, null, key.appId, key.instanceId) : null;
} else {
byte[] metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH);
- TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid));
+ TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(metricUuid));
if (key == null) {
LOG.error("TimelineMetricMetadataKey is null for : " + Arrays.toString(uuid));
return null;
@@ -521,7 +589,7 @@
timelineMetric.setInstanceId(key.instanceId);
byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH);
- timelineMetric.setHostName(uuidHostMap.get(new String(hostUuid)));
+ timelineMetric.setHostName(uuidHostMap.get(new TimelineMetricUuid(hostUuid)));
return timelineMetric;
}
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index f76933a..e0cc642 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -43,8 +43,11 @@
/**
* Create table to store individual metric records.
*/
+
+ public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD_UUID";
+
public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
- "EXISTS METRIC_RECORD_UUID (UUID BINARY(32) NOT NULL, " +
+ "EXISTS " + METRICS_RECORD_TABLE_NAME + " (UUID BINARY(20) NOT NULL, " +
"SERVER_TIME BIGINT NOT NULL, " +
"METRIC_SUM DOUBLE, " +
"METRIC_COUNT UNSIGNED_INT, " +
@@ -83,7 +86,7 @@
public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
"CREATE TABLE IF NOT EXISTS %s " +
- "(UUID BINARY(32) NOT NULL, " +
+ "(UUID BINARY(20) NOT NULL, " +
"SERVER_TIME BIGINT NOT NULL, " +
"METRIC_SUM DOUBLE," +
"METRIC_COUNT UNSIGNED_INT, " +
@@ -155,7 +158,7 @@
public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL =
"CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA_UUID " +
- "(HOSTNAME VARCHAR, UUID BINARY(16), APP_IDS VARCHAR, " +
+ "(HOSTNAME VARCHAR, UUID BINARY(4), APP_IDS VARCHAR, " +
"CONSTRAINT pk PRIMARY KEY (HOSTNAME))" +
"DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
@@ -165,9 +168,6 @@
"CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" +
"DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
- public static final String ALTER_METRICS_METADATA_TABLE =
- "ALTER TABLE METRICS_METADATA_UUID ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN";
-
////////////////////////////////
/**
@@ -442,8 +442,6 @@
public static final String METRICS_CLUSTER_AGGREGATE_DAILY_V1_TABLE_NAME =
"METRIC_AGGREGATE_DAILY";
- public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD_UUID";
-
public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
"METRIC_RECORD_MINUTE_UUID";
public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
@@ -459,7 +457,7 @@
public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
"METRIC_AGGREGATE_DAILY_UUID";
- public static final Pattern PHOENIX_TABLES_REGEX_PATTERN = Pattern.compile("METRIC_.*_UUID");
+ public static final Pattern PHOENIX_TABLES_REGEX_PATTERN = Pattern.compile("METRIC_.*");
public static final String[] PHOENIX_TABLES = {
METRICS_RECORD_TABLE_NAME,
@@ -469,7 +467,9 @@
METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
- METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME
+ METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME,
+ METRIC_TRANSIENT_TABLE_NAME,
+ CONTAINER_METRICS_TABLE_NAME
};
public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/Murmur3HashUuidGenStrategy.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/Murmur3HashUuidGenStrategy.java
new file mode 100644
index 0000000..9418aa4
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/Murmur3HashUuidGenStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.uuid;
+
+import com.google.common.hash.Hashing;
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.commons.lang.StringUtils;
+
+public class Murmur3HashUuidGenStrategy implements MetricUuidGenStrategy{
+
+ @Override
+ public byte[] computeUuid(TimelineClusterMetric timelineClusterMetric, int maxLength) {
+
+ String metricString = timelineClusterMetric.getMetricName() + timelineClusterMetric.getAppId();
+ if (StringUtils.isNotEmpty(timelineClusterMetric.getInstanceId())) {
+ metricString += timelineClusterMetric.getInstanceId();
+ }
+ byte[] metricBytes = metricString.getBytes();
+ return Hashing.murmur3_128().hashBytes(metricBytes).asBytes();
+ }
+
+ @Override
+ public byte[] computeUuid(String value, int maxLength) {
+ byte[] valueBytes = value.getBytes();
+ return Hashing.murmur3_32().hashBytes(valueBytes).asBytes();
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuid.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuid.java
new file mode 100644
index 0000000..7907ff6
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuid.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.uuid;
+
+import java.util.Arrays;
+
+public class TimelineMetricUuid {
+ public byte[] uuid;
+
+ public TimelineMetricUuid(byte[] uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(uuid);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+
+ if (this == o) {
+ return false;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TimelineMetricUuid that = (TimelineMetricUuid) o;
+
+ return Arrays.equals(this.uuid, that.uuid);
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(uuid);
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java
index 9c88b1a..3bcbaf6 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/webapp/TimelineWebServices.java
@@ -144,8 +144,8 @@
// TODO: Check ACLs for MetricEntity using the TimelineACLManager.
// TODO: Save owner of the MetricEntity.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing metrics: " +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Storing metrics: " +
TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
}
@@ -175,8 +175,8 @@
}
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing aggregated metrics: " +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Storing aggregated metrics: " +
TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
}
@@ -200,8 +200,8 @@
}
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing container metrics: " + TimelineUtils
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Storing container metrics: " + TimelineUtils
.dumpTimelineRecordtoJSON(metrics, true));
}
@@ -250,8 +250,8 @@
) {
init(res);
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Request for metrics => metricNames: " + metricNames + ", " +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Request for metrics => metricNames: " + metricNames + ", " +
"appId: " + appId + ", instanceId: " + instanceId + ", " +
"hostname: " + hostname + ", startTime: " + startTime + ", " +
"endTime: " + endTime + ", " +
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.dat
new file mode 100644
index 0000000..f5c181a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.dat
@@ -0,0 +1 @@
+AMBARI_METRICS.SmokeTest.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/MASTER_HBASE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_MASTER.dat
similarity index 100%
rename from ambari-metrics-timelineservice/src/main/resources/metrics_def/MASTER_HBASE.dat
rename to ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_MASTER.dat
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/SLAVE_HBASE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_REGIONSERVER.dat
similarity index 100%
rename from ambari-metrics-timelineservice/src/main/resources/metrics_def/SLAVE_HBASE.dat
rename to ambari-metrics-timelineservice/src/main/resources/metrics_def/HBASE_REGIONSERVER.dat
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.dat
new file mode 100644
index 0000000..af73d02
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.dat
@@ -0,0 +1 @@
+TimelineMetricStoreWatcher.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
index 258054c..26078cb 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
@@ -18,6 +18,7 @@
package org.apache.ambari.metrics.core.timeline;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.tearDownMiniCluster;
@@ -100,8 +101,12 @@
// inits connection, starts mini cluster
conn = getConnection(getUrl());
+ Configuration metricsConf = new Configuration();
+ metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+ metadataManager = new TimelineMetricMetadataManager(metricsConf, hdb);
+ metadataManager.initializeMetadata();
hdb.initMetricSchema();
- metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb);
hdb.setMetadataInstance(metadataManager);
}
@@ -206,6 +211,8 @@
metricsConf.set("timeline.metrics.transient.metric.patterns", "topology%");
// Unit tests insert values into the future
metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
+ metricsConf.set("timeline.metrics." + METRICS_RECORD_TABLE_NAME + ".durability", "SKIP_WAL");
+ metricsConf.set("timeline.metrics." + METRICS_CLUSTER_AGGREGATE_TABLE_NAME + ".durability", "ASYNC_WAL");
return
new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf),
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
index 65b5a1b..20fbc58 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
@@ -19,11 +19,13 @@
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
-import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DATE_TIERED_COMPACTION_POLICY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.FIFO_COMPACTION_POLICY_CLASS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_COMPACTION_CLASS_KEY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_ENGINE_CLASS;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES_REGEX_PATTERN;
@@ -42,6 +44,7 @@
import java.util.Map;
import java.util.Optional;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -367,28 +370,30 @@
precisionValues.put(METRICS_RECORD_TABLE_NAME, precisionTtl);
f.set(hdb, precisionValues);
- Field f2 = PhoenixHBaseAccessor.class.getDeclaredField("timelineMetricsTablesDurability");
- f2.setAccessible(true);
- f2.set(hdb, "ASYNC_WAL");
-
hdb.initPoliciesAndTTL();
// Verify expected policies are set
boolean normalizerEnabled = false;
String precisionTableCompactionPolicy = null;
String aggregateTableCompactionPolicy = null;
- boolean tableDurabilitySet = false;
- for (int i = 0; i < 10; i++) {
+ boolean precisionTableDurabilitySet = false;
+ boolean aggregateTableDurabilitySet = false;
+
+ boolean isComplete = false;
+
+ for (int i = 0; i < 10 && !isComplete; i++) {
LOG.warn("Policy check retry : " + i);
for (String tableName : PHOENIX_TABLES) {
TableName[] tableNames = hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN, false);
+ TableName[] containerMetricsTableName = hBaseAdmin.listTableNames(CONTAINER_METRICS_TABLE_NAME, false);
+ tableNames = (TableName[]) ArrayUtils.addAll(tableNames, containerMetricsTableName);
+
Optional<TableName> tableNameOptional = Arrays.stream(tableNames)
.filter(t -> tableName.equals(t.getNameAsString())).findFirst();
TableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableNameOptional.get());
normalizerEnabled = tableDescriptor.isNormalizationEnabled();
- tableDurabilitySet = (Durability.ASYNC_WAL.equals(tableDescriptor.getDurability()));
if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
precisionTableCompactionPolicy = tableDescriptor.getValue(HSTORE_COMPACTION_CLASS_KEY);
} else {
@@ -398,17 +403,25 @@
// Best effort for 20 seconds
if (normalizerEnabled || (precisionTableCompactionPolicy == null && aggregateTableCompactionPolicy == null)) {
Thread.sleep(2000l);
+ } else {
+ isComplete = true;
}
if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
+ precisionTableDurabilitySet = (Durability.SKIP_WAL.equals(tableDescriptor.getDurability()));
for (ColumnFamilyDescriptor family : tableDescriptor.getColumnFamilies()) {
precisionTtl = family.getTimeToLive();
}
}
+
+ if (tableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
+ aggregateTableDurabilitySet = (Durability.ASYNC_WAL.equals(tableDescriptor.getDurability()));
+ }
}
}
Assert.assertFalse("Normalizer disabled.", normalizerEnabled);
- Assert.assertTrue("Durability Set.", tableDurabilitySet);
+ Assert.assertTrue("METRIC_RECORD_UUID Durability Set.", precisionTableDurabilitySet);
+ Assert.assertTrue("METRIC_AGGREGATE_UUID Durability Set.", aggregateTableDurabilitySet);
Assert.assertEquals("FIFO compaction policy is set for METRIC_RECORD_UUID.", FIFO_COMPACTION_POLICY_CLASS, precisionTableCompactionPolicy);
Assert.assertEquals("FIFO compaction policy is set for aggregate tables", DATE_TIERED_COMPACTION_POLICY, aggregateTableCompactionPolicy);
Assert.assertEquals("Precision TTL value as expected.", 86400, precisionTtl);
@@ -441,7 +454,7 @@
metric.setExitCode(0);
List<ContainerMetric> list = Arrays.asList(metric);
hdb.insertContainerMetrics(list);
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM CONTAINER_METRICS_UUID");
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM CONTAINER_METRICS");
ResultSet set = stmt.executeQuery();
// check each filed is set properly when read back.
boolean foundRecord = false;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
index 9d1b2a4..63ec59e 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
@@ -109,7 +109,7 @@
mockStatic(PhoenixTransactSQL.class);
PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
- Condition condition = new DefaultCondition(Collections.singletonList(new byte[32]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
+ Condition condition = new DefaultCondition(Collections.singletonList(new byte[20]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
expect(preparedStatementMock.executeQuery()).andReturn(rsMock);
@@ -138,7 +138,7 @@
mockStatic(PhoenixTransactSQL.class);
PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
- Condition condition = new DefaultCondition(Collections.singletonList(new byte[32]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
+ Condition condition = new DefaultCondition(Collections.singletonList(new byte[20]), metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
RuntimeException runtimeException = EasyMock.createNiceMock(RuntimeException.class);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputerTest.java
new file mode 100644
index 0000000..4d663cc
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricSplitPointComputerTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.hadoop.conf.Configuration;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS;
+import static org.easymock.EasyMock.anyBoolean;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+
+public class TimelineMetricSplitPointComputerTest {
+
+ @Test
+ public void testSplitPointComputationForBasicCluster() {
+
+ /**
+ * HBase Total heap = 1G.
+ * HDFS,HBASE,YARN services deployed.
+ */
+ Configuration metricsConfMock = EasyMock.createMock(Configuration.class);
+
+ expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "")).
+ andReturn("METRICS_COLLECTOR,AMBARI_SERVER,NAMENODE,RESOURCEMANAGER").once();
+ expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "")).
+ andReturn("METRICS_MONITOR,DATANODE,NODEMANAGER,HBASE_REGIONSERVER").once();
+ expect(metricsConfMock.getDouble("hbase_total_heapsize", 1024*1024*1024)).andReturn(1024 * 1024 * 1024.0).once();
+
+ Configuration hbaseConfMock = EasyMock.createMock(Configuration.class);
+ expect(hbaseConfMock.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5)).andReturn(0.5).once();
+ expect(hbaseConfMock.getDouble("hbase.hregion.memstore.flush.size", 134217728)).andReturn(134217728.0).once();
+
+ TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+ expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]);
+
+ replay(metricsConfMock, hbaseConfMock, metricMetadataManagerMock);
+
+ TimelineMetricSplitPointComputer timelineMetricSplitPointComputer = new TimelineMetricSplitPointComputer(metricsConfMock,
+ hbaseConfMock,
+ metricMetadataManagerMock);
+
+ Assert.assertEquals(timelineMetricSplitPointComputer.getPrecisionSplitPoints().size(), 3);
+ Assert.assertEquals(timelineMetricSplitPointComputer.getClusterAggregateSplitPoints().size(), 1);
+ Assert.assertEquals(timelineMetricSplitPointComputer.getHostAggregateSplitPoints().size(), 1);
+ }
+
+ @Test
+ public void testSplitPointComputationForMediumCluster() {
+
+ /**
+ * HBase Total heap = 8G.
+ * All services deployed.
+ */
+ Configuration metricsConfMock = EasyMock.createMock(Configuration.class);
+
+ expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "")).
+ andReturn("METRICS_COLLECTOR,AMBARI_SERVER,NAMENODE,RESOURCEMANAGER," +
+ "NIMBUS,HIVESERVER2,HIVEMETASTORE,HBASE_MASTER,KAFKA_BROKER").once();
+ expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "")).
+ andReturn("METRICS_MONITOR,DATANODE,NODEMANAGER,HBASE_REGIONSERVER").once();
+ expect(metricsConfMock.getDouble("hbase_total_heapsize", 1024*1024*1024)).andReturn(8589934592.0).once();
+
+ Configuration hbaseConfMock = EasyMock.createMock(Configuration.class);
+ expect(hbaseConfMock.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5)).andReturn(0.5).once();
+ expect(hbaseConfMock.getDouble("hbase.hregion.memstore.flush.size", 134217728)).andReturn(134217728.0).once();
+
+ TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+ expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]);
+
+ replay(metricsConfMock, hbaseConfMock, metricMetadataManagerMock);
+
+ TimelineMetricSplitPointComputer timelineMetricSplitPointComputer = new TimelineMetricSplitPointComputer(metricsConfMock,
+ hbaseConfMock,
+ metricMetadataManagerMock);
+
+ Assert.assertEquals(timelineMetricSplitPointComputer.getPrecisionSplitPoints().size(), 16);
+ Assert.assertEquals(timelineMetricSplitPointComputer.getClusterAggregateSplitPoints().size(), 3);
+ Assert.assertEquals(timelineMetricSplitPointComputer.getHostAggregateSplitPoints().size(), 3);
+ }
+
+ @Test
+ public void testSplitPointComputationForLargeCluster() {
+
+ /**
+ * HBase Total heap = 24G.
+ * All services deployed.
+ */
+ Configuration metricsConfMock = EasyMock.createMock(Configuration.class);
+
+ expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "")).
+ andReturn("METRICS_COLLECTOR,AMBARI_SERVER,NAMENODE,RESOURCEMANAGER," +
+ "NIMBUS,HIVESERVER2,HIVEMETASTORE,HBASE_MASTER,KAFKA_BROKER").once();
+ expect(metricsConfMock.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "")).
+ andReturn("METRICS_MONITOR,DATANODE,NODEMANAGER,HBASE_REGIONSERVER").once();
+ expect(metricsConfMock.getDouble("hbase_total_heapsize", 1024*1024*1024)).andReturn(24 * 1024 * 1024 * 1024.0).once();
+
+ Configuration hbaseConfMock = EasyMock.createMock(Configuration.class);
+ expect(hbaseConfMock.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5)).andReturn(0.5).once();
+ expect(hbaseConfMock.getDouble("hbase.hregion.memstore.flush.size", 134217728)).andReturn(2 * 134217728.0).once();
+
+ TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+ expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]);
+
+ replay(metricsConfMock, hbaseConfMock, metricMetadataManagerMock);
+
+ TimelineMetricSplitPointComputer timelineMetricSplitPointComputer = new TimelineMetricSplitPointComputer(metricsConfMock,
+ hbaseConfMock,
+ metricMetadataManagerMock);
+
+ Assert.assertEquals(timelineMetricSplitPointComputer.getPrecisionSplitPoints().size(), 28);
+ Assert.assertEquals(timelineMetricSplitPointComputer.getClusterAggregateSplitPoints().size(), 6);
+ Assert.assertEquals(timelineMetricSplitPointComputer.getHostAggregateSplitPoints().size(), 6);
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
index de0236c..eb64198 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
@@ -48,10 +48,10 @@
@Test
public void testRunPositive() throws Exception {
- TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class);
+ HBaseTimelineMetricsService metricStore = createNiceMock(HBaseTimelineMetricsService.class);
- expect(metricStore.putMetrics(anyObject(TimelineMetrics.class)))
- .andReturn(new TimelinePutResponse());
+ metricStore.putMetricsSkipCache(anyObject(TimelineMetrics.class));
+ expectLastCall().once();
// metric found
expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(),
@@ -75,7 +75,7 @@
@Test
public void testRunNegative() throws Exception {
- TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class);
+ HBaseTimelineMetricsService metricStore = createNiceMock(HBaseTimelineMetricsService.class);
expect(metricStore.putMetrics(anyObject(TimelineMetrics.class)))
.andReturn(new TimelinePutResponse());
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
index 2f2b0b5..28bb75e 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
@@ -160,7 +160,7 @@
byte[] uuid = metadataManager.getUuid(timelineMetric, true);
Assert.assertNotNull(uuid);
- Assert.assertEquals(uuid.length, 32);
+ Assert.assertEquals(uuid.length, 20);
byte[] uuidWithoutHost = metadataManager.getUuid(new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), -1), true);
Assert.assertNotNull(uuidWithoutHost);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/MetricUuidGenStrategyTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/MetricUuidGenStrategyTest.java
new file mode 100644
index 0000000..a25310b
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/MetricUuidGenStrategyTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.uuid;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MetricUuidGenStrategyTest {
+
+
+ private static List<String> apps = Arrays.asList("namenode",
+ "datanode", "hbase_master", "hbase_regionserver", "kafka_broker", "nimbus", "ams-hbase",
+ "accumulo", "nodemanager", "resourcemanager", "ambari_server", "HOST", "timeline_metric_store_watcher",
+ "jobhistoryserver", "hiveserver2", "hivemetastore", "applicationhistoryserver", "amssmoketestfake", "llapdaemon");
+
+ private static Map<String, Set<String>> metricSet = new HashMap<>();
+
+ @BeforeClass
+ public static void init() {
+ metricSet = new HashMap<>(populateMetricWhitelistFromFile());
+ }
+
+ @Test
+ @Ignore
+ public void testHashBasedUuid() throws SQLException {
+ testMetricCollisionsForUuidGenStrategy(new HashBasedUuidGenStrategy(), 16);
+ }
+
+ @Test
+ @Ignore
+ public void testHashBasedUuidForAppIds() throws SQLException {
+ MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
+ Map<String, TimelineClusterMetric> uuids = new HashMap<>();
+ for (String app : metricSet.keySet()) {
+ TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("TestMetric", app, null, -1l);
+ byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
+ String uuidStr = new String(uuid);
+ Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
+ uuids.put(uuidStr, timelineClusterMetric);
+ }
+ }
+
+ @Test
+ @Ignore
+ public void testHashBasedUuidForHostnames() throws SQLException {
+ testHostCollisionsForUuidGenStrategy(new HashBasedUuidGenStrategy(), 16);
+ }
+
+
+ @Test
+ public void testMD5BasedUuid() throws SQLException {
+ testMetricCollisionsForUuidGenStrategy(new MD5UuidGenStrategy(), 16);
+
+ }
+
+ @Test
+ public void testMD5BasedUuidForHostnames() throws SQLException {
+ testHostCollisionsForUuidGenStrategy(new MD5UuidGenStrategy(), 16);
+ }
+
+
+ @Test
+ public void testMD5ConsistentHashing() throws SQLException, InterruptedException {
+ testConsistencyForUuidGenStrategy(new MD5UuidGenStrategy(), 16);
+ }
+
+
+ @Test
+ public void testMurmur3HashUuid() throws SQLException {
+ testMetricCollisionsForUuidGenStrategy(new Murmur3HashUuidGenStrategy(), 16);
+ }
+
+ @Test
+ public void testMurmur3HashingBasedUuidForHostnames() throws SQLException {
+ testHostCollisionsForUuidGenStrategy(new Murmur3HashUuidGenStrategy(), 4);
+ }
+
+ @Test
+ public void testMurmur3ConsistentHashing() throws SQLException, InterruptedException {
+ testConsistencyForUuidGenStrategy(new Murmur3HashUuidGenStrategy(), 4);
+ }
+
+ private void testMetricCollisionsForUuidGenStrategy(MetricUuidGenStrategy strategy, int uuidLength) {
+ Map<TimelineMetricUuid, TimelineClusterMetric> uuids = new HashMap<>();
+ for (String app : metricSet.keySet()) {
+ Set<String> metrics = metricSet.get(app);
+ for (String m : metrics) {
+ TimelineClusterMetric metric = new TimelineClusterMetric(m, app, null, -1l);
+ byte[] uuid = strategy.computeUuid(metric, uuidLength);
+ Assert.assertNotNull(uuid);
+ Assert.assertTrue(uuid.length == uuidLength);
+ TimelineMetricUuid uuidStr = new TimelineMetricUuid(uuid);
+ Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(metric));
+ uuids.put(uuidStr, metric);
+ }
+ }
+ }
+
+
+ private void testHostCollisionsForUuidGenStrategy(MetricUuidGenStrategy strategy, int uuidLength) {
+ Map<TimelineMetricUuid, String> uuids = new HashMap<>();
+
+ List<String> hosts = new ArrayList<>();
+ String hostPrefix = "TestHost.";
+ String hostSuffix = ".ambari.apache.org";
+
+ for (int i=0; i<=2000; i++) {
+ hosts.add(hostPrefix + i + hostSuffix);
+ }
+
+ for (String host : hosts) {
+ byte[] uuid = strategy.computeUuid(host, uuidLength);
+ Assert.assertNotNull(uuid);
+ Assert.assertTrue(uuid.length == uuidLength);
+ TimelineMetricUuid uuidStr = new TimelineMetricUuid(uuid);
+ Assert.assertFalse(uuids.containsKey(uuidStr));
+ uuids.put(uuidStr, host);
+ }
+ }
+
+ private void testConsistencyForUuidGenStrategy(MetricUuidGenStrategy strategy, int length) throws InterruptedException {
+ String key = "TestString";
+
+ byte[] uuid = strategy.computeUuid(key, length);
+ Assert.assertNotNull(uuid);
+ Assert.assertTrue(uuid.length == length);
+
+ for (int i = 0; i<100; i++) {
+ byte[] uuid2 = strategy.computeUuid(key, length);
+ Assert.assertNotNull(uuid2);
+ Assert.assertTrue(uuid2.length == length);
+ Assert.assertArrayEquals(uuid, uuid2);
+ Thread.sleep(10);
+ }
+ }
+
+ private static Map<String, Set<String>> populateMetricWhitelistFromFile() {
+
+ Map<String, Set<String>> metricSet = new HashMap<String, Set<String>>();
+ FileInputStream fstream = null;
+ Set<String> hbaseMetrics = new HashSet<>();
+ BufferedReader br = null;
+ String strLine;
+ for (String appId : apps) {
+ URL fileUrl = ClassLoader.getSystemResource("metrics_def/" + appId.toUpperCase() + ".dat");
+
+ Set<String> metricsForApp = new HashSet<>();
+ try {
+ fstream = new FileInputStream(fileUrl.getPath());
+ br = new BufferedReader(new InputStreamReader(fstream));
+ while ((strLine = br.readLine()) != null) {
+ strLine = strLine.trim();
+ metricsForApp.add(strLine);
+ }
+ } catch (Exception ioEx) {
+ System.out.println("Metrics for AppId " + appId + " not found.");
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ }
+ }
+
+ if (fstream != null) {
+ try {
+ fstream.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ metricsForApp.add("live_hosts");
+ if (appId.startsWith("hbase")) {
+ hbaseMetrics.addAll(metricsForApp);
+ } else {
+ metricSet.put(appId, metricsForApp);
+ }
+ System.out.println("Found " + metricsForApp.size() + " metrics for appId = " + appId);
+ }
+ metricSet.put("hbase", hbaseMetrics);
+ return metricSet;
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuidManagerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuidManagerTest.java
deleted file mode 100644
index e4018bb..0000000
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/uuid/TimelineMetricUuidManagerTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.core.timeline.uuid;
-
-import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TimelineMetricUuidManagerTest {
-
-
- private List<String> apps = Arrays.asList("namenode",
- "datanode", "master_hbase", "slave_hbase", "kafka_broker", "nimbus", "ams-hbase",
- "accumulo", "nodemanager", "resourcemanager", "ambari_server", "HOST", "timeline_metric_store_watcher",
- "jobhistoryserver", "hiveserver2", "hivemetastore", "applicationhistoryserver", "amssmoketestfake");
-
- private Map<String, Set<String>> metricSet = new HashMap<>(populateMetricWhitelistFromFile());
-
- @Test
- @Ignore("Collisions possible")
- public void testHashBasedUuidForMetricName() throws SQLException {
-
- MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
- Map<String, TimelineClusterMetric> uuids = new HashMap<>();
- for (String app : metricSet.keySet()) {
- Set<String> metrics = metricSet.get(app);
- for (String metric : metrics) {
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(metric, app, null, -1l);
- byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
- Assert.assertNotNull(uuid);
- Assert.assertTrue(uuid.length == 16);
- String uuidStr = new String(uuid);
- Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
- uuids.put(uuidStr, timelineClusterMetric);
- }
- }
- }
-
- @Test
- public void testHaseBasedUuidForAppIds() throws SQLException {
-
- MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
- Map<String, TimelineClusterMetric> uuids = new HashMap<>();
- for (String app : metricSet.keySet()) {
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("TestMetric", app, null, -1l);
- byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
- String uuidStr = new String(uuid);
- Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
- uuids.put(uuidStr, timelineClusterMetric);
- }
- }
-
- @Test
- public void testHashBasedUuidForHostnames() throws SQLException {
-
- MetricUuidGenStrategy strategy = new HashBasedUuidGenStrategy();
- Map<String, String> uuids = new HashMap<>();
-
- List<String> hosts = new ArrayList<>();
- String hostPrefix = "TestHost.";
- String hostSuffix = ".ambari.apache.org";
-
- for (int i=0; i<=2000; i++) {
- hosts.add(hostPrefix + i + hostSuffix);
- }
-
- for (String host : hosts) {
- byte[] uuid = strategy.computeUuid(host, 16);
- Assert.assertNotNull(uuid);
- Assert.assertTrue(uuid.length == 16);
- String uuidStr = new String(uuid);
- Assert.assertFalse(uuids.containsKey(uuidStr));
- uuids.put(uuidStr, host);
- }
- }
-
-
- @Test
- public void testRandomUuidForWhitelistedMetrics() throws SQLException {
-
- MetricUuidGenStrategy strategy = new MD5UuidGenStrategy();
- Map<String, String> uuids = new HashMap<>();
- for (String app : metricSet.keySet()) {
- Set<String> metrics = metricSet.get(app);
- for (String metric : metrics) {
- byte[] uuid = strategy.computeUuid(new TimelineClusterMetric(metric, app, null, -1l), 16);
- Assert.assertNotNull(uuid);
- Assert.assertTrue(uuid.length == 16);
- String uuidStr = new String(uuid);
- Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(metric));
- uuids.put(uuidStr, metric);
- }
- }
- }
-
- @Test
- public void testRandomUuidForHostnames() throws SQLException {
-
- MetricUuidGenStrategy strategy = new MD5UuidGenStrategy();
- Map<String, String> uuids = new HashMap<>();
-
- List<String> hosts = new ArrayList<>();
- String hostPrefix = "TestHost.";
- String hostSuffix = ".ambari.apache.org";
-
- for (int i=0; i<=2000; i++) {
- hosts.add(hostPrefix + i + hostSuffix);
- }
-
- int numC = 0;
- for (String host : hosts) {
- byte[] uuid = strategy.computeUuid(host, 16);
- Assert.assertNotNull(uuid);
- Assert.assertTrue(uuid.length == 16);
- String uuidStr = new String(uuid);
- Assert.assertFalse(uuids.containsKey(uuidStr));
- uuids.put(uuidStr, host);
- }
- }
-
-
- @Test
- public void testConsistentHashing() throws SQLException, InterruptedException {
-
- MetricUuidGenStrategy strategy = new MD5UuidGenStrategy();
- String key = "TestString";
-
- byte[] uuid = strategy.computeUuid(key, 16);
- Assert.assertNotNull(uuid);
- Assert.assertTrue(uuid.length == 16);
-
- for (int i = 0; i<100; i++) {
- byte[] uuid2 = strategy.computeUuid(key, 16);
- Assert.assertNotNull(uuid2);
- Assert.assertTrue(uuid2.length == 16);
- Assert.assertArrayEquals(uuid, uuid2);
- Thread.sleep(10);
- }
- }
-
-
- public Map<String, Set<String>> populateMetricWhitelistFromFile() {
-
-
- Map<String, Set<String>> metricSet = new HashMap<String, Set<String>>();
- FileInputStream fstream = null;
- Set<String> hbaseMetrics = new HashSet<>();
- BufferedReader br = null;
- String strLine;
- for (String appId : apps) {
- URL fileUrl = ClassLoader.getSystemResource("metrics_def/" + appId.toUpperCase() + ".dat");
-
- Set<String> metricsForApp = new HashSet<>();
- try {
- fstream = new FileInputStream(fileUrl.getPath());
- br = new BufferedReader(new InputStreamReader(fstream));
- while ((strLine = br.readLine()) != null) {
- strLine = strLine.trim();
- metricsForApp.add(strLine);
- }
- } catch (Exception ioEx) {
- System.out.println("Metrics for AppId " + appId + " not found.");
- } finally {
- if (br != null) {
- try {
- br.close();
- } catch (IOException e) {
- }
- }
-
- if (fstream != null) {
- try {
- fstream.close();
- } catch (IOException e) {
- }
- }
- }
- metricsForApp.add("live_hosts");
- if (appId.equals("master_hbase") || appId.equals("slave_hbase")) {
- hbaseMetrics.addAll(metricsForApp);
- } else {
- metricSet.put(appId, metricsForApp);
- }
- System.out.println("Found " + metricsForApp.size() + " metrics for appId = " + appId);
- }
- metricSet.put("hbase", hbaseMetrics);
- return metricSet;
- }
-}