| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ambari.metrics.core.timeline; |
| |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.ambari.metrics.core.timeline.FunctionUtils.findMetricFunctions; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TRANSIENT_METRIC_PATTERNS; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_TRANSIENT_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_TRANSIENT_METRICS_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_INSTANCE_HOST_METADATA_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRIC_TRANSIENT_TABLE_NAME; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES_REGEX_PATTERN; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_TRANSIENT_METRICS_SQL; |
| import static org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Timestamp; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils; |
| import org.apache.ambari.metrics.core.timeline.aggregators.Function; |
| import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; |
| import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricReadHelper; |
| import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata; |
| import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; |
| import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; |
| import org.apache.ambari.metrics.core.timeline.query.Condition; |
| import org.apache.ambari.metrics.core.timeline.query.DefaultPhoenixDataSource; |
| import org.apache.ambari.metrics.core.timeline.query.PhoenixConnectionProvider; |
| import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL; |
| import org.apache.ambari.metrics.core.timeline.query.SplitByMetricNamesCondition; |
| import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink; |
| import org.apache.ambari.metrics.core.timeline.sink.ExternalSinkProvider; |
| import org.apache.ambari.metrics.core.timeline.source.InternalMetricsSource; |
| import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Durability; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.util.RetryCounter; |
| import org.apache.hadoop.hbase.util.RetryCounterFactory; |
| import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; |
| import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; |
| import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; |
| import org.apache.hadoop.metrics2.sink.timeline.Precision; |
| import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.util.timeline.TimelineUtils; |
| import org.apache.phoenix.exception.PhoenixIOException; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.type.TypeReference; |
| |
| import com.google.common.collect.Multimap; |
| |
| |
| /** |
| * Provides a facade over the Phoenix API to access HBase schema |
| */ |
| public class PhoenixHBaseAccessor { |
| private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class); |
| |
| static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000; |
| |
| /** |
| * 22 metrics for 2hours in SECONDS, 2 hosts (1 minute data records) |
| * 22 * (2 * 60) * 2 = 5280 |
| * 22 cluster aggregate metrics for 2 hours (30 second data records) |
| * 22 * (2 * 60 * 2) = 5280 |
| * => Reasonable upper bound on the limit such that our Precision calculation for a given time range makes sense. |
| */ |
| public static int RESULTSET_LIMIT = 5760; |
| |
| public static int hostMinuteAggregatorDataInterval = 300; |
| public static int clusterMinuteAggregatorDataInterval = 300; |
| public static int clusterSecondAggregatorDataInterval = 30; |
| |
| static TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(); |
| static ObjectMapper mapper = new ObjectMapper(); |
| static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {}; |
| |
| private final Configuration hbaseConf; |
| private final Configuration metricsConf; |
| private final RetryCounterFactory retryCounterFactory; |
| private final PhoenixConnectionProvider dataSource; |
| private final int cacheSize; |
| private final boolean cacheEnabled; |
| private final BlockingQueue<TimelineMetrics> insertCache; |
| private ScheduledExecutorService scheduledExecutorService; |
| private MetricsCacheCommitterThread metricsCommiterThread; |
| private TimelineMetricsAggregatorSink aggregatorSink; |
| private final int cacheCommitInterval; |
| private final boolean skipBlockCacheForAggregatorsEnabled; |
| private final String timelineMetricsTablesDurability; |
| private final String timelineMetricsPrecisionTableDurability; |
| private TimelineMetricMetadataManager metadataManagerInstance; |
| |
| static final String HSTORE_COMPACTION_CLASS_KEY = |
| "hbase.hstore.defaultengine.compactionpolicy.class"; |
| static final String HSTORE_ENGINE_CLASS = |
| "hbase.hstore.engine.class"; |
| static final String FIFO_COMPACTION_POLICY_CLASS = |
| "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy"; |
| static final String DATE_TIERED_COMPACTION_POLICY = |
| "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"; |
| static final String BLOCKING_STORE_FILES_KEY = |
| "hbase.hstore.blockingStoreFiles"; |
| |
| private Map<String, Integer> tableTTL = new HashMap<>(); |
| |
| private final TimelineMetricConfiguration configuration; |
| private List<InternalMetricsSource> rawMetricsSources = new ArrayList<>(); |
| |
| public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) { |
| this(TimelineMetricConfiguration.getInstance(), dataSource); |
| } |
| |
| // Test friendly construction since mock instrumentation is difficult to get |
| // working with hadoop mini cluster |
| PhoenixHBaseAccessor(TimelineMetricConfiguration configuration, PhoenixConnectionProvider dataSource) { |
| this.configuration = TimelineMetricConfiguration.getInstance(); |
| try { |
| this.hbaseConf = configuration.getHbaseConf(); |
| this.metricsConf = configuration.getMetricsConf(); |
| } catch (Exception e) { |
| throw new ExceptionInInitializerError("Cannot initialize configuration."); |
| } |
| if (dataSource == null) { |
| dataSource = new DefaultPhoenixDataSource(hbaseConf); |
| } |
| this.dataSource = dataSource; |
| |
| RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT); |
| clusterSecondAggregatorDataInterval = metricsConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30); |
| hostMinuteAggregatorDataInterval = metricsConf.getInt(HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300); |
| clusterMinuteAggregatorDataInterval = metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300); |
| |
| try { |
| Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); |
| } catch (ClassNotFoundException e) { |
| LOG.error("Phoenix client jar not found in the classpath.", e); |
| throw new IllegalStateException(e); |
| } |
| |
| this.retryCounterFactory = new RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES, 10), |
| (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3))); |
| this.cacheEnabled = Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_ENABLED, "true")); |
| this.cacheSize = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_SIZE, "150")); |
| this.cacheCommitInterval = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "3")); |
| this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize); |
| this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false); |
| this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY, ""); |
| this.timelineMetricsPrecisionTableDurability = metricsConf.get(TIMELINE_METRICS_PRECISION_TABLE_DURABILITY, ""); |
| |
| tableTTL.put(METRICS_RECORD_TABLE_NAME, metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400)); // 1 day |
| tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.getInt(CONTAINER_METRICS_TTL, 30 * 86400)); // 30 days |
| tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400)); //7 days |
| tableTTL.put(METRICS_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.getInt(HOST_HOUR_TABLE_TTL, 30 * 86400)); //30 days |
| tableTTL.put(METRICS_AGGREGATE_DAILY_TABLE_NAME, metricsConf.getInt(HOST_DAILY_TABLE_TTL, 365 * 86400)); //1 year |
| tableTTL.put(METRICS_CLUSTER_AGGREGATE_TABLE_NAME, metricsConf.getInt(CLUSTER_SECOND_TABLE_TTL, 7 * 86400)); //7 days |
| tableTTL.put(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.getInt(CLUSTER_MINUTE_TABLE_TTL, 30 * 86400)); //30 days |
| tableTTL.put(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.getInt(CLUSTER_HOUR_TABLE_TTL, 365 * 86400)); //1 year |
| tableTTL.put(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, metricsConf.getInt(CLUSTER_DAILY_TABLE_TTL, 730 * 86400)); //2 years |
| tableTTL.put(METRIC_TRANSIENT_TABLE_NAME, metricsConf.getInt(METRICS_TRANSIENT_TABLE_TTL, 7 * 86400)); //7 days |
| |
| if (cacheEnabled) { |
| LOG.debug("Initialising and starting metrics cache committer thread..."); |
| metricsCommiterThread = new MetricsCacheCommitterThread(this); |
| scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); |
| scheduledExecutorService.scheduleWithFixedDelay(metricsCommiterThread, 0, cacheCommitInterval, TimeUnit.SECONDS); |
| } |
| |
| Class<? extends TimelineMetricsAggregatorSink> metricSinkClass = |
| metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null, |
| TimelineMetricsAggregatorSink.class); |
| if (metricSinkClass != null) { |
| aggregatorSink = ReflectionUtils.newInstance(metricSinkClass, metricsConf); |
| LOG.info("Initialized aggregator sink class " + metricSinkClass); |
| } |
| |
| List<ExternalSinkProvider> externalSinkProviderList = configuration.getExternalSinkProviderList(); |
| InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider(); |
| if (!externalSinkProviderList.isEmpty()) { |
| for (ExternalSinkProvider externalSinkProvider : externalSinkProviderList) { |
| ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS); |
| int interval = configuration.getExternalSinkInterval(externalSinkProvider.getClass().getSimpleName(), RAW_METRICS); |
| if (interval == -1) { |
| interval = cacheCommitInterval; |
| } |
| rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink)); |
| } |
| } |
| TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance); |
| } |
| |
| public boolean isInsertCacheEmpty() { |
| return insertCache.isEmpty(); |
| } |
| |
| public void commitMetricsFromCache() { |
| LOG.debug("Clearing metrics cache"); |
| List<TimelineMetrics> metricsList = new ArrayList<TimelineMetrics>(insertCache.size()); |
| if (!insertCache.isEmpty()) { |
| insertCache.drainTo(metricsList); // More performant than poll |
| } |
| if (metricsList.size() > 0) { |
| commitMetrics(metricsList); |
| if (!rawMetricsSources.isEmpty()) { |
| for (InternalMetricsSource rawMetricsSource : rawMetricsSources) { |
| rawMetricsSource.publishTimelineMetrics(metricsList); |
| } |
| } |
| } |
| } |
| |
| public void commitMetrics(TimelineMetrics timelineMetrics) { |
| commitMetrics(Collections.singletonList(timelineMetrics)); |
| } |
| |
| public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) { |
| LOG.debug("Committing metrics to store"); |
| Connection conn = null; |
| PreparedStatement metricRecordStmt = null; |
| List<TimelineMetric> transientMetrics = new ArrayList<>(); |
| |
| try { |
| conn = getConnection(); |
| metricRecordStmt = conn.prepareStatement(String.format( |
| UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME)); |
| for (TimelineMetrics timelineMetrics : timelineMetricsCollection) { |
| for (TimelineMetric metric : timelineMetrics.getMetrics()) { |
| |
| if (metadataManagerInstance.isTransientMetric(metric.getMetricName(), metric.getAppId())) { |
| transientMetrics.add(metric); |
| continue; |
| } |
| metricRecordStmt.clearParameters(); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("host: " + metric.getHostName() + ", " + |
| "metricName = " + metric.getMetricName() + ", " + |
| "values: " + metric.getMetricValues()); |
| } |
| double[] aggregates = AggregatorUtils.calculateAggregates( |
| metric.getMetricValues()); |
| |
| if (aggregates[3] != 0.0) { |
| byte[] uuid = metadataManagerInstance.getUuid(metric, true); |
| if (uuid == null) { |
| LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString()); |
| continue; |
| } |
| metricRecordStmt.setBytes(1, uuid); |
| metricRecordStmt.setLong(2, metric.getStartTime()); |
| metricRecordStmt.setDouble(3, aggregates[0]); |
| metricRecordStmt.setDouble(4, aggregates[1]); |
| metricRecordStmt.setDouble(5, aggregates[2]); |
| metricRecordStmt.setLong(6, (long) aggregates[3]); |
| String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); |
| metricRecordStmt.setString(7, json); |
| |
| try { |
| int rows = metricRecordStmt.executeUpdate(); |
| } catch (SQLException sql) { |
| LOG.error("Failed on insert records to store.", sql); |
| } |
| } else { |
| LOG.debug("Discarding empty metric record for : [" + metric.getMetricName() + "," + |
| metric.getAppId() + "," + |
| metric.getHostName() + "," + |
| metric.getInstanceId() + "]"); |
| } |
| |
| } |
| } |
| if (CollectionUtils.isNotEmpty(transientMetrics)) { |
| commitTransientMetrics(conn, transientMetrics); |
| } |
| |
| // commit() blocked if HBase unavailable |
| conn.commit(); |
| } catch (Exception exception){ |
| exception.printStackTrace(); |
| } |
| finally { |
| if (metricRecordStmt != null) { |
| try { |
| metricRecordStmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| } |
| |
| private void commitTransientMetrics(Connection conn, Collection<TimelineMetric> transientMetrics) throws SQLException, IOException { |
| LOG.debug("Committing transient metrics to store"); |
| PreparedStatement metricTransientRecordStmt = null; |
| |
| metricTransientRecordStmt = conn.prepareStatement(String.format( |
| UPSERT_TRANSIENT_METRICS_SQL, METRIC_TRANSIENT_TABLE_NAME)); |
| for (TimelineMetric metric : transientMetrics) { |
| |
| metricTransientRecordStmt.clearParameters(); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("host: " + metric.getHostName() + ", " + |
| "metricName = " + metric.getMetricName() + ", " + |
| "values: " + metric.getMetricValues()); |
| } |
| double[] aggregates = AggregatorUtils.calculateAggregates( |
| metric.getMetricValues()); |
| |
| metricTransientRecordStmt.setString(1, metric.getMetricName()); |
| metricTransientRecordStmt.setString(2, metric.getHostName()); |
| metricTransientRecordStmt.setString(3, metric.getAppId()); |
| metricTransientRecordStmt.setString(4, metric.getInstanceId()); |
| metricTransientRecordStmt.setLong(5, metric.getStartTime()); |
| metricTransientRecordStmt.setString(6, metric.getUnits()); |
| metricTransientRecordStmt.setDouble(7, aggregates[0]); |
| metricTransientRecordStmt.setDouble(8, aggregates[1]); |
| metricTransientRecordStmt.setDouble(9, aggregates[2]); |
| metricTransientRecordStmt.setLong(10, (long) aggregates[3]); |
| String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); |
| metricTransientRecordStmt.setString(11, json); |
| |
| try { |
| metricTransientRecordStmt.executeUpdate(); |
| } catch (SQLException sql) { |
| LOG.error("Failed on inserting transient metric records to store.", sql); |
| } |
| } |
| } |
| |
| |
| private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) throws SQLException, IOException { |
| TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs); |
| metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS"))); |
| return metric; |
| } |
| |
| private static TreeMap<Long, Double> readLastMetricValueFromJSON(String json) throws IOException { |
| TreeMap<Long, Double> values = readMetricFromJSON(json); |
| Long lastTimeStamp = values.lastKey(); |
| |
| TreeMap<Long, Double> valueMap = new TreeMap<Long, Double>(); |
| valueMap.put(lastTimeStamp, values.get(lastTimeStamp)); |
| return valueMap; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static TreeMap<Long, Double> readMetricFromJSON(String json) throws IOException { |
| return mapper.readValue(json, metricValuesTypeRef); |
| } |
| |
| private Connection getConnectionRetryingOnException() throws SQLException, InterruptedException { |
| RetryCounter retryCounter = retryCounterFactory.create(); |
| while (true) { |
| try{ |
| return getConnection(); |
| } catch (SQLException e) { |
| if(!retryCounter.shouldRetry()){ |
| LOG.error("HBaseAccessor getConnection failed after " |
| + retryCounter.getMaxAttempts() + " attempts"); |
| throw e; |
| } |
| } |
| retryCounter.sleepUntilNextRetry(); |
| } |
| } |
| |
| /** |
| * Get JDBC connection to HBase store. Assumption is that the hbase |
| * configuration is present on the classpath and loaded by the caller into |
| * the Configuration object. |
| * Phoenix already caches the HConnection between the client and HBase |
| * cluster. |
| * |
| * @return @java.sql.Connection |
| */ |
| public Connection getConnection() throws SQLException { |
| return dataSource.getConnection(); |
| } |
| |
| /** |
| * Unit test purpose only for now. |
| * @return @HBaseAdmin |
| * @throws IOException |
| */ |
| Admin getHBaseAdmin() throws IOException { |
| return dataSource.getHBaseAdmin(); |
| } |
| |
| protected void initMetricSchema() { |
| Connection conn = null; |
| Statement stmt = null; |
| |
| String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); |
| String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); |
| |
| |
| try { |
| LOG.info("Initializing metrics schema..."); |
| conn = getConnectionRetryingOnException(); |
| stmt = conn.createStatement(); |
| |
| // Metadata |
| String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL, |
| encoding, compression); |
| stmt.executeUpdate(metadataSql); |
| stmt.executeUpdate(ALTER_METRICS_METADATA_TABLE); |
| |
| String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL, |
| encoding, compression); |
| stmt.executeUpdate(hostedAppSql); |
| |
| //Host Instances table |
| String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL, |
| encoding, compression); |
| stmt.executeUpdate(hostedInstancesSql); |
| |
| // Container Metrics |
| stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL, |
| encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression)); |
| |
| // Host level |
| String precisionSql = String.format(CREATE_METRICS_TABLE_SQL, |
| encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression); |
| stmt.executeUpdate(precisionSql); |
| |
| String hostMinuteAggregrateSql = String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, |
| METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, |
| tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME), |
| compression); |
| stmt.executeUpdate(hostMinuteAggregrateSql); |
| |
| stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, |
| METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding, |
| tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME), |
| compression)); |
| stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, |
| METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding, |
| tableTTL.get(METRICS_AGGREGATE_DAILY_TABLE_NAME), |
| compression)); |
| |
| // Cluster level |
| String aggregateSql = String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL, |
| METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, |
| tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME), |
| compression); |
| |
| stmt.executeUpdate(aggregateSql); |
| stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, |
| METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, |
| tableTTL.get(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME), |
| compression)); |
| stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, |
| METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, |
| tableTTL.get(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME), |
| compression)); |
| stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, |
| METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, |
| tableTTL.get(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME), |
| compression)); |
| |
| // Metrics Transient Table |
| String transientMetricPatterns = metricsConf.get(TRANSIENT_METRIC_PATTERNS, StringUtils.EMPTY); |
| if (StringUtils.isNotEmpty(transientMetricPatterns)) { |
| String transientMetricsTableSql = String.format(CREATE_TRANSIENT_METRICS_TABLE_SQL, |
| encoding, tableTTL.get(METRIC_TRANSIENT_TABLE_NAME), compression); |
| int row = stmt.executeUpdate(transientMetricsTableSql); |
| } |
| |
| conn.commit(); |
| |
| LOG.info("Metrics schema initialized."); |
| } catch (SQLException | InterruptedException sql) { |
| LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql); |
| throw new MetricsSystemInitializationException( |
| "Error creating Metrics Schema in HBase using Phoenix.", sql); |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| } |
| } |
| |
| protected void initPoliciesAndTTL() { |
| Admin hBaseAdmin = null; |
| try { |
| hBaseAdmin = dataSource.getHBaseAdmin(); |
| } catch (IOException e) { |
| LOG.warn("Unable to initialize HBaseAdmin for setting policies.", e); |
| } |
| |
| TableName[] tableNames = null; |
| if (hBaseAdmin != null) { |
| try { |
| tableNames = hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN, false); |
| } catch (IOException e) { |
| LOG.warn("Unable to get table names from HBaseAdmin for setting policies.", e); |
| return; |
| } |
| if (tableNames == null || tableNames.length == 0) { |
| LOG.warn("Unable to get table names from HBaseAdmin for setting policies."); |
| return; |
| } |
| for (String tableName : PHOENIX_TABLES) { |
| try { |
| boolean modifyTable = false; |
| Optional<TableName> tableNameOptional = Arrays.stream(tableNames) |
| .filter(t -> tableName.equals(t.getNameAsString())).findFirst(); |
| |
| TableDescriptor tableDescriptor = null; |
| if (tableNameOptional.isPresent()) { |
| tableDescriptor = hBaseAdmin.getTableDescriptor(tableNameOptional.get()); |
| } |
| |
| if (tableDescriptor == null) { |
| LOG.warn("Unable to get table descriptor for " + tableName); |
| continue; |
| } |
| |
| // @TableDescriptor is immutable by design |
| TableDescriptorBuilder tableDescriptorBuilder = |
| TableDescriptorBuilder.newBuilder(tableDescriptor); |
| |
| //Set normalizer preferences |
| boolean enableNormalizer = hbaseConf.getBoolean("hbase.normalizer.enabled", false); |
| if (enableNormalizer ^ tableDescriptor.isNormalizationEnabled()) { |
| tableDescriptorBuilder.setNormalizationEnabled(enableNormalizer); |
| LOG.info("Normalizer set to " + enableNormalizer + " for " + tableName); |
| modifyTable = true; |
| } |
| |
| //Set durability preferences |
| boolean durabilitySettingsModified = setDurabilityForTable(tableName, tableDescriptorBuilder); |
| modifyTable = modifyTable || durabilitySettingsModified; |
| |
| //Set compaction policy preferences |
| boolean compactionPolicyModified = false; |
| compactionPolicyModified = setCompactionPolicyForTable(tableName, tableDescriptorBuilder); |
| modifyTable = modifyTable || compactionPolicyModified; |
| |
| // Change TTL setting to match user configuration |
| ColumnFamilyDescriptor[] columnFamilyDescriptors = tableDescriptor.getColumnFamilies(); |
| if (columnFamilyDescriptors != null) { |
| for (ColumnFamilyDescriptor familyDescriptor : columnFamilyDescriptors) { |
| int ttlValue = familyDescriptor.getTimeToLive(); |
| if (ttlValue != tableTTL.get(tableName)) { |
| ColumnFamilyDescriptorBuilder familyDescriptorBuilder = |
| ColumnFamilyDescriptorBuilder.newBuilder(familyDescriptor); |
| |
| familyDescriptorBuilder.setTimeToLive(tableTTL.get(tableName)); |
| |
| LOG.info("Setting TTL on table: " + tableName + " to : " + |
| tableTTL.get(tableName) + " seconds."); |
| |
| hBaseAdmin.modifyColumnFamily(tableNameOptional.get(), familyDescriptorBuilder.build()); |
| // modifyTable = true; |
| } |
| } |
| } |
| |
| // Persist only if anything changed |
| if (modifyTable) { |
| hBaseAdmin.modifyTable(tableNameOptional.get(), tableDescriptorBuilder.build()); |
| } |
| |
| } catch (IOException e) { |
| LOG.error("Failed setting policies for " + tableName, e); |
| } |
| } |
| try { |
| hBaseAdmin.close(); |
| } catch (IOException e) { |
| LOG.warn("Exception on HBaseAdmin close.", e); |
| } |
| } |
| } |
| |
| private boolean setDurabilityForTable(String tableName, TableDescriptorBuilder tableDescriptor) { |
| |
| boolean modifyTable = false; |
| |
| if (METRIC_TRANSIENT_TABLE_NAME.equalsIgnoreCase(tableName)) { |
| tableDescriptor.setDurability(Durability.SKIP_WAL); |
| modifyTable = true; |
| } else if (METRICS_RECORD_TABLE_NAME.equals(tableName)) { |
| if (!timelineMetricsPrecisionTableDurability.isEmpty()) { |
| LOG.info("Setting WAL option " + timelineMetricsPrecisionTableDurability + " for table : " + tableName); |
| boolean validDurability = true; |
| if ("SKIP_WAL".equals(timelineMetricsPrecisionTableDurability)) { |
| tableDescriptor.setDurability(Durability.SKIP_WAL); |
| } else if ("SYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) { |
| tableDescriptor.setDurability(Durability.SYNC_WAL); |
| } else if ("ASYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) { |
| tableDescriptor.setDurability(Durability.ASYNC_WAL); |
| } else if ("FSYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) { |
| tableDescriptor.setDurability(Durability.FSYNC_WAL); |
| } else { |
| LOG.info("Unknown value for " + TIMELINE_METRICS_PRECISION_TABLE_DURABILITY + " : " + timelineMetricsPrecisionTableDurability); |
| validDurability = false; |
| } |
| if (validDurability) { |
| modifyTable = true; |
| } |
| } |
| } else { |
| if (!timelineMetricsTablesDurability.isEmpty()) { |
| LOG.info("Setting WAL option " + timelineMetricsTablesDurability + " for table : " + tableName); |
| boolean validDurability = true; |
| if ("SKIP_WAL".equals(timelineMetricsTablesDurability)) { |
| tableDescriptor.setDurability(Durability.SKIP_WAL); |
| } else if ("SYNC_WAL".equals(timelineMetricsTablesDurability)) { |
| tableDescriptor.setDurability(Durability.SYNC_WAL); |
| } else if ("ASYNC_WAL".equals(timelineMetricsTablesDurability)) { |
| tableDescriptor.setDurability(Durability.ASYNC_WAL); |
| } else if ("FSYNC_WAL".equals(timelineMetricsTablesDurability)) { |
| tableDescriptor.setDurability(Durability.FSYNC_WAL); |
| } else { |
| LOG.info("Unknown value for " + TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY + " : " + timelineMetricsTablesDurability); |
| validDurability = false; |
| } |
| if (validDurability) { |
| modifyTable = true; |
| } |
| } |
| } |
| return modifyTable; |
| } |
| |
| private boolean setCompactionPolicyForTable(String tableName, TableDescriptorBuilder tableDescriptorBuilder) { |
| |
| boolean modifyTable = false; |
| |
| String compactionPolicyKey = metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY, |
| HSTORE_ENGINE_CLASS); |
| String compactionPolicyClass = metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS, |
| DATE_TIERED_COMPACTION_POLICY); |
| int blockingStoreFiles = hbaseConf.getInt(TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES, 60); |
| |
| if (tableName.equals(METRICS_RECORD_TABLE_NAME) || tableName.equalsIgnoreCase(METRIC_TRANSIENT_TABLE_NAME)) { |
| compactionPolicyKey = metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY, |
| HSTORE_COMPACTION_CLASS_KEY); |
| compactionPolicyClass = metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS, |
| FIFO_COMPACTION_POLICY_CLASS); |
| blockingStoreFiles = hbaseConf.getInt(TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES, 1000); |
| } |
| |
| if (StringUtils.isEmpty(compactionPolicyKey) || StringUtils.isEmpty(compactionPolicyClass)) { |
| // Default blockingStoreFiles = 300 |
| modifyTable = setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, 300); |
| } else { |
| tableDescriptorBuilder.setValue(compactionPolicyKey, compactionPolicyClass); |
| tableDescriptorBuilder.removeValue(HSTORE_ENGINE_CLASS.getBytes()); |
| tableDescriptorBuilder.removeValue(HSTORE_COMPACTION_CLASS_KEY.getBytes()); |
| setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, blockingStoreFiles); |
| modifyTable = true; |
| } |
| |
| return modifyTable; |
| } |
| |
| private boolean setHbaseBlockingStoreFiles(TableDescriptorBuilder tableDescriptor, String tableName, int value) { |
| int blockingStoreFiles = hbaseConf.getInt(HBASE_BLOCKING_STORE_FILES, value); |
| if (blockingStoreFiles != value) { |
| blockingStoreFiles = value; |
| tableDescriptor.setValue(BLOCKING_STORE_FILES_KEY, String.valueOf(value)); |
| LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY + |
| " = " + blockingStoreFiles + " for " + tableName); |
| return true; |
| } |
| return false; |
| } |
| |
| protected String getSplitPointsStr(String splitPoints) { |
| if (StringUtils.isEmpty(splitPoints.trim())) { |
| return ""; |
| } |
| String[] points = splitPoints.split(","); |
| if (points.length > 0) { |
| StringBuilder sb = new StringBuilder(" SPLIT ON "); |
| sb.append("("); |
| for (String point : points) { |
| sb.append("'"); |
| sb.append(point.trim()); |
| sb.append("'"); |
| sb.append(","); |
| } |
| sb.deleteCharAt(sb.length() - 1); |
| sb.append(")"); |
| return sb.toString(); |
| } |
| return ""; |
| } |
| |
| /** |
| * Insert precision YARN container data. |
| */ |
| public void insertContainerMetrics(List<ContainerMetric> metrics) throws SQLException, IOException { |
| Connection conn = getConnection(); |
| PreparedStatement metricRecordStmt = null; |
| |
| try { |
| metricRecordStmt = conn.prepareStatement( |
| String.format(UPSERT_CONTAINER_METRICS_SQL, CONTAINER_METRICS_TABLE_NAME)); |
| for (ContainerMetric metric : metrics) { |
| metricRecordStmt.clearParameters(); |
| metricRecordStmt.setString(1, ContainerId.fromString(metric.getContainerId()) |
| .getApplicationAttemptId().getApplicationId().toString()); |
| metricRecordStmt.setString(2, metric.getContainerId()); |
| metricRecordStmt.setTimestamp(3, new Timestamp(metric.getStartTime())); |
| metricRecordStmt.setTimestamp(4, new Timestamp(metric.getFinishTime())); |
| metricRecordStmt.setLong(5, metric.getFinishTime() - metric.getStartTime()); |
| metricRecordStmt.setString(6, metric.getHostName()); |
| metricRecordStmt.setInt(7, metric.getExitCode()); |
| metricRecordStmt.setLong(8, metric.getLocalizationDuration()); |
| metricRecordStmt.setLong(9, metric.getLaunchDuration()); |
| metricRecordStmt.setDouble(10, (double) metric.getPmemLimit() / 1024); |
| metricRecordStmt.setDouble(11, |
| ((double) metric.getPmemLimit() / 1024) * (metric.getFinishTime() |
| - metric.getStartTime())); |
| metricRecordStmt.setDouble(12, (double) metric.getVmemLimit() / 1024); |
| metricRecordStmt.setDouble(13, (double) metric.getPmemUsedMin() / 1024); |
| metricRecordStmt.setDouble(14, (double) metric.getPmemUsedMax() / 1024); |
| metricRecordStmt.setDouble(15, (double) metric.getPmemUsedAvg() / 1024); |
| metricRecordStmt.setDouble(16, (double) metric.getPmem50Pct() / 1024); |
| metricRecordStmt.setDouble(17, (double) metric.getPmem75Pct() / 1024); |
| metricRecordStmt.setDouble(18, (double) metric.getPmem90Pct() / 1024); |
| metricRecordStmt.setDouble(19, (double) metric.getPmem95Pct()/ 1024); |
| metricRecordStmt.setDouble(20, (double) metric.getPmem99Pct() / 1024); |
| metricRecordStmt.setDouble(21, (double) metric.getPmemLimit() / 1024 |
| - (double) metric.getPmemUsedMax() / 1024); |
| metricRecordStmt.setDouble(22, ((double) metric.getPmemLimit() / 1024 |
| - (double) metric.getPmemUsedMax() / 1024) * (metric.getFinishTime() |
| - metric.getStartTime())); |
| |
| try { |
| metricRecordStmt.executeUpdate(); |
| } catch (SQLException sql) { |
| LOG.error("Failed on insert records to store.", sql); |
| } |
| } |
| |
| conn.commit(); |
| } finally { |
| if (metricRecordStmt != null) { |
| try { |
| metricRecordStmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| } |
| |
| /** |
| * Insert precision data. |
| */ |
| public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager, |
| TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException { |
| List<TimelineMetric> timelineMetrics = metrics.getMetrics(); |
| if (timelineMetrics == null || timelineMetrics.isEmpty()) { |
| LOG.debug("Empty metrics insert request."); |
| return; |
| } |
| for (Iterator<TimelineMetric> iterator = timelineMetrics.iterator(); iterator.hasNext();) { |
| |
| TimelineMetric tm = iterator.next(); |
| |
| boolean acceptMetric = TimelineMetricsFilter.acceptMetric(tm); |
| |
| // Write to metadata cache on successful write to store |
| if (metadataManager != null) { |
| metadataManager.putIfModifiedTimelineMetricMetadata( |
| metadataManager.createTimelineMetricMetadata(tm, acceptMetric)); |
| |
| metadataManager.putIfModifiedHostedAppsMetadata( |
| tm.getHostName(), tm.getAppId()); |
| |
| if (!tm.getAppId().equals("FLUME_HANDLER")) { |
| metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName()); |
| } |
| } |
| if (!acceptMetric) { |
| iterator.remove(); |
| } |
| } |
| |
| if (!skipCache && cacheEnabled) { |
| LOG.debug("Adding metrics to cache"); |
| if (insertCache.size() >= cacheSize) { |
| commitMetricsFromCache(); |
| } |
| try { |
| insertCache.put(metrics); // blocked while the queue is full |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } else { |
| LOG.debug("Skipping metrics cache"); |
| commitMetrics(metrics); |
| } |
| } |
| |
| public void insertMetricRecords(TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException { |
| insertMetricRecordsWithMetadata(null, metrics, skipCache); |
| } |
| |
| public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException { |
| insertMetricRecords(metrics, false); |
| } |
| |
| |
| @SuppressWarnings("unchecked") |
| public TimelineMetrics getMetricRecords( |
| final Condition condition, Multimap<String, List<Function>> metricFunctions) |
| throws SQLException, IOException { |
| |
| validateConditionIsNotEmpty(condition); |
| |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| TimelineMetrics metrics = new TimelineMetrics(); |
| |
| try { |
| //get latest |
| if (condition.isPointInTime()){ |
| getLatestMetricRecords(condition, conn, metrics); |
| } else { |
| if (condition.getEndTime() >= condition.getStartTime()) { |
| |
| if (CollectionUtils.isNotEmpty(condition.getUuids())) { |
| stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); |
| rs = stmt.executeQuery(); |
| while (rs.next()) { |
| appendMetricFromResultSet(metrics, condition, metricFunctions, rs); |
| } |
| } |
| |
| if (CollectionUtils.isNotEmpty(condition.getTransientMetricNames())) { |
| stmt = PhoenixTransactSQL.prepareTransientMetricsSqlStmt(conn, condition); |
| if (stmt != null) { |
| rs = stmt.executeQuery(); |
| while (rs.next()) { |
| TransientMetricReadHelper.appendMetricFromResultSet(metrics, condition, metricFunctions, rs); |
| } |
| } |
| } |
| } else { |
| LOG.warn("Skipping metrics query because endTime < startTime"); |
| } |
| } |
| |
| } catch (PhoenixIOException pioe) { |
| Throwable pioe2 = pioe.getCause(); |
| // Need to find out if this is exception "Could not find hash cache |
| // for joinId" or another PhoenixIOException |
| if (pioe2 instanceof PhoenixIOException && |
| pioe2.getCause() instanceof DoNotRetryIOException) { |
| String className = null; |
| for (StackTraceElement ste : pioe2.getCause().getStackTrace()) { |
| className = ste.getClassName(); |
| } |
| |
| if (className != null && className.equals("HashJoinRegionScanner")) { |
| LOG.error("The cache might have expired and have been removed. Try to" + |
| " increase the cache size by setting bigger value for " + |
| "phoenix.coprocessor.maxMetaDataCacheSize in ams-hbase-site config." + |
| " Falling back to sort-merge join algorithm."); |
| PhoenixTransactSQL.setSortMergeJoinEnabled(true); |
| } |
| } |
| throw pioe; |
| } catch (RuntimeException ex) { |
| // We need to find out if this is a real IO exception |
| // or exception "maxStamp is smaller than minStamp" |
| // which is thrown in hbase TimeRange.java |
| Throwable io = ex.getCause(); |
| String className = null; |
| if (io != null) { |
| for (StackTraceElement ste : io.getStackTrace()) { |
| className = ste.getClassName(); |
| } |
| } |
| if (className != null && className.equals("TimeRange")) { |
| // This is "maxStamp is smaller than minStamp" exception |
| // Log error and return empty metrics |
| LOG.debug(io); |
| return new TimelineMetrics(); |
| } else { |
| throw ex; |
| } |
| |
| } finally { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| |
| LOG.debug("Metrics records size: " + metrics.getMetrics().size()); |
| return metrics; |
| } |
| |
| /** |
| * Apply aggregate function to the result if supplied else get precision |
| * or aggregate data with default function applied. |
| */ |
| private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition, |
| Multimap<String, List<Function>> metricFunctions, |
| ResultSet rs) throws SQLException, IOException { |
| byte[] uuid = rs.getBytes("UUID"); |
| String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid); |
| Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); |
| |
| for (List<Function> functions : functionList) { |
| // Apply aggregation function if present |
| if ((functions != null && !functions.isEmpty())) { |
| if (functions.size() > 1) { |
| throw new IllegalArgumentException("Multiple aggregate functions not supported."); |
| } |
| for (Function f : functions) { |
| if (f.getReadFunction() == Function.ReadFunction.VALUE) { |
| getTimelineMetricsFromResultSet(metrics, f, condition, rs); |
| } else { |
| SingleValuedTimelineMetric metric = |
| TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f); |
| |
| if (condition.isGrouped()) { |
| metrics.addOrMergeTimelineMetric(metric); |
| } else { |
| metrics.getMetrics().add(metric.getTimelineMetric()); |
| } |
| } |
| } |
| } else { |
| // No aggregation requested |
| // Execution never goes here, function always contain at least 1 element |
| getTimelineMetricsFromResultSet(metrics, null, condition, rs); |
| } |
| } |
| } |
| |
| private void getTimelineMetricsFromResultSet(TimelineMetrics metrics, Function f, Condition condition, ResultSet rs) |
| throws SQLException, IOException { |
| if (condition.getPrecision().equals(Precision.SECONDS)) { |
| TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs); |
| if (f != null && f.getSuffix() != null) { //Case : Requesting "._rate" for precision data |
| metric.setMetricName(metric.getMetricName() + f.getSuffix()); |
| } |
| if (condition.isGrouped()) { |
| metrics.addOrMergeTimelineMetric(metric); |
| } else { |
| metrics.getMetrics().add(metric); |
| } |
| |
| } else { |
| SingleValuedTimelineMetric metric = |
| TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f); |
| if (condition.isGrouped()) { |
| metrics.addOrMergeTimelineMetric(metric); |
| } else { |
| metrics.getMetrics().add(metric.getTimelineMetric()); |
| } |
| } |
| } |
| |
| private void getLatestMetricRecords(Condition condition, Connection conn, |
| TimelineMetrics metrics) throws SQLException, IOException { |
| |
| validateConditionIsNotEmpty(condition); |
| |
| PreparedStatement stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, condition); |
| ResultSet rs = null; |
| try { |
| rs = stmt.executeQuery(); |
| while (rs.next()) { |
| TimelineMetric metric = getLastTimelineMetricFromResultSet(rs); |
| metrics.getMetrics().add(metric); |
| } |
| } finally { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (stmt != null) { |
| stmt.close(); |
| } |
| } |
| } |
| |
| /** |
| * Get metrics aggregated across hosts. |
| * |
| * @param condition @Condition |
| * @return @TimelineMetrics |
| * @throws SQLException |
| */ |
| public TimelineMetrics getAggregateMetricRecords(final Condition condition, |
| Multimap<String, List<Function>> metricFunctions) throws SQLException, IOException { |
| |
| validateConditionIsNotEmpty(condition); |
| |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| TimelineMetrics metrics = new TimelineMetrics(); |
| |
| try { |
| //get latest |
| if(condition.isPointInTime()) { |
| getLatestAggregateMetricRecords(condition, conn, metrics, metricFunctions); |
| } else { |
| |
| if (CollectionUtils.isNotEmpty(condition.getUuids())) { |
| stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition); |
| rs = stmt.executeQuery(); |
| while (rs.next()) { |
| appendAggregateMetricFromResultSet(metrics, condition, metricFunctions, rs); |
| } |
| } |
| |
| if (CollectionUtils.isNotEmpty(condition.getTransientMetricNames())) { |
| stmt = PhoenixTransactSQL.prepareTransientMetricsSqlStmt(conn, condition); |
| if (stmt != null) { |
| rs = stmt.executeQuery(); |
| while (rs.next()) { |
| TransientMetricReadHelper.appendMetricFromResultSet(metrics, condition, metricFunctions, rs); |
| } |
| } |
| } |
| } |
| } finally { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| |
| LOG.debug("Aggregate records size: " + metrics.getMetrics().size()); |
| return metrics; |
| } |
| |
| private void appendAggregateMetricFromResultSet(TimelineMetrics metrics, |
| Condition condition, Multimap<String, List<Function>> metricFunctions, |
| ResultSet rs) throws SQLException { |
| |
| byte[] uuid = rs.getBytes("UUID"); |
| String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid); |
| Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); |
| |
| for (List<Function> functions : functionList) { |
| for (Function aggregateFunction : functions) { |
| SingleValuedTimelineMetric metric; |
| |
| if (condition.getPrecision() == Precision.MINUTES |
| || condition.getPrecision() == Precision.HOURS |
| || condition.getPrecision() == Precision.DAYS) { |
| metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false); |
| } else { |
| metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, true); |
| } |
| |
| if (condition.isGrouped()) { |
| metrics.addOrMergeTimelineMetric(metric); |
| } else { |
| metrics.getMetrics().add(metric.getTimelineMetric()); |
| } |
| } |
| } |
| |
| } |
| |
| private void getLatestAggregateMetricRecords(Condition condition, |
| Connection conn, TimelineMetrics metrics, |
| Multimap<String, List<Function>> metricFunctions) throws SQLException { |
| |
| PreparedStatement stmt = null; |
| SplitByMetricNamesCondition splitCondition = |
| new SplitByMetricNamesCondition(condition); |
| |
| for (byte[] uuid: condition.getUuids()) { |
| |
| splitCondition.setCurrentUuid(uuid); |
| stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, splitCondition); |
| ResultSet rs = null; |
| try { |
| rs = stmt.executeQuery(); |
| while (rs.next()) { |
| String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid); |
| Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); |
| for (List<Function> functions : functionList) { |
| if (functions != null) { |
| for (Function f : functions) { |
| SingleValuedTimelineMetric metric = |
| getAggregateTimelineMetricFromResultSet(rs, f, true); |
| |
| if (condition.isGrouped()) { |
| metrics.addOrMergeTimelineMetric(metric); |
| } else { |
| metrics.getMetrics().add(metric.getTimelineMetric()); |
| } |
| } |
| } else { |
| SingleValuedTimelineMetric metric = |
| getAggregateTimelineMetricFromResultSet(rs, new Function(), true); |
| metrics.getMetrics().add(metric.getTimelineMetric()); |
| } |
| } |
| } |
| } finally { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (stmt != null) { |
| stmt.close(); |
| } |
| } |
| } |
| } |
| |
| private SingleValuedTimelineMetric getAggregateTimelineMetricFromResultSet(ResultSet rs, |
| Function f, boolean useHostCount) throws SQLException { |
| |
| String countColumnName = "METRIC_COUNT"; |
| if (useHostCount) { |
| countColumnName = "HOSTS_COUNT"; |
| } |
| |
| byte[] uuid = rs.getBytes("UUID"); |
| TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid); |
| |
| SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( |
| timelineMetric.getMetricName() + f.getSuffix(), |
| timelineMetric.getAppId(), |
| timelineMetric.getInstanceId(), |
| null, |
| rs.getLong("SERVER_TIME") |
| ); |
| |
| double value; |
| switch(f.getReadFunction()){ |
| case AVG: |
| value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName); |
| break; |
| case MIN: |
| value = rs.getDouble("METRIC_MIN"); |
| break; |
| case MAX: |
| value = rs.getDouble("METRIC_MAX"); |
| break; |
| case SUM: |
| value = rs.getDouble("METRIC_SUM"); |
| break; |
| default: |
| value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName); |
| break; |
| } |
| |
| metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value); |
| |
| return metric; |
| } |
| |
| private void validateConditionIsNotEmpty(Condition condition) { |
| if (condition.isEmpty()) { |
| throw new IllegalArgumentException("No filter criteria specified."); |
| } |
| } |
| |
| public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap, |
| String phoenixTableName) throws SQLException { |
| |
| if (hostAggregateMap == null || hostAggregateMap.isEmpty()) { |
| LOG.debug("Empty aggregate records."); |
| return; |
| } |
| |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| |
| long start = System.currentTimeMillis(); |
| int rowCount = 0; |
| |
| try { |
| stmt = conn.prepareStatement( |
| String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName)); |
| |
| for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate : |
| hostAggregateMap.entrySet()) { |
| |
| TimelineMetric metric = metricAggregate.getKey(); |
| MetricHostAggregate hostAggregate = metricAggregate.getValue(); |
| |
| byte[] uuid = metadataManagerInstance.getUuid(metric, false); |
| if (uuid == null) { |
| LOG.error("Error computing UUID for metric. Cannot write aggregate metric : " + metric.toString()); |
| continue; |
| } |
| rowCount++; |
| stmt.clearParameters(); |
| stmt.setBytes(1, uuid); |
| stmt.setLong(2, metric.getStartTime()); |
| stmt.setDouble(3, hostAggregate.getSum()); |
| stmt.setDouble(4, hostAggregate.getMax()); |
| stmt.setDouble(5, hostAggregate.getMin()); |
| stmt.setDouble(6, hostAggregate.getNumberOfSamples()); |
| |
| try { |
| stmt.executeUpdate(); |
| } catch (SQLException sql) { |
| LOG.error(sql); |
| } |
| |
| if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { |
| conn.commit(); |
| rowCount = 0; |
| } |
| |
| } |
| |
| conn.commit(); |
| |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| |
| long end = System.currentTimeMillis(); |
| |
| if ((end - start) > 60000l) { |
| LOG.info("Time to save map: " + (end - start) + ", " + |
| "thread = " + Thread.currentThread().getClass()); |
| } |
| if (aggregatorSink != null) { |
| try { |
| aggregatorSink.saveHostAggregateRecords(hostAggregateMap, |
| getTablePrecision(phoenixTableName)); |
| } catch (Exception e) { |
| LOG.warn( |
| "Error writing host aggregate records metrics to external sink. " |
| + e); |
| } |
| } |
| } |
| |
| /** |
| * Save Metric aggregate records. |
| * |
| * @throws SQLException |
| */ |
| public void saveClusterAggregateRecords(Map<TimelineClusterMetric, MetricClusterAggregate> records) |
| throws SQLException { |
| |
| if (records == null || records.isEmpty()) { |
| LOG.debug("Empty aggregate records."); |
| return; |
| } |
| |
| long start = System.currentTimeMillis(); |
| String sqlStr = String.format(UPSERT_CLUSTER_AGGREGATE_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME); |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(sqlStr); |
| int rowCount = 0; |
| |
| for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> |
| aggregateEntry : records.entrySet()) { |
| TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); |
| MetricClusterAggregate aggregate = aggregateEntry.getValue(); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("clusterMetric = " + clusterMetric + ", " + |
| "aggregate = " + aggregate); |
| } |
| |
| rowCount++; |
| byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, false); |
| if (uuid == null) { |
| LOG.error("Error computing UUID for metric. Cannot write metrics : " + clusterMetric.toString()); |
| continue; |
| } |
| stmt.clearParameters(); |
| stmt.setBytes(1, uuid); |
| stmt.setLong(2, clusterMetric.getTimestamp()); |
| stmt.setDouble(3, aggregate.getSum()); |
| stmt.setInt(4, aggregate.getNumberOfHosts()); |
| stmt.setDouble(5, aggregate.getMax()); |
| stmt.setDouble(6, aggregate.getMin()); |
| |
| try { |
| stmt.executeUpdate(); |
| } catch (SQLException sql) { |
| // we have no way to verify it works!!! |
| LOG.error(sql); |
| } |
| |
| if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { |
| conn.commit(); |
| rowCount = 0; |
| } |
| } |
| |
| conn.commit(); |
| |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| long end = System.currentTimeMillis(); |
| if ((end - start) > 60000l) { |
| LOG.info("Time to save: " + (end - start) + ", " + |
| "thread = " + Thread.currentThread().getName()); |
| } |
| if (aggregatorSink != null) { |
| try { |
| aggregatorSink.saveClusterAggregateRecords(records); |
| } catch (Exception e) { |
| LOG.warn("Error writing cluster aggregate records metrics to external sink. ", e); |
| } |
| } |
| } |
| |
| |
| /** |
| * Save Metric aggregate records. |
| * |
| * @throws SQLException |
| */ |
| public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, MetricHostAggregate> records, |
| String tableName) throws SQLException { |
| if (records == null || records.isEmpty()) { |
| LOG.debug("Empty aggregate records."); |
| return; |
| } |
| |
| long start = System.currentTimeMillis(); |
| |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); |
| int rowCount = 0; |
| |
| for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) { |
| TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); |
| MetricHostAggregate aggregate = aggregateEntry.getValue(); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("clusterMetric = " + clusterMetric + ", " + |
| "aggregate = " + aggregate); |
| } |
| |
| byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, false); |
| if (uuid == null) { |
| LOG.error("Error computing UUID for metric. Cannot write metric : " + clusterMetric.toString()); |
| continue; |
| } |
| |
| rowCount++; |
| stmt.clearParameters(); |
| stmt.setBytes(1, uuid); |
| stmt.setLong(2, clusterMetric.getTimestamp()); |
| stmt.setDouble(3, aggregate.getSum()); |
| stmt.setLong(4, aggregate.getNumberOfSamples()); |
| stmt.setDouble(5, aggregate.getMax()); |
| stmt.setDouble(6, aggregate.getMin()); |
| |
| try { |
| stmt.executeUpdate(); |
| } catch (SQLException sql) { |
| // we have no way to verify it works!!! |
| LOG.error(sql); |
| } |
| |
| if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { |
| conn.commit(); |
| rowCount = 0; |
| } |
| } |
| |
| conn.commit(); |
| |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| long end = System.currentTimeMillis(); |
| if ((end - start) > 60000l) { |
| LOG.info("Time to save: " + (end - start) + ", " + |
| "thread = " + Thread.currentThread().getName()); |
| } |
| if (aggregatorSink != null) { |
| try { |
| aggregatorSink.saveClusterTimeAggregateRecords(records, |
| getTablePrecision(tableName)); |
| } catch (Exception e) { |
| LOG.warn( |
| "Error writing cluster time aggregate records metrics to external sink. " |
| + e); |
| } |
| } |
| } |
| |
| /** |
| * Get precision for a table |
| * @param tableName |
| * @return precision |
| */ |
| private Precision getTablePrecision(String tableName) { |
| Precision tablePrecision = null; |
| switch (tableName) { |
| case METRICS_RECORD_TABLE_NAME: |
| tablePrecision = Precision.SECONDS; |
| break; |
| case METRICS_AGGREGATE_MINUTE_TABLE_NAME: |
| case METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME: |
| tablePrecision = Precision.MINUTES; |
| break; |
| case METRICS_AGGREGATE_HOURLY_TABLE_NAME: |
| case METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME: |
| tablePrecision = Precision.HOURS; |
| break; |
| case METRICS_AGGREGATE_DAILY_TABLE_NAME: |
| case METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME: |
| tablePrecision = Precision.DAYS; |
| break; |
| } |
| return tablePrecision; |
| } |
| |
| /** |
| * Provide skip block cache hint for aggregator queries. |
| */ |
| public boolean isSkipBlockCacheForAggregatorsEnabled() { |
| return skipBlockCacheForAggregatorsEnabled; |
| } |
| |
| /** |
| * One time save of metadata when discovering topology during aggregation. |
| * @throws SQLException |
| */ |
| public void saveHostAppsMetadata(Map<String, TimelineMetricHostMetadata> hostMetadata) throws SQLException { |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(UPSERT_HOSTED_APPS_METADATA_SQL); |
| int rowCount = 0; |
| |
| for (Map.Entry<String, TimelineMetricHostMetadata> hostedAppsEntry : hostMetadata.entrySet()) { |
| TimelineMetricHostMetadata timelineMetricHostMetadata = hostedAppsEntry.getValue(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("HostedAppsMetadata: " + hostedAppsEntry); |
| } |
| |
| stmt.clearParameters(); |
| stmt.setString(1, hostedAppsEntry.getKey()); |
| stmt.setBytes(2, timelineMetricHostMetadata.getUuid()); |
| stmt.setString(3, StringUtils.join(timelineMetricHostMetadata.getHostedApps().keySet(), ",")); |
| try { |
| stmt.executeUpdate(); |
| rowCount++; |
| } catch (SQLException sql) { |
| LOG.error("Error saving hosted apps metadata.", sql); |
| } |
| } |
| |
| conn.commit(); |
| LOG.info("Saved " + rowCount + " hosted apps metadata records."); |
| |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| } |
| |
| public void saveInstanceHostsMetadata(Map<String, Set<String>> instanceHostsMap) throws SQLException { |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| try { |
| stmt = conn.prepareStatement(UPSERT_INSTANCE_HOST_METADATA_SQL); |
| int rowCount = 0; |
| |
| for (Map.Entry<String, Set<String>> hostInstancesEntry : instanceHostsMap.entrySet()) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Host Instances Entry: " + hostInstancesEntry); |
| } |
| |
| String instanceId = hostInstancesEntry.getKey(); |
| |
| for(String hostname : hostInstancesEntry.getValue()) { |
| stmt.clearParameters(); |
| stmt.setString(1, instanceId); |
| stmt.setString(2, hostname); |
| try { |
| stmt.executeUpdate(); |
| rowCount++; |
| } catch (SQLException sql) { |
| LOG.error("Error saving host instances metadata.", sql); |
| } |
| } |
| |
| } |
| |
| conn.commit(); |
| LOG.info("Saved " + rowCount + " host instances metadata records."); |
| |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| } |
| |
| /** |
| * Save metdata on updates. |
| * @param metricMetadata @Collection<@TimelineMetricMetadata> |
| * @throws SQLException |
| */ |
| public void saveMetricMetadata(Collection<TimelineMetricMetadata> metricMetadata) throws SQLException { |
| if (metricMetadata.isEmpty()) { |
| LOG.info("No metadata records to save."); |
| return; |
| } |
| |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| |
| try { |
| stmt = conn.prepareStatement(UPSERT_METADATA_SQL); |
| int rowCount = 0; |
| |
| for (TimelineMetricMetadata metadata : metricMetadata) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("TimelineMetricMetadata: metricName = " + metadata.getMetricName() |
| + ", appId = " + metadata.getAppId() |
| + ", seriesStartTime = " + metadata.getSeriesStartTime() |
| ); |
| } |
| try { |
| stmt.clearParameters(); |
| stmt.setString(1, metadata.getMetricName()); |
| stmt.setString(2, metadata.getAppId()); |
| stmt.setString(3, metadata.getInstanceId()); |
| stmt.setBytes(4, metadata.getUuid()); |
| stmt.setString(5, metadata.getUnits()); |
| stmt.setString(6, metadata.getType()); |
| stmt.setLong(7, metadata.getSeriesStartTime()); |
| stmt.setBoolean(8, metadata.isSupportsAggregates()); |
| stmt.setBoolean(9, metadata.isWhitelisted()); |
| } catch (Exception e) { |
| LOG.error("Exception in saving metric metadata entry. "); |
| continue; |
| } |
| |
| try { |
| stmt.executeUpdate(); |
| rowCount++; |
| } catch (SQLException sql) { |
| LOG.error("Error saving metadata.", sql); |
| } |
| } |
| |
| conn.commit(); |
| LOG.info("Saved " + rowCount + " metadata records."); |
| |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| } |
| |
| public Map<String, TimelineMetricHostMetadata> getHostedAppsMetadata() throws SQLException { |
| Map<String, TimelineMetricHostMetadata> hostedAppMap = new HashMap<>(); |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| |
| try { |
| stmt = conn.prepareStatement(GET_HOSTED_APPS_METADATA_SQL); |
| rs = stmt.executeQuery(); |
| |
| while (rs.next()) { |
| String appIds = rs.getString("APP_IDS"); |
| TimelineMetricHostMetadata hostMetadata = new TimelineMetricHostMetadata(new HashSet<>()); |
| if (StringUtils.isNotEmpty(appIds)) { |
| hostMetadata = new TimelineMetricHostMetadata(new HashSet<>(Arrays.asList(StringUtils.split(appIds, ",")))); |
| } |
| hostMetadata.setUuid(rs.getBytes("UUID")); |
| hostedAppMap.put(rs.getString("HOSTNAME"), hostMetadata); |
| } |
| |
| } finally { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| |
| return hostedAppMap; |
| } |
| |
| public Map<String, Set<String>> getInstanceHostsMetdata() throws SQLException { |
| Map<String, Set<String>> instanceHostsMap = new HashMap<>(); |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| |
| try { |
| stmt = conn.prepareStatement(GET_INSTANCE_HOST_METADATA_SQL); |
| rs = stmt.executeQuery(); |
| |
| while (rs.next()) { |
| String instanceId = rs.getString("INSTANCE_ID"); |
| String hostname = rs.getString("HOSTNAME"); |
| |
| if (!instanceHostsMap.containsKey(instanceId)) { |
| instanceHostsMap.put(instanceId, new HashSet<String>()); |
| } |
| instanceHostsMap.get(instanceId).add(hostname); |
| } |
| |
| } finally { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| |
| return instanceHostsMap; |
| } |
| |
| // No filter criteria support for now. |
| public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getTimelineMetricMetadata() throws SQLException { |
| Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new HashMap<>(); |
| Connection conn = getConnection(); |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| |
| try { |
| stmt = conn.prepareStatement(GET_METRIC_METADATA_SQL); |
| rs = stmt.executeQuery(); |
| |
| while (rs.next()) { |
| String metricName = rs.getString("METRIC_NAME"); |
| String appId = rs.getString("APP_ID"); |
| String instanceId = rs.getString("INSTANCE_ID"); |
| TimelineMetricMetadata metadata = new TimelineMetricMetadata( |
| metricName, |
| appId, |
| instanceId, |
| rs.getString("UNITS"), |
| rs.getString("TYPE"), |
| rs.getLong("START_TIME"), |
| rs.getBoolean("SUPPORTS_AGGREGATION"), |
| rs.getBoolean("IS_WHITELISTED") |
| ); |
| |
| TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId, instanceId); |
| metadata.setIsPersisted(true); // Always true on retrieval |
| metadata.setUuid(rs.getBytes("UUID")); |
| metadataMap.put(key, metadata); |
| } |
| |
| } finally { |
| if (rs != null) { |
| try { |
| rs.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException sql) { |
| // Ignore |
| } |
| } |
| } |
| |
| return metadataMap; |
| } |
| |
| public void setMetadataInstance(TimelineMetricMetadataManager metadataManager) { |
| this.metadataManagerInstance = metadataManager; |
| TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance); |
| } |
| } |