| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ambari.metrics.core.timeline.discovery; |
| |
| import java.io.IOException; |
| import java.net.MalformedURLException; |
| import java.net.URISyntaxException; |
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException; |
| import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; |
| import org.apache.ambari.metrics.core.timeline.uuid.MetricUuidGenStrategy; |
| import org.apache.ambari.metrics.core.timeline.uuid.MD5UuidGenStrategy; |
| import org.apache.ambari.metrics.core.timeline.uuid.Murmur3HashUuidGenStrategy; |
| import org.apache.ambari.metrics.core.timeline.uuid.TimelineMetricUuid; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.collections.MapUtils; |
| import org.apache.commons.lang.ArrayUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.metrics2.sink.timeline.MetadataException; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; |
| import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; |
| import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; |
| |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TRANSIENT_METRIC_PATTERNS; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY; |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; |
| import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; |
| import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns; |
| import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaRegexFromSqlRegex; |
| |
| public class TimelineMetricMetadataManager { |
| private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class); |
| // Cache all metadata on retrieval |
| private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>(); |
| private final Map<TimelineMetricUuid, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>(); |
| // Map to lookup apps on a host |
| private final Map<String, TimelineMetricHostMetadata> HOSTED_APPS_MAP = new ConcurrentHashMap<>(); |
| private final Map<TimelineMetricUuid, String> uuidHostMap = new ConcurrentHashMap<>(); |
| private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>(); |
| // Sync only when needed |
| AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false); |
| AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false); |
| |
| private MetricUuidGenStrategy uuidGenStrategy = new Murmur3HashUuidGenStrategy(); |
| public static final int TIMELINE_METRIC_UUID_LENGTH = 16; |
| public static int HOSTNAME_UUID_LENGTH = 4; |
| |
| //Transient metric patterns. No UUID management and aggregation for such metrics. |
| private List<String> transientMetricPatterns = new ArrayList<>(); |
| |
| // Single thread to sync back new writes to the store |
| private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); |
| |
| private PhoenixHBaseAccessor hBaseAccessor; |
| private Configuration metricsConf; |
| |
| TimelineMetricMetadataSync metricMetadataSync; |
| // Filter metrics names matching given patterns, from metadata |
| private final List<String> metricNameFilters = new ArrayList<>(); |
| |
| // Test friendly construction since mock instrumentation is difficult to get |
| // working with hadoop mini cluster |
| public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) { |
| this.metricsConf = metricsConf; |
| this.hBaseAccessor = hBaseAccessor; |
| String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS); |
| if (!StringUtils.isEmpty(patternStrings)) { |
| metricNameFilters.addAll(Arrays.asList(patternStrings.split(","))); |
| } |
| |
| uuidGenStrategy = getUuidStrategy(metricsConf); |
| |
| String transientMetricPatternsString = metricsConf.get(TRANSIENT_METRIC_PATTERNS, StringUtils.EMPTY); |
| if (StringUtils.isNotEmpty(transientMetricPatternsString)) { |
| LOG.info("Skipping UUID for patterns : " + transientMetricPatternsString); |
| transientMetricPatterns.addAll(getJavaMetricPatterns(transientMetricPatternsString)); |
| } |
| } |
| |
| public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) throws MalformedURLException, URISyntaxException { |
| this(TimelineMetricConfiguration.getInstance().getMetricsConf(), hBaseAccessor); |
| } |
| |
| /** |
| * Initialize Metadata from the store |
| */ |
| public void initializeMetadata() { |
| initializeMetadata(true); |
| } |
| |
| /** |
| * Initialize Metadata from the store |
| */ |
| public void initializeMetadata(boolean scheduleMetadateSync) { |
| |
| //Create metadata schema |
| Connection conn = null; |
| Statement stmt = null; |
| |
| String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); |
| String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); |
| |
| try { |
| LOG.info("Initializing metrics metadata schema..."); |
| conn = hBaseAccessor.getConnectionRetryingOnException(); |
| stmt = conn.createStatement(); |
| |
| // Metadata |
| String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL, |
| encoding, compression); |
| stmt.executeUpdate(metadataSql); |
| |
| String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL, |
| encoding, compression); |
| stmt.executeUpdate(hostedAppSql); |
| |
| //Host Instances table |
| String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL, |
| encoding, compression); |
| stmt.executeUpdate(hostedInstancesSql); |
| } catch (SQLException | InterruptedException sql) { |
| LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql); |
| throw new MetricsSystemInitializationException( |
| "Error creating Metrics Metadata Schema in HBase using Phoenix.", sql); |
| } finally { |
| if (stmt != null) { |
| try { |
| stmt.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException e) { |
| // Ignore |
| } |
| } |
| } |
| |
| metricMetadataSync = new TimelineMetricMetadataSync(this); |
| // Schedule the executor to sync to store |
| if (scheduleMetadateSync) { |
| executorService.scheduleWithFixedDelay(metricMetadataSync, |
| metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes |
| metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes |
| TimeUnit.SECONDS); |
| } |
| // Read from store and initialize map |
| try { |
| Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = getMetadataFromStore(); |
| |
| LOG.info("Retrieved " + metadata.size() + ", metadata objects from store."); |
| // Store in the cache |
| METADATA_CACHE.putAll(metadata); |
| |
| Map<String, TimelineMetricHostMetadata> hostedAppData = getHostedAppsFromStore(); |
| |
| LOG.info("Retrieved " + hostedAppData.size() + " host objects from store."); |
| HOSTED_APPS_MAP.putAll(hostedAppData); |
| |
| loadUuidMapsOnInit(); |
| |
| hBaseAccessor.setMetadataInstance(this); |
| } catch (SQLException e) { |
| LOG.warn("Exception loading metric metadata", e); |
| } |
| } |
| |
| public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataCache() { |
| return METADATA_CACHE; |
| } |
| |
| public TimelineMetricMetadata getMetadataCacheValue(TimelineMetricMetadataKey key) { |
| return METADATA_CACHE.get(key); |
| } |
| |
| public Map<String, TimelineMetricHostMetadata> getHostedAppsCache() { |
| return HOSTED_APPS_MAP; |
| } |
| |
| public Map<String, Set<String>> getHostedInstanceCache() { |
| return INSTANCE_HOST_MAP; |
| } |
| |
| public boolean syncHostedAppsMetadata() { |
| return SYNC_HOSTED_APPS_METADATA.get(); |
| } |
| |
| public boolean syncHostedInstanceMetadata() { |
| return SYNC_HOSTED_INSTANCES_METADATA.get(); |
| } |
| |
| public void markSuccessOnSyncHostedAppsMetadata() { |
| SYNC_HOSTED_APPS_METADATA.set(false); |
| } |
| |
| public void markSuccessOnSyncHostedInstanceMetadata() { |
| SYNC_HOSTED_INSTANCES_METADATA.set(false); |
| } |
| |
| /** |
| * Test metric name for valid patterns and return true/false |
| */ |
| boolean skipMetadataCache(String metricName) { |
| for (String pattern : metricNameFilters) { |
| if (metricName.contains(pattern)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Update value in metadata cache |
| * @param metadata @TimelineMetricMetadata |
| */ |
| public void putIfModifiedTimelineMetricMetadata(TimelineMetricMetadata metadata) { |
| if (skipMetadataCache(metadata.getMetricName())) { |
| return; |
| } |
| |
| TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( |
| metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId()); |
| |
| TimelineMetricMetadata metadataFromCache = METADATA_CACHE.get(key); |
| |
| if (metadataFromCache != null) { |
| try { |
| if (metadataFromCache.needsToBeSynced(metadata)) { |
| metadata.setIsPersisted(false); // Set the flag to ensure sync to store on next run |
| METADATA_CACHE.put(key, metadata); |
| } |
| } catch (MetadataException e) { |
| LOG.warn("Error inserting Metadata in cache.", e); |
| } |
| |
| } else { |
| METADATA_CACHE.put(key, metadata); |
| } |
| } |
| |
| /** |
| * Update value in hosted apps cache |
| * @param hostname Host name |
| * @param appId Application Id |
| */ |
| public void putIfModifiedHostedAppsMetadata(String hostname, String appId) { |
| TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); |
| ConcurrentHashMap<String, String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null; |
| if (apps == null) { |
| apps = new ConcurrentHashMap<>(); |
| if (timelineMetricHostMetadata == null) { |
| TimelineMetricHostMetadata newHostMetadata = new TimelineMetricHostMetadata(apps); |
| newHostMetadata.setUuid(getUuidForHostname(hostname, true)); |
| HOSTED_APPS_MAP.put(hostname, newHostMetadata); |
| } else { |
| HOSTED_APPS_MAP.get(hostname).setHostedApps(apps); |
| } |
| } |
| |
| if (!apps.containsKey(appId)) { |
| apps.put(appId, appId); |
| SYNC_HOSTED_APPS_METADATA.set(true); |
| } |
| } |
| |
| public void putIfModifiedHostedInstanceMetadata(String instanceId, String hostname) { |
| if (StringUtils.isEmpty(instanceId)) { |
| return; |
| } |
| |
| Set<String> hosts = INSTANCE_HOST_MAP.get(instanceId); |
| if (hosts == null) { |
| hosts = new HashSet<>(); |
| INSTANCE_HOST_MAP.put(instanceId, hosts); |
| } |
| |
| if (!hosts.contains(hostname)) { |
| hosts.add(hostname); |
| SYNC_HOSTED_INSTANCES_METADATA.set(true); |
| } |
| } |
| |
| public void persistMetadata(Collection<TimelineMetricMetadata> metadata) throws SQLException { |
| hBaseAccessor.saveMetricMetadata(metadata); |
| } |
| |
| public void persistHostedAppsMetadata(Map<String, TimelineMetricHostMetadata> hostedApps) throws SQLException { |
| hBaseAccessor.saveHostAppsMetadata(hostedApps); |
| } |
| |
| public void persistHostedInstanceMetadata(Map<String, Set<String>> hostedInstancesMetadata) throws SQLException { |
| hBaseAccessor.saveInstanceHostsMetadata(hostedInstancesMetadata); |
| } |
| |
| public TimelineMetricMetadata createTimelineMetricMetadata(TimelineMetric timelineMetric, boolean isWhitelisted) { |
| TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata( |
| timelineMetric.getMetricName(), |
| timelineMetric.getAppId(), |
| timelineMetric.getInstanceId(), |
| timelineMetric.getUnits(), |
| timelineMetric.getType(), |
| timelineMetric.getStartTime(), |
| supportAggregates(timelineMetric), |
| isWhitelisted |
| ); |
| |
| //Set UUID for metadata on the write path. Do not pass in hostname here since we only want Metric metadata, not host metadata. |
| if (!isTransientMetric(timelineMetric.getMetricName(), timelineMetric.getAppId())) { |
| byte[] uuid = getUuid(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), null, true); |
| timelineMetricMetadata.setUuid(uuid); |
| } |
| return timelineMetricMetadata; |
| } |
| |
| boolean isDistributedModeEnabled() { |
| return metricsConf.get("timeline.metrics.service.operation.mode").equals("distributed"); |
| } |
| |
| /** |
| * Fetch metrics metadata from store from V1 table (no UUID) |
| * @throws SQLException |
| */ |
| Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataFromStoreV1() throws SQLException { |
| return hBaseAccessor.getTimelineMetricMetadataV1(); |
| } |
| |
| /** |
| * Fetch metrics metadata from store |
| * @throws SQLException |
| */ |
| Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataFromStore() throws SQLException { |
| return hBaseAccessor.getTimelineMetricMetadata(); |
| } |
| |
| /** |
| * Fetch hosted apps from store |
| * @throws SQLException |
| */ |
| Map<String, TimelineMetricHostMetadata> getHostedAppsFromStore() throws SQLException { |
| return hBaseAccessor.getHostedAppsMetadata(); |
| } |
| |
| Map<String, Set<String>> getHostedInstancesFromStore() throws SQLException { |
| return hBaseAccessor.getInstanceHostsMetdata(); |
| } |
| |
| private boolean supportAggregates(TimelineMetric metric) { |
| return MapUtils.isEmpty(metric.getMetadata()) || |
| !(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation"))); |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| // UUID Management |
| ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Load the UUID mappings from the UUID table on startup. |
| */ |
| private void loadUuidMapsOnInit() { |
| |
| for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { |
| TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); |
| if (timelineMetricMetadata != null && timelineMetricMetadata.getUuid() != null) { |
| uuidKeyMap.put(new TimelineMetricUuid(timelineMetricMetadata.getUuid()), key); |
| } |
| } |
| |
| for (String host : HOSTED_APPS_MAP.keySet()) { |
| TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(host); |
| if (timelineMetricHostMetadata != null && timelineMetricHostMetadata.getUuid() != null) { |
| uuidHostMap.put(new TimelineMetricUuid(timelineMetricHostMetadata.getUuid()), host); |
| } |
| } |
| } |
| |
| /** |
| * Returns the UUID gen strategy. |
| * @param configuration the config |
| * @return the UUID generator of type org.apache.ambari.metrics.core.timeline.uuid.MetricUuidGenStrategy |
| */ |
| private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) { |
| return new Murmur3HashUuidGenStrategy(); |
| } |
| |
| /** |
| * Given the hostname, generates a byte array of length 'HOSTNAME_UUID_LENGTH' |
| * @param hostname the hostname |
| * @param createIfNotPresent Generate UUID if not present. |
| * @return uuid byte array of length 'HOSTNAME_UUID_LENGTH' |
| */ |
| private byte[] getUuidForHostname(String hostname, boolean createIfNotPresent) { |
| |
| TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); |
| if (timelineMetricHostMetadata != null) { |
| byte[] uuid = timelineMetricHostMetadata.getUuid(); |
| if (uuid != null) { |
| return uuid; |
| } |
| } |
| |
| if (!createIfNotPresent) { |
| LOG.debug("UUID not found for " + hostname + ", createIfNotPresent is false"); |
| return null; |
| } |
| |
| byte[] uuid = uuidGenStrategy.computeUuid(hostname, HOSTNAME_UUID_LENGTH); |
| if (uuidHostMap.containsKey(new TimelineMetricUuid(uuid))) { |
| LOG.error("Duplicate key computed for " + hostname +", Collides with " + uuidHostMap.get(uuid)); |
| return null; |
| } |
| |
| if (timelineMetricHostMetadata == null) { |
| timelineMetricHostMetadata = new TimelineMetricHostMetadata(); |
| HOSTED_APPS_MAP.put(hostname, timelineMetricHostMetadata); |
| } |
| timelineMetricHostMetadata.setUuid(uuid); |
| uuidHostMap.put(new TimelineMetricUuid(uuid), hostname); |
| |
| return uuid; |
| } |
| |
| /** |
| * Given a timelineClusterMetric instance, generates a UUID for Metric-App-Instance combination. |
| * @param timelineClusterMetric The timeline cluster metric for which the UUID needs to be generated. |
| * @param createIfNotPresent Generate UUID if not present. |
| * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' |
| */ |
| public byte[] getUuid(TimelineClusterMetric timelineClusterMetric, boolean createIfNotPresent) { |
| |
| TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), |
| timelineClusterMetric.getAppId(), timelineClusterMetric.getInstanceId()); |
| |
| TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); |
| if (timelineMetricMetadata != null) { |
| byte[] uuid = timelineMetricMetadata.getUuid(); |
| if (uuid != null) { |
| return uuid; |
| } |
| } |
| |
| if (!createIfNotPresent) { |
| LOG.debug("UUID not found for " + key + ", createIfNotPresent is false"); |
| return null; |
| } |
| |
| byte[] uuidBytes = uuidGenStrategy.computeUuid(timelineClusterMetric, TIMELINE_METRIC_UUID_LENGTH); |
| |
| TimelineMetricUuid uuid = new TimelineMetricUuid(uuidBytes); |
| if (uuidKeyMap.containsKey(uuid) && !uuidKeyMap.get(uuid).equals(key)) { |
| TimelineMetricMetadataKey collidingKey = uuidKeyMap.get(uuid); |
| LOG.error("Duplicate key " + uuid + " computed for " + timelineClusterMetric + ", Collides with " + collidingKey); |
| return null; |
| } |
| |
| if (timelineMetricMetadata == null) { |
| timelineMetricMetadata = new TimelineMetricMetadata(); |
| timelineMetricMetadata.setMetricName(timelineClusterMetric.getMetricName()); |
| timelineMetricMetadata.setAppId(timelineClusterMetric.getAppId()); |
| timelineMetricMetadata.setInstanceId(timelineClusterMetric.getInstanceId()); |
| METADATA_CACHE.put(key, timelineMetricMetadata); |
| } |
| |
| timelineMetricMetadata.setUuid(uuid.uuid); |
| timelineMetricMetadata.setIsPersisted(false); |
| uuidKeyMap.put(uuid, key); |
| return uuid.uuid; |
| } |
| |
| /** |
| * Given a timelineMetric instance, generates a UUID for Metric-App-Instance-Host combination. |
| * @param timelineMetric The timeline metric for which the UUID needs to be generated. |
| * @param createIfNotPresent Generate UUID if not present. |
| * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' + 'HOSTNAME_UUID_LENGTH' |
| */ |
| public byte[] getUuid(TimelineMetric timelineMetric, boolean createIfNotPresent) { |
| |
| byte[] metricUuid = getUuid(new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), |
| timelineMetric.getInstanceId(), -1l), createIfNotPresent); |
| byte[] hostUuid = getUuidForHostname(timelineMetric.getHostName(), createIfNotPresent); |
| |
| if (hostUuid != null) { |
| putIfModifiedHostedAppsMetadata(timelineMetric.getHostName(), timelineMetric.getAppId()); |
| } |
| |
| if (metricUuid == null || hostUuid == null) { |
| return null; |
| } |
| return ArrayUtils.addAll(metricUuid, hostUuid); |
| } |
| |
| /** |
| * Given a metric name, appId, instanceId and hotname, generates a UUID for Metric-App-Instance-Host combination. |
| * @param createIfNotPresent Generate UUID if not present. |
| * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' + 'HOSTNAME_UUID_LENGTH' |
| */ |
| public byte[] getUuid(String metricName, String appId, String instanceId, String hostname, boolean createIfNotPresent) { |
| |
| byte[] metricUuid = getUuid(new TimelineClusterMetric(metricName, appId, instanceId, -1l), createIfNotPresent); |
| if (StringUtils.isNotEmpty(hostname)) { |
| byte[] hostUuid = getUuidForHostname(hostname, createIfNotPresent); |
| if (hostUuid == null || metricUuid == null) { |
| return null; |
| } |
| return ArrayUtils.addAll(metricUuid, hostUuid); |
| } |
| return metricUuid; |
| } |
| |
| public String getMetricNameFromUuid(byte[] uuid) { |
| |
| byte[] metricUuid = uuid; |
| if (uuid.length == TIMELINE_METRIC_UUID_LENGTH + HOSTNAME_UUID_LENGTH) { |
| metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH); |
| } |
| |
| TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(metricUuid)); |
| return key != null ? key.getMetricName() : null; |
| } |
| |
| /** |
| * Given a UUID (from DB hopefully), return the timeline metric it is associated with. |
| * @param uuid 'TIMELINE_METRIC_UUID_LENGTH' + 'HOSTNAME_UUID_LENGTH' byte UUID. |
| * @return TimelineMetric object if present in the metadata. |
| */ |
| public TimelineMetric getMetricFromUuid(byte[] uuid) { |
| if (uuid == null) { |
| return null; |
| } |
| |
| if (uuid.length == TIMELINE_METRIC_UUID_LENGTH) { |
| TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(uuid)); |
| return key != null ? new TimelineMetric(key.metricName, null, key.appId, key.instanceId) : null; |
| } else { |
| byte[] metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH); |
| TimelineMetricMetadataKey key = uuidKeyMap.get(new TimelineMetricUuid(metricUuid)); |
| if (key == null) { |
| LOG.error("TimelineMetricMetadataKey is null for : " + Arrays.toString(uuid)); |
| return null; |
| } |
| TimelineMetric timelineMetric = new TimelineMetric(); |
| timelineMetric.setMetricName(key.metricName); |
| timelineMetric.setAppId(key.appId); |
| timelineMetric.setInstanceId(key.instanceId); |
| |
| byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH); |
| timelineMetric.setHostName(uuidHostMap.get(new TimelineMetricUuid(hostUuid))); |
| return timelineMetric; |
| } |
| } |
| |
| public List<byte[]> getUuidsForGetMetricQuery(Collection<String> metricNames, |
| List<String> hostnames, |
| String appId, |
| String instanceId) { |
| return getUuidsForGetMetricQuery(metricNames, hostnames, appId, instanceId, Collections.EMPTY_LIST); |
| } |
| /** |
| * Returns the set of UUIDs for a given GET request. If there are wildcards (%), resolves them based on UUID map. |
| * If metricName-App-Instance or hostname not present in Metadata, the combination will be skipped. |
| * @param metricNames |
| * @param hostnames |
| * @param appId |
| * @param instanceId |
| * @return Set of UUIds |
| */ |
| public List<byte[]> getUuidsForGetMetricQuery(Collection<String> metricNames, |
| List<String> hostnames, |
| String appId, |
| String instanceId, |
| List<String> transientMetricNames) { |
| |
| Collection<String> sanitizedMetricNames = new HashSet<>(); |
| List<byte[]> uuids = new ArrayList<>(); |
| |
| for (String metricName : metricNames) { |
| if (metricName.contains("%")) { |
| String metricRegEx = getJavaRegexFromSqlRegex(metricName); |
| for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { |
| String metricNameFromMetadata = key.getMetricName(); |
| if (metricNameFromMetadata.matches(metricRegEx)) { |
| sanitizedMetricNames.add(metricNameFromMetadata); |
| } |
| } |
| } else { |
| sanitizedMetricNames.add(metricName); |
| } |
| } |
| |
| if(sanitizedMetricNames.isEmpty()) { |
| return uuids; |
| } |
| |
| Set<String> sanitizedHostNames = getSanitizedHostnames(hostnames); |
| |
| if ( StringUtils.isNotEmpty(appId) && !(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) { //HACK.. Why?? |
| appId = appId.toLowerCase(); |
| } |
| if (CollectionUtils.isNotEmpty(sanitizedHostNames)) { |
| if (CollectionUtils.isNotEmpty(sanitizedMetricNames)) { |
| |
| //Skip getting UUID if it is a transient metric. |
| //An attempt to get it will also be OK as we don't add null UUIDs. |
| for (String metricName : sanitizedMetricNames) { |
| if (isTransientMetric(metricName, appId)) { |
| transientMetricNames.add(metricName); |
| continue; |
| } |
| TimelineMetric metric = new TimelineMetric(); |
| metric.setMetricName(metricName); |
| metric.setAppId(appId); |
| metric.setInstanceId(instanceId); |
| for (String hostname : sanitizedHostNames) { |
| metric.setHostName(hostname); |
| byte[] uuid = getUuid(metric, false); |
| if (uuid != null) { |
| uuids.add(uuid); |
| } |
| } |
| } |
| } else { |
| for (String hostname : sanitizedHostNames) { |
| byte[] uuid = getUuidForHostname(hostname, false); |
| if (uuid != null) { |
| uuids.add(uuid); |
| } |
| } |
| } |
| } else { |
| for (String metricName : sanitizedMetricNames) { |
| //Skip getting UUID if it is a transient metric. An attempt to get it will also be OK as we don't add null UUIDs. |
| if (isTransientMetric(metricName, appId)) { |
| transientMetricNames.add(metricName); |
| continue; |
| } |
| TimelineClusterMetric metric = new TimelineClusterMetric(metricName, appId, instanceId, -1l); |
| byte[] uuid = getUuid(metric, false); |
| if (uuid != null) { |
| uuids.add(uuid); |
| } |
| } |
| } |
| |
| return uuids; |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| |
| public List<String> getNotLikeHostnames(List<String> hostnames) { |
| List<String> result = new ArrayList<>(); |
| Set<String> sanitizedHostNames = getSanitizedHostnames(hostnames); |
| for (String hostname: HOSTED_APPS_MAP.keySet()) { |
| if (!sanitizedHostNames.contains(hostname)) { |
| result.add(hostname); |
| } |
| } |
| return result; |
| } |
| |
| private Set<String> getSanitizedHostnames(List<String> hostnamedWithOrWithoutWildcard) { |
| |
| Set<String> sanitizedHostNames = new HashSet<>(); |
| if (CollectionUtils.isNotEmpty(hostnamedWithOrWithoutWildcard)) { |
| for (String hostname : hostnamedWithOrWithoutWildcard) { |
| if (hostname.contains("%")) { |
| String hostRegEx; |
| hostRegEx = hostname.replace("%", ".*"); |
| for (String host : HOSTED_APPS_MAP.keySet()) { |
| if (host.matches(hostRegEx)) { |
| sanitizedHostNames.add(host); |
| } |
| } |
| } else { |
| sanitizedHostNames.add(hostname); |
| } |
| } |
| } |
| return sanitizedHostNames; |
| } |
| |
| /** |
| * |
| * @param appId |
| * @param metricPattern |
| * @param includeBlacklistedMetrics |
| * @return |
| * @throws SQLException |
| * @throws IOException |
| */ |
| public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadataByAppId(String appId, String metricPattern, |
| boolean includeBlacklistedMetrics) throws SQLException, IOException { |
| |
| Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = getMetadataCache(); |
| |
| boolean filterByAppId = StringUtils.isNotEmpty(appId); |
| boolean filterByMetricName = StringUtils.isNotEmpty(metricPattern); |
| Pattern metricFilterPattern = null; |
| if (filterByMetricName) { |
| metricFilterPattern = Pattern.compile(metricPattern); |
| } |
| |
| // Group Metadata by AppId |
| Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>(); |
| for (TimelineMetricMetadata metricMetadata : metadata.values()) { |
| |
| if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) { |
| continue; |
| } |
| |
| String currentAppId = metricMetadata.getAppId(); |
| if (filterByAppId && !currentAppId.equals(appId)) { |
| continue; |
| } |
| |
| if (filterByMetricName) { |
| Matcher m = metricFilterPattern.matcher(metricMetadata.getMetricName()); |
| if (!m.find()) { |
| continue; |
| } |
| } |
| |
| List<TimelineMetricMetadata> metadataList = metadataByAppId.get(currentAppId); |
| if (metadataList == null) { |
| metadataList = new ArrayList<>(); |
| metadataByAppId.put(currentAppId, metadataList); |
| } |
| |
| metadataList.add(metricMetadata); |
| } |
| |
| return metadataByAppId; |
| } |
| |
| /** |
| * Returns metadata summary |
| * @return |
| * @throws IOException |
| * @throws SQLException |
| */ |
| public Map<String, String> getMetadataSummary() throws IOException, SQLException { |
| Map<String, String> summary = new HashMap<>(); |
| summary.put("Number of Hosts", String.valueOf(HOSTED_APPS_MAP.size())); |
| Map<String, List<TimelineMetricMetadata>> metadataMap = getTimelineMetricMetadataByAppId(StringUtils.EMPTY, |
| StringUtils.EMPTY, |
| true); |
| |
| if (metadataMap != null) { |
| for (String appId : metadataMap.keySet()) { |
| summary.put(appId, String.valueOf(metadataMap.get(appId).size())); |
| } |
| } |
| return summary; |
| } |
| |
| |
| /** |
| * |
| * @param metricName |
| * @param appId |
| * @return |
| */ |
| public boolean isTransientMetric(String metricName, String appId) { |
| //Currently we use only metric name. In the future we may use appId as well. |
| |
| for (String pattern : transientMetricPatterns) { |
| if (metricName.matches(pattern)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Run TimelineMetricMetadataSync once |
| */ |
| public void forceMetricsMetadataSync() { |
| metricMetadataSync.run(); |
| } |
| |
| public void updateMetadataCacheUsingV1Tables() throws SQLException { |
| Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataV1Map = getMetadataFromStoreV1(); |
| for (TimelineMetricMetadataKey key: METADATA_CACHE.keySet()) { |
| TimelineMetricMetadata cacheValue = METADATA_CACHE.get(key); |
| TimelineMetricMetadata oldValue = metadataV1Map.get(key); |
| |
| if (oldValue != null) { |
| if (!cacheValue.isPersisted()) { |
| LOG.info(String.format("Updating properties for %s", key)); |
| cacheValue.setSeriesStartTime(oldValue.getSeriesStartTime()); |
| cacheValue.setSupportsAggregates(oldValue.isSupportsAggregates()); |
| cacheValue.setType(oldValue.getType()); |
| cacheValue.setIsWhitelisted(oldValue.isWhitelisted()); |
| } else if (oldValue.getSeriesStartTime() < cacheValue.getSeriesStartTime() && |
| cacheValue.getSeriesStartTime() != 0L && |
| cacheValue.isWhitelisted()) |
| { |
| LOG.info(String.format("Updating startTime for %s", key)); |
| cacheValue.setSeriesStartTime(oldValue.getSeriesStartTime()); |
| cacheValue.setIsPersisted(false); |
| } |
| } |
| } |
| } |
| } |