blob: f642c54b65ff83c76747517aefe470032e2c18f6 [file] [log] [blame]
/*
* Copyright 2018 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.daemon.worker;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
import org.apache.storm.utils.JCQueue;
import org.junit.Test;
public class BackPressureTrackerTest {
private static final String WORKER_ID = "worker";
@Test
public void testGetBackpressure() {
int taskIdNoBackPressure = 1;
JCQueue noBackPressureQueue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
BackPressureStatus status = tracker.getCurrStatus();
assertThat(status.workerId, is(WORKER_ID));
assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
assertThat(status.bpTasks, is(empty()));
}
@Test
public void testSetBackpressure() {
int taskIdNoBackPressure = 1;
JCQueue noBackPressureQueue = mock(JCQueue.class);
int taskIdBackPressure = 2;
JCQueue backPressureQueue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskIdNoBackPressure, noBackPressureQueue,
taskIdBackPressure, backPressureQueue));
BackpressureState state = tracker.getBackpressureState(taskIdBackPressure);
boolean backpressureChanged = tracker.recordBackPressure(state);
BackPressureStatus status = tracker.getCurrStatus();
assertThat(backpressureChanged, is(true));
assertThat(status.workerId, is(WORKER_ID));
assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
assertThat(status.bpTasks, contains(taskIdBackPressure));
}
@Test
public void testSetBackpressureWithExistingBackpressure() {
int taskId = 1;
JCQueue queue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskId, queue));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
boolean backpressureChanged = tracker.recordBackPressure(state);
BackPressureStatus status = tracker.getCurrStatus();
assertThat(backpressureChanged, is(false));
assertThat(status.workerId, is(WORKER_ID));
assertThat(status.bpTasks, contains(taskId));
}
@Test
public void testRefreshBackpressureWithEmptyOverflow() {
int taskId = 1;
JCQueue queue = mock(JCQueue.class);
when(queue.isEmptyOverflow()).thenReturn(true);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskId, queue));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
boolean backpressureChanged = tracker.refreshBpTaskList();
BackPressureStatus status = tracker.getCurrStatus();
assertThat(backpressureChanged, is(true));
assertThat(status.workerId, is(WORKER_ID));
assertThat(status.nonBpTasks, contains(taskId));
}
@Test
public void testRefreshBackPressureWithNonEmptyOverflow() {
int taskId = 1;
JCQueue queue = mock(JCQueue.class);
when(queue.isEmptyOverflow()).thenReturn(false);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskId, queue));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
boolean backpressureChanged = tracker.refreshBpTaskList();
BackPressureStatus status = tracker.getCurrStatus();
assertThat(backpressureChanged, is(false));
assertThat(status.workerId, is(WORKER_ID));
assertThat(status.bpTasks, contains(taskId));
}
@Test
public void testSetLastOverflowCount() {
int taskId = 1;
int overflow = 5;
JCQueue queue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskId, queue));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
tracker.setLastOverflowCount(state, overflow);
BackpressureState retrievedState = tracker.getBackpressureState(taskId);
int lastOverflowCount = tracker.getLastOverflowCount(retrievedState);
assertThat(lastOverflowCount, is(overflow));
}
}