blob: b0a23b636bb635caeb6457c728a07bd38c6a4cab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.TaskStatus.Listener;
import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class WorkerTaskTest {
private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
}
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
@Mock private TaskStatus.Listener statusListener;
@Mock private ClassLoader loader;
@Mock private StatusBackingStore statusBackingStore;
private ConnectMetrics metrics;
private RetryWithToleranceOperator retryWithToleranceOperator;
@Mock private ErrorHandlingMetrics errorHandlingMetrics;
@Before
public void setup() {
metrics = new MockConnectMetrics();
retryWithToleranceOperator = RetryWithToleranceOperatorTest.NOOP_OPERATOR;
}
@After
public void tearDown() {
if (metrics != null) metrics.stop();
}
@Test
public void standardStartup() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, errorHandlingMetrics,
retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
workerTask.initialize(TASK_CONFIG);
workerTask.run();
workerTask.stop();
workerTask.awaitStop(1000L);
verify(statusListener).onStartup(taskId);
verify(statusListener).onShutdown(taskId);
}
@Test
public void stopBeforeStarting() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, errorHandlingMetrics,
retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
@Override
public void initializeAndStart() {
fail("This method is expected to not be invoked");
}
@Override
public void execute() {
fail("This method is expected to not be invoked");
}
};
workerTask.initialize(TASK_CONFIG);
workerTask.stop();
workerTask.awaitStop(1000L);
// now run should not do anything
workerTask.run();
}
@Test
public void cancelBeforeStopping() throws Exception {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
final CountDownLatch stopped = new CountDownLatch(1);
WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, errorHandlingMetrics,
retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
@Override
public void execute() {
try {
stopped.await();
} catch (InterruptedException e) {
fail("Unexpected interrupt");
}
}
// Trigger task shutdown immediately after start. The task will block in its execute() method
// until the stopped latch is counted down (i.e. it doesn't actually stop after stop is triggered).
@Override
public void initializeAndStart() {
stop();
}
};
workerTask.initialize(TASK_CONFIG);
Thread t = new Thread(workerTask);
t.start();
workerTask.cancel();
stopped.countDown();
t.join();
verify(statusListener).onStartup(taskId);
// there should be no other status updates, including shutdown
verifyNoMoreInteractions(statusListener);
}
@Test
public void updateMetricsOnListenerEventsForStartupPauseResumeAndShutdown() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
ConnectMetrics metrics = new MockConnectMetrics();
TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
group.onStartup(taskId);
assertRunningMetric(group);
group.onPause(taskId);
assertPausedMetric(group);
group.onResume(taskId);
assertRunningMetric(group);
group.onShutdown(taskId);
assertStoppedMetric(group);
verify(statusListener).onStartup(taskId);
verify(statusListener).onPause(taskId);
verify(statusListener).onResume(taskId);
verify(statusListener).onShutdown(taskId);
}
@Test
public void updateMetricsOnListenerEventsForStartupPauseResumeAndFailure() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
MockConnectMetrics metrics = new MockConnectMetrics();
MockTime time = metrics.time();
ConnectException error = new ConnectException("error");
TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
time.sleep(1000L);
group.onStartup(taskId);
assertRunningMetric(group);
time.sleep(2000L);
group.onPause(taskId);
assertPausedMetric(group);
time.sleep(3000L);
group.onResume(taskId);
assertRunningMetric(group);
time.sleep(4000L);
group.onPause(taskId);
assertPausedMetric(group);
time.sleep(5000L);
group.onResume(taskId);
assertRunningMetric(group);
time.sleep(6000L);
group.onFailure(taskId, error);
assertFailedMetric(group);
time.sleep(7000L);
group.onShutdown(taskId);
assertStoppedMetric(group);
verify(statusListener).onStartup(taskId);
verify(statusListener, times(2)).onPause(taskId);
verify(statusListener, times(2)).onResume(taskId);
verify(statusListener).onFailure(taskId, error);
verify(statusListener).onShutdown(taskId);
long totalTime = 27000L;
double pauseTimeRatio = (double) (3000L + 5000L) / totalTime;
double runningTimeRatio = (double) (2000L + 4000L + 6000L) / totalTime;
assertEquals(pauseTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "pause-ratio"), 0.000001d);
assertEquals(runningTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "running-ratio"), 0.000001d);
}
private static abstract class TestSinkTask extends SinkTask {
}
private static class TestWorkerTask extends WorkerTask {
public TestWorkerTask(ConnectorTaskId id, Listener statusListener, TargetState initialState, ClassLoader loader,
ConnectMetrics connectMetrics, ErrorHandlingMetrics errorHandlingMetrics, RetryWithToleranceOperator retryWithToleranceOperator, Time time,
StatusBackingStore statusBackingStore) {
super(id, statusListener, initialState, loader, connectMetrics, errorHandlingMetrics, retryWithToleranceOperator, time, statusBackingStore);
}
@Override
public void initialize(TaskConfig taskConfig) {
}
@Override
protected void initializeAndStart() {
}
@Override
protected void execute() {
}
@Override
protected void close() {
}
}
protected void assertFailedMetric(TaskMetricsGroup metricsGroup) {
assertEquals(AbstractStatus.State.FAILED, metricsGroup.state());
}
protected void assertPausedMetric(TaskMetricsGroup metricsGroup) {
assertEquals(AbstractStatus.State.PAUSED, metricsGroup.state());
}
protected void assertRunningMetric(TaskMetricsGroup metricsGroup) {
assertEquals(AbstractStatus.State.RUNNING, metricsGroup.state());
}
protected void assertStoppedMetric(TaskMetricsGroup metricsGroup) {
assertEquals(AbstractStatus.State.UNASSIGNED, metricsGroup.state());
}
}