blob: 72b1b5fa109db941f8307f71eea686009a892fc7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.diagnostics;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
import org.junit.Test;
public class TestDiagnosticsStreamMessage {
private final String jobName = "Testjob";
private final String jobId = "test job id";
private final String containerName = "sample container name";
private final String executionEnvContainerId = "exec container id";
private final String taskClassVersion = "0.0.1";
private final String samzaVersion = "1.3.0";
private final String hostname = "sample host name";
private final long timestamp = System.currentTimeMillis();
private final long resetTimestamp = System.currentTimeMillis();
private final Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId));
private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
DiagnosticsStreamMessage diagnosticsStreamMessage =
new DiagnosticsStreamMessage(jobName, jobId, containerName, executionEnvContainerId, taskClassVersion,
samzaVersion, hostname, timestamp, resetTimestamp);
diagnosticsStreamMessage.addContainerMb(1024);
diagnosticsStreamMessage.addContainerNumCores(2);
diagnosticsStreamMessage.addNumPersistentStores(3);
diagnosticsStreamMessage.addConfig(config);
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
return diagnosticsStreamMessage;
}
public static Collection<DiagnosticsExceptionEvent> getExceptionList() {
BoundedList boundedList = new BoundedList<DiagnosticsExceptionEvent>("exceptions");
DiagnosticsExceptionEvent diagnosticsExceptionEvent =
new DiagnosticsExceptionEvent(1, new Exception("this is a samza exception", new Exception("cause")),
new HashMap());
boundedList.add(diagnosticsExceptionEvent);
return boundedList.getValues();
}
public List<ProcessorStopEvent> getProcessorStopEventList() {
List<ProcessorStopEvent> stopEventList = new ArrayList<>();
stopEventList.add(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101));
return stopEventList;
}
public static Map<String, ContainerModel> getSampleContainerModels() {
Map<String, ContainerModel> containerModels = new HashMap<>();
Map<TaskName, TaskModel> tasks = new HashMap<>();
Set<SystemStreamPartition> sspsForTask1 = new HashSet<>();
sspsForTask1.add(new SystemStreamPartition("kafka", "test-stream", new Partition(0)));
tasks.put(new TaskName("Partition 0"), new TaskModel(new TaskName("Partition 0"), sspsForTask1, new Partition(0)));
Set<SystemStreamPartition> sspsForTask2 = new HashSet<>();
sspsForTask2.add(new SystemStreamPartition("kafka", "test-stream", new Partition(1)));
tasks.put(new TaskName("Partition 1"), new TaskModel(new TaskName("Partition 1"), sspsForTask2, new Partition(1)));
containerModels.put("0", new ContainerModel("0", tasks));
return containerModels;
}
/**
* Tests basic operations on {@link DiagnosticsStreamMessage}.
*/
@Test
public void basicTest() {
DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores());
Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumPersistentStores());
Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents());
Assert.assertEquals(getSampleContainerModels(), diagnosticsStreamMessage.getContainerModels());
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), getProcessorStopEventList());
}
/**
* Tests serialization and deserialization of a {@link DiagnosticsStreamMessage}
*/
@Test
public void serdeTest() {
DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
MetricsSnapshot metricsSnapshot = diagnosticsStreamMessage.convertToMetricsSnapshot();
Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName);
Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId);
Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(), executionEnvContainerId);
Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), taskClassVersion);
Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion);
Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName());
Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerModels"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numPersistentStores"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("config"));
DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
Assert.assertTrue(convertedDiagnosticsStreamMessage.equals(diagnosticsStreamMessage));
}
}