blob: 6869d6a7f32fffe69116be74ee45a830b72a0aca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class TabletLocatorImpl extends TabletLocator {
// Logger shared by all instances; used mostly for trace-level timing and cache messages.
private static final Logger log = LoggerFactory.getLogger(TabletLocatorImpl.class);
// MAX_TEXT is a sentinel Text that END_ROW_COMPARATOR orders after every other Text. It is
// recognized by reference identity, not by content. updateCache uses it as the metaCache key for
// an extent whose end row is null. Using null itself for this purpose was attempted, but there
// seems to be a bug in TreeMap.tailMap with null, so this dedicated sentinel was created instead.
static final Text MAX_TEXT = new Text();
/**
 * Orders end rows for use as cache keys. The {@link #MAX_TEXT} sentinel (matched by reference)
 * sorts after every other value; all other values use the natural {@link Text} ordering.
 */
static final Comparator<Text> END_ROW_COMPARATOR = (left, right) -> {
  // same reference, which also covers MAX_TEXT compared with itself
  if (left == right) {
    return 0;
  }

  if (left == MAX_TEXT) {
    return 1;
  } else if (right == MAX_TEXT) {
    return -1;
  } else {
    return left.compareTo(right);
  }
};
// id of the table whose tablet locations are cached here; updateCache rejects extents belonging
// to any other table
protected TableId tableId;
// locator used to find where this table's tablet location information is stored (see
// lookupTabletLocation and processInvalidated)
protected TabletLocator parent;
// cached tablet locations keyed by extent end row (MAX_TEXT when the end row is null), ordered
// by END_ROW_COMPARATOR
protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<>(END_ROW_COMPARATOR);
// reads tablet location information on cache misses and when refreshing invalidated extents
protected TabletLocationObtainer locationObtainer;
// answers whether a tablet server still holds its lock; answers are memoized per
// LockCheckerSession
private TabletServerLockChecker lockChecker;
// the table id followed by '<'; passed as the stop row when looking up tablet locations
protected Text lastTabletRow;
// extents whose cached location was invalidated and still has to be looked up again (see
// processInvalidated)
private TreeSet<KeyExtent> badExtents = new TreeSet<>();
// read/write lock taken around access to metaCache and badExtents
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock rLock = rwLock.readLock();
private final Lock wLock = rwLock.writeLock();
/**
* Source of tablet location information for this locator. An implementation is supplied through
* the constructor and is consulted on cache misses and when invalidated extents are refreshed.
*/
public interface TabletLocationObtainer {
/**
* Looks up tablet locations starting at a single row.
*
* <p>
* As called from this class, {@code src} is the location the parent locator returned for
* {@code row}, {@code row} is built from the table id, {@code ';'} and the row being located, and
* {@code stopRow} is the table id followed by {@code '<'}.
*
* @return null when unable to read information successfully
*/
TabletLocations lookupTablet(ClientContext context, TabletLocation src, Text row, Text stopRow,
TabletLocator parent) throws AccumuloSecurityException, AccumuloException;
/**
* Looks up tablet locations for a batch of ranges on one server.
*
* <p>
* As called from this class, {@code tserver} and {@code map} are one entry of the result the
* parent locator produced when binning the ranges of the invalidated extents. Every returned
* location is passed to the cache update; NOTE(review): this class does not null-check the
* returned list, so implementations are presumably expected to return an empty list rather than
* null - confirm.
*/
List<TabletLocation> lookupTablets(ClientContext context, String tserver,
Map<KeyExtent,List<Range>> map, TabletLocator parent)
throws AccumuloSecurityException, AccumuloException;
}
/**
* Answers whether a tablet server still holds its lock. This class only relies on the boolean
* answer; how the lock is implemented is not visible here.
*/
public interface TabletServerLockChecker {
/**
* @param tserver
*          tablet server location, as found in {@code TabletLocation.tablet_location}
* @param session
*          session identifier, as found in {@code TabletLocation.tablet_session}
* @return true if the server is considered to still hold its lock for that session; when false,
*         this class treats locations on that server/session as unusable
*/
boolean isLockHeld(String tserver, String session);
/**
* Called by this class after it has marked every cached extent on {@code server} as bad.
* Presumably discards any lock state cached for that server - confirm against the
* implementation.
*/
void invalidateCache(String server);
}
/**
 * Remembers, for the duration of one operation, which (server, session) pairs were reported as
 * holding or not holding their lock. The lock checker is assumed to be a resource synchronized
 * among many threads, so its answers are kept in unsynchronized memory private to the calling
 * operation; this avoids fine grained synchronization when binning many mutations or ranges.
 */
private class LockCheckerSession {
  private HashSet<Pair<String,String>> okLocks = new HashSet<>();
  private HashSet<Pair<String,String>> invalidLocks = new HashSet<>();

  /**
   * Filters a location by the lock state of its tablet server.
   *
   * @param tl
   *          location to check, may be null
   * @return {@code tl} when its server still holds its lock, otherwise null (also for null input)
   */
  private TabletLocation checkLock(TabletLocation tl) {
    if (tl == null) {
      return null;
    }

    Pair<String,String> serverAndSession = new Pair<>(tl.tablet_location, tl.tablet_session);

    // answers already obtained during this session
    if (okLocks.contains(serverAndSession)) {
      return tl;
    }
    if (invalidLocks.contains(serverAndSession)) {
      return null;
    }

    // first time this pair is seen: ask the shared checker and remember the answer
    if (!lockChecker.isLockHeld(tl.tablet_location, tl.tablet_session)) {
      if (log.isTraceEnabled()) {
        log.trace("Tablet server {} {} no longer holds its lock", tl.tablet_location,
            tl.tablet_session);
      }
      invalidLocks.add(serverAndSession);
      return null;
    }

    okLocks.add(serverAndSession);
    return tl;
  }
}
public TabletLocatorImpl(TableId tableId, TabletLocator parent, TabletLocationObtainer tlo,
TabletServerLockChecker tslc) {
this.tableId = tableId;
this.parent = parent;
this.locationObtainer = tlo;
this.lockChecker = tslc;
this.lastTabletRow = new Text(tableId.canonical());
lastTabletRow.append(new byte[] {'<'}, 0, 1);
}
/**
 * Groups mutations by the tablet server hosting the tablet that contains each mutation's row.
 *
 * <p>
 * A first pass, under the read lock, bins whatever can be resolved from the cache. The remaining
 * mutations are sorted by row and looked up under the write lock. Once one of those lookups
 * fails, that mutation and all mutations after it are reported as failures.
 *
 * @param mutations
 *          mutations to bin
 * @param binnedMutations
 *          output, keyed by tablet server location
 * @param failures
 *          output, mutations that could not be binned
 */
@Override
public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations,
    Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
    throws AccumuloException, AccumuloSecurityException, TableNotFoundException {

  OpTimer timer = null;
  if (log.isTraceEnabled()) {
    log.trace("tid={} Binning {} mutations for table {}", Thread.currentThread().getId(),
        mutations.size(), tableId);
    timer = new OpTimer().start();
  }

  ArrayList<T> uncached = new ArrayList<>();
  Text rowBuffer = new Text();
  LockCheckerSession lcSession = new LockCheckerSession();

  // Pass 1: cache only. Always sorting is slow, so only what misses the cache gets sorted later.
  // To stay efficient the methods called here avoid fine grained synchronization and logging.
  rLock.lock();
  try {
    processInvalidated(context, lcSession);

    for (T mutation : mutations) {
      rowBuffer.set(mutation.getRow());
      TabletLocation cached = locateTabletInCache(rowBuffer);
      boolean binned =
          cached != null && addMutation(binnedMutations, mutation, cached, lcSession);
      if (!binned) {
        uncached.add(mutation);
      }
    }
  } finally {
    rLock.unlock();
  }

  // Pass 2: look up the misses; it is most efficient to pass _locateTablet rows in sorted order
  if (!uncached.isEmpty()) {
    Comparator<T> byRowBytes = (a, b) -> WritableComparator.compareBytes(a.getRow(), 0,
        a.getRow().length, b.getRow(), 0, b.getRow().length);
    uncached.sort(byRowBytes);

    wLock.lock();
    try {
      boolean lookupFailed = false;
      for (T mutation : uncached) {
        if (!lookupFailed) {
          rowBuffer.set(mutation.getRow());
          TabletLocation found =
              _locateTablet(context, rowBuffer, false, false, false, lcSession);
          if (found != null && addMutation(binnedMutations, mutation, found, lcSession)) {
            continue;
          }
          // when one tablet does not return a location, something is probably
          // screwy, go ahead and fail everything.
          lookupFailed = true;
        }
        failures.add(mutation);
      }
    } finally {
      wLock.unlock();
    }
  }

  if (timer != null) {
    timer.stop();
    log.trace("tid={} Binned {} mutations for table {} to {} tservers in {}",
        Thread.currentThread().getId(), mutations.size(), tableId, binnedMutations.size(),
        String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
  }
}
/**
 * Adds a mutation to the bin of the tablet server named by {@code tl}.
 *
 * @return true if the mutation was added; false if the server no longer holds its lock or the
 *         server's bin was created for a different session
 */
private <T extends Mutation> boolean addMutation(
    Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl,
    LockCheckerSession lcSession) {
  TabletServerMutations<T> serverBin = binnedMutations.get(tl.tablet_location);

  if (serverBin == null) {
    // do lock check once per tserver here to make binning faster
    if (lcSession.checkLock(tl) == null) {
      return false;
    }
    serverBin = new TabletServerMutations<>(tl.tablet_session);
    binnedMutations.put(tl.tablet_location, serverBin);
  }

  // its possible the same tserver could be listed with different sessions
  if (!serverBin.getSession().equals(tl.tablet_session)) {
    return false;
  }

  serverBin.addMutation(tl.tablet_extent, mutation);
  return true;
}
/**
* Assigns each range to every tablet it overlaps and records the assignment in
* {@code binnedRanges}. A range is recorded only after all of its tablets were located; if any
* tablet along the way cannot be located the whole range is returned as a failure.
*
* @param useCache
*          when true only the cache is consulted; when false tablets are looked up through
*          {@code _locateTablet}, and after the first failed lookup no further lookups are
*          started, so all later ranges fail as well
* @param lcSession
*          lock check memo for the current operation
* @return the ranges that could not be binned
*/
private List<Range> binRanges(ClientContext context, List<Range> ranges,
Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache,
LockCheckerSession lcSession)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
List<Range> failures = new ArrayList<>();
// locations collected for the range currently being processed
List<TabletLocation> tabletLocations = new ArrayList<>();
boolean lookupFailed = false;
l1: for (Range range : ranges) {
tabletLocations.clear();
// a range without a start key starts at the empty row
Text startRow;
if (range.getStartKey() != null) {
startRow = range.getStartKey().getRow();
} else
startRow = new Text();
// find the tablet containing the start of the range
TabletLocation tl = null;
if (useCache)
tl = lcSession.checkLock(locateTabletInCache(startRow));
else if (!lookupFailed)
tl = _locateTablet(context, startRow, false, false, false, lcSession);
if (tl == null) {
failures.add(range);
if (!useCache)
lookupFailed = true;
continue;
}
tabletLocations.add(tl);
// walk forward through the following tablets until reaching one that has no end row or
// whose end row lies past the end of the range
while (tl.tablet_extent.getEndRow() != null && !range
.afterEndKey(new Key(tl.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) {
if (useCache) {
// smallest row greater than the current tablet's end row
Text row = new Text(tl.tablet_extent.getEndRow());
row.append(new byte[] {0}, 0, 1);
tl = lcSession.checkLock(locateTabletInCache(row));
} else {
// skipRow=true makes _locateTablet search for the row following the end row
tl = _locateTablet(context, tl.tablet_extent.getEndRow(), true, false, false, lcSession);
}
if (tl == null) {
// discard the partial result for this range and move on to the next range
failures.add(range);
if (!useCache)
lookupFailed = true;
continue l1;
}
tabletLocations.add(tl);
}
// every tablet was located, so record the range for each of them
for (TabletLocation tl2 : tabletLocations) {
TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, range);
}
}
return failures;
}
/**
 * Bins ranges by tablet server and tablet. Ranges are first binned from the cache under the read
 * lock; whatever could not be binned is sorted and looked up again under the write lock.
 *
 * <p>
 * For this to be efficient, fine grained synchronization and fine grained logging must be
 * avoided, so the methods called from here are not synchronized and should not log.
 *
 * @return the ranges that could not be binned
 */
@Override
public List<Range> binRanges(ClientContext context, List<Range> ranges,
    Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
    throws AccumuloException, AccumuloSecurityException, TableNotFoundException {

  OpTimer timer = null;
  if (log.isTraceEnabled()) {
    log.trace("tid={} Binning {} ranges for table {}", Thread.currentThread().getId(),
        ranges.size(), tableId);
    timer = new OpTimer().start();
  }

  LockCheckerSession lcSession = new LockCheckerSession();
  List<Range> unbinned;

  // Looking ranges up in sorted order is optimal when they are not cached, but always sorting
  // is not wanted. So try the cache first and only sort and retry what fails.
  rLock.lock();
  try {
    processInvalidated(context, lcSession);
    unbinned = binRanges(context, ranges, binnedRanges, true, lcSession);
  } finally {
    rLock.unlock();
  }

  if (!unbinned.isEmpty()) {
    // order the leftovers by range start key before doing real lookups
    Collections.sort(unbinned);

    wLock.lock();
    try {
      unbinned = binRanges(context, unbinned, binnedRanges, false, lcSession);
    } finally {
      wLock.unlock();
    }
  }

  if (timer != null) {
    timer.stop();
    log.trace("tid={} Binned {} ranges for table {} to {} tservers in {}",
        Thread.currentThread().getId(), ranges.size(), tableId, binnedRanges.size(),
        String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
  }

  return unbinned;
}
/**
 * Marks one extent as bad so that its location is looked up again the next time invalidated
 * extents are processed.
 */
@Override
public void invalidateCache(KeyExtent failedExtent) {
  wLock.lock();
  try {
    badExtents.add(failedExtent);
  } finally {
    wLock.unlock();
  }

  if (log.isTraceEnabled()) {
    log.trace("Invalidated extent={}", failedExtent);
  }
}
/**
 * Marks every extent in {@code keySet} as bad so that their locations are looked up again the
 * next time invalidated extents are processed.
 */
@Override
public void invalidateCache(Collection<KeyExtent> keySet) {
  wLock.lock();
  try {
    badExtents.addAll(keySet);
  } finally {
    wLock.unlock();
  }

  if (log.isTraceEnabled()) {
    log.trace("Invalidated {} cache entries for table {}", keySet.size(), tableId);
  }
}
/**
 * Marks every cached extent located on {@code server} as bad and then asks the lock checker to
 * invalidate its own state for that server.
 */
@Override
public void invalidateCache(ClientContext context, String server) {
  int markedBad = 0;

  wLock.lock();
  try {
    for (TabletLocation cached : metaCache.values()) {
      if (!cached.tablet_location.equals(server)) {
        continue;
      }
      badExtents.add(cached.tablet_extent);
      markedBad++;
    }
  } finally {
    wLock.unlock();
  }

  lockChecker.invalidateCache(server);

  if (log.isTraceEnabled()) {
    log.trace("invalidated {} cache entries table={} server={}", markedBad, tableId, server);
  }
}
/**
 * Removes every entry from the location cache. The set of bad extents is left as it is.
 */
@Override
public void invalidateCache() {
  final int dropped;

  wLock.lock();
  try {
    dropped = metaCache.size();
    metaCache.clear();
  } finally {
    wLock.unlock();
  }

  if (log.isTraceEnabled()) {
    log.trace("invalidated all {} cache entries for table={}", dropped, tableId);
  }
}
/**
 * Locates the tablet containing {@code row}, or the row immediately after it when
 * {@code skipRow} is set.
 *
 * @param retry
 *          when true, keeps trying (pausing 100 ms between attempts) until a location is found
 * @return the tablet location; may be null only when {@code retry} is false
 */
@Override
public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow,
    boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {

  OpTimer timer = null;
  if (log.isTraceEnabled()) {
    log.trace("tid={} Locating tablet table={} row={} skipRow={} retry={}",
        Thread.currentThread().getId(), tableId, TextUtil.truncate(row), skipRow, retry);
    timer = new OpTimer().start();
  }

  // every attempt gets its own lock check session so stale answers are not reused
  TabletLocation tl =
      _locateTablet(context, row, skipRow, retry, true, new LockCheckerSession());

  while (tl == null && retry) {
    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
    if (log.isTraceEnabled()) {
      log.trace("Failed to locate tablet containing row {} in table {}, will retry...",
          TextUtil.truncate(row), tableId);
    }
    tl = _locateTablet(context, row, skipRow, retry, true, new LockCheckerSession());
  }

  if (timer != null) {
    timer.stop();
    log.trace("tid={} Located tablet {} at {} in {}", Thread.currentThread().getId(),
        (tl == null ? "null" : tl.tablet_extent), (tl == null ? "null" : tl.tablet_location),
        String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
  }

  return tl;
}
/**
* Reads location information for the tablet containing {@code row} and puts every location that
* was returned into the cache. Nothing is returned; callers look the row up in the cache
* afterwards. NOTE(review): this method modifies metaCache and takes no lock itself, so callers
* are presumably expected to hold the write lock - confirm.
*
* @param row
*          row of this table to find a tablet for
* @param retry
*          passed through to the parent locator's {@code locateTablet}
* @param lcSession
*          lock check memo used when updating the cache
*/
private void lookupTabletLocation(ClientContext context, Text row, boolean retry,
LockCheckerSession lcSession)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
// row handed to the parent locator and the obtainer: <tableId>;<row>
Text metadataRow = new Text(tableId.canonical());
metadataRow.append(new byte[] {';'}, 0, 1);
metadataRow.append(row.getBytes(), 0, row.getLength());
// ask the parent locator which tablet holds that row
TabletLocation ptl = parent.locateTablet(context, metadataRow, false, retry);
if (ptl != null) {
TabletLocations locations =
locationObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow, parent);
// a non-null result with neither located nor locationless tablets means nothing was found in
// that parent tablet, so move on to the following one
while (locations != null && locations.getLocations().isEmpty()
&& locations.getLocationless().isEmpty()) {
// try the next tablet, the current tablet does not have any tablets that overlap the row
Text er = ptl.tablet_extent.getEndRow();
// only continue while the parent tablet ends before this table's stop row
if (er != null && er.compareTo(lastTabletRow) < 0) {
// skipRow=true: locate the parent tablet holding the row right after the end row
ptl = parent.locateTablet(context, er, true, retry);
if (ptl != null)
locations =
locationObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow, parent);
else
break;
} else {
break;
}
}
// null means the obtainer was unable to read the information
if (locations == null)
return;
// cannot assume the list contains contiguous key extents... so it is probably
// best to deal with each extent individually
Text lastEndRow = null;
for (TabletLocation tabletLocation : locations.getLocations()) {
KeyExtent ke = tabletLocation.tablet_extent;
TabletLocation locToCache;
// create new location if current prevEndRow == endRow
// (the rebuilt extent uses the previous extent's end row object as its prev end row;
// presumably so adjacent cached extents share one Text instance - confirm)
if ((lastEndRow != null) && (ke.getPrevEndRow() != null)
&& ke.getPrevEndRow().equals(lastEndRow)) {
locToCache =
new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow),
tabletLocation.tablet_location, tabletLocation.tablet_session);
} else {
locToCache = tabletLocation;
}
// save endRow for next iteration
lastEndRow = locToCache.tablet_extent.getEndRow();
updateCache(locToCache, lcSession);
}
}
}
/**
 * Replaces whatever the cache holds for the given extent's row span with the given location.
 * Overlapping cache entries are always removed; the new entry is added only if the tablet server
 * still holds its lock, in which case overlapping bad extents are cleared as well.
 *
 * @throws IllegalStateException
 *           if the extent belongs to another table or the location is null
 */
