| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ipc; |
| |
| import static java.lang.Thread.sleep; |
| |
| import org.junit.Test; |
| |
| import static org.junit.Assert.*; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.conf.Configuration; |
| |
| import javax.management.MBeanServer; |
| import javax.management.ObjectName; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.PrintStream; |
| import java.lang.management.ManagementFactory; |
| import java.util.concurrent.TimeUnit; |
| |
| public class TestDecayRpcScheduler { |
| private Schedulable mockCall(String id) { |
| Schedulable mockCall = mock(Schedulable.class); |
| UserGroupInformation ugi = mock(UserGroupInformation.class); |
| |
| when(ugi.getUserName()).thenReturn(id); |
| when(mockCall.getUserGroupInformation()).thenReturn(ugi); |
| |
| return mockCall; |
| } |
| |
| private DecayRpcScheduler scheduler; |
| |
| @Test(expected=IllegalArgumentException.class) |
| public void testNegativeScheduler() { |
| scheduler = new DecayRpcScheduler(-1, "", new Configuration()); |
| } |
| |
| @Test(expected=IllegalArgumentException.class) |
| public void testZeroScheduler() { |
| scheduler = new DecayRpcScheduler(0, "", new Configuration()); |
| } |
| |
| @Test |
| @SuppressWarnings("deprecation") |
| public void testParsePeriod() { |
| // By default |
| scheduler = new DecayRpcScheduler(1, "", new Configuration()); |
| assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT, |
| scheduler.getDecayPeriodMillis()); |
| |
| // Custom |
| Configuration conf = new Configuration(); |
| conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, |
| 1058); |
| scheduler = new DecayRpcScheduler(1, "ns", conf); |
| assertEquals(1058L, scheduler.getDecayPeriodMillis()); |
| } |
| |
| @Test |
| @SuppressWarnings("deprecation") |
| public void testParseFactor() { |
| // Default |
| scheduler = new DecayRpcScheduler(1, "", new Configuration()); |
| assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT, |
| scheduler.getDecayFactor(), 0.00001); |
| |
| // Custom |
| Configuration conf = new Configuration(); |
| conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, |
| "0.125"); |
| scheduler = new DecayRpcScheduler(1, "prefix", conf); |
| assertEquals(0.125, scheduler.getDecayFactor(), 0.00001); |
| } |
| |
| public void assertEqualDecimalArrays(double[] a, double[] b) { |
| assertEquals(a.length, b.length); |
| for(int i = 0; i < a.length; i++) { |
| assertEquals(a[i], b[i], 0.00001); |
| } |
| } |
| |
| @Test |
| @SuppressWarnings("deprecation") |
| public void testParseThresholds() { |
| // Defaults vary by number of queues |
| Configuration conf = new Configuration(); |
| scheduler = new DecayRpcScheduler(1, "", conf); |
| assertEqualDecimalArrays(new double[]{}, scheduler.getThresholds()); |
| |
| scheduler = new DecayRpcScheduler(2, "", conf); |
| assertEqualDecimalArrays(new double[]{0.5}, scheduler.getThresholds()); |
| |
| scheduler = new DecayRpcScheduler(3, "", conf); |
| assertEqualDecimalArrays(new double[]{0.25, 0.5}, scheduler.getThresholds()); |
| |
| scheduler = new DecayRpcScheduler(4, "", conf); |
| assertEqualDecimalArrays(new double[]{0.125, 0.25, 0.5}, scheduler.getThresholds()); |
| |
| // Custom |
| conf = new Configuration(); |
| conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, |
| "1, 10, 20, 50, 85"); |
| scheduler = new DecayRpcScheduler(6, "ns", conf); |
| assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds()); |
| } |
| |
| @Test |
| @SuppressWarnings("deprecation") |
| public void testAccumulate() { |
| Configuration conf = new Configuration(); |
| conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush |
| scheduler = new DecayRpcScheduler(1, "ns", conf); |
| |
| assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first |
| |
| getPriorityIncrementCallCount("A"); |
| assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue()); |
| assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue()); |
| |
| getPriorityIncrementCallCount("A"); |
| getPriorityIncrementCallCount("B"); |
| getPriorityIncrementCallCount("A"); |
| |
| assertEquals(3, scheduler.getCallCostSnapshot().get("A").longValue()); |
| assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue()); |
| } |
| |
| @Test |
| @SuppressWarnings("deprecation") |
| public void testDecay() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setLong("ns." // Never decay |
| + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999); |
| conf.setDouble("ns." |
| + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5); |
| scheduler = new DecayRpcScheduler(1, "ns", conf); |
| |
| assertEquals(0, scheduler.getTotalCallSnapshot()); |
| |
| for (int i = 0; i < 4; i++) { |
| getPriorityIncrementCallCount("A"); |
| } |
| |
| sleep(1000); |
| |
| for (int i = 0; i < 8; i++) { |
| getPriorityIncrementCallCount("B"); |
| } |
| |
| assertEquals(12, scheduler.getTotalCallSnapshot()); |
| assertEquals(4, scheduler.getCallCostSnapshot().get("A").longValue()); |
| assertEquals(8, scheduler.getCallCostSnapshot().get("B").longValue()); |
| |
| scheduler.forceDecay(); |
| |
| assertEquals(6, scheduler.getTotalCallSnapshot()); |
| assertEquals(2, scheduler.getCallCostSnapshot().get("A").longValue()); |
| assertEquals(4, scheduler.getCallCostSnapshot().get("B").longValue()); |
| |
| scheduler.forceDecay(); |
| |
| assertEquals(3, scheduler.getTotalCallSnapshot()); |
| assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue()); |
| assertEquals(2, scheduler.getCallCostSnapshot().get("B").longValue()); |
| |
| scheduler.forceDecay(); |
| |
| assertEquals(1, scheduler.getTotalCallSnapshot()); |
| assertEquals(null, scheduler.getCallCostSnapshot().get("A")); |
| assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue()); |
| |
| scheduler.forceDecay(); |
| |
| assertEquals(0, scheduler.getTotalCallSnapshot()); |
| assertEquals(null, scheduler.getCallCostSnapshot().get("A")); |
| assertEquals(null, scheduler.getCallCostSnapshot().get("B")); |
| } |
| |
| @Test |
| @SuppressWarnings("deprecation") |
| public void testPriority() throws Exception { |
| Configuration conf = new Configuration(); |
| final String namespace = "ns"; |
| conf.set(namespace + "." + DecayRpcScheduler |
| .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush |
| conf.set(namespace + "." + DecayRpcScheduler |
| .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75"); |
| scheduler = new DecayRpcScheduler(4, namespace, conf); |
| |
| assertEquals(0, getPriorityIncrementCallCount("A")); // 0 out of 0 calls |
| assertEquals(3, getPriorityIncrementCallCount("A")); // 1 out of 1 calls |
| assertEquals(0, getPriorityIncrementCallCount("B")); // 0 out of 2 calls |
| assertEquals(1, getPriorityIncrementCallCount("B")); // 1 out of 3 calls |
| assertEquals(0, getPriorityIncrementCallCount("C")); // 0 out of 4 calls |
| assertEquals(0, getPriorityIncrementCallCount("C")); // 1 out of 5 calls |
| assertEquals(1, getPriorityIncrementCallCount("A")); // 2 out of 6 calls |
| assertEquals(1, getPriorityIncrementCallCount("A")); // 3 out of 7 calls |
| assertEquals(2, getPriorityIncrementCallCount("A")); // 4 out of 8 calls |
| assertEquals(2, getPriorityIncrementCallCount("A")); // 5 out of 9 calls |
| |
| MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); |
| ObjectName mxbeanName = new ObjectName( |
| "Hadoop:service="+ namespace + ",name=DecayRpcScheduler"); |
| |
| String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary"); |
| assertTrue("Get expected JMX of CallVolumeSummary before decay", |
| cvs1.equals("{\"A\":6,\"B\":2,\"C\":2}")); |
| |
| scheduler.forceDecay(); |
| |
| String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary"); |
| assertTrue("Get expected JMX for CallVolumeSummary after decay", |
| cvs2.equals("{\"A\":3,\"B\":1,\"C\":1}")); |
| } |
| |
| @Test(timeout=2000) |
| @SuppressWarnings("deprecation") |
| public void testPeriodic() throws InterruptedException { |
| Configuration conf = new Configuration(); |
| conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10"); |
| conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5"); |
| scheduler = new DecayRpcScheduler(1, "ns", conf); |
| |
| assertEquals(10, scheduler.getDecayPeriodMillis()); |
| assertEquals(0, scheduler.getTotalCallSnapshot()); |
| |
| for (int i = 0; i < 64; i++) { |
| getPriorityIncrementCallCount("A"); |
| } |
| |
| // It should eventually decay to zero |
| while (scheduler.getTotalCallSnapshot() > 0) { |
| sleep(10); |
| } |
| } |
| |
| @Test(timeout=60000) |
| public void testNPEatInitialization() throws InterruptedException { |
| // redirect the LOG to and check if there is NPE message while initializing |
| // the DecayRpcScheduler |
| PrintStream output = System.out; |
| try { |
| ByteArrayOutputStream bytes = new ByteArrayOutputStream(); |
| System.setOut(new PrintStream(bytes)); |
| |
| // initializing DefaultMetricsSystem here would set "monitoring" flag in |
| // MetricsSystemImpl to true |
| DefaultMetricsSystem.initialize("NameNode"); |
| Configuration conf = new Configuration(); |
| scheduler = new DecayRpcScheduler(1, "ns", conf); |
| // check if there is npe in log |
| assertFalse(bytes.toString().contains("NullPointerException")); |
| } finally { |
| //set systout back |
| System.setOut(output); |
| } |
| } |
| |
| @Test |
| public void testUsingWeightedTimeCostProvider() { |
| scheduler = getSchedulerWithWeightedTimeCostProvider(3); |
| |
| // 3 details in increasing order of cost. Although medium has a longer |
| // duration, the shared lock is weighted less than the exclusive lock |
| ProcessingDetails callDetailsLow = |
| new ProcessingDetails(TimeUnit.MILLISECONDS); |
| callDetailsLow.set(ProcessingDetails.Timing.LOCKFREE, 1); |
| ProcessingDetails callDetailsMedium = |
| new ProcessingDetails(TimeUnit.MILLISECONDS); |
| callDetailsMedium.set(ProcessingDetails.Timing.LOCKSHARED, 500); |
| ProcessingDetails callDetailsHigh = |
| new ProcessingDetails(TimeUnit.MILLISECONDS); |
| callDetailsHigh.set(ProcessingDetails.Timing.LOCKEXCLUSIVE, 100); |
| |
| for (int i = 0; i < 10; i++) { |
| scheduler.addResponseTime("ignored", mockCall("LOW"), callDetailsLow); |
| } |
| scheduler.addResponseTime("ignored", mockCall("MED"), callDetailsMedium); |
| scheduler.addResponseTime("ignored", mockCall("HIGH"), callDetailsHigh); |
| |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW"))); |
| assertEquals(1, scheduler.getPriorityLevel(mockCall("MED"))); |
| assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH"))); |
| |
| assertEquals(3, scheduler.getUniqueIdentityCount()); |
| long totalCallInitial = scheduler.getTotalRawCallVolume(); |
| assertEquals(totalCallInitial, scheduler.getTotalCallVolume()); |
| |
| scheduler.forceDecay(); |
| |
| // Relative priorities should stay the same after a single decay |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW"))); |
| assertEquals(1, scheduler.getPriorityLevel(mockCall("MED"))); |
| assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH"))); |
| |
| assertEquals(3, scheduler.getUniqueIdentityCount()); |
| assertEquals(totalCallInitial, scheduler.getTotalRawCallVolume()); |
| assertTrue(scheduler.getTotalCallVolume() < totalCallInitial); |
| |
| for (int i = 0; i < 100; i++) { |
| scheduler.forceDecay(); |
| } |
| // After enough decay cycles, all callers should be high priority again |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW"))); |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("MED"))); |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("HIGH"))); |
| } |
| |
| @Test |
| public void testUsingWeightedTimeCostProviderWithZeroCostCalls() { |
| scheduler = getSchedulerWithWeightedTimeCostProvider(2); |
| |
| ProcessingDetails emptyDetails = |
| new ProcessingDetails(TimeUnit.MILLISECONDS); |
| |
| for (int i = 0; i < 1000; i++) { |
| scheduler.addResponseTime("ignored", mockCall("MANY"), emptyDetails); |
| } |
| scheduler.addResponseTime("ignored", mockCall("FEW"), emptyDetails); |
| |
| // Since the calls are all "free", they should have the same priority |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("MANY"))); |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("FEW"))); |
| } |
| |
| @Test |
| public void testUsingWeightedTimeCostProviderNoRequests() { |
| scheduler = getSchedulerWithWeightedTimeCostProvider(2); |
| |
| assertEquals(0, scheduler.getPriorityLevel(mockCall("A"))); |
| } |
| |
| /** |
| * Get a scheduler that uses {@link WeightedTimeCostProvider} and has |
| * normal decaying disabled. |
| */ |
| private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider( |
| int priorityLevels) { |
| Configuration conf = new Configuration(); |
| conf.setClass("ns." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, |
| WeightedTimeCostProvider.class, CostProvider.class); |
| conf.setLong("ns." |
| + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999); |
| return new DecayRpcScheduler(priorityLevels, "ns", conf); |
| } |
| |
| /** |
| * Get the priority and increment the call count, assuming that |
| * {@link DefaultCostProvider} is in use. |
| */ |
| private int getPriorityIncrementCallCount(String callId) { |
| Schedulable mockCall = mockCall(callId); |
| int priority = scheduler.getPriorityLevel(mockCall); |
| // The DefaultCostProvider uses a cost of 1 for all calls, ignoring |
| // the processing details, so an empty one is fine |
| ProcessingDetails emptyProcessingDetails = |
| new ProcessingDetails(TimeUnit.MILLISECONDS); |
| scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails); |
| return priority; |
| } |
| } |