blob: 10ab40ace1f678e51c2044f07bc92d4f210a85f5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import static java.lang.Thread.sleep;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
/**
 * Unit tests for {@link DecayRpcScheduler}: constructor argument validation,
 * parsing of the decay period / decay factor / threshold settings, call-count
 * accumulation and decay, priority assignment, the JMX call-volume summary,
 * and initialization after the default metrics system has been started.
 */
public class TestDecayRpcScheduler {
  /** Scheduler under test; every test method builds its own instance. */
  private DecayRpcScheduler scheduler;

  /**
   * Builds a mocked {@link Schedulable} whose {@link UserGroupInformation}
   * reports {@code id} as the user name.
   *
   * @param id user name the mocked call should be attributed to
   * @return the mocked call
   */
  private Schedulable mockCall(String id) {
    UserGroupInformation ugi = mock(UserGroupInformation.class);
    when(ugi.getUserName()).thenReturn(id);

    Schedulable call = mock(Schedulable.class);
    when(call.getUserGroupInformation()).thenReturn(ugi);
    return call;
  }

  /** A negative number of priority levels must be rejected. */
  @Test(expected=IllegalArgumentException.class)
  public void testNegativeScheduler() {
    scheduler = new DecayRpcScheduler(-1, "", new Configuration());
  }

  /** Zero priority levels must be rejected. */
  @Test(expected=IllegalArgumentException.class)
  public void testZeroScheduler() {
    scheduler = new DecayRpcScheduler(0, "", new Configuration());
  }

  /**
   * The decay period falls back to the default when unset and is read from
   * the namespace-prefixed key when configured.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testParsePeriod() {
    // By default
    scheduler = new DecayRpcScheduler(1, "", new Configuration());
    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
        scheduler.getDecayPeriodMillis());

    // Custom
    Configuration conf = new Configuration();
    conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
        1058);
    scheduler = new DecayRpcScheduler(1, "ns", conf);
    assertEquals(1058L, scheduler.getDecayPeriodMillis());
  }

  /**
   * The decay factor falls back to the default when unset and is read from
   * the namespace-prefixed key when configured.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testParseFactor() {
    // Default
    scheduler = new DecayRpcScheduler(1, "", new Configuration());
    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT,
        scheduler.getDecayFactor(), 0.00001);

    // Custom
    Configuration conf = new Configuration();
    conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
        "0.125");
    scheduler = new DecayRpcScheduler(1, "prefix", conf);
    assertEquals(0.125, scheduler.getDecayFactor(), 0.00001);
  }

  /**
   * Asserts that two arrays have the same length and that corresponding
   * elements are equal within a tolerance of 0.00001.
   *
   * @param a first array (passed to JUnit as the expected values)
   * @param b second array (passed to JUnit as the actual values)
   */
  public void assertEqualDecimalArrays(double[] a, double[] b) {
    assertEquals(a.length, b.length);
    for (int i = 0; i < a.length; i++) {
      assertEquals(a[i], b[i], 0.00001);
    }
  }