private void updateCache(TabletLocation tabletLocation, LockCheckerSession lcSession) {
  final KeyExtent extent = tabletLocation.tablet_extent;

  // sanity checks
  if (!extent.getTableId().equals(tableId)) {
    throw new IllegalStateException("Unexpected extent returned " + tableId + " " + extent);
  }
  if (tabletLocation.tablet_location == null) {
    throw new IllegalStateException(
        "Cannot add null locations to cache " + tableId + " " + extent);
  }

  // clear out any overlapping extents in cache
  removeOverlapping(metaCache, extent);

  // do not add to cache unless lock is held
  if (lcSession.checkLock(tabletLocation) == null) {
    return;
  }

  // cache under the end row, using the sentinel when there is no end row
  Text endRow = extent.getEndRow();
  Text cacheKey = (endRow == null) ? MAX_TEXT : endRow;
  metaCache.put(cacheKey, tabletLocation);

  if (!badExtents.isEmpty()) {
    removeOverlapping(badExtents, extent);
  }
}
/**
 * Removes from {@code metaCache} the entries whose extent overlaps {@code nke}. Scanning starts
 * at the first entry keyed after {@code nke}'s previous end row (or at the first entry when there
 * is none) and stops at the first entry for which {@code stopRemoving} is true.
 */
