blob: 7cfeb9829e48511d1d9f63b8aa5f018e7b8f8979 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.S;
import static java.nio.ByteBuffer.allocate;
import static java.nio.ByteBuffer.allocateDirect;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.MAPPED;
/**
 * Segmented ring byte buffer that represents multi producer/single consumer queue that can be used by multiple writer
 * threads and one reader thread.
 * <p>
 * {@code head} and {@code tail} are global positions; they are mapped to an index of the underlying buffer
 * by taking the remainder of the division by the buffer capacity (see {@link #toIndex(long)}).
 * The sign bit of {@code tail} ({@link #CLOSE_MASK}) is used as a "closed" flag: while it is set,
 * {@link #offer(int)} does not reserve new space and only {@link #offerSafe(int)} may be used.
 */
public class SegmentedRingByteBuffer {
    /** Open mask: clears the close flag (sign bit) and leaves only the tail position. */
    private static final long OPEN_MASK = 0x7FFFFFFFFFFFFFFFL;

    /** Close mask: the sign bit which, being set in {@link #tail}, marks the buffer as closed. */
    private static final long CLOSE_MASK = 0x8000000000000000L;

    /** Tail field atomic updater. */
    private static final AtomicLongFieldUpdater<SegmentedRingByteBuffer> TAIL_UPD =
        AtomicLongFieldUpdater.newUpdater(SegmentedRingByteBuffer.class, "tail");

    /** Producers count field atomic updater. */
    private static final AtomicIntegerFieldUpdater<SegmentedRingByteBuffer> PRODUCERS_CNT_UPD =
        AtomicIntegerFieldUpdater.newUpdater(SegmentedRingByteBuffer.class, "producersCnt");

    /** Capacity. */
    private final int cap;

    /** Buffer mode. */
    private final BufferMode mode;

    /** Buffer. */
    public final ByteBuffer buf;

    /** Max segment size. */
    private final long maxSegmentSize;

    /** Head. */
    private volatile long head;

    /** Tail. Negative value (sign bit is set) means that the buffer is closed. */
    private volatile long tail;

    /**
     * Producers count. Used by consumer in order to wait for ending of data writing by all producers.
     * The consumer temporarily sets it to {@code -1} in {@link #poll(long)} while it reads head and tail.
     */
    private volatile int producersCnt;

    /**
     * Wait for consumer flag. Prevents producers from writing data to the ring buffer while consumer waiting for finish
     * of all already writing producers.
     */
    private volatile boolean waitForConsumer;

    /** Metrics. May be {@code null}. */
    private final DataStorageMetricsImpl metrics;

    /**
     * Creates ring buffer with given capacity.
     *
     * @param cap Buffer's capacity.
     * @param maxSegmentSize Max segment size.
     * @param mode Buffer mode.
     */
    public SegmentedRingByteBuffer(int cap, long maxSegmentSize, BufferMode mode) {
        this(cap, maxSegmentSize, mode == DIRECT ? allocateDirect(cap) : allocate(cap), mode, null);
    }

    /**
     * Creates ring buffer with given capacity.
     *
     * @param cap Buffer's capacity.
     * @param maxSegmentSize Max segment size.
     * @param mode Buffer mode.
     * @param metrics Metrics.
     */
    public SegmentedRingByteBuffer(int cap, long maxSegmentSize, BufferMode mode, DataStorageMetricsImpl metrics) {
        this(cap, maxSegmentSize, mode == DIRECT ? allocateDirect(cap) : allocate(cap), mode, metrics);
    }

    /**
     * Creates ring buffer with given capacity which mapped to file. Both the capacity and the max segment size
     * are taken from the capacity of the given buffer.
     *
     * @param buf {@link MappedByteBuffer} instance.
     * @param metrics Metrics.
     */
    public SegmentedRingByteBuffer(MappedByteBuffer buf, DataStorageMetricsImpl metrics) {
        this(buf.capacity(), buf.capacity(), buf, MAPPED, metrics);
    }

    /**
     * @param cap Capacity.
     * @param maxSegmentSize Max segment size.
     * @param buf Buffer.
     * @param mode Mode.
     * @param metrics Metrics.
     */
    private SegmentedRingByteBuffer(
        int cap,
        long maxSegmentSize,
        ByteBuffer buf,
        BufferMode mode,
        DataStorageMetricsImpl metrics
    ) {
        this.cap = cap;
        this.mode = mode;
        this.buf = buf;
        this.buf.order(ByteOrder.nativeOrder());
        this.maxSegmentSize = maxSegmentSize;
        this.metrics = metrics;
    }

    /**
     * Performs initialization of ring buffer state: both head and tail are set to the given position.
     *
     * @param pos Position.
     */
    public void init(long pos) {
        head = pos;
        tail = pos;
    }

