blob: b907aa640093c126a40825029f931360331a8897 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
* does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
* dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
* degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
* while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
* <p>
* There are three key parameters:
* <p>
* 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
* threshold, the HotProtector will work, 100 by default
* <p>
* 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
* the same time.
* <p>
* 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
* prepare writing puts to a Store at the same time.
* <p>
* Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
* WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
* enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
* slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
* <p>
* This protector is enabled by default and could be turned off by setting
* hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
*/
@InterfaceAudience.Private
public class StoreHotnessProtector {
private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class);
private volatile int parallelPutToStoreThreadLimit;
private volatile int parallelPreparePutToStoreThreadLimit;
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
"hbase.region.store.parallel.put.limit";
public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
"hbase.region.store.parallel.prepare.put.multiplier";
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
"hbase.region.store.parallel.put.limit.min.column.count";
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
private final Region region;
public StoreHotnessProtector(Region region, Configuration conf) {
init(conf);
this.region = region;
}
public void init(Configuration conf) {
this.parallelPutToStoreThreadLimit =
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
this.parallelPutToStoreThreadLimitCheckMinColumnCount =
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
}
public void update(Configuration conf) {
init(conf);
preparePutToStoreMap.clear();
LOG.debug("update config: {}", this);
}
public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
if (!isEnable()) {
return;
}
String tooBusyStore = null;
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
Store store = this.region.getStore(e.getKey());
if (store == null || e.getValue() == null) {
continue;
}
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
//we need to try to add #preparePutCount at first because preparePutToStoreMap will be
//cleared when changing the configuration.
int preparePutCount = preparePutToStoreMap
.computeIfAbsent(e.getKey(), key -> new AtomicInteger())
.incrementAndGet();
if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
|| preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
tooBusyStore = (tooBusyStore == null ?
store.getColumnFamilyName() :
tooBusyStore + "," + store.getColumnFamilyName());
}
if (LOG.isTraceEnabled()) {
LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
+ "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
}
}
}
if (tooBusyStore != null) {
String msg =
"StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
+ " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
LOG.trace(msg);
throw new RegionTooBusyException(msg);
}
}
public void finish(Map<byte[], List<Cell>> familyMaps) {
if (!isEnable()) {
return;
}
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
Store store = this.region.getStore(e.getKey());
if (store == null || e.getValue() == null) {
continue;
}
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
// preparePutToStoreMap will be cleared when changing the configuration, so it may turn
// into a negative value. It will be not accuracy in a short time, it's a trade-off for
// performance.
if (counter != null && counter.decrementAndGet() < 0) {
counter.incrementAndGet();
}
}
}
}
public String toString() {
return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
+ this.parallelPutToStoreThreadLimit + " ; minColumnNum="
+ this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
+ this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
"enable" :
"disable");
}
public boolean isEnable() {
// feature is enabled when parallelPutToStoreThreadLimit > 0
return this.parallelPutToStoreThreadLimit > 0;
}
Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
return preparePutToStoreMap;
}
public static final long FIXED_SIZE =
ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
}