blob: 1ebc9b114320248279d4c0cce88b45cd3d26cd49 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Creates multiple threads that write key/values into the */
public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);
/**
* A temporary place to keep track of inserted/updated keys. This is written to by
* all writers and is drained on a separate thread that populates
* {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
* being inserted/updated. This queue is supposed to stay small.
*/
protected BlockingQueue<Long> wroteKeys;
/**
* This is the current key to be inserted/updated by any thread. Each thread does an
* atomic get and increment operation and inserts the current value.
*/
protected AtomicLong nextKeyToWrite = new AtomicLong();
/**
* The highest key in the contiguous range of keys .
*/
protected AtomicLong wroteUpToKey = new AtomicLong();
/** The sorted set of keys NOT inserted/updated by the writers */
protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();
/**
* The total size of the temporary inserted/updated key set that have not yet lined
* up in a our contiguous sequence starting from startKey. Supposed to stay
* small.
*/
protected AtomicLong wroteKeyQueueSize = new AtomicLong();
/** Enable this if used in conjunction with a concurrent reader. */
protected boolean trackWroteKeys;
public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, String actionLetter) throws IOException {
super(dataGen, conf, tableName, actionLetter);
this.wroteKeys = createWriteKeysQueue(conf);
}
protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
return new ArrayBlockingQueue<>(10000);
}
@Override
public void start(long startKey, long endKey, int numThreads) throws IOException {
super.start(startKey, endKey, numThreads);
nextKeyToWrite.set(startKey);
wroteUpToKey.set(startKey - 1);
if (trackWroteKeys) {
new Thread(new WroteKeysTracker(),
"MultiThreadedWriterBase-WroteKeysTracker-" + System.currentTimeMillis()).start();
numThreadsWorking.incrementAndGet();
}
}
protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
HRegionLocation cached = null, real = null;
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
cached = locator.getRegionLocation(rowKey, false);
real = locator.getRegionLocation(rowKey, true);
} catch (Throwable t) {
// Cannot obtain region information for another catch block - too bad!
}
String result = "no information can be obtained";
if (cached != null) {
result = "cached: " + cached.toString();
}
if (real != null && real.getServerName() != null) {
if (cached != null && cached.getServerName() != null && real.equals(cached)) {
result += "; cache is up to date";
} else {
result = (cached != null) ? (result + "; ") : "";
result += "real: " + real.toString();
}
}
return result;
}
/**
* A thread that keeps track of the highest key in the contiguous range of
* inserted/updated keys.
*/
private class WroteKeysTracker implements Runnable {
@Override
public void run() {
Thread.currentThread().setName(getClass().getSimpleName());
try {
long expectedKey = startKey;
Queue<Long> sortedKeys = new PriorityQueue<>();
while (expectedKey < endKey) {
// Block until a new element is available.
Long k;
try {
k = wroteKeys.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info("Inserted key tracker thread interrupted", e);
break;
}
if (k == null) {
continue;
}
if (k == expectedKey) {
// Skip the "sorted key" queue and consume this key.
wroteUpToKey.set(k);
++expectedKey;
} else {
sortedKeys.add(k);
}
// See if we have a sequence of contiguous keys lined up.
while (!sortedKeys.isEmpty()
&& ((k = sortedKeys.peek()) == expectedKey)) {
sortedKeys.poll();
wroteUpToKey.set(k);
++expectedKey;
}
wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
}
} catch (Exception ex) {
LOG.error("Error in inserted/updaed key tracker", ex);
} finally {
numThreadsWorking.decrementAndGet();
}
}
}
public int getNumWriteFailures() {
return failedKeySet.size();
}
/**
* The max key until which all keys have been inserted/updated (successfully or not).
* @return the last key that we have inserted/updated all keys up to (inclusive)
*/
public long wroteUpToKey() {
return wroteUpToKey.get();
}
public boolean failedToWriteKey(long k) {
return failedKeySet.contains(k);
}
@Override
protected String progressInfo() {
StringBuilder sb = new StringBuilder();
appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
return sb.toString();
}
/**
* Used for a joint write/read workload. Enables tracking the last inserted/updated
* key, which requires a blocking queue and a consumer thread.
* @param enable whether to enable tracking the last inserted/updated key
*/
public void setTrackWroteKeys(boolean enable) {
trackWroteKeys = enable;
}
}