| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.heron.metricsmgr.sink.metricscache; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.time.temporal.ChronoUnit; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.heron.common.basics.Communicator; |
| import org.apache.heron.common.basics.NIOLooper; |
| import org.apache.heron.common.basics.SingletonRegistry; |
| import org.apache.heron.common.basics.SysUtils; |
| import org.apache.heron.common.basics.TypeUtils; |
| import org.apache.heron.common.config.SystemConfig; |
| import org.apache.heron.common.network.HeronSocketOptions; |
| import org.apache.heron.metricsmgr.MetricsManagerServer; |
| import org.apache.heron.metricsmgr.MetricsUtil; |
| import org.apache.heron.proto.tmaster.TopologyMaster; |
| import org.apache.heron.spi.metricsmgr.metrics.ExceptionInfo; |
| import org.apache.heron.spi.metricsmgr.metrics.MetricsFilter; |
| import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo; |
| import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord; |
| import org.apache.heron.spi.metricsmgr.sink.IMetricsSink; |
| import org.apache.heron.spi.metricsmgr.sink.SinkContext; |
| |
| /** |
| * An IMetricsSink sends Metrics to MetricsCache. |
| * 1. It gets the MetricsCacheLocation |
| * <p> |
| * 2. Then it would construct a long-live Service running metricsCacheClient, which could automatically |
| * recover from uncaught exceptions, i.e. close the old one and start a new one. |
| * Also, it provides API to update the MetricsCacheLocation that metricsCacheClient need to connect and |
| * restart the metricsCacheClient. |
| * There are two scenarios we need to restart a metricsCacheClient in our case: |
| * <p> |
| * -- Uncaught exceptions happen within metricsCacheClient; then we would restart metricsCacheClient inside |
| * the same ExecutorService inside the UncaughtExceptionHandlers. |
| * Notice that, in java, exceptions occur inside UncaughtExceptionHandlers would not invoke |
| * UncaughtExceptionHandlers; instead, it would kill the thread with that exception. |
| * So if exceptions thrown during restart a new metricsCacheClient, this MetricsCacheSink would die, and |
| * external logic would take care of it. |
| * <p> |
| * -- MetricsCacheLocation changes (though in fact, metricsCacheClient might also throw exceptions in this case), |
| * in this case, we would invoke MetricsCacheService to start from tMasterLocationStarter's thread. |
| * But the MetricsCacheService and metricsCacheClient still start wihtin the thread they run. |
| * <p> |
| * 3. When a new MetricsRecord comes by invoking processRecord, it would push the MetricsRecord |
| * to the Communicator Queue to metricsCacheClient |
| * <p> |
| * Notice that we would not send all metrics to MetricsCache; we would use MetricsFilter to figure out |
| * needed metrics. |
| */ |
| |
| public class MetricsCacheSink implements IMetricsSink { |
| private static final Logger LOG = Logger.getLogger(MetricsCacheSink.class.getName()); |
| |
| // These configs would be read from metrics-sink-configs.yaml |
| private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC = |
| "metricscache-location-check-interval-sec"; |
| private static final String KEY_TMASTER = "metricscache-client"; |
| private static final String KEY_TMASTER_RECONNECT_INTERVAL_SEC = "reconnect-interval-second"; |
| private static final String KEY_NETWORK_WRITE_BATCH_SIZE_BYTES = "network-write-batch-size-bytes"; |
| private static final String KEY_NETWORK_WRITE_BATCH_TIME_MS = "network-write-batch-time-ms"; |
| private static final String KEY_NETWORK_READ_BATCH_SIZE_BYTES = "network-read-batch-size-bytes"; |
| private static final String KEY_NETWORK_READ_BATCH_TIME_MS = "network-read-batch-time-ms"; |
| private static final String KEY_SOCKET_SEND_BUFFER_BYTES = "socket-send-buffer-size-bytes"; |
| private static final String KEY_SOCKET_RECEIVED_BUFFER_BYTES = |
| "socket-received-buffer-size-bytes"; |
| private static final String KEY_TMASTER_METRICS_TYPE = "metricscache-metrics-type"; |
| |
| // Bean name to fetch the MetricsCacheLocation object from SingletonRegistry |
| // private static final String TMASTER_LOCATION_BEAN_NAME = |
| // TopologyMaster.MetricsCacheLocation.newBuilder().getDescriptorForType().getFullName(); |
| // Metrics Counter Name |
| private static final String METRICS_COUNT = "metrics-count"; |
| private static final String EXCEPTIONS_COUNT = "exceptions-count"; |
| private static final String RECORD_PROCESS_COUNT = "record-process-count"; |
| private static final String METRICSMGR_RESTART_COUNT = "metricsmgr-restart-count"; |
| private static final String METRICSMGR_LOCATION_UPDATE_COUNT = "metricsmgr-location-update-count"; |
| private final Communicator<TopologyMaster.PublishMetrics> metricsCommunicator = |
| new Communicator<>(); |
| private final MetricsFilter tMasterMetricsFilter = new MetricsFilter(); |
| private final Map<String, Object> sinkConfig = new HashMap<>(); |
| // A scheduled executor service to check whether the MetricsCacheLocation has changed |
| // If so, restart the metricsCacheClientService with the new MetricsCacheLocation |
| // Start of metricsCacheClientService will also be in this thread |
| private final ScheduledExecutorService tMasterLocationStarter = |
| Executors.newSingleThreadScheduledExecutor(); |
| private MetricsCacheClientService metricsCacheClientService; |
| // We need to cache it locally to check whether the MetricsCacheLocation is changed |
| // This field is changed only in ScheduledExecutorService's thread, |
| // so no need to make it volatile |
| private TopologyMaster.MetricsCacheLocation currentMetricsCacheLocation = null; |
| private SinkContext sinkContext; |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public void init(Map<String, Object> conf, SinkContext context) { |
| LOG.info("metricscache sink init"); |
| sinkConfig.putAll(conf); |
| |
| sinkContext = context; |
| |
| // Fill the tMasterMetricsFilter according to metrics-sink-configs.yaml |
| Map<String, String> tmasterMetricsType = |
| (Map<String, String>) sinkConfig.get(KEY_TMASTER_METRICS_TYPE); |
| if (tmasterMetricsType != null) { |
| for (Map.Entry<String, String> metricToType : tmasterMetricsType.entrySet()) { |
| String value = metricToType.getValue(); |
| MetricsFilter.MetricAggregationType type = |
| MetricsFilter.MetricAggregationType.valueOf(value); |
| tMasterMetricsFilter.setPrefixToType(metricToType.getKey(), type); |
| } |
| } |
| |
| // Construct the long-live metricsCacheClientService |
| metricsCacheClientService = |
| new MetricsCacheClientService((Map<String, Object>) |
| sinkConfig.get(KEY_TMASTER), metricsCommunicator); |
| |
| // Start the tMasterLocationStarter |
| startMetricsCacheChecker(); |
| } |
| |
| // Start the MetricsCacheCheck, which would check whether the MetricsCacheLocation is changed |
| // at an interval. |
| // If so, restart the metricsCacheClientService with the new MetricsCacheLocation |
| private void startMetricsCacheChecker() { |
| final int checkIntervalSec = |
| TypeUtils.getInteger(sinkConfig.get(KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC)); |
| |
| Runnable runnable = new Runnable() { |
| @Override |
| public void run() { |
| TopologyMaster.MetricsCacheLocation location = |
| (TopologyMaster.MetricsCacheLocation) SingletonRegistry.INSTANCE.getSingleton( |
| MetricsManagerServer.METRICSCACHE_LOCATION_BEAN_NAME); |
| |
| if (location != null) { |
| if (currentMetricsCacheLocation == null |
| || !location.equals(currentMetricsCacheLocation)) { |
| LOG.info("Update current MetricsCacheLocation to: " + location); |
| currentMetricsCacheLocation = location; |
| metricsCacheClientService.updateMetricsCacheLocation(currentMetricsCacheLocation); |
| metricsCacheClientService.startNewMasterClient(); |
| |
| // Update Metrics |
| sinkContext.exportCountMetric(METRICSMGR_LOCATION_UPDATE_COUNT, 1); |
| } |
| } |
| |
| // Schedule itself in future |
| tMasterLocationStarter.schedule(this, checkIntervalSec, TimeUnit.SECONDS); |
| } |
| }; |
| |
| // First Entry |
| tMasterLocationStarter.schedule(runnable, checkIntervalSec, TimeUnit.SECONDS); |
| LOG.info("MetricsCacheChecker started with interval: " + checkIntervalSec); |
| } |
| |
| @Override |
| public void processRecord(MetricsRecord record) { |
| LOG.info("metricscache sink processRecord"); |
| // Format it into TopologyMaster.PublishMetrics |
| |
| // The format of record is "host:port/componentName/instanceId" |
| // So MetricsRecord.getSource().split("/") would be an array with 3 elements: |
| // ["host:port", componentName, instanceId] |
| String[] sources = MetricsUtil.splitRecordSource(record); |
| String hostPort = sources[0]; |
| String componentName = sources[1]; |
| String instanceId = sources[2]; |
| |
| TopologyMaster.PublishMetrics.Builder publishMetrics = |
| TopologyMaster.PublishMetrics.newBuilder(); |
| |
| for (MetricsInfo metricsInfo : tMasterMetricsFilter.filter(record.getMetrics())) { |
| // We would filter out unneeded metrics |
| TopologyMaster.MetricDatum metricDatum = TopologyMaster.MetricDatum.newBuilder(). |
| setComponentName(componentName).setInstanceId(instanceId).setName(metricsInfo.getName()). |
| setValue(metricsInfo.getValue()).setTimestamp(record.getTimestamp()).build(); |
| publishMetrics.addMetrics(metricDatum); |
| } |
| |
| for (ExceptionInfo exceptionInfo : record.getExceptions()) { |
| String exceptionStackTrace = exceptionInfo.getStackTrace(); |
| String[] exceptionStackTraceLines = exceptionStackTrace.split("\r\n|[\r\n]", 3); |
| String exceptionStackTraceFirstTwoLines = String.join(System.lineSeparator(), |
| exceptionStackTraceLines[0], exceptionStackTraceLines[1]); |
| TopologyMaster.TmasterExceptionLog exceptionLog = |
| TopologyMaster.TmasterExceptionLog.newBuilder() |
| .setComponentName(componentName) |
| .setHostname(hostPort) |
| .setInstanceId(instanceId) |
| .setStacktrace(exceptionStackTraceFirstTwoLines) |
| .setLasttime(exceptionInfo.getLastTime()) |
| .setFirsttime(exceptionInfo.getFirstTime()) |
| .setCount(exceptionInfo.getCount()) |
| .setLogging(exceptionInfo.getLogging()).build(); |
| publishMetrics.addExceptions(exceptionLog); |
| } |
| |
| metricsCommunicator.offer(publishMetrics.build()); |
| |
| // Update metrics |
| sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1); |
| sinkContext.exportCountMetric(METRICS_COUNT, publishMetrics.getMetricsCount()); |
| sinkContext.exportCountMetric(EXCEPTIONS_COUNT, publishMetrics.getExceptionsCount()); |
| } |
| |
| @Override |
| public void flush() { |
| // We do nothing here but update metrics |
| sinkContext.exportCountMetric(METRICSMGR_RESTART_COUNT, |
| metricsCacheClientService.startedAttempts.longValue()); |
| } |
| |
| @Override |
| public void close() { |
| metricsCacheClientService.close(); |
| metricsCommunicator.clear(); |
| } |
| |
| @VisibleForTesting |
| MetricsCacheClientService getMetricsCacheClientService() { |
| return metricsCacheClientService; |
| } |
| |
| @VisibleForTesting |
| void createSimpleMetricsCacheClientService(Map<String, Object> serviceConfig) { |
| metricsCacheClientService = |
| new MetricsCacheClientService(serviceConfig, metricsCommunicator); |
| } |
| |
| @VisibleForTesting |
| MetricsCacheClient getMetricsCacheClient() { |
| return metricsCacheClientService.getMetricsCacheClient(); |
| } |
| |
| @VisibleForTesting |
| void startNewMetricsCacheClient(TopologyMaster.MetricsCacheLocation location) { |
| metricsCacheClientService.updateMetricsCacheLocation(location); |
| metricsCacheClientService.startNewMasterClient(); |
| } |
| |
| @VisibleForTesting |
| int getMetricsCacheStartedAttempts() { |
| return metricsCacheClientService.startedAttempts.get(); |
| } |
| |
| @VisibleForTesting |
| TopologyMaster.MetricsCacheLocation getCurrentMetricsCacheLocation() { |
| return currentMetricsCacheLocation; |
| } |
| |
| @VisibleForTesting |
| TopologyMaster.MetricsCacheLocation getCurrentMetricsCacheLocationInService() { |
| return metricsCacheClientService.getCurrentMetricsCacheLocation(); |
| } |
| |
| /** |
| * A long-live Service running metricsCacheClient |
| * It would automatically restart the metricsCacheClient connecting and communicating to the latest |
| * MetricsCacheLocation if any uncaught exceptions throw. |
| * <p> |
| * It provides startNewMasterClient(TopologyMaster.MetricsCacheLocation location), which would also |
| * update the currentMetricsCacheLocation to the lastest location. |
| * <p> |
| * So a new metricsCacheClient would start in two cases: |
| * 1. The old one threw exceptions and died. |
| * 2. startNewMasterClient() is invoked externally with MetricsCacheLocation. |
| */ |
| private static final class MetricsCacheClientService { |
| private final AtomicInteger startedAttempts = new AtomicInteger(0); |
| private final Map<String, Object> metricsCacheClientConfig; |
| private final Communicator<TopologyMaster.PublishMetrics> metricsCommunicator; |
| private final ExecutorService metricsCacheClientExecutor = |
| Executors.newSingleThreadExecutor(new MetricsCacheClientThreadFactory()); |
| private volatile MetricsCacheClient metricsCacheClient; |
| // We need to cache MetricsCacheLocation for failover case |
| // This value is set in ScheduledExecutorService' thread while |
| // it is used in metricsCacheClientService thread, |
| // so we need to make it volatile to guarantee the visiability. |
| private volatile TopologyMaster.MetricsCacheLocation currentMetricsCacheLocation; |
| |
| private MetricsCacheClientService( |
| Map<String, Object> metricsCacheClientConfig, |
| Communicator<TopologyMaster.PublishMetrics> metricsCommunicator) { |
| this.metricsCacheClientConfig = metricsCacheClientConfig; |
| this.metricsCommunicator = metricsCommunicator; |
| } |
| |
| // Update the MetricsCacheLocation to connect within the metricsCacheClient |
| // This method is thread-safe, since |
| // currentMetricsCacheLocation is volatile and we just replace it. |
| // In our scenario, it is only invoked when MetricsCacheLocation is changed, |
| // i.e. this method is only invoked in scheduled executor thread. |
| public void updateMetricsCacheLocation(TopologyMaster.MetricsCacheLocation location) { |
| currentMetricsCacheLocation = location; |
| } |
| |
| // This method could be invoked by different threads |
| // Make it synchronized to guarantee thread-safe |
| public synchronized void startNewMasterClient() { |
| |
| // Exit any running metricsCacheClient if there is any to release |
| // the thread in metricsCacheClientExecutor |
| if (metricsCacheClient != null) { |
| metricsCacheClient.stop(); |
| metricsCacheClient.getNIOLooper().exitLoop(); |
| } |
| |
| // Construct the new metricsCacheClient |
| final NIOLooper looper; |
| try { |
| looper = new NIOLooper(); |
| } catch (IOException e) { |
| throw new RuntimeException("Could not create the NIOLooper", e); |
| } |
| |
| SystemConfig systemConfig = |
| (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG); |
| HeronSocketOptions socketOptions = |
| new HeronSocketOptions( |
| TypeUtils.getByteAmount( |
| metricsCacheClientConfig.get(KEY_NETWORK_WRITE_BATCH_SIZE_BYTES)), |
| TypeUtils.getDuration( |
| metricsCacheClientConfig.get(KEY_NETWORK_WRITE_BATCH_TIME_MS), ChronoUnit.MILLIS), |
| TypeUtils.getByteAmount( |
| metricsCacheClientConfig.get(KEY_NETWORK_READ_BATCH_SIZE_BYTES)), |
| TypeUtils.getDuration( |
| metricsCacheClientConfig.get(KEY_NETWORK_READ_BATCH_TIME_MS), ChronoUnit.MILLIS), |
| TypeUtils.getByteAmount( |
| metricsCacheClientConfig.get(KEY_SOCKET_SEND_BUFFER_BYTES)), |
| TypeUtils.getByteAmount( |
| metricsCacheClientConfig.get(KEY_SOCKET_RECEIVED_BUFFER_BYTES)), |
| systemConfig.getMetricsMgrNetworkOptionsMaximumPacketSize()); |
| |
| // Reset the Consumer |
| metricsCommunicator.setConsumer(looper); |
| |
| metricsCacheClient = new MetricsCacheClient(looper, |
| currentMetricsCacheLocation.getHost(), |
| currentMetricsCacheLocation.getMasterPort(), |
| socketOptions, metricsCommunicator, |
| TypeUtils.getDuration( |
| metricsCacheClientConfig.get(KEY_TMASTER_RECONNECT_INTERVAL_SEC), |
| ChronoUnit.SECONDS)); |
| |
| int attempts = startedAttempts.incrementAndGet(); |
| LOG.severe(String.format("Starting metricsCacheClient for the %d time.", attempts)); |
| metricsCacheClientExecutor.execute(metricsCacheClient); |
| } |
| |
| // This method could be invoked by different threads |
| // Make it synchronized to guarantee thread-safe |
| public synchronized void close() { |
| metricsCacheClient.getNIOLooper().exitLoop(); |
| metricsCacheClientExecutor.shutdownNow(); |
| } |
| |
| @VisibleForTesting |
| MetricsCacheClient getMetricsCacheClient() { |
| return metricsCacheClient; |
| } |
| |
| @VisibleForTesting |
| int getMetricsCacheStartedAttempts() { |
| return startedAttempts.get(); |
| } |
| |
| @VisibleForTesting |
| TopologyMaster.MetricsCacheLocation getCurrentMetricsCacheLocation() { |
| return currentMetricsCacheLocation; |
| } |
| |
| // An UncaughtExceptionHandler, which would restart MetricsCacheLocation with |
| // current MetricsCacheLocation. |
| private class MetricsCacheClientThreadFactory implements ThreadFactory { |
| @Override |
| public Thread newThread(Runnable r) { |
| final Thread thread = new Thread(r); |
| thread.setUncaughtExceptionHandler(new MetricsCacheClientExceptionHandler()); |
| return thread; |
| } |
| |
| private class MetricsCacheClientExceptionHandler implements Thread.UncaughtExceptionHandler { |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| LOG.log(Level.SEVERE, "metricsCacheClient dies in thread: " + t, e); |
| |
| Duration reconnectInterval = TypeUtils.getDuration( |
| metricsCacheClientConfig.get(KEY_TMASTER_RECONNECT_INTERVAL_SEC), ChronoUnit.SECONDS); |
| SysUtils.sleep(reconnectInterval); |
| LOG.info("Restarting metricsCacheClient"); |
| |
| // We would use the MetricsCacheLocation in cache, since |
| // the new metricsCacheClient is started due to exception thrown, |
| // rather than MetricsCacheLocation changes |
| startNewMasterClient(); |
| } |
| } |
| } |
| } |
| } |