  /**
   * Default thresholds depend on the number of levels; custom thresholds are
   * configured as percentages and are expected back as fractions.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testParseThresholds() {
    // Defaults vary by number of queues
    Configuration conf = new Configuration();
    scheduler = new DecayRpcScheduler(1, "", conf);
    assertEqualDecimalArrays(new double[]{}, scheduler.getThresholds());

    scheduler = new DecayRpcScheduler(2, "", conf);
    assertEqualDecimalArrays(new double[]{0.5}, scheduler.getThresholds());

    scheduler = new DecayRpcScheduler(3, "", conf);
    assertEqualDecimalArrays(new double[]{0.25, 0.5},
        scheduler.getThresholds());

    scheduler = new DecayRpcScheduler(4, "", conf);
    assertEqualDecimalArrays(new double[]{0.125, 0.25, 0.5},
        scheduler.getThresholds());

    // Custom
    conf = new Configuration();
    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
        "1, 10, 20, 50, 85");
    scheduler = new DecayRpcScheduler(6, "ns", conf);
    assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85},
        scheduler.getThresholds());
  }

  /**
   * Each {@code getPriorityLevel} call is expected to add one to the
   * per-user count visible through {@code getCallCountSnapshot}.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testAccumulate() {
    Configuration conf = new Configuration();
    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
        "99999999"); // Never flush
    scheduler = new DecayRpcScheduler(1, "ns", conf);

    assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first

    scheduler.getPriorityLevel(mockCall("A"));
    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
    // Taking the snapshot a second time is expected to report the same count.
    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());

    scheduler.getPriorityLevel(mockCall("A"));
    scheduler.getPriorityLevel(mockCall("B"));
    scheduler.getPriorityLevel(mockCall("A"));

    assertEquals(3, scheduler.getCallCountSnapshot().get("A").longValue());
    assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());
  }

  /**
   * With a decay factor of 0.5, every {@code forceDecay} is expected to halve
   * the per-user counts (4 -> 2 -> 1 for A, 8 -> 4 -> 2 -> 1 for B) and to
   * drop a user from the snapshot once its count reaches zero.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testDecay() throws Exception {
    Configuration conf = new Configuration();
    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
        "999999999"); // Never
    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
        "0.5");
    scheduler = new DecayRpcScheduler(1, "ns", conf);

    assertEquals(0, scheduler.getTotalCallSnapshot());

    for (int i = 0; i < 4; i++) {
      scheduler.getPriorityLevel(mockCall("A"));
    }

    // NOTE(review): the decay period configured above is far longer than
    // this pause, so the pause is not expected to trigger a decay. Its
    // purpose is not evident from this file -- confirm before removing.
    sleep(1000);

    for (int i = 0; i < 8; i++) {
      scheduler.getPriorityLevel(mockCall("B"));
    }

    assertEquals(12, scheduler.getTotalCallSnapshot());
    assertEquals(4, scheduler.getCallCountSnapshot().get("A").longValue());
    assertEquals(8, scheduler.getCallCountSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(6, scheduler.getTotalCallSnapshot());
    assertEquals(2, scheduler.getCallCountSnapshot().get("A").longValue());
    assertEquals(4, scheduler.getCallCountSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(3, scheduler.getTotalCallSnapshot());
    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
    assertEquals(2, scheduler.getCallCountSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(1, scheduler.getTotalCallSnapshot());
    assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
    assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(0, scheduler.getTotalCallSnapshot());
    assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
    assertEquals(null, scheduler.getCallCountSnapshot().get("B"));
  }

  /**
   * Checks the priority level returned for a fixed sequence of calls from
   * users A, B and C with thresholds "25, 50, 75", then checks the
   * {@code CallVolumeSummary} JMX attribute before and after a forced decay.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testPriority() throws Exception {
    Configuration conf = new Configuration();
    final String namespace = "ns";
    conf.set(namespace + "." + DecayRpcScheduler
        .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
    conf.set(namespace + "." + DecayRpcScheduler
        .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
    scheduler = new DecayRpcScheduler(4, namespace, conf);

    assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
    assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
    assertEquals(0, scheduler.getPriorityLevel(mockCall("B")));
    assertEquals(1, scheduler.getPriorityLevel(mockCall("B")));
    assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
    assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
    assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName mxbeanName = new ObjectName(
        "Hadoop:service="+ namespace + ",name=DecayRpcScheduler");

    // assertEquals (rather than assertTrue on equals()) is null-safe and
    // reports both the expected and the actual JSON when the check fails.
    String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
    assertEquals("Get expected JMX of CallVolumeSummary before decay",
        "{\"A\":6,\"B\":2,\"C\":2}", cvs1);

    scheduler.forceDecay();

    String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
    assertEquals("Get expected JMX for CallVolumeSummary after decay",
        "{\"A\":3,\"B\":1,\"C\":1}", cvs2);
  }

  /**
   * With a 10 ms decay period and a factor of 0.5, 64 recorded calls are
   * expected to decay to zero without any explicit {@code forceDecay}.
   * The JUnit timeout bounds the polling loop if that never happens.
   */
  @Test(timeout=2000)
  @SuppressWarnings("deprecation")
  public void testPeriodic() throws InterruptedException {
    Configuration conf = new Configuration();
    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
        "0.5");
    scheduler = new DecayRpcScheduler(1, "ns", conf);

    assertEquals(10, scheduler.getDecayPeriodMillis());
    assertEquals(0, scheduler.getTotalCallSnapshot());

    for (int i = 0; i < 64; i++) {
      scheduler.getPriorityLevel(mockCall("A"));
    }

    // It should eventually decay to zero
    while (scheduler.getTotalCallSnapshot() > 0) {
      sleep(10);
    }
  }

  /**
   * Constructs a scheduler after the default metrics system has been
   * initialized and verifies that nothing written to {@code System.out}
   * during construction mentions a {@code NullPointerException}.
   */
  @Test(timeout=60000)
  public void testNPEatInitialization() throws InterruptedException {
    // Redirect System.out to a buffer so we can check whether an NPE message
    // is printed while initializing the DecayRpcScheduler.
    PrintStream output = System.out;
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      System.setOut(new PrintStream(bytes));

      // initializing DefaultMetricsSystem here would set "monitoring" flag in
      // MetricsSystemImpl to true
      DefaultMetricsSystem.initialize("NameNode");
      Configuration conf = new Configuration();
      scheduler = new DecayRpcScheduler(1, "ns", conf);

      // Check that no NPE was reported in the captured output.
      assertFalse(bytes.toString().contains("NullPointerException"));
    } finally {
      // Restore the original System.out.
      System.setOut(output);
    }
  }
}