blob: d157e864ade775e0c4c6d20915b27965b6d59320 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.tserver.scan;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
import org.apache.accumulo.core.dataImpl.thrift.TKey;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.tserver.TabletHostingServer;
import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.tablet.KVEntry;
import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
import org.apache.accumulo.tserver.tablet.TabletBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LookupTask extends ScanTask<MultiScanResult> {
private static final Logger log = LoggerFactory.getLogger(LookupTask.class);
private final long scanID;
public LookupTask(TabletHostingServer server, long scanID) {
super(server);
this.scanID = scanID;
}
@Override
public void run() {
MultiScanSession session = (MultiScanSession) server.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
try {
if (isCancelled() || session == null)
return;
if (!transitionToRunning())
return;
TableConfiguration acuTableConf = server.getTableConfiguration(session.threadPoolExtent);
long maxResultsSize = acuTableConf.getAsBytes(Property.TABLE_SCAN_MAXMEM);
Thread.currentThread().setName("Client: " + session.client + " User: " + session.getUser()
+ " Start: " + session.startTime + " Table: ");
long bytesAdded = 0;
long maxScanTime = 4000;
long startTime = System.currentTimeMillis();
// Copy entries in session.queries (HashMap) to queryQueue (LinkedList)
// to better control the order when unfinished ranges are returned
LinkedList<Pair<KeyExtent,List<Range>>> queryQueue = new LinkedList<>();
session.queries.entrySet()
.forEach(e -> queryQueue.addLast(new Pair<>(e.getKey(), e.getValue())));
List<KVEntry> results = new ArrayList<>();
Map<KeyExtent,List<Range>> failures = new HashMap<>();
List<KeyExtent> fullScans = new ArrayList<>();
KeyExtent partScan = null;
Key partNextKey = null;
boolean partNextKeyInclusive = false;
// check the time so that the read ahead thread is not monopolized
while (!queryQueue.isEmpty() && bytesAdded < maxResultsSize
&& (System.currentTimeMillis() - startTime) < maxScanTime) {
Pair<KeyExtent,List<Range>> extentRangePair = queryQueue.removeFirst();
KeyExtent extent = extentRangePair.getFirst();
List<Range> ranges = extentRangePair.getSecond();
session.queries.remove(extent);
// check that tablet server is serving requested tablet
TabletBase tablet = session.getTabletResolver().getTablet(extent);
if (tablet == null) {
failures.put(extent, ranges);
continue;
}
Thread.currentThread().setName("Client: " + session.client + " User: " + session.getUser()
+ " Start: " + session.startTime + " Tablet: " + extent);
LookupResult lookupResult;
try {
// do the following check to avoid a race condition
// between setting false below and the task being
// canceled
if (isCancelled())
interruptFlag.set(true);
// Create new List here to collect the results from this Tablet.lookup() call
// Ensures that the yield code in Tablet can only compare a yield position
// to the results from that call
List<KVEntry> tabletResults = new ArrayList<>();
lookupResult = tablet.lookup(ranges, tabletResults, session.scanParams,
maxResultsSize - bytesAdded, interruptFlag);
// Add results from this Tablet.lookup() to the accumulated results
results.addAll(tabletResults);
// if the tablet was closed it it possible that the
// interrupt flag was set.... do not want it set for
// the next
// lookup
interruptFlag.set(false);
} catch (IOException e) {
log.warn("lookup failed for tablet " + extent, e);
throw new RuntimeException(e);
}
bytesAdded += lookupResult.bytesAdded;
if (lookupResult.unfinishedRanges.isEmpty()) {
fullScans.add(extent);
// if this extent was previously saved but now completed, then reset these values
if (partScan != null && partScan.equals(extent)) {
partScan = null;
partNextKey = null;
partNextKeyInclusive = false;
}
} else {
if (lookupResult.closed) {
failures.put(extent, lookupResult.unfinishedRanges);
} else {
// add to beginning of queue so that we handle this extent and unfinished ranges next
// unfinished ranges will either be finished or returned as partially completed
queryQueue.addFirst(new Pair<>(extent, lookupResult.unfinishedRanges));
session.queries.put(extent, lookupResult.unfinishedRanges);
partScan = extent;
partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
}
}
}
long finishTime = System.currentTimeMillis();
session.totalLookupTime += (finishTime - startTime);
session.numEntries += results.size();
// convert everything to thrift before adding result
List<TKeyValue> retResults = new ArrayList<>();
for (KVEntry entry : results)
retResults
.add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
// @formatter:off
Map<TKeyExtent,List<TRange>> retFailures = failures.entrySet().stream().collect(Collectors.toMap(
entry -> entry.getKey().toThrift(),
entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
));
// @formatter:on
List<TKeyExtent> retFullScans =
fullScans.stream().map(KeyExtent::toThrift).collect(Collectors.toList());
TKeyExtent retPartScan = null;
TKey retPartNextKey = null;
if (partScan != null) {
retPartScan = partScan.toThrift();
retPartNextKey = partNextKey.toThrift();
}
// add results to queue
addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan,
retPartNextKey, partNextKeyInclusive, !session.queries.isEmpty()));
} catch (IterationInterruptedException iie) {
if (!isCancelled()) {
log.warn("Iteration interrupted, when scan not cancelled", iie);
addResult(iie);
}
} catch (SampleNotPresentException e) {
addResult(e);
} catch (Exception e) {
log.warn("exception while doing multi-scan ", e);
addResult(e);
} finally {
Thread.currentThread().setName(oldThreadName);
runState.set(ScanRunState.FINISHED);
}
}
}