    /**
     * Returns buffer mode.
     *
     * @return Buffer mode.
     */
    public BufferMode mode() {
        return mode;
    }

    /**
     * Returns actual buffer tail (without the close flag).
     *
     * @return Buffer tail.
     */
    public long tail() {
        return tail & SegmentedRingByteBuffer.OPEN_MASK;
    }

    /**
     * Reserves {@code size} bytes in {@code SegmentedRingByteBuffer} and returns instance of {@link WriteSegment}
     * class that points to wrapped {@link ByteBuffer} instance with corresponding capacity. This {@link ByteBuffer}
     * instance should be used only for data writing to {@link SegmentedRingByteBuffer}.
     * <p>
     * Returned result can be {@code null} in case of requested amount of bytes greater than available space
     * in {@code SegmentedRingByteBuffer}. Also {@link WriteSegment#buffer()} can return {@code null} in case of
     * {@link #maxSegmentSize} value is exceeded. In this case buffer will be closed in order to prevent any
     * concurrent threads from trying to reserve new segment.
     * <p>
     * This method can be invoked by many producer threads and each producer will get own {@link ByteBuffer} instance
     * that mapped to own {@link SegmentedRingByteBuffer} slice.
     * <p>
     * Once the data has been written into the {@link ByteBuffer} client code must notify
     * {@code SegmentedRingByteBuffer} instance using {@link WriteSegment#release()} method in order to provide
     * possibility to consumer get data for reading.
     *
     * @param size Amount of bytes for reserve.
     * @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
     * {@code null} if buffer space is not enough.
     * @throws IllegalArgumentException If {@code size} is negative or greater than the buffer capacity.
     */
    public WriteSegment offer(int size) {
        return offer0(size, false);
    }

    /**
     * Behaves like {@link #offer(int)} but in safe manner: there are no any concurrent threads and buffer in
     * closed state.
     *
     * @param size Amount of bytes for reserve.
     * @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
     * {@code null} if buffer space is not enough.
     * @throws IllegalArgumentException If {@code size} is negative or greater than the buffer capacity.
     */
    public WriteSegment offerSafe(int size) {
        return offer0(size, true);
    }

    /**
     * Reserves space in the ring buffer. See {@link #offer(int)} for the contract.
     *
     * @param size Amount of bytes for reserve.
     * @param safe Safe mode: the buffer is expected to be closed and is kept closed after reservation.
     * @return Write segment, or {@code null} if buffer space is not enough.
     * @throws IllegalArgumentException If {@code size} is negative or greater than the buffer capacity.
     */
    private WriteSegment offer0(int size, boolean safe) {
        // Must be checked before any state is touched: a negative size would move the tail backwards and
        // leave the producers counter incremented, which makes the consumer spin forever in poll().
        if (size < 0)
            throw new IllegalArgumentException("Record size must be non-negative [size=" + size + ']');

        if (size > cap)
            throw new IllegalArgumentException("Record is too long [capacity=" + cap + ", size=" + size + ']');

        // Register as a producer. Spins while the consumer is taking a snapshot of head and tail.
        for (;;) {
            if (!waitForConsumer) {
                int cur = producersCnt;

                if (cur >= 0 && PRODUCERS_CNT_UPD.compareAndSet(this, cur, cur + 1))
                    break;
            }
        }

        for (;;) {
            long currTail = tail;

            assert !safe || currTail < 0 : "Unsafe usage of segment ring byte buffer currTail=" + currTail;

            if (currTail < 0) {
                if (safe)
                    currTail &= SegmentedRingByteBuffer.OPEN_MASK;
                else
                    // Buffer is closed. The producers counter is decremented only in WriteSegment#release(),
                    // so the returned segment still has to be released.
                    return new WriteSegment(null, -1);
            }

            long head0 = head;

            long currTailIdx = toIndex(currTail);

            boolean fitsSeg = currTail + size <= maxSegmentSize;

            // If the record does not fit into the segment the tail position is not moved, the buffer is only closed.
            long newTail = fitsSeg ? currTail + size : currTail;

            if (head0 < newTail - cap) { // Not enough space.
                PRODUCERS_CNT_UPD.decrementAndGet(this);

                return null;
            }
            else {
                // If safe we should keep buffer closed.
                long tail0 = fitsSeg ? (safe ? newTail | CLOSE_MASK : newTail) : newTail | CLOSE_MASK;

                boolean upd = TAIL_UPD.compareAndSet(this, safe ? currTail | CLOSE_MASK : currTail, tail0);

                assert !safe || upd : "Unsafe usage of segment ring byte buffer";

                if (upd) {
                    if (!fitsSeg)
                        return new WriteSegment(null, -1);

                    // Reserved range crosses the end of the underlying buffer.
                    boolean wrap = cap - currTailIdx < size;

                    if (wrap) {
                        long newTailIdx = toIndex(newTail);

                        return new WriteSegment(currTail, newTail, newTailIdx == 0 ? newTail : currTail);
                    }
                    else {
                        ByteBuffer slice = slice((int)toIndex(newTail - size), size, false);

                        return new WriteSegment(slice, newTail);
                    }
                }
            }
        }
    }

