blob: aa7785cf7e09efdd8ab3cc3529ce00541e2ca5f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.cdc;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
/**
* Buffer that stores WAL records before {@link CdcWorker} consumes it. Buffer is a single-producer single-consumer bounded queue.
* <p>
* TODO: Optimize the queue:
* 1. by space using LinkedArrayQueue. Example: http://psy-lob-saw.blogspot.com/2016/10/linked-array-queues-part-1-spsc.html.
* It helps to avoid using AtomicLong for #size.
* 2. by performance using AtomicReference#lazySet or similar for LinkedNode#next.
*/
public class CdcBuffer {
/** Maximum size of the underlying buffer, bytes. */
private final long maxSize;
/** Reference to last consumed linked node. */
private LinkedNode consumerNode;
/** Reference to last produced linked node. */
private volatile LinkedNode producerNode;
/** Current size of the buffer, bytes. */
private final AtomicLong size = new AtomicLong();
/** If {@code true} then buffer has overflowed. */
private volatile boolean overflowed;
/** */
public CdcBuffer(long maxSize) {
assert maxSize > 0 : maxSize;
this.maxSize = maxSize;
producerNode = consumerNode = new LinkedNode(null);
}
/**
* Offers data for the queue. Performs by the single producer thread.
*
* @param data Data to store in the buffer.
*/
public boolean offer(ByteBuffer data) {
int bufSize = data.limit() - data.position();
if (bufSize == 0)
return true;
if (size.addAndGet(bufSize) > maxSize) {
overflowed = true;
return false;
}
byte[] cp = new byte[bufSize];
data.get(cp, 0, bufSize);
ByteBuffer dataBuf = ByteBuffer
.wrap(cp)
.order(data.order());
LinkedNode newNode = new LinkedNode(dataBuf);
LinkedNode oldNode = producerNode;
producerNode = newNode;
oldNode.next(newNode);
return true;
}
/**
* Polls data from the queue. Performs by single consumer thread.
*
* @return Polled data, or {@code null} if no data is available now.
*/
public ByteBuffer poll() {
LinkedNode prev = consumerNode;
LinkedNode next = prev.next;
if (next != null)
return poll(prev, next);
else if (prev != producerNode) {
while ((next = prev.next) == null) {
// No-op. New reference should appear soon.
}
return poll(prev, next);
}
return null;
}
/** @return Current buffer size. */
public long size() {
return size.get();
}
/**
* @return {@code True} if buffer is overflowed.
*/
public boolean overflowed() {
return overflowed;
}
/**
* Cleans the buffer. Must be performed by the consumer thread.
*/
public void clean() {
if (consumerNode == null)
return;
ByteBuffer data;
do {
data = poll();
}
while (data != null);
consumerNode = null;
producerNode = null;
overflowed = false;
size.set(0);
}
/**
* @param prev Previously consumed linked node.
* @param next Node to consume.
* @return Data to consume.
*/
private ByteBuffer poll(LinkedNode prev, LinkedNode next) {
ByteBuffer data = next.data;
prev.next = null;
consumerNode = next;
size.addAndGet(-(data.limit() - data.position()));
return data;
}
/** */
private static class LinkedNode {
/** */
private volatile @Nullable LinkedNode next;
/** */
private final ByteBuffer data;
/** */
LinkedNode(ByteBuffer data) {
this.data = data;
}
/** */
void next(LinkedNode next) {
this.next = next;
}
}
}