blob: f817c47c0894f1d3713599f4d70e1fdf4ee9e756 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ReflectionUtil.class})
public class TestDiagnosticsUtil {
private static final String STREAM_NAME = "someStreamName";
private static final String JOB_NAME = "someJob";
private static final String JOB_ID = "someId";
private static final String CONTAINER_ID = "someContainerId";
private static final String ENV_ID = "someEnvID";
public static final String REPORTER_FACTORY = "org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory";
public static final String SYSTEM_FACTORY = "com.foo.system.SomeSystemFactory";
@Test
public void testBuildDiagnosticsManagerReturnsConfiguredReporter() {
Config config = new MapConfig(buildTestConfigs());
JobModel mockJobModel = mock(JobModel.class);
SystemFactory systemFactory = mock(SystemFactory.class);
SystemProducer mockProducer = mock(SystemProducer.class);
MetricsReporterFactory metricsReporterFactory = mock(MetricsReporterFactory.class);
MetricsSnapshotReporter mockReporter = mock(MetricsSnapshotReporter.class);
when(systemFactory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(mockProducer);
when(metricsReporterFactory.getMetricsReporter(anyString(), anyString(), any(Config.class))).thenReturn(
mockReporter);
PowerMockito.mockStatic(ReflectionUtil.class);
when(ReflectionUtil.getObj(REPORTER_FACTORY, MetricsReporterFactory.class)).thenReturn(metricsReporterFactory);
when(ReflectionUtil.getObj(SYSTEM_FACTORY, SystemFactory.class)).thenReturn(systemFactory);
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> managerReporterPair =
DiagnosticsUtil.buildDiagnosticsManager(JOB_NAME, JOB_ID, mockJobModel, CONTAINER_ID, Optional.of(ENV_ID),
config);
Assert.assertTrue(managerReporterPair.isPresent());
Assert.assertEquals(mockReporter, managerReporterPair.get().getValue());
}
private Map<String, String> buildTestConfigs() {
Map<String, String> configs = new HashMap<>();
configs.put(JobConfig.JOB_DIAGNOSTICS_ENABLED, "true");
configs.put(String.format(MetricsConfig.METRICS_REPORTER_FACTORY,
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS), REPORTER_FACTORY);
configs.put(String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS),
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS + "." + STREAM_NAME);
configs.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS),
SYSTEM_FACTORY);
return configs;
}
}