| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.metrics2.sink.timeline; |
| |
| import com.google.common.base.Supplier; |
| import com.google.common.base.Suppliers; |
| import com.google.common.reflect.TypeToken; |
| import com.google.gson.Gson; |
| import com.google.gson.JsonSyntaxException; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper; |
| import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException; |
| import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy; |
| import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy; |
| import org.apache.http.HttpStatus; |
| import org.codehaus.jackson.map.AnnotationIntrospector; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.map.annotate.JsonSerialize; |
| import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; |
| |
| import javax.net.ssl.HttpsURLConnection; |
| import javax.net.ssl.SSLContext; |
| import javax.net.ssl.SSLSocketFactory; |
| import javax.net.ssl.TrustManagerFactory; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.StringWriter; |
| import java.net.HttpURLConnection; |
| import java.net.URL; |
| import java.security.KeyStore; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| public abstract class AbstractTimelineMetricsSink { |
| public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix."; |
| public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize"; |
| public static final String METRICS_SEND_INTERVAL = "sendInterval"; |
| public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout"; |
| public static final String COLLECTOR_HOSTS_PROPERTY = "collector.hosts"; |
| public static final String COLLECTOR_PROTOCOL = "protocol"; |
| public static final String COLLECTOR_PORT = "port"; |
| public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum"; |
| public static final String COLLECTOR_ZOOKEEPER_QUORUM = "metrics.zookeeper.quorum"; |
| public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10; |
| public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative"; |
| public static final String RPC_METRIC_PREFIX = "metric.rpc"; |
| public static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics"; |
| public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path"; |
| public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type"; |
| public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password"; |
| public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation"; |
| public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port"; |
| public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes"; |
| public static final String INSTANCE_ID_PROPERTY = "instanceId"; |
| public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId"; |
| public static final String COOKIE = "Cookie"; |
| private static final String WWW_AUTHENTICATE = "WWW-Authenticate"; |
| private static final String NEGOTIATE = "Negotiate"; |
| |
| protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0); |
| public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100; |
| protected static final AtomicInteger nullCollectorCounter = new AtomicInteger(0); |
| public static int NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS = 20; |
| public int ZK_CONNECT_TRY_COUNT = 10; |
| public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000; |
| public boolean shardExpired = true; |
| private int zookeeperMinBackoffTimeMins = 2; |
| private int zookeeperMaxBackoffTimeMins = 5; |
| private long zookeeperBackoffTimeMillis; |
| private long lastFailedZkRequestTime = 0l; |
| |
| private SSLSocketFactory sslSocketFactory; |
| private AppCookieManager appCookieManager = null; |
| |
| protected final Log LOG; |
| |
| protected static ObjectMapper mapper; |
| |
| protected MetricCollectorHAHelper collectorHAHelper; |
| |
| protected MetricSinkWriteShardStrategy metricSinkWriteShardStrategy; |
| |
| // Single element cache with fixed expiration - Helps adjacent Sinks as |
| // well as timed refresh |
| protected Supplier<String> targetCollectorHostSupplier; |
| |
| protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>(); |
| |
| private volatile boolean isInitializedForHA = false; |
| |
| @SuppressWarnings("all") |
| private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5; |
| |
| private final Gson gson = new Gson(); |
| |
| private final Random rand = new Random(); |
| |
| private static final int COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES = 75; |
| private static final int COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES = 60; |
| |
| static { |
| mapper = new ObjectMapper(); |
| AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); |
| mapper.setAnnotationIntrospector(introspector); |
| mapper.getSerializationConfig() |
| .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); |
| } |
| |
| public AbstractTimelineMetricsSink() { |
| LOG = LogFactory.getLog(this.getClass()); |
| } |
| |
| /** |
| * Initialize Sink write strategy with respect to HA Collector |
| */ |
| protected void init() { |
| metricSinkWriteShardStrategy = new MetricSinkWriteShardHostnameHashingStrategy(getHostname()); |
| collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(), |
| ZK_CONNECT_TRY_COUNT, ZK_SLEEP_BETWEEN_RETRY_TIME); |
| zookeeperBackoffTimeMillis = getZookeeperBackoffTimeMillis(); |
| isInitializedForHA = true; |
| } |
| |
| protected boolean emitMetricsJson(String connectUrl, String jsonData) { |
| int timeout = getTimeoutSeconds() * 1000; |
| HttpURLConnection connection = null; |
| try { |
| if (connectUrl == null) { |
| throw new IOException("Unknown URL. Unable to connect to metrics collector."); |
| } |
| connection = connectUrl.startsWith("https") ? |
| getSSLConnection(connectUrl) : getConnection(connectUrl); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("emitMetricsJson to " + connectUrl + ", " + jsonData); |
| } |
| AppCookieManager appCookieManager = getAppCookieManager(); |
| String appCookie = appCookieManager.getCachedAppCookie(connectUrl); |
| if (appCookie != null) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Using cached app cookie for URL:" + connectUrl); |
| } |
| connection.setRequestProperty(COOKIE, appCookie); |
| } |
| |
| connection.setRequestMethod("POST"); |
| connection.setRequestProperty("Content-Type", "application/json"); |
| connection.setRequestProperty("Connection", "Keep-Alive"); |
| connection.setConnectTimeout(timeout); |
| connection.setReadTimeout(timeout); |
| connection.setDoOutput(true); |
| |
| if (jsonData != null) { |
| try (OutputStream os = connection.getOutputStream()) { |
| os.write(jsonData.getBytes("UTF-8")); |
| } |
| } |
| |
| int statusCode = connection.getResponseCode(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("emitMetricsJson: statusCode = " + statusCode); |
| } |
| |
| if (statusCode == HttpStatus.SC_UNAUTHORIZED ) { |
| String wwwAuthHeader = connection.getHeaderField(WWW_AUTHENTICATE); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Received WWW-Authentication header:" + wwwAuthHeader + ", for URL:" + connectUrl); |
| } |
| if (wwwAuthHeader != null && wwwAuthHeader.trim().startsWith(NEGOTIATE)) { |
| appCookie = appCookieManager.getAppCookie(connectUrl, true); |
| if (appCookie != null) { |
| connection.setRequestProperty(COOKIE, appCookie); |
| |
| if (jsonData != null) { |
| try (OutputStream os = connection.getOutputStream()) { |
| os.write(jsonData.getBytes("UTF-8")); |
| } |
| } |
| |
| statusCode = connection.getResponseCode(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("emitMetricsJson: statusCode2 = " + statusCode); |
| } |
| } |
| } else { |
| // no supported authentication type found |
| // we would let the original response propagate |
| LOG.error("Unsupported WWW-Authentication header:" + wwwAuthHeader+ ", for URL:" + connectUrl); |
| } |
| } |
| |
| if (statusCode != 200) { |
| LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " + |
| "statusCode = " + statusCode); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Metrics posted to Collector " + connectUrl); |
| } |
| } |
| cleanupInputStream(connection.getInputStream()); |
| // reset failedCollectorConnectionsCounter to "0" |
| failedCollectorConnectionsCounter.set(0); |
| return true; |
| } catch (IOException ioe) { |
| StringBuilder errorMessage = |
| new StringBuilder("Unable to connect to collector, " + connectUrl + "\n" |
| + "This exceptions will be ignored for next " + NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS + " times\n"); |
| try { |
| if ((connection != null)) { |
| errorMessage.append(cleanupInputStream(connection.getErrorStream())); |
| } |
| } catch (IOException e) { |
| //NOP |
| } |
| |
| if (failedCollectorConnectionsCounter.getAndIncrement() == 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(errorMessage, ioe); |
| } else { |
| LOG.info(errorMessage); |
| } |
| throw new UnableToConnectException(ioe).setConnectUrl(connectUrl); |
| } else { |
| failedCollectorConnectionsCounter.compareAndSet(NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS, 0); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format("Ignoring %s AMS connection exceptions", NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS)); |
| } |
| return false; |
| } |
| } |
| } |
| |
| protected String getCurrentCollectorHost() { |
| String collectorHost; |
| // Get cached target |
| if (targetCollectorHostSupplier != null) { |
| collectorHost = targetCollectorHostSupplier.get(); |
| // Last X attempts have failed - force refresh |
| if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) { |
| LOG.debug("Removing collector " + collectorHost + " from allKnownLiveCollectors."); |
| allKnownLiveCollectors.remove(collectorHost); |
| targetCollectorHostSupplier = null; |
| collectorHost = findPreferredCollectHost(); |
| } |
| } else { |
| collectorHost = findPreferredCollectHost(); |
| } |
| |
| if (collectorHost == null) { |
| if (nullCollectorCounter.getAndIncrement() == 0) { |
| LOG.info("No live collector to send metrics to. Metrics to be sent will be discarded. " + |
| "This message will be skipped for the next " + NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS + " times."); |
| } else { |
| nullCollectorCounter.compareAndSet(NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS, 0); |
| } |
| } else { |
| nullCollectorCounter.set(0); |
| } |
| return collectorHost; |
| } |
| |
| protected boolean emitMetrics(TimelineMetrics metrics) { |
| String connectUrl; |
| if (isHostInMemoryAggregationEnabled()) { |
| connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort())); |
| } else { |
| String collectorHost = getCurrentCollectorHost(); |
| connectUrl = getCollectorUri(collectorHost); |
| } |
| |
| String jsonData = null; |
| LOG.debug("EmitMetrics connectUrl = " + connectUrl); |
| try { |
| jsonData = mapper.writeValueAsString(metrics); |
| } catch (IOException e) { |
| LOG.error("Unable to parse metrics", e); |
| } |
| if (jsonData != null) { |
| return emitMetricsJson(connectUrl, jsonData); |
| } |
| return false; |
| } |
| |
| /** |
| * Get the associated app cookie manager. |
| * |
| * @return the app cookie manager |
| */ |
| public synchronized AppCookieManager getAppCookieManager() { |
| if (appCookieManager == null) { |
| appCookieManager = new AppCookieManager(); |
| } |
| return appCookieManager; |
| } |
| |
| /** |
| * Cleans up and closes an input stream |
| * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html |
| * @param is the InputStream to clean up |
| * @return string read from the InputStream |
| * @throws IOException |
| */ |
| protected String cleanupInputStream(InputStream is) throws IOException { |
| StringBuilder sb = new StringBuilder(); |
| if (is != null) { |
| try ( |
| InputStreamReader isr = new InputStreamReader(is); |
| BufferedReader br = new BufferedReader(isr) |
| ) { |
| // read the response body |
| String line; |
| while ((line = br.readLine()) != null) { |
| if (LOG.isDebugEnabled()) { |
| sb.append(line); |
| } |
| } |
| } finally { |
| is.close(); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| // Get a connection |
| protected HttpURLConnection getConnection(String spec) throws IOException { |
| return (HttpURLConnection) new URL(spec).openConnection(); |
| } |
| |
| // Get an ssl connection |
| protected HttpsURLConnection getSSLConnection(String spec) |
| throws IOException, IllegalStateException { |
| |
| HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec).openConnection()); |
| |
| connection.setSSLSocketFactory(sslSocketFactory); |
| |
| return connection; |
| } |
| |
| protected void loadTruststore(String trustStorePath, String trustStoreType, |
| String trustStorePassword) { |
| if (sslSocketFactory == null) { |
| if (trustStorePath == null || trustStorePassword == null) { |
| String msg = "Can't load TrustStore. Truststore path or password is not set."; |
| LOG.error(msg); |
| throw new IllegalStateException(msg); |
| } |
| FileInputStream in = null; |
| try { |
| in = new FileInputStream(new File(trustStorePath)); |
| KeyStore store = KeyStore.getInstance(trustStoreType == null ? |
| KeyStore.getDefaultType() : trustStoreType); |
| store.load(in, trustStorePassword.toCharArray()); |
| TrustManagerFactory tmf = TrustManagerFactory |
| .getInstance(TrustManagerFactory.getDefaultAlgorithm()); |
| tmf.init(store); |
| SSLContext context = SSLContext.getInstance("TLS"); |
| context.init(null, tmf.getTrustManagers(), null); |
| sslSocketFactory = context.getSocketFactory(); |
| } catch (Exception e) { |
| LOG.error("Unable to load TrustStore", e); |
| } finally { |
| if (in != null) { |
| try { |
| in.close(); |
| } catch (IOException e) { |
| LOG.error("Unable to load TrustStore", e); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Find appropriate write shard for this sink using the {@link org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy} |
| * |
| * 1. Use configured collector(s) to discover available collectors |
| * 2. If configured collector(s) are unresponsive check Zookeeper to find live hosts |
| * 3. Refresh known collector list using ZK |
| * 4. Default: Return configured collector with no side effect due to discovery. |
| * |
| * throws {#link MetricsSinkInitializationException} if called before |
| * initialization, not other side effect |
| * |
| * @return String Collector hostname |
| */ |
| protected synchronized String findPreferredCollectHost() { |
| if (!isInitializedForHA) { |
| init(); |
| } |
| |
| shardExpired = false; |
| // Auto expire and re-calculate after 1 hour |
| if (targetCollectorHostSupplier != null) { |
| String targetCollector = targetCollectorHostSupplier.get(); |
| if (targetCollector != null) { |
| return targetCollector; |
| } |
| } |
| |
| // Reach out to all configured collectors before Zookeeper |
| Collection<String> collectorHosts = getConfiguredCollectorHosts(); |
| refreshCollectorsFromConfigured(collectorHosts); |
| |
| // Lookup Zookeeper for live hosts - max 10 seconds wait time |
| long currentTime = System.currentTimeMillis(); |
| if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null |
| && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) { |
| |
| LOG.debug("No live collectors from configuration. Requesting zookeeper..."); |
| allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode()); |
| boolean noNewCollectorFromZk = true; |
| for (String collectorHostFromZk : allKnownLiveCollectors) { |
| if (!collectorHosts.contains(collectorHostFromZk)) { |
| noNewCollectorFromZk = false; |
| break; |
| } |
| } |
| if (noNewCollectorFromZk) { |
| LOG.debug("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis"); |
| lastFailedZkRequestTime = System.currentTimeMillis(); |
| } |
| } |
| |
| if (allKnownLiveCollectors.size() != 0) { |
| targetCollectorHostSupplier = Suppliers.memoizeWithExpiration( |
| new Supplier<String>() { |
| @Override |
| public String get() { |
| //shardExpired flag is used to determine if the Supplier.get() is invoked through the |
| // findPreferredCollectHost method (No need to refresh collector hosts |
| // OR |
| // through Expiry (Refresh needed to pick up dead collectors that might have not become alive). |
| if (shardExpired) { |
| refreshCollectorsFromConfigured(getConfiguredCollectorHosts()); |
| } |
| return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); |
| } |
| }, // random.nextInt(max - min + 1) + min # (60 to 75 minutes) |
| rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES |
| - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1) |
| + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES, |
| TimeUnit.MINUTES |
| ); |
| |
| String collectorHost = targetCollectorHostSupplier.get(); |
| shardExpired = true; |
| return collectorHost; |
| } |
| LOG.debug("Couldn't find any live collectors. Returning null"); |
| shardExpired = true; |
| return null; |
| } |
| |
| private void refreshCollectorsFromConfigured(Collection<String> collectorHosts) { |
| |
| LOG.debug("Trying to find live collector host from : " + collectorHosts); |
| if (collectorHosts != null && !collectorHosts.isEmpty()) { |
| for (String hostStr : collectorHosts) { |
| hostStr = hostStr.trim(); |
| if (!hostStr.isEmpty()) { |
| try { |
| Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort()); |
| // Update live Hosts - current host will already be a part of this |
| for (String host : liveHosts) { |
| allKnownLiveCollectors.add(host); |
| } |
| break; // Found at least 1 live collector |
| } catch (MetricCollectorUnavailableException e) { |
| LOG.debug("Collector " + hostStr + " is not longer live. Removing " + |
| "it from list of know live collector hosts : " + allKnownLiveCollectors); |
| allKnownLiveCollectors.remove(hostStr); |
| } |
| } |
| } |
| } |
| } |
| |
| Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException { |
| List<String> collectors = new ArrayList<>(); |
| HttpURLConnection connection = null; |
| StringBuilder sb = new StringBuilder(getCollectorProtocol()); |
| sb.append("://"); |
| sb.append(host); |
| sb.append(":"); |
| sb.append(port); |
| sb.append(COLLECTOR_LIVE_NODES_PATH); |
| String connectUrl = sb.toString(); |
| LOG.debug("Requesting live collector nodes : " + connectUrl); |
| try { |
| connection = getCollectorProtocol().startsWith("https") ? |
| getSSLConnection(connectUrl) : getConnection(connectUrl); |
| |
| connection.setRequestMethod("GET"); |
| // 5 seconds for this op is plenty of wait time |
| connection.setConnectTimeout(3000); |
| connection.setReadTimeout(2000); |
| |
| int responseCode = connection.getResponseCode(); |
| if (responseCode == 200) { |
| try (InputStream in = connection.getInputStream()) { |
| StringWriter writer = new StringWriter(); |
| IOUtils.copy(in, writer); |
| try { |
| collectors = gson.fromJson(writer.toString(), new TypeToken<List<String>>(){}.getType()); |
| } catch (JsonSyntaxException jse) { |
| // Swallow this at the behest of still trying to POST |
| LOG.debug("Exception deserializing the json data on live " + |
| "collector nodes.", jse); |
| } |
| } |
| } |
| |
| } catch (IOException ioe) { |
| StringBuilder errorMessage = |
| new StringBuilder("Unable to connect to collector, " + connectUrl); |
| try { |
| if ((connection != null)) { |
| errorMessage.append(cleanupInputStream(connection.getErrorStream())); |
| } |
| } catch (IOException e) { |
| //NOP |
| } |
| LOG.debug(errorMessage); |
| LOG.debug(ioe); |
| String warnMsg = "Unable to connect to collector to find live nodes."; |
| throw new MetricCollectorUnavailableException(warnMsg); |
| } |
| return collectors; |
| } |
| |
| // Constructing without UriBuilder to avoid unfavorable httpclient |
| // dependencies |
| protected String constructTimelineMetricUri(String protocol, String host, String port) { |
| StringBuilder sb = new StringBuilder(protocol); |
| sb.append("://"); |
| sb.append(host); |
| sb.append(":"); |
| sb.append(port); |
| sb.append(WS_V1_TIMELINE_METRICS); |
| return sb.toString(); |
| } |
| |
| /** |
| * Parses input Sting of format "host1,host2" into Collection of hostnames |
| */ |
| public Collection<String> parseHostsStringIntoCollection(String hostsString) { |
| Set<String> hosts = new HashSet<>(); |
| |
| if (StringUtils.isEmpty(hostsString)) { |
| LOG.error("No Metric collector configured."); |
| return hosts; |
| } |
| |
| |
| for (String host : hostsString.split(",")) { |
| if (StringUtils.isEmpty(host)) |
| continue; |
| hosts.add(host.trim()); |
| } |
| |
| return hosts; |
| } |
| |
| |
| private long getZookeeperBackoffTimeMillis() { |
| return (zookeeperMinBackoffTimeMins + |
| rand.nextInt(zookeeperMaxBackoffTimeMins - zookeeperMinBackoffTimeMins + 1)) * 60*1000l; |
| } |
| |
| /** |
| * Get a pre-formatted URI for the collector |
| */ |
| abstract protected String getCollectorUri(String host); |
| |
| abstract protected String getCollectorProtocol(); |
| |
| abstract protected String getCollectorPort(); |
| |
| /** |
| * How soon to timeout on the emit calls in seconds. |
| */ |
| abstract protected int getTimeoutSeconds(); |
| |
| /** |
| * Get the zookeeper quorum for the cluster used to find collector |
| * @return String "host1:port1,host2:port2" |
| */ |
| abstract protected String getZookeeperQuorum(); |
| |
| /** |
| * Get pre-configured list of collectors hosts available |
| * @return Collection<String> host1,host2 |
| */ |
| abstract protected Collection<String> getConfiguredCollectorHosts(); |
| |
| /** |
| * Get hostname used for calculating write shard. |
| * @return String "host1" |
| */ |
| abstract protected String getHostname(); |
| |
| /** |
| * Check if host in-memory aggregation is enabled |
| * @return |
| */ |
| abstract protected boolean isHostInMemoryAggregationEnabled(); |
| |
| /** |
| * In memory aggregation port |
| * @return |
| */ |
| abstract protected int getHostInMemoryAggregationPort(); |
| } |