static void removeOverlapping(TreeMap<Text,TabletLocation> metaCache, KeyExtent nke) {
  SortedMap<Text,TabletLocation> candidates =
      (nke.getPrevEndRow() == null) ? metaCache : metaCache.tailMap(rowAfterPrevRow(nke));

  for (Iterator<Entry<Text,TabletLocation>> it = candidates.entrySet().iterator(); it
      .hasNext();) {
    KeyExtent cachedExtent = it.next().getValue().tablet_extent;
    if (stopRemoving(nke, cachedExtent)) {
      return;
    }
    it.remove();
  }
}
/**
 * Tells whether the cached extent {@code ke} starts at or after the end of {@code nke}.
 *
 * @return true only when {@code ke} has a previous end row, {@code nke} has an end row, and the
 *         former is not less than the latter
 */
private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) {
  Text cachedPrevEndRow = ke.getPrevEndRow();
  if (cachedPrevEndRow == null) {
    return false;
  }

  Text newEndRow = nke.getEndRow();
  if (newEndRow == null) {
    return false;
  }

  return cachedPrevEndRow.compareTo(newEndRow) >= 0;
}
/**
 * Builds a copy of {@code nke}'s previous end row with a single zero byte appended.
 */
private static Text rowAfterPrevRow(KeyExtent nke) {
  final byte[] zeroByte = {0};
  Text successor = new Text(nke.getPrevEndRow());
  successor.append(zeroByte, 0, zeroByte.length);
  return successor;
}
/**
 * Removes from {@code extents} every extent that {@code KeyExtent.findOverlapping} reports as
 * overlapping {@code nke}.
 */
