| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.diagnostics; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.job.model.ContainerModel; |
| import org.apache.samza.serializers.MetricsSnapshotSerdeV2; |
| import org.apache.samza.system.OutgoingMessageEnvelope; |
| import org.apache.samza.system.SystemProducer; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.util.ReflectionUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * Responsible for publishing data to the diagnostic stream. |
| * Currently emits exception/error events obtained using a customer-appender that attaches to the root-logger. |
| */ |
| public class DiagnosticsManager { |
| private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsManager.class); |
| private static final Duration DEFAULT_PUBLISH_PERIOD = Duration.ofSeconds(60); |
| // Period size for pushing data to the diagnostic stream |
| |
| private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager Thread-%d"; |
| |
| // Parameters used for populating the MetricHeader when sending diagnostic-stream messages |
| private final String jobName; |
| private final String jobId; |
| private final String containerId; |
| private final String executionEnvContainerId; |
| private final String taskClassVersion; |
| private final String samzaVersion; |
| private final String hostname; |
| private final Instant resetTime; |
| |
| // Job-related params |
| private final int containerMemoryMb; |
| private final int containerNumCores; |
| private final int numPersistentStores; |
| private final long maxHeapSizeBytes; |
| private final int containerThreadPoolSize; |
| private final Map<String, ContainerModel> containerModels; |
| private final boolean autosizingEnabled; |
| private final Config config; |
| private boolean jobParamsEmitted = false; |
| |
| private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data |
| private final BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent |
| private final ConcurrentLinkedQueue<ProcessorStopEvent> processorStopEvents; |
| // A BoundedList for storing DiagnosticExceptionEvent |
| private final ScheduledExecutorService scheduler; // Scheduler for pushing data to the diagnostic stream |
| private final Duration terminationDuration; // duration to wait when terminating the scheduler |
| private final SystemStream diagnosticSystemStream; |
| |
| public DiagnosticsManager(String jobName, |
| String jobId, |
| Map<String, ContainerModel> containerModels, |
| int containerMemoryMb, |
| int containerNumCores, |
| int numPersistentStores, |
| long maxHeapSizeBytes, |
| int containerThreadPoolSize, |
| String containerId, |
| String executionEnvContainerId, |
| String taskClassVersion, |
| String samzaVersion, |
| String hostname, |
| SystemStream diagnosticSystemStream, |
| SystemProducer systemProducer, |
| Duration terminationDuration, |
| boolean autosizingEnabled, |
| Config config) { |
| |
| this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize, |
| containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer, |
| terminationDuration, Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, config); |
| } |
| |
| @VisibleForTesting |
| DiagnosticsManager(String jobName, |
| String jobId, |
| Map<String, ContainerModel> containerModels, |
| int containerMemoryMb, |
| int containerNumCores, |
| int numPersistentStores, |
| long maxHeapSizeBytes, |
| int containerThreadPoolSize, |
| String containerId, |
| String executionEnvContainerId, |
| String taskClassVersion, |
| String samzaVersion, |
| String hostname, |
| SystemStream diagnosticSystemStream, |
| SystemProducer systemProducer, |
| Duration terminationDuration, |
| ScheduledExecutorService executorService, |
| boolean autosizingEnabled, |
| Config config) { |
| this.jobName = jobName; |
| this.jobId = jobId; |
| this.containerModels = containerModels; |
| this.containerMemoryMb = containerMemoryMb; |
| this.containerNumCores = containerNumCores; |
| this.numPersistentStores = numPersistentStores; |
| this.maxHeapSizeBytes = maxHeapSizeBytes; |
| this.containerThreadPoolSize = containerThreadPoolSize; |
| this.containerId = containerId; |
| this.executionEnvContainerId = executionEnvContainerId; |
| this.taskClassVersion = taskClassVersion; |
| this.samzaVersion = samzaVersion; |
| this.hostname = hostname; |
| this.diagnosticSystemStream = diagnosticSystemStream; |
| this.systemProducer = systemProducer; |
| this.terminationDuration = terminationDuration; |
| |
| this.processorStopEvents = new ConcurrentLinkedQueue<>(); |
| this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters |
| this.scheduler = executorService; |
| this.autosizingEnabled = autosizingEnabled; |
| this.config = config; |
| |
| resetTime = Instant.now(); |
| this.systemProducer.register(getClass().getSimpleName()); |
| |
| try { |
| ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", |
| Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class)); |
| LOG.info("Attached log4j diagnostics appender."); |
| } catch (Exception e) { |
| try { |
| ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", |
| Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class)); |
| LOG.info("Attached log4j2 diagnostics appender."); |
| } catch (Exception ex) { |
| LOG.warn( |
| "Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream.", |
| ex); |
| } |
| } |
| } |
| |
| public void start() { |
| this.systemProducer.start(); |
| this.scheduler.scheduleWithFixedDelay(new DiagnosticsStreamPublisher(), 0, DEFAULT_PUBLISH_PERIOD.getSeconds(), |
| TimeUnit.SECONDS); |
| } |
| |
| public void stop() throws InterruptedException { |
| scheduler.shutdown(); |
| |
| // Allow any scheduled publishes to finish, and block for termination |
| scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS); |
| |
| if (!scheduler.isTerminated()) { |
| LOG.warn("Unable to terminate scheduler"); |
| scheduler.shutdownNow(); |
| } |
| this.systemProducer.stop(); |
| } |
| |
| public void addExceptionEvent(DiagnosticsExceptionEvent diagnosticsExceptionEvent) { |
| this.exceptions.add(diagnosticsExceptionEvent); |
| } |
| |
| public void addProcessorStopEvent(String processorId, String resourceId, String host, int exitStatus) { |
| this.processorStopEvents.add(new ProcessorStopEvent(processorId, resourceId, host, exitStatus)); |
| LOG.info("Added stop event for Container Id: {}, resource Id: {}, host: {}, exitStatus: {}", processorId, |
| resourceId, host, exitStatus); |
| } |
| |
| private class DiagnosticsStreamPublisher implements Runnable { |
| |
| @Override |
| public void run() { |
| try { |
| DiagnosticsStreamMessage diagnosticsStreamMessage = |
| new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId, |
| taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli()); |
| |
| // Add job-related params to the message (if not already published) |
| if (!jobParamsEmitted) { |
| diagnosticsStreamMessage.addContainerMb(containerMemoryMb); |
| diagnosticsStreamMessage.addContainerNumCores(containerNumCores); |
| diagnosticsStreamMessage.addNumPersistentStores(numPersistentStores); |
| diagnosticsStreamMessage.addContainerModels(containerModels); |
| diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes); |
| diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize); |
| diagnosticsStreamMessage.addAutosizingEnabled(autosizingEnabled); |
| diagnosticsStreamMessage.addConfig(config); |
| } |
| |
| // Add stop event list to the message |
| diagnosticsStreamMessage.addProcessorStopEvents(new ArrayList(processorStopEvents)); |
| |
| // Add exception events to the message |
| diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptions.getValues()); |
| |
| if (!diagnosticsStreamMessage.isEmpty()) { |
| |
| systemProducer.send(DiagnosticsManager.class.getName(), |
| new OutgoingMessageEnvelope(diagnosticSystemStream, hostname, null, |
| new MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot()))); |
| systemProducer.flush(DiagnosticsManager.class.getName()); |
| |
| // Remove stop events from list after successful publish |
| if (diagnosticsStreamMessage.getProcessorStopEvents() != null) { |
| processorStopEvents.removeAll(diagnosticsStreamMessage.getProcessorStopEvents()); |
| } |
| |
| // Remove exceptions from list after successful publish to diagnostics stream |
| if (diagnosticsStreamMessage.getExceptionEvents() != null) { |
| exceptions.remove(diagnosticsStreamMessage.getExceptionEvents()); |
| } |
| |
| // Emit jobParams once |
| jobParamsEmitted = true; |
| } |
| } catch (Exception e) { |
| LOG.error("Exception when flushing diagnosticsStreamMessage", e); |
| } |
| } |
| } |
| |
| } |