| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.util; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| |
| /** |
| * |
| * The <code>PoolMap</code> maps a key to a collection of values, the elements |
| * of which are managed by a pool. In effect, that collection acts as a shared |
| * pool of resources, access to which is closely controlled as per the semantics |
| * of the pool. |
| * |
| * <p> |
| * In case the size of the pool is set to a non-zero positive number, that is |
| * used to cap the number of resources that a pool may contain for any given |
| * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool. |
| * </p> |
| * |
| * @param <K> |
| * the type of the key to the resource |
| * @param <V> |
| * the type of the resource being pooled |
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Evolving |
public class PoolMap<K, V> implements Map<K, V> {
  /** Strategy that decides which {@link Pool} implementation backs each key. */
  private final PoolType poolType;

  /**
   * Per-key cap handed to the reusable and round-robin pools; thread-local
   * pools ignore it.
   */
  private final int poolMaxSize;

  /**
   * Key to pool mapping. Declared with the concrete type so that the atomic
   * <code>putIfAbsent</code> and <code>remove(key, value)</code> operations
   * are available without an additional import.
   */
  private final ConcurrentHashMap<K, Pool<V>> pools =
      new ConcurrentHashMap<K, Pool<V>>();

  /**
   * Creates a pool map whose maximum pool size is <code>0</code>.
   *
   * <p>
   * Note that with a cap of <code>0</code> the {@link PoolType#Reusable} and
   * {@link PoolType#RoundRobin} pools never accept a resource; only
   * {@link PoolType#ThreadLocal}, which ignores the cap, stores values. Use
   * {@link #PoolMap(PoolType, int)} to specify a usable cap.
   * </p>
   *
   * @param poolType
   *          the kind of pool to create for every key
   */
  public PoolMap(PoolType poolType) {
    this(poolType, 0);
  }

  /**
   * Creates a pool map.
   *
   * @param poolType
   *          the kind of pool to create for every key
   * @param poolMaxSize
   *          the maximum number of resources kept per key
   */
  public PoolMap(PoolType poolType, int poolMaxSize) {
    this.poolType = poolType;
    this.poolMaxSize = poolMaxSize;
  }

  /**
   * Asks the pool registered under <code>key</code> for a resource.
   *
   * @param key
   *          the key whose pool is consulted
   * @return whatever the pool's {@link Pool#get()} hands out, or
   *         <code>null</code> when no pool exists for the key
   */
  @Override
  public V get(Object key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.get() : null;
  }

  /**
   * Offers <code>value</code> to the pool registered under <code>key</code>,
   * creating that pool first if necessary.
   *
   * @param key
   *          the key whose pool receives the value
   * @param value
   *          the resource to pool
   * @return the result of {@link Pool#put(Object)}, or <code>null</code> when
   *         {@link #createPool()} did not supply a pool
   */
  @Override
  public V put(K key, V value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      Pool<V> created = createPool();
      if (created == null) {
        return null;
      }
      // putIfAbsent guarantees that concurrent first puts for the same key
      // end up sharing one pool instead of overwriting each other's pool.
      Pool<V> existing = pools.putIfAbsent(key, created);
      pool = existing != null ? existing : created;
    }
    return pool.put(value);
  }

  /**
   * Drops the whole pool registered under <code>key</code>.
   *
   * @param key
   *          the key whose pool is discarded
   * @return a resource obtained from the discarded pool via
   *         {@link Pool#get()}, or <code>null</code> if there was no pool or
   *         the pool had nothing to hand out
   */
  @Override
  public V remove(Object key) {
    Pool<V> pool = pools.remove(key);
    return pool != null ? pool.get() : null;
  }

  /**
   * Removes a single resource from the pool registered under
   * <code>key</code>; the pool itself is dropped once it reports a size of
   * zero.
   *
   * <p>
   * The parameters are typed as {@link Object} so that this method shares the
   * signature of <code>Map.remove(Object, Object)</code> introduced in Java 8
   * instead of clashing with it. <code>@Override</code> is deliberately
   * omitted so the class also compiles against older class libraries.
   * </p>
   *
   * @param key
   *          the key whose pool is modified
   * @param value
   *          the resource to remove
   * @return <code>true</code> if the pool reported the resource as removed
   */
  @SuppressWarnings("unchecked")
  public boolean remove(Object key, Object value) {
    Pool<V> pool = pools.get(key);
    if (pool == null) {
      return false;
    }
    // The cast is erased at runtime; the pools compare with equals().
    boolean removed = pool.remove((V) value);
    if (removed && pool.size() == 0) {
      // Only unregister the pool we actually inspected.
      pools.remove(key, pool);
    }
    return removed;
  }

  /**
   * @return a new collection holding the values reported by every pool
   */
  @Override
  public Collection<V> values() {
    Collection<V> values = new ArrayList<V>();
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        values.addAll(poolValues);
      }
    }
    return values;
  }

  /**
   * @param key
   *          the key whose pool is inspected
   * @return a new collection holding the values reported by the pool of
   *         <code>key</code>; empty when there is no such pool
   */
  public Collection<V> values(K key) {
    Collection<V> values = new ArrayList<V>();
    Pool<V> pool = pools.get(key);
    if (pool != null) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null) {
        values.addAll(poolValues);
      }
    }
    return values;
  }

  /**
   * @return <code>true</code> if no key currently has a pool
   */
  @Override
  public boolean isEmpty() {
    return pools.isEmpty();
  }

  /**
   * @return the number of keys that currently have a pool (not the number of
   *         pooled resources)
   */
  @Override
  public int size() {
    return pools.size();
  }

  /**
   * @param key
   *          the key whose pool is inspected
   * @return the size reported by the pool of <code>key</code>, or
   *         <code>0</code> when there is no such pool
   */
  public int size(K key) {
    Pool<V> pool = pools.get(key);
    return pool != null ? pool.size() : 0;
  }

  @Override
  public boolean containsKey(Object key) {
    return pools.containsKey(key);
  }

  /**
   * Checks whether any pool currently reports <code>value</code> among its
   * values. The pools are only inspected; nothing is checked out.
   *
   * @param value
   *          the resource to look for
   * @return <code>true</code> if some pool's {@link Pool#values()} contains
   *         the value; always <code>false</code> for <code>null</code>
   */
  @Override
  public boolean containsValue(Object value) {
    if (value == null) {
      return false;
    }
    for (Pool<V> pool : pools.values()) {
      Collection<V> poolValues = pool.values();
      if (poolValues != null && poolValues.contains(value)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public void putAll(Map<? extends K, ? extends V> map) {
    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Clears every pool and then forgets all keys.
   */
  @Override
  public void clear() {
    for (Pool<V> pool : pools.values()) {
      pool.clear();
    }
    pools.clear();
  }

  @Override
  public Set<K> keySet() {
    return pools.keySet();
  }

  /**
   * Builds a new set with one entry per (key, pooled value) pair.
   * Calling <code>setValue</code> on an entry offers the new value to the
   * pool of the entry's key.
   *
   * @return the freshly built set of entries; never <code>null</code>
   */
  @Override
  public Set<Map.Entry<K, V>> entrySet() {
    Set<Map.Entry<K, V>> entries = new HashSet<Entry<K, V>>();
    for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
      final K poolKey = poolEntry.getKey();
      final Pool<V> pool = poolEntry.getValue();
      if (pool == null) {
        continue;
      }
      Collection<V> poolValues = pool.values();
      if (poolValues == null) {
        continue;
      }
      for (final V poolValue : poolValues) {
        entries.add(new Map.Entry<K, V>() {
          @Override
          public K getKey() {
            return poolKey;
          }

          @Override
          public V getValue() {
            return poolValue;
          }

          @Override
          public V setValue(V value) {
            return pool.put(value);
          }
        });
      }
    }
    return entries;
  }

  /**
   * Contract implemented by the per-key pools.
   *
   * @param <R>
   *          the type of the resource
   */
  protected interface Pool<R> {
    /** @return a resource from the pool, or <code>null</code> */
    public R get();

    /**
     * @param resource
     *          the resource offered to the pool
     * @return implementation specific; see the concrete pools
     */
    public R put(R resource);

    /**
     * @param resource
     *          the resource to remove
     * @return <code>true</code> if the resource was removed
     */
    public boolean remove(R resource);

    /** Removes the resources held by the pool. */
    public void clear();

    /** @return the resources visible in the pool */
    public Collection<R> values();

    /** @return the number of resources the pool accounts for */
    public int size();
  }

  /**
   * The available pool implementations.
   */
  public enum PoolType {
    Reusable, ThreadLocal, RoundRobin;

    /**
     * Resolves a pool type by (fuzzy) name, restricted to a set of permitted
     * types.
     *
     * @param poolTypeName
     *          the name to resolve; may be <code>null</code>
     * @param defaultPoolType
     *          returned when the name does not match or matches a type that is
     *          not permitted; always permitted itself
     * @param allowedPoolTypes
     *          additional permitted types; may be <code>null</code> or empty
     * @return the matching permitted type, otherwise
     *         <code>defaultPoolType</code>
     */
    public static PoolType valueOf(String poolTypeName,
        PoolType defaultPoolType, PoolType... allowedPoolTypes) {
      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
      if (poolType == null || poolType == defaultPoolType) {
        return defaultPoolType;
      }
      if (allowedPoolTypes != null) {
        for (PoolType allowedPoolType : allowedPoolTypes) {
          if (poolType == allowedPoolType) {
            return poolType;
          }
        }
      }
      return defaultPoolType;
    }

    /**
     * Normalizes a name for fuzzy comparison: hyphens are dropped, surrounding
     * whitespace is trimmed and the result is lower-cased.
     *
     * @param name
     *          the name to normalize; may be <code>null</code>
     * @return the normalized name, or the empty string for <code>null</code>
     */
    public static String fuzzyNormalize(String name) {
      // Locale.ROOT keeps the result independent of the default locale (the
      // Turkish locale, for example, lower-cases 'I' to a dotless 'i').
      return name != null
          ? name.replaceAll("-", "").trim().toLowerCase(java.util.Locale.ROOT)
          : "";
    }

    /**
     * @param name
     *          the name to look up; may be <code>null</code>
     * @return the pool type whose normalized name equals the normalized
     *         <code>name</code>, or <code>null</code> if there is none
     */
    public static PoolType fuzzyMatch(String name) {
      String wanted = fuzzyNormalize(name);
      for (PoolType poolType : values()) {
        if (wanted.equals(fuzzyNormalize(poolType.name()))) {
          return poolType;
        }
      }
      return null;
    }
  }

  /**
   * Creates the pool used for a key that has none yet.
   *
   * @return a new pool matching the configured pool type
   */
  protected Pool<V> createPool() {
    switch (poolType) {
    case Reusable:
      return new ReusablePool<V>(poolMaxSize);
    case RoundRobin:
      return new RoundRobinPool<V>(poolMaxSize);
    case ThreadLocal:
      return new ThreadLocalPool<V>();
    }
    return null;
  }

  /**
   * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
   * on the {@link ConcurrentLinkedQueue} class. It essentially allows
   * resources to be checked out, at which point it is removed from this pool.
   * When the resource is no longer required, it should be returned to the pool
   * in order to be reused.
   *
   * <p>
   * A resource is only accepted while the pool holds fewer than
   * {@link #maxSize} resources; surplus resources are silently dropped.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   */
  @SuppressWarnings("serial")
  public class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
    private final int maxSize;

    public ReusablePool(int maxSize) {
      this.maxSize = maxSize;
    }

    /**
     * Checks out the resource at the head of the queue.
     *
     * @return the checked-out resource, or <code>null</code> if the pool is
     *         empty
     */
    @Override
    public R get() {
      return poll();
    }

    /**
     * Returns a resource to the pool if there is room for it.
     *
     * @return always <code>null</code>
     */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    /** @return this pool itself, as a live view of its resources */
    @Override
    public Collection<R> values() {
      return this;
    }
  }

  /**
   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
   * stores its resources in a {@link CopyOnWriteArrayList}. It load-balances
   * access to its resources by returning a different resource every time a
   * given key is looked up.
   *
   * <p>
   * A resource is only accepted while the pool holds fewer than
   * {@link #maxSize} resources, and {@link #get()} hands out resources only
   * once the pool has been filled up to {@link #maxSize}.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   *
   */
  @SuppressWarnings("serial")
  class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
    private final int maxSize;

    /** Monotonic cursor; the slot handed out is derived from it modulo size. */
    private final AtomicInteger nextResource = new AtomicInteger(0);

    public RoundRobinPool(int maxSize) {
      this.maxSize = maxSize;
    }

    /**
     * Adds a resource if there is room for it.
     *
     * @return always <code>null</code>
     */
    @Override
    public R put(R resource) {
      if (super.size() < maxSize) {
        add(resource);
      }
      return null;
    }

    /**
     * Hands out the next resource in rotation.
     *
     * @return the next resource, or <code>null</code> while the pool is empty
     *         or holds fewer than {@link #maxSize} resources
     */
    @Override
    public R get() {
      int size = super.size();
      // The explicit empty check avoids a division by zero when maxSize <= 0.
      if (size == 0 || size < maxSize) {
        return null;
      }
      // Masking the sign bit keeps the index non-negative after the counter
      // wraps around.
      int index = (nextResource.getAndIncrement() & Integer.MAX_VALUE) % size;
      return get(index);
    }

    /** @return this pool itself, as a live view of its resources */
    @Override
    public Collection<R> values() {
      return this;
    }

  }

  /**
   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
   * builds on the {@link ThreadLocal} class. It essentially binds the resource
   * to the thread from which it is accessed.
   *
   * <p>
   * Note that the size of the pool is essentially bounded by the number of threads
   * that add resources to this pool.
   * </p>
   *
   * @param <R>
   *          the type of the resource
   */
  static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
    /** Number of threads currently holding a non-null resource in this pool. */
    private final AtomicInteger poolSize = new AtomicInteger(0);

    public ThreadLocalPool() {
    }

    /**
     * Binds <code>resource</code> to the calling thread.
     *
     * @return the resource previously bound to the calling thread, or
     *         <code>null</code>
     */
    @Override
    public R put(R resource) {
      R previousResource = get();
      // Only a transition between "no resource" and "a resource" changes the
      // number of threads holding one.
      if (previousResource == null && resource != null) {
        poolSize.incrementAndGet();
      } else if (previousResource != null && resource == null) {
        poolSize.decrementAndGet();
      }
      this.set(resource);
      return previousResource;
    }

    /**
     * Unbinds the calling thread's resource, adjusting the size only if the
     * thread actually held one.
     */
    @Override
    public void remove() {
      boolean hadResource = super.get() != null;
      super.remove();
      if (hadResource) {
        poolSize.decrementAndGet();
      }
    }

    @Override
    public int size() {
      return poolSize.get();
    }

    /**
     * Unbinds the calling thread's resource if it equals
     * <code>resource</code>.
     *
     * @return <code>true</code> if the resource was unbound
     */
    @Override
    public boolean remove(R resource) {
      if (resource != null && resource.equals(super.get())) {
        remove();
        return true;
      }
      return false;
    }

    /**
     * Unbinds the calling thread's resource; resources bound to other threads
     * are not touched.
     */
    @Override
    public void clear() {
      remove();
    }

    /**
     * @return a new list holding the calling thread's resource, or an empty
     *         list if the calling thread has none
     */
    @Override
    public Collection<R> values() {
      List<R> values = new ArrayList<R>(1);
      R current = get();
      if (current != null) {
        values.add(current);
      }
      return values;
    }
  }
}