blob: a3b7816b5bac5a4d4ca81f0fb10973e602842650 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;
import static com.google.common.base.Preconditions.checkArgument;
import java.lang.ref.Cleaner.Cleanable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.cleaner.CleanerUtil;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TabletServerBatchReader extends ScannerOptions implements BatchScanner {
private static final Logger log = LoggerFactory.getLogger(TabletServerBatchReader.class);
private static final AtomicInteger nextBatchReaderInstance = new AtomicInteger(1);
private final int batchReaderInstance = nextBatchReaderInstance.getAndIncrement();
private final TableId tableId;
private final int numThreads;
private final ThreadPoolExecutor queryThreadPool;
private final ClientContext context;
private final Authorizations authorizations;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Cleanable cleanable;
private ArrayList<Range> ranges = null;
public TabletServerBatchReader(ClientContext context, TableId tableId,
Authorizations authorizations, int numQueryThreads) {
this(context, BatchScanner.class, tableId, authorizations, numQueryThreads);
}
protected TabletServerBatchReader(ClientContext context, Class<?> scopeClass, TableId tableId,
Authorizations authorizations, int numQueryThreads) {
checkArgument(context != null, "context is null");
checkArgument(tableId != null, "tableId is null");
checkArgument(authorizations != null, "authorizations is null");
this.context = context;
this.authorizations = authorizations;
this.tableId = tableId;
this.numThreads = numQueryThreads;
queryThreadPool = ThreadPools.createFixedThreadPool(numQueryThreads,
"batch scanner " + batchReaderInstance + "-", false);
// Call shutdown on this thread pool in case the caller does not call close().
cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, closed, log);
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
// Shutdown the pool
queryThreadPool.shutdownNow();
// deregister the cleaner, will not call shutdownNow() because closed is now true
cleanable.clean();
}
}
@Override
public Authorizations getAuthorizations() {
return authorizations;
}
@Override
public void setRanges(Collection<Range> ranges) {
if (ranges == null || ranges.isEmpty()) {
throw new IllegalArgumentException("ranges must be non null and contain at least 1 range");
}
if (closed.get()) {
throw new IllegalStateException("batch reader closed");
}
this.ranges = new ArrayList<>(ranges);
}
@Override
public Iterator<Entry<Key,Value>> iterator() {
if (ranges == null) {
throw new IllegalStateException("ranges not set");
}
if (closed.get()) {
throw new IllegalStateException("batch reader closed");
}
return new TabletServerBatchReaderIterator(context, tableId, authorizations, ranges, numThreads,
queryThreadPool, this, timeOut);
}
}