static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) {
  for (KeyExtent stale : KeyExtent.findOverlapping(nke, extents)) {
    extents.remove(stale);
  }
}
/**
 * Looks up the cached location of the tablet containing {@code row}.
 *
 * @return the cached location, or null when no cached extent contains the row
 */
private TabletLocation locateTabletInCache(Text row) {
  // first cached extent whose end row is at or after the requested row
  Entry<Text,TabletLocation> candidate = metaCache.ceilingEntry(row);
  if (candidate == null) {
    return null;
  }

  TabletLocation cached = candidate.getValue();
  Text prevEndRow = cached.tablet_extent.getPrevEndRow();

  // the row belongs to the extent only if it lies after the extent's previous end row
  boolean rowInExtent = prevEndRow == null || prevEndRow.compareTo(row) < 0;
  return rowInExtent ? cached : null;
}
/**
* Finds the location of the tablet containing a row, first from the cache and, on a miss, by
* looking the location up and caching it.
*
* @param row
*          row to locate; not modified, a copy is made when {@code skipRow} is set
* @param skipRow
*          when true, locate the row formed by appending a zero byte to {@code row}
* @param retry
*          passed through to the location lookup
* @param lock
*          when true this method takes the read lock for the cache check and the write lock for
*          the lookup; when false no lock is taken here (the callers in this class that pass false
*          already hold the write lock)
* @param lcSession
*          lock check memo for the current operation
* @return the tablet location, or null if none was found or its server no longer holds its lock
*/
protected TabletLocation _locateTablet(ClientContext context, Text row, boolean skipRow,
boolean retry, boolean lock, LockCheckerSession lcSession)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
if (skipRow) {
// copy before appending so the caller's Text is left untouched
row = new Text(row);
row.append(new byte[] {0}, 0, 1);
}
TabletLocation tl;
// step 1: refresh invalidated extents, then try the cache
if (lock) {
rLock.lock();
try {
tl = processInvalidatedAndCheckLock(context, lcSession, row);
} finally {
rLock.unlock();
}
} else {
tl = processInvalidatedAndCheckLock(context, lcSession, row);
}
// step 2: cache miss, look the location up (this modifies the cache)
if (tl == null) {
// not in cache, so obtain info
if (lock) {
wLock.lock();
try {
tl = lookupTabletLocationAndCheckLock(context, row, retry, lcSession);
} finally {
wLock.unlock();
}
} else {
tl = lookupTabletLocationAndCheckLock(context, row, retry, lcSession);
}
}
return tl;
}
/**
 * Looks up and caches the location for {@code row}, then returns what the cache now holds for
 * that row, filtered by the lock check.
 *
 * @return the location, or null if the row is still not cached or its server lost its lock
 */
