/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.htrace.wrappers.TraceRunnable;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
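/**
 * Client-side iterator that backs a {@code BatchScanner}. Ranges are binned to tablet servers,
 * looked up in parallel on {@code queryThreadPool}, and the resulting key/value batches are
 * buffered in {@code resultsQueue} until the caller consumes them via {@link #hasNext()} and
 * {@link #next()}.
 *
 * <p>
 * An illustrative usage sketch follows. This class is normally constructed by the
 * {@code BatchScanner} implementation rather than used directly; the {@code client} instance and
 * table name below are placeholders.
 *
 * <pre>{@code
 * // assumes an AccumuloClient named "client" and an existing table named "table"
 * try (BatchScanner bs = client.createBatchScanner("table", Authorizations.EMPTY, 4)) {
 *   bs.setRanges(Collections.singletonList(new Range())); // full-table range
 *   for (Entry<Key,Value> entry : bs) {
 *     // each entry is produced by a TabletServerBatchReaderIterator under the covers
 *     System.out.println(entry.getKey() + " -> " + entry.getValue());
 *   }
 * }
 * }</pre>
 */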
public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
private static final Logger log = LoggerFactory.getLogger(TabletServerBatchReaderIterator.class);
private final ClientContext context;
private final TableId tableId;
private Authorizations authorizations = Authorizations.EMPTY;
private final int numThreads;
private final ExecutorService queryThreadPool;
private final ScannerOptions options;
private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue;
private Iterator<Entry<Key,Value>> batchIterator;
private List<Entry<Key,Value>> batch;
private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<>();
private final Object nextLock = new Object();
private long failSleepTime = 100;
private volatile Throwable fatalException = null;
private Map<String,TimeoutTracker> timeoutTrackers;
private Set<String> timedoutServers;
private final long timeout;
private TabletLocator locator;
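/**
 * Callback used by the lookup tasks to hand completed batches of entries back to this iterator;
 * the default implementation created in the constructor simply places each batch on
 * {@code resultsQueue}.
 */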
public interface ResultReceiver {
void receive(List<Entry<Key,Value>> entries);
}
public TabletServerBatchReaderIterator(ClientContext context, TableId tableId,
Authorizations authorizations, ArrayList<Range> ranges, int numThreads,
ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) {
this.context = context;
this.tableId = tableId;
this.authorizations = authorizations;
this.numThreads = numThreads;
this.queryThreadPool = queryThreadPool;
this.options = new ScannerOptions(scannerOptions);
resultsQueue = new ArrayBlockingQueue<>(numThreads);
this.locator = new TimeoutTabletLocator(timeout, context, tableId);
timeoutTrackers = Collections.synchronizedMap(new HashMap<>());
timedoutServers = Collections.synchronizedSet(new HashSet<>());
this.timeout = timeout;
if (!options.fetchedColumns.isEmpty()) {
ArrayList<Range> ranges2 = new ArrayList<>(ranges.size());
for (Range range : ranges) {
ranges2.add(range.bound(options.fetchedColumns.first(), options.fetchedColumns.last()));
}
ranges = ranges2;
}
ResultReceiver rr = entries -> {
try {
resultsQueue.put(entries);
} catch (InterruptedException e) {
if (TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown())
log.debug("Failed to add Batch Scan result", e);
else
log.warn("Failed to add Batch Scan result", e);
fatalException = e;
throw new RuntimeException(e);
}
};
try {
lookup(ranges, rr);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException("Failed to create iterator", e);
}
}
@Override
public boolean hasNext() {
synchronized (nextLock) {
if (batch == LAST_BATCH)
return false;
if (batch != null && batchIterator.hasNext())
return true;
// don't have one cached, try to cache one and return success
try {
batch = null;
while (batch == null && fatalException == null && !queryThreadPool.isShutdown())
batch = resultsQueue.poll(1, TimeUnit.SECONDS);
if (fatalException != null)
if (fatalException instanceof RuntimeException)
throw (RuntimeException) fatalException;
else
throw new RuntimeException(fatalException);
if (queryThreadPool.isShutdown()) {
String shortMsg =
"The BatchScanner was unexpectedly closed while" + " this Iterator was still in use.";
log.error("{} Ensure that a reference to the BatchScanner is retained"
+ " so that it can be closed when this Iterator is exhausted. Not"
+ " retaining a reference to the BatchScanner guarantees that you are"
+ " leaking threads in your client JVM.", shortMsg);
throw new RuntimeException(shortMsg + " Ensure proper handling of the BatchScanner.");
}
batchIterator = batch.iterator();
return batch != LAST_BATCH;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Override
public Entry<Key,Value> next() {
// if there's one waiting, or hasNext() can get one, return it
synchronized (nextLock) {
if (hasNext())
return batchIterator.next();
else
throw new NoSuchElementException();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
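/**
 * Merges overlapping ranges, bins them to tablets and tablet servers, and then issues the
 * parallel lookups via doLookups.
 */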
private synchronized void lookup(List<Range> ranges, ResultReceiver receiver)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
List<Column> columns = new ArrayList<>(options.fetchedColumns);
ranges = Range.mergeOverlapping(ranges);
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
binRanges(locator, ranges, binnedRanges);
doLookups(binnedRanges, receiver, columns);
}
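/**
 * Bins the given ranges by tablet server and tablet, retrying every 100ms while the tablet
 * locator reports failures. If the number of failures stops shrinking, the table is checked for
 * deletion or offline state so the caller gets a meaningful exception. Once binning succeeds,
 * each range is clipped to its tablet's extent, which makes it easier to redo only the affected
 * work after a failure.
 */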
private void binRanges(TabletLocator tabletLocator, List<Range> ranges,
Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
int lastFailureSize = Integer.MAX_VALUE;
while (true) {
binnedRanges.clear();
List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges);
if (failures.isEmpty()) {
break;
} else {
// tried to only do table state checks when failures.size() == ranges.size(), however this did
// not work because nothing ever invalidated entries in the tabletLocator cache... so even though
// the table was deleted the tablet locator entries for the deleted table were not cleared... so
// need to always do the check when failures occur
if (failures.size() >= lastFailureSize)
if (!Tables.exists(context, tableId))
throw new TableDeletedException(tableId.canonical());
else if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
lastFailureSize = failures.size();
if (log.isTraceEnabled())
log.trace("Failed to bin {} ranges, tablet locations were null, retrying in 100ms",
failures.size());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// truncate the ranges to within the tablets... this makes it easier to know what work
// needs to be redone when failures occur and tablets have merged or split
Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
Map<KeyExtent,List<Range>> tabletMap = new HashMap<>();
binnedRanges2.put(entry.getKey(), tabletMap);
for (Entry<KeyExtent,List<Range>> tabletRanges : entry.getValue().entrySet()) {
Range tabletRange = tabletRanges.getKey().toDataRange();
List<Range> clippedRanges = new ArrayList<>();
tabletMap.put(tabletRanges.getKey(), clippedRanges);
for (Range range : tabletRanges.getValue())
clippedRanges.add(tabletRange.clip(range));
}
}
binnedRanges.clear();
binnedRanges.putAll(binnedRanges2);
}
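/**
 * Handles ranges that failed on a tablet server: sleeps with an exponential backoff (capped at
 * 5 seconds), re-bins the failed ranges, and issues a new round of lookups for them.
 */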
private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver,
List<Column> columns)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
if (log.isTraceEnabled())
log.trace("Failed to execute multiscans against {} tablets, retrying...", failures.size());
try {
Thread.sleep(failSleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// We were interrupted (close was called on the batch scanner), so just exit
log.debug("Exiting failure processing on interrupt");
return;
}
failSleepTime = Math.min(5000, failSleepTime * 2);
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
List<Range> allRanges = new ArrayList<>();
for (List<Range> ranges : failures.values())
allRanges.addAll(ranges);
// since the first call to binRanges clipped the ranges to within a tablet, re-binning the
// failed ranges here should only bin to the set of failed tablets
binRanges(locator, allRanges, binnedRanges);
doLookups(binnedRanges, receiver, columns);
}
private String getTableInfo() {
return Tables.getPrintableTableInfoFromId(context, tableId);
}
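/**
 * Runnable that performs the multi-scan against a single tablet server. Each task records any
 * per-tablet failures it encounters; the shared semaphore lets the last task to finish detect
 * that all tasks are done, at which point it either reprocesses the accumulated failures or
 * enqueues {@code LAST_BATCH} to signal the consumer that the scan is complete.
 */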
private class QueryTask implements Runnable {
private String tsLocation;
private Map<KeyExtent,List<Range>> tabletsRanges;
private ResultReceiver receiver;
private Semaphore semaphore = null;
private final Map<KeyExtent,List<Range>> failures;
private List<Column> columns;
private int semaphoreSize;
QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) {
this.tsLocation = tsLocation;
this.tabletsRanges = tabletsRanges;
this.receiver = receiver;
this.columns = columns;
this.failures = failures;
}
void setSemaphore(Semaphore semaphore, int semaphoreSize) {
this.semaphore = semaphore;
this.semaphoreSize = semaphoreSize;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
Thread.currentThread()
.setName(threadName + " looking up " + tabletsRanges.size() + " ranges at " + tsLocation);
Map<KeyExtent,List<Range>> unscanned = new HashMap<>();
Map<KeyExtent,List<Range>> tsFailures = new HashMap<>();
try {
TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation);
if (timeoutTracker == null) {
timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout);
timeoutTrackers.put(tsLocation, timeoutTracker);
}
doLookup(context, tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns,
options, authorizations, timeoutTracker);
if (!tsFailures.isEmpty()) {
locator.invalidateCache(tsFailures.keySet());
synchronized (failures) {
failures.putAll(tsFailures);
}
}
} catch (IOException e) {
if (!TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown()) {
synchronized (failures) {
failures.putAll(tsFailures);
failures.putAll(unscanned);
}
locator.invalidateCache(context, tsLocation);
}
log.debug("IOException thrown", e);
} catch (AccumuloSecurityException e) {
e.setTableInfo(getTableInfo());
log.debug("AccumuloSecurityException thrown", e);
Tables.clearCache(context);
if (Tables.exists(context, tableId))
fatalException = e;
else
fatalException = new TableDeletedException(tableId.canonical());
} catch (SampleNotPresentException e) {
fatalException = e;
} catch (Exception t) {
if (queryThreadPool.isShutdown())
log.debug("Caught exception, but queryThreadPool is shutdown", t);
else
log.warn("Caught exception, but queryThreadPool is not shutdown", t);
fatalException = t;
} catch (Throwable t) {
fatalException = t;
throw t; // let uncaught exception handler deal with the Error
} finally {
semaphore.release();
Thread.currentThread().setName(threadName);
if (semaphore.tryAcquire(semaphoreSize)) {
// finished processing all queries
if (fatalException == null && !failures.isEmpty()) {
// there were some failures
try {
processFailures(failures, receiver, columns);
} catch (TableNotFoundException | AccumuloException e) {
log.debug("{}", e.getMessage(), e);
fatalException = e;
} catch (AccumuloSecurityException e) {
e.setTableInfo(getTableInfo());
log.debug("{}", e.getMessage(), e);
fatalException = e;
} catch (Exception t) {
log.debug("{}", t.getMessage(), t);
fatalException = t;
}
if (fatalException != null) {
// we are finished with this batch query
if (!resultsQueue.offer(LAST_BATCH)) {
log.debug(
"Could not add to result queue after seeing fatalException in processFailures",
fatalException);
}
}
} else {
// we are finished with this batch query
if (fatalException != null) {
if (!resultsQueue.offer(LAST_BATCH)) {
log.debug("Could not add to result queue after seeing fatalException",
fatalException);
}
} else {
try {
resultsQueue.put(LAST_BATCH);
} catch (InterruptedException e) {
fatalException = e;
if (!resultsQueue.offer(LAST_BATCH)) {
log.debug("Could not add to result queue after seeing fatalException",
fatalException);
}
}
}
}
}
}
}
}
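/**
 * Creates and submits one or more {@link QueryTask}s per tablet server. Servers that have
 * already timed out are failed immediately, server order is shuffled to spread load, and when
 * there are many more threads than servers the per-server work is split into chunks of at most
 * {@code maxTabletsPerRequest} tablets so the thread pool stays busy.
 */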
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
final ResultReceiver receiver, List<Column> columns) {
if (timedoutServers.containsAll(binnedRanges.keySet())) {
// all servers have timed out
throw new TimedOutException(timedoutServers);
}
// when there are lots of threads and only a few tablet servers, it is good to break the
// requests to each tablet server up; the following code determines if this is the case
int maxTabletsPerRequest = Integer.MAX_VALUE;
if (numThreads / binnedRanges.size() > 1) {
int totalNumberOfTablets = 0;
for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
totalNumberOfTablets += entry.getValue().size();
}
maxTabletsPerRequest = totalNumberOfTablets / numThreads;
if (maxTabletsPerRequest == 0) {
maxTabletsPerRequest = 1;
}
}
Map<KeyExtent,List<Range>> failures = new HashMap<>();
if (!timedoutServers.isEmpty()) {
// go ahead and fail any timed out servers
for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator =
binnedRanges.entrySet().iterator(); iterator.hasNext();) {
Entry<String,Map<KeyExtent,List<Range>>> entry = iterator.next();
if (timedoutServers.contains(entry.getKey())) {
failures.putAll(entry.getValue());
iterator.remove();
}
}
}
// randomize tablet server order... this will help when there are multiple
// batch readers and writers running against Accumulo
List<String> locations = new ArrayList<>(binnedRanges.keySet());
Collections.shuffle(locations);
List<QueryTask> queryTasks = new ArrayList<>();
for (final String tsLocation : locations) {
final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
queryTasks.add(queryTask);
} else {
HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) {
tabletSubset.put(entry.getKey(), entry.getValue());
if (tabletSubset.size() >= maxTabletsPerRequest) {
QueryTask queryTask =
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
queryTasks.add(queryTask);
tabletSubset = new HashMap<>();
}
}
if (!tabletSubset.isEmpty()) {
QueryTask queryTask =
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
queryTasks.add(queryTask);
}
}
}
final Semaphore semaphore = new Semaphore(queryTasks.size());
semaphore.acquireUninterruptibly(queryTasks.size());
for (QueryTask queryTask : queryTasks) {
queryTask.setSemaphore(semaphore, queryTasks.size());
queryThreadPool.execute(new TraceRunnable(queryTask));
}
}
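/**
 * Updates bookkeeping after a multi-scan RPC: translates the thrift failures back into
 * {@code KeyExtent}s and adds them to {@code failures}, removes fully scanned tablets from
 * {@code unscanned}, and for a partially scanned tablet replaces its ranges with the remaining
 * portion starting at the last key returned.
 */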
static void trackScanning(Map<KeyExtent,List<Range>> failures,
Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) {
// translate returned failures, remove them from unscanned, and add them to failures
// @formatter:off
Map<KeyExtent, List<Range>> retFailures = scanResult.failures.entrySet().stream().collect(Collectors.toMap(
(entry) -> KeyExtent.fromThrift(entry.getKey()),
(entry) -> entry.getValue().stream().map(Range::new).collect(Collectors.toList())
));
// @formatter:on
unscanned.keySet().removeAll(retFailures.keySet());
failures.putAll(retFailures);
// translate full scans and remove them from unscanned
Set<KeyExtent> fullScans =
scanResult.fullScans.stream().map(KeyExtent::fromThrift).collect(Collectors.toSet());
unscanned.keySet().removeAll(fullScans);
// remove partial scan from unscanned
if (scanResult.partScan != null) {
KeyExtent ke = KeyExtent.fromThrift(scanResult.partScan);
Key nextKey = new Key(scanResult.partNextKey);
ListIterator<Range> iterator = unscanned.get(ke).listIterator();
while (iterator.hasNext()) {
Range range = iterator.next();
if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey())
&& scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) {
iterator.remove();
} else if (range.contains(nextKey)) {
iterator.remove();
Range partRange = new Range(nextKey, scanResult.partNextKeyInclusive, range.getEndKey(),
range.isEndKeyInclusive());
iterator.add(partRange);
}
}
}
}
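/**
 * Tracks per-server scan activity so that a server which has made no progress for longer than
 * the configured timeout can be added to the shared set of bad servers and subsequently failed
 * fast by doLookups.
 */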
private static class TimeoutTracker {
String server;
Set<String> badServers;
long timeOut;
long activityTime;
Long firstErrorTime = null;
TimeoutTracker(String server, Set<String> badServers, long timeOut) {
this(timeOut);
this.server = server;
this.badServers = badServers;
}
TimeoutTracker(long timeOut) {
this.timeOut = timeOut;
}
void startingScan() {
activityTime = System.currentTimeMillis();
}
void check() throws IOException {
if (System.currentTimeMillis() - activityTime > timeOut) {
badServers.add(server);
throw new IOException(
"Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
}
}
void madeProgress() {
activityTime = System.currentTimeMillis();
firstErrorTime = null;
}
void errorOccured() {
if (firstErrorTime == null) {
firstErrorTime = activityTime;
} else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
badServers.add(server);
}
}
/**
 * @return the configured scan timeout in milliseconds
 */
public long getTimeOut() {
return timeOut;
}
}
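/**
 * Performs a complete multi-scan session against a single tablet server: the requested ranges
 * are copied into {@code unscanned}, a multi-scan is started over thrift, batches of results are
 * pushed to the {@code receiver} as they arrive, and {@code unscanned}/{@code failures} are
 * updated via trackScanning until the server reports there is no more data. This overload uses
 * an effectively infinite timeout.
 */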
public static void doLookup(ClientContext context, String server,
Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures,
Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns,
ScannerOptions options, Authorizations authorizations)
throws IOException, AccumuloSecurityException, AccumuloServerException {
doLookup(context, server, requested, failures, unscanned, receiver, columns, options,
authorizations, new TimeoutTracker(Long.MAX_VALUE));
}
static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested,
Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
ResultReceiver receiver, List<Column> columns, ScannerOptions options,
Authorizations authorizations, TimeoutTracker timeoutTracker)
throws IOException, AccumuloSecurityException, AccumuloServerException {
if (requested.isEmpty()) {
return;
}
// copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
ArrayList<Range> ranges = new ArrayList<>();
for (Range range : entry.getValue()) {
ranges.add(new Range(range));
}
unscanned.put(KeyExtent.copyOf(entry.getKey()), ranges);
}
timeoutTracker.startingScan();
TTransport transport = null;
try {
final HostAndPort parsedServer = HostAndPort.fromString(server);
final TabletClientService.Client client;
if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis())
client = ThriftUtil.getTServerClient(parsedServer, context, timeoutTracker.getTimeOut());
else
client = ThriftUtil.getTServerClient(parsedServer, context);
try {
OpTimer timer = null;
if (log.isTraceEnabled()) {
log.trace(
"tid={} Starting multi scan, tserver={} #tablets={} #ranges={} ssil={} ssio={}",
Thread.currentThread().getId(), server, requested.size(),
sumSizes(requested.values()), options.serverSideIteratorList,
options.serverSideIteratorOptions);
timer = new OpTimer().start();
}
TabletType ttype = TabletType.type(requested.keySet());
boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
// @formatter:off
Map<TKeyExtent, List<TRange>> thriftTabletRanges = requested.entrySet().stream().collect(Collectors.toMap(
(entry) -> entry.getKey().toThrift(),
(entry) -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
));
// @formatter:on
Map<String,String> execHints =
options.executionHints.isEmpty() ? null : options.executionHints;
InitialMultiScan imsr = client.startMultiScan(TraceUtil.traceInfo(), context.rpcCreds(),
thriftTabletRanges, columns.stream().map(Column::toThrift).collect(Collectors.toList()),
options.serverSideIteratorList, options.serverSideIteratorOptions,
ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites,
SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()),
options.batchTimeOut, options.classLoaderContext, execHints);
if (waitForWrites)
ThriftScanner.serversWaitedForWrites.get(ttype).add(server);
MultiScanResult scanResult = imsr.result;
if (timer != null) {
timer.stop();
log.trace("tid={} Got 1st multi scan results, #results={} {} in {}",
Thread.currentThread().getId(), scanResult.results.size(),
(scanResult.more ? "scanID=" + imsr.scanID : ""),
String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
}
ArrayList<Entry<Key,Value>> entries = new ArrayList<>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
}
if (!entries.isEmpty())
receiver.receive(entries);
if (!entries.isEmpty() || !scanResult.fullScans.isEmpty())
timeoutTracker.madeProgress();
trackScanning(failures, unscanned, scanResult);
AtomicLong nextOpid = new AtomicLong();
while (scanResult.more) {
timeoutTracker.check();
if (timer != null) {
log.trace("tid={} oid={} Continuing multi scan, scanid={}",
Thread.currentThread().getId(), nextOpid.get(), imsr.scanID);
timer.reset().start();
}
scanResult = client.continueMultiScan(TraceUtil.traceInfo(), imsr.scanID);
if (timer != null) {
timer.stop();
log.trace("tid={} oid={} Got more multi scan results, #results={} {} in {}",
Thread.currentThread().getId(), nextOpid.getAndIncrement(),
scanResult.results.size(), (scanResult.more ? " scanID=" + imsr.scanID : ""),
String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
}
entries = new ArrayList<>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
}
if (!entries.isEmpty())
receiver.receive(entries);
if (!entries.isEmpty() || !scanResult.fullScans.isEmpty())
timeoutTracker.madeProgress();
trackScanning(failures, unscanned, scanResult);
}
client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID);
} finally {
ThriftUtil.returnClient(client);
}
} catch (TTransportException e) {
log.debug("Server : {} msg : {}", server, e.getMessage());
timeoutTracker.errorOccured();
throw new IOException(e);
} catch (ThriftSecurityException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (TApplicationException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new AccumuloServerException(server, e);
} catch (NoSuchScanIDException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new IOException(e);
} catch (TSampleNotPresentException e) {
log.debug("Server : " + server + " msg : " + e.getMessage(), e);
String tableInfo = "?";
if (e.getExtent() != null) {
TableId tableId = KeyExtent.fromThrift(e.getExtent()).tableId();
tableInfo = Tables.getPrintableTableInfoFromId(context, tableId);
}
String message = "Table " + tableInfo + " does not have sampling configured or built";
throw new SampleNotPresentException(message, e);
} catch (TException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
timeoutTracker.errorOccured();
throw new IOException(e);
} finally {
ThriftTransportPool.getInstance().returnTransport(transport);
}
}
static int sumSizes(Collection<List<Range>> values) {
int sum = 0;
for (List<Range> list : values) {
sum += list.size();
}
return sum;
}
}