blob: 3ebbdce544b9f12d5949abbf891c16f4d39877f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.utils.concurrent;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* A simple append-only collection supporting an unbounded number of concurrent readers/writers,
* but a bounded number of items.
*
* @param <E>
*/
public class Accumulator<E>
{
private volatile int nextIndex;
private volatile int presentCount;
private final Object[] values;
private static final AtomicIntegerFieldUpdater<Accumulator> nextIndexUpdater = AtomicIntegerFieldUpdater.newUpdater(Accumulator.class, "nextIndex");
private static final AtomicIntegerFieldUpdater<Accumulator> presentCountUpdater = AtomicIntegerFieldUpdater.newUpdater(Accumulator.class, "presentCount");
public Accumulator(int size)
{
values = new Object[size];
}
/**
* Adds an item to the collection.
*
* Note it is not guaranteed to be visible on exiting the method, if another add was happening concurrently;
* it will be visible once all concurrent adds (which are non-blocking) complete, but it is not guaranteed
* that any size change occurs during the execution of any specific call.
*
* @param item add to collection
*/
public void add(E item)
{
int insertPos;
while (true)
{
insertPos = nextIndex;
if (insertPos >= values.length)
throw new IllegalStateException();
if (nextIndexUpdater.compareAndSet(this, insertPos, insertPos + 1))
break;
}
values[insertPos] = item;
// we then try to increase presentCount for each consecutive value that is visible after the current size;
// this should hopefully extend past us, but if it doesn't this behaviour means the lagging write will fix up
// our state for us.
//
// we piggyback off presentCountUpdater to get volatile write semantics for our update to values
boolean volatileWrite = false;
while (true)
{
int cur = presentCount;
if (cur != insertPos && (cur == values.length || values[cur] == null))
{
// ensure our item has been made visible before aborting
if (!volatileWrite && cur < insertPos && !presentCountUpdater.compareAndSet(this, cur, cur))
{
// if we fail to CAS it means an older write has completed, and may have not fixed us up
// due to our write not being visible
volatileWrite = true;
continue;
}
return;
}
presentCountUpdater.compareAndSet(this, cur, cur + 1);
volatileWrite = true;
}
}
public boolean isEmpty()
{
return presentCount == 0;
}
/**
* @return the size of guaranteed-to-be-visible portion of the list
*/
public int size()
{
return presentCount;
}
public int capacity()
{
return values.length;
}
private Iterator<E> iterator(int count)
{
return new Iterator<E>()
{
int p = 0;
public boolean hasNext()
{
return p < count;
}
public E next()
{
return (E) values[p++];
}
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
public E get(int i)
{
// we read presentCount to guarantee a volatile read of values
if (i >= presentCount)
throw new IndexOutOfBoundsException();
return (E) values[i];
}
public Collection<E> snapshot()
{
int count = presentCount;
return new AbstractCollection<E>()
{
@Override
public Iterator<E> iterator()
{
return Accumulator.this.iterator(count);
}
@Override
public int size()
{
return count;
}
};
}
/**
* Removes element at the speficied index from this accumulator.
*
* This method is not thread-safe when used concurrently with {@link #add(Object)}.
*/
public void clearUnsafe(int i)
{
values[i] = null;
}
}