    /**
     * Closes the buffer: sets the close flag in the tail if it is not set yet.
     */
    public void close() {
        for (;;) {
            long currTail = tail;

            if (currTail < 0)
                return;

            if (TAIL_UPD.compareAndSet(this, currTail, currTail | CLOSE_MASK))
                return;
        }
    }

    /**
     * Retrieves list of {@link ReadSegment} instances that point to {@link ByteBuffer} that contains all data available
     * for reading from {@link SegmentedRingByteBuffer} or {@code null} if there are no available data for reading.
     * <p>
     * This method can be invoked only by one consumer thread.
     * <p>
     * Once the data has been read from the returned {@link ReadSegment} client code must notify
     * {@link SegmentedRingByteBuffer} instance using {@link ReadSegment#release()} method in order to release occupied
     * space in the {@link SegmentedRingByteBuffer} and make it available for writing.
     *
     * @return List of {@code ReadSegment} instances with all available data for reading or {@code null} if
     * there are no available data.
     */
    public List<ReadSegment> poll() {
        return poll(-1);
    }

    /**
     * Retrieves list of {@link ReadSegment} instances that point to {@link ByteBuffer} that contains data
     * available for reading from {@link SegmentedRingByteBuffer} limited by {@code pos} parameter or {@code null}
     * if there are no available data for reading.
     * <p>
     * This method can be invoked only by one consumer thread.
     * <p>
     * Once the data has been read from the returned {@link ReadSegment} client code must notify
     * {@link SegmentedRingByteBuffer} instance using {@link ReadSegment#release()} method in order to release occupied
     * space in the {@link SegmentedRingByteBuffer} and make it available for writing.
     *
     * @param pos End position in buffer. Negative value means no limit.
     * @return List of {@code ReadSegment} instances with all available data for reading or {@code null} if
     * there are no available data.
     */
    public List<ReadSegment> poll(long pos) {
        // Stop new producers from registering and wait until all registered producers release their segments.
        waitForConsumer = true;

        int spins = 0;

        for (;;) {
            if (PRODUCERS_CNT_UPD.compareAndSet(this, 0, -1))
                break;

            spins++;
        }

        if (metrics != null && metrics.metricsEnabled())
            metrics.onBuffPollSpin(spins);

        // Snapshot of head and tail taken while no producer is registered.
        long head0 = this.head;
        long tail0 = this.tail & OPEN_MASK;

        // Let producers in again.
        producersCnt = 0;

        waitForConsumer = false;

        // There are no data for reading or all data up to given position were read.
        if (tail0 <= head0 || (pos >= 0 && head0 > pos))
            return null;

        int headIdx = (int)toIndex(head0);
        int tailIdx = (int)toIndex(tail0);

        boolean wrapped = tailIdx <= headIdx;

        if (wrapped && tailIdx != 0) {
            // Data crosses the end of the underlying buffer: return it as two segments.
            List<ReadSegment> lst = new ArrayList<>(2);

            int lim = cap - headIdx;

            lst.add(new ReadSegment(slice(headIdx, lim, true), head0, head0 + lim));
            lst.add(new ReadSegment(slice(0, tailIdx, true), head0 + lim, tail0));

            return lst;
        }
        else
            return Collections.singletonList(
                new ReadSegment(slice(headIdx, (int)(tail0 - head0), true), head0, tail0));
    }

    /**
     * Frees allocated memory in case of direct or mapped byte buffer. Does nothing for on-heap buffer.
     */
    public void free() {
        if (mode == DIRECT || mode == MAPPED)
            GridUnsafe.cleanDirectBuffer(buf);
    }

    /**
     * Resets the state of the buffer and returns new instance but with the same underlying buffer.
     *
     * @return New instance that shares the underlying buffer, max segment size, mode and metrics with this one.
     */
    public SegmentedRingByteBuffer reset() {
        return new SegmentedRingByteBuffer(buf.capacity(), maxSegmentSize, buf, mode, metrics);
    }

    /**
     * Creates a view of the underlying buffer which position is {@code off} and limit is {@code off + len}.
     *
     * @param off Offset.
     * @param len Length.
     * @param readOnly Read only.
     * @return View of the underlying buffer with native byte order.
     */
    private ByteBuffer slice(int off, int len, boolean readOnly) {
        ByteBuffer bb = readOnly ? buf.asReadOnlyBuffer() : buf.duplicate();

        bb.order(ByteOrder.nativeOrder());
        bb.limit(off + len);
        bb.position(off);

        return bb;
    }

