blob: 5cab5bd20444c747c41064626d39d93364aacebe [file] [log] [blame]
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This class delegates to the BlockingQueue but wraps all HRegions in
* compaction requests that hold the priority and the date requested.
*
* Implementation Note: With an elevation time of -1 there is the potential for
* starvation of the lower priority compaction requests as long as there is a
* constant stream of high priority requests.
*/
public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
/**
* This class represents a compaction request and holds the region, priority,
* and time submitted.
*/
private class CompactionRequest implements Comparable<CompactionRequest> {
private final HRegion r;
private final int p;
private final Date date;
public CompactionRequest(HRegion r, int p) {
this(r, p, null);
}
public CompactionRequest(HRegion r, int p, Date d) {
if (r == null) {
throw new NullPointerException("HRegion cannot be null");
}
if (d == null) {
d = new Date();
}
this.r = r;
this.p = p;
this.date = d;
}
/**
* This function will define where in the priority queue the request will
* end up. Those with the highest priorities will be first. When the
* priorities are the same it will It will first compare priority then date
* to maintain a FIFO functionality.
*
* <p>Note: The date is only accurate to the millisecond which means it is
* possible that two requests were inserted into the queue within a
* millisecond. When that is the case this function will break the tie
* arbitrarily.
*/
@Override
public int compareTo(CompactionRequest request) {
//NOTE: The head of the priority queue is the least element
if (this.equals(request)) {
return 0; //they are the same request
}
int compareVal;
compareVal = p - request.p; //compare priority
if (compareVal != 0) {
return compareVal;
}
compareVal = date.compareTo(request.date);
if (compareVal != 0) {
return compareVal;
}
//break the tie arbitrarily
return -1;
}
/** Gets the HRegion for the request */
HRegion getHRegion() {
return r;
}
/** Gets the priority for the request */
int getPriority() {
return p;
}
public String toString() {
return "regionName=" + r.getRegionNameAsString() +
", priority=" + p + ", date=" + date;
}
}
/** The actual blocking queue we delegate to */
protected final BlockingQueue<CompactionRequest> queue =
new PriorityBlockingQueue<CompactionRequest>();
/** Hash map of the HRegions contained within the Compaction Queue */
private final HashMap<HRegion, CompactionRequest> regionsInQueue =
new HashMap<HRegion, CompactionRequest>();
/** Creates a new PriorityCompactionQueue with no priority elevation time */
public PriorityCompactionQueue() {
LOG.debug("Create PriorityCompactionQueue");
}
/** If the region is not already in the queue it will add it and return a
* new compaction request object. If it is already present in the queue
* then it will return null.
* @param p If null it will use the default priority
* @return returns a compaction request if it isn't already in the queue
*/
protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
CompactionRequest queuedRequest = null;
CompactionRequest newRequest = new CompactionRequest(r, p);
synchronized (regionsInQueue) {
queuedRequest = regionsInQueue.get(r);
if (queuedRequest == null ||
newRequest.getPriority() < queuedRequest.getPriority()) {
LOG.trace("Inserting region in queue. " + newRequest);
regionsInQueue.put(r, newRequest);
} else {
LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
", requested: " + newRequest);
newRequest = null; // It is already present so don't add it
}
}
if (newRequest != null && queuedRequest != null) {
// Remove the lower priority request
queue.remove(queuedRequest);
}
return newRequest;
}
/** Removes the request from the regions in queue
* @param remove
*/
protected CompactionRequest removeFromRegionsInQueue(CompactionRequest remove) {
if (remove == null) return null;
synchronized (regionsInQueue) {
CompactionRequest cr = null;
cr = regionsInQueue.remove(remove.getHRegion());
if (cr != null && !cr.equals(remove))
{
//Because we don't synchronize across both this.regionsInQueue and this.queue
//a rare race condition exists where a higher priority compaction request replaces
//the lower priority request in this.regionsInQueue but the lower priority request
//is taken off this.queue before the higher can be added to this.queue.
//So if we didn't remove what we were expecting we put it back on.
regionsInQueue.put(cr.getHRegion(), cr);
}
if (cr == null) {
LOG.warn("Removed a region it couldn't find in regionsInQueue: " + remove.getHRegion());
}
return cr;
}
}
public boolean add(HRegion e, int p) {
CompactionRequest request = this.addToRegionsInQueue(e, p);
if (request != null) {
boolean result = queue.add(request);
return result;
} else {
return false;
}
}
@Override
public boolean add(HRegion e) {
return add(e, e.getCompactPriority());
}
public boolean offer(HRegion e, int p) {
CompactionRequest request = this.addToRegionsInQueue(e, p);
return (request != null)? queue.offer(request): false;
}
@Override
public boolean offer(HRegion e) {
return offer(e, e.getCompactPriority());
}
public void put(HRegion e, int p) throws InterruptedException {
CompactionRequest request = this.addToRegionsInQueue(e, p);
if (request != null) {
queue.put(request);
}
}
@Override
public void put(HRegion e) throws InterruptedException {
put(e, e.getCompactPriority());
}
public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
throws InterruptedException {
CompactionRequest request = this.addToRegionsInQueue(e, p);
return (request != null)? queue.offer(request, timeout, unit): false;
}
@Override
public boolean offer(HRegion e, long timeout, TimeUnit unit)
throws InterruptedException {
return offer(e, e.getCompactPriority(), timeout, unit);
}
@Override
public HRegion take() throws InterruptedException {
CompactionRequest cr = queue.take();
if (cr != null) {
removeFromRegionsInQueue(cr);
return cr.getHRegion();
}
return null;
}
@Override
public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
CompactionRequest cr = queue.poll(timeout, unit);
if (cr != null) {
removeFromRegionsInQueue(cr);
return cr.getHRegion();
}
return null;
}
@Override
public boolean remove(Object r) {
if (r instanceof CompactionRequest) {
CompactionRequest cr = removeFromRegionsInQueue((CompactionRequest) r);
if (cr != null) {
return queue.remove(cr);
}
}
return false;
}
@Override
public HRegion remove() {
CompactionRequest cr = queue.remove();
if (cr != null) {
removeFromRegionsInQueue(cr);
return cr.getHRegion();
}
return null;
}
@Override
public HRegion poll() {
CompactionRequest cr = queue.poll();
if (cr != null) {
removeFromRegionsInQueue(cr);
return cr.getHRegion();
}
return null;
}
@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}
@Override
public boolean contains(Object r) {
if (r instanceof HRegion) {
synchronized (regionsInQueue) {
return regionsInQueue.containsKey((HRegion) r);
}
} else if (r instanceof CompactionRequest) {
return queue.contains(r);
}
return false;
}
@Override
public HRegion element() {
CompactionRequest cr = queue.element();
return (cr != null)? cr.getHRegion(): null;
}
@Override
public HRegion peek() {
CompactionRequest cr = queue.peek();
return (cr != null)? cr.getHRegion(): null;
}
@Override
public int size() {
return queue.size();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public void clear() {
regionsInQueue.clear();
queue.clear();
}
// Unimplemented methods, collection methods
@Override
public Iterator<HRegion> iterator() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean addAll(Collection<? extends HRegion> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public int drainTo(Collection<? super HRegion> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public int drainTo(Collection<? super HRegion> c, int maxElements) {
throw new UnsupportedOperationException("Not supported.");
}
}