blob: f044292bce584f5daaff1b28110d519346128da6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.metricsmgr.sink.tmaster;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.metricsmgr.MetricsUtil;
import org.apache.heron.proto.tmaster.TopologyMaster;
import org.apache.heron.spi.metricsmgr.metrics.ExceptionInfo;
import org.apache.heron.spi.metricsmgr.metrics.MetricsFilter;
import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo;
import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
import org.apache.heron.spi.metricsmgr.sink.IMetricsSink;
import org.apache.heron.spi.metricsmgr.sink.SinkContext;
/**
* An IMetricsSink sends Metrics to TMaster.
* 1. It gets the TMasterLocation
* <p>
* 2. Then it would construct a long-live Service running TMasterClient, which could automatically
* recover from uncaught exceptions, i.e. close the old one and start a new one.
* Also, it provides API to update the TMasterLocation that TMasterClient need to connect and
* restart the TMasterClient.
* There are two scenarios we need to restart a TMasterClient in our case:
* <p>
* -- Uncaught exceptions happen within TMasterClient; then we would restart TMasterClient inside
* the same ExecutorService inside the UncaughtExceptionHandlers.
* Notice that, in java, exceptions occur inside UncaughtExceptionHandlers would not invoke
* UncaughtExceptionHandlers; instead, it would kill the thread with that exception.
* So if exceptions thrown during restart a new TMasterClient, this TMasterSink would die, and
* external logic would take care of it.
* <p>
* -- TMasterLocation changes (though in fact, TMasterClient might also throw exceptions in this case),
* in this case, we would invoke TMasterService to start from tMasterLocationStarter's thread.
* But the TMasterService and TMasterClient still start wihtin the thread they run.
* <p>
* 3. When a new MetricsRecord comes by invoking processRecord, it would push the MetricsRecord
* to the Communicator Queue to TMasterClient
* <p>
* Notice that we would not send all metrics to TMaster; we would use MetricsFilter to figure out
* needed metrics.
*/
public class TMasterSink implements IMetricsSink {
private static final Logger LOG = Logger.getLogger(TMasterSink.class.getName());
// These configs would be read from metrics-sink-configs.yaml
private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
"tmaster-location-check-interval-sec";
private static final String KEY_TMASTER = "tmaster-client";
private static final String KEY_TMASTER_RECONNECT_INTERVAL_SEC = "reconnect-interval-second";
private static final String KEY_NETWORK_WRITE_BATCH_SIZE_BYTES = "network-write-batch-size-bytes";
private static final String KEY_NETWORK_WRITE_BATCH_TIME_MS = "network-write-batch-time-ms";
private static final String KEY_NETWORK_READ_BATCH_SIZE_BYTES = "network-read-batch-size-bytes";
private static final String KEY_NETWORK_READ_BATCH_TIME_MS = "network-read-batch-time-ms";
private static final String KEY_SOCKET_SEND_BUFFER_BYTES = "socket-send-buffer-size-bytes";
private static final String KEY_SOCKET_RECEIVED_BUFFER_BYTES =
"socket-received-buffer-size-bytes";
private static final String KEY_TMASTER_METRICS_TYPE = "tmaster-metrics-type";
// Bean name to fetch the TMasterLocation object from SingletonRegistry
private static final String TMASTER_LOCATION_BEAN_NAME =
TopologyMaster.TMasterLocation.newBuilder().getDescriptorForType().getFullName();
// Metrics Counter Name
private static final String METRICS_COUNT = "metrics-count";
private static final String EXCEPTIONS_COUNT = "exceptions-count";
private static final String RECORD_PROCESS_COUNT = "record-process-count";
private static final String TMASTER_RESTART_COUNT = "tmaster-restart-count";
private static final String TMASTER_LOCATION_UPDATE_COUNT = "tmaster-location-update-count";
private final Communicator<TopologyMaster.PublishMetrics> metricsCommunicator =
new Communicator<>();
private final MetricsFilter tMasterMetricsFilter = new MetricsFilter();
private final Map<String, Object> sinkConfig = new HashMap<>();
// A scheduled executor service to check whether the TMasterLocation has changed
// If so, restart the TMasterClientService with the new TMasterLocation
// Start of TMasterClientService will also be in this thread
private final ScheduledExecutorService tMasterLocationStarter =
Executors.newSingleThreadScheduledExecutor();
private TMasterClientService tMasterClientService;
// We need to cache it locally to check whether the TMasterLocation is changed
// This field is changed only in ScheduledExecutorService's thread,
// so no need to make it volatile
private TopologyMaster.TMasterLocation currentTMasterLocation = null;
private SinkContext sinkContext;
@Override
@SuppressWarnings("unchecked")
public void init(Map<String, Object> conf, SinkContext context) {
sinkConfig.putAll(conf);
sinkContext = context;
// Fill the tMasterMetricsFilter according to metrics-sink-configs.yaml
Map<String, String> tmasterMetricsType =
(Map<String, String>) sinkConfig.get(KEY_TMASTER_METRICS_TYPE);
if (tmasterMetricsType != null) {
for (Map.Entry<String, String> metricToType : tmasterMetricsType.entrySet()) {
String value = metricToType.getValue();
MetricsFilter.MetricAggregationType type;
if ("SUM".equals(value)) {
type = MetricsFilter.MetricAggregationType.SUM;
} else if ("AVG".equals(value)) {
type = MetricsFilter.MetricAggregationType.AVG;
} else if ("LAST".equals(value)) {
type = MetricsFilter.MetricAggregationType.LAST;
} else {
type = MetricsFilter.MetricAggregationType.UNKNOWN;
}
tMasterMetricsFilter.setPrefixToType(metricToType.getKey(), type);
}
}
// Construct the long-live TMasterClientService
tMasterClientService =
new TMasterClientService((Map<String, Object>)
sinkConfig.get(KEY_TMASTER), metricsCommunicator);
// Start the tMasterLocationStarter
startTMasterChecker();
}
// Start the TMasterCheck, which would check whether the TMasterLocation is changed
// at an interval.
// If so, restart the TMasterClientService with the new TMasterLocation
private void startTMasterChecker() {
final int checkIntervalSec =
TypeUtils.getInteger(sinkConfig.get(KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC));
Runnable runnable = new Runnable() {
@Override
public void run() {
TopologyMaster.TMasterLocation location =
(TopologyMaster.TMasterLocation) SingletonRegistry.INSTANCE.getSingleton(
TMASTER_LOCATION_BEAN_NAME);
if (location != null) {
if (currentTMasterLocation == null || !location.equals(currentTMasterLocation)) {
LOG.info("Update current TMasterLocation to: " + location);
currentTMasterLocation = location;
tMasterClientService.updateTMasterLocation(currentTMasterLocation);
tMasterClientService.startNewMasterClient();
// Update Metrics
sinkContext.exportCountMetric(TMASTER_LOCATION_UPDATE_COUNT, 1);
}
}
// Schedule itself in future
tMasterLocationStarter.schedule(this, checkIntervalSec, TimeUnit.SECONDS);
}
};
// First Entry
tMasterLocationStarter.schedule(runnable, checkIntervalSec, TimeUnit.SECONDS);
LOG.info("TMasterChecker started with interval: " + checkIntervalSec);
}
@Override
public void processRecord(MetricsRecord record) {
// Format it into TopologyMaster.PublishMetrics
// The format of record is "host:port/componentName/instanceId"
// So MetricsRecord.getSource().split("/") would be an array with 3 elements:
// ["host:port", componentName, instanceId]
String[] sources = MetricsUtil.splitRecordSource(record);
String hostPort = sources[0];
String componentName = sources[1];
String instanceId = sources[2];
TopologyMaster.PublishMetrics.Builder publishMetrics =
TopologyMaster.PublishMetrics.newBuilder();
for (MetricsInfo metricsInfo : tMasterMetricsFilter.filter(record.getMetrics())) {
// We would filter out unneeded metrics
TopologyMaster.MetricDatum metricDatum = TopologyMaster.MetricDatum.newBuilder().
setComponentName(componentName).setInstanceId(instanceId).setName(metricsInfo.getName()).
setValue(metricsInfo.getValue()).setTimestamp(record.getTimestamp()).build();
publishMetrics.addMetrics(metricDatum);
}
for (ExceptionInfo exceptionInfo : record.getExceptions()) {
TopologyMaster.TmasterExceptionLog exceptionLog =
TopologyMaster.TmasterExceptionLog.newBuilder()
.setComponentName(componentName)
.setHostname(hostPort)
.setInstanceId(instanceId)
.setStacktrace(exceptionInfo.getStackTrace())
.setLasttime(exceptionInfo.getLastTime())
.setFirsttime(exceptionInfo.getFirstTime())
.setCount(exceptionInfo.getCount())
.setLogging(exceptionInfo.getLogging()).build();
publishMetrics.addExceptions(exceptionLog);
}
metricsCommunicator.offer(publishMetrics.build());
// Update metrics
sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
sinkContext.exportCountMetric(METRICS_COUNT, publishMetrics.getMetricsCount());
sinkContext.exportCountMetric(EXCEPTIONS_COUNT, publishMetrics.getExceptionsCount());
}
@Override
public void flush() {
// We do nothing here but update metrics
sinkContext.exportCountMetric(TMASTER_RESTART_COUNT,
tMasterClientService.startedAttempts.longValue());
}
@Override
public void close() {
tMasterClientService.close();
metricsCommunicator.clear();
}
/////////////////////////////////////////////////////////
// Following protected methods should be used only for unit testing
/////////////////////////////////////////////////////////
protected TMasterClientService getTMasterClientService() {
return tMasterClientService;
}
protected void createSimpleTMasterClientService(Map<String, Object> serviceConfig) {
tMasterClientService =
new TMasterClientService(serviceConfig, metricsCommunicator);
}
protected TMasterClient getTMasterClient() {
return tMasterClientService.getTMasterClient();
}
protected void startNewTMasterClient(TopologyMaster.TMasterLocation location) {
tMasterClientService.updateTMasterLocation(location);
tMasterClientService.startNewMasterClient();
}
protected int getTMasterStartedAttempts() {
return tMasterClientService.startedAttempts.get();
}
protected TopologyMaster.TMasterLocation getCurrentTMasterLocation() {
return currentTMasterLocation;
}
protected TopologyMaster.TMasterLocation getCurrentTMasterLocationInService() {
return tMasterClientService.getCurrentTMasterLocation();
}
/**
* A long-live Service running TMasterClient
* It would automatically restart the TMasterClient connecting and communicating to the latest
* TMasterLocation if any uncaught exceptions throw.
* <p>
* It provides startNewMasterClient(TopologyMaster.TMasterLocation location), which would also
* update the currentTMasterLocation to the lastest location.
* <p>
* So a new TMasterClient would start in two cases:
* 1. The old one threw exceptions and died.
* 2. startNewMasterClient() is invoked externally with TMasterLocation.
*/
private static final class TMasterClientService {
private final AtomicInteger startedAttempts = new AtomicInteger(0);
private final Map<String, Object> tmasterClientConfig;
private final Communicator<TopologyMaster.PublishMetrics> metricsCommunicator;
private final ExecutorService tmasterClientExecutor =
Executors.newSingleThreadExecutor(new TMasterClientThreadFactory());
private volatile TMasterClient tMasterClient;
// We need to cache TMasterLocation for failover case
// This value is set in ScheduledExecutorService' thread while
// it is used in TMasterClientService thread,
// so we need to make it volatile to guarantee the visiability.
private volatile TopologyMaster.TMasterLocation currentTMasterLocation;
private TMasterClientService(Map<String, Object> tmasterClientConfig,
Communicator<TopologyMaster.PublishMetrics> metricsCommunicator) {
this.tmasterClientConfig = tmasterClientConfig;
this.metricsCommunicator = metricsCommunicator;
}
// Update the TMasterLocation to connect within the TMasterClient
// This method is thread-safe, since
// currentTMasterLocation is volatile and we just replace it.
// In our scenario, it is only invoked when TMasterLocation is changed,
// i.e. this method is only invoked in scheduled executor thread.
public void updateTMasterLocation(TopologyMaster.TMasterLocation location) {
currentTMasterLocation = location;
}
// This method could be invoked by different threads
// Make it synchronized to guarantee thread-safe
public synchronized void startNewMasterClient() {
// Exit any running tMasterClient if there is any to release
// the thread in tmasterClientExecutor
if (tMasterClient != null) {
tMasterClient.stop();
tMasterClient.getNIOLooper().exitLoop();
}
// Construct the new TMasterClient
final NIOLooper looper;
try {
looper = new NIOLooper();
} catch (IOException e) {
throw new RuntimeException("Could not create the NIOLooper", e);
}
SystemConfig systemConfig =
(SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
HeronSocketOptions socketOptions =
new HeronSocketOptions(
TypeUtils.getByteAmount(tmasterClientConfig.get(KEY_NETWORK_WRITE_BATCH_SIZE_BYTES)),
TypeUtils.getDuration(
tmasterClientConfig.get(KEY_NETWORK_WRITE_BATCH_TIME_MS), ChronoUnit.MILLIS),
TypeUtils.getByteAmount(tmasterClientConfig.get(KEY_NETWORK_READ_BATCH_SIZE_BYTES)),
TypeUtils.getDuration(
tmasterClientConfig.get(KEY_NETWORK_READ_BATCH_TIME_MS), ChronoUnit.MILLIS),
TypeUtils.getByteAmount(tmasterClientConfig.get(KEY_SOCKET_SEND_BUFFER_BYTES)),
TypeUtils.getByteAmount(tmasterClientConfig.get(KEY_SOCKET_RECEIVED_BUFFER_BYTES)),
systemConfig.getMetricsMgrNetworkOptionsMaximumPacketSize());
// Reset the Consumer
metricsCommunicator.setConsumer(looper);
tMasterClient =
new TMasterClient(looper,
currentTMasterLocation.getHost(),
currentTMasterLocation.getMasterPort(),
socketOptions, metricsCommunicator,
TypeUtils.getDuration(
tmasterClientConfig.get(KEY_TMASTER_RECONNECT_INTERVAL_SEC), ChronoUnit.SECONDS));
int attempts = startedAttempts.incrementAndGet();
LOG.severe(String.format("Starting TMasterClient for the %d time.", attempts));
tmasterClientExecutor.execute(tMasterClient);
}
// This method could be invoked by different threads
// Make it synchronized to guarantee thread-safe
public synchronized void close() {
tMasterClient.getNIOLooper().exitLoop();
tmasterClientExecutor.shutdownNow();
}
/////////////////////////////////////////////////////////
// Following protected methods should be used only for unit testing
/////////////////////////////////////////////////////////
protected TMasterClient getTMasterClient() {
return tMasterClient;
}
protected int getTMasterStartedAttempts() {
return startedAttempts.get();
}
protected TopologyMaster.TMasterLocation getCurrentTMasterLocation() {
return currentTMasterLocation;
}
// An UncaughtExceptionHandler, which would restart TMasterLocation with
// current TMasterLocation.
private class TMasterClientThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
final Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler(new TMasterClientExceptionHandler());
return thread;
}
private class TMasterClientExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.log(Level.SEVERE, "TMasterClient dies in thread: " + t, e);
Duration reconnectInterval = TypeUtils.getDuration(
tmasterClientConfig.get(KEY_TMASTER_RECONNECT_INTERVAL_SEC), ChronoUnit.SECONDS);
SysUtils.sleep(reconnectInterval);
LOG.info("Restarting TMasterClient");
// We would use the TMasterLocation in cache, since
// the new TMasterClient is started due to exception thrown,
// rather than TMasterLocation changes
startNewMasterClient();
}
}
}
}
}