blob: 9d17b638d5c2ea5270bda6e82eec3628f74db958 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class WorkerMetricsGroupTest {
private final String connector = "org.FakeConnector";
private final ConnectorTaskId task = new ConnectorTaskId(connector, 0);
private final RuntimeException exception = new RuntimeException();
@Mock private ConnectMetrics connectMetrics;
private Sensor connectorStartupResults;
private Sensor connectorStartupAttempts;
private Sensor connectorStartupSuccesses;
private Sensor connectorStartupFailures;
private Sensor taskStartupResults;
private Sensor taskStartupAttempts;
private Sensor taskStartupSuccesses;
private Sensor taskStartupFailures;
@Mock private ConnectorStatus.Listener delegateConnectorListener;
@Mock private TaskStatus.Listener delegateTaskListener;
@Mock private ConnectMetricsRegistry connectMetricsRegistry;
@Mock private ConnectMetrics.MetricGroup metricGroup;
@Mock private MetricName metricName;
@Before
public void setup() {
// We don't expect metricGroup.metricName to be invoked with null in practice,
// but it's easier to test this way, and should have no impact
// on the efficacy of these tests
when(metricGroup.metricName((MetricNameTemplate) isNull())).thenReturn(metricName);
when(connectMetricsRegistry.workerGroupName()).thenReturn(ConnectMetricsRegistry.WORKER_GROUP_NAME);
when(connectMetrics.registry()).thenReturn(connectMetricsRegistry);
when(connectMetrics.group(ConnectMetricsRegistry.WORKER_GROUP_NAME)).thenReturn(metricGroup);
connectorStartupResults = mockSensor(metricGroup, "connector-startup-results");
connectorStartupAttempts = mockSensor(metricGroup, "connector-startup-attempts");
connectorStartupSuccesses = mockSensor(metricGroup, "connector-startup-successes");
connectorStartupFailures = mockSensor(metricGroup, "connector-startup-failures");
taskStartupResults = mockSensor(metricGroup, "task-startup-results");
taskStartupAttempts = mockSensor(metricGroup, "task-startup-attempts");
taskStartupSuccesses = mockSensor(metricGroup, "task-startup-successes");
taskStartupFailures = mockSensor(metricGroup, "task-startup-failures");
}
private Sensor mockSensor(ConnectMetrics.MetricGroup metricGroup, String name) {
Sensor sensor = mock(Sensor.class);
when(metricGroup.sensor(name)).thenReturn(sensor);
when(sensor.add(any(CompoundStat.class))).thenReturn(true);
when(sensor.add(any(MetricName.class), any(CumulativeSum.class))).thenReturn(true);
return sensor;
}
@Test
public void testConnectorStartupRecordedMetrics() {
WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
connectorListener.onStartup(connector);
verifyRecordConnectorStartupSuccess();
verify(delegateConnectorListener).onStartup(connector);
}
@Test
public void testConnectorFailureAfterStartupRecordedMetrics() {
WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
connectorListener.onStartup(connector);
connectorListener.onFailure(connector, exception);
verify(delegateConnectorListener).onStartup(connector);
verifyRecordConnectorStartupSuccess();
verify(delegateConnectorListener).onFailure(connector, exception);
// recordConnectorStartupFailure() should not be called if failure happens after a successful startup.
verify(connectorStartupFailures, never()).record(anyDouble());
}
@Test
public void testConnectorFailureBeforeStartupRecordedMetrics() {
WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
connectorListener.onFailure(connector, exception);
verify(delegateConnectorListener).onFailure(connector, exception);
verifyRecordConnectorStartupFailure();
}
@Test
public void testTaskStartupRecordedMetrics() {
WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
taskListener.onStartup(task);
verify(delegateTaskListener).onStartup(task);
verifyRecordTaskSuccess();
}
@Test
public void testTaskFailureAfterStartupRecordedMetrics() {
WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
taskListener.onStartup(task);
taskListener.onFailure(task, exception);
verify(delegateTaskListener).onStartup(task);
verifyRecordTaskSuccess();
verify(delegateTaskListener).onFailure(task, exception);
// recordTaskFailure() should not be called if failure happens after a successful startup.
verify(taskStartupFailures, never()).record(anyDouble());
}
@Test
public void testTaskFailureBeforeStartupRecordedMetrics() {
WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
taskListener.onFailure(task, exception);
verify(delegateTaskListener).onFailure(task, exception);
verifyRecordTaskFailure();
}
private void verifyRecordTaskFailure() {
verify(taskStartupAttempts).record(1.0);
verify(taskStartupFailures).record(1.0);
verify(taskStartupResults).record(0.0);
}
private void verifyRecordTaskSuccess() {
verify(taskStartupAttempts).record(1.0);
verify(taskStartupSuccesses).record(1.0);
verify(taskStartupResults).record(1.0);
}
private void verifyRecordConnectorStartupSuccess() {
verify(connectorStartupAttempts).record(1.0);
verify(connectorStartupSuccesses).record(1.0);
verify(connectorStartupResults).record(1.0);
}
private void verifyRecordConnectorStartupFailure() {
verify(connectorStartupAttempts).record(1.0);
verify(connectorStartupFailures).record(1.0);
verify(connectorStartupResults).record(0.0);
}
}