| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.diagnostics; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.job.model.ContainerModel; |
| import org.apache.samza.metrics.reporter.Metrics; |
| import org.apache.samza.metrics.reporter.MetricsHeader; |
| import org.apache.samza.metrics.reporter.MetricsSnapshot; |
| import org.apache.samza.serializers.model.SamzaObjectMapper; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.type.TypeReference; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * Defines the contents for any message emitted to the diagnostic stream by the {@link DiagnosticsManager}. |
| * All contents are stored in a {@link MetricsHeader} and a metricsMessage map which combine to get a {@link MetricsSnapshot}, |
| * which can be serialized using serdes ({@link org.apache.samza.serializers.MetricsSnapshotSerdeV2}). |
| * This class serializes {@link ContainerModel} using {@link SamzaObjectMapper} before adding to the metrics message. |
| * |
| */ |
| public class DiagnosticsStreamMessage { |
| private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsStreamMessage.class); |
| |
| public static final String GROUP_NAME_FOR_DIAGNOSTICS_MANAGER = DiagnosticsManager.class.getName(); |
| // Using DiagnosticsManager as the group name for processor-stop-events, job-related params, and container model |
| |
| private static final String SAMZACONTAINER_METRICS_GROUP_NAME = "org.apache.samza.container.SamzaContainerMetrics"; |
| // Using SamzaContainerMetrics as the group name for exceptions to maintain compatibility with existing diagnostics |
| private static final String EXCEPTION_LIST_METRIC_NAME = "exceptions"; |
| |
| private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents"; |
| private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb"; |
| private static final String CONTAINER_NUM_CORES_METRIC_NAME = "containerNumCores"; |
| private static final String CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME = "numPersistentStores"; |
| private static final String CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME = "maxHeap"; |
| private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = "containerThreadPoolSize"; |
| private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels"; |
| private static final String AUTOSIZING_ENABLED_METRIC_NAME = "autosizingEnabled"; |
| private static final String CONFIG_METRIC_NAME = "config"; |
| |
| private final MetricsHeader metricsHeader; |
| private final Map<String, Map<String, Object>> metricsMessage; |
| |
| public DiagnosticsStreamMessage(String jobName, String jobId, String containerName, String executionEnvContainerId, |
| String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp) { |
| |
| // Create the metricHeader |
| metricsHeader = |
| new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, DiagnosticsManager.class.getName(), |
| taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp); |
| |
| this.metricsMessage = new HashMap<>(); |
| } |
| |
| /** |
| * Add the container memory mb parameter to the message. |
| * @param containerMemoryMb the memory mb parameter value. |
| */ |
| public void addContainerMb(Integer containerMemoryMb) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MB_METRIC_NAME, containerMemoryMb); |
| } |
| |
| /** |
| * Add the container num cores parameter to the message. |
| * @param containerNumCores the num core parameter value. |
| */ |
| public void addContainerNumCores(Integer containerNumCores) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_CORES_METRIC_NAME, containerNumCores); |
| } |
| |
| /** |
| * Add the num stores with changelog parameter to the message. |
| * @param numPersistentStores the parameter value. |
| */ |
| public void addNumPersistentStores(Integer numPersistentStores) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME, |
| numPersistentStores); |
| } |
| |
| /** |
| * Add the configured max heap size in bytes. |
| * @param maxHeapSize the parameter value. |
| */ |
| public void addMaxHeapSize(Long maxHeapSize) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME, maxHeapSize); |
| } |
| |
| /** |
| * Add the configured container thread pool size. |
| * @param threadPoolSize the parameter value. |
| */ |
| public void addContainerThreadPoolSize(Integer threadPoolSize) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_THREAD_POOL_SIZE_METRIC_NAME, threadPoolSize); |
| } |
| |
| /** |
| * Add a map of container models (indexed by containerID) to the message. |
| * @param containerModelMap the container models map |
| */ |
| public void addContainerModels(Map<String, ContainerModel> containerModelMap) { |
| if (containerModelMap != null && !containerModelMap.isEmpty()) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MODELS_METRIC_NAME, |
| serializeContainerModelMap(containerModelMap)); |
| } |
| } |
| |
| /** |
| * Add the current auto-scaling setting. |
| * @param autosizingEnabled the parameter value. |
| */ |
| public void addAutosizingEnabled(Boolean autosizingEnabled) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, AUTOSIZING_ENABLED_METRIC_NAME, autosizingEnabled); |
| } |
| |
| /** |
| * Add a list of {@link DiagnosticsExceptionEvent}s to the message. |
| * @param exceptionList the list to add. |
| */ |
| public void addDiagnosticsExceptionEvents(Collection<DiagnosticsExceptionEvent> exceptionList) { |
| if (exceptionList != null && !exceptionList.isEmpty()) { |
| addToMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME, EXCEPTION_LIST_METRIC_NAME, exceptionList); |
| } |
| } |
| |
| /** |
| * Add a list of {@link org.apache.samza.diagnostics.ProcessorStopEvent}s to add to the list. |
| * @param stopEventList the list to add. |
| */ |
| public void addProcessorStopEvents(List<ProcessorStopEvent> stopEventList) { |
| if (stopEventList != null && !stopEventList.isEmpty()) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, STOP_EVENT_LIST_METRIC_NAME, stopEventList); |
| } |
| } |
| |
| /** |
| * Add the job's config to the message. |
| * @param config the config to add. |
| */ |
| public void addConfig(Config config) { |
| addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME, (Map<String, String>) config); |
| } |
| |
| /** |
| * Convert this message into a {@link MetricsSnapshot}, useful for serde-deserde using {@link org.apache.samza.serializers.MetricsSnapshotSerde}. |
| * @return |
| */ |
| public MetricsSnapshot convertToMetricsSnapshot() { |
| MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricsMessage)); |
| return metricsSnapshot; |
| } |
| |
| /** |
| * Check if the message has no contents. |
| * @return True if the message is empty, false otherwise. |
| */ |
| public boolean isEmpty() { |
| return metricsMessage.isEmpty(); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| DiagnosticsStreamMessage that = (DiagnosticsStreamMessage) o; |
| return metricsHeader.getAsMap().equals(that.metricsHeader.getAsMap()) && metricsMessage.equals(that.metricsMessage); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(metricsHeader, metricsMessage); |
| } |
| |
| public Collection<ProcessorStopEvent> getProcessorStopEvents() { |
| return (Collection<ProcessorStopEvent>) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, |
| STOP_EVENT_LIST_METRIC_NAME); |
| } |
| |
| public Collection<DiagnosticsExceptionEvent> getExceptionEvents() { |
| return (Collection<DiagnosticsExceptionEvent>) getFromMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME, |
| EXCEPTION_LIST_METRIC_NAME); |
| } |
| |
| public Integer getContainerMb() { |
| return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MB_METRIC_NAME); |
| } |
| |
| public Integer getContainerNumCores() { |
| return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_CORES_METRIC_NAME); |
| } |
| |
| public Integer getNumPersistentStores() { |
| return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, |
| CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME); |
| } |
| |
| public Long getMaxHeapSize() { |
| return (Long) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME); |
| } |
| |
| public Integer getContainerThreadPoolSize() { |
| return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_THREAD_POOL_SIZE_METRIC_NAME); |
| } |
| |
| public Map<String, ContainerModel> getContainerModels() { |
| return deserializeContainerModelMap((String) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MODELS_METRIC_NAME)); |
| } |
| |
| public Boolean getAutosizingEnabled() { |
| return (Boolean) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, AUTOSIZING_ENABLED_METRIC_NAME); |
| } |
| |
| /** |
| * This method gets the config of the job from the MetricsMessage. |
| * @return the config of the job. |
| */ |
| public Config getConfig() { |
| return new MapConfig((Map<String, String>) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME)); |
| } |
| |
| // Helper method to get a {@link DiagnosticsStreamMessage} from a {@link MetricsSnapshot}. |
| // * This is typically used when deserializing messages from a diagnostics-stream. |
| // * @param metricsSnapshot |
| public static DiagnosticsStreamMessage convertToDiagnosticsStreamMessage(MetricsSnapshot metricsSnapshot) { |
| DiagnosticsStreamMessage diagnosticsStreamMessage = |
| new DiagnosticsStreamMessage(metricsSnapshot.getHeader().getJobName(), metricsSnapshot.getHeader().getJobId(), |
| metricsSnapshot.getHeader().getContainerName(), metricsSnapshot.getHeader().getExecEnvironmentContainerId(), |
| metricsSnapshot.getHeader().getVersion(), metricsSnapshot.getHeader().getSamzaVersion(), |
| metricsSnapshot.getHeader().getHost(), metricsSnapshot.getHeader().getTime(), |
| metricsSnapshot.getHeader().getResetTime()); |
| |
| Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap(); |
| Map<String, Object> diagnosticsManagerGroupMap = metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER); |
| Map<String, Object> containerMetricsGroupMap = metricsMap.get(SAMZACONTAINER_METRICS_GROUP_NAME); |
| |
| if (diagnosticsManagerGroupMap != null) { |
| |
| diagnosticsStreamMessage.addContainerNumCores((Integer) diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME)); |
| diagnosticsStreamMessage.addContainerMb((Integer) diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME)); |
| diagnosticsStreamMessage.addNumPersistentStores((Integer) diagnosticsManagerGroupMap.get( |
| CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME)); |
| diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String) diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME))); |
| diagnosticsStreamMessage.addMaxHeapSize((Long) diagnosticsManagerGroupMap.get(CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME)); |
| diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME)); |
| diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>) diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME)); |
| diagnosticsStreamMessage.addAutosizingEnabled((Boolean) diagnosticsManagerGroupMap.get(AUTOSIZING_ENABLED_METRIC_NAME)); |
| diagnosticsStreamMessage.addConfig(new MapConfig((Map<String, String>) diagnosticsManagerGroupMap.get(CONFIG_METRIC_NAME))); |
| } |
| |
| if (containerMetricsGroupMap != null && containerMetricsGroupMap.containsKey(EXCEPTION_LIST_METRIC_NAME)) { |
| diagnosticsStreamMessage.addDiagnosticsExceptionEvents( |
| (Collection<DiagnosticsExceptionEvent>) containerMetricsGroupMap.get(EXCEPTION_LIST_METRIC_NAME)); |
| } |
| |
| return diagnosticsStreamMessage; |
| } |
| |
| /** |
| * Helper method to use {@link SamzaObjectMapper} to serialize {@link ContainerModel}s. |
| * We use SamzaObjectMapper for ContainerModels, rather than using ObjectMapper (in MetricsSnapshotSerdeV2) |
| * because MetricsSnapshotSerdeV2 enables default typing, which writes type information for all containerModel (and |
| * underlying) classes, deserializing which requires a large number of jackson related changes to those classes |
| * (annotations and/or mixins). We cannot disable default typing to avoid backward incompatibility. This is why |
| * we serde-deserde ContainerModel explicitly using SamzaObjectMapper (which is also used for reads-writes to coordinator |
| * stream). |
| * {@link SamzaObjectMapper} provides several conventions and optimizations for serializing containerModels. |
| * @param containerModelMap map of container models to serialize. |
| * @return |
| */ |
| private static String serializeContainerModelMap(Map<String, ContainerModel> containerModelMap) { |
| ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper(); |
| try { |
| return samzaObjectMapper.writeValueAsString(containerModelMap); |
| } catch (IOException e) { |
| LOG.error("Exception in serializing container model ", e); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Helper method to use {@link SamzaObjectMapper} to deserialize {@link ContainerModel}s. |
| * {@link SamzaObjectMapper} provides several conventions and optimizations for deserializing containerModels. |
| * @return |
| */ |
| private static Map<String, ContainerModel> deserializeContainerModelMap( |
| String serializedContainerModel) { |
| Map<String, ContainerModel> containerModelMap = null; |
| ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper(); |
| |
| try { |
| if (serializedContainerModel != null) { |
| containerModelMap = samzaObjectMapper.readValue(serializedContainerModel, new TypeReference<Map<String, ContainerModel>>() { |
| }); |
| } |
| } catch (IOException e) { |
| LOG.error("Exception in deserializing container model ", e); |
| } |
| |
| return containerModelMap; |
| } |
| |
| private void addToMetricsMessage(String groupName, String metricName, Object value) { |
| if (value != null) { |
| metricsMessage.putIfAbsent(groupName, new HashMap<>()); |
| metricsMessage.get(groupName).put(metricName, value); |
| } |
| } |
| |
| private Object getFromMetricsMessage(String groupName, String metricName) { |
| if (metricsMessage.containsKey(groupName) && metricsMessage.get(groupName) != null) { |
| return metricsMessage.get(groupName).get(metricName); |
| } else { |
| return null; |
| } |
| } |
| } |