blob: 161bcc11a2034e1fac2261e6b2d621e0273bcb9a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Tests for Online SlowLog Provider Service
*/
@Category({MasterTests.class, MediumTests.class})
public class TestNamedQueueRecorder {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private NamedQueueRecorder namedQueueRecorder;
private static int i = 0;
private static Configuration applySlowLogRecorderConf(int eventSize) {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
return conf;
}
/**
* confirm that for a ringbuffer of slow logs, payload on given index of buffer
* has expected elements
*
* @param i index of ringbuffer logs
* @param j data value that was put on index i
* @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
* @return if actual values are as per expectations
*/
private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
return isClassExpected && isClientExpected && isUserExpected;
}
@Test
public void testOnlieSlowLogConsumption() throws Exception{
Configuration conf = applySlowLogRecorderConf(8);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
int i = 0;
// add 5 records initially
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1,
HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));
// add 2 more records
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 7));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
return slowLogPayloadsList.size() == 7
&& confirmPayloadParams(0, 7, slowLogPayloadsList)
&& confirmPayloadParams(5, 2, slowLogPayloadsList)
&& confirmPayloadParams(6, 1, slowLogPayloadsList);
})
);
// add 3 more records
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is full
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(7, 3, slowLogPayloadsList)
&& confirmPayloadParams(0, 10, slowLogPayloadsList)
&& confirmPayloadParams(1, 9, slowLogPayloadsList);
})
);
// add 4 more records
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
})
);
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
})
);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is empty
return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
})
);
}
private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
return namedQueueGetResponse == null ?
Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
}
@Test
public void testOnlineSlowLogWithHighRecords() throws Exception {
Configuration conf = applySlowLogRecorderConf(14);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 14 * 11; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
&& confirmPayloadParams(0, 154, slowLogPayloads)
&& confirmPayloadParams(1, 153, slowLogPayloads)
&& confirmPayloadParams(2, 152, slowLogPayloads)
&& confirmPayloadParams(3, 151, slowLogPayloads)
&& confirmPayloadParams(4, 150, slowLogPayloads)
&& confirmPayloadParams(5, 149, slowLogPayloads)
&& confirmPayloadParams(6, 148, slowLogPayloads)
&& confirmPayloadParams(7, 147, slowLogPayloads)
&& confirmPayloadParams(8, 146, slowLogPayloads)
&& confirmPayloadParams(9, 145, slowLogPayloads)
&& confirmPayloadParams(10, 144, slowLogPayloads)
&& confirmPayloadParams(11, 143, slowLogPayloads)
&& confirmPayloadParams(12, 142, slowLogPayloads)
&& confirmPayloadParams(13, 141, slowLogPayloads);
})
);
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertTrue(isRingBufferCleaned);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm ringbuffer is empty
Assert.assertEquals(slowLogPayloads.size(), 0);
}
@Test
public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
}
@Test
public void testOnlineSlowLogWithDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
}
@Test
public void testSlowLogFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 1));
AdminProtos.SlowLogResponseRequest requestClient =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setClientAddress("client_85")
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(requestClient).size() == 1));
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
}
@Test
public void testConcurrentSlowLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(50000);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(500000)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int j = 0; j < 1000; j++) {
CompletableFuture.runAsync(() -> {
for (int i = 0; i < 3500; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
});
}
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
5000, () -> getSlowLogPayloads(request).size() > 10000));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
}
@Test
public void testSlowLargeLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(28);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
boolean isSlowLog;
boolean isLargeLog;
for (int i = 0; i < 14 * 11; i++) {
if (i % 2 == 0) {
isSlowLog = true;
isLargeLog = false;
} else {
isSlowLog = false;
isLargeLog = true;
}
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
isSlowLog, isLargeLog);
namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
&& confirmPayloadParams(0, 153, slowLogPayloads)
&& confirmPayloadParams(1, 151, slowLogPayloads)
&& confirmPayloadParams(2, 149, slowLogPayloads)
&& confirmPayloadParams(3, 147, slowLogPayloads)
&& confirmPayloadParams(4, 145, slowLogPayloads)
&& confirmPayloadParams(5, 143, slowLogPayloads)
&& confirmPayloadParams(6, 141, slowLogPayloads)
&& confirmPayloadParams(7, 139, slowLogPayloads)
&& confirmPayloadParams(8, 137, slowLogPayloads)
&& confirmPayloadParams(9, 135, slowLogPayloads)
&& confirmPayloadParams(10, 133, slowLogPayloads)
&& confirmPayloadParams(11, 131, slowLogPayloads)
&& confirmPayloadParams(12, 129, slowLogPayloads)
&& confirmPayloadParams(13, 127, slowLogPayloads);
})
);
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(14 * 11)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(largeLogRequest).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
// confirm strict order of slow log payloads
return largeLogPayloads.size() == 14
&& confirmPayloadParams(0, 154, largeLogPayloads)
&& confirmPayloadParams(1, 152, largeLogPayloads)
&& confirmPayloadParams(2, 150, largeLogPayloads)
&& confirmPayloadParams(3, 148, largeLogPayloads)
&& confirmPayloadParams(4, 146, largeLogPayloads)
&& confirmPayloadParams(5, 144, largeLogPayloads)
&& confirmPayloadParams(6, 142, largeLogPayloads)
&& confirmPayloadParams(7, 140, largeLogPayloads)
&& confirmPayloadParams(8, 138, largeLogPayloads)
&& confirmPayloadParams(9, 136, largeLogPayloads)
&& confirmPayloadParams(10, 134, largeLogPayloads)
&& confirmPayloadParams(11, 132, largeLogPayloads)
&& confirmPayloadParams(12, 130, largeLogPayloads)
&& confirmPayloadParams(13, 128, largeLogPayloads);
})
);
}
@Test
public void testSlowLogMixedFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.setClientAddress("client_88")
.build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 2));
AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_1")
.setClientAddress("client_2")
.build();
Assert.assertEquals(0, getSlowLogPayloads(request2).size());
AdminProtos.SlowLogResponseRequest request3 =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.setClientAddress("client_88")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
Assert.assertEquals(0, getSlowLogPayloads(request3).size());
AdminProtos.SlowLogResponseRequest request4 =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.setClientAddress("client_87")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
Assert.assertEquals(1, getSlowLogPayloads(request4).size());
AdminProtos.SlowLogResponseRequest request5 =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_88")
.setClientAddress("client_89")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
.build();
Assert.assertEquals(2, getSlowLogPayloads(request5).size());
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
}
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
RpcCall rpcCall = getRpcCall(userName);
return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true);
}
private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
String className, boolean isSlowLog, boolean isLargeLog) {
RpcCall rpcCall = getRpcCall(userName);
return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog,
isLargeLog);
}
private static RpcCall getRpcCall(String userName) {
RpcCall rpcCall = new RpcCall() {
@Override
public BlockingService getService() {
return null;
}
@Override
public Descriptors.MethodDescriptor getMethod() {
return null;
}
@Override
public Message getParam() {
return getMessage();
}
@Override
public CellScanner getCellScanner() {
return null;
}
@Override
public long getReceiveTime() {
return 0;
}
@Override
public long getStartTime() {
return 0;
}
@Override
public void setStartTime(long startTime) {
}
@Override
public int getTimeout() {
return 0;
}
@Override
public int getPriority() {
return 0;
}
@Override
public long getDeadline() {
return 0;
}
@Override
public long getSize() {
return 0;
}
@Override
public RPCProtos.RequestHeader getHeader() {
return null;
}
@Override
public int getRemotePort() {
return 0;
}
@Override
public void setResponse(Message param, CellScanner cells,
Throwable errorThrowable, String error) {
}
@Override
public void sendResponseIfReady() throws IOException {
}
@Override
public void cleanup() {
}
@Override
public String toShortString() {
return null;
}
@Override
public long disconnectSince() {
return 0;
}
@Override
public boolean isClientCellBlockSupported() {
return false;
}
@Override
public Optional<User> getRequestUser() {
return getUser(userName);
}
@Override
public InetAddress getRemoteAddress() {
return null;
}
@Override
public HBaseProtos.VersionInfo getClientVersionInfo() {
return null;
}
@Override
public void setCallBack(RpcCallback callback) {
}
@Override
public boolean isRetryImmediatelySupported() {
return false;
}
@Override
public long getResponseCellSize() {
return 0;
}
@Override
public void incrementResponseCellSize(long cellSize) {
}
@Override
public long getResponseBlockSize() {
return 0;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
}
@Override
public long getResponseExceptionSize() {
return 0;
}
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}
};
return rpcCall;
}
private static Message getMessage() {
i = (i + 1) % 3;
Message message = null;
switch (i) {
case 0: {
message = ClientProtos.ScanRequest.newBuilder()
.setRegion(HBaseProtos.RegionSpecifier.newBuilder()
.setValue(ByteString.copyFromUtf8("region1"))
.setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
.build())
.build();
break;
}
case 1: {
message = ClientProtos.MutateRequest.newBuilder()
.setRegion(HBaseProtos.RegionSpecifier.newBuilder()
.setValue(ByteString.copyFromUtf8("region2"))
.setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
.setMutation(ClientProtos.MutationProto.newBuilder()
.setRow(ByteString.copyFromUtf8("row123"))
.build())
.build();
break;
}
case 2: {
message = ClientProtos.GetRequest.newBuilder()
.setRegion(HBaseProtos.RegionSpecifier.newBuilder()
.setValue(ByteString.copyFromUtf8("region2"))
.setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
.setGet(ClientProtos.Get.newBuilder()
.setRow(ByteString.copyFromUtf8("row123"))
.build())
.build();
break;
}
default:
throw new RuntimeException("Not supposed to get here?");
}
return message;
}
private static Optional<User> getUser(String userName) {
return Optional.of(new User() {
@Override
public String getShortName() {
return userName;
}
@Override
public <T> T runAs(PrivilegedAction<T> action) {
return null;
}
@Override
public <T> T runAs(PrivilegedExceptionAction<T> action) {
return null;
}
});
}
}