// blob: 4d2357fb47c98dc3689ae221150bd72fd0b248d3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.metricsmgr.sink.metricscache;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.metricsmgr.MetricsManagerServer;
import org.apache.heron.metricsmgr.MetricsUtil;
import org.apache.heron.proto.tmaster.TopologyMaster;
import org.apache.heron.spi.metricsmgr.metrics.ExceptionInfo;
import org.apache.heron.spi.metricsmgr.metrics.MetricsFilter;
import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo;
import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
import org.apache.heron.spi.metricsmgr.sink.IMetricsSink;
import org.apache.heron.spi.metricsmgr.sink.SinkContext;
/**
* An IMetricsSink sends Metrics to MetricsCache.
* 1. It gets the MetricsCacheLocation
* <p>
* 2. Then it would construct a long-live Service running metricsCacheClient, which could automatically
* recover from uncaught exceptions, i.e. close the old one and start a new one.
* Also, it provides API to update the MetricsCacheLocation that metricsCacheClient need to connect and
* restart the metricsCacheClient.
* There are two scenarios we need to restart a metricsCacheClient in our case:
* <p>
* -- Uncaught exceptions happen within metricsCacheClient; in that case we restart metricsCacheClient
* in the same ExecutorService from inside the UncaughtExceptionHandler.
* Notice that, in Java, an exception thrown inside an UncaughtExceptionHandler does not invoke the
* UncaughtExceptionHandler again; instead, it kills the thread with that exception.
* So if an exception is thrown while starting a new metricsCacheClient, this MetricsCacheSink would
* die, and external logic would take care of it.
* <p>
* -- MetricsCacheLocation changes (though in fact, metricsCacheClient might also throw exceptions in this case);
* in this case, we would ask the MetricsCacheClientService to start a new client from
* tMasterLocationStarter's thread.
* The metricsCacheClient itself still runs within the MetricsCacheClientService's own executor thread.
* <p>
* 3. When a new MetricsRecord comes by invoking processRecord, it would push the MetricsRecord
* to the Communicator Queue to metricsCacheClient
* <p>
* Notice that we would not send all metrics to MetricsCache; we would use MetricsFilter to figure out
* needed metrics.
*/
public class MetricsCacheSink implements IMetricsSink {
private static final Logger LOG = Logger.getLogger(MetricsCacheSink.class.getName());
// These configs would be read from metrics-sink-configs.yaml.
// The constant names carry a "TMASTER" prefix, but the key strings refer to the metricscache.
// Read from the sink config: interval, in seconds, between MetricsCacheLocation checks
private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
"metricscache-location-check-interval-sec";
// Read from the sink config: the nested config map handed to MetricsCacheClientService
private static final String KEY_TMASTER = "metricscache-client";
// The keys below are looked up inside the nested "metricscache-client" config map
private static final String KEY_TMASTER_RECONNECT_INTERVAL_SEC = "reconnect-interval-second";
private static final String KEY_NETWORK_WRITE_BATCH_SIZE_BYTES = "network-write-batch-size-bytes";
private static final String KEY_NETWORK_WRITE_BATCH_TIME_MS = "network-write-batch-time-ms";
private static final String KEY_NETWORK_READ_BATCH_SIZE_BYTES = "network-read-batch-size-bytes";
private static final String KEY_NETWORK_READ_BATCH_TIME_MS = "network-read-batch-time-ms";
private static final String KEY_SOCKET_SEND_BUFFER_BYTES = "socket-send-buffer-size-bytes";
private static final String KEY_SOCKET_RECEIVED_BUFFER_BYTES =
"socket-received-buffer-size-bytes";
// Read from the sink config: map of metric-name prefix to MetricAggregationType name
private static final String KEY_TMASTER_METRICS_TYPE = "metricscache-metrics-type";
// Bean name to fetch the MetricsCacheLocation object from SingletonRegistry
// (currently unused; MetricsManagerServer.METRICSCACHE_LOCATION_BEAN_NAME is used instead)
// private static final String TMASTER_LOCATION_BEAN_NAME =
// TopologyMaster.MetricsCacheLocation.newBuilder().getDescriptorForType().getFullName();
// Names of the counters this sink exports about itself through the SinkContext
private static final String METRICS_COUNT = "metrics-count";
private static final String EXCEPTIONS_COUNT = "exceptions-count";
private static final String RECORD_PROCESS_COUNT = "record-process-count";
private static final String METRICSMGR_RESTART_COUNT = "metricsmgr-restart-count";
private static final String METRICSMGR_LOCATION_UPDATE_COUNT = "metricsmgr-location-update-count";
// Queue carrying PublishMetrics messages from processRecord() to the metricsCacheClient
private final Communicator<TopologyMaster.PublishMetrics> metricsCommunicator =
new Communicator<>();
// Selects which metrics of a record are forwarded; filled in init()
private final MetricsFilter tMasterMetricsFilter = new MetricsFilter();
// Local copy of the config passed to init()
private final Map<String, Object> sinkConfig = new HashMap<>();
// A scheduled executor service to check whether the MetricsCacheLocation has changed
// If so, restart the metricsCacheClientService with the new MetricsCacheLocation
// Start of metricsCacheClientService will also be in this thread
private final ScheduledExecutorService tMasterLocationStarter =
Executors.newSingleThreadScheduledExecutor();
// Created in init() (or by createSimpleMetricsCacheClientService in tests)
private MetricsCacheClientService metricsCacheClientService;
// We need to cache it locally to check whether the MetricsCacheLocation is changed
// This field is changed only in ScheduledExecutorService's thread,
// so no need to make it volatile
// NOTE(review): getCurrentMetricsCacheLocation() also reads this field, presumably from a
// test thread -- confirm whether visibility matters there.
private TopologyMaster.MetricsCacheLocation currentMetricsCacheLocation = null;
// Context used to export this sink's own counters; set in init()
private SinkContext sinkContext;
/**
 * Prepares the sink for use.
 * <p>
 * Copies the given config, remembers the SinkContext, fills the metrics filter from the
 * "metricscache-metrics-type" section (when present), builds the long-lived
 * MetricsCacheClientService and finally starts the periodic MetricsCacheLocation check.
 *
 * @param conf the sink config
 * @param context the context used to export this sink's own metrics
 */
@Override
@SuppressWarnings("unchecked")
public void init(Map<String, Object> conf, SinkContext context) {
  LOG.info("metricscache sink init");
  sinkConfig.putAll(conf);
  sinkContext = context;

  // Register every configured metric-name prefix with its aggregation type
  final Map<String, String> prefixToTypeName =
      (Map<String, String>) sinkConfig.get(KEY_TMASTER_METRICS_TYPE);
  if (prefixToTypeName != null) {
    prefixToTypeName.forEach((prefix, typeName) ->
        tMasterMetricsFilter.setPrefixToType(
            prefix, MetricsFilter.MetricAggregationType.valueOf(typeName)));
  }

  // Build the service that owns the metricsCacheClient
  final Map<String, Object> clientConfig = (Map<String, Object>) sinkConfig.get(KEY_TMASTER);
  metricsCacheClientService = new MetricsCacheClientService(clientConfig, metricsCommunicator);

  // Begin watching for MetricsCacheLocation changes
  startMetricsCacheChecker();
}
/**
 * Starts the MetricsCacheLocation checker, which looks up the MetricsCacheLocation in the
 * SingletonRegistry at a fixed interval and, when it differs from the cached one, points the
 * metricsCacheClientService at the new location and starts a new client.
 * <p>
 * The task re-schedules itself after every run. Re-scheduling happens in a finally block and
 * runtime failures are logged, so one failed check (for instance a client that could not be
 * started) does not silently end the periodic checking. The cached location is only updated
 * after the new client has been started, so a failed start is retried on the next run.
 */
private void startMetricsCacheChecker() {
  final int checkIntervalSec =
      TypeUtils.getInteger(sinkConfig.get(KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC));

  Runnable runnable = new Runnable() {
    @Override
    public void run() {
      try {
        TopologyMaster.MetricsCacheLocation location =
            (TopologyMaster.MetricsCacheLocation) SingletonRegistry.INSTANCE.getSingleton(
                MetricsManagerServer.METRICSCACHE_LOCATION_BEAN_NAME);

        if (location != null
            && (currentMetricsCacheLocation == null
                || !location.equals(currentMetricsCacheLocation))) {
          LOG.info("Update current MetricsCacheLocation to: " + location);
          metricsCacheClientService.updateMetricsCacheLocation(location);
          metricsCacheClientService.startNewMasterClient();

          // Only remember the location once the client for it was started,
          // otherwise a failed start would never be retried
          currentMetricsCacheLocation = location;

          // Update Metrics
          sinkContext.exportCountMetric(METRICSMGR_LOCATION_UPDATE_COUNT, 1);
        }
      } catch (RuntimeException e) {
        LOG.log(Level.SEVERE,
            "Failed to check or apply the MetricsCacheLocation; will retry at next check", e);
      } finally {
        // Schedule itself in future, unless the executor has been shut down meanwhile
        if (!tMasterLocationStarter.isShutdown()) {
          tMasterLocationStarter.schedule(this, checkIntervalSec, TimeUnit.SECONDS);
        }
      }
    }
  };

  // First Entry
  tMasterLocationStarter.schedule(runnable, checkIntervalSec, TimeUnit.SECONDS);
  LOG.info("MetricsCacheChecker started with interval: " + checkIntervalSec);
}
/**
 * Converts a MetricsRecord into a TopologyMaster.PublishMetrics message and offers it to the
 * communicator queue consumed by the metricsCacheClient.
 * <p>
 * Only metrics accepted by tMasterMetricsFilter are forwarded. For every exception, at most the
 * first two lines of its stack trace are forwarded.
 *
 * @param record the record to process
 */
@Override
public void processRecord(MetricsRecord record) {
  LOG.info("metricscache sink processRecord");
  // Format it into TopologyMaster.PublishMetrics

  // The format of record is "host:port/componentName/instanceId"
  // So MetricsRecord.getSource().split("/") would be an array with 3 elements:
  // ["host:port", componentName, instanceId]
  String[] sources = MetricsUtil.splitRecordSource(record);
  String hostPort = sources[0];
  String componentName = sources[1];
  String instanceId = sources[2];

  TopologyMaster.PublishMetrics.Builder publishMetrics =
      TopologyMaster.PublishMetrics.newBuilder();

  for (MetricsInfo metricsInfo : tMasterMetricsFilter.filter(record.getMetrics())) {
    // We would filter out unneeded metrics
    TopologyMaster.MetricDatum metricDatum = TopologyMaster.MetricDatum.newBuilder().
        setComponentName(componentName).setInstanceId(instanceId).setName(metricsInfo.getName()).
        setValue(metricsInfo.getValue()).setTimestamp(record.getTimestamp()).build();
    publishMetrics.addMetrics(metricDatum);
  }

  for (ExceptionInfo exceptionInfo : record.getExceptions()) {
    TopologyMaster.TmasterExceptionLog exceptionLog =
        TopologyMaster.TmasterExceptionLog.newBuilder()
            .setComponentName(componentName)
            .setHostname(hostPort)
            .setInstanceId(instanceId)
            .setStacktrace(firstTwoLines(exceptionInfo.getStackTrace()))
            .setLasttime(exceptionInfo.getLastTime())
            .setFirsttime(exceptionInfo.getFirstTime())
            .setCount(exceptionInfo.getCount())
            .setLogging(exceptionInfo.getLogging()).build();
    publishMetrics.addExceptions(exceptionLog);
  }

  metricsCommunicator.offer(publishMetrics.build());

  // Update metrics
  sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
  sinkContext.exportCountMetric(METRICS_COUNT, publishMetrics.getMetricsCount());
  sinkContext.exportCountMetric(EXCEPTIONS_COUNT, publishMetrics.getExceptionsCount());
}

// Returns at most the first two lines of the given stack trace, joined by the platform line
// separator. A stack trace consisting of a single line is returned as that line, instead of
// failing on a missing second line.
private static String firstTwoLines(String stackTrace) {
  // Limit 3: we only need the first two lines, the remainder stays unsplit in the last element
  String[] lines = stackTrace.split("\r\n|[\r\n]", 3);
  if (lines.length < 2) {
    return lines[0];
  }
  return String.join(System.lineSeparator(), lines[0], lines[1]);
}
/**
 * Nothing is flushed here; the call is only used to export the number of times a
 * metricsCacheClient has been started so far.
 */
@Override
public void flush() {
  final long startedSoFar = metricsCacheClientService.startedAttempts.longValue();
  sinkContext.exportCountMetric(METRICSMGR_RESTART_COUNT, startedSoFar);
}
/**
 * Releases the resources held by this sink.
 * <p>
 * The location checker is stopped first, so that it can neither leak its thread nor start a new
 * metricsCacheClient once the client service is closed. The communicator is cleared even when
 * closing the client service fails.
 */
@Override
public void close() {
  // Stop the periodic MetricsCacheLocation check before tearing down the client service
  tMasterLocationStarter.shutdownNow();
  try {
    // The service does not exist yet when close() is called without a prior init()
    if (metricsCacheClientService != null) {
      metricsCacheClientService.close();
    }
  } finally {
    metricsCommunicator.clear();
  }
}
/** Test hook: returns the client service currently used by this sink. */
@VisibleForTesting
MetricsCacheClientService getMetricsCacheClientService() {
  return this.metricsCacheClientService;
}

/** Test hook: installs a client service built from the given config, without calling init(). */
@VisibleForTesting
void createSimpleMetricsCacheClientService(Map<String, Object> serviceConfig) {
  final MetricsCacheClientService simpleService =
      new MetricsCacheClientService(serviceConfig, metricsCommunicator);
  this.metricsCacheClientService = simpleService;
}

/** Test hook: returns the metricsCacheClient currently held by the client service. */
@VisibleForTesting
MetricsCacheClient getMetricsCacheClient() {
  final MetricsCacheClientService service = this.metricsCacheClientService;
  return service.getMetricsCacheClient();
}

/** Test hook: points the client service at the given location and starts a new client. */
@VisibleForTesting
void startNewMetricsCacheClient(TopologyMaster.MetricsCacheLocation location) {
  final MetricsCacheClientService service = this.metricsCacheClientService;
  service.updateMetricsCacheLocation(location);
  service.startNewMasterClient();
}

/** Test hook: returns how many times a metricsCacheClient has been started. */
@VisibleForTesting
int getMetricsCacheStartedAttempts() {
  final int started = metricsCacheClientService.startedAttempts.get();
  return started;
}

/** Test hook: returns the location cached by this sink's location checker. */
@VisibleForTesting
TopologyMaster.MetricsCacheLocation getCurrentMetricsCacheLocation() {
  return this.currentMetricsCacheLocation;
}

/** Test hook: returns the location cached inside the client service. */
@VisibleForTesting
TopologyMaster.MetricsCacheLocation getCurrentMetricsCacheLocationInService() {
  final MetricsCacheClientService service = this.metricsCacheClientService;
  return service.getCurrentMetricsCacheLocation();
}
/**
* A long-live Service running metricsCacheClient
* It would automatically restart the metricsCacheClient connecting and communicating to the latest
* MetricsCacheLocation if any uncaught exception is thrown.
* <p>
* It provides updateMetricsCacheLocation(TopologyMaster.MetricsCacheLocation location), which
* updates the currentMetricsCacheLocation to the latest location, and startNewMasterClient(),
* which starts a new metricsCacheClient connecting to that location.
* <p>
* So a new metricsCacheClient would start in two cases:
* 1. The old one threw exceptions and died.
* 2. startNewMasterClient() is invoked externally with MetricsCacheLocation.
*/
private static final class MetricsCacheClientService {
// Incremented every time startNewMasterClient() starts a metricsCacheClient
private final AtomicInteger startedAttempts = new AtomicInteger(0);
// Settings for the client: socket options and reconnect interval are read from this map
private final Map<String, Object> metricsCacheClientConfig;
// Queue of PublishMetrics messages handed to each new metricsCacheClient
private final Communicator<TopologyMaster.PublishMetrics> metricsCommunicator;
// Single-thread executor running the metricsCacheClient; its threads are created by
// MetricsCacheClientThreadFactory, which installs the restart-on-exception handler
private final ExecutorService metricsCacheClientExecutor =
Executors.newSingleThreadExecutor(new MetricsCacheClientThreadFactory());
// The most recently started client; null until startNewMasterClient() is first called
private volatile MetricsCacheClient metricsCacheClient;
// We need to cache MetricsCacheLocation for failover case
// This value is set in ScheduledExecutorService's thread while
// it is used in metricsCacheClientService thread,
// so we need to make it volatile to guarantee the visibility.
private volatile TopologyMaster.MetricsCacheLocation currentMetricsCacheLocation;
/**
 * Creates the service. No client is started until startNewMasterClient() is invoked.
 *
 * @param metricsCacheClientConfig settings for the clients this service starts
 * @param metricsCommunicator queue of messages handed to every client this service starts
 */
private MetricsCacheClientService(
    Map<String, Object> metricsCacheClientConfig,
    Communicator<TopologyMaster.PublishMetrics> metricsCommunicator) {
  this.metricsCommunicator = metricsCommunicator;
  this.metricsCacheClientConfig = metricsCacheClientConfig;
}
/**
 * Replaces the MetricsCacheLocation the next metricsCacheClient will connect to.
 * <p>
 * The method is thread-safe because it only overwrites a volatile field. In this file it is
 * invoked from the location checker's scheduled executor thread (and from test hooks).
 *
 * @param location the location to use for clients started from now on
 */
public void updateMetricsCacheLocation(TopologyMaster.MetricsCacheLocation location) {
  this.currentMetricsCacheLocation = location;
}
/**
 * Stops the running metricsCacheClient, if any, and starts a new one connecting to the
 * currently cached MetricsCacheLocation.
 * <p>
 * This method could be invoked by different threads (the location checker and the uncaught
 * exception handler), so it is synchronized.
 *
 * @throws RuntimeException if the NIOLooper for the new client cannot be created
 */
public synchronized void startNewMasterClient() {
  // Exit any running metricsCacheClient if there is any to release
  // the thread in metricsCacheClientExecutor
  if (metricsCacheClient != null) {
    metricsCacheClient.stop();
    metricsCacheClient.getNIOLooper().exitLoop();
  }

  // Construct the new metricsCacheClient
  final NIOLooper looper;
  try {
    looper = new NIOLooper();
  } catch (IOException e) {
    throw new RuntimeException("Could not create the NIOLooper", e);
  }

  SystemConfig systemConfig =
      (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
  HeronSocketOptions socketOptions =
      new HeronSocketOptions(
          TypeUtils.getByteAmount(
              metricsCacheClientConfig.get(KEY_NETWORK_WRITE_BATCH_SIZE_BYTES)),
          TypeUtils.getDuration(
              metricsCacheClientConfig.get(KEY_NETWORK_WRITE_BATCH_TIME_MS), ChronoUnit.MILLIS),
          TypeUtils.getByteAmount(
              metricsCacheClientConfig.get(KEY_NETWORK_READ_BATCH_SIZE_BYTES)),
          TypeUtils.getDuration(
              metricsCacheClientConfig.get(KEY_NETWORK_READ_BATCH_TIME_MS), ChronoUnit.MILLIS),
          TypeUtils.getByteAmount(
              metricsCacheClientConfig.get(KEY_SOCKET_SEND_BUFFER_BYTES)),
          TypeUtils.getByteAmount(
              metricsCacheClientConfig.get(KEY_SOCKET_RECEIVED_BUFFER_BYTES)),
          systemConfig.getMetricsMgrNetworkOptionsMaximumPacketSize());

  // Reset the Consumer
  metricsCommunicator.setConsumer(looper);

  // Read the volatile location exactly once: updateMetricsCacheLocation() is not synchronized,
  // so two separate reads could take the host and the port from two different locations
  final TopologyMaster.MetricsCacheLocation location = currentMetricsCacheLocation;

  metricsCacheClient = new MetricsCacheClient(looper,
      location.getHost(),
      location.getMasterPort(),
      socketOptions, metricsCommunicator,
      TypeUtils.getDuration(
          metricsCacheClientConfig.get(KEY_TMASTER_RECONNECT_INTERVAL_SEC),
          ChronoUnit.SECONDS));

  int attempts = startedAttempts.incrementAndGet();
  LOG.severe(String.format("Starting metricsCacheClient for the %d time.", attempts));
  metricsCacheClientExecutor.execute(metricsCacheClient);
}
/**
 * Stops the running metricsCacheClient, if any, and shuts down the executor running it.
 * <p>
 * This method could be invoked by different threads, so it is synchronized.
 */
public synchronized void close() {
  // No client exists yet when close() is called before the first startNewMasterClient()
  if (metricsCacheClient != null) {
    metricsCacheClient.getNIOLooper().exitLoop();
  }
  metricsCacheClientExecutor.shutdownNow();
}
/** Test hook: returns the most recently started client, or null if none was started yet. */
@VisibleForTesting
MetricsCacheClient getMetricsCacheClient() {
  return this.metricsCacheClient;
}

/** Test hook: returns how many times a client has been started by this service. */
@VisibleForTesting
int getMetricsCacheStartedAttempts() {
  final int started = this.startedAttempts.get();
  return started;
}

/** Test hook: returns the location new clients of this service connect to. */
@VisibleForTesting
TopologyMaster.MetricsCacheLocation getCurrentMetricsCacheLocation() {
  return this.currentMetricsCacheLocation;
}
/**
 * A ThreadFactory whose threads carry an UncaughtExceptionHandler that restarts the
 * metricsCacheClient with the currently cached MetricsCacheLocation.
 */
private class MetricsCacheClientThreadFactory implements ThreadFactory {
  @Override
  public Thread newThread(Runnable r) {
    final Thread thread = new Thread(r);
    thread.setUncaughtExceptionHandler(new MetricsCacheClientExceptionHandler());
    return thread;
  }

  /**
   * Logs the failure, waits for the configured reconnect interval and then starts a new
   * metricsCacheClient, unless the service has been closed in the meantime.
   */
  private class MetricsCacheClientExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      LOG.log(Level.SEVERE, "metricsCacheClient dies in thread: " + t, e);

      Duration reconnectInterval = TypeUtils.getDuration(
          metricsCacheClientConfig.get(KEY_TMASTER_RECONNECT_INTERVAL_SEC), ChronoUnit.SECONDS);
      SysUtils.sleep(reconnectInterval);

      // close() is synchronized on the service as well, so holding the same lock makes the
      // shutdown check and the restart atomic with respect to it
      synchronized (MetricsCacheClientService.this) {
        if (metricsCacheClientExecutor.isShutdown()) {
          // The service was closed (possibly while we were sleeping): a restart would only
          // create a client that the executor rejects
          LOG.info("MetricsCacheClientService is closed; not restarting metricsCacheClient");
          return;
        }

        LOG.info("Restarting metricsCacheClient");

        // We would use the MetricsCacheLocation in cache, since
        // the new metricsCacheClient is started due to exception thrown,
        // rather than MetricsCacheLocation changes
        startNewMasterClient();
      }
    }
  }
}
}
}