HBASE-25790 NamedQueue 'BalancerRejection' for recent history of balancer skipping (#3182)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 2dcf77b..56fb96e 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -63,6 +63,10 @@
"hbase.master.balancer.decision.buffer.enabled";
public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;
+ public static final String BALANCER_REJECTION_BUFFER_ENABLED =
+ "hbase.master.balancer.rejection.buffer.enabled";
+ public static final boolean DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED = false;
+
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerRejection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerRejection.java
new file mode 100644
index 0000000..d6e6cee
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerRejection.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.util.GsonUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hbase.thirdparty.com.google.gson.Gson;
+import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
+
+/**
+ * History of detail information that balancer movements was rejected
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+final public class BalancerRejection extends LogEntry {
+ //The reason why balancer was rejected
+ private final String reason;
+ private final List<String> costFuncInfoList;
+
+ // used to convert object to pretty printed format
+ // used by toJsonPrettyPrint()
+ private static final Gson GSON = GsonUtil.createGson()
+ .setPrettyPrinting()
+ .disableHtmlEscaping()
+ .registerTypeAdapter(BalancerRejection.class, (JsonSerializer<BalancerRejection>)
+ (balancerRejection, type, jsonSerializationContext) -> {
+ Gson gson = new Gson();
+ return gson.toJsonTree(balancerRejection);
+ }).create();
+
+ private BalancerRejection(String reason, List<String> costFuncInfoList) {
+ this.reason = reason;
+ if(costFuncInfoList == null){
+ this.costFuncInfoList = Collections.emptyList();
+ }
+ else {
+ this.costFuncInfoList = costFuncInfoList;
+ }
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ public List<String> getCostFuncInfoList() {
+ return costFuncInfoList;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("reason", reason)
+ .append("costFuncInfoList", costFuncInfoList.toString())
+ .toString();
+ }
+
+ @Override
+ public String toJsonPrettyPrint() {
+ return GSON.toJson(this);
+ }
+
+ public static class Builder {
+ private String reason;
+ private List<String> costFuncInfoList;
+
+ public Builder setReason(String reason) {
+ this.reason = reason;
+ return this;
+ }
+
+ public void addCostFuncInfo(String funcName, double cost, float multiplier){
+ if(costFuncInfoList == null){
+ costFuncInfoList = new ArrayList<>();
+ }
+ costFuncInfoList.add(
+ new StringBuilder()
+ .append(funcName)
+ .append(" cost:").append(cost)
+ .append(" multiplier:").append(multiplier)
+ .toString());
+ }
+
+ public Builder setCostFuncInfoList(List<String> costFuncInfoList){
+ this.costFuncInfoList = costFuncInfoList;
+ return this;
+ }
+
+ public BalancerRejection build() {
+ return new BalancerRejection(reason, costFuncInfoList);
+ }
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 38bddde..4980576 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -4202,6 +4202,15 @@
.call();
}
+ private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
+ return this.<List<LogEntry>>newMasterCaller()
+ .action((controller, stub) ->
+ this.call(controller, stub,
+ ProtobufUtil.toBalancerRejectionRequest(limit),
+ MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerRejectionResponse))
+ .call();
+ }
+
@Override
public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
String logType, ServerType serverType, int limit,
@@ -4209,19 +4218,28 @@
if (logType == null || serverType == null) {
throw new IllegalArgumentException("logType and/or serverType cannot be empty");
}
- if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
- if (ServerType.MASTER.equals(serverType)) {
- throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
- }
- return getSlowLogResponses(filterParams, serverNames, limit, logType);
- } else if (logType.equals("BALANCER_DECISION")) {
- if (ServerType.REGION_SERVER.equals(serverType)) {
- throw new IllegalArgumentException(
- "Balancer Decision logs are not maintained by HRegionServer");
- }
- return getBalancerDecisions(limit);
+ switch (logType){
+ case "SLOW_LOG":
+ case "LARGE_LOG":
+ if (ServerType.MASTER.equals(serverType)) {
+ throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
+ }
+ return getSlowLogResponses(filterParams, serverNames, limit, logType);
+ case "BALANCER_DECISION":
+ if (ServerType.REGION_SERVER.equals(serverType)) {
+ throw new IllegalArgumentException(
+ "Balancer Decision logs are not maintained by HRegionServer");
+ }
+ return getBalancerDecisions(limit);
+ case "BALANCER_REJECTION":
+ if (ServerType.REGION_SERVER.equals(serverType)) {
+ throw new IllegalArgumentException(
+ "Balancer Rejection logs are not maintained by HRegionServer");
+ }
+ return getBalancerRejections(limit);
+ default:
+ return CompletableFuture.completedFuture(Collections.emptyList());
}
- return CompletableFuture.completedFuture(Collections.emptyList());
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 1b00887..799198e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClientUtil;
@@ -3776,6 +3777,25 @@
throw new RuntimeException("Invalid response from server");
}
+ public static List<LogEntry> toBalancerRejectionResponse(
+ HBaseProtos.LogEntry logEntry) {
+ try {
+ final String logClassName = logEntry.getLogClassName();
+ Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
+ Method method = logClass.getMethod("parseFrom", ByteString.class);
+ if (logClassName.contains("BalancerRejectionsResponse")) {
+ MasterProtos.BalancerRejectionsResponse response =
+ (MasterProtos.BalancerRejectionsResponse) method
+ .invoke(null, logEntry.getLogMessage());
+ return getBalancerRejectionEntries(response);
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new RuntimeException("Error while retrieving response from server");
+ }
+ throw new RuntimeException("Invalid response from server");
+ }
+
public static List<LogEntry> getBalancerDecisionEntries(
MasterProtos.BalancerDecisionsResponse response) {
List<RecentLogs.BalancerDecision> balancerDecisions = response.getBalancerDecisionList();
@@ -3792,6 +3812,19 @@
.collect(Collectors.toList());
}
+ public static List<LogEntry> getBalancerRejectionEntries(
+ MasterProtos.BalancerRejectionsResponse response) {
+ List<RecentLogs.BalancerRejection> balancerRejections = response.getBalancerRejectionList();
+ if (CollectionUtils.isEmpty(balancerRejections)) {
+ return Collections.emptyList();
+ }
+ return balancerRejections.stream().map(balancerRejection -> new BalancerRejection.Builder()
+ .setReason(balancerRejection.getReason())
+ .setCostFuncInfoList(balancerRejection.getCostFuncInfoList())
+ .build())
+ .collect(Collectors.toList());
+ }
+
public static HBaseProtos.LogRequest toBalancerDecisionRequest(int limit) {
MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest =
MasterProtos.BalancerDecisionsRequest.newBuilder().setLimit(limit).build();
@@ -3801,4 +3834,13 @@
.build();
}
+ public static HBaseProtos.LogRequest toBalancerRejectionRequest(int limit) {
+ MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest =
+ MasterProtos.BalancerRejectionsRequest.newBuilder().setLimit(limit).build();
+ return HBaseProtos.LogRequest.newBuilder()
+ .setLogClassName(balancerRejectionsRequest.getClass().getName())
+ .setLogMessage(balancerRejectionsRequest.toByteString())
+ .build();
+ }
+
}
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 6380eea..6dd6440 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -2007,7 +2007,7 @@
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
- <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService</value>
+ <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
@@ -2026,4 +2026,13 @@
the ring buffer is indicated by config: hbase.master.balancer.decision.queue.size
</description>
</property>
+ <property>
+ <name>hbase.master.balancer.rejection.buffer.enabled</name>
+ <value>false</value>
+ <description>
+ Indicates whether active HMaster has ring buffer running for storing
+ balancer rejection in FIFO manner with limited entries. The size of
+ the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
+ </description>
+ </property>
</configuration>
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 45b4149..3d265dd 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -707,6 +707,13 @@
}
/**
+ * Same as BalancerDecision but used for BalancerRejection
+ */
+message BalancerRejectionsRequest {
+ optional uint32 limit = 1;
+}
+
+/**
* BalancerDecision (LogEntry) use-case specific RPC response. This response payload will be
* converted in bytes by servers and sent as response to generic RPC API: GetLogEntries
* LogEntry message has two params:
@@ -717,6 +724,10 @@
repeated BalancerDecision balancer_decision = 1;
}
+message BalancerRejectionsResponse {
+ repeated BalancerRejection balancer_rejection = 1;
+}
+
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/RecentLogs.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/RecentLogs.proto
index ea50b81..1e74820 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/RecentLogs.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/RecentLogs.proto
@@ -37,3 +37,8 @@
repeated string region_plans = 6;
}
+
+message BalancerRejection {
+ required string reason = 1;
+ repeated string cost_func_info = 2;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 772e1a3..13dc31e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -84,6 +84,7 @@
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
+import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
@@ -3396,6 +3397,16 @@
.setLogClassName(balancerDecisionsResponse.getClass().getName())
.setLogMessage(balancerDecisionsResponse.toByteString())
.build();
+ }else if (logClassName.contains("BalancerRejectionsRequest")){
+ MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest =
+ (MasterProtos.BalancerRejectionsRequest) method
+ .invoke(null, request.getLogMessage());
+ MasterProtos.BalancerRejectionsResponse balancerRejectionsResponse =
+ getBalancerRejections(balancerRejectionsRequest);
+ return HBaseProtos.LogEntry.newBuilder()
+ .setLogClassName(balancerRejectionsResponse.getClass().getName())
+ .setLogMessage(balancerRejectionsResponse.toByteString())
+ .build();
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
@@ -3423,4 +3434,22 @@
.addAllBalancerDecision(balancerDecisions).build();
}
+ private MasterProtos.BalancerRejectionsResponse getBalancerRejections(
+ MasterProtos.BalancerRejectionsRequest request) {
+ final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
+ if (namedQueueRecorder == null) {
+ return MasterProtos.BalancerRejectionsResponse.newBuilder()
+ .addAllBalancerRejection(Collections.emptyList()).build();
+ }
+ final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+ namedQueueGetRequest.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
+ namedQueueGetRequest.setBalancerRejectionsRequest(request);
+ NamedQueueGetResponse namedQueueGetResponse =
+ namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+ List<RecentLogs.BalancerRejection> balancerRejections =
+ namedQueueGetResponse.getBalancerRejections();
+ return MasterProtos.BalancerRejectionsResponse.newBuilder()
+ .addAllBalancerRejection(balancerRejections).build();
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 598e9a3..a3e9c58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -35,9 +35,11 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
+import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
+import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -129,6 +131,8 @@
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
+ private boolean isBalancerDecisionRecording = false;
+ private boolean isBalancerRejectionRecording = false;
private List<CandidateGenerator> candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions;
@@ -217,10 +221,15 @@
curFunctionCosts= new Double[costFunctions.size()];
tempFunctionCosts= new Double[costFunctions.size()];
- boolean isBalancerDecisionRecording = getConf()
+ isBalancerDecisionRecording = getConf()
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
- if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
+ isBalancerRejectionRecording = getConf()
+ .getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
+ BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
+
+ if (this.namedQueueRecorder == null && (isBalancerDecisionRecording
+ || isBalancerRejectionRecording)) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
}
@@ -327,6 +336,10 @@
LOG.debug("Not running balancer because only " + cs.getNumServers()
+ " active regionserver(s)");
}
+ if (this.isBalancerRejectionRecording) {
+ sendRejectionReasonToRingBuffer("The number of RegionServers " +
+ cs.getNumServers() + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
+ }
return false;
}
if (areSomeRegionReplicasColocated(cluster)) {
@@ -355,6 +368,19 @@
boolean balanced = total <= 0 || sumMultiplier <= 0 ||
(sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
+ if(balanced && isBalancerRejectionRecording){
+ String reason = "";
+ if (total <= 0) {
+ reason = "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
+ } else if (sumMultiplier <= 0) {
+ reason = "sumMultiplier = " + sumMultiplier + " <= 0";
+ } else if ((total / sumMultiplier) < minCostNeedBalance) {
+ reason =
+ "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " + (total
+ / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
+ }
+ sendRejectionReasonToRingBuffer(reason, costFunctions);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
balanced ? "Skipping load balancing because balanced" : "We need to load balance",
@@ -488,9 +514,27 @@
return null;
}
+ private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions){
+ if (this.isBalancerRejectionRecording){
+ BalancerRejection.Builder builder =
+ new BalancerRejection.Builder()
+ .setReason(reason);
+ if (costFunctions != null) {
+ for (CostFunction c : costFunctions) {
+ float multiplier = c.getMultiplier();
+ if (multiplier <= 0 || !c.isNeeded()) {
+ continue;
+ }
+ builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
+ }
+ }
+ namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build()));
+ }
+ }
+
private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
double initCost, String initFunctionTotalCosts, long step) {
- if (this.namedQueueRecorder != null) {
+ if (this.isBalancerDecisionRecording) {
List<String> regionPlans = new ArrayList<>();
for (RegionPlan plan : plans) {
regionPlans.add(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerRejectionDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerRejectionDetails.java
new file mode 100644
index 0000000..4a663f8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerRejectionDetails.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.client.BalancerRejection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Balancer rejection details that would be passed on to ring buffer for history
+ */
+@InterfaceAudience.Private
+public class BalancerRejectionDetails extends NamedQueuePayload {
+
+ public static final int BALANCER_REJECTION_EVENT = 2;
+
+ private final BalancerRejection balancerRejection;
+
+ public BalancerRejectionDetails(BalancerRejection balancerRejection) {
+ super(BALANCER_REJECTION_EVENT);
+ this.balancerRejection = balancerRejection;
+ }
+
+ public BalancerRejection getBalancerRejection() {
+ return balancerRejection;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("balancerRejection", balancerRejection)
+ .toString();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
index eff2df9..36e39b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
@@ -31,7 +31,8 @@
public enum NamedQueueEvent {
SLOW_LOG(0),
- BALANCE_DECISION(1);
+ BALANCE_DECISION(1),
+ BALANCE_REJECTION(2);
private final int value;
@@ -47,6 +48,9 @@
case 1: {
return BALANCE_DECISION;
}
+ case 2: {
+ return BALANCE_REJECTION;
+ }
default: {
throw new IllegalArgumentException(
"NamedQueue event with ordinal " + value + " not defined");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
new file mode 100644
index 0000000..6da7083
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.BalancerRejection;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
+import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.namequeues.NamedQueueService;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+/**
+ * In-memory Queue service provider for Balancer Rejection events
+ */
+@InterfaceAudience.Private
+public class BalancerRejectionQueueService implements NamedQueueService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BalancerRejectionQueueService.class);
+
+ private final boolean isBalancerRejectionRecording;
+ private static final String BALANCER_REJECTION_QUEUE_SIZE =
+ "hbase.master.balancer.rejection.queue.size";
+ private static final int DEFAULT_BALANCER_REJECTION_QUEUE_SIZE = 250;
+
+ private final Queue<RecentLogs.BalancerRejection> balancerRejectionQueue;
+
+ public BalancerRejectionQueueService(Configuration conf) {
+ isBalancerRejectionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
+ BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
+ if (!isBalancerRejectionRecording) {
+ balancerRejectionQueue = null;
+ return;
+ }
+ final int queueSize =
+ conf.getInt(BALANCER_REJECTION_QUEUE_SIZE, DEFAULT_BALANCER_REJECTION_QUEUE_SIZE);
+ final EvictingQueue<RecentLogs.BalancerRejection> evictingQueue =
+ EvictingQueue.create(queueSize);
+ balancerRejectionQueue = Queues.synchronizedQueue(evictingQueue);
+ }
+
+ @Override
+ public NamedQueuePayload.NamedQueueEvent getEvent() {
+ return NamedQueuePayload.NamedQueueEvent.BALANCE_REJECTION;
+ }
+
+ @Override
+ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
+ if (!isBalancerRejectionRecording) {
+ return;
+ }
+ if (!(namedQueuePayload instanceof BalancerRejectionDetails)) {
+ LOG.warn(
+ "BalancerRejectionQueueService: NamedQueuePayload is not of type BalancerRejectionDetails.");
+ return;
+ }
+ BalancerRejectionDetails balancerRejectionDetails = (BalancerRejectionDetails) namedQueuePayload;
+ BalancerRejection balancerRejectionRecord =
+ balancerRejectionDetails.getBalancerRejection();
+ RecentLogs.BalancerRejection BalancerRejection = RecentLogs.BalancerRejection.newBuilder()
+ .setReason(balancerRejectionRecord.getReason())
+ .addAllCostFuncInfo(balancerRejectionRecord.getCostFuncInfoList())
+ .build();
+ balancerRejectionQueue.add(BalancerRejection);
+ }
+
+ @Override
+ public boolean clearNamedQueue() {
+ if (!isBalancerRejectionRecording) {
+ return false;
+ }
+ LOG.debug("Received request to clean up balancer rejection queue.");
+ balancerRejectionQueue.clear();
+ return true;
+ }
+
+ @Override
+ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+ if (!isBalancerRejectionRecording) {
+ return null;
+ }
+ List<RecentLogs.BalancerRejection> balancerRejections =
+ Arrays.stream(balancerRejectionQueue.toArray(new RecentLogs.BalancerRejection[0]))
+ .collect(Collectors.toList());
+ // latest records should be displayed first, hence reverse order sorting
+ Collections.reverse(balancerRejections);
+ int limit = balancerRejections.size();
+ if (request.getBalancerRejectionsRequest().hasLimit()) {
+ limit = Math.min(request.getBalancerRejectionsRequest().getLimit(), balancerRejections.size());
+ }
+ // filter limit if provided
+ balancerRejections = balancerRejections.subList(0, limit);
+ final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse();
+ namedQueueGetResponse.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
+ namedQueueGetResponse.setBalancerRejections(balancerRejections);
+ return namedQueueGetResponse;
+ }
+
+ @Override
+ public void persistAll() {
+ // no-op for now
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
index 182cfd1..b4c5d7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
@@ -38,6 +38,7 @@
private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
private MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest;
+ private MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest;
public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
return slowLogResponseRequest;
@@ -52,11 +53,20 @@
return balancerDecisionsRequest;
}
+ public MasterProtos.BalancerRejectionsRequest getBalancerRejectionsRequest() {
+ return balancerRejectionsRequest;
+ }
+
public void setBalancerDecisionsRequest(
MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest) {
this.balancerDecisionsRequest = balancerDecisionsRequest;
}
+ public void setBalancerRejectionsRequest(
+ MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest) {
+ this.balancerRejectionsRequest = balancerRejectionsRequest;
+ }
+
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
@@ -71,6 +81,7 @@
.append("slowLogResponseRequest", slowLogResponseRequest)
.append("namedQueueEvent", namedQueueEvent)
.append("balancerDecisionsRequest", balancerDecisionsRequest)
+ .append("balancerRejectionsRequest", balancerRejectionsRequest)
.toString();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
index 224402a..0af300d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
@@ -34,6 +34,7 @@
private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
private List<RecentLogs.BalancerDecision> balancerDecisions;
+ private List<RecentLogs.BalancerRejection> balancerRejections;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
@@ -52,6 +53,14 @@
this.balancerDecisions = balancerDecisions;
}
+ public List<RecentLogs.BalancerRejection> getBalancerRejections() {
+ return balancerRejections;
+ }
+
+ public void setBalancerRejections(List<RecentLogs.BalancerRejection> balancerRejections) {
+ this.balancerRejections = balancerRejections;
+ }
+
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
@@ -65,6 +74,7 @@
return new ToStringBuilder(this)
.append("slowLogPayloads", slowLogPayloads)
.append("balancerDecisions", balancerDecisions)
+ .append("balancerRejections", balancerRejections)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 36fdc77..1b045e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -703,7 +703,10 @@
final boolean isBalancerDecisionRecording = conf
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
- if (isBalancerDecisionRecording) {
+ final boolean isBalancerRejectionRecording = conf
+ .getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
+ BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
+ if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerRejection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerRejection.java
new file mode 100644
index 0000000..778da56
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerRejection.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.LogEntry;
+import org.apache.hadoop.hbase.client.RegionInfo;
+
+import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
+
+/**
+ * Test BalancerRejection ring buffer using namedQueue interface
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestBalancerRejection extends StochasticBalancerTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBalancerRejection.class);
+
+ static class MockCostFunction extends CostFunction{
+ public static double mockCost;
+
+ public MockCostFunction(Configuration c) {
+ }
+
+ @Override
+ protected double cost() {
+ return mockCost;
+ }
+
+ @Override
+ boolean isNeeded() {
+ return super.isNeeded();
+ }
+
+ @Override
+ float getMultiplier() {
+ return 1;
+ }
+ }
+
+ @Test
+ public void testBalancerRejections() throws Exception{
+ try {
+ //enabled balancer rejection recording
+ conf.setBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, true);
+ conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY, MockCostFunction.class.getName());
+ loadBalancer.setConf(conf);
+ //Simulate 2 servers with 5 regions.
+ Map<ServerName, List<RegionInfo>> servers = mockClusterServers(new int[] { 5, 5 });
+ Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = (Map) mockClusterServersWithTables(servers);
+
+ //Reject case 1: Total cost < 0
+ MockCostFunction.mockCost = -Double.MAX_VALUE;
+ //Since the Balancer was rejected, there should not be any plans
+ Assert.assertNull(loadBalancer.balanceCluster(LoadOfAllTable));
+
+ //Reject case 2: Cost < minCostNeedBalance
+ MockCostFunction.mockCost = 1;
+ conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", Float.MAX_VALUE);
+ loadBalancer.setConf(conf);
+ Assert.assertNull(loadBalancer.balanceCluster(LoadOfAllTable));
+
+ //NamedQueue is an async Producer-consumer Pattern, waiting here until it completed
+ int maxWaitingCount = 10;
+ while (maxWaitingCount-- > 0 && getBalancerRejectionLogEntries().size() != 2) {
+ Thread.sleep(1000);
+ }
+ //There are two cases, should be 2 logEntries
+ List<LogEntry> logEntries = getBalancerRejectionLogEntries();
+ Assert.assertEquals(2, logEntries.size());
+ Assert.assertTrue(
+ logEntries.get(0).toJsonPrettyPrint().contains("minCostNeedBalance"));
+ Assert.assertTrue(
+ logEntries.get(1).toJsonPrettyPrint().contains("cost1*multiplier1"));
+ }finally {
+ conf.unset(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY);
+ conf.unset(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED);
+ loadBalancer.setConf(conf);
+ }
+ }
+
+ private List<LogEntry> getBalancerRejectionLogEntries(){
+ NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+ namedQueueGetRequest.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
+ namedQueueGetRequest.setBalancerRejectionsRequest(MasterProtos.BalancerRejectionsRequest.getDefaultInstance());
+ NamedQueueGetResponse namedQueueGetResponse =
+ loadBalancer.namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+ List<RecentLogs.BalancerRejection> balancerRejections = namedQueueGetResponse.getBalancerRejections();
+ MasterProtos.BalancerRejectionsResponse response =
+ MasterProtos.BalancerRejectionsResponse.newBuilder()
+ .addAllBalancerRejection(balancerRejections)
+ .build();
+ List<LogEntry> balancerRejectionRecords =
+ ProtobufUtil.getBalancerRejectionEntries(response);
+ return balancerRejectionRecords;
+ }
+}
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 8b237d1..e02f919 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -1761,6 +1761,25 @@
end
#----------------------------------------------------------------------------------------------
+ # Retrieve latest balancer rejections made by LoadBalancers
+ def get_balancer_rejections(args)
+ if args.key? 'LIMIT'
+ limit = args['LIMIT']
+ else
+ limit = 250
+ end
+
+ log_type = 'BALANCER_REJECTION'
+ log_dest = org.apache.hadoop.hbase.client.ServerType::MASTER
+ balancer_rejections_responses = @admin.getLogEntries(nil, log_type, log_dest, limit, nil)
+ balancer_rejections_resp_arr = []
+ balancer_rejections_responses.each { |balancer_dec_resp|
+ balancer_rejections_resp_arr << balancer_dec_resp.toJsonPrettyPrint
+ }
+ balancer_rejections_resp_arr
+ end
+
+ #----------------------------------------------------------------------------------------------
# Stop the active Master
def stop_master
@admin.stopMaster
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 549e31d..2617a58 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -455,6 +455,7 @@
compaction_switch
flush
get_balancer_decisions
+ get_balancer_rejections
get_slowlog_responses
get_largelog_responses
major_compact
diff --git a/hbase-shell/src/main/ruby/shell/commands/get_balancer_rejections.rb b/hbase-shell/src/main/ruby/shell/commands/get_balancer_rejections.rb
new file mode 100644
index 0000000..df04a2c
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/get_balancer_rejections.rb
@@ -0,0 +1,49 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Retrieve latest balancer rejections maintained in memory by HMaster
+
+module Shell
+ module Commands
+ # Retrieve latest large log responses
+ class GetBalancerRejections < Command
+ def help
+ <<-EOF
+Retrieve latest balancer rejections made by LoadBalancers.
+
+Examples:
+
+ hbase> get_balancer_rejections => Retrieve recent balancer rejections with
+ region plans
+ hbase> get_balancer_rejections LIMIT => 10 => Retrieve 10 most recent balancer rejections
+ with region plans
+
+ EOF
+ end
+
+ def command(args = {})
+ unless args.is_a? Hash
+ raise 'Filter parameters are not Hash'
+ end
+
+ balancer_rejections_resp_arr = admin.get_balancer_rejections(args)
+ puts 'Retrieved BalancerRejection Responses'
+ puts balancer_rejections_resp_arr
+ end
+ end
+ end
+end
diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc
index 8335914..388dbd2 100644
--- a/src/main/asciidoc/_chapters/hbase-default.adoc
+++ b/src/main/asciidoc/_chapters/hbase-default.adoc
@@ -2338,3 +2338,17 @@
.Default
`false`
+[[hbase.master.balancer.rejection.buffer.enabled]]
+*`hbase.master.balancer.rejection.buffer.enabled`*::
++
+.Description
+
+ Indicates whether active HMaster has ring buffer running for storing
+ balancer rejection in FIFO manner with limited entries. The size of
+ the ring buffer is indicated by config:
+ hbase.master.balancer.rejection.queue.size
+
++
+.Default
+`false`
+