blob: 27eeee30fa395a1e67ff1d8bf637fcf72d017c94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.jetbrains.annotations.NotNull;
/**
* Concurrent pool of object based on ConcurrentLinkedDeque.
*/
public class ConcurrentStripedPool<E> implements Iterable<E> {
/** Stripe pools. */
private final ConcurrentLinkedQueue<E>[] stripePools;
/** Stripes count. */
private final int stripes;
/** Stripe pools size (calculates fast, optimistic and approximate). */
private AtomicInteger[] stripeSize;
/** Max pool size. */
private int maxPoolSize;
/**
* Constructor.
*
* @param stripes Count of stripes.
* @param maxPoolSize Max pool size.
*/
@SuppressWarnings("unchecked")
public ConcurrentStripedPool(int stripes, int maxPoolSize) {
this.stripes = stripes;
this.maxPoolSize = maxPoolSize;
stripePools = new ConcurrentLinkedQueue[stripes];
stripeSize = new AtomicInteger[stripes];
for (int i = 0; i < stripes; ++i) {
stripePools[i] = new ConcurrentLinkedQueue<>();
stripeSize[i] = new AtomicInteger();
}
}
/**
* Pushes an element onto the pool.
*
* @param e the element to push
* @throws NullPointerException if the specified element is null and this deque does not permit null elements
* @return {@code true} if the element is returned to the pool, {@code false} if the is no space at the pool.
*/
public boolean recycle(E e) {
int idx = (int)(Thread.currentThread().getId() % stripes);
if (stripeSize[idx].get() > maxPoolSize)
return false;
stripePools[idx].add(e);
stripeSize[idx].incrementAndGet();
return true;
}
/**
* Retrieves element from pool, or returns {@code null} if the pool is empty.
*
* @return the element of the pool, or {@code null} if the pool is empty.
*/
public E borrow() {
int idx = (int)(Thread.currentThread().getId() % stripes);
E r = stripePools[idx].poll();
if (r != null)
stripeSize[idx].decrementAndGet();
return r;
}
/**
* Performs the given action for each element of the pool until all elements have been processed or the action
* throws an exception. Exceptions thrown by the action are relayed to the caller.
*
* @param action The action to be performed for each element
* @throws NullPointerException if the specified action is null
*/
@Override public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
for (int i = 0; i < stripes; ++i)
stripePools[i].forEach(action);
}
/**
* Removes all of the elements from the pool..
*/
public void clear() {
for (int i = 0; i < stripes; ++i)
stripePools[i].clear();
}
/** {@inheritDoc} */
@Override public @NotNull Iterator<E> iterator() {
return new Iterator<E>() {
private int idx;
private Iterator<E> it = stripePools[idx].iterator();
@Override public boolean hasNext() {
while (true) {
if (it.hasNext())
return true;
if (++idx >= stripes)
break;
it = stripePools[idx].iterator();
}
return false;
}
@Override public E next() {
if (hasNext())
return it.next();
throw new NoSuchElementException();
}
};
}
/**
* Returns a sequential {@code Stream} of the pool.
*
* @return a sequential {@code Stream} over the elements iof the pool.
*/
public Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
/**
* @param size New max pool size.
*/
public void resize(int size) {
maxPoolSize = size;
}
}