    /**
     * @param globalIdx Global index of ring buffer.
     * @return Index of byte array.
     */
    private long toIndex(long globalIdx) {
        return globalIdx % cap;
    }

    /**
     * Copies {@code len} bytes from {@code src} starting at {@code srcPos} to {@code dest} starting at
     * {@code destPos}. Positions and limits of the passed buffers are not changed.
     *
     * @param src Source.
     * @param srcPos Source pos.
     * @param dest Destination.
     * @param destPos Destination pos.
     * @param len Length.
     */
    private void copy(ByteBuffer src, int srcPos, ByteBuffer dest, int destPos, int len) {
        assert mode != MAPPED;

        if (dest.isDirect()) {
            // Work on duplicates in order to keep position and limit of the original buffers untouched.
            ByteBuffer src0 = src.duplicate();
            src0.limit(srcPos + len);
            src0.position(srcPos);

            ByteBuffer dest0 = dest.duplicate();
            dest0.limit(destPos + len);
            dest0.position(destPos);

            dest0.put(src0);
        }
        else
            System.arraycopy(src.array(), srcPos, dest.array(), destPos, len);
    }

    /**
     * Base class for segments of the ring buffer handed out to producers and to the consumer.
     */
    private abstract class Segment {
        /** Buffer. */
        protected final ByteBuffer seg;

        /** Pos. */
        protected final long pos;

        /**
         * @param seg Seg.
         * @param pos Pos.
         */
        protected Segment(ByteBuffer seg, long pos) {
            this.seg = seg;
            this.pos = pos;
        }

        /**
         * Releases segment.
         */
        public abstract void release();

        /**
         * Returns byte buffer.
         *
         * @return Byte buffer.
         */
        public abstract ByteBuffer buffer();

        /**
         * Returns position.
         *
         * @return Position.
         */
        public long position() {
            return pos;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(Segment.class, this);
        }
    }

    /**
     * Segment available for data writing.
     */
    public class WriteSegment extends Segment {
        /** Current tail, or {@code -1} if the segment is not wrapped. */
        private final long currTail;

        /** Wrap point, or {@code -1} if the segment is not wrapped. */
        private final long wrapPnt;

        /**
         * Creates wrapped segment: data is written to a temporary on-heap buffer and is copied to the ring buffer
         * on {@link #release()}.
         *
         * @param currTail Current tail.
         * @param newTail New tail.
         * @param wrapPnt Wrap point.
         */
        private WriteSegment(long currTail, long newTail, long wrapPnt) {
            super(allocate((int)(newTail - currTail)), newTail);

            this.seg.order(ByteOrder.nativeOrder());
            this.currTail = currTail;
            this.wrapPnt = wrapPnt;
        }

        /**
         * Creates segment that is not wrapped.
         *
         * @param seg Seg.
         * @param pos Pos.
         */
        private WriteSegment(ByteBuffer seg, long pos) {
            super(seg, pos);

            this.currTail = -1;
            this.wrapPnt = -1;
        }

        /** {@inheritDoc} */
        @Override public ByteBuffer buffer() {
            return seg;
        }

        /** {@inheritDoc} */
        @Override public void release() {
            if (wrapPnt > -1) {
                // Wrapped segment: the first part goes to the end of the ring buffer, the rest to its beginning.
                int pos = (int)toIndex(currTail);

                int len = cap - pos;

                copy(seg, 0, buf, pos, len);
                copy(seg, len, buf, 0, seg.array().length - len);
            }

            assert producersCnt >= 0;

            PRODUCERS_CNT_UPD.decrementAndGet(SegmentedRingByteBuffer.this);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(WriteSegment.class, this, "super", super.toString());
        }
    }

    /**
     * Segment available for data reading.
     */
    public class ReadSegment extends Segment {
        /** New head. */
        private final long newHead;

        /**
         * @param seg Seg.
         * @param pos Pos.
         * @param newHead New head.
         */
        private ReadSegment(ByteBuffer seg, long pos, long newHead) {
            super(seg, pos);

            this.newHead = newHead;
        }

        /** {@inheritDoc} */
        @Override public void release() {
            if (newHead >= 0)
                head = newHead;
        }

        /** {@inheritDoc} */
        @Override public ByteBuffer buffer() {
            return seg;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ReadSegment.class, this, "super", super.toString());
        }
    }

    /**
     * Buffer mode.
     */
    public enum BufferMode {
        /** Byte buffer on-heap. */
        ONHEAP,

        /** Direct byte buffer off-heap. */
        DIRECT,

        /** Byte buffer mapped to file. */
        MAPPED
    }
}