private TabletLocation lookupTabletLocationAndCheckLock(ClientContext context, Text row,
    boolean retry, LockCheckerSession lcSession)
    throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
  lookupTabletLocation(context, row, retry, lcSession);

  TabletLocation cached = locateTabletInCache(row);
  return lcSession.checkLock(cached);
}
/**
 * Refreshes invalidated extents, then returns the cached location for {@code row}, filtered by
 * the lock check.
 *
 * @return the location, or null if the row is not cached or its server lost its lock
 */
private TabletLocation processInvalidatedAndCheckLock(ClientContext context,
    LockCheckerSession lcSession, Text row)
    throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
  processInvalidated(context, lcSession);

  TabletLocation cached = locateTabletInCache(row);
  return lcSession.checkLock(cached);
}
/**
* Re-reads the locations of all extents currently marked as bad: their overlapping cache entries
* are dropped, their metadata ranges are binned through the parent locator, and the locations
* returned for each server are put back into the cache.
*
* <p>
* Locking: the caller must hold either the read lock or the write lock. If the write lock is not
* held, this method releases the caller's read lock, takes the write lock for the duration of the
* work, and re-acquires the read lock before releasing the write lock, so the caller leaves with
* the same lock it entered with. The cache may therefore have been changed by other threads
* between the release of the read lock and the acquisition of the write lock.
*/
@SuppressFBWarnings(value = {"UL_UNRELEASED_LOCK", "UL_UNRELEASED_LOCK_EXCEPTION_PATH"},
justification = "locking is confusing, but probably correct")
private void processInvalidated(ClientContext context, LockCheckerSession lcSession)
throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
// fast path: nothing was invalidated
if (badExtents.isEmpty())
return;
final boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread();
try {
if (!writeLockHeld) {
// a read lock cannot be upgraded in place, so release it before taking the write lock
rLock.unlock();
wLock.lock();
// another thread may have processed the bad extents while no lock was held
if (badExtents.isEmpty())
return;
}
List<Range> lookups = new ArrayList<>(badExtents.size());
for (KeyExtent be : badExtents) {
lookups.add(be.toMetadataRange());
// drop the stale cache entries for this extent
removeOverlapping(metaCache, be);
}
lookups = Range.mergeOverlapping(lookups);
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
parent.binRanges(context, lookups, binnedRanges);
// randomize server order
ArrayList<String> tabletServers = new ArrayList<>(binnedRanges.keySet());
Collections.shuffle(tabletServers);
for (String tserver : tabletServers) {
List<TabletLocation> locations =
locationObtainer.lookupTablets(context, tserver, binnedRanges.get(tserver), parent);
// updateCache also removes the bad extents covered by each cached location
for (TabletLocation tabletLocation : locations) {
updateCache(tabletLocation, lcSession);
}
}
} finally {
if (!writeLockHeld) {
// downgrade: take the read lock back before giving up the write lock
rLock.lock();
wLock.unlock();
}
}
}
/**
 * Records {@code range} under the given server location and extent, creating the intermediate
 * map and list when they do not exist yet.
 */
protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
    String location, KeyExtent ke, Range range) {
  Map<KeyExtent,List<Range>> rangesByExtent =
      binnedRanges.computeIfAbsent(location, loc -> new HashMap<>());
  List<Range> extentRanges = rangesByExtent.computeIfAbsent(ke, extent -> new ArrayList<>());
  extentRanges.add(range);
}
}